Real-Time Stock Market Streaming: Amazon MSK → S3 → Snowflake

Data Bootcamp · Real-Time Data Engineering · 8 min read

This end-to-end pipeline publishes simulated NASDAQ stock ticks from a Python Kafka producer to Amazon MSK, batches them into Amazon S3 with the Confluent S3 Sink Connector running on MSK Connect, and auto-ingests them into Snowflake using Snowpipe. End-to-end latency stays under five minutes, with IAM-only authentication throughout.

Figure 1 — End-to-end pipeline architecture
Message Rate: 1/sec
Stock Symbols: 7
End-to-End Latency: < 5 min
Hardcoded Secrets: Zero

The Challenge

Building a real-time streaming pipeline across AWS and Snowflake meant solving three interrelated problems:

(1) Connecting a Python Kafka producer to a private Amazon MSK cluster with IAM authentication. The MSKTokenProvider class must inherit from AbstractTokenProvider, a non-obvious requirement that silently breaks token refresh if missed.
(2) Configuring MSK Connect with the Confluent S3 Sink Connector inside a private VPC. This needs a custom network interface IAM policy (not covered by managed policies) and a CloudWatch log group that exists before the connector is created; otherwise the connector fails immediately.
(3) Wiring Snowpipe's AUTO_INGEST to S3 ObjectCreated events via SQS. This requires a cross-account IAM trust relationship built from the output of Snowflake's storage integration: an IAM user ARN and an external ID, both generated at runtime.

The Solution

We deployed a two-tier EC2 architecture: a public bastion (pub-ec2) for internet-reachable tooling and a private Kafka client (prt-ec2) in a private subnet alongside the MSK brokers. The Python producer authenticates via OAUTHBEARER + SASL_SSL on port 9098 using MSKTokenProvider(AbstractTokenProvider), which calls the MSK IAM auth SDK to generate short-lived tokens at connection time. MSK Connect runs the Confluent S3 Sink Connector with flush.size=100 and rotate.interval.ms=60000 and writes partition-keyed JSON files to S3 through an S3 VPC Gateway Endpoint, so that traffic bypasses the NAT gateway entirely. Snowflake's storage integration outputs an IAM user ARN and external ID, which are used to create a scoped snowflake-s3-role with an sts:ExternalId condition. Snowpipe (AUTO_INGEST=TRUE) exposes an SQS notification channel that is registered as the target of the S3 bucket's ObjectCreated events, and it loads each new JSON file into STOCK_MARKET_DATA within minutes of arrival.

Pipeline Architecture

1. Producer

Python Kafka Producer

A Python script (kafka-producer.py) runs on prt-ec2 in a private subnet, emitting one JSON stock tick per second across seven NASDAQ symbols: AAPL, GOOGL, MSFT, AMZN, TSLA, META, and NVDA. Each message carries symbol, price, volume, change, timestamp, and exchange. Authentication uses OAUTHBEARER + SASL_SSL on port 9098 via MSKTokenProvider(AbstractTokenProvider). The class must explicitly inherit from AbstractTokenProvider for IAM token refresh to work correctly.

Python · kafka-python · aws-msk-iam-sasl-signer · EC2 (prt-ec2)
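
The producer described above can be sketched roughly as follows. This is a minimal illustration, not the actual kafka-producer.py: the value ranges in make_tick, the choice to key messages by symbol, and picking one random symbol per second are all assumptions. The Kafka imports are deferred into build_producer so the tick generator can be exercised on a machine without kafka-python or aws-msk-iam-sasl-signer installed.

```python
import json
import random
import time
from datetime import datetime, timezone

SYMBOLS = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA", "META", "NVDA"]
TOPIC = "stock-market-data"


def make_tick(symbol, rng=random):
    """Build one simulated tick with the six fields named in the text."""
    return {
        "symbol": symbol,
        "price": round(rng.uniform(100.0, 500.0), 2),  # assumed price range
        "volume": rng.randint(100, 10_000),            # assumed volume range
        "change": round(rng.uniform(-5.0, 5.0), 2),    # assumed change range
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "exchange": "NASDAQ",
    }


def build_producer(bootstrap_servers, region):
    """Create a KafkaProducer that authenticates to MSK with IAM."""
    # Imported lazily so make_tick() works without the Kafka libraries.
    from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
    from kafka import KafkaProducer
    try:  # the import path differs between kafka-python releases
        from kafka.sasl.oauth import AbstractTokenProvider
    except ImportError:
        from kafka.oauth.abstract import AbstractTokenProvider

    class MSKTokenProvider(AbstractTokenProvider):
        """Inherits from AbstractTokenProvider, as the text requires."""

        def token(self):
            # Called by the client whenever it needs a fresh token.
            token, _expiry_ms = MSKAuthTokenProvider.generate_auth_token(region)
            return token

    return KafkaProducer(
        bootstrap_servers=bootstrap_servers,  # IAM listener on port 9098
        security_protocol="SASL_SSL",
        sasl_mechanism="OAUTHBEARER",
        sasl_oauth_token_provider=MSKTokenProvider(),
        key_serializer=lambda k: k.encode("utf-8"),
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )


def run(bootstrap_servers, region="us-east-1", interval_seconds=1.0):
    """Send one tick per interval until interrupted."""
    producer = build_producer(bootstrap_servers, region)
    try:
        while True:
            symbol = random.choice(SYMBOLS)
            producer.send(TOPIC, key=symbol, value=make_tick(symbol))
            time.sleep(interval_seconds)
    finally:
        producer.flush()
        producer.close()
```

Calling run with the cluster's IAM bootstrap string (the brokers' port 9098 endpoints) would start the loop; the region default is taken from the availability zones mentioned in this write-up.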

2. Streaming

Amazon MSK Kafka Cluster

An Amazon MSK provisioned cluster runs Kafka 3.5.1 on two kafka.t3.small brokers placed across private subnets in us-east-1a and us-east-1b. The topic stock-market-data has 3 partitions with replication factor 2. IAM role-based authentication is the only allowed access method; unauthenticated access and SASL/SCRAM are disabled. An MSK cluster policy grants Connect and WriteData/ReadData permissions to ec2-msk-role and msk-connect-role.

Amazon MSK · Kafka 3.5.1 · IAM Auth · VPC Security Groups (sg-msk)
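
To make the permission grant concrete, here is a hedged sketch of a single IAM statement covering the actions named above. The account ID, cluster name, and cluster UUID passed in are hypothetical placeholders, and the exact policy used in this project is not shown in the write-up. kafka-cluster:DescribeTopic is included on the assumption that it is needed alongside WriteData and ReadData.

```python
import json


def msk_access_statement(region, account_id, cluster_name, cluster_uuid,
                         topic, role_names):
    """Build one IAM statement granting connect, read, and write access."""
    prefix = f"arn:aws:kafka:{region}:{account_id}"
    return {
        "Effect": "Allow",
        "Principal": {
            "AWS": [f"arn:aws:iam::{account_id}:role/{name}"
                    for name in role_names]
        },
        "Action": [
            "kafka-cluster:Connect",
            "kafka-cluster:DescribeTopic",  # assumed companion permission
            "kafka-cluster:WriteData",
            "kafka-cluster:ReadData",
        ],
        "Resource": [
            f"{prefix}:cluster/{cluster_name}/{cluster_uuid}",
            f"{prefix}:topic/{cluster_name}/{cluster_uuid}/{topic}",
        ],
    }


# Placeholder identifiers for illustration only.
statement = msk_access_statement(
    region="us-east-1",
    account_id="123456789012",
    cluster_name="example-cluster",
    cluster_uuid="00000000-0000-0000-0000-000000000000-0",
    topic="stock-market-data",
    role_names=["ec2-msk-role", "msk-connect-role"],
)
policy_json = json.dumps({"Version": "2012-10-17", "Statement": [statement]}, indent=2)
```

A consuming client such as the sink connector typically also needs consumer-group permissions on a group resource; those are left out here because the write-up does not describe them.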

3. Connector

MSK Connect + Confluent S3 Sink

MSK Connect workers run the Confluent S3 Sink Connector v12.1.4, which subscribes to the stock-market-data topic and buffers messages in memory. A file is flushed to S3 once a partition has accumulated 100 messages (flush.size=100) or its open file spans 60 seconds (rotate.interval.ms=60000), whichever comes first. Records are written as JSON with schemas disabled. Traffic to S3 routes privately through an S3 VPC Gateway Endpoint, avoiding NAT gateway charges entirely. CloudWatch logs stream to /msk-connect/msk-s3-sink-connector.

MSK Connect · Confluent S3 Sink Connector v12.1.4 · CloudWatch · S3 VPC Endpoint
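
The connector settings described above might look like the following when expressed as the string-to-string map that MSK Connect accepts. Only the topic, bucket, flush.size, rotate.interval.ms, JSON format, and disabled schemas come from this write-up; tasks.max, the key converter, and the partitioner are assumptions chosen to match the partition=X path layout. The helper gives a rough estimate of how long a partition's file stays open, treating the two limits as a simple minimum.

```python
CONNECTOR_CONFIG = {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",  # assumption
    "topics": "stock-market-data",
    "s3.region": "us-east-1",
    "s3.bucket.name": "kafka-msk-snowflake-bucket",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    # Assumed: the default partitioner yields topics/<topic>/partition=<n>/
    "partitioner.class":
        "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",  # assumption
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "flush.size": "100",
    "rotate.interval.ms": "60000",
}


def seconds_until_commit(records_per_second, config=CONNECTOR_CONFIG):
    """Estimate how long one partition's file stays open before it is written.

    Simplification: the real connector evaluates rotation when records
    arrive, so this is an approximation rather than an exact model.
    """
    fill_seconds = int(config["flush.size"]) / records_per_second
    rotate_seconds = int(config["rotate.interval.ms"]) / 1000
    return min(fill_seconds, rotate_seconds)
```

Under this estimate, the 100-record limit only wins when a single partition receives more than about 1.67 records per second (100 records in 60 seconds); below that rate the 60-second rotation closes the file first.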

4. Storage

Amazon S3 Data Lake

JSON files land in kafka-msk-snowflake-bucket under a partition-keyed path: topics/stock-market-data/partition=X/. File names encode the topic, partition number, and base offset (e.g. stock-market-data+0+0000000000.json). An S3 bucket policy restricts PutObject to msk-connect-role only. S3 ObjectCreated events are forwarded to the ARN of Snowpipe's SQS notification channel, which is the trigger that starts auto-ingestion.

Amazon S3 · Bucket Policy · S3 Event Notifications · Amazon SQS
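
The object naming scheme above can be illustrated with a small pair of helpers that build and parse keys. This is a sketch inferred from the single example file name in the text; the helper names are hypothetical, and the ten-digit zero padding is taken from that example.

```python
import re

# topics/<topic>/partition=<n>/<topic>+<n>+<zero-padded base offset>.json
KEY_PATTERN = re.compile(
    r"^topics/(?P<topic>[^/]+)/partition=(?P<partition>\d+)/"
    r"(?P=topic)\+(?P=partition)\+(?P<offset>\d{10,})\.json$"
)


def object_key(topic, partition, start_offset):
    """Build the S3 key for a file whose first record is at start_offset."""
    return (f"topics/{topic}/partition={partition}/"
            f"{topic}+{partition}+{start_offset:010d}.json")


def parse_key(key):
    """Recover (topic, partition, base offset) from an S3 key."""
    match = KEY_PATTERN.match(key)
    if match is None:
        raise ValueError(f"unexpected key layout: {key}")
    return match["topic"], int(match["partition"]), int(match["offset"])
```

Parsing the base offset out of each key is a convenient way to check for gaps between consecutive files in a partition when debugging ingestion.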

5. Snowpipe

Snowpipe Auto-Ingest

Snowpipe (kafka_snowpipe, AUTO_INGEST=TRUE) polls the SQS queue registered as its notification channel. Each S3 ObjectCreated event queues a COPY INTO STOCK_MARKET_DATA operation that parses the JSON payload and maps symbol, price, volume, change, timestamp, and exchange into typed columns. A Snowflake storage integration (s3_kafka_integration) uses a Snowflake-generated IAM user ARN and sts:ExternalId to assume snowflake-s3-role for cross-account S3 reads.

Snowpipe · Amazon SQS · Snowflake Storage Integration · IAM External ID
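
The two pieces described above, the cross-account trust relationship and the pipe definition, could be sketched as follows. The trust policy takes the STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID values that DESC INTEGRATION reports for the storage integration. The pipe DDL is an assumption about how the mapping might be written: the stage name kafka_s3_stage and all column types are hypothetical, since the write-up does not show the actual SQL.

```python
def snowflake_trust_policy(storage_aws_iam_user_arn, storage_aws_external_id):
    """Trust policy letting Snowflake's IAM user assume snowflake-s3-role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": storage_aws_iam_user_arn},
                "Action": "sts:AssumeRole",
                "Condition": {
                    "StringEquals": {"sts:ExternalId": storage_aws_external_id}
                },
            }
        ],
    }


# Hypothetical pipe definition; stage name and column types are assumptions.
PIPE_DDL = """
CREATE OR REPLACE PIPE kafka_snowpipe AUTO_INGEST = TRUE AS
COPY INTO STOCK_MARKET_DATA (symbol, price, volume, change, timestamp, exchange)
FROM (
    SELECT $1:symbol::STRING,
           $1:price::FLOAT,
           $1:volume::NUMBER,
           $1:change::FLOAT,
           $1:timestamp::TIMESTAMP_NTZ,
           $1:exchange::STRING
    FROM @kafka_s3_stage
)
FILE_FORMAT = (TYPE = 'JSON');
"""
```

The external ID condition is what keeps the role from being assumed by any other Snowflake account that happens to share the same IAM user.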

6. Analytics

Snowflake Analytics Table

The STOCK_MARKET_DATA table in KAFKA_DB.KAFKA_SCHEMA stores every ingested tick with an automatic load timestamp (loaded_at DEFAULT CURRENT_TIMESTAMP). Analysts query per-symbol statistics such as COUNT(*), AVG(PRICE), MIN(PRICE), and MAX(PRICE), with data available within five minutes of production. SYSTEM$PIPE_STATUS('kafka_snowpipe') monitors ingestion health. The table grows for as long as the producer runs.

Snowflake SQL · KAFKA_DB.KAFKA_SCHEMA.STOCK_MARKET_DATA · SYSTEM$PIPE_STATUS
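
As an illustration of the queries mentioned above, the sketch below holds a per-symbol statistics query and a small helper that interprets the JSON string returned by SYSTEM$PIPE_STATUS. The column aliases, the helper name, and the pending-file threshold are assumptions; the helper relies on the executionState and pendingFileCount fields of the status output.

```python
import json

PER_SYMBOL_STATS_SQL = """
SELECT symbol,
       COUNT(*)   AS tick_count,
       AVG(price) AS avg_price,
       MIN(price) AS min_price,
       MAX(price) AS max_price
FROM KAFKA_DB.KAFKA_SCHEMA.STOCK_MARKET_DATA
GROUP BY symbol
ORDER BY symbol;
"""

PIPE_STATUS_SQL = "SELECT SYSTEM$PIPE_STATUS('kafka_snowpipe');"


def pipe_is_healthy(status_json, max_pending_files=10):
    """Return True if the pipe is running and its backlog is small.

    status_json is the string returned by SYSTEM$PIPE_STATUS; the
    max_pending_files threshold is an arbitrary illustrative value.
    """
    status = json.loads(status_json)
    running = status.get("executionState") == "RUNNING"
    pending = int(status.get("pendingFileCount", 0))
    return running and pending <= max_pending_files
```

Both SQL strings can be passed to any Snowflake client; the result of PIPE_STATUS_SQL is a single JSON string suitable for pipe_is_healthy.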

The Results

The pipeline continuously streams stock ticks for 7 NASDAQ symbols (AAPL, GOOGL, MSFT, AMZN, TSLA, META, NVDA) at one message per second. With flush.size=100 and rotate.interval.ms=60000, a partition receiving at most one message per second reaches the 60-second rotation limit before it collects 100 records, so MSK Connect should write a JSON file roughly once a minute per active partition. Snowpipe ingests each file within minutes via SQS event notification. Per-symbol analytics in Snowflake (AVG(PRICE), MIN, MAX, record count) are queryable within five minutes of production with no manual intervention. The full infrastructure (VPC networking, MSK cluster, MSK Connect, S3, and Snowflake) was provisioned from scratch in approximately 2.5 hours following a reproducible step-by-step runbook, and it runs at near-zero idle cost once the MSK cluster and NAT gateway are torn down between test runs.

Technologies Used

Amazon MSK · Apache Kafka 3.5.1 · Python / kafka-python · MSK Connect · Confluent S3 Sink Connector · Amazon S3 · Snowpipe · Snowflake · AWS VPC · EC2 (Bastion + Private) · AWS IAM / OAUTHBEARER · Amazon SQS · CloudWatch · S3 VPC Endpoint
