Optimize concurrency for merge operations in Delta tables

Concurrency control is routine in OLTP workloads, but much less so in OLAP. So I didn't pay attention to it until I hit the error below:

com.databricks.sql.transaction.tahoe.ConcurrentAppendException: Files were added to partition [cust_id=000] by a concurrent update. Please try the operation again.

This error is caused by a write conflict: two merge operations were writing to a single Delta table at the same time. Based on the conflict matrix provided by Databricks, two merge operations can conflict even under the WriteSerializable isolation level.
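The error message itself suggests retrying, so as a stopgap the merge can be wrapped in a retry loop. Below is a minimal sketch of that idea. The helper name `run_with_retry`, its parameters, and the string match on the exception name are my own assumptions (the exact Python exception class depends on the runtime), not a Databricks API. A retry only works around the conflict; it does not remove it.

```python
import random
import time


def run_with_retry(operation, max_attempts=5, base_delay=1.0):
    """Call operation(); retry when it fails with a concurrent-append conflict.

    Assumption: the conflict is recognised by looking for the exception name
    in the error text, because the Python exception class varies by runtime.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception as exc:
            is_conflict = "ConcurrentAppendException" in str(exc)
            if not is_conflict or attempt == max_attempts:
                raise
            # Exponential backoff with jitter, so two jobs do not retry in lockstep.
            delay = base_delay * (2 ** (attempt - 1)) * random.uniform(0.5, 1.5)
            time.sleep(delay)
```

Usage would be something like `run_with_retry(lambda: do_merge())`, where `do_merge` stands for whatever function runs the merge.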

To solve this problem, I took two steps:

  • Choose the right partition columns. In my example, the table was partitioned only by cust_id, but the merge operations in the two scripts update rows based on two columns: cust_id and report_type. So the first step is to partition the table by both cust_id and report_type.
  • Change the condition clause in the merge operations. I put both partition columns into the merge condition clause, like this:
A.cust_id=B.cust_id and A.report_type=B.report_type # where A is the target table of the merge
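For the first step, rewriting the table with both partition columns could look like the sketch below. The function name `rewrite_with_partitions` and its arguments are hypothetical, and `df` is assumed to be a Spark DataFrame holding the full contents of the table.

```python
def rewrite_with_partitions(df, path, partition_cols=("cust_id", "report_type")):
    """Overwrite the Delta table at `path`, partitioned by `partition_cols`.

    `df` is assumed to be a Spark DataFrame with the full table contents.
    """
    (
        df.write.format("delta")
        .mode("overwrite")
        # Changing the partition columns of an existing Delta table is only
        # allowed when the overwrite may also replace the table schema.
        .option("overwriteSchema", "true")
        .partitionBy(*partition_cols)
        .save(path)
    )
```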

But the merges still conflicted even after the table was partitioned correctly. So I tried hard-coding the partition values in the condition. It worked: no more conflicts. My guess is that Delta cannot infer the affected partitions from the join condition alone.

"A.cust_id={0} and A.report_type={1}".format(cust_id, report_type)
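Put together, a merge that pins the target table to one explicit partition might look like the sketch below. It assumes a Delta Lake version that ships the Python `DeltaTable` API, and that `updates_df` only contains rows for the given cust_id and report_type. The helper names (`sql_literal`, `partition_merge_condition`, `merge_one_partition`) are hypothetical, and quoting string values is my own assumption about the column types.

```python
def sql_literal(value):
    """Render a Python value as a SQL literal; strings are single-quoted."""
    if isinstance(value, str):
        # Keep the sketch safe and simple: refuse values that need escaping.
        if "'" in value or "\\" in value:
            raise ValueError("unsupported character in literal: %r" % value)
        return "'" + value + "'"
    return str(value)


def partition_merge_condition(cust_id, report_type):
    """Join on the keys and pin the target (A) to one explicit partition."""
    return (
        "A.cust_id = B.cust_id AND A.report_type = B.report_type "
        "AND A.cust_id = {0} AND A.report_type = {1}"
    ).format(sql_literal(cust_id), sql_literal(report_type))


def merge_one_partition(spark, target_path, updates_df, cust_id, report_type):
    """Merge updates_df (B) into the Delta table at target_path (A)."""
    from delta.tables import DeltaTable  # requires the Delta Lake Python package

    (
        DeltaTable.forPath(spark, target_path).alias("A")
        .merge(updates_df.alias("B"), partition_merge_condition(cust_id, report_type))
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
```

With the literal predicates in the condition, each script states up front which partition it touches, instead of leaving that to be worked out from the join.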


Isolation Levels in delta lake: https://docs.databricks.com/delta/optimizations/isolation-level.html#isolation-levels

Concurrency control: https://docs.databricks.com/delta/concurrency-control.html

How to improve performance of Delta Lake MERGE INTO queries using partition pruning: https://kb.databricks.com/delta/delta-merge-into.html

Delta Lake Step by Step(1)

Before we talk about Delta Lake, we have to take some time to look at the data lake and understand why we need one. Below is the best definition I have found.

A data lake is a repository for structured, unstructured, and semi-structured data. Data lakes are much different from data warehouses since they allow data to be in its rawest form without needing to be converted and analyzed first.

Devin Pickell 

A data lake stores everything in raw form, in any format. In a nutshell, it is a repository. For example, in Azure, Data Lake Storage Gen2 is a repository built on Blob storage.

What's the difference between a data lake and a data warehouse?

Table 1. Difference between data warehouse and data lake

From Table 1, we can see that a data warehouse and a data lake are used in different scenarios. A data warehouse is used for classic data analysis, with the data stored in structured relational tables, while a data lake is used for big data and ELT scenarios: all raw data lands in the data lake first, and the processing follows. Since a data lake runs on a cloud cluster, it can process the raw data very quickly.

What are the downsides of a data lake?

  • No transactions.
  • Schema on read.
  • No version control.

These three downsides are critical when applying a data lake to a real project. They affect both data quality and efficiency. This is why many big data projects fail: the data lake cannot provide data as stable as a database does.

Delta Lake comes out!

Figure 1. Delta Lake adds ACID transactions on top of Parquet.

Delta Lake has features of its own that enable a stable data lake for analysis.

  • Open source storage layer, based on Parquet files
  • ACID transactions: serializable isolation
  • Scalable metadata handling: uses Spark's distributed processing
  • Unified streaming and batch: a table is a batch table as well as a streaming source
  • Schema enforcement
  • Time travel: data versioning

Let's look at some simple samples. If you are familiar with Spark, you can switch to Delta Lake very quickly.

# set up
# upgrade pyspark
pip install --upgrade pyspark
# start the PySpark shell with the Delta Lake package
pyspark --packages io.delta:delta-core_2.12:0.1.0

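# write table
df = spark.read.csv("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv", header="true", inferSchema="true")
df.write.format("delta").save("/tmp/diamonds-delta")  # illustrative path
# append
df.write.format("delta").mode("append").save("/tmp/diamonds-delta")
# we can also partition the table, e.g. by the "cut" column of the diamonds data
df.write.format("delta").partitionBy("cut").save("/tmp/diamonds-delta-by-cut")  # illustrative path
# update: overwrite only the rows matched by replaceWhere
# (`data` is assumed to be a DataFrame of January 2017 rows for a table partitioned by `date`)
data.write.format("delta").mode("overwrite").option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'").save("/tmp/delta-table")
# by default, overwrite does not change the schema; it does only if option("overwriteSchema", "true") is set
# add new columns automatically
df.write.format("delta").mode("append").option("mergeSchema", "true").save("/delta/events")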

# read
df = spark.read.format("delta").load("/tmp/delta-table")
# or with SQL
spark.sql("SELECT * FROM delta.`/tmp/delta-table`")

# time travel
# read the table as of a point in time
df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/delta/events")
# timestamp_string must be a date or timestamp string, e.g. "2019-01-01" or "2019-01-01 00:00:00"
# read the table as of a version number
df2 = spark.read.format("delta").option("versionAsOf", version).load("/delta/events")

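# structured streaming
# the built-in "rate" test source emits rows with "timestamp" and "value" columns
streamingDf = spark.readStream.format("rate").load()
stream = streamingDf.selectExpr("value as id").writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")
# limit the micro-batch size with the "maxFilesPerTrigger" option on a Delta source
# ignore deletes and updates in a Delta source with the "ignoreDeletes" and "ignoreChanges" options
# append mode (the default) adds only new rows to the sink; a query is started with start(), not load()
# (the output and checkpoint paths on the next line are illustrative)
appendStream = streamingDf.selectExpr("value as id").writeStream.format("delta").outputMode("append").option("checkpointLocation", "/tmp/checkpoint-append").start("/tmp/delta-table-append")
# complete mode rewrites the whole result on every batch and needs an aggregation
# (assumes /delta/events has a customerId column); the checkpoint location lets the query resume after a restart
completeStream = spark.readStream.format("delta").load("/delta/events").groupBy("customerId").count().writeStream.format("delta").outputMode("complete").option("checkpointLocation", "/delta/eventsByCustomer/_checkpoints/streaming-agg").start("/delta/eventsByCustomer")

# read the Delta table as a stream and print it to the console
stream2 = spark.readStream.format("delta").load("/tmp/delta-table").writeStream.format("console").start()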
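For the time travel reads, a small wrapper can make sure the timestamp is passed as a plain timestamp string. This is only a sketch: `as_of_timestamp_string` and `read_as_of` are hypothetical helper names, and the `yyyy-MM-dd HH:mm:ss` format is one accepted form I chose for illustration.

```python
from datetime import datetime


def as_of_timestamp_string(moment):
    """Format a datetime as a 'yyyy-MM-dd HH:mm:ss' string for timestampAsOf."""
    return moment.strftime("%Y-%m-%d %H:%M:%S")


def read_as_of(spark, path, moment=None, version=None):
    """Read a Delta table as of a datetime or a version number (exactly one)."""
    if (moment is None) == (version is None):
        raise ValueError("pass exactly one of moment or version")
    reader = spark.read.format("delta")
    if moment is not None:
        reader = reader.option("timestampAsOf", as_of_timestamp_string(moment))
    else:
        reader = reader.option("versionAsOf", version)
    return reader.load(path)
```

For example, `read_as_of(spark, "/delta/events", version=0)` would read the first version of the table, and `read_as_of(spark, "/delta/events", moment=datetime(2019, 1, 1))` the state at the start of that day.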

In the next article, I will talk about when to use Delta Lake and how to optimize it in Databricks.

Here is one of my examples, which converts three small files into one Delta table: https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2499619125013057/2952616815059893/2608746070181068/latest.html


  1. Delta Lake documentation. https://delta.io
  2. What Is a Data Lake and Why Is It Essential for Big Data? https://learn.g2.com/what-is-a-data-lake
  3. Delta Lake documentation for Databricks. https://docs.databricks.com/delta