A real case of optimazing spark notebook

At the beginning of my optimazation, I tried to find some standard principles that can quickly and smoothly help me. Altough lots of information online that indicate where I can improve my spark, none gives an easy solution to fix the key issues from end to end. So, I summried what I understand through online materical and apply some strateges to my spark notebook which I found may have some performance issues.

  • Start with the some slow jobs, you can order by “Druations” column. Some guides may go to the details of slowest stages. But here I suggest go to the related SQL Query page to review the whole process on the query level. It will help us to quickly figure out the common query performance issues, like duplicate scans without filters or inappropriate join, etc.
In Jobs tab of Spark UI, we can order by duration of each job to find the some low performance jobs.
In the page of job detail, we click “Associated SQL Query” firstly.
  • One table read multible times. If we found one table is scaned multiple time at the beginning, we may consider create caches to save the temporary result into the memory/local disk, rather than go through the remote disk.
In my query DAG, there are two table scan parallelly, but they are from the same table and read the same data out. record read are both 884,688,530.
  • Too many similar branches. If we see many similar branches, it may be caused by lack of cache too. The medium results have to be recomputed when reaching the point where we need it.
Rad/Orange lines show us three copies of data, but essinssically they are doing the same process. We have to think about how to cache the medium compute results.
  • Duplicate slow stages seem in the stage tab. It may caused by uncautioned code mistake. The case blew shows same dataframe execute computing twice. But this piece code is very normal in python.
two jobs spent same time and with same piece of code, we may consider it as missing cache.
Back to the notebook, we found there is a dataframe executed twice.
To cache table is like blew:
  • Figure out strange operations. This process will take longer time compared with previous two. We need to match the DAG to SQL in the notebook and understand the wide/narrow shuffle. Then pick some wild shuffle we didn’t expect.
When I went through the notebook with DAG, I found this “HashAggregate” is not my expect. Then I realized I write “Union” rather than “Union All”, this misktake took addtional time to reduce duplicates (which should not exist at all).
  • Reduce data size by adding project to select the columns we need. I think this is the easiest way to improve the performance, especially for the big tables.
Simply add the column names into select query, it reduced the data reading size from 27.9GB to 23.5GB.
  • Reduce data size by using filter as early as possible. We can check if filter push down into scan, or manuly move the filter when read the table.
  • Corroctly partition destination table. When are use Merge or Replacewhere, it is better to partition correctly.
After we created reading_date as partiton column, the time cost reduced from 2.49 mins to 6.67 seconds.Similar optimazation will happen in merge and replacewhere.
  • Reduce spill. When executor is out of memory, it will spill into disk which is much slower than memory. To solve this issue. we can do one or more of following steps:
    • Increase executor memory by change a bigger cluster or set spark.executor.memory(if workable)
    • Increase the fraction of execution and storage memory(spark.memory.fraction, default 0.6).
    • Adjust spark.memory.storageFraction(default 0.5). Depend on when the spill occurs, either in cache or shuffle, we can adjust storage fraction.
    • Adjust spark.sql.shuffle.partitions (default 200). Try to increase shuffle partition size to avoid spill.


Spark Configuration, https://spark.apache.org/docs/2.3.0/configuration.html

Performance Tuning, https://spark.apache.org/docs/latest/sql-performance-tuning.html#performance-tuning

From Query Plan to Performance: Supercharging your Apache Spark Queries using the Spark UI SQL Tab, https://www.youtube.com/watch?v=_Ne27JcLnEc&t=553s

How to Read Spark DAGs | Rock the JVM, https://www.youtube.com/watch?v=LoFN_Q224fQ

Optimize performance with caching, https://docs.microsoft.com/en-us/azure/databricks/delta/optimizations/delta-cache

Some working performance/cost improvement tips applying to ADF and Databricks recently

While switching to the cloud, we found some pipelines running slowly and cost increased rapidly. To solve the problems, we did flowing steps to optimize the pipelines or data structures. They are all not hard to be implemented.

1. Set the different triggers for different recurring periods.

No matter for what reason, it is very common that one data pipeline is triggered recursively. However, the sources of this data pipeline may be refreshed with different frequency. In this case, we may set two triggers for this pipeline. So that this pipeline doesn’t have to run on the highest frequency.

In the ForEach loop, there are two kinds of clients, one is updated every 6 hours, another is updated daily. So two triggers were created, but running in the same pipeline.

2. Set the different pools for notebooks.

Databricks provides pool to efficiently reuse the VMs in the new created clusters. However, one pool can only has one type of instance. If we set pool instance too small, the out of memory or slow process will occur, while if the pool instances are too big, we are wasting our money.

To solve this problem, we set two kinds of pools. The heavy one with instance of 4 cores, 28GB. the light one with instance of 4 core, 8GB memory. For some light weight job, like ingest the source files into bronze table, the light one is a better choice. while the heavy weight one could be use to some aggregation work.

Two pools are created for the different weight processes in the notebooks.

3. Create second hand index or archive folder for the source files.

At the beginning of switching to the cloud, we put everything into a single folder. It is fine for azure. But once we need to scan the spec files or get the location of the file, the huge number of files in this folder will significantly effect the performance, the situation will become worse as file number increase with time. To solve this problem. we have three solutions:

  • create second hand index when file moves into the folder. The second hand index like a table which records the important information of the file, like name, modify date, location in datalake. So next time, when someone need to scan the folder, he only need to scan this second hand index rather than the whole blob dictionary.
