Download Module (src.download)
S3 downloaders (async/sync) and catalog management for Polygon.io flat files.
AsyncS3Downloader
Module: src.download.async_downloader
High-performance async S3 downloads using aioboto3.
Class: AsyncS3Downloader
AsyncS3Downloader(
credentials: Dict[str, str],
endpoint_url: str = 'https://files.polygon.io',
max_retries: int = 5,
timeout: int = 60,
max_pool_connections: int = 50,
max_concurrent: int = 8
)
Parameters:
credentials: Dict with 'access_key_id' and 'secret_access_key'
endpoint_url: S3 endpoint URL
max_retries: Maximum retry attempts
timeout: Request timeout in seconds
max_pool_connections: Connection pool size
max_concurrent: Maximum concurrent downloads
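The only documented requirement on credentials is a dict with the keys 'access_key_id' and 'secret_access_key'. A minimal sketch of building that dict from the environment follows. The helper load_credentials and the environment variable names are hypothetical placeholders, not part of src.download.

```python
import os
from typing import Dict, Mapping, Optional


def load_credentials(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Build the credentials dict the downloaders expect.

    The environment variable names are hypothetical placeholders.
    """
    env = os.environ if env is None else env
    mapping = {
        "access_key_id": "POLYGON_S3_ACCESS_KEY_ID",          # hypothetical name
        "secret_access_key": "POLYGON_S3_SECRET_ACCESS_KEY",  # hypothetical name
    }
    missing = [var for var in mapping.values() if not env.get(var)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {key: env[var] for key, var in mapping.items()}
```

Usage would then be `downloader = AsyncS3Downloader(load_credentials())`.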
Key Methods:
download_one(bucket: str, key: str, decompress: bool = True) -> BytesIO
Download single file (async).
downloader = AsyncS3Downloader(credentials)
data = await downloader.download_one(
bucket='flatfiles',
key='us_stocks_sip/day_aggs_v1/2024/01/2024-01-02.csv.gz',
decompress=True
)
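The source says only that decompression is automatic. The sketch below assumes the decision is made from the .gz suffix and shows how the returned BytesIO can be read as CSV. The helper to_buffer is hypothetical, and the two-column payload is synthetic, not the real flat-file schema.

```python
import csv
import gzip
import io


def to_buffer(raw: bytes, key: str, decompress: bool = True) -> io.BytesIO:
    """Wrap an object body in BytesIO, gunzipping it first for .gz keys."""
    if decompress and key.endswith(".gz"):
        raw = gzip.decompress(raw)
    return io.BytesIO(raw)


# Demo with a made-up payload: decode the buffer as text and parse it as CSV.
payload = b"ticker,close\nXYZ,1.23\n"
buf = to_buffer(gzip.compress(payload), "2024-01-02.csv.gz")
rows = list(csv.DictReader(io.TextIOWrapper(buf, encoding="utf-8", newline="")))
```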
download_batch(bucket: str, keys: List[str], decompress: bool = True) -> List[Optional[BytesIO]]
Download multiple files in parallel.
keys = ['file1.csv.gz', 'file2.csv.gz', 'file3.csv.gz']
results = await downloader.download_batch('flatfiles', keys)
download_to_file(bucket: str, key: str, local_path: Path, decompress: bool = True)
Download and save to disk (async).
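This reference does not say how download_to_file writes to disk. One reasonable pattern is sketched below with a hypothetical helper, save_buffer: create the parent directories, write to a temporary file, then rename it into place so that an interrupted write never leaves a truncated file. The write is blocking for brevity; an async method might offload it to a thread.

```python
import os
import tempfile
from io import BytesIO
from pathlib import Path


def save_buffer(buf: BytesIO, local_path: Path) -> Path:
    """Write buf's contents to local_path, creating parent directories.

    Data goes to a temporary file in the same directory and is then renamed
    into place, so an interrupted write never leaves a truncated file.
    """
    local_path = Path(local_path)
    local_path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_name = tempfile.mkstemp(dir=local_path.parent, suffix=".part")
    try:
        with os.fdopen(fd, "wb") as fh:
            fh.write(buf.getvalue())
        # os.replace overwrites any existing file; on POSIX the rename is atomic
        os.replace(tmp_name, local_path)
    except BaseException:
        Path(tmp_name).unlink(missing_ok=True)
        raise
    return local_path
```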
list_objects(bucket: str, prefix: str, max_keys: int = 1000) -> List[str]
List S3 objects with prefix.
keys = await downloader.list_objects(
bucket='flatfiles',
prefix='us_stocks_sip/day_aggs_v1/2024/01/'
)
get_statistics() -> Dict[str, Any]
Get download statistics.
Returns:
{
'total_downloads': int,
'successful_downloads': int,
'failed_downloads': int,
'total_retries': int,
'success_rate': float
}
reset_statistics()
Reset statistics counters.
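The statistics keys relate to each other in a straightforward way. This is a sketch of counters that would produce the dict above. The class DownloadStats is hypothetical, and whether success_rate is a 0-1 fraction or a percentage is not stated in the source; this sketch assumes a fraction.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class DownloadStats:
    """Counters mirroring the keys returned by get_statistics()."""

    total_downloads: int = 0
    successful_downloads: int = 0
    failed_downloads: int = 0
    total_retries: int = 0

    def record(self, succeeded: bool, retries: int = 0) -> None:
        """Count one finished download and the retries it needed."""
        self.total_downloads += 1
        self.total_retries += retries
        if succeeded:
            self.successful_downloads += 1
        else:
            self.failed_downloads += 1

    def as_dict(self) -> Dict[str, Any]:
        # Assumption: success_rate is a 0-1 fraction, and 0.0 before any download.
        rate = (
            self.successful_downloads / self.total_downloads
            if self.total_downloads
            else 0.0
        )
        return {
            "total_downloads": self.total_downloads,
            "successful_downloads": self.successful_downloads,
            "failed_downloads": self.failed_downloads,
            "total_retries": self.total_retries,
            "success_rate": rate,
        }

    def reset(self) -> None:
        """Zero every counter, like reset_statistics()."""
        self.total_downloads = self.successful_downloads = 0
        self.failed_downloads = self.total_retries = 0
```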
Performance:
3-5x faster than SyncS3Downloader when downloading many files concurrently
Parallel processing with connection pooling
Exponential backoff retry logic
Automatic decompression of .gz files
SyncS3Downloader
Module: src.download.sync_downloader
Synchronous S3 downloads using boto3.
Class: SyncS3Downloader
SyncS3Downloader(
credentials: Dict[str, str],
endpoint_url: str = 'https://files.polygon.io',
max_retries: int = 5,
timeout: int = 60,
max_pool_connections: int = 10
)
Parameters: Same as AsyncS3Downloader, except that there is no max_concurrent and max_pool_connections defaults to 10 instead of 50.
Key Methods:
download(bucket: str, key: str, decompress: bool = True) -> BytesIO
Download single file (synchronous).
downloader = SyncS3Downloader(credentials)
data = downloader.download('flatfiles', 'path/to/file.csv.gz')
download_to_file(bucket: str, key: str, local_path: Path, decompress: bool = True)
Download and save to disk.
list_objects(bucket: str, prefix: str, max_keys: int = 1000) -> list
List S3 objects.
check_exists(bucket: str, key: str) -> bool
Check if object exists in S3.
if downloader.check_exists('flatfiles', 'path/to/file.csv.gz'):
    data = downloader.download('flatfiles', 'path/to/file.csv.gz')
get_statistics() -> Dict[str, int]
Get download statistics.
reset_statistics()
Reset statistics.
When to Use:
Simple scripts without async support
Sequential processing
Debugging
Recommendation: Use AsyncS3Downloader for production (3-5x faster)
S3Catalog
Module: src.download.s3_catalog
Manage S3 file paths for Polygon.io flat files.
Class: S3Catalog
S3Catalog(bucket: str = 'flatfiles')
Key Methods:
get_stocks_daily_key(date: str) -> str
Get S3 key for stock daily aggregates.
catalog = S3Catalog()
key = catalog.get_stocks_daily_key('2024-01-02')
# Returns: 'us_stocks_sip/day_aggs_v1/2024/01/2024-01-02.csv.gz'
get_stocks_minute_key(date: str) -> str
Get S3 key for stock minute aggregates.
get_options_daily_key(date: str) -> str
Get S3 key for options daily aggregates.
get_options_minute_key(date: str) -> str
Get S3 key for options minute aggregates.
get_date_range_keys(data_type: str, start_date: str, end_date: str, symbols: Optional[List[str]] = None) -> List[str]
Get S3 keys for date range.
keys = catalog.get_date_range_keys(
data_type='stocks_daily',
start_date='2024-01-01',
end_date='2024-01-31'
)
print(f"Found {len(keys)} trading days in January 2024")
Supported Data Types:
stocks_daily: Stock daily aggregates
stocks_minute: Stock minute aggregates
options_daily: Options daily aggregates
options_minute: Options minute aggregates
parse_key_metadata(key: str) -> Dict[str, str]
Parse metadata from S3 key.
metadata = catalog.parse_key_metadata(
'us_stocks_sip/day_aggs_v1/2024/01/2024-01-02.csv.gz'
)
# Returns: {'data_type': 'stocks_daily', 'date': '2024-01-02', 'year': '2024', 'month': '01'}
validate_key(key: str) -> bool
Validate if key matches expected pattern.
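Parsing and validation can both be driven by one regular expression built from the S3 path patterns listed at the end of this section. The sketch below uses hypothetical stand-ins, parse_key and is_valid_key. Unlike the documented parse_key_metadata, whose behavior on a bad key is not stated, this version returns None for an unrecognized key. It also rejects keys whose folder year/month disagree with the file date and keys with impossible calendar dates.

```python
import re
from datetime import date
from typing import Dict, Optional

# Prefix -> data type, taken from the S3 path patterns in this section.
PREFIX_TO_TYPE = {
    "us_stocks_sip/day_aggs_v1": "stocks_daily",
    "us_stocks_sip/minute_aggs_v1": "stocks_minute",
    "us_options/day_aggs_v1": "options_daily",
    "us_options/minute_aggs_v1": "options_minute",
}

# {prefix}/{year}/{month}/{date}.csv.gz, where {date} must repeat year and month.
KEY_RE = re.compile(
    r"(?P<prefix>" + "|".join(map(re.escape, PREFIX_TO_TYPE)) + r")/"
    r"(?P<year>\d{4})/(?P<month>\d{2})/"
    r"(?P<date>(?P=year)-(?P=month)-\d{2})\.csv\.gz"
)


def parse_key(key: str) -> Optional[Dict[str, str]]:
    """Return data_type/date/year/month for a well-formed key, else None."""
    m = KEY_RE.fullmatch(key)
    if m is None:
        return None
    try:
        date.fromisoformat(m["date"])  # reject impossible dates like 2024-02-30
    except ValueError:
        return None
    return {
        "data_type": PREFIX_TO_TYPE[m["prefix"]],
        "date": m["date"],
        "year": m["year"],
        "month": m["month"],
    }


def is_valid_key(key: str) -> bool:
    return parse_key(key) is not None
```

On the parse_key_metadata example key, this sketch returns the same dict shown there.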
get_summary(keys: List[str]) -> Dict[str, int]
Get summary statistics for keys.
Static Methods:
get_business_days(start_date: str, end_date: str) -> List[str]
Get US stock market business days between dates.
days = S3Catalog.get_business_days('2024-01-01', '2024-01-31')
print(f"Trading days: {len(days)}")
get_missing_dates(existing_dates: List[str], start_date: str, end_date: str) -> List[str]
Get missing business days.
existing = ['2024-01-02', '2024-01-03']
missing = S3Catalog.get_missing_dates(existing, '2024-01-01', '2024-01-05')
# Returns dates that are business days but not in existing
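Both static methods reduce to iterating calendar days and filtering. This is a sketch with hypothetical functions business_days and missing_dates. It assumes both endpoints are inclusive, which the source does not state. Instead of a real exchange calendar, market holidays are supplied explicitly through a hypothetical holidays argument, while the documented get_business_days presumably handles US market holidays on its own.

```python
from datetime import date, timedelta
from typing import Iterable, List, Optional


def business_days(
    start_date: str, end_date: str, holidays: Optional[Iterable[str]] = None
) -> List[str]:
    """Weekdays from start_date through end_date (both inclusive), minus holidays."""
    skip = set(holidays or ())
    day = date.fromisoformat(start_date)
    end = date.fromisoformat(end_date)
    days: List[str] = []
    while day <= end:
        iso = day.isoformat()
        if day.weekday() < 5 and iso not in skip:  # Monday=0 ... Friday=4
            days.append(iso)
        day += timedelta(days=1)
    return days


def missing_dates(
    existing_dates: Iterable[str],
    start_date: str,
    end_date: str,
    holidays: Optional[Iterable[str]] = None,
) -> List[str]:
    """Business days in the range that are not in existing_dates, in date order."""
    have = set(existing_dates)
    return [d for d in business_days(start_date, end_date, holidays) if d not in have]
```

If 2024-01-01 (New Year's Day, a US market holiday) is passed in holidays, the get_missing_dates example above yields ['2024-01-04', '2024-01-05'].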
S3 Path Patterns:
Stocks Daily: us_stocks_sip/day_aggs_v1/{year}/{month}/{date}.csv.gz
Stocks Minute: us_stocks_sip/minute_aggs_v1/{year}/{month}/{date}.csv.gz
Options Daily: us_options/day_aggs_v1/{year}/{month}/{date}.csv.gz
Options Minute: us_options/minute_aggs_v1/{year}/{month}/{date}.csv.gz
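A catalog method such as get_stocks_daily_key could fill these patterns directly from a YYYY-MM-DD date. This is a sketch using a hypothetical PATTERNS table and build_key function, not the actual S3Catalog implementation. For stocks_daily on 2024-01-02, it produces the same key as the get_stocks_daily_key example.

```python
from datetime import date

# Path patterns copied from this section, keyed by the supported data types.
PATTERNS = {
    "stocks_daily": "us_stocks_sip/day_aggs_v1/{year}/{month}/{date}.csv.gz",
    "stocks_minute": "us_stocks_sip/minute_aggs_v1/{year}/{month}/{date}.csv.gz",
    "options_daily": "us_options/day_aggs_v1/{year}/{month}/{date}.csv.gz",
    "options_minute": "us_options/minute_aggs_v1/{year}/{month}/{date}.csv.gz",
}


def build_key(data_type: str, day: str) -> str:
    """Fill the pattern for data_type from a YYYY-MM-DD date string.

    Raises KeyError for an unknown data_type and ValueError for a bad date.
    """
    d = date.fromisoformat(day)
    return PATTERNS[data_type].format(
        year=f"{d.year:04d}", month=f"{d.month:02d}", date=d.isoformat()
    )
```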