Production Data Engineering
FMP Stock Pipeline: Dockerized Airflow on EC2 → S3 → Snowflake → SNS
A production-grade daily ETL pipeline that runs Apache Airflow 3.2.1 inside Docker Compose on AWS EC2, fetches company stock profiles from the Financial Modeling Prep API via a custom hook and operator, stages raw JSON in Amazon S3, and upserts into Snowflake using MERGE for zero-duplicate idempotency — with AWS SNS delivering a run summary to stakeholders after every successful execution.

The Challenge
Productionizing a daily stock data pipeline required solving five concerns simultaneously: (1) making every run idempotent — the pipeline must be safely re-runnable without accumulating duplicate symbol profiles in Snowflake, even across retries and backfills; (2) managing credentials for three external services (FMP API, AWS, Snowflake) without hardcoding any value in DAG code or version control; (3) encapsulating FMP API behavior — rate limiting, retries with exponential backoff, and response validation — in a reusable component testable independently of the DAG; (4) deploying Apache Airflow with CeleryExecutor, PostgreSQL, and Redis as a self-contained unit on a single EC2 instance; and (5) surfacing pipeline status to stakeholders automatically, without requiring anyone to log into the Airflow UI.
The Solution
The entire Airflow stack runs inside Docker Compose on an EC2 t2.large as 8 services in total, including the API server, scheduler, dag-processor, Celery worker, triggerer, PostgreSQL 16, and Redis 7.2. An entrypoint.sh script auto-creates Airflow connections (my_s3_conn, my_snowflake_conn) and Variables (FMP_API_KEY, SNS_TOPIC_ARN) from environment variables at container start, eliminating manual UI setup. A PipelineConfig class centralizes all configuration and calls validate_config() at DAG import time to fail fast on missing variables. A custom FMPHook wraps requests.Session with a Retry adapter (3 retries, exponential backoff, status_forcelist=[429, 500, 502, 503, 504]) and enforces a 0.2s delay between requests. The custom FMPToS3Operator calls FMPHook.get_multiple_profiles(), then uploads each symbol's JSON to raw/SYMBOL.json via S3Hook with replace=True. The Snowflake load task reads each file and executes a MERGE INTO STOCK_PROFILES keyed on SYMBOL; these upsert semantics make any number of retries safe. Task results pass between stages via XCom, and a final SNS task formats a detailed run summary (symbols, files, records merged, table, bucket, run ID) and publishes it to the configured SNS topic.
Pipeline Architecture
Dockerized Airflow on EC2
Apache Airflow 3.2.1 runs as 8 Docker Compose services on an EC2 t2.large, including the API server (port 8080), scheduler, dag-processor, Celery worker, triggerer, PostgreSQL 16 (metadata DB), and Redis 7.2 (Celery broker). An entrypoint.sh script auto-creates Airflow connections (my_s3_conn, my_snowflake_conn) and Variables (FMP_API_KEY, SNS_TOPIC_ARN) from environment variables at container start, so no manual UI setup is needed. PipelineConfig centralizes all configuration from environment variables and calls validate_config() at DAG import time, failing fast if S3_BUCKET_NAME is missing. The DAG runs @daily with max_active_runs=1 and catchup=False.
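The fail-fast configuration pattern can be sketched as a small frozen dataclass. This is a minimal illustration, not the project's actual PipelineConfig. S3_BUCKET_NAME, S3_PREFIX (default raw/), and STOCK_PROFILES come from the description above. The FMP_SYMBOLS variable name, the field names, the from_env helper, and raising ValueError are hypothetical choices made for the sketch.

```python
import os
from dataclasses import dataclass
from typing import Mapping

DEFAULT_SYMBOLS = ("AAPL", "MSFT", "GOOGL", "AMZN", "TSLA")


@dataclass(frozen=True)
class PipelineConfig:
    """Hypothetical sketch: every setting is read from environment variables."""

    s3_bucket_name: str
    s3_prefix: str = "raw/"
    symbols: tuple = DEFAULT_SYMBOLS
    snowflake_table: str = "STOCK_PROFILES"

    @classmethod
    def from_env(cls, environ: Mapping[str, str] = os.environ) -> "PipelineConfig":
        # FMP_SYMBOLS is a hypothetical comma-separated override of the default symbols.
        raw_symbols = environ.get("FMP_SYMBOLS", "")
        symbols = tuple(s.strip().upper() for s in raw_symbols.split(",") if s.strip())
        return cls(
            s3_bucket_name=environ.get("S3_BUCKET_NAME", "").strip(),
            s3_prefix=environ.get("S3_PREFIX", "raw/"),
            symbols=symbols or DEFAULT_SYMBOLS,
        )

    def validate_config(self) -> None:
        """Raise immediately so a misconfigured DAG fails at import time."""
        missing = []
        if not self.s3_bucket_name:
            missing.append("S3_BUCKET_NAME")
        if not self.symbols:
            missing.append("FMP_SYMBOLS")
        if missing:
            raise ValueError(f"Missing required configuration: {', '.join(missing)}")


# In the DAG file these two calls would run at module level, so a bad
# config surfaces as a DAG import error instead of a mid-run task failure:
# config = PipelineConfig.from_env()
# config.validate_config()
```

Passing the environment in as a mapping keeps the class testable without touching os.environ.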
FMP API → Custom Hook & Operator
FMPToS3Operator (a custom BaseOperator subclass) initializes FMPHook to call the FMP API /stable/profile endpoint for each configured symbol. FMPHook wraps requests.Session with a Retry adapter (total=3, backoff_factor=1.0, status_forcelist=[429,500,502,503,504]) and applies a 30s timeout to each request. A 0.2s rate-limit delay is enforced between symbol requests. The API key is read from an Airflow Variable, never directly from the environment. Failed symbols are logged and skipped, so one bad symbol does not abort the operator. The operator pushes an upload_summary XCom containing uploaded_keys, failed_symbols, and counts for the downstream task.
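The retry, rate-limit, and skip-on-failure behavior can be sketched in plain Python. This is a sketch under stated assumptions, not the project's FMPHook. The base URL, the symbol/apikey query parameters, and the expectation that the endpoint returns a non-empty JSON list are assumptions. The helper names build_retry_session and fetch_profile are hypothetical. get_multiple_profiles is written here as a free function with an injected fetcher so it can be exercised without network access. requests and urllib3 are imported lazily so the rest of the sketch runs without them.

```python
import logging
import time
from typing import Any, Callable, Dict, List, Sequence, Tuple

log = logging.getLogger(__name__)

# Assumed full URL for the /stable/profile endpoint.
FMP_PROFILE_URL = "https://financialmodelingprep.com/stable/profile"


def build_retry_session():
    """Session with the retry settings described above (hypothetical helper)."""
    import requests
    from requests.adapters import HTTPAdapter
    from urllib3.util.retry import Retry

    retry = Retry(total=3, backoff_factor=1.0, status_forcelist=[429, 500, 502, 503, 504])
    adapter = HTTPAdapter(max_retries=retry)
    session = requests.Session()
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    return session


def fetch_profile(session, api_key: str, symbol: str, timeout: float = 30.0) -> Dict[str, Any]:
    """Fetch one profile and validate the (assumed) non-empty-list response shape."""
    resp = session.get(
        FMP_PROFILE_URL, params={"symbol": symbol, "apikey": api_key}, timeout=timeout
    )
    resp.raise_for_status()
    data = resp.json()
    if not isinstance(data, list) or not data:
        raise ValueError(f"Empty or unexpected FMP response for {symbol}")
    return data[0]


def get_multiple_profiles(
    symbols: Sequence[str],
    fetch_one: Callable[[str], Dict[str, Any]],
    delay_s: float = 0.2,
    sleep: Callable[[float], None] = time.sleep,
) -> Tuple[Dict[str, Dict[str, Any]], List[str]]:
    """Fetch symbols sequentially; log and skip failures instead of aborting."""
    profiles: Dict[str, Dict[str, Any]] = {}
    failed: List[str] = []
    for i, symbol in enumerate(symbols):
        if i > 0:
            sleep(delay_s)  # rate limit: pause between consecutive requests
        try:
            profiles[symbol] = fetch_one(symbol)
        except Exception:
            log.exception("FMP fetch failed for %s; skipping", symbol)
            failed.append(symbol)
    return profiles, failed
```

In an operator, the pieces would combine roughly as `session = build_retry_session()` followed by `get_multiple_profiles(symbols, lambda s: fetch_profile(session, api_key, s))`. Retries happen inside the adapter. Only a symbol that still fails after retries lands in the failed list.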
Raw JSON → Amazon S3
For each successfully fetched symbol, FMPToS3Operator calls S3Hook.load_string to write the profile JSON to raw/{SYMBOL}.json in the configured S3 bucket with replace=True. Because objects are overwritten without date partitioning, the bucket always holds the most recent successful fetch for each symbol. This is safe because Snowflake MERGE handles idempotency at the warehouse layer. The S3 prefix is configurable via the S3_PREFIX environment variable (default: raw/). With the default prefix, the bucket contains s3://<bucket>/raw/AAPL.json, raw/MSFT.json, raw/GOOGL.json, raw/AMZN.json, and raw/TSLA.json.
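The overwrite-per-symbol upload and the resulting summary can be sketched with a structural stand-in for S3Hook. This is a minimal sketch. Only the keyword arguments shown are passed to load_string, which S3Hook.load_string accepts. The helper names and the uploaded_count/failed_count field names are hypothetical. The description only says the summary carries "counts".

```python
import json
from typing import Any, Dict, List, Protocol


class StringLoader(Protocol):
    """Structural type for the single S3Hook method used here."""

    def load_string(self, string_data: str, key: str, bucket_name: str, replace: bool) -> None: ...


def s3_key_for(symbol: str, prefix: str = "raw/") -> str:
    """Build {prefix}{SYMBOL}.json, tolerating a prefix without a trailing slash."""
    if prefix and not prefix.endswith("/"):
        prefix += "/"
    return f"{prefix}{symbol.upper()}.json"


def upload_profiles(
    s3: StringLoader,
    bucket: str,
    profiles: Dict[str, Dict[str, Any]],
    failed_symbols: List[str],
    prefix: str = "raw/",
) -> Dict[str, Any]:
    """Overwrite one object per symbol and return the summary handed to XCom."""
    uploaded_keys: List[str] = []
    for symbol, profile in profiles.items():
        key = s3_key_for(symbol, prefix)
        # replace=True: a retry overwrites the object instead of failing on an existing key.
        s3.load_string(string_data=json.dumps(profile), key=key, bucket_name=bucket, replace=True)
        uploaded_keys.append(key)
    return {
        "uploaded_keys": uploaded_keys,
        "failed_symbols": list(failed_symbols),
        "uploaded_count": len(uploaded_keys),
        "failed_count": len(failed_symbols),
    }
```

Inside the operator, `s3` would be an `S3Hook(aws_conn_id="my_s3_conn")` instance. Passing only uploaded_keys downstream ensures the load task does not re-read a stale object left over for a symbol that failed today.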
S3 → Snowflake MERGE (Idempotent Upsert)
The load_s3_to_snowflake @task function reads the upload_summary from XCom, initializes S3Hook and SnowflakeHook, and sets the Snowflake context (warehouse COMPUTE_WH, database FMP_DB, schema FMP_schema). Then, for each JSON file, it reads the object with S3Hook.read_key, parses it, and executes MERGE INTO STOCK_PROFILES AS target USING (SELECT %s AS symbol, PARSE_JSON(%s) AS profile) AS source ON target.SYMBOL = source.symbol. The MERGE updates PROFILE if the symbol already exists and inserts a new row otherwise. STOCK_PROFILES stores only two columns: SYMBOL (VARCHAR) and PROFILE (VARIANT holding the JSON). All records commit in one transaction; any failure triggers a rollback and raises AirflowException.
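The single-transaction MERGE loop can be sketched against any DB-API connection using the pyformat (%s) paramstyle, which is the Snowflake Python connector's default. This is an illustrative sketch, not the project's task code. The WHEN MATCHED / WHEN NOT MATCHED clauses are reconstructed from the behavior described above. Setting the context with USE statements and opening the transaction with an explicit BEGIN are assumptions. RuntimeError stands in for AirflowException so the sketch has no Airflow dependency.

```python
import json
from typing import Iterable, Tuple

MERGE_SQL = """
MERGE INTO STOCK_PROFILES AS target
USING (SELECT %s AS symbol, PARSE_JSON(%s) AS profile) AS source
ON target.SYMBOL = source.symbol
WHEN MATCHED THEN UPDATE SET PROFILE = source.profile
WHEN NOT MATCHED THEN INSERT (SYMBOL, PROFILE) VALUES (source.symbol, source.profile)
"""

# Assumed way of setting the warehouse/database/schema context.
CONTEXT_SQL = (
    "USE WAREHOUSE COMPUTE_WH",
    "USE DATABASE FMP_DB",
    "USE SCHEMA FMP_schema",
)


def merge_profiles(conn, documents: Iterable[Tuple[str, str]]) -> int:
    """MERGE each (symbol, raw_json) pair inside one explicit transaction.

    `conn` could be the connection returned by SnowflakeHook.get_conn().
    Returns the number of MERGE statements executed.
    """
    cur = conn.cursor()
    merged = 0
    try:
        for stmt in CONTEXT_SQL:
            cur.execute(stmt)
        cur.execute("BEGIN")
        for symbol, raw_json in documents:
            json.loads(raw_json)  # fail fast on malformed JSON before it reaches Snowflake
            cur.execute(MERGE_SQL, (symbol, raw_json))
            merged += 1
        conn.commit()
    except Exception as exc:
        conn.rollback()
        # The DAG raises AirflowException here; RuntimeError keeps the sketch dependency-free.
        raise RuntimeError(f"Snowflake MERGE failed after {merged} record(s)") from exc
    finally:
        cur.close()
    return merged
```

Each MERGE has a single-row source, so no target row can be matched by more than one source row. This is the situation in which Snowflake would otherwise report a nondeterministic merge.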
AWS SNS Run Summary
The send_sns_notification @task reads snowflake_result from XCom, retrieves SNS_TOPIC_ARN from Airflow Variables, initializes AwsBaseHook(client_type='sns'), and publishes a formatted run summary. The summary lists DAG name, status, execution date, run ID, symbols processed, files processed, records merged, Snowflake table, and S3 bucket and prefix. Every SNS subscriber (email, SMS, Lambda, SQS) receives the notification shortly after the pipeline completes successfully. A missing or empty SNS_TOPIC_ARN raises AirflowException before the publish call.
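Formatting and publishing the summary can be sketched with an injected boto3-style SNS client. Inside the task, that client would come from `AwsBaseHook(client_type="sns").get_conn()`. This is a sketch, not the project's code. The keys read from the result dict (symbols, files_processed, records_merged, table, bucket, prefix), the subject wording, and raising ValueError in place of AirflowException are assumptions. publish(TopicArn=..., Subject=..., Message=...) and the returned MessageId are part of boto3's SNS client API.

```python
from typing import Any, Dict, Optional


def format_run_summary(result: Dict[str, Any], dag_id: str, run_id: str, logical_date: str) -> str:
    """Render the run-summary fields described above as a plain-text body."""
    symbols = ", ".join(result.get("symbols", [])) or "none"
    lines = [
        f"DAG: {dag_id}",
        "Status: SUCCESS",
        f"Execution date: {logical_date}",
        f"Run ID: {run_id}",
        f"Symbols processed: {symbols}",
        f"Files processed: {result.get('files_processed', 0)}",
        f"Records merged: {result.get('records_merged', 0)}",
        f"Snowflake table: {result.get('table', 'STOCK_PROFILES')}",
        f"S3 location: s3://{result.get('bucket', '?')}/{result.get('prefix', 'raw/')}",
    ]
    return "\n".join(lines)


def publish_summary(sns_client, topic_arn: Optional[str], message: str, dag_id: str) -> str:
    """Publish the summary; a missing or blank topic ARN fails before any API call."""
    if not topic_arn or not topic_arn.strip():
        raise ValueError("SNS_TOPIC_ARN is missing or empty")
    # SNS requires subjects shorter than 100 characters.
    subject = f"Airflow: {dag_id} succeeded"[:99]
    response = sns_client.publish(TopicArn=topic_arn, Subject=subject, Message=message)
    return response["MessageId"]
```

Keeping formatting separate from publishing lets the message body be unit-tested without AWS credentials.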
The Results
The pipeline fetches company profile data for 5 NASDAQ-listed stocks (AAPL, MSFT, GOOGL, AMZN, TSLA) on a daily schedule without accumulating duplicate rows. Because the MERGE is keyed on SYMBOL, STOCK_PROFILES keeps one row per symbol holding the latest profile, no matter how many times the DAG has run. Each symbol's JSON lands at raw/SYMBOL.json in S3, and retries overwrite objects rather than accumulate new ones. AWS SNS delivers a formatted run summary to all configured subscribers (email, SMS, or downstream Lambda) after each successful execution. The full stack deploys with a single docker-compose up -d on any EC2 instance with Docker. All secrets are confined to a gitignored .env file and Airflow's connection and variable store (encrypted at rest when a Fernet key is configured), so no credentials appear anywhere in the codebase.
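The idempotency claim can be demonstrated locally. The demo below swaps in SQLite's INSERT ... ON CONFLICT DO UPDATE upsert as a stand-in for Snowflake's MERGE, because Snowflake itself is not available offline. SQLite has no MERGE or VARIANT, so PROFILE is plain TEXT. The prices are dummy values, not real data. The sketch only shows that repeated runs leave one row per symbol carrying the latest profile.

```python
import json
import sqlite3

# SQLite stand-in for STOCK_PROFILES; an upsert on the primary key plays the role of MERGE.
UPSERT_SQL = """
INSERT INTO STOCK_PROFILES (SYMBOL, PROFILE) VALUES (?, ?)
ON CONFLICT(SYMBOL) DO UPDATE SET PROFILE = excluded.PROFILE
"""


def run_load(conn: sqlite3.Connection, profiles: dict) -> None:
    """One simulated DAG run: upsert every profile inside a single transaction."""
    with conn:  # commits on success, rolls back if an exception escapes
        conn.executemany(UPSERT_SQL, [(s, json.dumps(p)) for s, p in profiles.items()])


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE STOCK_PROFILES (SYMBOL TEXT PRIMARY KEY, PROFILE TEXT)")

day1 = {s: {"price": 100} for s in ("AAPL", "MSFT", "GOOGL", "AMZN", "TSLA")}  # dummy values
for _ in range(3):  # the same run retried three times
    run_load(conn, day1)
run_load(conn, {"AAPL": {"price": 101}})  # a later run with a fresher AAPL profile

rows = dict(conn.execute("SELECT SYMBOL, PROFILE FROM STOCK_PROFILES"))
print(len(rows), json.loads(rows["AAPL"])["price"])  # prints: 5 101
```

Four runs produce five rows rather than sixteen. In both engines the deduplication comes from matching on the key column rather than from anything in the S3 layer.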