# Structured Streaming
Structured Streaming is Spark’s stream processing engine. It processes real-time data using the same DataFrame/Dataset API as batch processing.
## What is Structured Streaming?
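Structured Streaming treats a data stream as an unbounded input table to which new rows are continuously appended. When new data arrives, the engine processes only the new rows incrementally and updates the result table, rather than recomputing everything from scratch.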
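Here is a minimal plain-Java sketch of this idea. It has no Spark dependency, and `IncrementalCategoryCount` is a hypothetical name, not a Spark class. Each micro-batch touches only the rows that just arrived, yet the running result always equals a full recount of everything seen so far. That property is what lets a streaming `groupBy("category").count()` behave like its batch counterpart.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration (not Spark code): the result table of
// groupBy("category").count(), updated one micro-batch at a time.
class IncrementalCategoryCount {
    // Running result table: category -> count (TreeMap keeps rows sorted)
    private final Map<String, Long> result = new TreeMap<>();

    // Incremental step: only the rows that arrived in this micro-batch are processed
    void processBatch(List<String> newCategories) {
        for (String category : newCategories) {
            result.merge(category, 1L, Long::sum);
        }
    }

    // Snapshot of the current result table
    Map<String, Long> resultTable() {
        return new TreeMap<>(result);
    }

    // Batch-style reference: recount the entire (finite) input from scratch
    static Map<String, Long> recount(List<String> allCategories) {
        Map<String, Long> counts = new TreeMap<>();
        for (String category : allCategories) {
            counts.merge(category, 1L, Long::sum);
        }
        return counts;
    }
}
```

Spark additionally checkpoints this running state so it survives restarts. The sketch keeps it only in memory.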
### Core Concepts
```text
Input Stream (Infinite Table)
        ↓
[Data Arrival] → [Incremental Processing] → [Result Update]
        ↓
Output (Continuously Updated Result Table)
```

### Batch vs Streaming Code Comparison
```java
// Batch processing (static data)
Dataset<Row> batchDf = spark.read()
    .json("input/data.json");
Dataset<Row> batchResult = batchDf
    .groupBy("category")
    .count();
batchResult.write().parquet("output");

// Stream processing (dynamic data) - almost identical!
Dataset<Row> streamDf = spark.readStream()   // Changed to readStream
    .schema(schema)                          // File streams need an explicit StructType schema
    .json("input/");
Dataset<Row> streamResult = streamDf          // Same processing logic
    .groupBy("category")
    .count();
streamResult.writeStream()                    // Changed to writeStream
    .outputMode("complete")
    .format("console")
    .start()
    .awaitTermination();
```

## Basic Usage
### Reading Streams
```java
SparkSession spark = SparkSession.builder()
    .appName("Structured Streaming")
    .master("local[*]")
    .getOrCreate();

// File sources need an explicit schema (these columns are illustrative)
StructType schema = new StructType()
    .add("category", "string")
    .add("value", "long");

// File source (monitors a directory for new files)
Dataset<Row> fileStream = spark.readStream()
    .schema(schema)                 // Required for file streams
    .json("input/");

// Kafka source (fixed schema: key, value, topic, partition, offset, ...)
Dataset<Row> kafkaStream = spark.readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topic-name")
    .load();

// Socket source (for testing only; produces a single string column "value")
Dataset<Row> socketStream = spark.readStream()
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load();

// Rate source (for testing; generates rowsPerSecond rows with columns timestamp and value)
Dataset<Row> rateStream = spark.readStream()
    .format("rate")
    .option("rowsPerSecond", 10)
    .load();
```

### Processing Streams
Streaming DataFrames support the same operations as batch DataFrames:
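```java
import static org.apache.spark.sql.functions.*;

// `stream` is any streaming Dataset<Row> with `category` and `value` columns

// Filtering
Dataset<Row> filtered = stream.filter(col("value").gt(100));

// Transformation
Dataset<Row> transformed = stream
    .withColumn("timestamp", current_timestamp())
    .withColumn("doubled", col("value").multiply(2));

// Aggregation (stateful: running counts and sums are kept across micro-batches)
Dataset<Row> aggregated = stream
    .groupBy("category")
    .agg(
        count("*").alias("count"),
        sum("value").alias("total")
    );
```

### Writing Streams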
```java
// `result` must suit append mode: no aggregation, or a watermarked windowed aggregation
StreamingQuery query = result.writeStream()
    .outputMode("append")                           // Output mode
    .format("parquet")                              // Output format (file sink)
    .option("path", "output/")                      // Output path
    .option("checkpointLocation", "checkpoint/")    // Required for file and Kafka sinks; enables recovery
    .trigger(Trigger.ProcessingTime("10 seconds"))  // Trigger
    .start();

// Block until the query terminates
query.awaitTermination();

// Or leave it running in the background and manage it
// with query.isActive(), query.stop(), etc.
```

## Output Mode
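| Mode | Description | When to Use |
|---|---|---|
| append (default) | Outputs only rows added to the result table since the last trigger; emitted rows never change | Queries without aggregation, or windowed aggregations with a watermark (each window is emitted once finalized) |
| complete | Outputs the entire result table on every trigger | Aggregation queries only |
| update | Outputs only rows that were added or changed since the last trigger | Aggregation queries; not supported by file sinks |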
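For aggregation queries, you can picture the difference between `complete` and `update` as a comparison of the result table before and after a trigger. The plain-Java sketch below returns the rows each mode would hand to the sink. `OutputModeModel` is a hypothetical name, not Spark code. `append` is left out because, for aggregations, it depends on the watermark rather than on what changed.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model (not Spark code) of what a sink receives per trigger
// for an aggregation whose result table maps key -> aggregated value.
class OutputModeModel {
    // complete: the whole current result table, every trigger
    static Map<String, Long> complete(Map<String, Long> after) {
        return new TreeMap<>(after);
    }

    // update: only rows that are new or whose value changed since the previous trigger
    static Map<String, Long> update(Map<String, Long> before, Map<String, Long> after) {
        Map<String, Long> changed = new TreeMap<>();
        for (Map.Entry<String, Long> row : after.entrySet()) {
            // Long.equals(null) is false, so brand-new keys are emitted too
            if (!row.getValue().equals(before.get(row.getKey()))) {
                changed.put(row.getKey(), row.getValue());
            }
        }
        return changed;
    }
}
```

For example, if the counts go from `{A=2, B=1}` to `{A=2, B=3, C=1}`, `complete` emits all three rows while `update` emits only `B` and `C`.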
```java
// append - new rows only (no aggregation); file sinks need a path and a checkpoint
filtered.writeStream()
    .outputMode("append")
    .format("parquet")
    .option("path", "output/filtered")
    .option("checkpointLocation", "checkpoint/filtered")
    .start();

// complete - entire result (with aggregation)
aggregated.writeStream()
    .outputMode("complete")
    .format("console")
    .start();

// update - changed rows only (with aggregation)
aggregated.writeStream()
    .outputMode("update")
    .format("console")
    .start();
```

## Trigger
A trigger controls when micro-batches run, and therefore how often newly arrived data is processed.
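According to the Spark documentation, a fixed-interval `ProcessingTime` trigger behaves as follows. If a micro-batch finishes early, the engine waits until the interval is over. If a micro-batch overruns, the next one starts as soon as it completes. The plain-Java model below computes batch start times from a list of batch durations. `ProcessingTimeSchedule` is hypothetical, and aligning interval boundaries to multiples of the interval is an assumption of this sketch.

```java
// Hypothetical model (not Spark code) of fixed-interval ProcessingTime scheduling.
class ProcessingTimeSchedule {
    // Returns the start time of each micro-batch, given how long each batch runs.
    static long[] batchStartTimes(long intervalMs, long firstStartMs, long[] durationsMs) {
        long[] starts = new long[durationsMs.length];
        long now = firstStartMs;
        for (int i = 0; i < durationsMs.length; i++) {
            starts[i] = now;
            // Next interval boundary after this batch started (interval 0 = no waiting)
            long nextTrigger = intervalMs == 0 ? now : (now / intervalMs) * intervalMs + intervalMs;
            long batchEnd = now + durationsMs[i];
            // Wait for the boundary, unless the batch already ran past it
            now = Math.max(batchEnd, nextTrigger);
        }
        return starts;
    }
}
```

With a 10-second interval and batches taking 3 s, 12 s, 4 s, and 1 s, the batches start at 0 s, 10 s, 22 s (the 12-second batch overran its slot), and 30 s.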
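```java
import org.apache.spark.sql.streaming.Trigger;

// Pass ONE of the following to writeStream().trigger(...):

// Default (no trigger set): next micro-batch starts as soon as the previous one finishes
.trigger(Trigger.ProcessingTime(0))
// Fixed interval
.trigger(Trigger.ProcessingTime("10 seconds"))
.trigger(Trigger.ProcessingTime("1 minute"))
// Run a single micro-batch, then stop (deprecated since Spark 3.4; prefer AvailableNow)
.trigger(Trigger.Once())
// Process all data available at start (possibly in several micro-batches), then stop (Spark 3.3+)
.trigger(Trigger.AvailableNow())
// Continuous processing (experimental, ~1 ms latency). "1 second" is the checkpoint interval;
// only map-like queries (select, filter) are supported
.trigger(Trigger.Continuous("1 second"))
```

## Kafka Integration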
### Reading from Kafka
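Besides `earliest` and `latest`, the Kafka source's `startingOffsets` option accepts a JSON object that maps topic → partition → offset. In that JSON, `-2` means earliest and `-1` means latest. Below is a small hypothetical helper that builds this string (plain Java, not part of Spark). It relies on Kafka topic names being restricted to letters, digits, `.`, `_`, and `-`, so no JSON escaping is needed.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper (not part of Spark): builds JSON such as {"my-topic":{"0":23,"1":-2}}
// for .option("startingOffsets", ...). -2 = earliest, -1 = latest for that partition.
class StartingOffsetsJson {
    static final long EARLIEST = -2L;
    static final long LATEST = -1L;

    static String build(Map<String, Map<Integer, Long>> offsets) {
        Map<String, Map<Integer, Long>> sortedTopics = new TreeMap<>(offsets);
        StringBuilder json = new StringBuilder("{");
        boolean firstTopic = true;
        for (Map.Entry<String, Map<Integer, Long>> topic : sortedTopics.entrySet()) {
            if (!firstTopic) json.append(',');
            firstTopic = false;
            json.append('"').append(topic.getKey()).append("\":{");
            Map<Integer, Long> sortedPartitions = new TreeMap<>(topic.getValue());
            boolean firstPartition = true;
            for (Map.Entry<Integer, Long> p : sortedPartitions.entrySet()) {
                if (!firstPartition) json.append(',');
                firstPartition = false;
                // Partition ids are JSON object keys, so they are quoted
                json.append('"').append(p.getKey()).append("\":").append(p.getValue());
            }
            json.append('}');
        }
        return json.append('}').toString();
    }
}
```

The option applies only when a query starts without a checkpoint. A query restarted from its checkpoint resumes from the offsets stored there.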
```java
// Requires the spark-sql-kafka-0-10 package on the classpath
Dataset<Row> kafkaStream = spark.readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    // Use exactly one of subscribe / subscribePattern / assign
    .option("subscribe", "my-topic")                     // One topic or a comma-separated list
    // .option("subscribePattern", "topic.*")            // Regex pattern
    // .option("assign", "{\"topic\":[0,1,2]}")          // Specific partitions
    .option("startingOffsets", "earliest")               // earliest, latest, or JSON
    .load();

// Kafka message columns:
// key (binary), value (binary), topic, partition, offset, timestamp, timestampType

// Parse the JSON value with a StructType `schema` that matches the messages
Dataset<Row> parsed = kafkaStream
    .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*");
```

### Writing to Kafka
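```java
// Send results to Kafka: the sink requires a `value` column (string or binary)
// and accepts an optional `key` column; `result` is assumed to have a `key` column
result
    .selectExpr("CAST(key AS STRING) AS key", "to_json(struct(*)) AS value")
    .writeStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "output-topic")
    .option("checkpointLocation", "checkpoint/kafka-output")
    .start();
```

## Window Operations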
The `window()` and `session_window()` functions group rows into event-time windows for time-based aggregation.
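The following plain-Java sketch shows which windows an event falls into. `WindowAssigner` is a hypothetical name. Windows are aligned to the epoch, and the sketch ignores the optional start-time offset that Spark's `window()` also accepts. A tumbling window is the special case where the slide equals the window size.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not Spark code): the [start, end) windows containing an event.
// Times are epoch milliseconds; windows are aligned to the epoch.
class WindowAssigner {
    // Tumbling window = sliding window whose slide equals its size
    static List<long[]> tumbling(long eventTimeMs, long sizeMs) {
        return sliding(eventTimeMs, sizeMs, sizeMs);
    }

    static List<long[]> sliding(long eventTimeMs, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start at or before the event
        long lastStart = eventTimeMs - Math.floorMod(eventTimeMs, slideMs);
        // [start, start + size) contains the event while start > event - size
        for (long start = lastStart; start > eventTimeMs - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }
}
```

For an event at 10:03, a 5-minute tumbling window yields only [10:00, 10:05). A 10-minute window sliding every 5 minutes yields [10:00, 10:10) and [09:55, 10:05).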
### Tumbling Window
Fixed-size, non-overlapping windows, so each event belongs to exactly one window:
```java
// `stream` has an event-time column `timestamp` and a `category` column
Dataset<Row> windowedCounts = stream
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window(col("timestamp"), "5 minutes"),  // 5-minute window
        col("category")
    )
    .count();

// Result:
// +------------------------------------------+--------+-----+
// |window                                    |category|count|
// +------------------------------------------+--------+-----+
// |{2024-01-01 10:00:00, 2024-01-01 10:05:00}|A       |  150|
// |{2024-01-01 10:05:00, 2024-01-01 10:10:00}|A       |  180|
// +------------------------------------------+--------+-----+
```

### Sliding Window
Fixed-size windows that advance by a slide interval shorter than the window length, so consecutive windows overlap:
```java
Dataset<Row> slidingCounts = stream
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window(col("timestamp"), "10 minutes", "5 minutes"),  // 10-min window, 5-min slide
        col("category")
    )
    .count();
// Each event falls into 2 windows (window length / slide = 10 / 5)
```

### Session Window
Activity-based windows with a dynamic length. A session stays open as long as events keep arriving for a key, and it closes once no event arrives for the specified gap duration (5 minutes in the example below). Session windows suit tasks such as analyzing website visit sessions or tracking app usage patterns.
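The gap rule can be sketched in plain Java for a single key. `SessionWindows` is a hypothetical name, and events are sorted first for simplicity. Each event opens a window `[t, t + gap)`, and overlapping windows merge into one session.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch (not Spark code) of gap-based sessionization for ONE key.
// Returns sessions as [start, end) pairs, where end = last event time + gap.
class SessionWindows {
    static List<long[]> sessions(long[] eventTimesMs, long gapMs) {
        long[] times = eventTimesMs.clone();
        Arrays.sort(times); // sorted here for simplicity
        List<long[]> result = new ArrayList<>();
        long start = 0, end = 0;
        for (int i = 0; i < times.length; i++) {
            long t = times[i];
            if (i > 0 && t < end) {
                end = t + gapMs; // event arrived before the gap expired: extend the session
            } else {
                if (i > 0) result.add(new long[] {start, end}); // close the previous session
                start = t;
                end = t + gapMs;
            }
        }
        if (times.length > 0) result.add(new long[] {start, end});
        return result;
    }
}
```

With a 5-minute gap, events at minutes 0, 2, 6, and 20 form two sessions: [0, 11) and [20, 25).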
```java
// Requires Spark 3.2+
Dataset<Row> sessionCounts = stream
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        session_window(col("timestamp"), "5 minutes"),  // Session closes after a 5-minute gap with no events
        col("user_id")
    )
    .count();
```

## Watermark
A watermark tells the engine how long to wait for late-arriving data. This in turn lets it finalize windows and discard their old state.
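The watermark rule can be traced with the plain-Java sketch below (hypothetical `WatermarkTracker`). It makes two simplifications. First, it updates the watermark after every event, whereas Spark advances it between micro-batches. Second, it decides per event, whereas for windowed aggregations Spark works at window granularity.

```java
// Hypothetical, simplified sketch (not Spark code) of event-time watermark tracking.
class WatermarkTracker {
    private final long delayMs;
    private long maxEventTimeMs = Long.MIN_VALUE;

    WatermarkTracker(long delayMs) {
        this.delayMs = delayMs;
    }

    // Watermark = max event time seen so far - allowed delay
    long watermarkMs() {
        return maxEventTimeMs == Long.MIN_VALUE ? Long.MIN_VALUE : maxEventTimeMs - delayMs;
    }

    // Returns true if the event is accepted, i.e. not older than the current watermark
    boolean offer(long eventTimeMs) {
        boolean accepted = eventTimeMs >= watermarkMs();
        if (accepted) {
            maxEventTimeMs = Math.max(maxEventTimeMs, eventTimeMs);
        }
        return accepted;
    }
}
```

Replaying 10:00, 10:05, 10:03, 10:10, 09:55 with a 10-minute delay accepts the first four events. The watermark then reaches 10:00, and the 09:55 event is rejected.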
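```java
Dataset<Row> result = stream
    .withWatermark("eventTime", "10 minutes")  // Tolerate events up to 10 minutes late
    .groupBy(
        window(col("eventTime"), "5 minutes"),
        col("category")
    )
    .count();
// Watermark = max(eventTime seen so far) - 10 minutes
// Data within the 10-minute threshold is guaranteed to be aggregated; data older than
// the watermark may be dropped, and state for finalized windows is discarded
```

### Watermark Behavior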
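```text
Event arrival order (event times):
10:00 → 10:05 → 10:03 (late) → 10:10 → 09:55 (very late)

Allowed delay: 10 minutes, so watermark = max event time seen - 10 minutes
(assuming, for simplicity, that each event arrives in its own micro-batch)

- 10:03 event: max seen so far is 10:05, watermark = 09:55 → after the watermark → processed
- After 10:10 arrives: watermark = 10:10 - 10 minutes = 10:00
- 09:55 event: before the watermark (10:00); its window [09:55, 10:00) is finalized → dropped
```

## State Management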
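Stateful operations (aggregations, deduplication, stream-stream joins, and arbitrary stateful processing) keep intermediate state across micro-batches in a state store, which is checkpointed for fault tolerance.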
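The plain-Java sketch below (hypothetical `WindowedCountState`) shows why a watermark keeps aggregation state bounded. Counts live in a map keyed by window start. Once the watermark passes a window's end, that window is finalized, which is what `append` mode emits, and it is removed from state. For simplicity, the sketch applies the new watermark right after each batch, whereas Spark applies it in a following micro-batch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch (not Spark code): tumbling-window counts for one key, with eviction.
class WindowedCountState {
    private final long windowMs;
    private final long delayMs;
    private final TreeMap<Long, Long> countsByWindowStart = new TreeMap<>();
    private long maxEventTimeMs = Long.MIN_VALUE;

    WindowedCountState(long windowMs, long delayMs) {
        this.windowMs = windowMs;
        this.delayMs = delayMs;
    }

    // Processes one micro-batch, then evicts windows whose end <= the new watermark.
    // Returns the finalized windows as {windowStart, count} pairs.
    List<long[]> processBatch(long[] eventTimesMs) {
        long watermark = currentWatermark();
        for (long t : eventTimesMs) {
            long start = t - Math.floorMod(t, windowMs);
            if (start + windowMs <= watermark) continue; // window already finalized: drop late row
            countsByWindowStart.merge(start, 1L, Long::sum);
            maxEventTimeMs = Math.max(maxEventTimeMs, t);
        }
        long newWatermark = currentWatermark();
        List<long[]> finalized = new ArrayList<>();
        Iterator<Map.Entry<Long, Long>> it = countsByWindowStart.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> e = it.next();
            if (e.getKey() + windowMs > newWatermark) break; // sorted by start: the rest are still open
            finalized.add(new long[] {e.getKey(), e.getValue()});
            it.remove(); // state no longer needed
        }
        return finalized;
    }

    private long currentWatermark() {
        return maxEventTimeMs == Long.MIN_VALUE ? Long.MIN_VALUE : maxEventTimeMs - delayMs;
    }

    int stateSize() {
        return countsByWindowStart.size();
    }
}
```

Without the eviction loop, the map would gain one entry per window forever. That is the unbounded state growth a watermark prevents.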
### State Store Configuration
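```java
// Use the RocksDB state store (Spark 3.2+; suited to large state). Set before the query starts.
spark.conf().set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
);

// Bound total RocksDB memory usage per node (newer Spark releases)
spark.conf().set("spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage", "true");
spark.conf().set("spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB", "256");
```

### Custom State Management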
`groupBy().agg()` covers built-in aggregate functions such as `sum` and `avg`. For complex state logic, such as “tracking consecutive login days per user” or “analyzing behavior patterns within a session”, use `mapGroupsWithState` or `flatMapGroupsWithState`.
These APIs keep a custom state object for each key (group). You write a function that updates that state whenever new data for the key arrives (or a timeout fires). `mapGroupsWithState` returns exactly one result per group, while `flatMapGroupsWithState` can return zero or more results.
**Advanced topic:** `mapGroupsWithState` and `flatMapGroupsWithState` are the most flexible APIs in Structured Streaming, but they require care with state size management and timeout settings. If a standard aggregation can solve the problem, consider `groupBy().agg()` first.
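To illustrate the kind of logic these APIs host, here is the “consecutive login days” example as a plain-Java update function. `LoginStreak` is a hypothetical name, and the code uses no Spark types. In a real `mapGroupsWithState` call, the previous state would come from `GroupState.exists()`/`get()`, and the result would be written back with `state.update(...)`.

```java
import java.time.LocalDate;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical per-key state and update function (not Spark code).
class LoginStreak {
    final LocalDate lastLoginDay;
    final int consecutiveDays;

    LoginStreak(LocalDate lastLoginDay, int consecutiveDays) {
        this.lastLoginDay = lastLoginDay;
        this.consecutiveDays = consecutiveDays;
    }

    // Combines the previous state (if any) with this batch's login days for one user.
    // Returns null only if there was no previous state and no new logins.
    static LoginStreak update(Optional<LoginStreak> previous, List<LocalDate> newLogins) {
        LoginStreak state = previous.orElse(null);
        LocalDate[] days = newLogins.toArray(new LocalDate[0]);
        Arrays.sort(days); // process logins in date order
        for (LocalDate day : days) {
            if (state == null) {
                state = new LoginStreak(day, 1); // first login ever
            } else if (day.equals(state.lastLoginDay.plusDays(1))) {
                state = new LoginStreak(day, state.consecutiveDays + 1); // streak continues
            } else if (day.isAfter(state.lastLoginDay)) {
                state = new LoginStreak(day, 1); // missed a day: streak resets
            } // same day or an older day: no change
        }
        return state;
    }
}
```

Logins on Jan 1 and 2 give a streak of 2. A login on Jan 3 extends it to 3, and a login on Jan 5 resets it to 1.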
### State Timeout
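```java
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.streaming.GroupStateTimeout;

// Timeout for per-group state. State and Output are your own JavaBean classes; mappingFunc is a
// MapGroupsWithStateFunction<String, Row, State, Output> that calls state.setTimeoutDuration(...)
// and checks state.hasTimedOut()
Dataset<Output> output = stream
    // The cast selects the Java MapFunction overload (a bare lambda is ambiguous)
    .groupByKey((MapFunction<Row, String>) row -> row.getString(0), Encoders.STRING())
    .mapGroupsWithState(
        mappingFunc,                               // State update function
        Encoders.bean(State.class),                // State object encoder
        Encoders.bean(Output.class),               // Output object encoder
        GroupStateTimeout.ProcessingTimeTimeout()  // Or EventTimeTimeout() (needs a watermark)
    );
```

## Joins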
### Stream-Static Join
Join a stream with a static DataFrame, for example to enrich events with dimension data:
```java
Dataset<Row> staticDf = spark.read().parquet("dimension-data");

Dataset<Row> enriched = stream.join(
    staticDf,
    stream.col("product_id").equalTo(staticDf.col("id")),
    "left"
);
```

### Stream-Stream Join
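Join two streams. Watermarks combined with a time-range condition let Spark discard old join state; for outer joins they are mandatory. For example, you can match an impressions stream with a clicks stream so that only “clicks within 1 hour of an impression” count as valid conversions.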
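The matching rule itself is simple. The hard part in streaming is knowing how long to keep unmatched rows. The plain-Java sketch below uses a hypothetical `ConversionMatcher` with inner-join semantics over two finite lists, and shows only the condition used in the Spark join. Spark instead buffers rows from both streams in state. It then uses the watermarks plus the 1-hour bound to decide when a buffered impression can no longer match, at which point the impression is dropped or, for `leftOuter`, emitted with nulls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model (not Spark code) of the condition
// impressionId = clickImpressionId AND impressionTime <= clickTime <= impressionTime + 1 hour.
class ConversionMatcher {
    static final long ONE_HOUR_MS = 60L * 60L * 1000L;

    static final class Impression {
        final String impressionId;
        final long impressionTimeMs;
        Impression(String impressionId, long impressionTimeMs) {
            this.impressionId = impressionId;
            this.impressionTimeMs = impressionTimeMs;
        }
    }

    static final class Click {
        final String clickImpressionId;
        final long clickTimeMs;
        Click(String clickImpressionId, long clickTimeMs) {
            this.clickImpressionId = clickImpressionId;
            this.clickTimeMs = clickTimeMs;
        }
    }

    // One "impressionId@clickTimeMs" entry per (impression, click) pair that satisfies the condition
    static List<String> matches(List<Impression> impressions, List<Click> clicks) {
        List<String> out = new ArrayList<>();
        for (Impression imp : impressions) {
            for (Click c : clicks) {
                boolean sameImpression = imp.impressionId.equals(c.clickImpressionId);
                boolean withinOneHour = c.clickTimeMs >= imp.impressionTimeMs
                        && c.clickTimeMs <= imp.impressionTimeMs + ONE_HOUR_MS;
                if (sameImpression && withinOneHour) {
                    out.add(imp.impressionId + "@" + c.clickTimeMs);
                }
            }
        }
        return out;
    }
}
```

A click 30 minutes after its impression matches, while one 90 minutes later does not.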
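```java
// Hypothetical message schemas: impressions carry (impressionId, impressionTime),
// clicks carry (clickImpressionId, clickTime), both as JSON in the Kafka value
StructType impressionSchema = new StructType()
    .add("impressionId", "string").add("impressionTime", "timestamp");
StructType clickSchema = new StructType()
    .add("clickImpressionId", "string").add("clickTime", "timestamp");

Dataset<Row> impressions = spark.readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "impressions")
    .load()
    .select(from_json(col("value").cast("string"), impressionSchema).alias("i"))
    .select("i.*")
    .withWatermark("impressionTime", "2 hours");

Dataset<Row> clicks = spark.readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "clicks")
    .load()
    .select(from_json(col("value").cast("string"), clickSchema).alias("c"))
    .select("c.*")
    .withWatermark("clickTime", "3 hours");

// Time range join condition
Dataset<Row> joined = impressions.join(
    clicks,
    expr("impressionId = clickImpressionId AND " +
         "clickTime >= impressionTime AND " +
         "clickTime <= impressionTime + interval 1 hour"),
    "leftOuter"
);
```

## Query Monitoring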
```java
// In Spark 3.x, start() and stop() may throw java.util.concurrent.TimeoutException
StreamingQuery query = result.writeStream()
    .format("console")
    .start();

// Check query status
System.out.println("Query ID: " + query.id());        // Stable across restarts from the same checkpoint
System.out.println("Run ID: " + query.runId());       // New on every (re)start
System.out.println("Is Active: " + query.isActive());
System.out.println("Status: " + query.status());

// Progress of the most recent micro-batch (null until the first one completes)
StreamingQueryProgress progress = query.lastProgress();
if (progress != null) {
    System.out.println("Input rows: " + progress.numInputRows());
    System.out.println("Processing rate: " + progress.processedRowsPerSecond());
}

// Stop query
query.stop();
```

### Query Listener
```java
import org.apache.spark.sql.streaming.StreamingQueryListener;

// The event types are nested in StreamingQueryListener, so their simple names
// resolve inside this anonymous subclass
spark.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent event) {
        System.out.println("Query started: " + event.id());
    }

    @Override
    public void onQueryProgress(QueryProgressEvent event) {
        System.out.println("Progress: " + event.progress().numInputRows() + " rows");
    }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent event) {
        System.out.println("Query terminated: " + event.id());
    }
});
```

## Practical Example: Real-Time Sales Aggregation
```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.*;
import static org.apache.spark.sql.types.DataTypes.*;

public class RealTimeSalesAggregation {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
            .appName("Real-Time Sales")
            .master("local[*]")
            .getOrCreate();

        // Define schema
        StructType schema = new StructType()
            .add("orderId", StringType, false)
            .add("productId", StringType, false)
            .add("category", StringType, false)
            .add("amount", DoubleType, false)
            .add("timestamp", TimestampType, false);

        // Read order data from Kafka
        Dataset<Row> orders = spark.readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "orders")
            .option("startingOffsets", "latest")
            .load()
            .selectExpr("CAST(value AS STRING) AS value")
            .select(from_json(col("value"), schema).alias("order"))
            .select("order.*")
            .withWatermark("timestamp", "5 minutes");

        // Aggregate sales by category per 5-minute window
        Dataset<Row> salesByCategory = orders
            .groupBy(
                window(col("timestamp"), "5 minutes"),
                col("category")
            )
            .agg(
                count("*").alias("orderCount"),
                sum("amount").alias("totalSales"),
                avg("amount").alias("avgOrderValue")
            );

        // Console output (for debugging); update mode emits only windows that changed
        StreamingQuery consoleQuery = salesByCategory.writeStream()
            .outputMode("update")
            .format("console")
            .option("truncate", false)
            .trigger(Trigger.ProcessingTime("30 seconds"))
            .start();

        // Send results to Kafka (each window/category row serialized as JSON)
        StreamingQuery kafkaQuery = salesByCategory
            .selectExpr("to_json(struct(*)) AS value")
            .writeStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("topic", "sales-summary")
            .option("checkpointLocation", "checkpoint/sales-summary")
            .outputMode("update")
            .start();

        // Block until either query stops or fails
        spark.streams().awaitAnyTermination();
    }
}
```

## Next Steps
- MLlib - Machine learning with Spark
- Performance Tuning - Streaming performance optimization