An example of second hand index
  • Archive the files. The maybe the easiest one, coz you only need to put the files that you would never use into archive folder. Then you are not time sensitive to get the files back from archive folder.
Part of code to leverage works to move files into Archive folder.
  • Organize files by its receive date. This method is similar with the archive one. but we created many archive folders by date.
Each file are re-organized and put into the date folder when the file received

4. Optimize delta table weekly.

Don’t hesitate to do optimize/vacuum your delta table regularly. The small file will kill the advantage of delta table especially in the “Merge” operation. Following is the example of creating a delta table list that need to be optimized.

delta_table_list = ['customer','job','job-category','job_subscription','missing_file_check','parameter']
# optimize bronze table
for delta_table in delta_table_list:
  sql_string = '''optimize delta.`/mnt/eus-metadata/{0}`'''.format(delta_table)
for delta_table in delta_table_list:
  sql_string = '''VACUUM delta.`/mnt/eus-metadata/{0}`'''.format(delta_table)

First Glance on GPU Accelerated Spark

Since I started to play with cluster, I thought there was no mission which was not able to be completed by cluster. If there is, add another node. However, except Cuda on the sing-alone machine, I have been rarely touched GPU accelerated cluster as data engineer. Yes, maybe spark ML can utilize GPU since spark 3.0, but most jobs of DE are data pipeline and data modeling related. Until recently I googled “GPU ETL”, then it came out SPARK-RAPIDS, which leverage RAPIDS libraries to accelerate processes by GPU.

Easy to configure on Spark to gain advantage of GPU

I almost didn’t change anything from my original spark code( mixed pyspark, spark SQL, python and delta table access) to let it running on GPU cluster. That is a great thing! Everybody wants to get double with half work.The result of mine is speed up to 3.1mins(12 cores, 3 NVIDIA T4 GPUs, 84GB memory) from 5.4 mins(24 cores, 84GB memory). I think I can get better results if my inputs is bigger chunk of data like 500MB+ parquet.

Here is my configuration on databricks. You can also refer to this official document. ( however, I didn’t make cluster running by this document)

# cluster runtime version: 7.3 LTS ML (includes Apache Spark 3.0.1, GPU, Scala 2.12)
# node type: NCasT4_v3-series(Azure)

# spark config:
spark.task.resource.gpu.amount 0.1
spark.databricks.delta.preview.enabled true
spark.executorEnv.PYTHONPATH /databricks/jars/rapids-4-spark_2.12-0.5.0.jar:/databricks/spark/python
spark.plugins com.nvidia.spark.SQLPlugin
spark.locality.wait 0s
spark.rapids.sql.python.gpu.enabled true
spark.rapids.memory.pinnedPool.size 2G
spark.python.daemon.module rapids.daemon_databricks
spark.sql.adaptive.enabled false
spark.databricks.delta.optimizeWrite.enabled false
spark.rapids.sql.concurrentGpuTasks 2

# initial script path

# create initial script on your notebook
sudo wget -O /databricks/jars/rapids-4-spark_2.12-0.5.0.jar https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/0.5.0/rapids-4-spark_2.12-0.5.0.jar
sudo wget -O /databricks/jars/cudf-0.19.2-cuda10-1.jar https://repo1.maven.org/maven2/ai/rapids/cudf/0.19.2/cudf-0.19.2-cuda10-1.jar""", True)
# some key configuration meaning
--conf spark.executor.resource.gpu.amout=1 # one executor per GPU, enforced
--conf spark.task.resource.gpu.amount = 0.5  # two tasks running on the same GPU
--conf spark.sql.files.maxPartitionBytes=512m  # big batch is better for GPU efficient, but not too big which lead to out of memory issue
--conf spark.sql.shffle.partitons=30 # reduce partiton size from default 200 to 30. large size of partition is better for GPU efficient
--conf spark.rapids.sql.explain=NOT_ON_GPU # watch the log message output why a spark operation is not able to run on the GPU

The only problem I met till now was, when the notebook includes the view composed of delta tables, it would popped out error message “covert struct type to GPU is not supported”. Easily persist view to table can solve this problem. Overall, very cool plugin.

The magic of spark-rapids

There are two major features to let spark running on GPU through Rapids.

  • Replace of CPU version ops with GPU version ops on the physical plan level.
  • Optimized spark shuffle using RDMA and GPU-to-GPU directly communication.
How RAPIDS accelerator works in spark: left side- replace CPU ops with GPU version based on data type & op type; right side-Optimized spark shuffle between GPUs.

Since the replacing happens on the physical plan level, we don’t have to change the query. Even the operations are not supported by RAPIDS, it still can run on CPU.

A CPU spark execute plan is like: logical plan —-> optimized by catalyst  ——> optimize the logical plan in a series of phases  —> physical plan  —-> executed on cluster. While A GPU spark execute plan replaces operations on physical plan with GPU version and execute on the columnar batch.

If we look at the physical plan on GPU and CPU, we will find they are almost one to one match except some GPU-CPU data exchange ops, like GPUColumnarToRow, GPURowToColumnar.

parts of GPU physical plan

parts of CPU physical plan

Compare of two physical plans, we can find lots of operations have GPU version already. “Scan parquet” replaced by “GpuScan parquet”, since GPU needs columnar format, so it skipped “ColumnarToRow” which in CPU version. Then “Project” replaced by “GpuProject”. followed by “GpuColumnarToRow”, because next step “InMemoryTelation” was running on CPU. so on so forth.

