Insights from the Paper: Structured Streaming: A Declarative API for Real-Time Applications in Apache Spark
1. Abstract
Structured Streaming is a high-level streaming API in Apache Spark, built on lessons learned from Spark's earlier high-level streaming API, Spark Streaming.
If you want to learn about Spark and its high-level APIs, read my post about Apache Spark.
It makes it possible to build streaming systems that are scalable, easy to use, and integrated with batch and interactive workloads.
It differs from other streaming solutions because it offers a purely declarative model.
Structured Streaming is designed to support end-to-end real-time applications that seamlessly integrate streaming with batch and interactive analysis.
It uses Spark SQL’s code generation engine.
Structured Streaming delivers high performance, significantly outperforming other systems like Apache Flink and Kafka Streams in various benchmarks.
It includes robust operational features such as fault tolerance, rollback capabilities, and hybrid batch/streaming execution.
This paper presents the design, architecture, and critical use cases of Structured Streaming.
2. Introduction
There has been tremendous progress in distributed stream processing systems in the past few years, but these systems remain challenging to use in practice.
There are two challenges.
First, streaming systems often require users to reason about complex physical execution concepts, such as at-least-once delivery, state storage, and triggering modes.
Second, many systems focus only on streaming computation, but in actual use cases, streaming is often part of a more extensive business application that includes batch analytics, joins with static data, and interactive queries.
Structured Streaming differs from other widely used open-source streaming APIs in two ways:
Declarative API Based on Incrementalizing Static Queries: Structured Streaming offers a purely declarative API that automatically incrementalizes a static relational query expressed using SQL or DataFrames.
Support for End-to-End Real-Time Applications: Structured Streaming is designed to integrate seamlessly with batch and interactive analysis. This integration is crucial for building comprehensive business applications that often require combining streaming data with static datasets and performing interactive queries.
3. Stream Processing Challenges
There are four significant challenges in stream processing:
3.1 Complex and Low-Level APIs
Streaming systems often present APIs that are more complex than those used in batch processing due to the unique concerns of streaming, such as handling intermediate results before all relevant data is received.
Many APIs require users to define applications at the level of physical operators, which involves complex semantics. This complexity makes streaming systems challenging to use.
For example, Google's Dataflow model requires detailed configuration for event time aggregation, windowing, and out-of-order data handling, which can be challenging for users.
3.2 Integration in End-to-End Applications
Streaming is often just one part of a more extensive application that includes batch analytics, joins with static data, and interactive queries.
Integrating streaming systems into these broader applications poses significant engineering challenges, such as maintaining transactional consistency across different systems and ensuring that streaming and batch processes work together seamlessly.
3.3 Operational Challenges
Deploying and managing streaming applications is complex due to several operational issues, including handling failures, updating code, rescaling applications, managing stragglers, and monitoring system health.
These issues require robust support for fault tolerance, dynamic scaling, and clear visibility into the system's operations.
3.4 Cost and Performance Challenges
Streaming applications typically run continuously, which raises concerns about resource efficiency and cost.
Without dynamic rescaling, resources can be wasted during low-traffic periods.
Additionally, achieving the right balance between throughput (handling large volumes of data) and latency (minimizing delay in data processing) is a critical challenge for streaming systems.
4. Structured Streaming Overview
Structured Streaming aims to tackle the challenges discussed in the previous section by combining API and execution engine design.
Input and Output
Structured Streaming supports a variety of input sources and output sinks.
Input sources must be replayable, allowing the system to re-read recent data if a failure occurs. Sources such as Apache Kafka and Amazon Kinesis meet this requirement.
Output sinks must support idempotent writes, ensuring that data can be reliably recovered even if a node fails during writing.
Structured Streaming can also work with Spark SQL tables as inputs and outputs, allowing seamless integration with batch and interactive analytics.
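A minimal sketch of this source/sink setup, assuming a Kafka cluster at a placeholder broker address, a placeholder topic, and placeholder output and checkpoint paths: a replayable Kafka source feeding an idempotent Parquet file sink (the Kafka source also requires the spark-sql-kafka connector on the classpath).
// Read from a replayable source (Kafka); broker and topic are placeholders
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .load()
// Write to an idempotent file sink; the checkpoint directory records progress
val query = events
  .selectExpr("CAST(value AS STRING) AS json")
  .writeStream
  .format("parquet")
  .option("path", "/out/events")
  .option("checkpointLocation", "/chk/events")
  .start()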
API
Structured Streaming offers a high-level, declarative API built on Spark SQL's existing batch APIs (SQL and DataFrames).
This API allows users to define streaming queries similarly to batch queries.
The API includes features like triggers to control the frequency of updates, event time processing for accurate data handling based on when events occurred, and stateful operators for managing complex stream processing tasks.
The declarative nature of the API abstracts away the complexities of physical execution.
Execution
Structured Streaming can operate in two execution modes: micro-batch and continuous processing.
The micro-batch mode divides work into small tasks, which are dynamically scheduled across nodes. This mode ensures load balancing, fault recovery, and scalability. It leverages Spark’s existing batch-processing engine to optimize throughput.
Continuous processing mode, introduced in Spark 2.3, enables lower latency by running long-lived operators for certain types of queries. However, it offers less operational flexibility than micro-batching.
Both modes use a combination of write-ahead logs and state stores to ensure fault tolerance and consistent state management across the system.
Operational Features
Structured Streaming includes several features to simplify the operation of streaming applications.
These include robust fault tolerance, support for updating user-defined functions (UDFs) without restarting the entire job, and manual rollback capabilities to recover from errors or incorrect outputs.
Additionally, the system supports hybrid batch and streaming execution, allowing the same code to be used for both scenarios.
Adaptive batching allows the system to handle spikes in data volume efficiently, and built-in monitoring tools provide visibility into system performance and health.
5. Programming Model
Structured Streaming combines elements of Google Dataflow, incremental queries, and Spark Streaming to enable stream processing.
5.1 A Short Example
Structured Streaming operates within Spark’s structured data APIs: SQL, DataFrames, and Datasets. The central abstraction is a table, represented by the DataFrame and Dataset classes.
When users create a table or DataFrame from a streaming input source and start a query on it, Spark automatically launches a streaming computation.
For example, let us start with a batch job that counts clicks by country of origin for a web application.
// Define a DataFrame to read from static data
val data = spark.read.format("json").load("/in")
// Transform it to compute a result
val counts = data.groupBy($"country").count()
// Write to a static data sink
counts.write.format("parquet").save("/counts")
To convert this batch job into a streaming job, only the input and output sections of the code need to be modified:
// Define a DataFrame to read streaming data
val data = spark.readStream.format("json").load("/in")
// Transform it to compute a result
val counts = data.groupBy($"country").count()
// Write to a streaming data sink
counts.writeStream.format("parquet")
.outputMode("complete")
.start("/counts")
5.2 Programming Model Semantics
The programming model of Structured Streaming is designed to simplify stream processing by abstracting complex concepts.
It treats input streams as continuously updating tables and executes queries incrementally to produce real-time results.
The model ensures prefix consistency, meaning that the output at any given time is consistent with a specific prefix of the input data, and new data will not invalidate previous results.
Triggers control when the system processes new data, and different output modes (complete, append, update) determine how results are written to output sinks.
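A minimal sketch of these knobs, reusing the counts query from the example in Section 5.1 and a console sink purely for illustration: the trigger controls how often new data is processed, and the output mode controls which result rows are emitted.
import org.apache.spark.sql.streaming.Trigger
val query = counts.writeStream
  .outputMode("update")                          // write only the rows changed since the last trigger
  .trigger(Trigger.ProcessingTime("10 seconds")) // process newly arrived data every 10 seconds
  .format("console")
  .start()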
5.3 Streaming-Specific Operators
Event Time Watermarks:
Event time watermarks are used to manage late-arriving data in streaming applications. They allow the system to track the "lateness" of data and determine when it is safe to finalize the results for a specific time window.
The watermark defines a threshold beyond which the system will no longer accept late data for a particular event time.
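A minimal sketch, assuming a hypothetical streaming DataFrame pageViews with an event-time column eventTime and a country column (and spark.implicits._ in scope, as in the earlier examples):
import org.apache.spark.sql.functions.window
// Accept data up to 10 minutes late; windows older than the watermark can be finalized
val windowedCounts = pageViews
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window($"eventTime", "5 minutes"), $"country")
  .count()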
Stateful Operators:
Stateful operators in Structured Streaming allow users to maintain and update state across multiple records in a stream.
These operators, such as mapGroupsWithState and flatMapGroupsWithState, enable complex processing like session management or custom aggregation.
Users can define custom logic for how the state is updated, persisted, and eventually timed out.
These operators are crucial for applications that require maintaining context or history.
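A minimal sketch of mapGroupsWithState, assuming a hypothetical streaming Dataset[Click] named clicks and spark.implicits._ in scope: it keeps a running click count per user and times out idle state after 30 minutes.
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
case class Click(user: String)   // hypothetical input record
val perUserCounts = clicks
  .groupByKey(_.user)
  .mapGroupsWithState[Long, (String, Long)](GroupStateTimeout.ProcessingTimeTimeout) {
    (user: String, events: Iterator[Click], state: GroupState[Long]) =>
      if (state.hasTimedOut) {
        val total = state.get    // emit the final count and drop the state
        state.remove()
        (user, total)
      } else {
        val total = state.getOption.getOrElse(0L) + events.size
        state.update(total)                       // persist the new running count
        state.setTimeoutDuration("30 minutes")    // expire state if the user goes idle
        (user, total)
      }
  }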
6. Query Planning
Query planning is implemented using the Catalyst extensible optimizer in Spark SQL, which allows composable rules to be written using pattern matching in Scala.
Query planning proceeds in three stages: analysis to determine whether the query is valid, incrementalization, and optimization.
6.1 Analysis
In the analysis stage, Structured Streaming validates the user’s query to ensure it can be executed incrementally.
It verifies that the chosen output mode (such as complete, append, or update) is compatible with the query. For example, the append mode can only be used with queries where the output is guaranteed to be monotonic.
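A minimal sketch, reusing the counts aggregation from Section 5.1: because that aggregation has no watermark, its groups could still change, so append mode is not monotonic and the analyzer rejects the query before it starts.
counts.writeStream
  .outputMode("append")   // rejected at analysis time: streaming aggregation without a watermark
  .format("parquet")
  .start("/counts")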
6.2 Incrementalization
Once the query passes the analysis stage, Structured Streaming transforms it into an incremental form.
Incrementalization refers to adapting a static query to run efficiently on streaming data.
The goal is to ensure that the query's result can be updated in a time proportional to the new data received.
The engine supports various query types, including selections, projections, joins, and aggregations.
During incrementalization, the system maps these logical operations to physical operators that manage computation and state.
6.3 Query Optimization
In the final stage, Structured Streaming applies optimization techniques to the incremental query plan.
This involves using Spark SQL’s existing Catalyst optimizer.
It includes rules for predicate pushdown, projection pushdown, and other optimizations that reduce the amount of data processed and improve query performance.
Structured Streaming benefits from Spark SQL’s Tungsten execution engine, which uses a compact binary format for data and generates optimized Java bytecode to execute queries.
These optimizations ensure streaming queries run as efficiently as possible, leveraging the same advanced techniques Spark SQL uses for batch processing.
7. Application Execution
The final component of Structured Streaming is its execution strategy.
In this section, we will look at how the engine tracks state and then at its two execution modes: micro-batching via fine-grained tasks and continuous processing using long-lived operators.
7.1 State Management and Recovery
Structured Streaming maintains the state of a streaming application using two types of durable storage: a write-ahead log and a state store. This approach is similar to the one used in Spark Streaming. The approach ensures fault tolerance and enables recovery in case of failures.
Write-Ahead Log: The write-ahead log records which input data has been processed and successfully written to the output sink. This log is crucial for ensuring that, even if a node fails, the system can recover by replaying the log and reprocessing only the necessary parts of the stream.
State Store: The state store holds snapshots of the operator states, especially for long-running aggregation operations. These snapshots are written asynchronously, meaning they may lag slightly behind the most recent data, but they provide a consistent basis for recovery in case of failure. The state store can be implemented on distributed storage systems like HDFS or Amazon S3.
In the event of a failure, Structured Streaming can reconstruct the application's state by reading the write-ahead log and the latest snapshots from the state store. For consistency, the system reprocesses the data from the point of failure.
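A minimal sketch, continuing the click-count example from Section 5.1 with a placeholder checkpoint path: the directory passed as checkpointLocation holds the write-ahead log and state-store snapshots, and restarting the query against the same directory lets the engine rebuild its state and resume from the point of failure.
val query = counts.writeStream
  .format("parquet")
  .outputMode("complete")
  .option("checkpointLocation", "/chk/counts")  // durable storage, e.g., HDFS or S3
  .start("/counts")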
7.2 Microbatch Execution Mode
In micro-batch mode, Structured Streaming operates similarly to the Discretized Streams (DStreams) model used in Spark Streaming. The data stream is divided into small, manageable batches. Each batch is processed as a Spark job composed of independent tasks. This mode offers several advantages:
Dynamic Load Balancing: Since work is divided into tasks, the system can dynamically distribute these tasks across nodes, balancing the load and ensuring efficient resource utilization.
Fine-Grained Fault Recovery: If a node fails, only the tasks running on that node must be rerun. This reduces recovery time and allows for parallel task execution during recovery.
Straggler Mitigation: Spark can detect slow-running tasks and launch backup copies on other nodes. The result from the fastest task is used, minimizing the impact of slow or faulty nodes.
Rescaling: The micro-batch model makes scaling the application by adding or removing nodes easy, as tasks are automatically scheduled based on the available resources.
7.3 Continuous Processing Mode
The continuous processing mode was introduced in Apache Spark 2.3 to enable lower-latency execution for streaming queries. It operates using long-lived operators, similar to traditional streaming systems.
Long-Lived Tasks: In this mode, tasks run continuously, processing data as it arrives without breaking it into batches. This reduces the overhead of scheduling tasks and launching jobs, allowing for much lower latency.
Epoch Coordination: The system still uses epochs to coordinate between nodes and ensure consistent output. The master node periodically triggers new epochs, which define logical checkpoints for the input stream, ensuring that the system can maintain a consistent state and recover from failures.
This mode is particularly useful for latency-sensitive applications where the delay between data ingestion and processing needs to be minimized.
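A minimal sketch, assuming events is a streaming DataFrame read from Kafka as in Section 4 (broker address, topic, and checkpoint path are placeholders): switching to continuous execution only requires a continuous trigger, whose argument is the epoch/checkpoint interval rather than a per-record latency.
import org.apache.spark.sql.streaming.Trigger
val query = events
  .selectExpr("CAST(value AS STRING) AS value")       // the Kafka sink expects a "value" column
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("topic", "out")
  .option("checkpointLocation", "/chk/continuous")
  .trigger(Trigger.Continuous("1 second"))            // epoch interval, not processing latency
  .start()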
8. Operational Features
In this section, we will cover several key capabilities of Structured Streaming that simplify the management, deployment, and operation of streaming applications.
8.1 Code Updates
In Structured Streaming, developers can update the application code, including UDFs, and restart the application without losing the processing state or reprocessing the entire stream.
The system is designed to resume processing from the point it paused using the updated code.
8.2 Manual Rollback
An application might produce incorrect results due to bugs, misconfigurations, or unexpected data.
Structured Streaming allows administrators to manually roll back the application to a previous state.
8.3 Hybrid Batch and Streaming Execution
Structured Streaming supports running the same code in both streaming and batch modes.
Run-Once Triggers: One practical application of this feature is using "run-once" triggers, which allow users to execute a streaming application as a batch job (a code sketch follows this list). This is particularly useful for Extract, Transform, and Load (ETL) jobs that don't need to run continuously but still benefit from a streaming engine's transactional and state management features.
Cost Savings: Organizations can achieve significant cost savings by running ETL jobs in batch mode periodically rather than keeping a streaming job active 24/7.
Adaptive Batching: Structured Streaming can dynamically adjust the size of microbatches in response to data backlog or spikes in input data. This allows the system to catch up with delayed data processing without significantly increasing latency and then return to low-latency processing once the backlog is cleared.
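A minimal sketch of such a run-once ETL job, with placeholder paths: Trigger.Once() processes whatever input has accumulated, records its progress in the checkpoint, and stops; rerunning the same code later picks up where the previous run left off.
import org.apache.spark.sql.streaming.Trigger
val query = spark.readStream.format("json").load("/in")
  .writeStream
  .format("parquet")
  .option("path", "/out")
  .option("checkpointLocation", "/chk/etl")
  .trigger(Trigger.Once())   // run a single batch over the available data, then stop
  .start()
query.awaitTermination()     // returns once the single run has finished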
8.4 Monitoring
Structured Streaming uses Spark’s existing metrics API and structured event log to report statistics such as the number of records processed and the number of bytes shuffled across the network.
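A minimal sketch, assuming query is a running StreamingQuery handle as in the earlier examples:
println(query.status)        // whether the query is waiting for data or processing a batch
println(query.lastProgress)  // metrics for the most recent batch (null before the first one), e.g., numInputRows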
8.5 Fault and Straggler Recovery
Structured Streaming’s micro-batch mode can recover from node failures, stragglers, and load imbalances using Spark’s fine-grained task execution model. The continuous processing mode recovers from node failures but does not yet protect against stragglers or load imbalance.
9. Production Use Cases
The paper describes several notable production use cases:
Information Security Platform
Monitoring Live Video Delivery
Analyzing Game Performance
Cloud Monitoring at Databricks
11. Related Work
Structured Streaming builds on many existing systems for stream processing and big data analytics, including Spark SQL’s DataFrame API, Spark Streaming, Dataflow, incremental query systems, and distributed stream processing.
12. Conclusion
Simplification of Streaming Systems: Structured Streaming is designed to make stream processing more accessible by adopting a high-level, declarative API.
Integration with Batch Processing: A significant advantage of Structured Streaming is its ability to integrate batch and streaming workloads within a single framework.
Operational Robustness: The system includes powerful operational features that address common challenges in deploying and managing streaming applications.
High Performance: Structured Streaming delivers high throughput and low latency by leveraging Spark SQL's execution engine.