Streaming Ingestion: Structured Streaming & Event Hubs
Process real-time data with Spark Structured Streaming and ingest from Azure Event Hubs — the streaming foundations for DP-750.
Spark Structured Streaming
Structured Streaming treats a data stream as an ever-growing table.
Imagine a notebook on your desk that keeps getting new pages added to the bottom. Each time you look, you process only the new pages. That’s Structured Streaming — it remembers where it left off (checkpoint) and only processes new data.
You write streaming code almost identically to batch code. The magic is in readStream and writeStream — Spark handles the continuous processing loop for you.
Core streaming pattern
# Read from a streaming source
stream_df = (spark.readStream
.format("delta")
.table("bronze.raw_transactions"))
# Transform (same API as batch)
clean_df = (stream_df
.filter("amount > 0")
.withColumn("processed_at", current_timestamp()))
# Write to a streaming sink
query = (clean_df.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/checkpoints/silver_txn")
.trigger(availableNow=True) # process all available, then stop
.toTable("silver.valid_transactions"))
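Conceptually, Spark drives that same transform over every micro-batch for you. A minimal pure-Python sketch of the loop (illustrative names, no Spark involved — the real engine also handles offsets, retries, and fault tolerance):

```python
from datetime import datetime, timezone

def process_micro_batch(batch, sink):
    """Apply the batch-style transform to one micro-batch and append the results.

    Mirrors the filter + withColumn step above; row shape is illustrative."""
    for row in batch:
        if row["amount"] > 0:  # ~ filter("amount > 0")
            row = dict(row, processed_at=datetime.now(timezone.utc))
            sink.append(row)   # ~ outputMode("append")

# Two micro-batches arriving over time
batches = [
    [{"txn_id": "t1", "amount": 50.0}, {"txn_id": "t2", "amount": -5.0}],
    [{"txn_id": "t3", "amount": 12.5}],
]

sink = []
for batch in batches:  # Spark runs this loop for you, continuously
    process_micro_batch(batch, sink)

print([r["txn_id"] for r in sink])  # the negative-amount row is filtered out
```

The point of the sketch: the transform itself is ordinary batch logic; only the outer loop and bookkeeping are streaming-specific.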
Trigger modes
| Trigger | Behaviour | Use Case |
|---|---|---|
| Default (no trigger) | Runs each micro-batch as soon as the previous one finishes | Lowest-latency micro-batch processing |
| processingTime="10 seconds" | Starts a micro-batch every 10 seconds | Near-real-time with controlled resource use |
| availableNow=True | Processes all available data (possibly across multiple micro-batches), then stops | Scheduled incremental batch |
| once=True | Processes all available data in a single micro-batch, then stops | One-shot catch-up (deprecated; use availableNow) |
Tomás uses the default trigger for NovaPay’s fraud detection, where latency matters most. Ravi uses availableNow for DataPulse’s hourly incremental loads — batch-like scheduling, but with streaming’s exactly-once guarantees.
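The stopping behaviour of availableNow can be sketched in plain Python (a toy model, not Spark): drain everything already queued, then return — whereas the default trigger would keep polling the source indefinitely.

```python
from collections import deque

def run_available_now(pending):
    """Process every micro-batch already queued, then stop.

    Models trigger(availableNow=True): the query terminates once the
    backlog is drained, even if new data arrives later."""
    processed = 0
    while pending:
        pending.popleft()  # process one micro-batch
        processed += 1
    return processed       # query terminates here

pending = deque(["batch-1", "batch-2", "batch-3"])
print(run_available_now(pending))  # 3 — all available data, then stop

# The default trigger never returns: it keeps polling the source and
# processes each new micro-batch as soon as the previous one finishes.
```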
Checkpoints
The checkpoint location is critical — it tracks which data has been processed:
- Stores offsets (where in the stream you are)
- Stores committed batch IDs
- Enables exactly-once processing — if the job crashes, it resumes from the last checkpoint
- Never delete checkpoints unless you want to reprocess all data
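A toy simulation of the resume behaviour, in pure Python (the file path and JSON layout are illustrative — real Spark checkpoints store offset and commit logs, not this structure):

```python
import json
import os
import tempfile

def run_increment(source, checkpoint_path, sink):
    """Process only rows after the last committed offset, then commit the new offset."""
    offset = 0
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            offset = json.load(f)["offset"]  # where the last run left off
    sink.extend(source[offset:])             # process only the new rows
    with open(checkpoint_path, "w") as f:
        json.dump({"offset": len(source)}, f)  # commit progress

ckpt = os.path.join(tempfile.mkdtemp(), "ckpt.json")
source, sink = ["a", "b", "c"], []

run_increment(source, ckpt, sink)   # first run: all three rows
source += ["d", "e"]                # new data arrives, job restarts
run_increment(source, ckpt, sink)   # resumed run: only the two new rows
print(sink)                         # no duplicates — the exactly-once effect
```

Deleting `ckpt.json` between runs would reset the offset to 0 and reprocess everything — which is exactly why checkpoint locations must be left alone in production.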
Ingesting from Azure Event Hubs
Azure Event Hubs is a managed streaming platform. Tomás uses it at NovaPay to ingest real-time transaction events:
# Connection configuration
eh_conf = {
    # Note: the azure-event-hubs-spark connector expects the connection string
    # to be encrypted first via EventHubsUtils.encrypt (omitted here for brevity)
    "eventhubs.connectionString":
        dbutils.secrets.get("kv-prod", "eventhub-connection-string"),
    "eventhubs.consumerGroup": "fraud-detection-group",
    "maxEventsPerTrigger": 10000
}
# Read from Event Hubs
eh_stream = (spark.readStream
.format("eventhubs")
.options(**eh_conf)
.load())
# Event Hubs data arrives as binary — decode it
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType
txn_schema = StructType() \
.add("txn_id", StringType()) \
.add("amount", DoubleType()) \
.add("merchant", StringType()) \
.add("timestamp", TimestampType())
decoded = (eh_stream
.select(from_json(col("body").cast("string"), txn_schema).alias("data"))
.select("data.*"))
# Write to Delta table
query = (decoded.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/checkpoints/eh_transactions")
.toTable("bronze.realtime_transactions"))
Event Hubs key concepts
| Concept | Purpose |
|---|---|
| Namespace | Container for Event Hubs (like a server) |
| Event Hub | The actual stream topic |
| Consumer group | Allows multiple readers of the same stream |
| Partition | Enables parallel reading (more partitions = more throughput) |
| Body | The message payload (usually JSON, arrives as binary bytes) |
Exam tip: Event Hubs data arrives as binary in the body column — you must cast it to a string and then parse it with from_json. Forgetting the cast is a common exam trap.
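What the cast-and-parse step does can be shown without Spark, using plain Python on a single event body (the payload values are made up for illustration):

```python
import json
from datetime import datetime

# A raw Event Hubs event: the payload arrives as bytes in `body`
raw_body = (
    b'{"txn_id": "t-481", "amount": 99.5, '
    b'"merchant": "NovaMart", "timestamp": "2024-06-01T12:00:00"}'
)

decoded_str = raw_body.decode("utf-8")  # ~ col("body").cast("string")
event = json.loads(decoded_str)         # ~ from_json(..., txn_schema)
event["timestamp"] = datetime.fromisoformat(event["timestamp"])

print(event["txn_id"], event["amount"])  # t-481 99.5
```

Skipping the decode step and feeding raw bytes straight into a JSON parser is the plain-Python equivalent of the exam trap above: from_json on an uncast binary column yields nulls, not parsed fields.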
Knowledge check
Ravi needs an hourly incremental load that processes only new data from a Delta source table, with exactly-once guarantees. The job should stop after processing, not run continuously. Which approach should he use?
Next up: Auto Loader & Declarative Pipelines — scalable file ingestion and Lakeflow Spark Declarative Pipelines.