Insights from paper: Procella: Unifying serving and analytical data at YouTube
1. Abstract
YouTube has to deal with exploding data volumes and an increasing demand for data-driven applications.
It addressed this by building a new SQL query engine called Procella.
Procella implements capabilities for reporting and dashboarding, embedded statistics in pages, time-series monitoring, and ad hoc analysis.
2. Introduction
YouTube generates trillions of new data items per day, driven by billions of videos, hundreds of millions of creators and fans, billions of views, and billions of hours of watch time.
This data is used for different use cases. Let’s understand them one by one.
Reporting and dashboarding:
Video creators, content owners, and various internal stakeholders need access to detailed real-time dashboards to understand how their videos and channels perform.
The system should support executing tens of thousands of queries per second with low latency (tens of milliseconds). The queries may use filters, aggregations, set operations, and joins.
The challenge is ever-growing data - hundreds of billions of new rows are added daily.
Embedded statistics:
YouTube exposes many real-time statistics to users, such as likes or views of a video.
These are very high cardinality queries. These values are constantly changing, so the system must support millions of real-time updates concurrently with millions of low-latency queries per second.
Monitoring:
The query volume for monitoring is lower, but the system needs additional data management functions (such as automatic downsampling and expiry of old data) as well as additional query features.
Ad-hoc analysis:
Data scientists, business analysts, product managers, and engineers need to perform complex ad hoc queries.
There is a low volume of these queries (at most tens per second), and they have moderate latency (seconds to minutes).
The query patterns are very unpredictable.
Most organizations (including Google and YouTube) have traditionally relied on a mix of specialized storage and query backends to meet all these requirements, such as Google Dremel, Google Mesa, Google Bigtable, Google Monarch, and Vitess.
There are challenges with these systems:
Data needed to be loaded into multiple systems using different Extract, Transform, and Load (ETL) processes.
Each system exposes different languages and APIs, which makes development and maintenance cumbersome.
Many underlying components had performance, scalability, and efficiency issues with the YouTube data scale.
To solve these problems, YouTube built Procella, a new distributed query engine that implements the following features:
Rich API: It supports an almost complete standard SQL, including complex multistage joins, analytic functions, and set operations, with several useful extensions.
High Scalability: It separates compute and storage so that both can be scaled independently.
High Performance: It uses state-of-the-art query execution techniques to enable the execution of high volume (millions of QPS) queries with very low latency (milliseconds).
Data Freshness: It supports high-volume, low-latency data ingestion in batch and streaming modes and natively supports lambda architecture.
3. Architecture
3.1 Google Infrastructure
Procella is designed to run on Google’s infrastructure.
Disaggregated storage
All durable data has to be stored in Colossus, the successor to the Google File System. Colossus is almost infinitely scalable, but it differs from local storage in the following ways:
Data is immutable
Common metadata operations, such as listing and opening files, can have significantly higher latency.
All durable data is remote.
Shared Compute
All servers must run on Borg, Google's internal cluster management system (the predecessor of Kubernetes). This has the following implications:
Vertical scaling is challenging. Each server runs many tasks, each potentially with different resource needs.
The Borg master can bring machines down at any time for maintenance, upgrades, etc.
A typical Borg cluster will have thousands of inexpensive machines with varying hardware configurations. The performance of a task can be unpredictable.
The diagram below shows the overall Procella architecture.
3.2 Procella’s Data
3.2.1 Data Storage
Procella data is logically organized into tables. Each table's data is stored across multiple files (also called tablets or partitions). Procella uses its own columnar format, Artus.
Most of the data is in Artus, but other formats like Capacitor are also supported.
Since all data is stored in Colossus, there is proper decoupling between compute and storage.
3.2.2 Metadata Storage
Procella does not use conventional B-tree-style secondary indexes. Instead, it uses lightweight secondary structures such as zone maps, bitmaps, bloom filters, and partition and sort keys.
The metadata server serves this information during query planning time.
The metadata store holds schemas, table-to-file mappings, statistics, zone maps, and other metadata, and is backed by Google Bigtable or Google Spanner.
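To make the idea of lightweight secondary structures concrete, here is a minimal Python sketch of Bloom-filter-based tablet pruning. It is an illustration only, not Procella's implementation; the tablet layout, key names, and filter parameters are all made up.

```python
import hashlib

class BloomFilter:
    """A tiny Bloom filter: answers 'definitely not present' or 'maybe present'."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # use a single int as a bit set

    def _positions(self, key: str):
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key: str):
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def may_contain(self, key: str) -> bool:
        return all(self.bits & (1 << pos) for pos in self._positions(key))

# Hypothetical per-tablet metadata: a file name plus a Bloom filter over a key column.
tablets = []
for name, keys in [("tablet-0", ["video_1", "video_2"]), ("tablet-1", ["video_3"])]:
    bf = BloomFilter()
    for k in keys:
        bf.add(k)
    tablets.append({"file": name, "bloom": bf})

# At planning time, only tablets whose filter *may* contain the key are scanned.
lookup_key = "video_3"
candidates = [t["file"] for t in tablets if t["bloom"].may_contain(lookup_key)]
print(candidates)  # ['tablet-1'] (false positives are possible, false negatives are not)
```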
3.2.3 Table management
Procella uses standard DDL commands for table management. The commands are sent to the registration server (RgS), which stores this information in the metadata store mentioned in the previous section.
For real-time tables, the user can specify how to age out, down-sample, or compact the data. This is primarily needed for monitoring applications and is also required to support the lambda architecture.
3.2.4 Batch ingestion
Procella supports batch data ingestion: data is typically generated by offline processes such as MapReduce pipelines and then registered with the RgS via a DDL RPC.
There are no prerequisites regarding data organization for registering data with Procella. However, for best performance, data should be laid out optimally using partitioning, sorting, etc.
The RgS does sanity checks during the table and data registration step.
3.2.5 Realtime ingestion
Procella also supports ingesting data through real-time streaming.
The ingestion servers (IgS) are used for this.
Users can stream the data into IgS using a supported streaming mechanism such as RPC or PubSub.
The IgS receives the data, optionally transforms it, and appends it to a write-ahead log on Colossus. This log provides durable ingestion and is compacted in the background by compaction tasks.
In parallel, the IgS also sends the data to the data servers (DS) according to the table's data partitioning scheme.
The IgS (optionally) sends the data to multiple data servers for redundancy.
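A rough sketch of this ingestion path is shown below. It is illustrative only: the WriteAheadLog and DataServer classes, the partitioning function, and the replication factor are stand-ins for internal interfaces the paper does not spell out.

```python
import hashlib

class WriteAheadLog:
    """Stand-in for the write-ahead log on Colossus; here just an in-memory list."""
    def __init__(self):
        self.entries = []
    def append(self, row):
        self.entries.append(row)

class DataServer:
    """Stand-in for a DS that buffers fresh rows in memory until compaction catches up."""
    def __init__(self, name):
        self.name = name
        self.buffer = []
    def append_to_memory_buffer(self, row):
        self.buffer.append(row)

def partition_for(row, num_partitions):
    """Route a row to a partition; the partitioning key is assumed to be 'key'."""
    digest = hashlib.md5(str(row["key"]).encode()).hexdigest()
    return int(digest, 16) % num_partitions

def ingest(row, wal, data_servers, replication=2):
    """Sketch of the IgS path: append to the WAL, then fan out to data servers."""
    wal.append(row)  # durable path: compacted in the background (see next section)
    p = partition_for(row, len(data_servers))
    # best-effort path: send to the owning DS plus (replication - 1) extra copies
    for offset in range(replication):
        ds = data_servers[(p + offset) % len(data_servers)]
        ds.append_to_memory_buffer(row)

wal = WriteAheadLog()
servers = [DataServer(f"ds-{i}") for i in range(4)]
ingest({"key": "video_42", "views": 1}, wal, servers)
```

The key point of the design is that the same row takes two paths: the WAL makes it durable, while the in-memory copies on the data servers make it queryable almost immediately.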
3.2.6 Compaction
The compaction server periodically compacts and repartitions the logs written by the IgS.
The compaction server can apply user-defined SQL-based logic to reduce data size by filtering, aggregating, aging out old data, keeping only the latest value, etc.
After each cycle, the compaction server updates the metadata store (through the RgS), removing metadata about the old files and inserting metadata about the newly generated files.
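The sketch below illustrates one compaction cycle under the assumption that the user-defined logic is "age out rows older than a retention window, then keep only the latest value per key". The row format and function names are hypothetical; real Procella compactions apply user-defined SQL.

```python
import time

def compact(wal_rows, max_age_seconds, now=None):
    """Sketch of one compaction cycle over (key, timestamp, value) WAL rows."""
    now = now or time.time()
    latest = {}
    for key, ts, value in wal_rows:
        if now - ts > max_age_seconds:          # age-out: drop rows past the retention window
            continue
        if key not in latest or ts > latest[key][0]:
            latest[key] = (ts, value)           # keep-latest: newer timestamp wins
    # The result would be written as a new file and registered via the RgS,
    # replacing the metadata of the compacted WAL segments.
    return [(k, ts, v) for k, (ts, v) in latest.items()]

now = time.time()
wal = [("video_1", now - 10, 100), ("video_1", now - 5, 120), ("video_2", now - 90000, 7)]
print(compact(wal, max_age_seconds=86400, now=now))
# [('video_1', <ts>, 120)] -- video_2 was aged out; video_1 keeps only its latest value
```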
3.3 Query Lifecycle
Clients send SQL queries to the root servers (RS).
The RS performs query rewrites, parsing, planning, and optimizations to generate the execution plan. RS uses metadata such as schema, partitioning, and index information from the Metadata Server (MDS) to prune the files to be read.
RS orchestrates the query execution as it goes through the different stages.
For complex distributed queries, the RS builds a tree with query blocks as nodes and data streams as edges (Aggregate, Execute Remotely, Stagger Execution, etc.) and orchestrates its execution accordingly.
This graph structure allows several optimizations by inserting custom operations into the tree based on the query structure.
The Data Server (DS) receives plan fragments from the RS or another DS and does most of the heavy lifting, such as reading the required data, executing the plan fragment, and sending the results back to the requesting RS or DS.
Data servers exchange data with each other using Stubby, Google's RPC framework, and use the RDMA-based BigQuery shuffle library for shuffles.
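The following sketch illustrates the general idea of a plan tree whose leaves are dispatched to data servers and whose inner nodes combine the resulting streams. It is a simplified illustration; PlanNode, FakeDataServer, and the operator set are invented for this example and are not Procella's actual classes.

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class PlanNode:
    """A query block in the plan tree; children are connected by data streams."""
    op: str                      # e.g. "Scan", "Aggregate", "ExecuteRemotely"
    args: dict = field(default_factory=dict)
    children: List["PlanNode"] = field(default_factory=list)

class FakeDataServer:
    def scan(self, tablet):
        return [1, 2, 3]  # stand-in for reading a tablet and evaluating the fragment

def execute(node, data_servers):
    """Sketch of RS orchestration: leaves run on data servers, inner nodes combine streams."""
    child_results = [execute(c, data_servers) for c in node.children]
    if node.op == "Scan":
        # A real RS would first consult the MDS to prune files, then send a plan fragment.
        ds = data_servers[hash(node.args["tablet"]) % len(data_servers)]
        return ds.scan(node.args["tablet"])
    if node.op == "Aggregate":
        return sum(sum(r) for r in child_results)
    raise ValueError(f"unknown operator {node.op}")

plan = PlanNode("Aggregate", children=[
    PlanNode("Scan", {"tablet": "views-2019-08-01/0"}),
    PlanNode("Scan", {"tablet": "views-2019-08-01/1"}),
])
print(execute(plan, [FakeDataServer(), FakeDataServer()]))  # 12
```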
4. Optimizations
Procella employs several techniques to achieve high query performance. Let’s discuss those.
4.1 Caching
Procella achieves high scalability and efficiency by segregating storage (in Colossus) from compute (on Borg) and employing multiple caches to enhance performance.
Colossus metadata caching: The data servers cache the file handles to avoid file open calls.
Header caching: The header of columnar files contains metadata such as start offset, column size, and minimum and maximum values. Procella data servers cache the headers in a separate LRU cache.
Data caching: Artus, the Procella data format, is designed so that data has the same representation in memory as on disk, making cache population fairly cheap.
Metadata caching: Procella scales metadata storage using a distributed storage system (Bigtable or Spanner) to store metadata and a distributed metadata service. The metadata servers cache the metadata information in a local LRU cache.
Affinity scheduling: Procella implements affinity scheduling to the data and metadata servers to ensure that operations on the same data/metadata go to the same server with high probability.
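The sketch below illustrates affinity scheduling with rendezvous hashing plus a local LRU cache. Procella's actual scheduling and cache implementations are internal; the server list, hashing scheme, and cached_header function here are assumptions for illustration.

```python
import hashlib
from functools import lru_cache

SERVERS = ["ds-0", "ds-1", "ds-2", "ds-3"]

def affinity_server(file_name: str, servers=SERVERS) -> str:
    """Rendezvous (highest-random-weight) hashing: the same file consistently maps to
    the same server, and only a fraction of files move when the server list changes."""
    def weight(server):
        return hashlib.sha256(f"{server}:{file_name}".encode()).hexdigest()
    return max(servers, key=weight)

# Each data server would keep its own LRU caches (file handles, headers, data);
# a process-wide lru_cache stands in for one of them here.
@lru_cache(maxsize=1024)
def cached_header(file_name: str) -> dict:
    print(f"cache miss, reading header of {file_name} from Colossus")
    return {"columns": ["video_id", "views"]}  # placeholder header metadata

print(affinity_server("views-2019-08-01/part-00000"))
cached_header("views-2019-08-01/part-00000")   # miss: reads from Colossus
cached_header("views-2019-08-01/part-00000")   # hit: served from the local LRU cache
```

Because the durable data always lives in Colossus, these caches are purely a performance optimization: a cache miss or a rescheduled server only costs latency, never correctness.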
4.2 Data format
Initially, Procella used Capacitor, a data format primarily designed for large scans. Later, the team built a new columnar file format called Artus, designed for high performance on lookups and scans.
Let’s go into detail about a few of Artus's capabilities.
It uses custom encodings and avoids using generic compression algorithms like LZW.
It does multi-pass adaptive encoding: Artus evaluates several encoding methods for each column (dictionary encoding with various dictionary and indexer types, run-length, delta, etc.) and picks the most effective one for the data.
It chooses encodings that allow binary search for sorted columns.
It uses a more efficient representation for nested and repeated data types than ColumnIO.
It directly exposes dictionary indices, Run Length Encoding information, and other encoding information to the evaluation engine.
It records rich metadata in the file and column header.
It supports storing inverted indexes.
The diagram above presents the results of a simple benchmark comparing Artus with Capacitor on common query patterns over a typical YouTube Analytics dataset.
Table 1 below lists the queries used for the benchmark. Table 2 shows Artus's uncompressed in-memory size and LZW-compressed on-disk size compared to Capacitor for the data set used for the benchmark.
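As a toy illustration of two of the encodings mentioned above, the sketch below dictionary- and run-length-encodes a low-cardinality string column and then answers an equality filter by binary-searching the sorted dictionary and comparing integer codes, without decoding any values. The data and function names are invented; Artus's real encodings are far more sophisticated.

```python
import bisect
from itertools import groupby

def dictionary_rle_encode(values):
    """Toy version of two Artus-style encodings: a sorted dictionary of distinct
    values plus run-length-encoded dictionary indices."""
    dictionary = sorted(set(values))
    index_of = {v: i for i, v in enumerate(dictionary)}
    codes = [index_of[v] for v in values]
    runs = [(code, len(list(group))) for code, group in groupby(codes)]  # (code, run_length)
    return dictionary, runs

def rows_equal_to(dictionary, runs, needle):
    """Find matching rows without decoding: binary-search the sorted dictionary
    for the needle's code, then walk the runs comparing integer codes only."""
    pos = bisect.bisect_left(dictionary, needle)
    if pos == len(dictionary) or dictionary[pos] != needle:
        return []                      # value absent: this column chunk is skipped entirely
    code, row, hits = pos, 0, []
    for run_code, run_len in runs:
        if run_code == code:
            hits.extend(range(row, row + run_len))
        row += run_len
    return hits

country = ["IN", "IN", "US", "US", "US", "BR"]   # a low-cardinality column
dictionary, runs = dictionary_rle_encode(country)
print(dictionary, runs)                          # ['BR', 'IN', 'US'] [(1, 2), (2, 3), (0, 1)]
print(rows_equal_to(dictionary, runs, "US"))     # [2, 3, 4]
```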
4.3 Evaluation Engine
High-performance evaluation is critical for low-latency queries.
Many modern systems use LLVM to compile the execution plan to native code at query time. Procella, however, must serve both analytical and high-QPS serving use cases, where spending time compiling each query is undesirable, so its evaluation engine, Superluminal, takes a different approach.
Let’s see the capabilities of Superluminal:
It makes extensive use of C++ template metaprogramming for compile-time code generation.
It processes data in blocks to exploit vectorized computation and cache-aware algorithms.
It operates on the underlying data encodings natively and preserves them during function application wherever possible.
It processes structured data in a fully columnar fashion.
It dynamically combines filters and pushes them down the execution plan. This allows the system to scan only the rows required for each column independently.
The diagram below shows Superluminal’s performance.
The team benchmarked the aggregation performance of Superluminal against the open-source Supersonic engine. See the results in the diagram below:
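The sketch below illustrates the filter combination and pushdown idea described above: each filter is evaluated on one column, and later filters only touch rows that survived earlier ones. It is a simplified stand-in for Superluminal's vectorized, encoding-aware execution; the column data and predicate ordering are made up.

```python
def apply_filters(columns, filters):
    """Sketch of dynamic filter combination: evaluate one filter per column, but each
    later filter only looks at rows that survived the earlier ones, so selective
    predicates keep the remaining columns from being scanned in full."""
    surviving = range(len(next(iter(columns.values()))))   # start with all row ids
    # A real engine would reorder filters by observed selectivity; here we keep query order.
    for column_name, predicate in filters:
        column = columns[column_name]
        surviving = [row for row in surviving if predicate(column[row])]
    return surviving

columns = {
    "country": ["IN", "IN", "US", "BR", "US", "US", "IN", "US"],
    "views":   [10,   900,  5,    70,   800,  30,   200,  950],
}
filters = [
    ("country", lambda v: v == "US"),      # first filter keeps rows 2, 4, 5, 7
    ("views",   lambda v: v > 100),        # only those 4 rows of `views` are ever touched
]
print(apply_filters(columns, filters))     # [4, 7]
```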
4.4 Partitioning & Indexing
Procella supports multi-level partitioning and clustering.
Fact tables are typically date-partitioned and, within each date, clustered by multiple dimensions. Dimension tables are typically partitioned and sorted by the dimension key.
The above approach helps Procella prune tablets that do not need to be scanned and perform co-partitioned joins without large shuffling.
The metadata server (MDS) is responsible for storing and retrieving this metadata efficiently. It is implemented as a distributed service with affinity scheduling.
The table above shows the effectiveness of MDS tablet/partition pruning, which is performed based on the filtering criteria in the query.
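The following sketch shows two-level pruning on a hypothetical fact table that is date-partitioned and clustered by country. The tablet metadata fields and filter shapes are assumptions; the real MDS works from the zone maps, partition, and sort key information described earlier.

```python
# Hypothetical tablet metadata the MDS might keep for a date-partitioned fact table
# that is clustered by country within each date.
TABLETS = [
    {"file": "f1", "date": "2019-08-01", "country_min": "AR", "country_max": "IN"},
    {"file": "f2", "date": "2019-08-01", "country_min": "JP", "country_max": "ZA"},
    {"file": "f3", "date": "2019-08-02", "country_min": "AR", "country_max": "IN"},
    {"file": "f4", "date": "2019-08-02", "country_min": "JP", "country_max": "ZA"},
]

def prune(tablets, date, country):
    """Two-level pruning: drop whole date partitions first, then use the per-tablet
    min/max of the clustering column to drop tablets inside the matching partition."""
    return [
        t["file"]
        for t in tablets
        if t["date"] == date                                   # partition-level pruning
        and t["country_min"] <= country <= t["country_max"]    # clustering / zone-map pruning
    ]

print(prune(TABLETS, "2019-08-02", "US"))   # ['f4'] -- only 1 of 4 tablets has to be scanned
```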
4.5 Distributed operations
4.5.1 Distributed Joins
Procella has several join strategies that can be controlled explicitly or implicitly.
Broadcast: This is the simplest join, where one side is small enough to be loaded into the memory of each DS running the query.
Co-partitioned: This strategy can be used when the fact and dimension tables can be partitioned on the same (join) key.
Shuffle: When both sides are large, and neither is partitioned on the join key, data is shuffled on the join key to a set of intermediate servers.
Pipelined: When the RHS is a complex query that is likely to produce a small result set, it is executed first. The result is inlined and sent to the LHS shards, resulting in a broadcast-like join.
Remote lookup: This strategy is used when the dimension table (build side) is large and partitioned on the join key, but the fact table (probe side) is not. The DS sends remote RPCs to the data servers holding the build-side tablets to fetch the required keys and values for the join.
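A simplified decision function capturing how these strategies might be chosen is sketched below. The row-count threshold and the field names describing each side are invented for illustration; Procella's actual planner uses richer statistics and can also be directed by explicit hints.

```python
def choose_join_strategy(build, probe, small_table_rows=1_000_000):
    """Sketch of join strategy selection. `build` is the dimension (RHS) side and
    `probe` the fact (LHS) side; the threshold and field names are illustrative."""
    join_key = build["join_key"]
    if build["rows"] <= small_table_rows:
        return "pipelined" if build.get("is_subquery") else "broadcast"
    if build.get("partition_key") == join_key and probe.get("partition_key") == join_key:
        return "co-partitioned"         # no shuffle: matching tablets are joined locally
    if build.get("partition_key") == join_key:
        return "remote lookup"          # probe-side DS fetch the needed keys via RPC
    return "shuffle"                    # both sides large and unaligned: shuffle on the key

fact = {"rows": 50_000_000_000, "join_key": "video_id", "partition_key": "date"}
dim  = {"rows": 2_000_000_000, "join_key": "video_id", "partition_key": "video_id"}
print(choose_join_strategy(dim, fact))  # 'remote lookup'
```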
4.5.2 Addressing Tail Latency
Procella employs a few techniques to achieve low tail latencies for its queries.
The RS maintains quantiles of DS response latency dynamically while executing a query. If a request takes significantly longer than the median, it sends a backup RPC to a secondary data server.
The RS limits the rates of batch requests to the DS for large queries to avoid overwhelming the same data server with too many requests.
The RS attaches a priority to each request it sends to the DS. The DS, in turn, maintains separate threads for high- and low-priority requests. This ensures faster responses for smaller queries.
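The backup-RPC idea can be sketched as follows. The 3x-median hedging threshold, the thread-pool mechanics, and the slow_call stub are all assumptions for illustration, not Procella's actual values or RPC stack.

```python
import concurrent.futures
import statistics
import time

def slow_call(server, delay):
    """Stand-in for a plan-fragment RPC to a data server."""
    time.sleep(delay)
    return f"result from {server}"

def hedged_request(primary, secondary, recent_latencies, hedge_factor=3.0):
    """Sketch of the backup-RPC idea: if the primary DS takes much longer than the
    median of recent responses, fire the same request at a secondary DS and take
    whichever answer arrives first."""
    hedge_after = hedge_factor * statistics.median(recent_latencies)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(slow_call, primary, 0.5)]      # this primary is a straggler
        done, _ = concurrent.futures.wait(futures, timeout=hedge_after)
        if not done:                                          # slower than expected: hedge
            futures.append(pool.submit(slow_call, secondary, 0.05))
        done, _ = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_COMPLETED)
        return next(iter(done)).result()

print(hedged_request("ds-7", "ds-12", recent_latencies=[0.02, 0.03, 0.05]))
# 'result from ds-12' -- the backup answered first; the late primary response is discarded
```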
Intermediate Merging
Some queries need heavy aggregation. To speed these up, Procella adds an intermediate operator before the final aggregation, which buffers the data and dynamically spins up additional threads to perform intermediate aggregations.
4.6 Query Optimization
4.6.1 Virtual Tables
A common technique for maximizing the performance of low-latency, high-QPS queries is to use materialized views.
The core idea is to generate multiple aggregates of the underlying base table and choose the right aggregate at query time. Procella uses virtual tables for this. They support:
Index-aware aggregate selection: Choose the right table(s) based not just on size but also on data organization, such as clustering, partitioning, etc.
Stitched queries: Stitch together multiple tables to extract different metrics.
Lambda architecture awareness: Stitch together multiple tables with different time range availabilities.
Join awareness: The virtual table layer understands star joins and can automatically insert joins in the generated query.
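The sketch below illustrates index-aware aggregate selection in its simplest form: pick the smallest physical aggregate whose dimensions cover the query. The aggregate names, sizes, and coverage rule are hypothetical; as noted above, Procella also accounts for partitioning, clustering, and lambda-architecture time ranges.

```python
# Hypothetical physical aggregates of one logical (virtual) table, from smallest to largest.
AGGREGATES = [
    {"name": "views_by_video_day",         "dims": {"video_id", "date"},            "rows": 10**9},
    {"name": "views_by_video_country_day", "dims": {"video_id", "country", "date"}, "rows": 10**10},
    {"name": "base_table",                 "dims": {"video_id", "country", "date",
                                                    "device", "age_group"},         "rows": 10**12},
]

def pick_aggregate(query_dims, aggregates=AGGREGATES):
    """Sketch of aggregate selection: choose the smallest physical table whose
    dimensions cover every column the query groups or filters on."""
    candidates = [a for a in aggregates if query_dims <= a["dims"]]
    return min(candidates, key=lambda a: a["rows"])["name"]

print(pick_aggregate({"video_id", "date"}))              # views_by_video_day
print(pick_aggregate({"video_id", "country", "date"}))   # views_by_video_country_day
print(pick_aggregate({"device"}))                        # base_table
```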
4.6.2 Query Optimizer
Procella has a query optimizer that uses static and adaptive query optimization techniques.
Compile time optimizations include filter push-down, subquery decorrelation, and constant folding.
Adaptive techniques are used to select/tune physical operators based on statistics (cardinality, distinct count, quantiles) collected on a sample of the data used in the query.
4.7 Data Ingestion
Creating suitably partitioned, clustered, and sorted datasets is important for optimal query processing performance. Procella provides an offline data generation tool.
The tool takes the input and output schemas and data mapping and executes an offline MapReduce-based pipeline to create data in Procella-optimized format and layout.
4.8 Serving Embedded Statistics
Procella powers various embedded statistical counters on YouTube's high-traffic pages, such as views, likes, and subscriptions.
The data volumes are relatively small: up to a few billion rows and a small number of columns per row.
However, each Procella instance needs to be able to serve over a million QPS of such queries with millisecond latency.
5. Related Work
Google has built several similar systems; to learn more, see my posts on the corresponding papers (linked in the References section).
External Technologies
Presto is an open-source query engine initially developed at Facebook, with a design similar to Google Dremel's. A fork of the project is now developed as Trino, and the AWS Athena service is also based on Presto.
Spark SQL is an open-source SQL engine that runs on the Spark framework.
Redshift is a popular distributed database from Amazon that tightly couples storage and compute.
Apache Druid and Apache Pinot share some of Procella’s characteristics, such as columnar storage, mixed streaming and batch ingestion modes, and distributed query execution.
Apache Kylin builds aggregates from data cubes and stores them in a key-value store such as HBase.
6. Conclusions
The paper presents Procella, a SQL query engine that successfully addresses YouTube's need for a single system serving diverse use cases: real-time reporting, statistical counters, time-series monitoring, and ad hoc analysis, all at very high scale and performance.
References
Presto: A Decade of SQL Analytics at Meta
Spark SQL: Relational Data Processing in Spark
The Snowflake Elastic Data Warehouse
Apache Kylin: An efficient and scalable graph data processing system