E-Commerce Batch Pipeline: Kinesis Firehose → S3 → Snowflake

Data Bootcamp · Batch Data Engineering · 8 min read

A fully idempotent, Airflow-orchestrated batch pipeline that streams e-commerce orders and customer records from CSV producers through AWS Kinesis Data Firehose into Amazon S3, then loads and transforms them inside Snowflake — stamping every batch with a unique BATCH_ID so any run can be safely replayed without duplicates or data loss.

Figure 1 — End-to-end pipeline architecture
2 Data Sources · 7 DAG Tasks · 3 S3 Lifecycle States · Zero Hardcoded Credentials

The Challenge

Building a reliable batch pipeline for e-commerce data required solving three problems: (1) decoupling CSV producers from Snowflake's load schedule — producers write files continuously, but loads should happen in discrete, auditable batches; (2) guaranteeing idempotency so a failed DAG run can be retried without double-loading rows or leaving files stranded between states; and (3) orchestrating two independent data entities (orders and customers) that must both complete their extract-load legs before a shared transform can run — requiring a fan-out/fan-in dependency graph, not a simple linear chain.

The Solution

Kinesis Data Firehose provides two managed delivery streams, one for orders and one for customers. They buffer CSV records sent by the Kinesis Agent on an EC2 producer and land batched files in S3 landing/ prefixes, with no coupling to downstream processing. The Airflow DAG ecommerce_pipeline generates a single BATCH_ID from ts_nodash[:12] and stamps it on every S3 path prefix and Snowflake row for the entire run. Two parallel BashOperator tasks run aws s3 mv to move files from landing/ into processing/BATCH_ID/, claiming the batch and emptying the landing buffer with a single command per entity. (S3 has no native rename, so aws s3 mv copies and then deletes each object; the move is not atomic across the whole batch.) Two parallel SQLExecuteQueryOperator tasks then run Snowflake COPY INTO against external stages pointing at those processing prefixes, loading ORDERS_RAW and CUSTOMERS_RAW independently. After both loads succeed, a single transform task joins orders and customers on O_CUSTKEY, filters to fulfilled orders (O_ORDERSTATUS = 'F'), aggregates SUM(O_TOTALPRICE) per customer per date, and inserts the results into ORDER_CUSTOMER_DATE_PRICE with the BATCH_ID stamped for lineage. Archive tasks then move the batch to processed/BATCH_ID/.
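The fan-out/fan-in shape described above can be sketched with the standard library alone. This is an illustration, not the DAG file itself: the task IDs are hypothetical, and only the dependency structure (two move tasks, two load tasks, one transform, two archive tasks) comes from the text.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it waits on. Task IDs are hypothetical.
DEPENDENCIES = {
    "move_orders": set(),
    "move_customers": set(),
    "load_orders": {"move_orders"},
    "load_customers": {"move_customers"},
    "transform": {"load_orders", "load_customers"},  # fan-in: waits on both loads
    "archive_orders": {"transform"},                 # fan-out after the transform
    "archive_customers": {"transform"},
}


def execution_waves(deps):
    """Group tasks into waves; tasks in the same wave can run in parallel."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


WAVES = execution_waves(DEPENDENCIES)
for number, wave in enumerate(WAVES, start=1):
    print(number, wave)
```

The transform sits alone in its wave because it cannot start until both load tasks have finished, which is the constraint a simple linear chain cannot express without serializing the two entities.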

Pipeline Architecture

1. Ingest

Kinesis Agent → Kinesis Data Firehose

An EC2 producer host writes order and customer records as CSV files to /tmp/orders.csv and /tmp/customers.csv. The AWS Kinesis Agent runs on the same host, monitors both file paths, and streams records in real time to two separate Kinesis Data Firehose delivery streams. Firehose buffers incoming records and delivers batched files to S3 landing/ prefixes — decoupling the continuous producer from the batch pipeline entirely.
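A minimal sketch of what the agent configuration for this step might look like, rendered from Python. The Kinesis Agent reads a JSON file whose flows entries pair a filePattern with a deliveryStream; the two file paths come from the text, while the delivery stream names below are hypothetical placeholders.

```python
import json

# Delivery stream names are hypothetical; the file paths come from the text.
AGENT_CONFIG = {
    "flows": [
        {
            "filePattern": "/tmp/orders.csv",
            "deliveryStream": "orders-delivery-stream",
        },
        {
            "filePattern": "/tmp/customers.csv",
            "deliveryStream": "customers-delivery-stream",
        },
    ]
}


def render_agent_config(config):
    """Serialize the configuration as the JSON document the agent would read."""
    return json.dumps(config, indent=2)


print(render_agent_config(AGENT_CONFIG))
```

Keeping one flow per entity is what lets orders and customers land under separate landing/ prefixes and be claimed independently later on.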

AWS Kinesis Agent · Kinesis Data Firehose · EC2 (Producer) · Amazon S3

2. Orchestration

Airflow DAG — Batch Claim

The Airflow DAG ecommerce_pipeline runs in an MWAA-compatible environment. At run time, a BATCH_ID is derived from ts_nodash[:12], the first 12 characters of the run's compact timestamp (formatted YYYYMMDDTHHMMSS), and applied consistently to every S3 path prefix and Snowflake row for the entire run. Two parallel BashOperator tasks run aws s3 mv to move all files from orders/landing/ and customers/landing/ into orders/processing/BATCH_ID/ and customers/processing/BATCH_ID/. Each move claims the batch and empties the landing prefix with a single command, making the current batch immediately visible in S3. Because S3 has no native rename, aws s3 mv copies and then deletes each object, so the move happens per object rather than atomically across the batch.
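The batch claim can be sketched in plain Python as follows. The ts_nodash helper only mimics the format of Airflow's ts_nodash macro so the example runs without Airflow; in the DAG itself the value would presumably come from a template. The bucket name and the function names are hypothetical.

```python
import shlex
from datetime import datetime, timezone

BUCKET = "example-ecommerce-bucket"  # hypothetical bucket name


def ts_nodash(logical_date):
    """Format a run timestamp the way Airflow's ts_nodash macro does."""
    return logical_date.strftime("%Y%m%dT%H%M%S")


def batch_id_for(logical_date):
    """Derive the BATCH_ID: the first 12 characters of ts_nodash."""
    return ts_nodash(logical_date)[:12]


def claim_command(entity, batch_id, bucket=BUCKET):
    """Build the aws s3 mv command that moves landing/ into processing/BATCH_ID/."""
    source = f"s3://{bucket}/{entity}/landing/"
    target = f"s3://{bucket}/{entity}/processing/{batch_id}/"
    return shlex.join(["aws", "s3", "mv", source, target, "--recursive"])


run_time = datetime(2024, 1, 15, 9, 30, 0, tzinfo=timezone.utc)
batch_id = batch_id_for(run_time)
print(batch_id)
for entity in ("orders", "customers"):
    print(claim_command(entity, batch_id))
```

For a run at 09:30:00 UTC on 2024-01-15 this yields 20240115T093. Twelve characters stop partway through the minutes field, so two runs that start within the same ten-minute window would share a BATCH_ID; that is worth keeping in mind when choosing a schedule.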

Apache Airflow 3.x · BashOperator · AWS CLI (s3 mv) · Amazon S3

3. Load

Parallel COPY INTO Snowflake Raw Tables

Two independent SQLExecuteQueryOperator tasks run in parallel after their respective move tasks complete. Each task issues a Snowflake COPY INTO command against an external stage (ORDERS_STAGE, CUSTOMERS_STAGE) pointed at the current batch's processing/BATCH_ID/ S3 prefix. The CSV_FORMAT file format handles quoted fields, header skipping, and null inference. Snowflake's storage integration S3_INTEGRATION_PRO uses a cross-account IAM trust relationship with sts:ExternalId to read from S3 without access keys. Results land in ORDERS_RAW and CUSTOMERS_RAW.
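As a rough sketch, the statement each load task issues could be assembled like this. It assumes each stage's URL ends at the entity's processing/ prefix, so the BATCH_ID is appended as a sub-path; the text does not spell out the stage definition, and the helper name is hypothetical. The sketch also leaves out how the BATCH_ID is written onto each row, which the text does not describe.

```python
# Table and stage names come from the text; the mapping itself is illustrative.
TARGETS = {
    "orders": ("ORDERS_RAW", "ORDERS_STAGE"),
    "customers": ("CUSTOMERS_RAW", "CUSTOMERS_STAGE"),
}


def copy_into_sql(entity, batch_id):
    """Build a COPY INTO statement scoped to one batch's files.

    Assumes the stage URL ends at <entity>/processing/, so the batch ID
    can be appended as a path under the stage.
    """
    if not batch_id.isalnum():
        # Guard against path or SQL fragments sneaking in through the ID.
        raise ValueError(f"unexpected BATCH_ID: {batch_id!r}")
    table, stage = TARGETS[entity]
    return (
        f"COPY INTO {table}\n"
        f"FROM @{stage}/{batch_id}/\n"
        "FILE_FORMAT = (FORMAT_NAME = 'CSV_FORMAT')"
    )


print(copy_into_sql("orders", "20240115T093"))
```

Scoping the COPY to the batch path means a load only ever sees files that the claim step moved for that run, regardless of what Firehose has delivered to landing/ in the meantime.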

SQLExecuteQueryOperator · Snowflake COPY INTO · ORDERS_RAW · CUSTOMERS_RAW · S3_INTEGRATION_PRO

4. Transform

SQL Join + Aggregate → Curated Table

After both COPY INTO tasks complete (fan-in), a single SQLExecuteQueryOperator transform task joins ORDERS_RAW and CUSTOMERS_RAW on O_CUSTKEY = C_CUSTKEY, filtering to fulfilled orders only (O_ORDERSTATUS = 'F'). It aggregates SUM(O_TOTALPRICE) grouped by customer name and order date, then inserts the results into ORDER_CUSTOMER_DATE_PRICE with the BATCH_ID stamped for row-level lineage. Open orders (O_ORDERSTATUS = 'O') are intentionally excluded, so only fulfilled orders enter the curated layer.
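The join, filter and aggregate logic can be exercised locally with an in-memory SQLite database standing in for Snowflake. This is only a sketch of the query's shape: the column names O_ORDERDATE, C_NAME and TOTAL_PRICE are assumptions (the text names only O_CUSTKEY, C_CUSTKEY, O_ORDERSTATUS and O_TOTALPRICE), and the sample rows are made up for illustration.

```python
import sqlite3

# Column names other than O_CUSTKEY, C_CUSTKEY, O_ORDERSTATUS and
# O_TOTALPRICE are assumptions made for this sketch.
TRANSFORM_SQL = """
INSERT INTO ORDER_CUSTOMER_DATE_PRICE (C_NAME, O_ORDERDATE, TOTAL_PRICE, BATCH_ID)
SELECT c.C_NAME, o.O_ORDERDATE, SUM(o.O_TOTALPRICE), :batch_id
FROM ORDERS_RAW AS o
JOIN CUSTOMERS_RAW AS c ON o.O_CUSTKEY = c.C_CUSTKEY
WHERE o.O_ORDERSTATUS = 'F'
GROUP BY c.C_NAME, o.O_ORDERDATE
"""


def run_transform(conn, batch_id):
    """Run the transform and return the curated rows in a stable order."""
    conn.execute(TRANSFORM_SQL, {"batch_id": batch_id})
    return conn.execute(
        "SELECT C_NAME, O_ORDERDATE, TOTAL_PRICE, BATCH_ID "
        "FROM ORDER_CUSTOMER_DATE_PRICE ORDER BY C_NAME, O_ORDERDATE"
    ).fetchall()


conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE ORDERS_RAW (
    O_ORDERKEY INTEGER, O_CUSTKEY INTEGER, O_ORDERSTATUS TEXT,
    O_TOTALPRICE REAL, O_ORDERDATE TEXT
);
CREATE TABLE CUSTOMERS_RAW (C_CUSTKEY INTEGER, C_NAME TEXT);
CREATE TABLE ORDER_CUSTOMER_DATE_PRICE (
    C_NAME TEXT, O_ORDERDATE TEXT, TOTAL_PRICE REAL, BATCH_ID TEXT
);
-- Made-up sample rows: three fulfilled orders and one open order.
INSERT INTO CUSTOMERS_RAW VALUES (1, 'Customer#1'), (2, 'Customer#2');
INSERT INTO ORDERS_RAW VALUES
    (10, 1, 'F', 100.0, '2024-01-15'),
    (11, 1, 'F', 50.0, '2024-01-15'),
    (12, 1, 'O', 999.0, '2024-01-15'),
    (13, 2, 'F', 20.0, '2024-01-16');
""")

ROWS = run_transform(conn, "20240115T093")
for row in ROWS:
    print(row)
```

The open order for the first customer never reaches the curated table, and that customer's two fulfilled orders on the same date collapse into a single summed row carrying the batch's ID.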

Snowflake SQL · ORDER_CUSTOMER_DATE_PRICE · RETAIL_DB.RETAIL_SCHEMA · SQLExecuteQueryOperator

5. Archive

S3 Batch Archive

Two parallel BashOperator tasks run aws s3 mv to relocate each entity's batch files from processing/BATCH_ID/ to processed/BATCH_ID/. This creates an immutable, partition-keyed archive in S3 that preserves the exact raw files that fed each Snowflake load, enabling per-batch audit, point-in-time replay, and debugging without re-triggering the producer. The landing prefix was already emptied by the claim step, so Firehose can keep delivering files for the next batch while the current one is processed.
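Because the lifecycle state and BATCH_ID are encoded in the object key, an audit script can recover both from the key alone. The helper below is a hypothetical sketch that assumes keys follow the entity/state/BATCH_ID/file layout described in this article; the file names are invented.

```python
from typing import NamedTuple, Optional

STATES = ("landing", "processing", "processed")


class BatchFile(NamedTuple):
    entity: str
    state: str
    batch_id: Optional[str]  # None while the file is still in landing/
    filename: str


def classify_key(key):
    """Split an S3 object key into entity, lifecycle state, batch ID and file."""
    parts = key.split("/")
    if len(parts) < 3 or parts[1] not in STATES:
        raise ValueError(f"unrecognized key layout: {key!r}")
    entity, state = parts[0], parts[1]
    if state == "landing":
        # Landing files have not been claimed yet, so they carry no batch ID.
        return BatchFile(entity, state, None, "/".join(parts[2:]))
    if len(parts) < 4:
        raise ValueError(f"missing BATCH_ID segment: {key!r}")
    return BatchFile(entity, state, parts[2], "/".join(parts[3:]))


print(classify_key("orders/processed/20240115T093/part-0001.csv"))
print(classify_key("customers/landing/part-0002.csv"))
```

Grouping the classified keys by batch_id and state gives a quick view of which batches are archived and which are still sitting in processing/ after a failed run.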

BashOperator · AWS CLI (s3 mv) · Amazon S3 (processed/)

The Results

The pipeline processes orders and customers in parallel across 7 DAG tasks, with every batch isolated by BATCH_ID in both S3 and Snowflake. Three S3 states (landing, processing, and processed) make the status of every batch visible simply by browsing the bucket. Only fulfilled orders (O_ORDERSTATUS = 'F') flow into the curated ORDER_CUSTOMER_DATE_PRICE reporting table, which provides per-customer spending summaries by date for downstream analytics. Failed runs replay cleanly: re-running the failed DAG run, rather than triggering a new one, regenerates the same BATCH_ID because ts_nodash is derived from the run's original timestamp, and any stranded files move through the correct state transitions. No credentials are hardcoded anywhere: Snowflake uses a storage integration with an IAM trust relationship, and Airflow workers rely on EC2 instance profiles for AWS CLI access.

Technologies Used

Apache Airflow 3.x / MWAA · AWS Kinesis Data Firehose · AWS Kinesis Agent · Amazon S3 · Snowflake · Snowflake Storage Integration · SQLExecuteQueryOperator · BashOperator · AWS IAM · AWS CLI (s3 mv) · Python
