Real-Time Data Engineering
Real-Time Stock Market Streaming: Amazon MSK → S3 → Snowflake
An end-to-end real-time streaming pipeline that publishes simulated NASDAQ stock ticks from a Python Kafka producer through Amazon MSK, batches them via MSK Connect's Confluent S3 Sink Connector into Amazon S3, and auto-ingests them into Snowflake using Snowpipe — achieving sub-five-minute end-to-end latency with IAM-only authentication throughout.

The Challenge
Building a real-time streaming pipeline across AWS and Snowflake meant solving three interrelated problems: (1) connecting a Python Kafka producer to a private Amazon MSK cluster with IAM authentication, which requires the MSKTokenProvider class to inherit from AbstractTokenProvider, a non-obvious requirement that silently breaks token refresh if missed; (2) configuring MSK Connect with the Confluent S3 Sink Connector inside a private VPC, which requires a custom IAM policy for network interfaces (not covered by managed policies) and a CloudWatch log group that must exist before the connector is created, or creation fails immediately; and (3) wiring Snowpipe's AUTO_INGEST to S3 ObjectCreated events via SQS, which requires a cross-account IAM trust relationship built from the storage integration's output: an IAM user ARN and an external ID that Snowflake generates at runtime.
The Solution
We deployed a two-tier EC2 architecture: a public bastion (pub-ec2) for internet-reachable tooling and a private Kafka client (prt-ec2) in one of the private subnets that host the MSK brokers. The Python producer authenticates via OAUTHBEARER + SASL_SSL on port 9098 using MSKTokenProvider(AbstractTokenProvider), which calls the MSK IAM auth SDK to generate short-lived tokens at connection time. MSK Connect runs the Confluent S3 Sink Connector with flush.size=100 and rotate.interval.ms=60000, writing partition-keyed JSON files to S3 via an S3 VPC Gateway Endpoint, so this traffic bypasses the NAT gateway entirely. Snowflake's storage integration outputs an IAM user ARN and external ID, which are used to create a scoped snowflake-s3-role with an sts:ExternalId condition. Snowpipe (AUTO_INGEST=TRUE) registers its SQS notification channel as the S3 bucket's ObjectCreated event target and loads each new JSON file into STOCK_MARKET_DATA within minutes of arrival.
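The cross-account trust described above can be sketched as a small policy builder. This is an illustrative sketch, not the project's actual policy: the function name and the placeholder ARN and external ID are assumptions, and the real values would come from the STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID properties that DESC INTEGRATION reports for the storage integration.

```python
import json


def build_snowflake_trust_policy(iam_user_arn: str, external_id: str) -> dict:
    """Return a trust policy that lets Snowflake's IAM user assume the S3 role.

    The sts:ExternalId condition means the role can only be assumed when the
    caller presents the external ID generated for this storage integration.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": iam_user_arn},
                "Action": "sts:AssumeRole",
                "Condition": {
                    "StringEquals": {"sts:ExternalId": external_id}
                },
            }
        ],
    }


if __name__ == "__main__":
    # Placeholder values for illustration only.
    policy = build_snowflake_trust_policy(
        "arn:aws:iam::123456789012:user/example-snowflake-user",
        "EXAMPLE_EXTERNAL_ID",
    )
    print(json.dumps(policy, indent=2))
```

The resulting document would be attached as the trust relationship of snowflake-s3-role; the role's permission policy (read access to the bucket) is separate and not shown here.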
Pipeline Architecture
Python Kafka Producer
A Python script (kafka-producer.py) runs on prt-ec2 in a private subnet, emitting one JSON stock tick per second for 7 NASDAQ symbols: AAPL, GOOGL, MSFT, AMZN, TSLA, META, and NVDA. Each message carries symbol, price, volume, change, timestamp, and exchange. Authentication uses OAUTHBEARER + SASL_SSL on port 9098 via MSKTokenProvider(AbstractTokenProvider) — the class must explicitly inherit AbstractTokenProvider for IAM token refresh to work correctly.
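A minimal sketch of such a producer follows, assuming the kafka-python client and the aws-msk-iam-sasl-signer-python package. The value ranges, the random choice of symbol per tick, and keying messages by symbol are illustrative assumptions, not details taken from kafka-producer.py. The Kafka imports are done lazily inside build_producer so the tick generator can be exercised without a broker.

```python
import json
import random
import time
from datetime import datetime, timezone

SYMBOLS = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA", "META", "NVDA"]


def make_tick(symbol, rng):
    """Build one simulated tick; the numeric ranges are arbitrary placeholders."""
    return {
        "symbol": symbol,
        "price": round(rng.uniform(100.0, 500.0), 2),
        "volume": rng.randint(100, 10_000),
        "change": round(rng.uniform(-5.0, 5.0), 2),
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "exchange": "NASDAQ",
    }


def build_producer(bootstrap_servers, region="us-east-1"):
    """Create a KafkaProducer that authenticates to MSK with IAM tokens."""
    from kafka import KafkaProducer
    try:
        from kafka.sasl.oauth import AbstractTokenProvider  # newer kafka-python
    except ImportError:
        from kafka.oauth.abstract import AbstractTokenProvider  # kafka-python 2.0.x
    from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

    class MSKTokenProvider(AbstractTokenProvider):
        def token(self):
            # Invoked by the client when it authenticates a connection.
            token, _expiry_ms = MSKAuthTokenProvider.generate_auth_token(region)
            return token

    return KafkaProducer(
        bootstrap_servers=bootstrap_servers,  # IAM listener on port 9098
        security_protocol="SASL_SSL",
        sasl_mechanism="OAUTHBEARER",
        sasl_oauth_token_provider=MSKTokenProvider(),
        key_serializer=lambda k: k.encode("utf-8"),
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )


def run(bootstrap_servers, topic="stock-market-data"):
    """Send one tick per second until interrupted."""
    producer = build_producer(bootstrap_servers)
    rng = random.Random()
    while True:
        symbol = rng.choice(SYMBOLS)
        producer.send(topic, key=symbol, value=make_tick(symbol, rng))
        time.sleep(1)
```

Calling run() with the cluster's IAM bootstrap string (the broker endpoints on port 9098) would start the stream; the EC2 instance role supplies the credentials that the signer uses.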
Amazon MSK Kafka Cluster
The Amazon MSK provisioned cluster runs Kafka 3.5.1 on two kafka.t3.small brokers placed in private subnets in us-east-1a and us-east-1b. The topic stock-market-data has 3 partitions with a replication factor of 2. IAM role-based authentication is the only allowed access method; unauthenticated access and SASL/SCRAM are disabled. An MSK cluster policy grants Connect, WriteData, and ReadData permissions to ec2-msk-role and msk-connect-role.
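As a rough illustration of what such a policy might contain, the sketch below builds statements from the kafka-cluster IAM actions. The function names, account ID, and cluster ARN are hypothetical, and the statement layout is an assumption rather than the policy actually deployed. A consuming client such as the connector would also need consumer-group permissions, which are omitted here.

```python
def topic_arn_from_cluster_arn(cluster_arn: str, topic: str) -> str:
    """Derive a topic ARN from a cluster ARN.

    arn:aws:kafka:REGION:ACCOUNT:cluster/NAME/UUID
      -> arn:aws:kafka:REGION:ACCOUNT:topic/NAME/UUID/TOPIC
    """
    prefix, sep, rest = cluster_arn.partition(":cluster/")
    if not sep or not rest:
        raise ValueError(f"not an MSK cluster ARN: {cluster_arn!r}")
    return f"{prefix}:topic/{rest}/{topic}"


def build_msk_access_policy(cluster_arn: str, topic: str, role_arns: list) -> dict:
    """Grant the given roles connect, read, and write access to one topic."""
    principal = {"AWS": list(role_arns)}
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": principal,
                "Action": ["kafka-cluster:Connect"],
                "Resource": cluster_arn,
            },
            {
                "Effect": "Allow",
                "Principal": principal,
                "Action": [
                    "kafka-cluster:DescribeTopic",
                    "kafka-cluster:WriteData",
                    "kafka-cluster:ReadData",
                ],
                "Resource": topic_arn_from_cluster_arn(cluster_arn, topic),
            },
        ],
    }


if __name__ == "__main__":
    # Placeholder identifiers for illustration only.
    cluster = "arn:aws:kafka:us-east-1:123456789012:cluster/demo-cluster/abcd-1234"
    roles = [
        "arn:aws:iam::123456789012:role/ec2-msk-role",
        "arn:aws:iam::123456789012:role/msk-connect-role",
    ]
    print(build_msk_access_policy(cluster, "stock-market-data", roles))
```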
MSK Connect + Confluent S3 Sink
MSK Connect workers running the Confluent S3 Sink Connector v12.1.4 subscribe to the stock-market-data topic and buffer messages in memory, flushing to S3 after 100 messages per partition (flush.size=100) or after 60 seconds (rotate.interval.ms=60000), whichever comes first. Output is JSON with schemas disabled. Traffic to S3 routes privately via an S3 VPC Gateway Endpoint, so S3 writes incur no NAT gateway charges. CloudWatch logs stream to /msk-connect/msk-s3-sink-connector.
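The connector settings described above could be assembled roughly as follows. This is a sketch under assumptions: only flush.size, rotate.interval.ms, the JSON format, and the disabled schemas come from the text; tasks.max, the converters, and the partitioner choice are plausible defaults rather than the project's confirmed configuration. All values are strings because connector configuration is passed as a string-to-string map.

```python
def build_s3_sink_config(bucket, region="us-east-1", topic="stock-market-data"):
    """Return a Confluent S3 Sink Connector configuration as a str->str map."""
    return {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "tasks.max": "1",  # assumption: a single task is enough for a demo
        "topics": topic,
        "s3.region": region,
        "s3.bucket.name": bucket,
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        # Write records as JSON, one object per line.
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        # Default partitioner yields topics/<topic>/partition=<n>/ prefixes.
        "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
        "flush.size": "100",
        "rotate.interval.ms": "60000",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        # Plain JSON payloads without an embedded schema envelope.
        "value.converter.schemas.enable": "false",
        "schema.compatibility": "NONE",
    }


if __name__ == "__main__":
    for key, value in sorted(build_s3_sink_config("kafka-msk-snowflake-bucket").items()):
        print(f"{key}={value}")
```

Printing the map as key=value lines gives a form that can be pasted into the connector configuration field when creating the connector in the console.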
Amazon S3 Data Lake
JSON files land in kafka-msk-snowflake-bucket under a partition-keyed path: topics/stock-market-data/partition=X/. File names encode the topic, partition number, and base offset (e.g. stock-market-data+0+0000000000.json). An S3 bucket policy restricts PutObject to msk-connect-role only. S3 ObjectCreated events are forwarded to Snowpipe's SQS notification channel ARN — the trigger that initiates auto-ingestion.
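The naming scheme above can be decoded with a short helper, which is handy when checking which offsets a given object covers. The function and type names are hypothetical; the pattern follows the example path and file name given in the text.

```python
import re
from typing import NamedTuple


class SinkObject(NamedTuple):
    topic: str
    partition: int
    start_offset: int


# topics/<topic>/partition=<n>/<topic>+<n>+<zero-padded offset>.json
_KEY_RE = re.compile(
    r"^topics/(?P<topic>[^/]+)/partition=(?P<dir_partition>\d+)/"
    r"(?P=topic)\+(?P<partition>\d+)\+(?P<offset>\d+)\.json$"
)


def parse_sink_key(key: str) -> SinkObject:
    """Split an S3 object key written by the sink into its components."""
    match = _KEY_RE.match(key)
    if match is None:
        raise ValueError(f"unrecognized sink object key: {key!r}")
    partition = int(match["partition"])
    if int(match["dir_partition"]) != partition:
        raise ValueError(f"partition mismatch in key: {key!r}")
    return SinkObject(match["topic"], partition, int(match["offset"]))


if __name__ == "__main__":
    key = "topics/stock-market-data/partition=0/stock-market-data+0+0000000000.json"
    print(parse_sink_key(key))
```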
Snowpipe Auto-Ingest
Snowpipe (kafka_snowpipe, AUTO_INGEST=TRUE) polls the SQS queue registered as its notification channel. Each S3 ObjectCreated event queues a COPY INTO STOCK_MARKET_DATA operation that parses the JSON payload and maps symbol, price, volume, change, timestamp, and exchange into typed columns. A Snowflake storage integration (s3_kafka_integration) uses a Snowflake-generated IAM user ARN and sts:ExternalId to assume snowflake-s3-role for cross-account S3 reads.
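The Snowflake side of this wiring might look like the statements generated below, together with the S3 notification structure that points ObjectCreated events at the pipe's queue. This is a sketch under assumptions: the stage name kafka_stage, the column types, and the function names are hypothetical, and the SQS ARN would be read from the pipe's notification_channel (shown by SHOW PIPES) after the pipe is created.

```python
def build_snowpipe_statements(bucket: str, role_arn: str) -> list:
    """Return the SQL statements for the integration, stage, and pipe."""
    stage_url = f"s3://{bucket}/topics/stock-market-data/"
    integration = f"""CREATE STORAGE INTEGRATION s3_kafka_integration
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = '{role_arn}'
  STORAGE_ALLOWED_LOCATIONS = ('{stage_url}')"""
    # kafka_stage is a hypothetical stage name.
    stage = f"""CREATE STAGE kafka_stage
  URL = '{stage_url}'
  STORAGE_INTEGRATION = s3_kafka_integration
  FILE_FORMAT = (TYPE = 'JSON')"""
    # Column types below are assumptions; adjust to the real table definition.
    pipe = """CREATE PIPE kafka_snowpipe AUTO_INGEST = TRUE AS
  COPY INTO STOCK_MARKET_DATA (symbol, price, volume, change, timestamp, exchange)
  FROM (
    SELECT $1:symbol::STRING, $1:price::FLOAT, $1:volume::NUMBER,
           $1:change::FLOAT, $1:timestamp::TIMESTAMP_NTZ, $1:exchange::STRING
    FROM @kafka_stage
  )"""
    return [integration, stage, pipe]


def build_bucket_notification(sqs_arn: str) -> dict:
    """Notification configuration that forwards new-object events to Snowpipe.

    The dict has the shape expected by the S3 PutBucketNotificationConfiguration
    API (for example boto3's put_bucket_notification_configuration).
    """
    return {
        "QueueConfigurations": [
            {"QueueArn": sqs_arn, "Events": ["s3:ObjectCreated:*"]}
        ]
    }


if __name__ == "__main__":
    statements = build_snowpipe_statements(
        "kafka-msk-snowflake-bucket",
        "arn:aws:iam::123456789012:role/snowflake-s3-role",  # placeholder account
    )
    print(";\n\n".join(statements) + ";")
```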
Snowflake Analytics Table
The STOCK_MARKET_DATA table in KAFKA_DB.KAFKA_SCHEMA stores every ingested tick with an auto-timestamp (loaded_at DEFAULT CURRENT_TIMESTAMP). Analysts query per-symbol statistics — COUNT(*), AVG(PRICE), MIN/MAX — with data available within five minutes of production. SYSTEM$PIPE_STATUS('kafka_snowpipe') monitors ingestion health. The full table grows continuously as long as the producer runs.
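A sketch of the per-symbol query and a small health check follows. SYSTEM$PIPE_STATUS returns a JSON string, and the helper below reads its executionState and pendingFileCount fields; the helper name, the column aliases, and the sample status string are illustrative assumptions.

```python
import json

# Per-symbol statistics over the ingested ticks.
PER_SYMBOL_STATS_SQL = """
SELECT symbol,
       COUNT(*)   AS tick_count,
       AVG(price) AS avg_price,
       MIN(price) AS min_price,
       MAX(price) AS max_price
FROM KAFKA_DB.KAFKA_SCHEMA.STOCK_MARKET_DATA
GROUP BY symbol
ORDER BY symbol
"""

PIPE_STATUS_SQL = "SELECT SYSTEM$PIPE_STATUS('kafka_snowpipe')"


def summarize_pipe_status(status_json: str) -> dict:
    """Reduce the SYSTEM$PIPE_STATUS JSON to the fields used for monitoring."""
    status = json.loads(status_json)
    state = status.get("executionState")
    return {
        "running": state == "RUNNING",
        "state": state,
        "pending_files": status.get("pendingFileCount", 0),
    }


if __name__ == "__main__":
    # Hand-written sample input, not captured from a real pipe.
    sample = '{"executionState": "RUNNING", "pendingFileCount": 2}'
    print(summarize_pipe_status(sample))
```

Both SQL strings can be run through any Snowflake client; the status summary is useful for alerting when the pipe is not running or files are piling up in the queue.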
The Results
The pipeline continuously streams stock ticks for 7 NASDAQ symbols (AAPL, GOOGL, MSFT, AMZN, TSLA, META, NVDA) at one message per second. MSK Connect writes a JSON file for a partition once it has buffered 100 messages or 60 seconds have passed, whichever comes first, and Snowpipe ingests each file within minutes via SQS event notification. Per-symbol analytics in Snowflake (AVG(PRICE), MIN, MAX, record count) are queryable within five minutes of production with no manual intervention. The full infrastructure (VPC networking, MSK cluster, MSK Connect, S3, and Snowflake) was provisioned from scratch in approximately 2.5 hours following a reproducible step-by-step runbook, and it runs at near-zero idle cost when the MSK cluster and NAT gateway are torn down between test runs.
Project Screenshots
Pipeline infrastructure and execution views.