Let’s talk about another great feature: Optimized spark shuffle.

Spark needs shuffle on wild transforms while narrow transforms grouped into single stage. Spark-Rapids implements a custom spark shuffle manager for its GPU clusters shuffle operation.It is able to:

  • Spillable cache: it makes the data close to where it is produced, meanwhile it provides out of memory issue by moving data by the rule: GPU memory –> Host memory —> Disk.
Once GPU is out of memory, shuffle manager will put data into host memory, if host memory is not enough, then continuously push data into local disk.
  • Transport(shuffle): Handles block transfers between executors leveraging UCX libraries. ( see picture blew)
    • GPU0-GPU0: cache in GPU0 , zero copy
    • GPU0-GPU1: NVLink
    • GPU0-GPUX(remote): RDMA
      • Infiniband
      • RoCE
    • Disk- GPU: GPU direct storage(GDS)
RDMA and NVLINK is not surprised to be used in the cluster, since UCX is wildly used in OpenMPI for HPC area. These features have been included in RAPIDS libraries.

Some helpful information from official Q&A:

What operators are best suited for the GPU?

  • Group by operations with high cardinality
    • Joins with a high cardinality
    • Sorts with a high cardinality
    • Aggregates with a high cardinality
  • Window operations, especially for large windows
  • Aggregate with lots of distinct operations
  • Complicated processing
  • Writing Parquet/ORC
  • Reading CSV
  • Transcoding (reading an input file and doing minimal processing before writing it out again, possibly in a different format, like CSV to Parquet)

What  operators are not good for the GPU?

  • small amouts of data( hundred MB)
  • Cache coherent processing
  • Data movement
    • Slow I/O
    • Back forth to CPU(UDFs)
    • Shuffle
  • Limited GPU Memory

What is Rapids

  • a suit of open-source software libaraires and APIs for data science and data pipelines on GPUs. 
  • offering GPU dataframe  that is compatible with ApacheArrow
    • language independent columnar memory format
    • zeo-copy streaming messaging and interprocess communication without serialization overhead
  • Integrated with popluar framwworkd: PyTorch, Chainer, ApcheMxNet; spark, Dask

what is cuDF

  • GPU accelerated data preparation and feature engineering
  • python dropin pands replacement
  • features
    • CUDA 
      • low level libarary containing fuction implementations and c/C++ API
      • importing/exporting Apache Arrow using the CUDA IPC mechanism
      • CUDA kernels to perform element-wise math operations on GPU data frame columns
      • CUDA sort, join, groupby and reductions oprations on GPU dataframes
    • Python bindings
      • a python libaray forr GPU dataframe
      • python interface to CUDA C++ with addtional functionality
      • Creating Apache arrow form numpy arrays, Pands, DF, PyArrow Tables
      • JIT comppilation of UDFs using Numba


Spark-Rapids, https://nvidia.github.io/spark-rapids/

ACCELERATING APACHE SPARK 3.X, https://www.nvidia.com/en-us/deep-learning-ai/solutions/data-science/apache-spark-3/ebook-sign-up/

Getting started with RAPIDS Accelerator on Databricks, https://nvidia.github.io/spark-rapids/docs/get-started/getting-started-databricks.html#getting-started-with-rapids-accelerator-on-databricks

Deep Dive into GPU Support in Apache Spark 3.x, https://databricks.com/session_na20/deep-dive-into-gpu-support-in-apache-spark-3-x

Accelerating Apache Spark 3.0 with GPUs and RAPIDS, https://developer.nvidia.com/blog/accelerating-apache-spark-3-0-with-gpus-and-rapids/

RAPIDS Shuffle Manager, https://nvidia.github.io/spark-rapids/docs/additional-functionality/rapids-shuffle.html

UCX-PYTHON: A FLEXIBLE COMMUNICATION LIBRARY FOR PYTHON APPLICATIONS, https://developer.download.nvidia.com/video/gputechconf/gtc/2019/presentation/s9679-ucx-python-a-flexible-communication-library-for-python-applications.pdf

Unified Communication X, https://www.openucx.org/

Accelerating Apache Spark by Several Orders of Magnitude with GPUs, https://www.youtube.com/watch?v=Qw-TB6EHmR8&t=1017s

SCD II or Snapshot for Dimension

SCD II is widely used to process dimensional data with all historical information. Each change in dimensions will be recorded as a new row configurated valid time period which usually on the date granularity. Since SCD II only keeps the changes, it significantly reduce the storage space in the database.

Understanding Slowly Changing Dimensions
An example of SCD II processing

Everything looks fine, until big data coming. A flaw has been amplified. That is delay arriving dimensions. In SCD II,, you have to do a complex steps to process both dimension and fact data for delay arriving dimensions.

For Dimension(SCDII table) , we have to change retro rows in SCDII.

In the example above, for product ID=010, if we have any change before May-12-06, see April-02-06. And this change is delayed after SID=003. We have to :

  1. Scan the dimensional table, find SID = 0002 and 0003
  2. change End_DT of SID(002) to April-02-06
  3. insert a new record, SID=004, Start_DT=April-02-06, End_DT=May-12-06

For Fact table, we have to change multiple historical data

Update all FK of SID in fact table to 004 where the time period between April-02-06 and May-12-06

