Download Module (src.download)

S3 downloaders (async/sync) and catalog management for Polygon.io flat files.

AsyncS3Downloader

Module: src.download.async_downloader

High-performance async S3 downloads using aioboto3.

Class: AsyncS3Downloader

AsyncS3Downloader(
    credentials: Dict[str, str],
    endpoint_url: str = 'https://files.polygon.io',
    max_retries: int = 5,
    timeout: int = 60,
    max_pool_connections: int = 50,
    max_concurrent: int = 8
)

Parameters:

  • credentials: Dict with 'access_key_id' and 'secret_access_key'

  • endpoint_url: S3 endpoint URL

  • max_retries: Maximum retry attempts

  • timeout: Request timeout in seconds

  • max_pool_connections: Connection pool size

  • max_concurrent: Maximum concurrent downloads

Key Methods:

download_one(bucket: str, key: str, decompress: bool = True) -> BytesIO

Download single file (async).

downloader = AsyncS3Downloader(credentials)
data = await downloader.download_one(
    bucket='flatfiles',
    key='us_stocks_sip/day_aggs_v1/2024/01/2024-01-02.csv.gz',
    decompress=True
)

download_batch(bucket: str, keys: List[str], decompress: bool = True) -> List[Optional[BytesIO]]

Download multiple files in parallel.

keys = ['file1.csv.gz', 'file2.csv.gz', 'file3.csv.gz']
results = await downloader.download_batch('flatfiles', keys)

download_to_file(bucket: str, key: str, local_path: Path, decompress: bool = True)

Download and save to disk (async).

list_objects(bucket: str, prefix: str, max_keys: int = 1000) -> List[str]

List S3 objects with prefix.

keys = await downloader.list_objects(
    bucket='flatfiles',
    prefix='us_stocks_sip/day_aggs_v1/2024/01/'
)

get_statistics() -> Dict[str, Any]

Get download statistics.

Returns:

{
    'total_downloads': int,
    'successful_downloads': int,
    'failed_downloads': int,
    'total_retries': int,
    'success_rate': float
}

reset_statistics()

Reset statistics counters.

Performance:

  • 3-5x faster than sync downloader

  • Parallel processing with connection pooling

  • Exponential backoff retry logic

  • Automatic decompression of .gz files


SyncS3Downloader

Module: src.download.sync_downloader

Synchronous S3 downloads using boto3.

Class: SyncS3Downloader

SyncS3Downloader(
    credentials: Dict[str, str],
    endpoint_url: str = 'https://files.polygon.io',
    max_retries: int = 5,
    timeout: int = 60,
    max_pool_connections: int = 10
)

Parameters: Same as AsyncS3Downloader (except no max_concurrent)

Key Methods:

download(bucket: str, key: str, decompress: bool = True) -> BytesIO

Download single file (synchronous).

downloader = SyncS3Downloader(credentials)
data = downloader.download('flatfiles', 'path/to/file.csv.gz')

download_to_file(bucket: str, key: str, local_path: Path, decompress: bool = True)

Download and save to disk.

list_objects(bucket: str, prefix: str, max_keys: int = 1000) -> list

List S3 objects.

check_exists(bucket: str, key: str) -> bool

Check if object exists in S3.

if downloader.check_exists('flatfiles', 'path/to/file.csv.gz'):
    data = downloader.download('flatfiles', 'path/to/file.csv.gz')

get_statistics() -> Dict[str, int]

Get download statistics.

reset_statistics()

Reset statistics.

When to Use:

  • Simple scripts without async support

  • Sequential processing

  • Debugging

Recommendation: Use AsyncS3Downloader for production (3-5x faster)


S3Catalog

Module: src.download.s3_catalog

Manage S3 file paths for Polygon.io flat files.

Class: S3Catalog

S3Catalog(bucket: str = 'flatfiles')

Key Methods:

get_stocks_daily_key(date: str) -> str

Get S3 key for stock daily aggregates.

catalog = S3Catalog()
key = catalog.get_stocks_daily_key('2024-01-02')
# Returns: 'us_stocks_sip/day_aggs_v1/2024/01/2024-01-02.csv.gz'

get_stocks_minute_key(date: str) -> str

Get S3 key for stock minute aggregates.

get_options_daily_key(date: str) -> str

Get S3 key for options daily aggregates.

get_options_minute_key(date: str) -> str

Get S3 key for options minute aggregates.

get_date_range_keys(data_type: str, start_date: str, end_date: str, symbols: Optional[List[str]] = None) -> List[str]

Get S3 keys for date range.

keys = catalog.get_date_range_keys(
    data_type='stocks_daily',
    start_date='2024-01-01',
    end_date='2024-01-31'
)
print(f"Found {len(keys)} trading days in January 2024")

Supported Data Types:

  • stocks_daily: Stock daily aggregates

  • stocks_minute: Stock minute aggregates

  • options_daily: Options daily aggregates

  • options_minute: Options minute aggregates

parse_key_metadata(key: str) -> Dict[str, str]

Parse metadata from S3 key.

metadata = catalog.parse_key_metadata(
    'us_stocks_sip/day_aggs_v1/2024/01/2024-01-02.csv.gz'
)
# Returns: {'data_type': 'stocks_daily', 'date': '2024-01-02', 'year': '2024', 'month': '01'}

validate_key(key: str) -> bool

Validate if key matches expected pattern.

get_summary(keys: List[str]) -> Dict[str, int]

Get summary statistics for keys.

Static Methods:

get_business_days(start_date: str, end_date: str) -> List[str]

Get US stock market business days between dates.

days = S3Catalog.get_business_days('2024-01-01', '2024-01-31')
print(f"Trading days: {len(days)}")

get_missing_dates(existing_dates: List[str], start_date: str, end_date: str) -> List[str]

Get missing business days.

existing = ['2024-01-02', '2024-01-03']
missing = S3Catalog.get_missing_dates(existing, '2024-01-01', '2024-01-05')
# Returns dates that are business days but not in existing

S3 Path Patterns:

  • Stocks Daily: us_stocks_sip/day_aggs_v1/{year}/{month}/{date}.csv.gz

  • Stocks Minute: us_stocks_sip/minute_aggs_v1/{year}/{month}/{date}.csv.gz

  • Options Daily: us_options/day_aggs_v1/{year}/{month}/{date}.csv.gz

  • Options Minute: us_options/minute_aggs_v1/{year}/{month}/{date}.csv.gz