Insights from paper (part I) — Google Mesa: Geo-Replicated, Near Real-Time, Scalable Data Warehousing
Mesa is Google’s highly scalable analytic data warehousing system, designed for near real-time data ingestion and queryability.
It also provides the essential capabilities of a distributed system: high availability, reliability, fault tolerance, and scalability to extensive data and query volumes.
Mesa handles petabytes of data. It processes millions of row updates per second. It serves billions of queries that fetch trillions of rows per day. Mesa is geo-replicated across multiple data centers.
Introduction
Google serves billions of advertisements every day to users all over the globe. Detailed information for each ad is recorded and processed in real time.
The scale and business-critical nature of this data pose unique technical and operational challenges for processing, storage, and querying.
A data store with the following requirements is needed:
Atomic Updates: A single-user action may lead to multiple updates at the relational data level.
Consistency and Correctness: The system must return consistent and correct data.
Availability: The system must not have any single point of failure.
Near Real-Time Update Throughput: The system must support continuous updates with the update volume on the order of millions of rows updated per second.
Query Performance: The system must support latency-sensitive users with very low latency requirements and batch extraction users requiring very high throughput.
Scalability: The system must scale with the data size and query volume growth.
Online Data and Metadata Transformation: Data schema transformation must not interfere with the regular query and update operations.
Mesa is Google’s solution to these technical and operational challenges.
Many other systems have solved different subsets of these requirements, but Mesa is unique in solving all of these problems simultaneously.
Mesa ingests data generated by upstream services, aggregates and persists the data internally, and serves the data via user queries. Mesa is a generic data warehousing solution that satisfies all the above requirements.
Mesa leverages common Google infrastructure and services like Colossus (the successor to the Google File System), BigTable, and MapReduce.
Scalability and availability are achieved by horizontal partitioning and replication.
Update scalability is achieved by batching: updates are grouped into batches, each assigned a new version number, and periodically incorporated into Mesa.
A distributed synchronization protocol based on Paxos ensures update consistency across multiple data centers.
One important point to note is that most data warehousing solutions (including Google’s internal systems like BigTable, Megastore, Spanner, and F1) do not support continuous data integration and aggregation every few minutes while providing near real-time answers to user queries.
A few research initiatives in the data analytics area are worth mentioning here, even though they cannot meet all the above requirements.
“Parallel Analytics as a Service” provides massively parallel analytics in the cloud, but it is designed for many tenants with relatively small data footprints.
“Shark” leverages distributed shared memory to support data analytics at scale, but it is designed for in-memory processing of analysis queries.
The MaSM (materialized sort-merge) algorithm can be used with flash storage to support online updates in data warehouses.
Now let’s talk about the key contributions of this paper, framed as the questions Mesa answers:
How to create a petascale data warehouse that provides ACID semantics and still scales to high update throughput rates?
How to create a version management system that achieves acceptable latencies for batch updates while supporting low-latency, high-throughput queries?
How to create a scalable distributed architecture that is resilient to machine and network failures within a single data center?
How to create a geo-replicated architecture that can deal with data center failures?
How to perform schema changes for many tables dynamically and efficiently without affecting the correctness or performance of existing applications?
How to withstand problems caused by software errors and hardware faults?
How to handle the operational challenges of maintaining a system at this scale with strong guarantees of correctness, consistency, and performance?
Mesa Storage Subsystem
Data in Mesa is continuously generated and is inherently multi-dimensional. Let’s use the advertising platform data to understand the system better.
The microscopic facts about ads typically consist of two types of attributes: dimensional attributes (called keys) and measure attributes (called values).
Many dimensional attributes are hierarchical; for example, the date dimension can organize data at the day/month/year level or the fiscal week/quarter/year level.
The design must ensure that a single fact is consistent across all the possible ways it is materialized and aggregated.
The Data Model
The data is stored in tables. Each table has a table schema. A table schema specifies the key space K for the table, and the corresponding value space V. Both K and V are sets.
The table schema also specifies the aggregation function F: V × V → V, which aggregates the values corresponding to the same key. The aggregation function must be associative, i.e.,
F(F(v0, v1), v2) = F(v0, F(v1, v2)) for any values v0, v1, v2 ∈ V.
The schema specifies one or more indexes for a table, which are total orderings of K. The key space K and value space V are represented as tuples of columns, each of which has a fixed type (e.g., int32, int64, string, etc.).
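To make this concrete, here is a minimal Python sketch of the data model; the names (Key, Value, aggregate) are illustrative assumptions for this article, not Mesa’s actual implementation:

from typing import Tuple

# Key and value spaces are tuples of columns, e.g.,
# key = (date, publisher_id, country), value = (clicks, cost).
Key = Tuple
Value = Tuple

def aggregate(v0: Value, v1: Value) -> Value:
    # The aggregation function F: V x V -> V; here, element-wise SUM.
    return tuple(a + b for a, b in zip(v0, v1))

# Associativity check: F(F(v0, v1), v2) == F(v0, F(v1, v2)).
v0, v1, v2 = (1, 10), (2, 20), (3, 30)
assert aggregate(aggregate(v0, v1), v2) == aggregate(v0, aggregate(v1, v2))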
Let’s take an example to be clear here. The diagram below shows three Mesa tables.
All three tables contain ad click and cost metrics (value columns) broken down by various attributes, such as the click date, the advertiser, the publisher’s website that showed the ad, and the country (key columns).
Updates and Queries
First, let’s understand the updates in Mesa.
Mesa applies updates in batches to achieve high update throughput. The upstream systems outside Mesa produce batch updates.
An update to Mesa specifies a version number n (sequentially assigned from 0) and a set of rows of the form (table name, key, value). Each update contains at most one aggregated value for every (table name, key).
Now let’s talk about queries.
A query consists of a version number n and a predicate P on the key space.
The query response contains one row for each key matching P that appears in some update with a version between 0 and n. The value for a key in the response is the aggregate of all values for that key in those updates.
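A rough sketch of these query semantics in Python (illustrative only; it reuses the aggregate function from the data-model sketch above):

# Illustrative semantics of a primitive query (version n, predicate P);
# 'updates' is a list indexed by version number, each mapping keys to values.
def query(updates, n, P):
    result = {}                                  # key -> aggregated value
    for version in range(n + 1):                 # versions 0..n, in order
        for key, value in updates[version].items():
            if P(key):
                result[key] = aggregate(result[key], value) if key in result else value
    return result

# e.g., all US rows up to version 1:
# query(updates, 1, lambda key: key[-1] == "US")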
Mesa supports more complex query functionality, which can be viewed as pre-processing and post-processing to the above primitive query.
It is time to see a concrete example.
The below diagram shows two updates corresponding to the tables we saw.
Take note that each update contains consistent rows for the two tables, A and B. The updates to Table C are computed automatically as they are derived from Table B.
There is one more important point here. Table C corresponds to a materialized view of the following query over Table B:
SELECT SUM(Clicks), SUM(Cost)
FROM TableB
GROUP BY AdvertiserId, Country;
This query can be represented directly as a Mesa table because the use of SUM in the query matches the use of SUM as the aggregation function for the value columns in Table B.
Mesa restricts materialized views to use the same aggregation functions for metric columns as the parent table.
We saw both updates and queries. Now the question is how atomicity is achieved.
Mesa applies updates in order by version number, ensuring atomicity by incorporating an update entirely before moving on to the next update.
There is one more benefit of this strict ordering: the aggregation functions in the Mesa schema may be non-commutative. For instance, with a replacement function F(v0, v1) = v1, Mesa behaves like a key-value store, where each update overwrites the previous value for the key. Ordering also matters for the data itself; for example, fraudulent clicks are offset by later negative facts.
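As a tiny illustration (a sketch, not Mesa’s actual API), a replacement-style aggregation only makes sense when updates are applied in a strict order:

def replace(v0: Value, v1: Value) -> Value:
    # Non-commutative aggregation: the later value wins, which gives
    # key-value store semantics. Since replace(a, b) != replace(b, a),
    # the strict ordering of updates is what makes the result well defined.
    return v1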
Versioned Data Management
Let’s understand the challenges with versioned data.
Storing each version independently is very expensive.
Going over all the versions and aggregating them at query time is also very expensive and increases query latency.
Naively pre-aggregating all versions on every update can be prohibitively expensive.
Mesa uses the below solution for this:
Mesa pre-aggregates specific versioned data and stores it using deltas.
Each delta consists of rows (with no repeated keys) and a version, represented by [V1, V2], where V1 and V2 are update version numbers.
In this way, rows in a delta [V1, V2] correspond to the set of keys that appeared in updates with version numbers between V1 and V2 (inclusively). The value for each such key is the aggregation of its values in those updates.
Each update arrives in Mesa as a singleton delta: the singleton delta corresponding to an update with version number n has V1 = V2 = n.
A delta [V1, V2] and another delta [V2 + 1, V3] can be aggregated to produce the delta [V1, V3], simply by merging row keys and aggregating values accordingly.
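Here is a minimal sketch of such a merge, assuming each delta is stored as a list of (key, value) pairs sorted by key with no repeated keys, and reusing the aggregate function from the earlier sketch:

import heapq

# Merge delta d1 = [V1, V2] with delta d2 = [V2 + 1, V3] into delta [V1, V3].
def merge_deltas(d1, d2):
    merged = []
    for key, value in heapq.merge(d1, d2):       # linear-time merge of sorted rows
        if merged and merged[-1][0] == key:
            # Same key appears in both deltas: aggregate the two values.
            merged[-1] = (key, aggregate(merged[-1][1], value))
        else:
            merged.append((key, value))
    return merged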
Mesa allows users to query a particular version for only a limited time period. This means that versions older than this period can be aggregated into a base delta (or simply a base).
Once a base with a version [0, B] is created, any other deltas [V1, V2] with 0 <= V1 <= V2 <= B can be deleted. This process is called base compaction. Mesa performs it concurrently and asynchronously with other operations.
With base compaction, to answer a query for version number n, we could aggregate the base delta [0, B] with all singleton deltas [B + 1, B + 1], [B + 2, B + 2], ..., [n, n], and then return the requested rows.
Even if base compaction is done frequently, the number of singletons can still easily reach hundreds or thousands, especially for update-intensive tables.
To support more efficient query processing, Mesa maintains a set of cumulative deltas D of the form [U, V] with B < U < V through a cumulative compaction process.
The delta compaction policy determines the deltas maintained by Mesa at any time. Mesa currently uses a variation of the two-level delta policy in production that uses multiple levels of cumulative deltas.
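The following simplified sketch shows why cumulative deltas help; it is an assumption-laden illustration of delta selection at query time, not the paper’s exact policy:

# base delta [0, B], cumulative deltas [(U, V), ...], singletons for B+1 onward.
def deltas_for_query(B, cumulatives, n):
    plan = [(0, B)]                              # always read the base
    covered = B
    # Use the widest cumulative delta that starts right after the covered
    # prefix and ends at or before n.
    for U, V in sorted(cumulatives, key=lambda d: d[1], reverse=True):
        if U == covered + 1 and V <= n:
            plan.append((U, V))
            covered = V
            break
    # Cover the remainder with singleton deltas.
    plan.extend((v, v) for v in range(covered + 1, n + 1))
    return plan

# e.g., base [0, 90] and cumulative [91, 96] answer n = 98 with only
# four deltas: [(0, 90), (91, 96), (97, 97), (98, 98)].
print(deltas_for_query(90, [(91, 96)], 98))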
Physical Data and Index Formats
Mesa deltas are created and deleted based on the delta compaction policy. Once created, a delta is immutable, so its physical format never needs to support incremental modification.
This immutability allows Mesa deltas to use a fairly simple physical format; the primary requirement is that the format be space efficient.
Each Mesa table has one or more table indexes to enable efficient seeking using keys. Each table index has its own copy of the data sorted according to the index’s order.
The rows in a delta are stored in sorted order in data files of bounded size. The rows are organized into row blocks, and each block can be transposed and compressed individually.
Mesa also stores an index file corresponding to each data file. An index entry contains the short key for the row block, which is a fixed-size prefix of the first key in the row block, and the offset of the compressed row block in the data file.
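A hedged sketch of the lookup path, with the key encoding and names simplified for illustration: binary-search the index entries by short key, then read only the candidate row block at the returned offset.

import bisect

# index: sorted list of (short_key, offset) pairs, one entry per row block;
# short_key is a fixed-size prefix of the block's first key (strings here).
def find_block_offset(index, key, prefix_len):
    short = key[:prefix_len]
    # Find the last row block whose short key is <= the query key's prefix.
    i = bisect.bisect_right(index, (short, float("inf"))) - 1
    if i < 0:
        return None                  # key sorts before the first row block
    return index[i][1]               # offset of the candidate compressed block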
Stay tuned for the next part of the article for Architecture and more.
References:
The original Mesa paper: https://research.google.com/pubs/archive/42851.pdf
Parallel Analytics as a Service: P. Wong, Z. He, et al. In SIGMOD, pages 25–36, 2013.
Shark: SQL and Rich Analytics at Scale: R. S. Xin, J. Rosen, et al. In SIGMOD, pages 13–24, 2013.
MaSM: Efficient Online Updates in Data Warehouses: M. Athanassoulis, S. Chen, et al. In SIGMOD, pages 865–876, 2011.