Insights from paper: Delta Lake: High-Performance ACID Table Storage over Cloud Object Stores
1. Abstract
Cloud object stores such as Amazon S3, Azure Blob Storage, and Google Cloud Storage are some of the largest and most cost-effective storage systems available.
They are a natural fit for storing large data warehouses and data lakes.
The problem is that they are key-value stores, so achieving ACID transactions and high performance is difficult.
The paper presents Delta Lake, an open-source ACID table storage layer over cloud object stores initially developed at Databricks.
Delta Lake uses a transaction log that is compacted into Apache Parquet format.
This log provides ACID properties, time travel, and significantly faster metadata operations for large tabular datasets.
This design is also leveraged to provide high-level features such as automatic data layout optimization, upserts, caching, and audit logs.
Delta Lake tables can be accessed from Apache Spark, Hive, Presto, Redshift, and other systems.
2. Introduction
Cloud object stores are good because they allow independent (decoupled) scaling of computing and storage.
Let’s understand this decoupling with an example. A user can store a petabyte of data but does not need to keep compute running for it all the time; they can spin up a cluster only when needed, for example to execute a query for a few hours, and then shut it down.
The major open-source Big Data systems, such as Apache Spark, Hive, and Presto, support reading and writing to cloud object stores.
They usually use file formats such as Apache Parquet and ORC.
Similarly, commercial systems, such as AWS Athena, Google BigQuery, and Redshift Spectrum, can read directly from cloud object storage using these open file formats.
The most common way to store relational datasets in cloud object stores is as files in columnar formats such as Apache Parquet and ORC.
These formats are suitable, but they create correctness and performance challenges for more complex workloads.
During the first few years of its cloud service (roughly 2014–2016), about half of Databricks’ support escalations were due to data corruption, consistency, or performance issues caused by storing data in cloud object stores.
Delta Lake was designed to overcome these challenges.
Delta Lake is an ACID table storage layer over cloud object stores.
It was open-sourced in 2019.
Delta Lake maintains information about which objects belong to a Delta table in a write-ahead log that is itself stored in the cloud object store. The objects themselves are encoded in Parquet.
This design allows clients to update multiple objects at once, or replace a subset of the objects with another, while still achieving high parallel read and write performance on the objects.
The log also contains metadata such as min/max statistics for each data file for faster metadata searches.
Delta Lake stores all the metadata in the underlying object store, so there is no need for separate servers or systems to maintain that.
Delta Lake uses optimistic concurrency protocols against the underlying object store to avoid any other dependency.
Based on this design, Delta Lake provides some unique features not present in traditional data warehouses or data lakes.
Time travel: It lets users query point-in-time snapshots or roll back erroneous updates to their data.
UPSERT, DELETE, and MERGE: These operations rewrite the relevant objects efficiently, which helps implement compliance workflows.
Efficient streaming I/O: Streaming jobs can write small objects into the table at low latency and read newly appended (tailing) data quickly, so users can treat a Delta table as a message bus.
Caching: Because a table’s data and log objects are immutable, cluster nodes can safely cache them on local storage.
Data layout optimization: Delta Lake automatically optimizes the size of objects in a table and the clustering of data records without impacting running queries.
Schema evolution: The system allows continued reading of old Parquet files without rewriting them if a table’s schema changes.
Audit logging: The system provides it by exposing the transaction log.
The team observed that many Databricks customers could simplify their overall data architectures with Delta Lake by replacing previously separate data lake, data warehouse, and streaming storage systems with Delta tables.
The diagram below shows an example.
A data pipeline that includes object storage, a message queue, and two data warehouses is replaced with just Delta tables on object storage. It runs ETL using Delta’s streaming I/O and performance features.
The open-source Delta Lake project includes connectors to Apache Spark (batch or streaming), Hive, Presto, AWS Athena, Redshift, and Snowflake. It can run over multiple cloud object stores or HDFS.
3. Motivation: Characteristics and Challenges of Object Stores
3.1 Object Store APIs
Cloud object stores offer a simple but easily scalable key-value interface. These systems allow users to create buckets, each of which stores multiple objects.
Each object is identified by a string key.
Cloud object stores provide metadata APIs, such as S3’s LIST operation, which returns up to 1,000 keys per call. These metadata APIs are generally expensive.
Cloud object stores usually support byte-range requests for reading the objects. Updating objects usually requires rewriting the whole object at once.
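As a concrete sketch of these access patterns, the snippet below uses boto3 to issue a byte-range read and a paginated LIST against a hypothetical bucket and key (the names are illustrative, not from the paper):

```python
# Sketch of the object-store access patterns described above, using
# boto3 against a hypothetical bucket/key.
import boto3

s3 = boto3.client("s3")

# Byte-range GET: read only a 64 KB slice of an object instead of the
# whole file.
resp = s3.get_object(
    Bucket="my-bucket",
    Key="mytable/date=2020-01-01/part-0001.parquet",
    Range="bytes=0-65535",
)
chunk = resp["Body"].read()

# LIST under a prefix: metadata operations like this are comparatively
# expensive, so callers typically parallelize them or avoid them.
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket="my-bucket", Prefix="mytable/"):
    for obj in page.get("Contents", []):
        print(obj["Key"], obj["Size"])
```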
3.2 Consistency Properties
The most popular cloud object stores provide eventual consistency for each key.
There are no consistency guarantees across keys.
There are two inconsistency scenarios to remember:
After a client uploads a new object, other clients are not guaranteed to see the object in LIST or read operations immediately.
Updates to an existing object may not immediately be visible to other clients.
Each cloud storage provider has its own consistency model, so the exact guarantees vary from provider to provider.
3.3 Performance Characteristics
As noted above, reading a sequential byte range is the finest-grained read operation available. Each read request usually incurs at least 5–10 ms of base latency.
LIST operations require significant parallelism to list large sets of objects quickly.
Write operations generally have to replace a whole object. If a table is expected to receive point updates, then its objects should be kept small for better performance.
There are three considerations for analytical workloads:
Place frequently accessed data close together sequentially.
Make objects large, but not too large.
Avoid LIST operations; when they are needed, have them request lexicographic key ranges.
3.4 Existing Approaches for Table Storage
There are three significant approaches to managing tabular datasets:
3.4.1 Directories of Files
The first approach is to store the table as a collection of objects in a columnar format such as Parquet.
The records may be partitioned into directories based on one or more attributes.
Partitioning reduces the cost of LIST operations and reads for queries that only access a few partitions.
Apache Hive and some other systems use this approach.
There are a few challenges in this approach:
No atomicity across multiple objects: readers can observe partial updates while a multi-object write is in progress, and a failed update can leave the table corrupted.
Eventual consistency: readers may see some updated objects but not yet others.
Poor performance: listing objects and reading per-object statistics is expensive, even for queries that ultimately touch little data.
No management functionality: there are no table versions or audit logs.
3.4.2 Custom Storage Engines
A few closed-source storage engines, such as the Snowflake data warehouse, handle the consistency challenges of cloud object stores by managing metadata in a separate, strongly consistent service.
This approach requires running a highly available service to manage the metadata.
This approach also has a few challenges:
All I/O operations to a table need to contact the metadata service.
Connectors to existing computing engines require more engineering work to implement.
The proprietary metadata service ties users to a specific service provider.
Apache Hive ACID implements a similar approach on top of HDFS.
3.4.3 Metadata in Object Stores
Delta Lake’s approach is to store a transaction log and metadata directly within the cloud object store.
It uses a set of protocols over object store operations to achieve serializability.
The data within a table is stored in Parquet format.
Apache Hudi and Apache Iceberg also support this approach.
4. Delta Lake Storage Format and Access Protocols
A Delta Lake table is a directory on a cloud object store or file system that holds data objects with the table contents and a log of transaction operations.
Clients update these data structures using optimistic concurrency control protocols.
4.1 Storage Format
The diagram below shows the storage format for a Delta table.
Each table is stored within a file system directory (mytable here).
In this example the table is partitioned by the date column, so the data objects are stored in separate directories for each date.
Each data object in Delta has a unique name, which the writer chooses by generating a GUID.
4.1.1 Data Objects
The table contents are stored in Apache Parquet objects.
4.1.2 Log
The log is stored in the _delta_log subdirectory within the table. It contains a sequence of JSON objects with increasing, zero-padded numerical IDs that hold the log records, along with occasional checkpoints that summarize the log up to a given record.
Each log record object (e.g., 000003.json) contains an array of actions to apply to the previous version of the table in order to generate the next one. Some of the important actions, illustrated in the sketch after this list, are:
Change Metadata
Add or Remove Files
Data Change (the dataChange flag on add and remove records)
Protocol Evolution
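To make this concrete, here is a rough sketch of the kinds of actions a single log record might carry, written as Python dicts. The field names follow the open Delta log format, but all paths, sizes, and statistics below are made up for illustration:

```python
# Illustrative sketch of the actions a log record such as 000003.json
# can carry. Values are made up; see the open Delta Lake protocol for
# the authoritative field list.
log_record_actions = [
    # Change Metadata: schema, partition columns, and other table metadata.
    {"metaData": {
        "id": "3fa85f64-0000-0000-0000-000000000000",
        "format": {"provider": "parquet"},
        "schemaString": "...",          # JSON-encoded table schema (elided)
        "partitionColumns": ["date"],
    }},
    # Add File: a new data object, with per-file min/max statistics that
    # readers use to skip irrelevant objects.
    {"add": {
        "path": "date=2020-01-01/1b9e7f1a-part-0001.parquet",
        "partitionValues": {"date": "2020-01-01"},
        "size": 1048576,
        "dataChange": True,
        "stats": '{"numRecords": 1000, "minValues": {...}, "maxValues": {...}}',
    }},
    # Remove File: marks an older object as no longer part of the table.
    {"remove": {"path": "date=2020-01-01/old-part-0000.parquet",
                "dataChange": True}},
]
```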
4.1.3 Log Checkpoints
Checkpoints store all the non-redundant actions in the table’s log up to a certain log record ID.
The end result of the checkpointing process is a Parquet file. The file contains an add record for each object still in the table, remove records for objects that were deleted but need to be retained, and a small number of other records such as txn, protocol and changeMetadata.
Any client may attempt to create a checkpoint up to a given log record ID, writing it as a .parquet file for the corresponding ID. By default, clients write a checkpoint every 10 transactions.
Checkpoint writers then record the new checkpoint ID in the _delta_log/_last_checkpoint file if it is newer than the ID currently in that file.
4.2 Access Protocols
Delta Lake’s access protocols are designed to let clients achieve serializable transactions.
A log record object, such as 000003.json, is the “root” data structure that a client needs to know to read a specific version of the table. Given this object’s content, the client can then query for other objects from the object store.
For transactions that perform writes, clients need a way to ensure that only a single writer can create the next log record and can then use this to implement optimistic concurrency control.
4.2.1 Reading from Tables
Step 1: Read the _last_checkpoint object, if present, to obtain a recent checkpoint ID.
Step 2: Use a LIST operation whose start key is the last checkpoint ID (if one exists) to find any newer .json and .parquet files in the log. This provides a list of files that can be used to reconstruct the table’s state starting from a recent checkpoint.
Step 3: Use the checkpoint (if present) and the subsequent log records identified in the previous step to reconstruct the state of the table.
Step 4: Use the statistics in that state to identify the set of data object files relevant to the read query.
Step 5: Query the object store to read the relevant data objects, possibly in parallel across a cluster (a simplified sketch of this path follows).
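In the sketch below, the store object and its get/list_keys methods are hypothetical placeholders for an object-store client, and checkpoint parsing is elided; this illustrates the protocol, not Delta Lake’s actual implementation:

```python
import json

def read_table_state(store, table_path):
    """Reconstruct the set of live data objects for the latest table version."""
    log_prefix = f"{table_path}/_delta_log/"

    # Step 1: read _last_checkpoint (it may not exist) to find a recent
    # checkpoint ID.
    last = store.get(log_prefix + "_last_checkpoint")   # hypothetical client call
    start_version = json.loads(last)["version"] if last else 0

    # Step 2: LIST the log, starting at the checkpoint ID, to find newer
    # .json log records and .parquet checkpoints (IDs are zero-padded).
    keys = sorted(store.list_keys(prefix=log_prefix,
                                  start_after=f"{start_version:020d}"))

    # Step 3: replay actions in order to rebuild the table state.
    live_files = {}
    for key in keys:
        if key.endswith(".checkpoint.parquet"):
            continue   # a real client would load the checkpoint's add records here
        if not key.endswith(".json"):
            continue
        for line in store.get(key).splitlines():
            action = json.loads(line)
            if "add" in action:
                live_files[action["add"]["path"]] = action["add"]
            elif "remove" in action:
                live_files.pop(action["remove"]["path"], None)

    # Steps 4-5: use the min/max statistics kept in `live_files` to select
    # only the objects relevant to the query, then read those Parquet
    # files, possibly in parallel across a cluster.
    return live_files
```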
4.2.2 Write Transactions
Step 1: Identify a recent log record ID, say r, using steps 1–2 of the read protocol.
Step 2: Read the data at table version r, if required.
Step 3: Write any new data objects that the transaction aims to add to the table into new files. This step can happen in parallel.
Step 4: Attempt to write the transaction’s log record into the r + 1 .json log object. This step needs to be atomic; if it fails because another client has already written that object, the transaction can be retried.
Step 5: Optionally, write a new .parquet checkpoint for log record r + 1.
Let’s understand how to do step 4: Adding Log Records Atomically.
Only one client should succeed in creating the object with that name.
Google Cloud Storage and Azure Blob Storage support atomic put-if-absent operations, so Delta Lake uses those directly.
On distributed filesystems such as HDFS, Delta Lake uses an atomic rename of a temporary file to the target name (e.g., 000004.json), which fails if the target already exists.
Databricks service deployments use a separate lightweight coordination service to ensure that only one client can add a record with each log ID. A sketch of the put-if-absent approach follows.
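The sketch below attempts to create the r + 1 log object on Google Cloud Storage using a generation precondition, so the upload fails if the object already exists; the bucket and table path are hypothetical, and real clients wrap this with retries and validation:

```python
from google.api_core.exceptions import PreconditionFailed
from google.cloud import storage


def try_commit(bucket_name, table_path, version, log_record_json):
    """Attempt to atomically create the log object for `version`."""
    client = storage.Client()
    blob = client.bucket(bucket_name).blob(
        f"{table_path}/_delta_log/{version:020d}.json")
    try:
        # if_generation_match=0 means "create only if no object exists
        # under this key", i.e. a put-if-absent operation.
        blob.upload_from_string(log_record_json, if_generation_match=0)
        return True    # we won the race: the transaction is committed
    except PreconditionFailed:
        return False   # another writer created r + 1: re-read and retry
```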
4.3 Available Isolation Levels
All transactions that perform writes are serializable.
Read transactions can achieve either snapshot isolation or serializability.
At the time the paper was written, Delta Lake only supported transactions within a single table.
4.4 Transaction Rates
Delta Lake’s write transaction rate is limited by the latency of the put-if-absent operations to write new log records.
In practice, the latency of writes to cloud object stores can be tens to hundreds of milliseconds, which limits the write rate to roughly several transactions per second per table. The team found this rate sufficient for virtually all current Delta Lake applications.
5. Higher Level Features in Delta
5.1 Time Travel and Rollbacks
Because Delta Lake’s data objects and log are immutable, querying a past snapshot of the data is straightforward.
A client simply needs to read the table state based on an older log record ID.
Delta Lake allows users to configure a per-table data retention interval. It supports SQL AS OF timestamp and VERSION AS OF commit_id syntax for reading past snapshots.
Users can use time travel for fixing errors in data pipelines.
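For example, with the Spark connector a past snapshot can be read either through SQL time-travel syntax (on engines that support it) or through reader options. The table name, path, version, and timestamp below are illustrative, and spark is assumed to be an existing session configured for Delta Lake:

```python
# Illustrative time-travel reads; names and values are made up.
old_by_version = spark.sql("SELECT * FROM events VERSION AS OF 42")
old_by_time = spark.sql("SELECT * FROM events TIMESTAMP AS OF '2020-06-01'")

# Equivalent DataFrame reader options against a table path.
snapshot = (spark.read.format("delta")
            .option("versionAsOf", 42)
            .load("s3://my-bucket/mytable"))
```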
5.2 Efficient UPSERT, DELETE and MERGE
Analytical datasets in enterprises often need to be modified over time.
In traditional data lake storage formats, it is hard to perform such updates without stopping concurrent readers.
With Delta Lake, updates can be executed transactionally, replacing any updated objects through new add and remove records in the Delta log.
Delta Lake supports standard SQL UPSERT, DELETE and MERGE syntax.
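A typical upsert expressed with MERGE might look like the sketch below; the table and column names are made up, and spark is assumed to be a session configured for Delta Lake:

```python
# Illustrative MERGE (upsert) and DELETE; names are made up.
spark.sql("""
    MERGE INTO customers AS t
    USING updates AS s
    ON t.customer_id = s.customer_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")

# Deletes (e.g., for compliance workflows) rewrite only the affected
# objects, transactionally, via new add/remove records in the log.
spark.sql("DELETE FROM customers WHERE customer_id = 42")
```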
5.3 Streaming Ingest and Consumption
Many data teams need to deploy streaming pipelines. In traditional cloud data lakes, this requires a separate message bus such as Apache Kafka or Amazon Kinesis.
Delta Lake is designed so that a table’s log can be used by both data producers and consumers as a message queue.
Write Compaction
Delta Lake allows users to run a background process that compacts small data objects transactionally, without affecting readers.
Compaction jobs can set the dataChange flag to false, which allows streaming consumers to ignore these compaction operations.
Exactly-Once Streaming Writes
Writers can use the txn action type in log records to keep track of which data they wrote into a Delta Lake table.
Stream processing systems need some mechanism to make their writes into external stores idempotent. This can be done either by ensuring that each record has a unique key (so retried writes become overwrites) or by atomically updating a "last version written" record together with each write.
Delta Lake facilitates this pattern by allowing applications to update an (appId, version) pair with each transaction.
Efficient Log Tailing
Because the log is stored as a series of .json objects with lexicographically increasing IDs, consumers can efficiently discover new writes.
A consumer can simply run object store LIST operations starting at the last log record ID to discover new ones.
By combining all three strategies (write compaction, exactly-once writes, and log tailing), users can implement low-latency streaming pipelines, as sketched below.
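The sketch below shows a Structured Streaming pipeline that uses Delta tables as both sink and source; all paths are illustrative and spark is assumed to be configured with the Delta connector:

```python
# Illustrative Structured Streaming pipeline that treats Delta tables
# as a message bus. All paths are made up.

# Producer: continuously append small objects to a "bronze" Delta table.
ingest = (spark.readStream.format("json")
          .schema("id LONG, event STRING, ts TIMESTAMP")
          .load("s3://my-bucket/raw-events/")
          .writeStream.format("delta")
          .option("checkpointLocation", "s3://my-bucket/chk/bronze")
          .start("s3://my-bucket/bronze-events"))

# Consumer: tail the bronze table's log, aggregate, and write results
# to a downstream Delta table.
counts = (spark.readStream.format("delta")
          .load("s3://my-bucket/bronze-events")
          .groupBy("event").count()
          .writeStream.format("delta")
          .outputMode("complete")
          .option("checkpointLocation", "s3://my-bucket/chk/counts")
          .start("s3://my-bucket/event-counts"))
```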
5.4 Data Layout Optimization
Delta Lake can transactionally update the data structures that represent a table.
Because of this, Delta Lake can support a variety of layout optimizations without affecting concurrent operations.
Let’s see how Delta Lake takes advantage of this property to implement a number of data layout optimization features:
OPTIMIZE Command: Users can manually run an OPTIMIZE command on a table that compacts small objects without affecting ongoing transactions.
Z-Ordering by Multiple Attributes: Delta Lake supports reorganizing the records in a table in Z-order. The Z-order curve is an easy-to-compute space-filling curve that creates locality in the given dimensions. Users can set a Z-order specification on a table and then run OPTIMIZE to move a desired subset of the data into Z-ordered objects (see the example below).
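On engines that support these commands (for example, Databricks or recent open-source Delta Lake releases), the optimizations can be triggered with SQL such as the following sketch; the table and column names are made up:

```python
# Illustrative layout optimizations; `spark` must be running on an
# engine that supports Delta's OPTIMIZE command.

# Compact small objects into larger ones without blocking readers.
spark.sql("OPTIMIZE events")

# Reorganize records along a Z-order curve over two columns so that
# queries filtering on either column can skip more objects.
spark.sql("OPTIMIZE events ZORDER BY (eventType, deviceId)")
```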
5.5 Caching
Databricks built a feature on top of Delta Lake to cache data on clusters. Caching is safe because data, log and checkpoint objects in Delta Lake tables are immutable.
5.6 Audit Logging
Delta Lake’s transaction log can be used for audit logging.
Databricks offers an execution mode for Spark clusters where user-defined functions cannot access cloud storage directly, which helps make the audit log tamper-proof. Users can view the history of a Delta Lake table using the DESCRIBE HISTORY command; an example of its output is shown in the diagram below, and a minimal invocation sketch follows.
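A minimal invocation sketch (table name illustrative, spark assumed to be configured for Delta Lake):

```python
# Illustrative audit-log query; the table name is made up.
history = spark.sql("DESCRIBE HISTORY events")
# Each row describes one commit: its version, timestamp, operation
# (WRITE, MERGE, OPTIMIZE, ...), and operation parameters.
history.show(truncate=False)
```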
5.7 Schema Evolution and Enforcement
Delta Lake can perform schema changes transactionally and, if needed, update the underlying objects along with the schema change.
Delta Lake keeps the history of schema updates in the transaction log, which allows older Parquet objects to be read without rewriting them for certain schema changes.
Delta clients ensure that newly written data follows the table’s schema.
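A common pattern is to let a write both append data and extend the schema using the mergeSchema writer option; in the sketch below, new_df and the table path are illustrative:

```python
# Illustrative schema-evolution write: `new_df` (made up) contains a
# column that the table does not have yet.
(new_df.write.format("delta")
    .mode("append")
    .option("mergeSchema", "true")   # allow the new column to be added
    .save("s3://my-bucket/mytable"))

# Writes whose schema conflicts with the table's and that do not
# request schema merging are rejected (schema enforcement).
```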
5.8 Connectors to Query and ETL Engines
Delta Lake provides full-fledged connectors to Spark SQL and Structured Streaming using Apache Spark’s data source API. You can read my post on Apache Structured Streaming Paper for more details.
Delta Lake currently provides read-only integrations with several other systems: Apache Hive, Presto, AWS Athena, AWS Redshift, and Snowflake.
6. Delta Lake Use Cases
Delta Lake is currently in active use at thousands of Databricks customers. It processes exabytes of data per day.
The data types stored in Delta Lake include Change Data Capture (CDC) logs from enterprise OLTP systems, application logs, time series data, graphs, aggregate tables for reporting, and image or feature data for machine learning (ML).
The applications running over this data include SQL workloads, business intelligence, streaming, data science, machine learning and graph analytics.
6.1 Data Engineering and ETL
Many organizations are migrating ETL/ELT and data warehousing workloads to the cloud to simplify their management.
Some are augmenting traditional enterprise data sources with much larger data streams from other sources for downstream data and machine learning applications.
Delta Lake’s ACID transactions, UPSERT/MERGE support and time travel features allow these organizations to reuse existing SQL queries to perform their ETL process directly on the object store.
6.2 Data Warehousing and BI
To enable interactive query workloads such as business intelligence (BI), traditional data warehouse systems combine ETL/ELT functionality with efficient querying.
Delta Lake supports these required features directly against tables stored in a cloud object store.
Most Delta Lake users run ad-hoc query and BI workloads against their lakehouse datasets, either through SQL directly or through BI software such as Tableau.
6.3 Compliance and Reproducibility
Users leverage the audit logging feature for data governance. Delta Lake’s time travel support is useful for reproducible data science and machine learning.
6.4 Specialized Use Cases
There are also some industry-specific use cases. A few of them are listed below:
Computer System Event Data
Bioinformatics
Media Datasets for Machine Learning
7. Discussions and Limitations
Delta Lake’s design is attractive because it does not require any other heavyweight system to mediate access to cloud storage.
Delta Lake’s support for ACID enables other powerful performance and management features.
There are a few limitations as of now:
Delta Lake currently only provides serializable transactions within a single table.
Delta Lake is limited by the latency of the underlying cloud object store for streaming workloads.
Delta Lake does not currently support secondary indexes.
8. Related Work
Building a Database on S3 paper explored building an OLTP database system over S3.
Bolt-on Causal Consistency paper implements causal consistency on top of eventually consistent key-value stores.
Google BigQuery, AWS Redshift Spectrum, and Snowflake are OLAP DBMSes that can scale computing clusters separately from storage and can read data from cloud object stores.
The closest systems to Delta Lake’s design and goals are Apache Hudi and Apache Iceberg. Both define data formats and access protocols to implement transactional operations on cloud object stores.
Apache Hive ACID also implements transactions over object stores or distributed file systems.
C-Store combines high-performance transactional and analytical processing.
References
Apache Structured Streaming paper post
Spark SQL: Relational Data Processing in Spark
The Snowflake Elastic Data Warehouse
Hive – A Petabyte Scale Data Warehouse Using Hadoop
Amazon Aurora: Design considerations for high throughput cloud-native relational databases