Insights from Paper - Google Percolator: Large-scale Incremental Processing Using Distributed Transactions and Notifications
Introduction
Google crawls the web and indexes web pages as documents, storing them in a large repository. As new pages are crawled, this repository must incorporate the new content.
This task is one example of a class of data processing tasks that transform a large data repository via small, independent mutations.
Existing infrastructure, whether MapReduce jobs or DBMSs, is unsuitable for this task. But why? Let’s examine each in turn.
MapReduce
Before we go deep, let’s understand the task in more detail.
The indexing system for the web starts by crawling every page on the web and processing them while maintaining a set of invariants on the index.
For example, one invariant is: if the same content is crawled under multiple URLs, only the URL with the highest PageRank appears in the index.
Another invariant is: each link is inverted so that the anchor text from each outgoing link is attached to the page the link points to. This link inversion must work across duplicates: links to any duplicate of a page should be forwarded to its highest-PageRank duplicate if necessary.
We can perform these bulk processing tasks with a series of MapReduce operations: one for clustering duplicates and another for link inversion. This works because MapReduce limits the parallelism of the computation, so the invariants are maintained.
So what is the challenge with this approach?
Consider updating that index after recrawling some small portion of the web. It’s not sufficient to run the MapReduce over just the new pages, since there are links between the new pages and the rest of the web; the MapReduce must be rerun over the entire repository. This makes the work latency proportional to the total repository size, while it should be proportional to the update size.
Google’s web search index was produced this way before Percolator was built.
DBMSs
Another possible solution: the indexing system could store the repository in a DBMS and update individual documents while using transactions to maintain invariants.
Existing DBMSs can’t handle the sheer volume of data (tens of petabytes across thousands of machines). Distributed storage systems like Bigtable can scale to the size of the repository, but they don’t provide tools to help programmers maintain data invariants in the face of concurrent updates. You can read my post on the Bigtable paper to learn more.
So the ideal solution should allow us to maintain a very large repository of documents and update it efficiently as each new document is crawled.
Percolator does exactly that. It gives the user random access to a multi-petabyte repository so documents can be processed individually, and it provides ACID-compliant transactions, with snapshot isolation semantics, to make updates.
Percolator also provides observers: pieces of code invoked by the system whenever a user-specified column changes. You can think of them as built-in Change Data Capture (CDC).
Within Google, the primary application of Percolator is preparing web pages for inclusion in the live web search index.
By converting the indexing system to an incremental system, Percolator can process individual documents as they are crawled. The system has also been used to render pages into images.
Design
Percolator provides two main abstractions:
ACID transactions over a random-access repository
Observers: a way to organize an incremental computation
A Percolator system consists of three binaries on every cluster machine: a Percolator worker, a Bigtable tablet server, and a GFS chunk server.
All observers are linked into the Percolator worker, which scans Bigtable for changed columns and invokes the corresponding observers as function calls in the worker process.
The observers perform transactions by sending read/write RPCs to Bigtable tablet servers, which in turn send read/write RPCs to GFS chunk servers.
The system also depends on two small services:
A timestamp oracle
A lightweight lock service
A Percolator repository consists of a small number of tables. Each table is a collection of “cells” indexed by row and column, and each cell contains a value: an uninterpreted array of bytes. Internally, Percolator represents each cell as a series of values indexed by timestamp, which is what makes snapshot isolation possible.
The design of the Percolator was influenced by two requirements:
Run at massive scales
No need for extremely low latency
The relaxed latency requirement lets Percolator take a lazy approach to cleaning up locks left behind by transactions running on failed machines. Percolator also has no central location for transaction management (and, in particular, no global deadlock detector). This increases the latency of conflicting transactions but allows the system to scale to thousands of machines.
Bigtable overview
Percolator is built on top of the Bigtable distributed storage system. You can read my post on the Bigtable paper. Here, I will cover only the high-level overview required to understand this paper.
Bigtable presents a multi-dimensional sorted map and provides lookup and update operations on each row. Bigtable row transactions enable atomic read-modify-write operations on individual rows. Bigtable handles petabytes of data and runs reliably on large numbers of (unreliable) machines.
Bigtable consists of a master server and a collection of tablet servers. Each tablet server is responsible for serving several tablets. Each tablet is a contiguous region of the key space. A tablet is stored as a collection of read-only files in the Google SSTable format. SSTables are stored in GFS for data availability, durability, and scalability.
The master server is responsible for coordinating the operations of tablet servers.
Percolator maintains the gist of Bigtable’s interface. Data is organized into Bigtable rows and columns, with Percolator metadata stored alongside in special columns. The Percolator library largely consists of Bigtable operations wrapped in Percolator-specific computation.
The main challenge is to provide the features Bigtable lacks: multi-row transactions and the observer framework.
Transactions
Percolator provides cross-row, cross-table transactions with ACID snapshot-isolation semantics.
Percolator users write their transaction code in an imperative language and mix calls to the Percolator API with their code.
bool UpdateDocument(Document doc) {
Transaction t(&cluster);
t.Set(doc.url(), "contents", "document", doc.contents());
int hash = Hash(doc.contents());
// dups table maps hash -> canonical URL
string canonical;
if (!t.Get(hash, "canonical-url", "dups", &canonical)) {
// No canonical yet; write myself in
t.Set(hash, "canonical-url", "dups", doc.url());
} // else this document already exists, ignore new copy
return t.Commit();
}
The above code shows a simplified version of clustering documents by a hash of their contents. If the Commit() call returns false, the transaction has conflicted and should be retried after a backoff. Calls to Get() and Commit() are blocking.
In Percolator, parallelism is achieved by running many transactions simultaneously in a thread pool.
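To make the retry-after-backoff advice concrete, here is a minimal sketch of a retry wrapper around the UpdateDocument() example above. The wrapper itself (its name, the attempt limit, and the backoff constants) is illustrative, not part of Percolator's API.

#include <chrono>
#include <random>
#include <thread>

// Illustrative retry wrapper (not part of Percolator): retries the conflicting
// transaction with exponential backoff plus jitter.
bool UpdateDocumentWithRetry(Document doc, int max_attempts = 5) {
  std::mt19937 rng(std::random_device{}());
  int backoff_ms = 10;
  for (int attempt = 0; attempt < max_attempts; ++attempt) {
    if (UpdateDocument(doc)) return true;   // Commit() succeeded.
    // Commit() returned false: the transaction conflicted; back off and retry.
    std::uniform_int_distribution<int> jitter(0, backoff_ms);
    std::this_thread::sleep_for(
        std::chrono::milliseconds(backoff_ms + jitter(rng)));
    backoff_ms *= 2;
  }
  return false;                             // Give up after repeated conflicts.
}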
It is possible to process data incrementally without the benefit of strong transactions, but transactions make it more tractable for the user to reason about the system's state and avoid introducing errors into a long-lived repository.
Percolator stores multiple versions of each data item using Bigtable’s timestamp dimension. Multiple versions are required to provide snapshot isolation.
Let’s quickly understand what snapshot isolation is. In snapshot isolation:
Each transaction reads data from a snapshot of the committed data as of the time the transaction started. This time is called its Start-Timestamp.
This time may be any time before the transaction’s first Read.
A transaction running in Snapshot Isolation is never blocked attempting a read.
The transaction's own writes (updates, inserts, and deletes) are also reflected in this snapshot, so the transaction sees them if it reads the data again.
Updates by other transactions active after the transaction Start-Timestamp are invisible to the transaction.
Snapshot Isolation is a type of multi-version concurrency control (MVCC).
You can read more about isolation levels in this paper. I will write a post on it in the future.
For our context, the key points are that snapshot isolation does not provide full serializability, though it does protect against write-write conflicts, and that its main advantage is more efficient reads.
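As a minimal illustration of the visibility rule (a standalone sketch, not Percolator code), consider a single cell whose committed versions are keyed by commit timestamp; a reader with start timestamp start_ts sees only the newest version committed at or before start_ts:

#include <cstdint>
#include <iterator>
#include <map>
#include <string>

// One cell's committed versions: commit timestamp -> value.
using Versions = std::map<uint64_t, std::string>;

// Snapshot read: return the newest version committed at or before start_ts.
// Versions committed after start_ts are invisible to this reader.
bool ReadSnapshot(const Versions& versions, uint64_t start_ts,
                  std::string* value) {
  auto it = versions.upper_bound(start_ts);   // First version after the snapshot.
  if (it == versions.begin()) return false;   // Nothing visible at this snapshot.
  *value = std::prev(it)->second;
  return true;
}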
Let’s understand what challenge Percolator faced in implementing distributed transactions.
Other databases integrate locking into the system component that manages access to the disk. As each node in such systems controls access to the data on disk, it can grant locks on requests and deny accesses that violate locking requirements.
In Percolator’s case, the data lives in Bigtable, so there is no convenient place to intercept traffic and assign locks. This means Percolator would have to build a distributed lock manager with persistence, availability, and scalability. But Bigtable already provides all of that, so why not use it?
Yes, Percolator uses Bigtable for exactly that. It stores its locks in special in-memory columns in the same Bigtable that stores the data, and it reads or modifies the locks in a Bigtable row transaction whenever it accesses data in that row. Alongside each user-visible column, Percolator keeps metadata columns (lock, write, data, notify, and ack) to implement this.
Now that we have the basics, it is time to understand the transaction protocol in more detail. The line numbers cited in the following discussion refer to the transaction pseudocode in Figure 6 of the paper.
The transaction’s constructor asks the timestamp oracle for a start timestamp (line 6). Calls to Set() are buffered (line 7) until commit time. The basic approach for committing buffered writes is two-phase commit, which is coordinated by the client.
In the first phase of commit (“prewrite”), we try to lock all the cells being written.
The transaction checks two pieces of metadata for conflicts:
If the transaction sees another write record after its start timestamp, it aborts (line 32). This is a write-write conflict.
If the transaction sees another lock at any timestamp, it aborts (line 34).
If there is no conflict, we write the lock and the data to each cell at the start timestamp (lines 36-38). If no cell conflicts, the transaction may commit and proceed to the second phase.
At the beginning of the second phase, the client obtains the commit timestamp from the timestamp oracle (line 48).
Then, at each cell, starting with the primary (one cell the transaction designates as its synchronizing point; all other locks point to it):
The client releases its lock and makes its write visible to readers by replacing the lock with a write record.
The write record indicates to readers that committed data exists in this cell; it contains a pointer to the start timestamp where readers can find the actual data.
Once the primary’s write is visible (line 58 returns true), the transaction must commit since it has made a write visible to readers.
Let’s now move toward the Get() call.
A Get() operation first checks for a lock in the timestamp range [0, start timestamp] (line 12).
If a lock is present, another transaction is concurrently writing this cell, so the reading transaction must wait until the lock is released.
If no conflicting lock is found, Get() reads the latest write record in that timestamp range (line 19) and returns the data item corresponding to that write record (line 22).
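To make the protocol concrete, here is a single-process sketch that models the per-cell lock, write, and data metadata as in-memory maps. It is an illustration of the steps described above, not the paper's pseudocode: it omits the Bigtable row transactions that make each per-cell step atomic, as well as waiting, cleanup, and the timestamp oracle.

#include <cstdint>
#include <iterator>
#include <map>
#include <string>
#include <vector>

// In-memory stand-in for the Bigtable metadata of one cell (one row/column).
struct Cell {
  std::map<uint64_t, std::string> data;   // start_ts  -> buffered value
  std::map<uint64_t, uint64_t>    write;  // commit_ts -> start_ts of committed data
  std::map<uint64_t, std::string> lock;   // start_ts  -> key of the primary cell
};
using Table = std::map<std::string, Cell>;  // "row:column" key -> cell

// Phase one (prewrite) for a single cell: abort on a write record newer than
// our snapshot (write-write conflict) or on any existing lock; otherwise write
// the data and a lock pointing at the transaction's primary cell.
bool Prewrite(Table& table, const std::string& key, const std::string& value,
              uint64_t start_ts, const std::string& primary_key) {
  Cell& c = table[key];
  if (!c.write.empty() && c.write.rbegin()->first >= start_ts) return false;
  if (!c.lock.empty()) return false;
  c.data[start_ts] = value;
  c.lock[start_ts] = primary_key;
  return true;
}

// Phase two: replace locks with write records, primary cell first. Writing the
// primary's write record is the commit point; after that the secondaries can
// always be rolled forward, even if the client crashes.
bool Commit(Table& table, const std::vector<std::string>& keys,
            const std::string& primary_key, uint64_t start_ts,
            uint64_t commit_ts) {
  Cell& primary = table[primary_key];
  if (primary.lock.count(start_ts) == 0) return false;  // Lock cleaned up: abort.
  primary.write[commit_ts] = start_ts;                   // Commit point.
  primary.lock.erase(start_ts);
  for (const std::string& key : keys) {
    if (key == primary_key) continue;
    Cell& c = table[key];
    c.write[commit_ts] = start_ts;                       // Make visible.
    c.lock.erase(start_ts);                              // Release lock.
  }
  return true;
}

// Get at snapshot start_ts: if any lock at or below start_ts exists, the caller
// must wait (or attempt cleanup), since that writer might commit before our
// snapshot; otherwise return the data pointed to by the newest write record at
// or below start_ts.
enum class GetStatus { kFound, kNotFound, kMustWait };

GetStatus Get(const Table& table, const std::string& key, uint64_t start_ts,
              std::string* value) {
  auto cell_it = table.find(key);
  if (cell_it == table.end()) return GetStatus::kNotFound;
  const Cell& c = cell_it->second;
  if (!c.lock.empty() && c.lock.begin()->first <= start_ts)
    return GetStatus::kMustWait;
  auto write_it = c.write.upper_bound(start_ts);
  if (write_it == c.write.begin()) return GetStatus::kNotFound;
  uint64_t data_ts = std::prev(write_it)->second;
  *value = c.data.at(data_ts);
  return GetStatus::kFound;
}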
Transaction processing is complicated by the possibility of client failure (tablet server failure does not affect the system, since Bigtable guarantees that written locks persist).
If a client fails while a transaction is being committed, locks will be left behind. Percolator must clean up those locks or they will cause future transactions to hang indefinitely. Percolator takes a lazy approach to cleanup: when a transaction A encounters a conflicting lock left behind by transaction B, A may determine that B has failed and erase its locks.
Percolator makes lazy cleanup safe by designating one cell in every transaction (the primary) as the synchronizing point for all commit and cleanup operations. Since this cell is modified under a Bigtable row transaction, only one of the cleanup or commit operations will succeed.
When a client crashes during the second phase of commit, the transaction may already be past the commit point: it has written at least one write record. Such transactions must be rolled forward rather than rolled back. If a cleanup transaction detects that the primary lock has been replaced by a write record, it knows the transaction that wrote it has committed, and it rolls the transaction forward by replacing the remaining stale locks with write records.
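A small standalone sketch of that decision (the types and names are mine; in the real system both the check and the subsequent action happen under a Bigtable row transaction on the primary cell, so cleanup and commit cannot both succeed):

#include <cstdint>
#include <optional>

// State of the crashed transaction's primary cell, as seen by the cleaner.
struct PrimaryState {
  bool lock_present;                  // Primary lock at the stale start_ts still there?
  std::optional<uint64_t> commit_ts;  // Write record pointing at that start_ts, if any.
};

enum class CleanupAction { kRollBack, kRollForward };

CleanupAction DecideCleanup(const PrimaryState& primary) {
  if (!primary.lock_present && primary.commit_ts.has_value()) {
    // The primary write record is visible, so the transaction committed:
    // replace the remaining secondary locks with write records at *commit_ts.
    return CleanupAction::kRollForward;
  }
  // Either the primary lock is still in place (the transaction never reached
  // its commit point) or it was already rolled back: erase the stale locks.
  return CleanupAction::kRollBack;
}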
Timestamps
The timestamp oracle is a server that hands out timestamps in strictly increasing order. The oracle periodically allocates a range of timestamps by writing the highest allocated timestamp to stable storage. If the oracle restarts, the timestamps will jump forward to the maximum allocated timestamp.
To avoid overloading the oracle, each Percolator worker batches timestamp requests across transactions by maintaining only one pending request to the oracle.
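A minimal single-process sketch of such an oracle (the batch size and the persistence hook are illustrative assumptions; the real oracle additionally serves batched RPCs from many workers):

#include <cstdint>
#include <mutex>

// Hands out strictly increasing timestamps. A range of timestamps is made
// durable before any timestamp in it is handed out, so a restart can only
// jump the counter forward, never backward.
class TimestampOracle {
 public:
  uint64_t Next() {
    std::lock_guard<std::mutex> guard(mu_);
    if (next_ == persisted_limit_) {
      persisted_limit_ += kBatchSize;
      PersistLimit(persisted_limit_);   // Write the new limit to stable storage.
    }
    return next_++;
  }

 private:
  static constexpr uint64_t kBatchSize = 1000;
  void PersistLimit(uint64_t limit) { /* stubbed out in this sketch */ }

  // After a restart, next_ would be re-initialized from the persisted limit,
  // so timestamps jump forward past anything previously handed out.
  std::mutex mu_;
  uint64_t next_ = 0;
  uint64_t persisted_limit_ = 0;
};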
The transaction protocol we saw uses strictly increasing timestamps to guarantee that Get() returns all committed writes before the transaction’s start timestamp.
How does Percolator do that?
Consider a transaction R reading at timestamp Tr and a transaction W that committed at timestamp Tw, which is less than Tr. The timestamp oracle gave out Tw before or in the same batch as Tr; hence, W requested Tw before R received Tr.
W wrote its locks before requesting its commit timestamp Tw, and R cannot read before receiving its start timestamp Tr. It follows that W must have written all of its locks before R did any reads, so R is guaranteed to see either W's committed writes or W's locks (and wait).
Notifications
Transactions let the user mutate the table while maintaining invariants, but users also need a way to trigger and run those transactions when the data changes.
In Percolator, the user writes pieces of code (“observers”) that are triggered by changes to the table. All observers are linked into a binary that runs alongside every tablet server in the system.
Each observer registers a function and a set of columns with Percolator. Percolator invokes the function after data is written to one of those columns in any row.
Percolator applications are structured as a series of observers; each observer completes a task and creates more work for “downstream” observers by writing to the table.
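A sketch of what registration and dispatch could look like (the types and names here are illustrative, not Percolator's actual API):

#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <vector>

// An observer: the columns it watches and the function to run on a change.
struct Observer {
  std::vector<std::string> observed_columns;
  std::function<void(const std::string& row)> on_change;
};

class ObserverRegistry {
 public:
  void Register(Observer observer) {
    for (const std::string& column : observer.observed_columns)
      by_column_[column].push_back(observers_.size());
    observers_.push_back(std::move(observer));
  }

  // Called by the worker when its scan finds a dirty (notified) cell.
  void Notify(const std::string& row, const std::string& column) const {
    auto it = by_column_.find(column);
    if (it == by_column_.end()) return;
    for (std::size_t index : it->second) observers_[index].on_change(row);
  }

 private:
  std::vector<Observer> observers_;
  std::map<std::string, std::vector<std::size_t>> by_column_;
};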
Percolator applications consist of very few observers — the Google indexing system has roughly 10 observers.
It is possible for several observers to observe the same column, but we avoid this feature so it is clear which observer will run when a particular column is written.
Percolator provides one guarantee: at most one observer’s transaction will commit for each change of an observed column.
One more important point is that multiple writes to an observed column may cause the corresponding observer to be invoked only once.
How does Percolator do that?
Each observed column has an accompanying “acknowledgment” column for each observer, containing the latest start timestamp at which the observer ran.
When the observed column is written, Percolator starts a transaction to process the notification. The transaction reads the observed column and its corresponding acknowledgment column.
If the observed column was written after its last acknowledgment, then we run the observer and set the acknowledgment column to our start timestamp.
Otherwise, the observer has already been run, so we do not run it again.
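In code, the at-most-once check made inside the notification transaction boils down to a timestamp comparison (a sketch; the field names are illustrative):

#include <cstdint>

// What the notification transaction reads before deciding to run an observer.
struct NotificationState {
  uint64_t observed_write_ts;  // Timestamp of the latest write to the observed cell.
  uint64_t last_ack_ts;        // Start timestamp of the observer's last run (0 if never).
};

// Run the observer only if the cell was written after the last acknowledged
// run; several writes since that run collapse into a single invocation because
// only the newest write timestamp is compared. If this returns true, the
// transaction runs the observer and writes its own start timestamp into the
// acknowledgment column.
bool ShouldRunObserver(const NotificationState& state) {
  return state.observed_write_ts > state.last_ack_ts;
}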
To implement notifications, Percolator needs to efficiently find dirty cells with observers that need to be run. Percolator maintains a special “notify” Bigtable column, containing an entry for each dirty cell. When a transaction writes an observed cell, it also sets the corresponding notify cell.
The workers perform a distributed scan over the notify column to find dirty cells. After the observer is triggered and its transaction commits, we remove the corresponding notify cell.
To make this scan efficient, Percolator stores the notify column in a separate Bigtable locality group so that scanning over the column requires reading only the millions of dirty cells rather than the trillions of total data cells.
Evaluation
Percolator lies somewhere in the performance space between MapReduce and DBMSs.
For example, Percolator uses far more resources to process a fixed amount of data than a traditional DBMS would. This is the cost of its scalability.
Compared to MapReduce, Percolator can process data with far lower latency, but again, at the cost of additional resources required to support random lookups.
These are engineering tradeoffs that are difficult to quantify.
In this section, we attempt to quantify some of these tradeoffs.
Converting from MapReduce
Each day, Google crawled several billion documents and fed them, along with the repository of existing documents, through a series of 100 MapReduces. The result was the index. On average, a document spent 2-3 days being indexed before it could be returned as a search result.
The Percolator-based indexing system (known as Caffeine) crawls the same number of documents, but feeds each document through Percolator as it is crawled.
The main design goal of Caffeine is a reduction in latency, and the improvement grows as the system becomes more complex: additional task phases can be implemented in the same transaction rather than in another MapReduce. This simplification is one reason the number of observers in Caffeine (10) is far smaller than the number of MapReduces in the previous system (100).
Caffeine’s document collection is currently 3x larger than the previous system’s and is limited only by available disk space.
The new system is also easier to operate.
The simplicity of writing straight-line code and the ability to do random lookups into the repository makes developing new features for Percolator easy.
To quantify the benefits of moving from MapReduce to Percolator, Google created a synthetic benchmark that clusters newly crawled documents against a billion-document repository to remove duplicates, in much the same way the old indexing pipeline did.
This experiment simulates clustering documents crawled at a uniform rate. Whether MapReduce or Percolator performs better under this metric is a function of how frequently documents are crawled (the crawl rate) and the repository size.
Percolator does work proportional only to the small batch of newly arrived documents, while MapReduce does work proportional to the entire repository (see the figure in the paper). At very large crawl rates, where the number of newly crawled documents approaches the size of the repository, MapReduce will perform better than Percolator.
Related Work
Batch processing systems like MapReduce are well suited to efficiently transforming or analyzing an entire corpus, but even for a small update, straggler shards set the minimum time to complete the pipeline.
Percolator avoids the expense of repeated scans by, essentially, creating indexes on the keys used to cluster documents. One of the criticisms leveled by Stonebraker and DeWitt in their initial critique of MapReduce was that MapReduce did not support such indexes.
While Percolator provides distributed transactions, it is by no means a full-fledged DBMS: it lacks a query language.
The organization of data in Percolator mirrors that of shared-nothing parallel databases.
Percolator is a data transformation system, not only a data storage system: it provides a way to structure computation to transform that data. In contrast, systems like Dynamo, Bigtable, and PNUTS provide highly available data storage without the attendant mechanisms of transformation.
Sinfonia provides a transactional interface to a distributed repository. Sinfonia and Percolator differ in their intended use: Sinfonia is designed to build distributed infrastructure, while Percolator is intended to be used directly by applications.
CloudTPS, like Percolator, builds an ACID-compliant data store on top of a distributed storage system. The two systems differ in design: CloudTPS handles transaction management in an intermediate layer of servers called local transaction managers, which cache mutations before they are persisted to the underlying distributed storage system.
Conclusion and Future Work
Google built and deployed Percolator, and it has been used to produce Google’s web search index since April 2010.
Google chose an architecture that scales linearly over many orders of magnitude on commodity machines, but they have seen that this costs a significant 30-fold overhead compared to traditional database architectures.
Google is very interested in exploring this tradeoff and characterizing the nature of this overhead.