Structured Streaming Concepts in Apache Spark
Apache Spark's Structured Streaming provides a powerful framework for processing real-time data streams. It offers multiple output modes, triggers, fault-tolerance guarantees, and aggregation mechanisms. This document covers the output modes, key processes, categories of streaming transformations, and an Amazon-scale use case.
Different Output Modes
- Append Mode
- Only rows newly appended to the result table since the last trigger are written to the output sink.
- Example: If new orders are placed, the new rows are appended to the output; rows written earlier are never rewritten.
- Update Mode
- Only the rows of the result table that were added or updated since the last trigger are written to the output sink.
- Example: If an order amount is updated for an existing customer, only the updated rows will be written to the output.
- Complete Mode
- The entire result table (new, updated, and unchanged aggregated rows) is written to the output sink after every trigger.
- Example: For windowed aggregations, the complete aggregation result is written out each time, including the results for all time windows (see the sketch below).
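The output mode is chosen on the streaming writer. A minimal sketch, assuming orders_df is the streaming DataFrame of orders used later in this document (the console sink and checkpoint path are placeholders):

from pyspark.sql.functions import window, sum

# Aggregate total sales per 15-minute window
sales_by_window_df = (orders_df
    .groupBy(window("order_date", "15 minutes"))
    .agg(sum("amount").alias("total_sales")))

# complete: the whole result table is rewritten every trigger
# update: only the windows changed since the last trigger are written
# append: would require a watermark so that closed windows can be finalized
query = (sales_by_window_df.writeStream
    .outputMode("complete")
    .format("console")
    .option("checkpointLocation", "/path/to/checkpoint")
    .start())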
Important Processes in Structured Streaming
Note on Microbatch Engine
Spark Structured Streaming runs as a microbatch engine. Rather than processing each record the instant it arrives, it groups incoming data into small batches and processes each batch with the regular Spark engine, which yields near-real-time latency.
Categories of Streaming Transformations
- Windowed Aggregations
- These are time-bound operations where data is processed in fixed intervals. Aggregations like sum, average, etc., are performed on time windows.
from pyspark.sql.functions import window, sum

window_agg_df = (orders_df
    .groupBy(window("order_date", "15 minutes"))
    .agg(sum("amount").alias("total_sales")))
Amazon Use-Case
Requirement: Track all customer order details from when they first registered on Amazon.
- For 300+ million customers, maintaining a state store is challenging due to the large number of records.
- Solution Approach:
- Prefer time-bound aggregations (weekly, monthly): keeping an unbounded running aggregate per customer from registration onward would force the state store to hold every customer's state indefinitely, which does not scale.
Approaches for Aggregation:
- Without Custom Function (State Store Created)
- A state store is created as soon as an aggregation is expressed directly on the streaming DataFrame, because Spark must carry the running results across microbatches.
- Code Example:
orders_df.groupBy(window("order_date", "1 day")).agg(sum("amount").alias("daily_sales"))
- The Spark UI shows the state store under the Structured Streaming tab.
- With Custom Function (No State Store)
- The aggregation is implemented inside a custom function that is applied to each microbatch, so the data is aggregated as a static DataFrame and no streaming state store is created (see the sketch after the code example below).
- Code Example:
def custom_agg(df):
    # Custom aggregation logic applied to one microbatch at a time
    return df.groupBy("customer_id").agg(sum("amount").alias("total_spent"))
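One way to apply such a function is per microbatch with foreachBatch, sketched below (the output table name and checkpoint path are placeholders). Because foreachBatch hands the function each microbatch as a static DataFrame, the groupBy inside custom_agg runs as a plain batch aggregation and no cross-batch state store is maintained.

def process_batch(batch_df, batch_id):
    # batch_df is a static DataFrame holding only the current microbatch
    custom_agg(batch_df).write.mode("append").saveAsTable("customer_spend_per_batch")

query = (orders_df.writeStream
    .foreachBatch(process_batch)
    .option("checkpointLocation", "/path/to/checkpoint")
    .start())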
Triggers
Since Spark Structured Streaming operates as a microbatch engine, triggers control when each microbatch is executed. Three commonly used trigger types are described below (a combined sketch follows the list):
1. Unspecified (Default Trigger)
- The next microbatch starts as soon as the previous one finishes, provided new data has arrived.
- Example Scenario: If a microbatch processes File1 and File2, the next microbatch will trigger when File3 arrives.
2. Fixed Interval
- Microbatches are triggered at regular intervals, irrespective of whether new data has arrived.
- Syntax:
.trigger(processingTime="5 minutes")
- Example Scenario:
- If the first microbatch finishes in 2 mins, it waits for 3 mins to meet the 5-min interval before triggering the next microbatch.
- If the first microbatch finishes in 8 mins, the next microbatch is triggered immediately.
3. Available Now
- The trigger processes all available data and then stops, which resembles batch processing.
- Syntax:
.trigger(availableNow=True)
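A minimal sketch of how each trigger is attached to the streaming writer, assuming orders_df is the streaming DataFrame from the later examples (console sink and checkpoint path are placeholders):

# Default trigger: omit .trigger(); the next microbatch starts as soon as the
# previous one finishes and new data is available.
# orders_df.writeStream.format("console").start()

# Fixed interval: a microbatch every 5 minutes
query = (orders_df.writeStream
    .format("console")
    .trigger(processingTime="5 minutes")
    .option("checkpointLocation", "/path/to/checkpoint")
    .start())

# Available now: process everything currently available, then stop
# orders_df.writeStream.format("console").trigger(availableNow=True).start()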
Fault Tolerance and Exactly Once Semantics
Checkpointing
- Spark ensures fault tolerance by using checkpointing, which stores the microbatch state (e.g., read positions, state information).
- Checkpoint Example:
df.writeStream.format("console").option("checkpointLocation", "/path/to/checkpoint").start()
Exactly Once Semantics
- To guarantee no data is missed and no duplicates are created:
- Idempotent sinks: when a microbatch is reprocessed after a failure, the sink overwrites the records it already wrote instead of duplicating them, as in the sketch below.
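A minimal sketch of an idempotent sink built with foreachBatch (the output path is a placeholder): the batch_id that Spark passes in identifies each microbatch, so replaying a batch after a failure overwrites the same location instead of appending duplicates.

def write_idempotent(batch_df, batch_id):
    # Keying the output directory by batch_id makes the write idempotent:
    # reprocessing the same microbatch overwrites the same directory
    batch_df.write.mode("overwrite").parquet(f"/path/to/output/batch_id={batch_id}")

query = (orders_df.writeStream
    .foreachBatch(write_idempotent)
    .option("checkpointLocation", "/path/to/checkpoint")
    .start())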
Replayable Sources
- Kafka and File Sources are replayable, meaning data can be re-read after failure.
- Socket source is not replayable.
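A minimal sketch of reading from the two replayable source types (broker address, topic name, and input path are placeholders; order_schema is defined in the final example below):

# Kafka source: consumed offsets are tracked in the checkpoint and can be re-read after a failure
kafka_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:9092")
    .option("subscribe", "orders")
    .load())

# File source: already-processed files are tracked in the checkpoint and can be re-read
file_df = spark.readStream.schema(order_schema).json("path/to/orders")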
Types of Aggregations
- Continuous Aggregations (Unbounded)
- These aggregations have no time limits.
- Example: Retail store customer reward points that need to be maintained indefinitely (see the sketch after this list).
- Time-Bound Aggregations (Windowing Aggregations)
- Data is aggregated over fixed time windows.
- Tumbling Window Aggregation Example:
windowed_df = orders_df.groupBy(window("order_date", "15 minutes")).agg(sum("amount").alias("total_sales"))
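For the continuous (unbounded) case, a minimal sketch, assuming a hypothetical streaming points_df with customer_id and points columns: state must be kept for every customer indefinitely, so the update (or complete) output mode is used.

from pyspark.sql.functions import sum

# Unbounded aggregation: no time window, so state grows with the number of customers
reward_points_df = (points_df
    .groupBy("customer_id")
    .agg(sum("points").alias("total_points")))

query = (reward_points_df.writeStream
    .outputMode("update")
    .format("console")
    .option("checkpointLocation", "/path/to/checkpoint")
    .start())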
Watermark Feature in Streaming
- Watermarking handles late-arriving records by specifying how long Spark should wait for them before their state expires.
- Syntax to set Watermark:
df.withWatermark("order_date", "30 minutes")
This tells Spark to wait up to 30 minutes for late data: records arriving more than 30 minutes behind the latest observed order_date may be ignored, and window state older than the watermark is cleaned out of the state store.
Final Example: Aggregating Sales Every 15 Minutes with Watermarking
Solution:
- Tumbling window aggregation of sales every 15 minutes, with a watermark to handle late-arriving data.
Code Example (the order schema below is an assumed example built from the columns used in this document; input and checkpoint paths are placeholders):
from pyspark.sql.functions import window, sum
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType

# Define an example schema and read the stream of order files
order_schema = StructType([
    StructField("customer_id", StringType()),
    StructField("order_date", TimestampType()),
    StructField("amount", DoubleType()),
])
orders_df = spark.readStream.schema(order_schema).json("path/to/orders")

# Apply windowing and watermarking
window_agg_df = orders_df.withWatermark("order_date", "30 minutes") \
    .groupBy(window("order_date", "15 minutes")) \
    .agg(sum("amount").alias("total_sales"))

# Write the result to the console every 5 minutes
window_agg_df.writeStream.outputMode("update").format("console") \
    .trigger(processingTime="5 minutes") \
    .option("checkpointLocation", "/path/to/checkpoint") \
    .start()
Summary
This document provides an overview of Spark Structured Streaming, detailing different output modes, triggers, fault tolerance mechanisms, aggregation types, and the handling of late-arriving records using watermarks. The provided code examples demonstrate practical implementations of these concepts in a real-world scenario.