It maybe not a problem to execute “UPDATE” process in Row stored table, but in the big data area, most storage format leverages “column stored”, which has great advantage for searching operation, but low efficient on updating. The good news is we don’t have to “UPDATE” data nor to detect the changes on dimension. The only thing we need to do is snapshot the daily dimension data. Since for each day, we have the whole backups, it also much easier for fact table joining the dimension: date-to-date mapping. That’s is all!

The example above will become like :

DateSource Product IDProduct NameProduct descr
May-12-0601010 inch Box10 inch Glued box
May-12-0701010 inch Box10 inch pasted box
May-12-0801011 inch Box11 inch pasted box
Snapshot daily product table

Would it has much redundancy in disk? No. Since we use column stored table, duplicated data in the same column is only saved once physically. Also the big data storage price is much cheaper than the traditional database.

Would it slow down the join due to much more data created in dimension table? No. There is another easy solution call “PARTITON“. Basically, we can partition the data by date, so that each partitioned folder presents a set of data group for that day.

An example for partitioned delta table.

“PARTITION” is transparent for the query. for example, when we execute query :

Select * from product 
where product_name ='10 inch Box' and Date ='May-12-06'

It will firstly go to the folder “Date =’May-12-06′“, rather than scan the whole table, then find the column “product_name =’10 inch Box'”. The partition operation is not only one level, you can create multiple levels.

Would it be faster to join the fact table? Yes. Once fact table is also partitioned by same column, e.g, date. Then we can easily mapping two date columns in dimension and fact. The scan processing is super quick. * some new tech, like delta table, you have to manually indicate the partition.


Transitional SCD is existing for a long time, the environment and prerequisites have been changed a lot to apply this method. I think snapshot is a better, faster and simpler solution in the big data.


Functional Data Engineering – A Set of Best Practices | Lyft, Watch Functional Data Engineering – A Set of Best Practices | Lyft – Talk Video by Maxime Beauchemin | ConferenceCast.tv

Building A Modern Batch Data Warehouse Without UPDATEs | by Daniel Mateus Pires | Towards Data Science, Building A Modern Batch Data Warehouse Without UPDATEs | by Daniel Mateus Pires | Towards Data Science

Slowly Changing Dimensions (SCDs) In The Age of The Cloud Data Warehouse (holistics.io), Slowly Changing Dimensions (SCDs) In The Age of The Cloud Data Warehouse (holistics.io)

Parallel and Redundancy

Inspirational Quote Murphy's Law Sign Wall Decor Art | Etsy
 “Anything that can go wrong will go wrong”. — Murphy’s Law

1990, engineers were fighting for optimizing code performance and increasing CPU speeds.

1994, MPI started to be the dominant model used in high-performance computing.

2002, we started to leverage multi-cores processor to parallel computing.

2006, Hadoop opened a door of reliable, scalable, distributed computing using simple programming models in clusters.

2007, NVIDIA released first version of CUDA which enabled GPU for general purpose processing

2011, Kafka provided streaming solutions for handling real-time data feeds.

2014, Spark improved Hadoop, provides in-memory unified analytics engine with several enrichment libraries.

2019, Google scientists said that they have achieved quantum supremacy. We jumped out world of 0/1.

When we look back the evolution of these 30 years, there are two things taking the major role: Parallel and Redundancy. According to vocabulary.com, Parallel means two or more lines that never intersect; while Redundancy means needlessly repeated. Why are they so important in computing evolution?

In the real word, an efficient team can make things easier than working along only due to more people. like 100 farmers can work on a farm at the same time with one person per line. Similarly, computing can achieve excellent improvement by splitting tasks into multi parallel “workers” . Modern CPU and GPU can split tasks into cores; big data related platform can split tasks into nodes of clusters; even in the quantum computer, it leverages superposition state to let single qbit process multi tasks at the same time. In rick and morty, there is an actor “time police”, who can exist in multi parallel time line.

The Red ugly head is “time police” who exists in 4 time lines at the same time.

So, why parallel is so powerful, we still need redundancy? According to Murphy’s Law, “Anything that can go wrong will go wrong”. Fragile is a common thing in whole universe. To anti-fragile, we have to use redundancy. More fragile things have more redundancy to anti-fragile. We human have two eyes, two lungs, two kidneys. Most insects spawn hundreds to then thousands eggs to keep them survive.

Before the age of big data, redundancy is also wildly used in the IT area. The wild-known case is CRC and RAID1. Cyclic redundancy check (CRC) is an error-detecting code commonly used in digital networks and storage devices to detect accidental changes to raw data since signal may be easily interfered/lost within environment especially through wireless or internet. RAID 1 consists of an exact copy (or mirror) of a set of data on two or more disks to avoid some accident data lost.

Error Detection in Computer Networks - GeeksforGeeks
Error Detection in Computer Networks. From “GeeksforGeeks”

Another Redundancy cased is Non-SQL. Not like SQL 3nf, the intention of Non-SQL is high reading speed. Non-SQL improves the performance by adding redundancy so that we don’t have to waste time to join the tables.

Getting Started with Python and MongoDB | MongoDB
MongoDb example, you can see “car” is stored as a nested document rather than a foreign key

After we headed to the age of big data, more and more “distributed” technologies are used to make work faster and parallel. But distributed itself is not as solid as we think. It bases on unstable network, easy broken storage devices and even sometime a natural disaster will destroy all your data. So redundancy is coming for anti-fragile as a brother of parallel. It creates extra “backups” in multi workers in the cluster that big data running. We are not only distributed computing but also distributed storage. Even we lost data in one worker, we immediately have same “backups” running in other workers.

