First Glance at GPU-Accelerated Spark

Since I started playing with clusters, I thought there was no mission a cluster couldn't complete; if there is, add another node. However, apart from CUDA on a standalone machine, I had rarely touched a GPU-accelerated cluster as a data engineer. Yes, Spark ML can utilize GPUs since Spark 3.0, but most DE jobs are related to data pipelines and data modeling. Recently I googled "GPU ETL", and up came SPARK-RAPIDS, which leverages the RAPIDS libraries to accelerate processing with GPUs.

Easy to configure Spark to take advantage of the GPU

I barely changed anything in my original Spark code (a mix of PySpark, Spark SQL, Python and Delta table access) to get it running on a GPU cluster. That is a great thing! Everybody wants to get double the result with half the work. My result: the job sped up to 3.1 minutes (12 cores, 3 NVIDIA T4 GPUs, 84 GB memory) from 5.4 minutes (24 cores, 84 GB memory). I think I could get better results if my inputs were bigger chunks of data, like 500 MB+ parquet files.

Here is my configuration on Databricks. You can also refer to the official document (however, I didn't get the cluster running by following that document).

# cluster runtime version: 7.3 LTS ML (includes Apache Spark 3.0.1, GPU, Scala 2.12)
# node type: NCasT4_v3-series(Azure)

# spark config:
spark.task.resource.gpu.amount 0.1
spark.databricks.delta.preview.enabled true
spark.executorEnv.PYTHONPATH /databricks/jars/rapids-4-spark_2.12-0.5.0.jar:/databricks/spark/python
spark.plugins com.nvidia.spark.SQLPlugin
spark.locality.wait 0s
spark.rapids.sql.python.gpu.enabled true
spark.rapids.memory.pinnedPool.size 2G
spark.python.daemon.module rapids.daemon_databricks
spark.sql.adaptive.enabled false
spark.databricks.delta.optimizeWrite.enabled false
spark.rapids.sql.concurrentGpuTasks 2

# initial script path
dbfs:/databricks/init_scripts/init.sh

# create initial script on your notebook
dbutils.fs.mkdirs("dbfs:/databricks/init_scripts/")
dbutils.fs.put("/databricks/init_scripts/init.sh","""
#!/bin/bash
sudo wget -O /databricks/jars/rapids-4-spark_2.12-0.5.0.jar https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/0.5.0/rapids-4-spark_2.12-0.5.0.jar
sudo wget -O /databricks/jars/cudf-0.19.2-cuda10-1.jar https://repo1.maven.org/maven2/ai/rapids/cudf/0.19.2/cudf-0.19.2-cuda10-1.jar""", True)
# some key configuration meanings
--conf spark.executor.resource.gpu.amount=1 # one executor per GPU, enforced
--conf spark.task.resource.gpu.amount=0.5  # two tasks running on the same GPU
--conf spark.sql.files.maxPartitionBytes=512m  # bigger batches are better for GPU efficiency, but not too big, which leads to out-of-memory issues
--conf spark.sql.shuffle.partitions=30 # reduce the partition count from the default 200 to 30; larger partitions are better for GPU efficiency
--conf spark.rapids.sql.explain=NOT_ON_GPU # watch the log output to see why a Spark operation is not able to run on the GPU
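Outside Databricks, the same settings can also be passed when building a plain SparkSession. Below is a minimal, hedged sketch, assuming the RAPIDS and cuDF jars are already on the driver/executor classpath (e.g. via --jars on spark-submit); the app name is a placeholder and the values simply mirror the list above.

from pyspark.sql import SparkSession

# sketch only: values mirror the settings listed above and should be tuned per cluster
spark = (
    SparkSession.builder
    .appName("gpu-etl-sketch")
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
    .config("spark.rapids.sql.concurrentGpuTasks", "2")
    .config("spark.rapids.memory.pinnedPool.size", "2G")
    .config("spark.task.resource.gpu.amount", "0.1")
    .config("spark.sql.files.maxPartitionBytes", "512m")
    .config("spark.sql.shuffle.partitions", "30")
    .getOrCreate()
)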

The only problem I have met so far: when the notebook includes a view composed of Delta tables, it pops up the error message "convert struct type to GPU is not supported". Simply persisting the view as a table solves this problem. Overall, a very cool plugin.
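Here is a minimal sketch of that workaround, assuming an existing SparkSession `spark`; the schema, view and table names are hypothetical:

# persist the problematic view as a physical Delta table, then let the GPU job
# read the table instead of the view
spark.sql("""
    CREATE OR REPLACE TABLE my_schema.report_snapshot
    USING DELTA
    AS SELECT * FROM my_schema.report_view
""")
df = spark.table("my_schema.report_snapshot")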

The magic of spark-rapids

There are two major features that let Spark run on the GPU through RAPIDS:

  • Replacement of CPU operations with GPU versions at the physical plan level.
  • Optimized Spark shuffle using RDMA and direct GPU-to-GPU communication.
How the RAPIDS accelerator works in Spark: left side, replace CPU ops with GPU versions based on data type & op type; right side, optimized Spark shuffle between GPUs.

Since the replacement happens at the physical plan level, we don't have to change the query. Even if an operation is not supported by RAPIDS, it can still run on the CPU.


A CPU Spark execution plan goes: logical plan -> optimized by Catalyst in a series of phases -> physical plan -> executed on the cluster. A GPU Spark execution plan replaces operations in the physical plan with GPU versions and executes them on columnar batches.

If we look at the physical plans on GPU and CPU, we will find they match almost one to one, except for some GPU-CPU data exchange ops like GpuColumnarToRow and GpuRowToColumnar.

parts of GPU physical plan

parts of CPU physical plan

Comparing the two physical plans, we can see that many operations already have GPU versions. "Scan parquet" is replaced by "GpuScan parquet", and since the GPU needs columnar format, the "ColumnarToRow" step of the CPU version is skipped. Then "Project" is replaced by "GpuProject", followed by "GpuColumnarToRow" because the next step, "InMemoryRelation", runs on the CPU, and so on.

Let's talk about the other great feature: optimized Spark shuffle.

Spark needs a shuffle for wide transformations, while narrow transformations are grouped into a single stage. Spark-RAPIDS implements a custom Spark shuffle manager for shuffle operations on its GPU clusters. It provides:

  • Spillable cache: it keeps the data close to where it is produced, and it avoids out-of-memory issues by moving data following the rule GPU memory -> host memory -> disk.
Once the GPU is out of memory, the shuffle manager puts data into host memory; if host memory is not enough, it keeps pushing data onto local disk.
  • Transport (shuffle): handles block transfers between executors leveraging UCX libraries (see the picture below).
    • GPU0-GPU0: cached in GPU0, zero copy
    • GPU0-GPU1: NVLink
    • GPU0-GPUX(remote): RDMA
      • Infiniband
      • RoCE
    • Disk-GPU: GPU Direct Storage (GDS)
It is not surprising to see RDMA and NVLink used in the cluster, since UCX is widely used by OpenMPI in the HPC area. These features are included in the RAPIDS libraries.

Some helpful information from official Q&A:

What operators are best suited for the GPU?

  • Group by operations with high cardinality
  • Joins with a high cardinality
  • Sorts with a high cardinality
  • Aggregates with a high cardinality
  • Window operations, especially for large windows
  • Aggregate with lots of distinct operations
  • Complicated processing
  • Writing Parquet/ORC
  • Reading CSV
  • Transcoding (reading an input file and doing minimal processing before writing it out again, possibly in a different format, like CSV to Parquet)

What operators are not good for the GPU?

  • Small amounts of data (hundreds of MB)
  • Cache coherent processing
  • Data movement
    • Slow I/O
    • Back and forth to the CPU (UDFs)
    • Shuffle
  • Limited GPU memory

What is RAPIDS

  • a suite of open-source software libraries and APIs for data science and data pipelines on GPUs
  • offers a GPU DataFrame that is compatible with Apache Arrow
    • language-independent columnar memory format
    • zero-copy streaming messaging and interprocess communication without serialization overhead
  • integrated with popular frameworks: PyTorch, Chainer, Apache MXNet; Spark, Dask

What is cuDF

  • GPU-accelerated data preparation and feature engineering
  • Python drop-in pandas replacement (see the small sketch after this list)
  • features
    • CUDA
      • low-level library containing function implementations and C/C++ APIs
      • importing/exporting Apache Arrow using the CUDA IPC mechanism
      • CUDA kernels to perform element-wise math operations on GPU dataframe columns
      • CUDA sort, join, groupby and reduction operations on GPU dataframes
    • Python bindings
      • a Python library for GPU dataframes
      • Python interface to CUDA C++ with additional functionality
      • creating Apache Arrow from NumPy arrays, pandas DataFrames, PyArrow Tables
      • JIT compilation of UDFs using Numba
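As a tiny, hedged illustration of the pandas-like cuDF API (assuming a CUDA-capable GPU and cuDF installed; the data is made up):

import cudf

# build a small GPU dataframe and run a groupby aggregation on the GPU
gdf = cudf.DataFrame({"key": [1, 2, 1, 2], "value": [10.0, 20.0, 30.0, 40.0]})
print(gdf.groupby("key")["value"].sum())

# convert back to pandas when a CPU-only library needs the data
pdf = gdf.to_pandas()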

Reference:

Spark-Rapids, https://nvidia.github.io/spark-rapids/

ACCELERATING APACHE SPARK 3.X, https://www.nvidia.com/en-us/deep-learning-ai/solutions/data-science/apache-spark-3/ebook-sign-up/

Getting started with RAPIDS Accelerator on Databricks, https://nvidia.github.io/spark-rapids/docs/get-started/getting-started-databricks.html#getting-started-with-rapids-accelerator-on-databricks

Deep Dive into GPU Support in Apache Spark 3.x, https://databricks.com/session_na20/deep-dive-into-gpu-support-in-apache-spark-3-x

Accelerating Apache Spark 3.0 with GPUs and RAPIDS, https://developer.nvidia.com/blog/accelerating-apache-spark-3-0-with-gpus-and-rapids/

RAPIDS Shuffle Manager, https://nvidia.github.io/spark-rapids/docs/additional-functionality/rapids-shuffle.html

UCX-PYTHON: A FLEXIBLE COMMUNICATION LIBRARY FOR PYTHON APPLICATIONS, https://developer.download.nvidia.com/video/gputechconf/gtc/2019/presentation/s9679-ucx-python-a-flexible-communication-library-for-python-applications.pdf

Unified Communication X, https://www.openucx.org/

Accelerating Apache Spark by Several Orders of Magnitude with GPUs, https://www.youtube.com/watch?v=Qw-TB6EHmR8&t=1017s

SCD II or Snapshot for Dimension

SCD II is widely used to store dimensional data with full history. Each change in a dimension is recorded as a new row with a validity period, usually at date granularity. Since SCD II only keeps the changes, it significantly reduces the storage space needed in the database.

Understanding Slowly Changing Dimensions
An example of SCD II processing

Everything looks fine, until big data arrives and a flaw gets amplified: late-arriving dimensions. In SCD II, you have to go through complex steps to process both dimension and fact data for late-arriving dimensions.

For the dimension (the SCD II table), we have to change historical rows.

In the example above, for product ID=010, suppose there is a change before May-12-06, say on April-02-06, and this change arrives late, after SID=003. We have to:

  1. Scan the dimension table and find SID = 002 and 003
  2. Change End_DT of SID 002 to April-02-06
  3. Insert a new record: SID=004, Start_DT=April-02-06, End_DT=May-12-06

For the fact table, we have to change multiple historical rows:

Update all SID foreign keys in the fact table to 004 where the time period falls between April-02-06 and May-12-06.

It may not be a problem to execute an UPDATE in a row-stored table, but in the big data world most storage formats are column-stored, which is great for scanning but inefficient for updating. The good news is that we don't have to UPDATE data, nor detect changes in the dimension. The only thing we need to do is snapshot the dimension data daily. Since we have a full copy for every day, it is also much easier for the fact table to join the dimension: a simple date-to-date mapping. That's all!

The example above becomes:

Date      | Source Product ID | Product Name | Product descr
May-12-06 | 010               | 10 inch Box  | 10 inch Glued box
May-12-07 | 010               | 10 inch Box  | 10 inch pasted box
May-12-08 | 010               | 11 inch Box  | 11 inch pasted box
Snapshot of the daily product table
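A minimal PySpark sketch of this daily snapshot approach, assuming an existing SparkSession `spark`; the table and column names are hypothetical:

from pyspark.sql import functions as F

# take today's full copy of the product dimension and append it as a new
# date partition; no change detection and no UPDATE are needed
snapshot = (
    spark.table("staging.product")
         .withColumn("snapshot_date", F.current_date())
)

(snapshot.write
    .format("delta")
    .mode("append")
    .partitionBy("snapshot_date")
    .saveAsTable("dw.dim_product_snapshot"))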

Would it create a lot of redundancy on disk? No. Since we use column-stored tables, duplicated data in the same column is only saved once physically. Also, big data storage is much cheaper than a traditional database.

Would it slow down the join because much more data is created in the dimension table? No. There is another easy solution called "PARTITION". Basically, we can partition the data by date, so that each partition folder holds the data set for that day.

An example for partitioned delta table.

"PARTITION" is transparent to the query. For example, when we execute this query:

Select * from product 
where product_name ='10 inch Box' and Date ='May-12-06'

it will first go to the folder "Date='May-12-06'" rather than scanning the whole table, and then filter on the column "product_name='10 inch Box'". Partitioning is not limited to one level; you can create multiple levels.

Would it be faster to join the fact table? Yes, once the fact table is also partitioned by the same column, e.g. date. Then we can simply map the two date columns in the dimension and the fact, and the scan is super quick. (* For some newer tech, like Delta tables, you have to indicate the partition columns manually.)

Conclusion

Traditional SCD has existed for a long time, but the environment and prerequisites for applying this method have changed a lot. I think the snapshot is a better, faster and simpler solution in the big data world.

Reference:

Functional Data Engineering – A Set of Best Practices, talk by Maxime Beauchemin (Lyft), ConferenceCast.tv

Building A Modern Batch Data Warehouse Without UPDATEs, by Daniel Mateus Pires, Towards Data Science

Slowly Changing Dimensions (SCDs) In The Age of The Cloud Data Warehouse, holistics.io

Parallel and Redundancy

Inspirational Quote Murphy's Law Sign Wall Decor Art | Etsy
 “Anything that can go wrong will go wrong”. — Murphy’s Law

1990, engineers fought to optimize code performance and increase CPU speeds.

1994, MPI started to become the dominant model used in high-performance computing.

2002, we started to leverage multi-core processors for parallel computing.

2006, Hadoop opened a door of reliable, scalable, distributed computing using simple programming models in clusters.

2007, NVIDIA released the first version of CUDA, which enabled GPUs for general-purpose processing.

2011, Kafka provided streaming solutions for handling real-time data feeds.

2014, Spark improved on Hadoop, providing an in-memory unified analytics engine with several enrichment libraries.

2019, Google scientists said they had achieved quantum supremacy. We jumped out of the 0/1 world.

When we look back at the evolution of these 30 years, two things play the major role: parallelism and redundancy. According to vocabulary.com, parallel means two or more lines that never intersect, while redundancy means needlessly repeated. Why are they so important in the evolution of computing?

In the real world, an efficient team makes things easier than working alone simply because there are more people: 100 farmers can work a farm at the same time, one person per row. Similarly, computing achieves excellent improvement by splitting tasks across multiple parallel "workers". Modern CPUs and GPUs split tasks across cores; big data platforms split tasks across the nodes of a cluster; even a quantum computer leverages superposition to let a single qubit carry multiple states at the same time. In Rick and Morty, there is a character, the "Time Police", who exists in multiple parallel timelines.

The red ugly head is the "Time Police", who exists in 4 timelines at the same time.

So, if parallelism is so powerful, why do we still need redundancy? According to Murphy's Law, "Anything that can go wrong will go wrong". Fragility is common across the whole universe. To be anti-fragile, we have to use redundancy; more fragile things need more redundancy. We humans have two eyes, two lungs, two kidneys. Most insects spawn hundreds to thousands of eggs to keep their species surviving.

Before the age of big data, redundancy was already widely used in IT. Well-known cases are CRC and RAID 1. A cyclic redundancy check (CRC) is an error-detecting code commonly used in digital networks and storage devices to detect accidental changes to raw data, since a signal may easily be interfered with or lost, especially over wireless links or the internet. RAID 1 consists of an exact copy (or mirror) of a set of data on two or more disks to avoid accidental data loss.

Error Detection in Computer Networks. From “GeeksforGeeks”

Another redundancy case is NoSQL. Unlike SQL's 3NF, the intention of NoSQL is high read speed. NoSQL improves performance by adding redundancy so that we don't have to waste time joining tables.

MongoDB example: you can see "car" is stored as a nested document rather than a foreign key.

After we headed into the age of big data, more and more "distributed" technologies were used to make work faster and more parallel. But a distributed system itself is not as solid as we think: it is built on unstable networks, easily broken storage devices, and sometimes a natural disaster can destroy all your data. So redundancy comes in for anti-fragility as a sibling of parallelism. It creates extra "backups" on multiple workers in the cluster where the big data runs. We get not only distributed computing but also distributed storage. Even if we lose data on one worker, we immediately have the same "backups" available on other workers.

Spark fault tolerance by replicating data in memory
Kafka partitions are a good example of redundancy for anti-fragility in a streaming platform

Redundancy can also accelerate parallel processing, especially hardware redundancy. Since we have replicas in two or more places, parallel requests can fetch the same data from different places to balance the load and improve I/O. Two good examples: 1. When accessing google.com, there is a load balancer that controls which server we connect to. 2. When reading data from Hadoop, a piece of data is stored on different servers, but we can read from all of them at the same time.

Hadoop map-reduce process: it shows distributed processing and storage.

Parallelism and redundancy are like twins. Many great new technologies in big data relate to both concepts. The difference, I feel, is where they implement the "distribution": in the GPU, the CPU, or clustered CPU/GPU/memory/disk.

Reference:

Redundancy in Cloud Computing Means Checking Four Areas (atsg.net)

How Redundancies Increase Your Antifragility | The Art of Manliness

Optimize concurrency for merge operation in delta table

Concurrency control is normal for OLTP operations, but not really for OLAP. So I didn't pay attention to it until I hit the error below:

com.databricks.sql.transaction.tahoe.ConcurrentAppendException: Files were added to partition [cust_id=000] by a concurrent update. Please try the operation again.

This error is caused by a write conflict on a single Delta table between two merge operations. Based on the conflict matrix provided by Databricks, we know that even at the WriteSerializable isolation level, two merge operations can conflict.

To solve this problem, I did two things:

  • Choose the right partition columns. In my example, I had partitioned only by cust_id, but the merge operations in the two scripts update based on two columns: cust_id and report_type. So the first step is to change the partition columns.
partitionBy(['cust_id','report_type'])
  • Change the condition clause in the merge operations. I put both partition columns into my merge condition clause. It looks like:
A.cust_id=B.cust_id and A.report_type=B.report_type # where A is the target (merged) table

But it still got conflicts even after partitioning correctly. So I tried to hard-code part of the condition. It works; no conflict has happened since. So I guess Delta cannot infer the touched partitions correctly from the join condition alone.

"A.cust_id={0} and A.report_type={1}".format(cust_id, report_type)

Reference:

Isolation Levels in delta lake: https://docs.databricks.com/delta/optimizations/isolation-level.html#isolation-levels

Concurrency control: https://docs.databricks.com/delta/concurrency-control.html

How to improve performance of Delta Lake MERGE INTO queries using partition pruning: https://kb.databricks.com/delta/delta-merge-into.html

Some features need to be improved in Azure Data Products

  • Azure Storage Explorer/Data Lake
    • Ghost file
      • In some rare cases, if you delete files in ASE and then call the APIs or use the browser data explorer, you will find a 0-byte file is still there, even though it should already have been deleted. I reported it to Microsoft last year; they said they fixed it, which was true until I found the ghost files coming back last week. I think it is related to some sort of soft delete in Hadoop.
    • Page
      • If we have thousands of files under a folder, it is a disaster. You can never easily find the files you want. Especially, if you don't click "load more", ASE won't load them into the cache, so you cannot see them.
    • Copy/Move files
      • ASE uses AzCopy to move/copy files, so it should be robust and async. But in my experience, when I tried to copy a batch of files, it would easily show me an error and ask me to try again. However, if we use the API or the AzCopy command to execute the same copy activity, they work fine.
    • Soft delete
      • Soft delete is a nice function in case we delete files by mistake, but soft delete is only enabled down to the container level for ADLS Gen2. If we want to recover specific files, we have to know the file names and use "Restore-AzDataLakeStoreDeletedItem" to recover them. That is pretty hard for a distributed structure like a Delta table, whose file names are randomly generated and maintained in JSON files.
  • Data Factory
    • No version control when connect to data bricks notebooks.
      • When you create a dev branch for both Data Factory and Databricks, you would naturally think Data Factory calls the Databricks notebook on the same branch. But the truth is that you are calling the Databricks notebook in the master (published) branch. I asked Microsoft if there is a workaround; the answer was that I could submit the feature request on the forum. What!!! 🙁
    • Unclear documentation about Global Parameter in CI/CD .
      • If we try to change a global parameter in a release, we have to use the ARM template. The detailed document can be found here. But you have to use your own way (perhaps I should use the word "guess") to figure out the syntax. For example, I spent half a day testing how to make the JSON template work for global parameters through CI/CD.

   "Microsoft.DataFactory/factories": {
       "properties": {
           "globalParameters": {
               "NotificationWebServiceURL": {
                   "value": "=:NotificationWebServiceURL:string"
               }
           }
       },
       "location": "="
   }

Based on the official document, we should write "NotificationWebServiceURL": "=:NotificationWebServiceURL:string", but that is wrong and generates a JSON object rather than a string.

  • Data Bricks
    • Since Databricks is a third-party product, there is not much Microsoft can do to improve it. Frankly, it is a very successful commercial product built on Spark, well integrated with Key Vault, Data Lake and Azure DevOps. The only complaint is that it can't (at least as far as I know) debug line by line like Google Colaboratory.
    • Delta table schema evolution. This is a great feature for continuously ingesting data with a changing schema. The catch is that if we don't know the incoming data schema, we have to use schema inference or set all fields as string; neither way is perfect. Maybe the only solution is using a format with an embedded schema rather than CSV.
  • Azure Data Studio
    • Pretty good, except it only works for SQL Server. Based on the forum, I think a MySQL connection extension is on the way.

Spark 3.0 new features – Learning from Dr.Kazuaki Ishizaki

Dr. Kazuaki Ishizaki gives a great summary of the Spark 3.0 features in his presentation "SQL Performance Improvements at a Glance in Apache Spark 3.0". It is very helpful for understanding how these new features work and where we can use them.

New explain format

Spark 3.0 provides a terse-format explain with detailed information.

EXPLAIN [ EXTENDED | CODEGEN | COST | FORMATTED ] statement

There are five formats:

  1. default. Physical plan only.
  2. extended. It equals df.explain(true) in Spark 2.4; it generates the parsed logical plan, analyzed logical plan, optimized logical plan and physical plan.
  3. codegen. Generates the Java code for the statement.
  4. cost. If plan statistics are available, it generates the logical plan and the stats.
  5. formatted. This is the most useful one in my mind. It has two sections: a physical plan outline in a simple tree format, and node details.
-- example from spark document
-- Using Formatted
EXPLAIN FORMATTED select k, sum(v) from values (1, 2), (1, 3) t(k, v) group by k;
+----------------------------------------------------+
|                                                plan|
+----------------------------------------------------+
| == Physical Plan ==
 * HashAggregate (4)
 +- Exchange (3)
    +- * HashAggregate (2)
       +- * LocalTableScan (1)
   
   
 (1) LocalTableScan [codegen id : 1]
 Output: [k#19, v#20]
        
 (2) HashAggregate [codegen id : 1]
 Input: [k#19, v#20]
        
 (3) Exchange
 Input: [k#19, sum#24L]
        
 (4) HashAggregate [codegen id : 2]
 Input: [k#19, sum#24L]
|
+----------------------------------------------------+

Since these plan node names do not match the SQL syntax exactly, the following list helps to explain the meaning of the Spark explain output:

  • scan. Basic file access. In Spark 3.0, it can perform some predicate work before loading the data in.
    • ColumnPruning: select only the columns that are needed.
    • PartitionFilters: only read data from certain partitions.
    • PushedFilters: filters on fields that can be pushed directly into the file scan (predicate pushdown).
  • filter. Due to predicate pushdown, a lot of the filter work has moved to the scan stage, so you may not find a Filter node in the explain output matching the query. But there are still some operations, like first and last, that need to be done in a Filter.
    • Predicate pushdown.
    • CombineFilters: combines two neighboring filter operations into one.
    • InferFiltersFromConstraints: creates a new filter from a join condition. We will talk about it in the later section "Dynamic partition pruning".
    • PruneFilters.
  • project. Column selection operations, like select, drop, withColumn.
  • exchange. Shuffle operations, like sort-merge or shuffle hash.
  • HashAggregate. Data aggregation.
  • BroadcastHashJoin & BroadcastExchange. Broadcast join and its exchange.
  • ColumnarToRow. A transition between columnar and row execution.

All types of join hints

Spark 2.4 only supports the broadcast hint, while Spark 3.0 supports all types of join hints (a short sketch follows the list below).

Spark uses two kinds of hints: partition hints and join hints. Since Spark 3.0, join hints cover all join strategies.

  • Broadcast join, which is the famous join for joining a small table (dimension table) with a big table (fact table) while avoiding costly data shuffling.
    • tables smaller than 10 MB (by default) are broadcast across all nodes to avoid shuffling
    • two steps: broadcast -> hash join
    • spark.sql.autoBroadcastJoinThreshold
  • shuffle merge join
    • Sort-merge join performs the sort operation first and then merges the datasets.
    • steps:
      • shuffle. The 2 big tables are partitioned by the join keys across the partitions.
      • sort. Sort the data within each partition.
      • merge. Join the 2 sorted and partitioned datasets.
    • works well when
      • joining two big tables, as it doesn't need to load all data into memory like hash join
      • a highly scalable approach is needed
  • shuffle hash join
    • Shuffle hash join shuffles the data based on the join key, so that rows with the same keys from both tables end up on the same node, and then performs the join.
    • works well when
      • the dataframes are distributed evenly by the keys
      • the dataframes have enough distinct keys for parallelism
      • memory is enough for the hash join
    • supported for all joins except full outer join
    • spark.sql.join.preferSortMergeJoin = false
  • shuffle replicate nl
    • a Cartesian product (similar to SQL) of the two relations is calculated to evaluate the join.
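For reference, here is a small, hedged sketch of how these hints can be expressed in Spark 3.0; the dataframes are synthetic toy tables:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
fact = spark.range(1000000).withColumnRenamed("id", "key")
dim = spark.range(100).withColumnRenamed("id", "key")
fact.createOrReplaceTempView("fact")
dim.createOrReplaceTempView("dim")

# DataFrame API: hint the (small) dimension side
fact.join(dim.hint("broadcast"), "key").explain()

# equivalent SQL hints, all supported since Spark 3.0
spark.sql("SELECT /*+ BROADCAST(dim) */ * FROM fact JOIN dim ON fact.key = dim.key").explain()
spark.sql("SELECT /*+ SHUFFLE_MERGE(fact) */ * FROM fact JOIN dim ON fact.key = dim.key").explain()
spark.sql("SELECT /*+ SHUFFLE_HASH(dim) */ * FROM fact JOIN dim ON fact.key = dim.key").explain()
spark.sql("SELECT /*+ SHUFFLE_REPLICATE_NL(dim) */ * FROM fact JOIN dim ON fact.key = dim.key").explain()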

Adaptive query execution(AQE)

AQE is a feature that chooses execution strategies automatically at runtime (a config sketch follows the list below).

  • Set the number of reducers to avoid wasting memory and I/O resources: dynamically coalescing shuffle partitions.
    • spark.sql.adaptive.enabled=true
    • spark.sql.adaptive.coalescePartitions.enabled=true
AQE can merge several small partitions into one reducer to even out the pressure.
  • Select a better join strategy to improve performance
    • dynamically choose among the 3 join strategies. Broadcast has the best performance, but the static strategy choice is not always accurate.
    • spark.sql.adaptive.enabled=true
AQE gets the size of the join table dynamically, so it can choose broadcast rather than a shuffle operation.
  • Optimize skew joins to avoid an imbalanced workload
    • the large partition is split into multiple partitions
    • spark.sql.adaptive.skewJoin.enabled=true
AQE splits a skewed partition into multiple partitions.
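A minimal sketch of turning these on at the session level (assuming an existing SparkSession `spark`; the flags are the same ones listed above):

# AQE and its sub-features can be toggled per session in Spark 3.0
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")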

Dynamic partitioning pruning

We already peeked at part of it in the explain-format section. Spark 3.0 is smart enough to avoid reading unnecessary partitions in a join by using the results of filter operations on the other table. For example:

SELECT * FROM dim_iteblog
JOIN fact_iteblog
ON (dim_iteblog.partcol = fact_iteblog.partcol)
WHERE dim_iteblog.othercol > 10

In this case, Spark will do the partition pruning and add a new filter to the joined table "fact_iteblog".
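Dynamic partition pruning is controlled by a single flag, which is on by default in Spark 3.0; the line below just makes it explicit (assuming an existing SparkSession `spark`):

# enabled by default in Spark 3.0; shown here only for completeness
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")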

Enhanced nested column pruning & pushdown

  • nested column pruning can be applied to all operators, like limit and repartition
    • select col2._1 from (select col2 from tp limit 1000)
  • parquet can apply pushdown filters and read only part of the columns
    • spark.read.parquet('filename').filter('col2._1 = 100')

Improved aggregation code generation

  • Catalyst translates a given query into Java code, and the HotSpot compiler in OpenJDK translates the Java code into native code.
  • The HotSpot compiler gives up generating native code for methods with more than 8000 Java bytecode instructions.
  • Catalyst splits a large Java method into small ones to allow HotSpot to generate native code.

New Scala and Java (infrastructure updates)

  • Java 11
  • Scala 2.12

Summary

I think it is better to take a screenshot from Dr. Kazuaki Ishizaki's presentation as the summary.

Reference

SQL Performance Improvements at a Glance in Apache Spark 3.0, https://www.iteblog.com/ppt/sparkaisummit-north-america-2020-iteblog/sql-performance-improvements-at-a-glance-in-apache-spark-30-iteblog.com.pdf

Spark 3.0.1 – Explain, http://spark.apache.org/docs/latest/sql-ref-syntax-qry-explain.html

Mastering Query Plans in Spark 3.0, https://towardsdatascience.com/mastering-query-plans-in-spark-3-0-f4c334663aa4

Fast Filtering with Spark PartitionFilters and PushedFilters, https://mungingdata.com/apache-spark/partition-filters-pushed-filters/

Spark 3.0.1 – Hints, https://spark.apache.org/docs/3.0.0/sql-ref-syntax-qry-select-hints.html

Columnstore index for MS SQL SERVER

Columnstore is the most popular storage technology in big data. We have all heard of Parquet and Delta Lake; both are columnstore formats that bring around 10x compression and much faster query speed to analytic work. SQL Server, one of the fastest-evolving relational databases, also provides columnstore indexes with multiple optimizations.

Loading into a clustered columnstore index
Figure 1: A columnstore index compresses the data column by column and is optimized by the deltastore. It can reach 10x compression and 100x query speed.

SQL Server provides clustered and non-clustered columnstore indexes. The deltastore is a clustered B-tree index used automatically alongside a columnstore index. It stores rows until the number of rows reaches a threshold (~1,048,576 rows), then moves the data into the columnstore and sets the row group state from Open to Closed. I will show this at the end of the article.

Clustered columnstore index

  • Primary storage method for the entire table
  • All columns are included; there are no key columns.
  • can only be combined with a non-clustered B-tree index to speed up
    • queries that search for specific values or small ranges of values
    • updates and deletes of specific rows
  • usually for fact tables or large dimension tables

Non-clustered columnstore index

  • We can indicate which columns to index, usually the frequently used columns.
  • requires extra storage to keep a copy of the indexed columns (~10%)
  • can be combined with other indexes.

How to choose columnstore index?

Microsoft has already provided guidance in its documentation.

Figure 2: Choose the best columnstore index for your needs

I would recommend using a columnstore index for most OLAP work, where we need fast queries without many delete/update tasks.

How to delete a large amount of data from a columnstore table

Although Microsoft doesn't suggest deleting more than 10% of the data from a columnstore table, there is still a chance we have to. For that case, I have summarized some of my experience.

| Delete from columnstore is a soft delete

If you try to delete rows from a columnstore table, you will not actually delete the data; SQL Server just marks those rows as deleted.

You can run the SQL below to find out the number of deleted rows and the deltastore row groups. Once there are too many rows marked "deleted", you have to rebuild/reorganize the columnstore index. Remember, it is not like the OPTIMIZE command in Delta Lake, which is more like bin-packing small files.

SELECT
       tables.name AS table_name,
       indexes.name AS index_name,
       partitions.partition_number,
       column_store_row_groups.row_group_id,
       column_store_row_groups.state_description,
       column_store_row_groups.total_rows,
       column_store_row_groups.size_in_bytes,
       column_store_row_groups.deleted_rows,
       internal_partitions.partition_id,
       internal_partitions.internal_object_type_desc,
       internal_partitions.rows
FROM sys.column_store_row_groups
INNER JOIN sys.indexes
ON indexes.index_id = column_store_row_groups.index_id
AND indexes.object_id = column_store_row_groups.object_id
INNER JOIN sys.tables
ON tables.object_id = indexes.object_id
INNER JOIN sys.partitions
ON partitions.partition_number = column_store_row_groups.partition_number
AND partitions.index_id = indexes.index_id
AND partitions.object_id = tables.object_id
LEFT JOIN sys.internal_partitions
ON internal_partitions.object_id = tables.object_id
AND column_store_row_groups.deleted_rows > 0
WHERE tables.name = 'table_name'
Figure 3: One delta row group is open, and there are no deleted rows.

| Steps to delete large size of data in columnstored table

1. Delete the non-clustered B-tree index (after the delete operation, rebuild it if needed)

If we look at the execution plan for a bulk delete, we will find that the B-tree-index-related operations spend most of the time, rather than the columnstore index.

Figure 4: B-tree-index-related operations spend 90% of the time.

2. Delete in small batches

One downside of Azure SQL is that we cannot set the recovery model to simple, so it takes a lot of transaction log time when we try to delete from a large table. The workaround is to delete in small batches to make each log transaction smaller and quicker. Below is an example that deletes data with a chunk size of 1,000,000.

deleteMore:
delete top(1000000) from table_name
where id%2=0
IF @@ROWCOUNT != 0
begin
       print current_timestamp
    goto deleteMore
end

3. Rebuild/reorganize the columnstore index if needed

Sometimes we have to rebuild/reorganize the index if we have deleted too much data and it affects query performance. Here is a snippet showing how to do it.

-- rebuild or reorganize a columnstore index
alter index index_name on table_name rebuild   -- or: reorganize
alter index all on table_name rebuild          -- or: reorganize

-- check fragment
SELECT a.object_id, object_name(a.object_id) AS TableName,
    a.index_id, name AS IndexName, avg_fragmentation_in_percent
FROM sys.dm_db_index_physical_stats
    (DB_ID (N'schema_name')
        , OBJECT_ID(N'table_name')
        , NULL
        , NULL
        , NULL) AS a
INNER JOIN sys.indexes AS b
    ON a.object_id = b.object_id
    AND a.index_id = b.index_id;
GO

Reference:

Choose the best columnstore index for your needs. https://docs.microsoft.com/en-us/sql/relational-databases/indexes/columnstore-indexes-design-guidance?view=sql-server-ver15#choose-the-best-columnstore-index-for-your-needs

Columnstore indexes – Query performance. https://docs.microsoft.com/en-us/sql/relational-databases/indexes/columnstore-indexes-query-performance?view=sql-server-ver15

How to efficiently delete rows while NOT using Truncate Table in a 500,000+ rows table. https://stackoverflow.com/questions/11230225/how-to-efficiently-delete-rows-while-not-using-truncate-table-in-a-500-000-rows

Hands-On with Columnstore Indexes: Part 1 Architecture. https://www.red-gate.com/simple-talk/sql/sql-development/hands-on-with-columnstore-indexes-part-1-architecture/

Data Factory CI/CD in Azure DevOps

Azure Pipelines consists of two parts: pipeline and release. They represent CI and CD respectively.

  1. Build Pipeline – to build and test the code. The build creates an artifact that’s used by the rest of your pipeline to run tasks such as deploying to staging or production. 
  2. Release Pipeline – once the code is updated, built and packaged, it can be deployed to target services using Release Pipelines. 
top half: CI pipeline; bottom half: CD pipeline

Let's talk about how to implement this process for Azure Data Factory. Before you start, I assume your current ADF already meets these requirements (if not, please refer to here):

  • You already have an Azure Repos repository.
  • ADF is integrated with this Git repo.
  • A Key Vault, which is used for storing database and data lake connection information, plus all configuration parameters for the release.

The whole ADF CI/CD pipeline looks like this:

Building Pipeline (CI)

  • Create a new pipeline and choose "adf_publish" as the default branch
    • the "adf_publish" branch is created by ADF automatically after you click "Publish" in the ADF GUI.
the three JSON files are used to set parameters in the different environments
  • Add a new agent job
  • Search for "Data factory" and add a new Publish Artifacts task
    • set the path to publish (git path)
    • set the artifact publish location to Azure Pipelines

Release Pipeline (CD)

left part: CI; right part: CD

So the next step is deploying the artifact into three environments: DEV, QA and PROD.

use Key Vault to set the SQL Server connection in the pipeline
  • Import the Azure Key Vault where you store all the connection information. We use this information in two areas:
    • Release pipeline: get the basic information for each environment, e.g. database name, data lake name, etc. See the screenshot above.
    • Template parameters used in adf_publish.
  • Set up the Data Factory deployment.
    • Add an "Azure Resource Group Deployment" task, where we need to focus on the template path.
we have to set the ARM template and parameters, which point to the adf_publish branch

You can also add trigger stop and start steps at the beginning and end of the Data Factory deployment job.

# stop trigger, you have to add variables for each release environment
$triggersADF = Get-AzDataFactoryV2Trigger -DataFactoryName $(DataFactoryName) -ResourceGroupName $(ResourceGroupName)

$triggersADF | ForEach-Object { Stop-AzDataFactoryV2Trigger -ResourceGroupName $(ResourceGroupName) -DataFactoryName $(DataFactoryName) -Name $_.name -Force }
# start trigger (after deployment)
$triggersADF = Get-AzDataFactoryV2Trigger -DataFactoryName $(DataFactoryName) -ResourceGroupName $(ResourceGroupName)

$triggersADF | ForEach-Object { Start-AzDataFactoryV2Trigger -ResourceGroupName $(ResourceGroupName) -DataFactoryName $(DataFactoryName) -Name $_.name -Force }
Stopping the triggers is mandatory for releasing ADF.
  • How to set up adf_publish
    • go back to Azure Repos.
    • switch to the adf_publish branch.
    • create the corresponding ARM parameter template for each environment. You can find an example below.
the three json files are used to set parameters in the different environments
{
	"$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentParameters.json#",
	"contentVersion": "1.0.0.0",
	"parameters": {
		"factoryName": {
			"value": "datafactory name"
		},
		"LS_AKV_KeyVault_properties_typeProperties_baseUrl": {
			"value": "https://XXXXkeyvault.vault.azure.net/"
		},
		"LS_SQL_ConfigDb_properties_typeProperties_connectionString_secretName": {
			"value": "sqlconnection"
		},
		"LS_DLS_Datalake_properties_typeProperties_url": {
			"value": "https://XXXXXdatalake.dfs.core.windows.net/"
		}
	}
}

These three parameters are used for dynamic linked services. They are defined in ARMTemplateForFactory, and the values are set in each separate JSON file. To define the parameters, you first have to go to the "Parameterization template" tab and edit it.

Here is an example where I defined a linked service for Azure Key Vault. For more information please refer to: https://docs.microsoft.com/en-us/azure/data-factory/continuous-integration-deployment#use-custom-parameters-with-the-resource-manager-template

If we need to enable the continuous deployment trigger, we have to link it with the adf_publish branch as well.

click the little logo in release pipeline page.
set to adf_publish

Reference:

Azure DevOps Pipelines. https://docs.microsoft.com/en-us/azure/devops/pipelines/?view=azure-devops

Continuous integration and delivery in Azure Data Factory. https://docs.microsoft.com/en-us/azure/data-factory/continuous-integration-deployment

Use Azure Key Vault to pass secure parameter value during deployment. https://docs.microsoft.com/en-us/azure/azure-resource-manager/templates/key-vault-parameter?tabs=azure-cli

Apache Kafka in Practice – 1

First things first, I should remind all visitors that I am not a master of Kafka. Actually, I am just a beginner learning through the official Apache Kafka website and some free Udemy classes. There might be some mistakes; I will fix them once I find them.

I call this Kafka in practice because I want to introduce some basic skills, such as launching a Kafka service, creating a producer and a consumer, UI tools, and the basic Python library.

Install and Launch Kafka

Kafka is based on Java and ZooKeeper (which is used to manage the cluster). I recommend using Linux or WSL on Windows.

sudo apt update
sudo apt install openjdk-8-jdk

# test 
java -version

Download Kafka and unzip it

# download from website
wget <URL of kafka.tgz>

# unzip
tar -xvf kafka.****.tgz
cd <kafka_folder>

# test, run following
bin/kafka-topics.sh

Add the commands to PATH (optional)

# need to restart after these steps
nano ~/.bashrc
add this to the end:
PATH="$PATH:/home/<your name>/<kafka folder>/bin"

# check under any folder
kafka-topics.sh

Start Zookeeper

# change the zookeeper data directory
mkdir data
mkdir data/zookeeper
nano config/zookeeper.properties
# add blew into zookeeper.properties:
dataDir=/home/name/data/zookeeper

# run zookeeper:
zookeeper-server-start.sh config/zookeeper.properties

Start a Kafka broker

mkdir data/kafka
nano config/server.properties

# modify server.properties to change log folder:
log.dirs=/home/name/data/kafka

# start kafka
kafka-server-start.sh config/server.properties

Right now you should have ZooKeeper and one broker running. Then let's do some operations on topics, producers and consumers.

Topic operations

# create a topic
# you cannot have a replication factor greater than the number of available brokers
kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic first_topic --create --partitions 3 --replication-factor 1


# list all topic
kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe


# delete topic
kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic second_topic --delete

# recover in case the broker fails to start after deleting a topic
1. delete the topic folder under the broker log folder, which you can find via log.dirs in server.properties
2. start zookeeper
3. enter the zookeeper shell: zookeeper-shell.sh host:port
    3.1 list the topics: ls /brokers/topics
    3.2 remove the topic: rmr /brokers/topics/yourtopic
4. restart the kafka server

Create a console producer

# console producer
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic

# add some properties
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic --producer-property acks=all

# if you produce to a topic that does not exist, kafka will create a new topic with one partition and one replica by default. We can change this in config/server.properties

Create a console consumer and Set consumer groups

# read from now on
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic
# read all messages
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning

# set consumers in group
# all the messages sent to this group will be split across the consumers
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --group group1

# consumer groups
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group group1
# Lag: shows how many messages have not been received yet
# reset offsets ---> the offset is decided by the consumer
# --to-earliest / --shift-by n [offsets]
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092  --group group1 --reset-offsets --to-earliest --execute  --topic first_topic
# when a consumer leaves, a rebalance will happen

Add Keys to producer

# keys
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic --property parse.key=true --property key.separator=,
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning --property print.key=true --property key.separator=,

UI Tools

Kafka Tool is an easy-to-use UI tool for managing topics, brokers and consumers. You can install it on Linux or Windows. If you use WSL on Windows, the ZooKeeper port is also open to Windows.

# how to use in Linux
wget http://www.kafkatool.com/download2/kafkatool.sh
chmod +x kafkatool.sh
# after installation
cd ~/kafkatool2/
./kafkatool

Developing in Python

kafka-python provides common functions for Kafka. We can find more information here.

Consumer API: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

# basic
from kafka import KafkaConsumer
consumer = KafkaConsumer('first_topic',bootstrap_servers=['localhost:9092'],auto_offset_reset='earliest')
for msg in consumer:
    print (msg.value)

# other key parameters
# group_id, key_deserializer, value_deserializer,auto_offset_reset[earliest, latest,None]

# assign and seek offsets (partitions is a list of TopicPartition objects)
consumer.assign(partitions)
consumer.seek(partition, offset)        # manually specify the fetch offset for a TopicPartition
consumer.assignment()                   # read the assigned partitions
consumer.beginning_offsets(partitions)  # first available offsets for the given partitions
consumer.end_offsets(partitions)        # last available offsets for the given partitions

Producer API: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

# basic
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],acks = 1)

for _ in range(10):
    future = producer.send('test-topic', b'some_message_bytes')
    # or add a key so that messages with the same key go to the same partition (per-key ordering)
    future = producer.send('test-topic', key=b'key', value=b'some_message_bytes')
    # result = future.get(timeout=60)  # do not block, it will kill performance
    # print(result)
# producer.flush()  # Block until all pending messages are at least put on the network
producer.close()  # make sure to .close() your producer before shutting down your application

Serialization and Deserialization

  • Serialization: the process of converting an object into a stream of bytes for the purpose of transmission
  • Deserialization: the opposite of Serialization
# Serialize json messages
import json
producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('fizzbuzz', {'foo': 'bar'})

# Deserialize msgpack-encoded values (requires the msgpack package)
import msgpack
consumer = KafkaConsumer(value_deserializer=msgpack.loads)  # same idea for key_deserializer
consumer.subscribe(['msgpackfoo'])
for msg in consumer:
     assert isinstance(msg.value, dict)

# Serialize string keys
producer = KafkaProducer(key_serializer=str.encode)
producer.send('flipflap', key='ping', value=b'1234')

# Compress messages
producer = KafkaProducer(compression_type='gzip')
for i in range(1000):
     producer.send('foobar', b'msg %d' % i)

Thread

The producer is thread safe; however, the consumer is not. It is recommended to use multiprocessing.

Client Compatibility

Always use the latest client library version, since older and newer clients can talk to any broker.

Reference:

kafka-python: https://github.com/dpkp/kafka-python

Apache Kafka Series – Learn Apache Kafka for Beginners v2

Apache Kafka Website, https://kafka.apache.org/

Apache Kafka Concepts and Theory.

It's a little bit late to talk about Kafka, since this technology has been widely used for a long time. These days I finally have time to learn it and summarize the major concepts. In this nutshell, I will split the page into three parts: why we need it, basic concepts, and how it works.

Why do we need Kafka?

before we use kafka

As more and more systems merge into one big network structure, and each system has its own protocols and communication methods, we have to spend more time and endure a super complex structure. In the end, the cost rises as well.

Kafka as a central messaging bus

Kafka provides a central messaging bus that is distributed, resilient and fault tolerant. Each source and target connects to the Kafka cluster as producers and consumers. They use the same protocol to send and receive messages (key-value plus timestamp). We can see it as a hybrid solution combining a messaging system with distributed retention.

Concepts

Producers publish data into Kafka; they are the source of the data, which could be sensors, laptops, IoT devices, logs, etc.

  • producers choose which records to assign to which partition within the topic (a topic is a stream of data, like a table in a database; a partition is a part of a topic; we will talk about them soon)
  • producers use round-robin or a function of the message key to choose the partition, to achieve load balancing across the nodes (brokers) in the cluster (see the small sketch after this list)
    • targetPartition = utils.abs(utils.murmur2(record.key())) % numPartitions
    • uses the murmur2 algorithm
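As a small, hedged kafka-python illustration of key-based partitioning (the broker address and topic name are hypothetical; the default partitioner hashes the key with murmur2):

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=["localhost:9092"])
for i in range(5):
    # records sharing the same key always land in the same partition
    producer.send("first_topic", key=b"sensor-1", value=b"reading %d" % i)
producer.flush()
producer.close()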

Brokers are the nodes/servers in the cluster. They are the key to Kafka features like fault tolerance and high performance.

  • a broker is identified by its ID
  • each broker contains certain topic partitions
  • connecting to any broker = connecting to the entire cluster
  • typically 3 or more brokers are used to achieve redundancy
  • messages in a topic are spread across partitions on different brokers, and each partition can be replicated across multiple brokers

Consumers subscribe to messages from the Kafka cluster.

  • a consumer can be a single receiver or a distributed cluster named a consumer group
  • a consumer group can read from a topic in parallel

Topics are a particular stream of data identified by name. Each topic is split into partitions. We can set the partition count and replication factor for each topic. A partition is an ordered, immutable sequence of records (messages), each with an id called the offset.

  • offset order is only guaranteed within one partition
  • data is kept only for a limited time (the default is one week)
  • partitions are immutable
  • a consumer can choose any offset as its starting point, so each consumer doesn't impact the others

Connectors are used to connect topics with apps/databases. A source connector (with the help of source tasks) is responsible for getting data into Kafka, while a sink connector (with the help of sink tasks) is responsible for getting data out of Kafka.

  • stream an entire SQL database into Kafka
  • stream Kafka topics into HDFS
  • it is recommended to leverage the built-in connectors

How does it work?

Producer to kafka

  • The producer chooses the topic and partition to send each inbound message to.
  • A topic is split into several partitions. Each partition has one broker as the leader and zero or more brokers as followers, called ISRs (in-sync replicas). Only the leader can receive and serve data for a partition; the other brokers sync the data from the leader.
  • A topic with replication factor N can tolerate up to N-1 server failures.

kafka to consumers

  • As mentioned above, consumers can be single nodes or consumer groups.
  • if the number of consumers is greater than the number of partitions, some consumers will be idle.
  • if the number of consumers is less than the number of partitions, some consumers will receive messages from multiple partitions.
  • if the number of consumers equals the number of partitions, each consumer reads messages in order from exactly one partition.

Data consistency and availability

  • Partition
    • Messages sent to a topic partition are appended to the commit log in the order they are sent
    • a single consumer instance will see messages in the order they appear in the log (message order is only guaranteed within a partition)
    • a message is 'committed' when all in-sync replicas have applied it to their log
    • any committed message will not be lost, as long as at least one in-sync replica is alive
  • Producer options (see the kafka-python sketch after this list)
    • wait for all in sync replicas to acknowledge the message
    • wait for only the leader to acknowledge the message
    • do not wait for acknowledgement
  • consumer options
    • receive each message at most once
      • restart from the next offset without ever having processed the message
      • potential message loss
    • receive each message at least once
      • restart and process the message again; duplicate messages appear downstream
      • no data loss
      • (recommended; the downstream handles duplicate messages)
    • receive each message exactly once
      • transactional level
      • re-read from the last committed transaction
      • no data loss and no data duplication
      • significantly decreases the throughput by using transactions
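As a small, hedged kafka-python sketch of the producer acknowledgement options listed above (the broker address and topic name are hypothetical):

from kafka import KafkaProducer

# acks=0: do not wait for acknowledgement
# acks=1: wait for only the leader to acknowledge
# acks='all': wait for all in-sync replicas to acknowledge
producer = KafkaProducer(bootstrap_servers=["localhost:9092"], acks="all")
producer.send("first_topic", b"hello")
producer.flush()
producer.close()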

Architecture

Message structure

Fundamental data flow

Kafka in Azure through HDinsight

Reference

Apache Kafka official website. https://kafka.apache.org/intro.

Kafka in a Nutshell. https://sookocheff.com/post/kafka/kafka-in-a-nutshell/

Apache Kafka Series – Learn Apache Kafka for Beginners v2, https://tpl.udemy.com/course/apache-kafka/learn/lecture/11566878?start=180#overview

What is Apache Kafka in Azure HDInsight, https://docs.microsoft.com/en-us/azure/hdinsight/kafka/apache-kafka-introduction