Insights from paper - Twitter Heron: Stream Processing at Scale
Abstract
Before I proceed, I recommend reading my previous post on the paper Storm @ Twitter. Heron was built because Storm fell short of Twitter's needs, so knowing Storm will give the necessary context.
Storm was the primary platform for real-time analytics at Twitter for quite some time. With the growth of data and use cases, Storm was falling short. The Twitter team sought better debug-ability, better performance, and an easy-to-manage system that can run on shared cluster infrastructure. There was no better alternative available, so the team built Heron.
Introduction
The biggest challenge with Storm was debugging it in production.
We discussed Storm and its issues for real-time analytics use cases at Twitter. A topology can misbehave because of load changes, misbehaving user code, or failing hardware. It is essential to determine the root causes quickly.
Storm bundles multiple components of a topology into a single operating system process, which makes debugging challenging. The team needed a clean mapping from the logical computation units to each physical process.
Another challenge was the need for a dedicated cluster for Storm. The team needed the ability to work with a shared cluster.
Storm required manual isolation of machines for a topology to run. It made machine provisioning cumbersome work.
The team tried to find an open-source alternative to fulfill their needs. Also, there were a lot of applications written for Storm, so compatibility with Storm was essential. There were no such options available.
So finally, they decided to build a new system, and this way, Heron was born. Heron is API-compatible with Storm.
Heron provides significantly better performance and lower resource consumption than Storm.
Related Work
The Stanford Stream Data Manager, Aurora, and a few other systems have been available for quite some time. Apache Samza, MillWheel, Photon, Summingbird, S4, and Spark Streaming are some of the newer developments in this area. Stream processing capabilities are now also being integrated into traditional database products.
Motivation for Heron
We already talked about the limitations of Storm. These limitations motivated the team to develop Heron.
Storm Background
A Storm topology is a directed graph of spouts and bolts. Spouts are sources of input data, and bolts are an abstraction to represent computation on the stream.
Spouts often pull data from queues like Kafka and generate a stream of tuples fed into a network of bolts that carry out the required computation.
Spouts and bolts are run as tasks, and multiple such tasks are grouped into an executor. Multiple executors are grouped into a worker. Each worker runs as a JVM process. A single host may run multiple worker processes.
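The grouping described above can be sketched in a few lines. This is only an illustration: the component names, the task counts, and the group sizes are made up for the example, and real Storm decides the grouping from the topology configuration.

```python
def chunk(items, size):
    """Split a list into consecutive groups of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]

# Hypothetical topology: one spout with 2 tasks and one bolt with 4 tasks.
tasks = [f"sentence-spout-{i}" for i in range(2)]
tasks += [f"count-bolt-{i}" for i in range(4)]

executors = chunk(tasks, 2)    # each executor runs several tasks
workers = chunk(executors, 2)  # each worker (one JVM process) runs several executors

for w, worker in enumerate(workers):
    print(f"worker {w} (one JVM process):")
    for e, executor in enumerate(worker):
        print(f"  executor {e}: {executor}")
```

Note how a single worker process ends up hosting tasks from different components. That mixing is what makes it hard to attribute a problem to one task.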
Storm Worker Architecture: Limitations
A Storm worker has a fairly complex design.
The OS schedules the multiple worker processes on a host. Inside each worker's JVM process, every executor is mapped to two threads, and the JVM schedules these threads. On top of that, each executor implements yet another scheduling algorithm to invoke its different tasks.
These multiple levels of scheduling made it uncertain when a given task would actually run.
Logs from multiple tasks are written into a single file. It was challenging to identify any errors or exceptions associated with a task.
Storm assumed that every worker was homogeneous, but the actual load was not. This led to inefficient resource utilization.
Issues with the Storm Nimbus
In Storm, the master component, Nimbus, performs several functions, including scheduling, monitoring, and distributing JARs. So the Nimbus component is functionally overloaded and often becomes an operational bottleneck.
Lack of Backpressure
Storm had no backpressure mechanism. The sender simply drops tuples if the receiver component cannot handle the incoming data/tuples. This strategy has several disadvantages.
If acknowledgments are disabled, there is no visibility into the drops. The work done by upstream components is lost. Also, the system behavior becomes less predictable.
Efficiency
The team experienced several instances of unpredictable performance during topology execution in production. The most important causes were:
Suboptimal replays
Long Garbage Collection cycles
Queue contention
Heron
Data Model and API
The data model and API for Heron are identical to those of Storm. The team wanted applications already written for Storm to run on Heron.
Heron has topologies like Storm. A topology is a directed acyclic graph (DAG) of spouts and bolts.
A Heron topology is a logical plan converted into a physical one before execution.
A programmer specifies the number of tasks for each spout and each bolt and how the data is partitioned.
Heron supports at-most-once and at-least-once semantics like Storm.
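To make the step from logical plan to physical tasks more concrete, here is a minimal sketch. The component names and task counts are hypothetical, and the hash-based routing only imitates the idea behind a fields-style grouping. It is not Heron's actual implementation.

```python
import zlib

# Hypothetical logical plan: component name -> number of tasks (parallelism).
logical_plan = {"sentence-spout": 2, "count-bolt": 3}

def expand(plan):
    """Expand each logical component into its individual task ids."""
    return {name: [f"{name}-{i}" for i in range(n)] for name, n in plan.items()}

def route_by_key(key, targets):
    """Pick a target task by hashing the key, so equal keys share a task."""
    return targets[zlib.crc32(key.encode("utf-8")) % len(targets)]

physical_tasks = expand(logical_plan)
for word in ["heron", "storm", "heron"]:
    print(word, "->", route_by_key(word, physical_tasks["count-bolt"]))
```

The sketch uses zlib.crc32 rather than Python's built-in hash() because the built-in hash of a string changes between interpreter runs, which would make the routing unstable.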
Architecture overview
Heron architecture is shown in the above diagram.
Heron relies on an external scheduler, Aurora. Users deploy topologies to the Aurora scheduler using a command-line tool. Aurora is a generic service scheduler that runs as a framework on Mesos.
Heron also supports a scheduler abstraction so that other schedulers such as YARN, Mesos, and ECS can be used.
This is a critical difference from Storm, where Nimbus itself did the scheduling. The Heron team chose to use existing schedulers instead of building yet another one.
Each topology is run as an Aurora job. Each topology consists of several containers.
The first container runs a process called the Topology Master.
The remaining containers each run a Stream Manager, a Metrics Manager, and a number of processes called Heron Instances.
Heron Instances are spouts/bolts that run the logic the user provides.
A physical node can run one or more containers.
Aurora allocates and schedules containers onto nodes based on the availability of resources on each node in the cluster.
A standby Topology Master can be run for availability.
There is one JVM per Heron Instance, and Heron processes communicate with each other using protocol buffers.
Topology Master
The Topology Master (TM) is responsible for managing the topology.
TM makes itself discoverable at startup by creating an ephemeral node in Zookeeper.
This ephemeral node serves two purposes:
It prevents multiple TMs from becoming the master
It allows any other process to discover the TM
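The two purposes of the ephemeral node can be sketched with a small in-memory stand-in. This is not the ZooKeeper client API. The FakeCoordinator class, the node path, and the host names are all invented for the illustration.

```python
class FakeCoordinator:
    """In-memory stand-in for a coordination service with ephemeral nodes."""

    def __init__(self):
        self._nodes = {}  # path -> (owning session, data)

    def create_ephemeral(self, path, session, data):
        """Create the node if it is absent; return True only for the winner."""
        if path in self._nodes:
            return False
        self._nodes[path] = (session, data)
        return True

    def read(self, path):
        """Return the node's data, or None if the node does not exist."""
        entry = self._nodes.get(path)
        return entry[1] if entry else None

    def close_session(self, session):
        """Ephemeral nodes vanish when their owning session ends."""
        self._nodes = {p: v for p, v in self._nodes.items() if v[0] != session}


zk = FakeCoordinator()
path = "/topologies/word-count/tmaster"  # hypothetical path

assert zk.create_ephemeral(path, "tm-1", "host-a:9000")      # first TM wins
assert not zk.create_ephemeral(path, "tm-2", "host-b:9000")  # second TM is rejected
print(zk.read(path))  # prints host-a:9000, any process can discover the TM

zk.close_session("tm-1")  # the active TM dies, so its node disappears
assert zk.create_ephemeral(path, "tm-2", "host-b:9000")      # standby takes over
print(zk.read(path))  # prints host-b:9000
```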
Stream Manager
The Stream Manager (SM) is responsible for efficiently routing tuples.
Each Heron Instance (HI) connects to its SM to send and receive tuples.
All the SMs in a topology connect to each other.
The topology's physical plan has k SMs and n Heron Instances, where n is larger than k.
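A quick back-of-the-envelope calculation shows why routing through SMs helps. The sketch assumes that the SMs form a full mesh and that each HI holds exactly one connection to its local SM. The values of k and n are arbitrary example numbers.

```python
def full_mesh(m):
    """Number of pairwise connections among m fully connected endpoints."""
    return m * (m - 1) // 2

def connections_via_sms(k, n):
    """SM-to-SM mesh plus one connection from each HI to its local SM."""
    return full_mesh(k) + n

k, n = 10, 200  # hypothetical topology: 10 SMs, 200 Heron Instances

print(full_mesh(n))               # 19900 if every HI talked to every other HI
print(connections_via_sms(k, n))  # 245 when tuples are routed through the SMs
```

The direct mesh grows quadratically with n, while the SM-based layout grows quadratically only with the much smaller k.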
Topology Backpressure
Heron employs a backpressure mechanism to dynamically adjust the rate at which data flows through the topology. There are a few possible implementation strategies:
TCP Backpressure: This is based on the TCP windowing mechanism. If an HI executes slowly, its receive buffer will start filling up. The SM pushing data to this HI will recognize this situation, as its send buffer will also fill up. The backpressure will only be cleared when the original (slow) HI starts catching up again.
Spout Backpressure: In this mechanism, the SMs clamp their local spouts to reduce the new data injected into the topology. This is used together with TCP backpressure between the SMs and the HIs. When an SM realizes that one or more of its HIs are slowing down, it identifies its local spouts and stops reading data from them.
Stage-by-Stage Backpressure: A topology can be viewed as consisting of multiple stages. In this approach, the backpressure is propagated gradually, stage by stage, until it reaches the spouts. This strategy is also used together with the TCP backpressure mechanism between the SMs and the HIs.
Implementation
Heron has implemented the spout backpressure strategy. It is easy to implement.
This strategy works well in practice. It also helps with debugging, because one can see when skew-related events happen and which component was the root cause of the backpressure trigger.
Every socket channel is associated with an application-level buffer bounded in size by both a high and a low watermark. Backpressure is triggered when the buffer size reaches the high watermark and remains in effect until the buffer size goes below the low watermark.
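The high/low watermark rule is a form of hysteresis, and a small sketch makes the behavior easy to follow. The class name, the method names, and the byte counts below are assumptions chosen for the example, not Heron's code.

```python
class WatermarkBuffer:
    """Tracks a buffer size and toggles backpressure with hysteresis."""

    def __init__(self, low, high):
        if not 0 <= low < high:
            raise ValueError("need 0 <= low < high")
        self.low = low
        self.high = high
        self.size = 0
        self.backpressure = False

    def add(self, nbytes):
        """Data arrives; trigger backpressure at the high watermark."""
        self.size += nbytes
        if self.size >= self.high:
            self.backpressure = True

    def drain(self, nbytes):
        """Data is consumed; clear backpressure only below the low watermark."""
        self.size = max(0, self.size - nbytes)
        if self.size < self.low:
            self.backpressure = False


buf = WatermarkBuffer(low=50, high=100)
states = []
buf.add(60);   states.append(buf.backpressure)  # 60: below high, no backpressure
buf.add(50);   states.append(buf.backpressure)  # 110: reached high, triggered
buf.drain(40); states.append(buf.backpressure)  # 70: still above low, stays on
buf.drain(30); states.append(buf.backpressure)  # 40: below low, cleared
print(states)  # [False, True, True, False]
```

Using two thresholds instead of one keeps the topology from rapidly flipping in and out of backpressure when the buffer hovers around a single limit.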
Heron Instance
Heron instances (HIs) do the actual work of spout and bolt.
As mentioned earlier, each HI is a separate JVM process, which helps in debugging.
There are two designs to implement an HI.
Single-threaded approach:
In this approach, a central thread maintains a TCP communication channel to the local SM and waits for tuples.
The user logic is invoked on each input tuple in the same thread. Its output is buffered and delivered to the local SM when the buffer exceeds a certain threshold.
The drawback of this approach is that the user code can block the thread for several reasons:
Invoking the sleep system call for a finite duration of time
Using read/write system calls for file or socket I/O
Calling thread synchronization primitives
Two-threaded approach:
In this approach, each HI has two threads: a Gateway thread and a Task Execution thread.
The Gateway thread is responsible for three things:
Control all the communication and data movement in and out from the HI.
Receive incoming tuples from the local SM.
Send the tuples to the Task Execution thread for processing.
The task execution thread is responsible for running the user code.
The Gateway thread and the Task Execution thread communicate between themselves using three unidirectional queues.
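The Gateway thread uses the data-in queue to push tuples to the Task Execution thread for processing.
The Task Execution thread uses the data-out queue to send output tuples to the Gateway thread, which forwards them to the rest of the topology.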
The Task Execution thread uses the metrics-out queue to pass the collected metrics to the Gateway thread.
The sizes of the data-out and data-in queues are checked periodically and increased or decreased accordingly to avoid GC issues.
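The two-threaded design can be sketched with Python's threading and queue modules. This is a simplified illustration, not Heron's code: the main thread plays the Gateway role, the queue sizes are fixed instead of being tuned periodically, and the STOP sentinel and the split_words user logic are invented for the example.

```python
import queue
import threading

data_in = queue.Queue(maxsize=100)   # Gateway -> Task Execution
data_out = queue.Queue(maxsize=100)  # Task Execution -> Gateway
metrics_out = queue.Queue()          # Task Execution -> Gateway (metrics)

STOP = object()  # hypothetical sentinel used to shut the sketch down

def task_execution(user_logic):
    """Run user code on each incoming tuple and emit its outputs."""
    processed = 0
    while True:
        tup = data_in.get()
        if tup is STOP:
            break
        for out in user_logic(tup):
            data_out.put(out)
        processed += 1
    metrics_out.put({"tuples_processed": processed})
    data_out.put(STOP)

def split_words(sentence):
    """Example user logic: a bolt that splits sentences into words."""
    return sentence.split()

worker = threading.Thread(target=task_execution, args=(split_words,))
worker.start()

# The main thread acts as the Gateway: feed tuples in, collect tuples out.
for sentence in ["heron is fast", "storm came first"]:
    data_in.put(sentence)
data_in.put(STOP)

emitted = []
while True:
    item = data_out.get()
    if item is STOP:
        break
    emitted.append(item)

worker.join()
metrics = metrics_out.get()
print(emitted)
print(metrics)
```

Because the user code runs only in the Task Execution thread, a blocking call there does not stop the Gateway from handling communication.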
Metrics Manager
The Metrics Manager (MM) collects and exports metrics from all the components in the system. These metrics include system metrics and user metrics for the topologies.
There is one metrics manager for each container.
Startup Sequence and Failure Scenarios
Startup sequence
Aurora allocates the necessary resources and schedules the topology containers on several machines in the cluster.
The Topology Master (TM) comes up on the first container and makes itself discoverable using the Zookeeper ephemeral node.
Stream Manager (SM) on each container consults Zookeeper to discover the TM. It connects to the TM and periodically sends heartbeats.
When all the SMs are connected, the TM runs an assignment algorithm to assign the different components of the topology (spouts and bolts) to different containers. The result is called the physical plan.
On completion, the SMs get the entire physical plan from the TM.
SMs connect to each other to form a fully-connected network.
During network formation, Heron instances (HI) come up, discover their local SM, download their portion of the physical plan, and start executing.
After completing these steps, data/tuples start flowing through the topology.
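As an illustration of what a physical plan could look like, here is a minimal sketch that spreads instances over containers in round-robin order. The round-robin policy, the instance names, and the container numbering are assumptions made for the example. The post does not describe which assignment algorithm the TM actually uses.

```python
def assign_round_robin(instances, num_containers):
    """Spread instances over containers 1..num_containers in turn.

    Container 0 is left out because, in this sketch, it hosts the
    Topology Master.
    """
    plan = {c: [] for c in range(1, num_containers + 1)}
    for i, instance in enumerate(instances):
        plan[1 + i % num_containers].append(instance)
    return plan

# Hypothetical topology with two spout instances and three bolt instances.
instances = ["spout-0", "spout-1", "bolt-0", "bolt-1", "bolt-2"]
plan = assign_round_robin(instances, num_containers=2)

for container, assigned in plan.items():
    print(f"container {container}: {assigned}")
```

Each SM would then only need the entries of this mapping to know which instances are local and which must be reached through another SM.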
Failure Scenarios
When the TM process dies, the container restarts the failed process, and the TM recovers its state from Zookeeper.
When an SM dies, it is restarted in the same container. It rediscovers the TM and initiates a connection to fetch the physical plan and check whether there are any changes in its state.
When any container is rescheduled or relocated to a new machine, the newly minted SM discovers the TM and follows the same sequence of steps as for an SM failure and an HI failure.
Architecture Features: Summary
The salient aspects of the design are summarized below:
The provisioning of resources is cleanly abstracted from the duties of the cluster manager.
Each Heron Instance executes only a single task.
It is transparent which component of the topology is failing or slowing down.
Component-level resource allocation allows a topology writer to specify exactly the resources for each component.
Having a Topology Master per topology allows each topology to be managed independently.
The backpressure mechanism allows them to achieve a consistent rate of delivering results.
There is no single point of failure
I am omitting the “Heron in Production” section as it states only facts and figures.
Conclusions and Future Work
Storm could not meet the need for real-time stream analytics at Twitter, so Heron was created from scratch. It takes care of all the issues with Storm and provides the same interface as Storm so existing applications can work without any change.
The design of Heron allows for supporting exactly-once semantics, but the first version of Heron does not implement it.
References
The original Twitter Heron paper: https://dl.acm.org/doi/10.1145/2723372.2742788
My article on Storm@Twitter paper: https://www.hemantkgupta.com/p/insights-from-paper-storm-twitter
Apache Samza: http://samza.incubator.apache.org
MillWheel: Fault-Tolerant Stream Processing at Internet Scale: Tyler Akidau, Alex Balikov, Kaya Bekiroglu, Slava Chernyak, Josh Haberman, Reuven Lax, Sam McVeety, Daniel Mills, Paul Nordstrom, Sam Whittle: PVLDB 6(11): 1033–1044 (2013)
Photon: Fault-tolerant and Scalable Joining of Continuous Data Streams: Rajagopal Ananthanarayanan, Venkatesh Basker, Sumit Das, Ashish Gupta, Haifeng Jiang, Tianhao Qiu, Alexey Reznichenko, Deomid Ryabkov, Manpreet Singh, Shivakumar Venkataraman: SIGMOD 2013: 577–588
STREAM: The Stanford Stream Data Manager: Arvind Arasu, Brian Babcock, Shivnath Babu, Mayur Datar, Keith Ito, Rajeev Motwani, Itaru Nishizawa, Utkarsh Srivastava, Dilys Thomas, Rohit Varma, Jennifer Widom: IEEE Data Eng. Bull. 26(1): 19–26 (2003)
Summingbird: A Framework for Integrating Batch and Online MapReduce Computations: P. Oscar Boykin, Sam Ritchie, Ian O’Connell, Jimmy Lin: PVLDB 7(13): 1441–1451 (2014)
DataTorrent: https://www.datatorrent.com
Kafka: A Distributed Messaging System for Log Processing: Jay Kreps, Neha Narkhede, and Jun Rao. SIGMOD Workshop on Networking Meets Databases, 2011.
S4 Distributed Stream Computing Platform: http://incubator.apache.org/s4/
Spark Streaming: https://spark.apache.org/streaming/