Spark fault-tolerance by replicating data in memory
Kafka partitions are good example of redundancy to anti-fragile in streaming platform

Redundancy can also accelerate parallel process, specially hardware redundancy. Since we have replicas on two more places, the parallel requests can extract the same data from different place to balance the load and improve the I/O. There are two good examples: 1. Access google.com, there must be a load balancer to control which server we will connect to. 2. Read data from hadoop, a piece of data stored in the different servers, but we can extract all of them at the same time.

4 Easy Steps to Master Apache Hadoop Development -Big Data Analytics News
Hadoop map-reduce process, it shows us distributed process and storage.

Parallel and Redundancy like twins. Lots of great new technologies in big data are related with both two concepts. The difference I feel is where they implement the “distribute”. in GPU, CPU, or clustered cpu/gpu/memory/disk…


Redundancy in Cloud Computing Means Checking Four Areas (atsg.net)

How Redundancies Increase Your Antifragility | The Art of Manliness

Optimize concurrency for merge operation in delta table

Concurrency control is normal in OLTP operations, but for OLAP, not really. So I didn’t take care of it until I met the error blew:

com.databricks.sql.transaction.tahoe.ConcurrentAppendException: Files were added to partition [cust_id=000] by a concurrent update. Please try the operation again.

This error caused by write conflict into single delta table by two merge operations. Base on conflicts matrix provided by databricks, we knew even in writeSerializable isolation levels, two merge operation can conflict.

To solve this problem, I did two steps:

  • Choose right partition columns. Here in my example, I only partitioned by cust_id, but the merge operation in two scripts, they need to update based on two columns: cust_id and report_type. So the first step is to change the partition columns.
  • Change condition clause in merge operations. I do put two partition columns into my merge condition clause. It likes:
A.cust_id=B.cust_id and A.report_type=B.report_type # where A is the merged table

But it still got conflict even after right partitioned. So I tried to hard code part of condition. It works, no conflict happened anymore. So I guess the delta table can not infer partition correctly by join.

"A.cust_id={0} and A.report_type={1}".format(cust_id, report_type)


Isolation Levels in delta lake: https://docs.databricks.com/delta/optimizations/isolation-level.html#isolation-levels

Concurrency control: https://docs.databricks.com/delta/concurrency-control.html

How to improve performance of Delta Lake MERGE INTO queries using partition pruning: https://kb.databricks.com/delta/delta-merge-into.html

Some features need to be improved in Azure Data Products

  • Azure Storage Explorer/Data Lake
    • Ghost file
      • In some rare case, if you delete files in ASE, then you call APIs or use browser data explorer, you will find 0 byte file is still there which we should already deleted. I reported Microsoft last year, they said they fixed it, which was true until I found ghost file came back last week. I think it should related some sort of soft delete in Hadoop.
    • Page
      • If we have thousands of files under a folder. It is a disaster. You would never easily to find the files you want. Especially if you don’t click “load more”, ASE wouldn’t load it into the cache so that you can not see it.
    • Copy/Move files
      • ASE use AZcopy to move/copy files. Especially, it should be robust and async. But in my experience, when I tried to copy batch of files, it is easy to show me error then ask me to try again. However, if we use API or Azcopy command to execute the same copy activity, they work fine.
    • Soft delete
      • Soft delete is a nice function in case we delete files mistakenly. but the soft delete only enabled down to the container level for ADLS gen2. If we want to recover the spec files, we have to know the file name and use “Restore-AzDataLakeStoreDeletedItem” to recovery them. It is petty hard for distributed structure like delta table whose file name is randomly created and maintained in json file.
  • Data Factory
    • No version control when connect to data bricks notebooks.
      • when you created a dev branch both for datafactory and databricks. Naturally, you thought data factory would call databricks on the same branch. But the truth is you are calling the data bricks notebook in the master(published) branch. I contacted Microsoft if there is a workaround, the answer is you can submit this feature on forum. what!!! 🙁
    • Unclear documentation about Global Parameter in CI/CD .
      • If we tried to change global parameter in release, we have to use ARM templdate. The detail document can be find here. But you have to use you own way( perhaps I should use word “guess”) to figure out the syntax. For example, I spent half a day to test how to make json template workable for global parameter through CI/CD.

   “Microsoft.DataFactory/factories”: {        “properties”: {            “globalParameters”: {                “NotificationWebServiceURL”: {                    “value”: “=:NotificationWebServiceURL:string”                }            }        },        “location”: “=”    }

Based on the official document, we should code like  “NotificationWebServiceURL”: “=:NotificationWebServiceURL:string”   , but it is wrong and generate a json object rather than string.

  • Data Bricks
    • since data bricks is the third part product, not much Microsoft can do to improve it. Frankly, it is very successful commercial product based on spark. Well integrated with key vault, data lake and Azure devops. The only complain will be it couldn’t( at least I don’t know) debug line by line like Google Colaboratory.
    • Delta table schema schema evolution. This is a great feature for continuously ingesting data with schema changing. What we don’t know is if we don’t know the incoming data schema, we have to use schema infer or set all field as string. neither way is perfect. Maybe only solution is using some schema-in-file rather than CSV.
  • Azure Data Studio
    • Pretty good expect only work for SQL Server. Based on the forum, I think MySQL connection extension is on the way.

Spark 3.0 new features – Learning from Dr.Kazuaki Ishizaki

Dr.Kazuaki Ishizaki gives a great summary of spark 3.0 features in his presentation “SQL Performance Improvements at a Glance in Apache Spark 3.0” . It is very helpful for us to understand how these new features work and where we can use it.

New explain format

Spark 3.0 provides a terse format explain with detail information.


There are five formats:

  1. default. Physical plan only.
  2. extended. It equals df.explain(true) in spark 2.4, which generates parsed logical plan, analyzed logical plan , optimized logical plan and physical plan.
  3. codegen. Generates java code for the statement.
  4. code. If plan stats are available, it generates a logical plan and the states.
  5. formatted. This is most useful in my mind. It has two sections, a physical plan outline with simple tree format and node details.
-- example from spark document
-- Using Formatted
EXPLAIN FORMATTED select k, sum(v) from values (1, 2), (1, 3) t(k, v) group by k;
|                                                plan|
| == Physical Plan ==
 * HashAggregate (4)
 +- Exchange (3)
    +- * HashAggregate (2)
       +- * LocalTableScan (1)
 (1) LocalTableScan [codegen id : 1]
 Output: [k#19, v#20]
 (2) HashAggregate [codegen id : 1]
 Input: [k#19, v#20]
 (3) Exchange
 Input: [k#19, sum#24L]
 (4) HashAggregate [codegen id : 2]
 Input: [k#19, sum#24L]

As these syntax are not exactly match with spark, the following list is help to explain the “meaning of spark explain”

  • scan. Basic file access. In spark 3.0, it can achieve some predication tasks before load data in.
    • ColumnPruning: select the columns only needed.
    • Partitionfilters: only grab data from certain partitions
    • Pushedfilters: filter fields that can be directly to file scan(push down prediction)
  • filter. Due to pushdown prediction, lots of filter work has moved to scan stage, so you may not find filter in explain matching with query. But there are still some operations like first, last, we need to do it in filter.
    • Pushdown prediction.
    • Combine filters: combines two neighboring operations into one
    • Infer filter from constraints. create a new filter form a join condition. we will talk about it in next section “dynamic partitioning pruning”.
    • prune filter.
  • project. Select operation for columns, like select, drop, withColumn.
  • exchange. shuffle operation, like sortmerge, shuffle hash
  • HashAggregate. data aggregation.
  • BroadcastHashJoin & broadcastExchange. Broadcast shuffle.
  • columnarToRow. a transition between columnar and row execution.

All type of Join hints

Spark 2.4 only supports broadcast, while spark 3.0 support all type of join hints

Spark uses two types of hints, one is partition hints, other is join hints. Since spark 3.0, join hints support all type of join.

  • Broadcast join. which is famous join for joining small table(dimension table) with big table(fact table) by avoiding costly data shuffling.
    • table less than 10MB is broadcast across all nodes to avoid shuffling
    • two steps: broadcast –> hash join
    • spark.sql.autoBroadcastJoinThreshold
  • shuffle merge join
    • Sort merge join perform the Sort operation first and then merges the datasets.
    • steps:
      • shuffle. 2 big tables are partitioned as per the join keys across the partitions.
      • sort. sort the data within each partition
      • merge. join the 2 sorted and partitioned data.
    • work well when
      • two big tables as it doesn’t need load all data into memory like hash join
      • highly scalable approach
  • shuffle hash join 
    • Shuffle hash join shuffles the data based on join key, so that rows related to same keys from both tables will be moved on to same node and then perform the join.
    • works well when
      • dataframes are distributed evenly with the keys
      • dataframes has enough number of keys for parallelism 
      • memory is enough for hash join
    • supported for all join except full outer join
    • spark.sql.join.preferSortMergeJoin = false
  • shuffle replicate nl
    • cartesian product(similar to SQL) of the two relations is calculated to evaluate join.

Adaptive query execution(AQE)

AQE is automatic feature enabled for strategy choose in the running time.

  • Set the number of reducers to avoid wasting memory and I/O resource. Dynamically coalescing shuffle partitions.  
    • spark.sql.adaptive.enabled=true
    • spark.sql.adaptive.coalescePartitions.enabled=ture
AQE can merge serval short partitions into one reducer to even the pressure
  • select better join strategy to improve performance
    • dynamically choose from 3 join strategy. broadcast has best performance, but static strategy choose is not accurate sometimes. 
    • spark.sql.adaptive.enabled=true
AQE get the size of join table dynamically, so that it can choose broadcast rather then shuffle operation.
  • Optimize skewed join to avoid imbalance workload
    • the large partition is split into multiple partitions
    • spark.sql.adaptive.skewJoin.enabled=true
AQE split skewed partition into multiple partitions.

Dynamic partitioning pruning

We already peek part of it in explain format. Spark 3.0 is smart that avoid to read unnecessary partitions in a join operations by using results of filter operations in another table. for example,

SELECT * FROM dim_iteblog
JOIN fact_iteblog
ON (dim_iteblog.partcol = fact_iteblog.partcol)
WHERE dim_iteblog.othercol > 10

In this case, spark will do the prune prediction and add a new filter for join table “fact_iteblog”.

Enhanced nested column pruning & pushdown

  • nested column pruning can be applied to all operators, like limits, repartition
    • select col2._1 from(select col2 from tp limit1000)
  • parquet can apply pushdown filter and can read part of columns
    • spark.read.parquet(‘filename’).filter(‘col2._1 = 100’)

Improved aggregation code generation

  • Catalyst translates a given query to java code, Hotspot compiler in OpenJDK translates Java code into native code
  • HotSpot compiler gives up generating native code for more than 8000 Java bytecode instruction per method.
  • Catalyst splits a large java method into small ones  to allow hotspoot to generate native code

New Scala and Java (infrastructure updates)

  • Java 11
  • Scala 2.12


I think it is better to take a screenshot from presentation of Dr.Kazuaki Ishizaki to do the summary.


SQL Performance Improvements at a Glance in Apache Spark 3.0, https://www.iteblog.com/ppt/sparkaisummit-north-america-2020-iteblog/sql-performance-improvements-at-a-glance-in-apache-spark-30-iteblog.com.pdf

Spark 3.0.1 – Explain, http://spark.apache.org/docs/latest/sql-ref-syntax-qry-explain.html

Mastering Query Plans in Spark 3.0, https://towardsdatascience.com/mastering-query-plans-in-spark-3-0-f4c334663aa4

Fast Filtering with Spark PartitionFilters and PushedFilters, https://mungingdata.com/apache-spark/partition-filters-pushed-filters/

Spark 3.0.1 – Hints, https://spark.apache.org/docs/3.0.0/sql-ref-syntax-qry-select-hints.html

Columnstore index for MS SQL SERVER

Columnstore is the most popular storage tech within big data. We must have already heard parquet, delta lake. They are both columnstore format which brings 10x times compress ratio and super faster query speed to analytic work. SQL server, one of fastest evolving relation database, also provides columnstore index with multiple optimizations.

Loading into a clustered columnstore index
Figure 1: Columnstore index compress the data by column oriented. And optimized by deltastore. It can reach 10x times compression and 100x times query speed.

SQL server provides clustered and non-clustered columnstore index. Delta store is a clustered B-tree index used only with columnstore index automatically. It stores rows until the number of rows reaches a threshold(~1048576 rows) them moved data into columnsotre, and set state from Open to Closed. I will show you at the end of this article.

Clustered columnstore index

  • Primary storage method for the entire table
  • All columns are included, there is no keys.
  • can only combined with non-clustered B-tree index to speed up
    • queries that search for specific values or small ranges of values.
    • updates and deletes of specific rows
  • usually for fact table or large dimension tables

Non-clustered columnstore index

  • We can indicate which columns to be indexed, usually for frequently used columns.
  • requires extra storage to store a copy of columns in the index(~10%)
  • can be combined with other index.

How to choose columnstore index?

Microsoft already provided the conventions on his document.

Figure 2: Choose the best columnstore index for your needs

I would recommend use columnstored index for most of OLAP work, as we need fast query without much delete/update tasks.

How to delete large size of data from columnstore table

Although Microsoft doesn’t suggest us to delete more than 10% data from columnstore table, there is still have chance we have to. In this case, I summarized some of my experience.

| Delete from columnstore is a soft delete

If you tried to delete rows from columnstore table, you will not actually delete the data, but sql server will mark this row as deleted.

You can run sql blew to find out the number of delete rows and deltastore. Once there are too many row marked “delete”, then you have to rebuild/reorganize the columnstore table. Remember , it is not like Optimize clause in deltalake, which is more like bin-packing for small files.

       tables.name AS table_name,
       indexes.name AS index_name,
FROM sys.column_store_row_groups
INNER JOIN sys.indexes
ON indexes.index_id = column_store_row_groups.index_id
AND indexes.object_id = column_store_row_groups.object_id
INNER JOIN sys.tables
ON tables.object_id = indexes.object_id
INNER JOIN sys.partitions
ON partitions.partition_number = column_store_row_groups.partition_number
AND partitions.index_id = indexes.index_id
AND partitions.object_id = tables.object_id
LEFT JOIN sys.internal_partitions
ON internal_partitions.object_id = tables.object_id
AND column_store_row_groups.deleted_rows > 0
WHERE tables.name = 'table_name'
Figure 3: One delta group is open, and there is no delete rows

| Steps to delete large size of data in columnstored table

1. delete non-clustered B-tree index( after delete operation, rebuild it if needed)

If we run execution plan for bulk delete, you will find B-tree index related operations spend most of time rather than columnsotre index.

Figure 4: B-tree index related operations spend 90% time.

2. delete by a small batch

One of the down side of Azure SQL is we can not set log as simple. So it will take lots of transaction time when we try to delete a large table. The work around is delete by a small batch to make each log transaction smaller and quicker. Blew I gave an example to delete data by chunksize = 1000000.

delete top(1000000) from table_name
where id%2=0
       print current_timestamp
    goto deleteMore

3. rebuild/reorganizing columnstored index if needed

Sometimes, we have to rebuild/reorganizing index if we delete too many data and it affects performance of query. here I give a snippet to show how to archive it.

-- rebuild column stored index
alter index indexname on table_name rebuild/reorgnize
alter index all on tablename rebuild/reorgnize

-- check fragment
SELECT a.object_id, object_name(a.object_id) AS TableName,
    a.index_id, name AS IndedxName, avg_fragmentation_in_percent
FROM sys.dm_db_index_physical_stats
    (DB_ID (N'schema_name')
        , OBJECT_ID(N'table_name')
        , NULL
        , NULL
        , NULL) AS a
INNER JOIN sys.indexes AS b
    ON a.object_id = b.object_id
    AND a.index_id = b.index_id;


Choose the best columnstore index for your needs. https://docs.microsoft.com/en-us/sql/relational-databases/indexes/columnstore-indexes-design-guidance?view=sql-server-ver15#choose-the-best-columnstore-index-for-your-needs

Columnstore indexes – Query performance. https://docs.microsoft.com/en-us/sql/relational-databases/indexes/columnstore-indexes-query-performance?view=sql-server-ver15

How to efficiently delete rows while NOT using Truncate Table in a 500,000+ rows table. https://stackoverflow.com/questions/11230225/how-to-efficiently-delete-rows-while-not-using-truncate-table-in-a-500-000-rows

Hands-On with Columnstore Indexes: Part 1 Architecture. https://www.red-gate.com/simple-talk/sql/sql-development/hands-on-with-columnstore-indexes-part-1-architecture/

Data Factory CI/CD in Azure DevOps

Azure Pipeline is consist of two parts: pipeline and release. They represent
CI and CD separately.

  1. Build Pipeline – to build and test the code. The build creates an artifact that’s used by the rest of your pipeline to run tasks such as deploying to staging or production. 
  2. Release Pipeline – once the code is updated, built and packaged, it can be deployed to target services using Release Pipelines. 
top half: CI pipeline; down half: CD pipeline

Let’s talk about how to implement this process for Azure data factory. Before you start, I suppose your current ADF has matched these requirements (if not, please refer to here):

  • You already have a Azure Repos.
  • ADF is integrated with this Git Repos.
  • A key Vault. which is used for storing database and datalake connection information, plus all configuration parameters for release.

The whole ADF CI/CD pipeline is like this:

Building Pipeline (CI)

  • Create new pipeline, choose “adf_publish” for default branch
    • “adf_publish” branch is created by ADF automatically, after you click “publish” in your ADF GUI.
the three json files are used to set parameters in the different environments
  • Add new Agent job
  • search for “Data factory”, add a new publish artifacts
    • set the path to publish(git path)
    • set artifact publish location as Azure pipelines

Release Pipeline (CD)

left part: CI right part: CD

So next step is deploying artifact into three environments: DEV, QA and PROD.

use key vault to set sQL Server in pipeline
  • Import Azure Key Vault. where you stored all the connection information. We would use these information in two areas.
    • Release pipeline: get the basic information in each environment, e.g, database name, data lake name, etc. see the screenshot above.
    • template parameters used in adf_publish.
  • Set the Datafactory.
    • Add a “Azure Resource group deployment”. where we need to focus one the template path.
we have to set the ARM template and parameters which point to adf_publish branch

You can also add trigger stop and start before and end of data factory job.

# stop trigger, you have to add variables for each release environment
$triggersADF = Get-AzDataFactoryV2Trigger -DataFactoryName $(DataFactoryName) -ResourceGroupName $(ResourceGroupName)

$triggersADF | ForEach-Object { Stop-AzDataFactoryV2Trigger -ResourceGroupName $(ResourceGroupName) -DataFactoryName $(DataFactoryName) -Name $_.name -Force }
# end trigger
$triggersADF = Get-AzDataFactoryV2Trigger -DataFactoryName $(DataFactoryName) -ResourceGroupName $(ResourceGroupName)

$triggersADF | ForEach-Object { Start-AzDataFactoryV2Trigger -ResourceGroupName $(ResourceGroupName) -DataFactoryName $(DataFactoryName) -Name $_.name -Force }
Stop trigger is mandatory for releasing ADF.
  • How to set up adf_publish
    • go back to azure repos.
    • switch to adf_publish.
    • create corresponding ARM parameter template for each environment. You can find an example blew.
the three json files are used to set parameters in the different environments
	"$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentParameters.json#",
	"contentVersion": "",
	"parameters": {
		"factoryName": {
			"value": "datafactory name"
		"LS_AKV_KeyVault_properties_typeProperties_baseUrl": {
			"value": "https://XXXXkeyvault.vault.azure.net/"
		"LS_SQL_ConfigDb_properties_typeProperties_connectionString_secretName": {
			"value": "sqlconnection"
		"LS_DLS_Datalake_properties_typeProperties_url": {
			"value": "https://XXXXXdatalake.dfs.core.windows.net/"

These three parameters are used for dynamic linked service. These three parameters are defined in ARMTemplateForFactory, and set the value in each seprated Json files. To define the parameters, you have to first go to “Parameterization template” table and edit it.

Here is an example that I defined a linkedservice for AzureKey Vault. for more information please refer to : https://docs.microsoft.com/en-us/azure/data-factory/continuous-integration-deployment#use-custom-parameters-with-the-resource-manager-template

If we need enable Continuous deployment trigger , we have to link with adf_publish branch as well.

click the little logo in release pipeline page.
set to adf_publish


Azure DevOps Pipelines. https://docs.microsoft.com/en-us/azure/devops/pipelines/?view=azure-devops

Continuous integration and delivery in Azure Data Factory. https://docs.microsoft.com/en-us/azure/data-factory/continuous-integration-deployment

Use Azure Key Vault to pass secure parameter value during deployment. https://docs.microsoft.com/en-us/azure/azure-resource-manager/templates/key-vault-parameter?tabs=azure-cli