It is getting harder to write a long journal entry on a single topic, since these days most of my time goes into burning fat. After half a year without updates, maybe it is time to list some problems we have solved or are still working on.

Issue 1. Too many job clusters launched in pipelines.

An example of the basic pipeline for a single client.

A year ago, we implemented the Bronze, Silver, Gold structure with delta tables. It made processing big data with correct partitions very convenient. However, our pipelines still have some issues:

  • Too many resources are needed, and cluster launch time is wasted. The screenshot above shows 3 clusters requested per pipeline run, and an outer loop runs clients in parallel (usually 5 at the same time). So the total number of clusters is 3 x 5 = 15. Assuming 4 VMs per cluster, we need at least 15 x 4 = 60 VMs. Since the workers are spot VMs, they take longer to acquire and must be re-requested if evicted. This is the simplest case; some pipelines are more complex and may need hundreds of VMs.
  • The Bronze-tier cluster is poorly utilized because of the zip format. I will talk about it later.
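The cluster arithmetic above fits in a tiny helper. This is only a sketch: the 4-VMs-per-cluster figure is the assumption used in the text, and real counts also depend on autoscaling and spot evictions.

```python
def estimate_vms(clusters_per_pipeline: int, parallel_clients: int, vms_per_cluster: int) -> int:
    """Lower-bound VM count when every tier of every client run gets its own job cluster."""
    total_clusters = clusters_per_pipeline * parallel_clients
    return total_clusters * vms_per_cluster


# 3 tiers (Bronze, Silver, Gold) x 5 clients in parallel x ~4 VMs per cluster
print(estimate_vms(3, 5, 4))  # 60
```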


Option 1: Create two High Concurrency clusters, one for Bronze and Silver, and another for Gold.

  • Pro: Cluster start time is significantly reduced, because the on-demand cluster launches only once per batch. In our tests, overall pipeline running time dropped by 50% and cost dropped by 10-20%.
  • Con: All pipelines share the same cluster. You have to monitor cluster size and memory usage and resize the clusters when needed, although autoscaling and round-robin scheduling help a lot to prevent crashes. You also need to optimize the code to reduce pressure on the driver; we saw many delays in this mode caused by poorly written code running on the driver.

Option 2: Move the pipelines into Databricks workflows.

  • Pro: Databricks workflows can reuse a job cluster, so Bronze, Silver, and Gold can run on the same cluster for each client.
  • Con: We need to migrate the pipelines from ADF to Databricks workflows. Even if that is only a matter of time, there is another issue: unbalanced resource usage across the 3 tiers within the same workflow. Autoscaling may fix the imbalance between Gold and the other two tiers, but autoscaling also takes time, and that time may be longer than the code itself runs.

Issue 2. Zip format is not well supported in Databricks.

We have used zip files to collect data from client servers since our on-premises days. We kept this format on the cloud, using AzCopy to upload the same files to the staging area and then processing them in ADF pipelines. But Databricks does not support zip well: the unzip command can only run on the driver.


Option 1: Directly upload the log files, which used to be compressed into zip files, to the data lake through . Because there are many logs, organize the folders by timestamp, like yyyy/mm/dd/hh/MM/ss, then use Auto Loader to load them.

  • Pro: Simple solution, easy to implement on Azure.
  • Con: Uncompressed files are larger, which may increase cost and network usage. Many small files also hurt read performance in the data lake.

Option 2: Pack the log files into a single parquet file instead of a zip file. Upload the parquet files to the data lake and use Auto Loader to ingest them.

  • Pro: Keeps the same packing logic on the client server; faster ingestion into the delta table; no small-file issue.
  • Con: We need to query the delta table or use other tools to recover the original log files.
import glob
import os

import pyarrow as pa
import pyarrow.parquet as pq

folder_path = 'E:/Test/Upload/*.log'  # all .log files in the upload folder
parquet_path = 'E:/Test/test.parquet'  # single output file

# one row per log file: the original file name plus its raw bytes
schema = pa.schema([
    ('file_name', pa.string()),
    ('content', pa.binary()),
])
pqwriter = pq.ParquetWriter(parquet_path, schema, compression='SNAPPY', write_statistics=False)

# list all files
file_list = glob.glob(folder_path)
for file_path in file_list:
    with open(file_path, mode='rb') as f:  # 'b' is important -> binary
        file_content = f.read()
    pa_filename = pa.array([os.path.basename(file_path)], type=pa.string())
    pa_content = pa.array([file_content], type=pa.binary())
    table = pa.Table.from_arrays([pa_filename, pa_content], schema=schema)
    pqwriter.write_table(table)

# close the parquet writer
if pqwriter:
    pqwriter.close()

Option 3: Use a streaming solution. We can set up Event Hubs Capture to receive logs from the client servers and connect it to Auto Loader.

  • Pro: Event-driven; a client log can be processed as soon as it is generated.
  • Con: Requires code changes on the client servers, and not every client allows streaming because of security requirements.

Issue 3. Need a more flexible solution to trigger pipelines.

We used to create schedule triggers and tumbling window triggers in ADF. But a trigger cannot read metadata to adjust its trigger time or parameters dynamically. We need a metadata table to easily maintain the triggers for each client and report/pipeline.

Option 1: Use an Azure Function to call the REST API. The Azure Function can read the metadata from any source.

  • Pro: Easy to implement and very flexible, since it is just code.
  • Con: If we put too much logic into triggering different pipelines, it is not as clear as triggers in ADF. We also lose some helpful ADF monitoring features unless we recreate them.
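Here is a minimal sketch of the Azure Function idea. It assumes a hypothetical metadata table with one row per client and pipeline, holding a refresh interval and the last run time. The sketch only decides which pipelines are due and builds the URL of the ADF "Pipelines - Create Run" REST endpoint. Authentication and the actual HTTP POST, with pipeline parameters in the JSON body, are left out, and all names are placeholders.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

# ADF REST API "Pipelines - Create Run" endpoint; pipeline parameters go in the JSON body of a POST
CREATE_RUN_URL = (
    "https://management.azure.com/subscriptions/{subscription}/resourceGroups/{resource_group}"
    "/providers/Microsoft.DataFactory/factories/{factory}/pipelines/{pipeline}/createRun"
    "?api-version=2018-06-01"
)


@dataclass
class TriggerMeta:
    """One row of a hypothetical trigger metadata table."""
    client: str
    pipeline: str
    interval_hours: int
    last_run: datetime


def due_triggers(rows, now):
    """Rows whose interval has elapsed since their last run."""
    return [r for r in rows if now - r.last_run >= timedelta(hours=r.interval_hours)]


def create_run_url(subscription, resource_group, factory, pipeline):
    return CREATE_RUN_URL.format(
        subscription=subscription, resource_group=resource_group, factory=factory, pipeline=pipeline
    )


rows = [
    TriggerMeta("client_a", "ingest_pipeline", 6, datetime(2022, 1, 1, 5, 0)),
    TriggerMeta("client_b", "ingest_pipeline", 24, datetime(2022, 1, 1, 0, 0)),
]
for row in due_triggers(rows, now=datetime(2022, 1, 1, 12, 0)):
    # the Azure Function would POST to this URL with an Azure AD token
    print(row.client, create_run_url("<subscription-id>", "<resource-group>", "<factory>", row.pipeline))
```

Keeping the schedule in data rather than in trigger definitions means changing a client's frequency is just an update to one metadata row.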

Option 2: Leverage Airflow to control pipelines.

  • Pro: TBD
  • Con: TBD

Issue 4. Migrating legacy Python code to Databricks.

We have some Python code that processes data row by row and converts each row into multiple rows; the logic varies with the content of each row. The obvious approach is to wrap the old Python code in a UDF directly, but this is very inefficient, especially when the UDF calls many sub-functions.

Option 1: Use Azure Batch on a cluster. We split all rows into multiple tasks and push these tasks to workers. Each worker still runs the legacy Python code; then we collect the outputs and merge them.

Azure Batch: split tasks into workers
  • Pro: No need to change the Python code.
  • Con: It is not obvious how to split tasks efficiently across workers, and pure Python may be too slow.

Option 2: Use native PySpark on Databricks. We split the incoming dataset into several smaller subsets, then apply different logic to each of them.

Databricks: split dataset by message type into small subsets
  • Pro: Native PySpark/Scala is much faster than a Python UDF.
  • Con: Needs a smart way to organize the different logic and avoid long if-else chains. (TBD)
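The registry idea behind Option 2 can be sketched in plain Python. Rows are split by message type, and each type looks up its own handler instead of branching inside one function. The message types, fields, and handlers here are hypothetical. In PySpark, the same shape would filter the DataFrame per type, register one DataFrame-to-DataFrame function per type, and union the results.

```python
from collections import defaultdict


def parse_status(row):
    """Hypothetical handler: a status message maps to one output row."""
    return [{"id": row["id"], "kind": "status", "value": row["payload"]}]


def parse_batch(row):
    """Hypothetical handler: a batch message expands into one row per comma-separated item."""
    return [{"id": row["id"], "kind": "batch", "value": item} for item in row["payload"].split(",")]


# registry: message type -> handler, instead of an if-else chain
HANDLERS = {"status": parse_status, "batch": parse_batch}


def process(rows):
    # 1. split the dataset into subsets by message type
    subsets = defaultdict(list)
    for row in rows:
        subsets[row["msg_type"]].append(row)
    # 2. run each subset through its handler; an unknown type raises KeyError
    output = []
    for msg_type, subset in subsets.items():
        handler = HANDLERS[msg_type]
        for row in subset:
            output.extend(handler(row))
    return output


rows = [
    {"id": 1, "msg_type": "status", "payload": "ok"},
    {"id": 2, "msg_type": "batch", "payload": "a,b,c"},
]
print(process(rows))
```

Adding a new message type then means adding one function and one registry entry, without touching the existing logic.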

How to break down Databricks DBU cost to the pipeline level

One day we found that our Databricks DBU cost had surged, but we didn't know which ADF job caused it. I asked Azure support whether there was any way to track cost at the pipeline level. Unfortunately, they told me: “Regarding your question, please note that you can only track cost against the resource group or workspace(cluster). Unfortunately, there is no way to track Databricks cost at a job or user level.” OK, that's fine.

However, when I opened the cost details exported from Azure Cost Management, I found a Tags column in the DBU resource usage rows.

"ClusterId": "1210-XXXX-XXXX","DatabricksInstancePoolCreatorId": "XXXXXX","DatabricksInstancePoolId": "1025-204618-XXXXX-pool-XXXXXX","ClusterName": "job-8810145-run-1","JobId": "8810145","RunName": "ADF_df-eus-prod-01_Standard_Module_run_pipeline_935e1d13-02c5-4ae3-9441-3a414d1ad0eb","Creator": "","Vendor": "Databricks","DatabricksInstanceGroupId": "-8346123XXXXX6640367"

Is this our Databricks information? Maybe we can do something with it! My coworker Sai told me that the JobId corresponds to the notebook run ID. That solved everything, since we already log everything when running a notebook, including the ADF pipeline ID, customer information, notebook name, notebook run ID (JobId), and start/end time.

With a simple SQL query joining the cost CSV table and the log table, we can attribute DBU cost to each ADF pipeline.

select ProductName,t2.notebook_name,sum(t1.PreTaxCost)  from cost_df t1
left join (select distinct notebook_name,notebook_runID,cust_name from delta.`/mnt/eus-logging/job-log-details` ) t2 on t2.notebook_runID=get_jobid(Tags)
where ProductName='Azure Databricks - Premium - Jobs Compute - DBU' 
group by ProductName,t2.notebook_name
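The query calls get_jobid(Tags), which is not defined above. Below is a minimal sketch of such a function. It assumes the Tags column holds a string like the sample tags shown earlier, with or without the outer braces. In a Databricks notebook it could be registered for SQL with spark.udf.register("get_jobid", get_jobid).

```python
import json
import re


def get_jobid(tags):
    """Return the Databricks JobId from a cost-export Tags string, or None if absent."""
    if not tags:
        return None
    text = tags.strip()
    if not text.startswith("{"):
        text = "{" + text + "}"  # the sample above has no outer braces
    try:
        job_id = json.loads(text).get("JobId")
    except json.JSONDecodeError:
        # fall back to a regex when the string is not valid JSON
        match = re.search(r'"JobId"\s*:\s*"([^"]*)"', tags)
        job_id = match.group(1) if match else None
    return job_id or None


sample = '"ClusterName": "job-8810145-run-1","JobId": "8810145","Vendor": "Databricks"'
print(get_jobid(sample))  # 8810145
```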

To make this work, you have to log the ADF pipeline and notebook runtime information whenever you execute a notebook. I did this by adding the following calls at the start and end of each notebook.

from datetime import datetime

# at the beginning
# ADFRunID, cust_id and cust_name are assumed to arrive as notebook parameters from ADF
notebook_name = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
notebook_RunID = getRunID()  # this is a function in /Public/JOB_LOG
write_log_detail(ADFRunID, cust_id, cust_name, notebook_name, notebook_RunID, startTime=str(datetime.now()))  # notebook start time

# at the end
write_log_detail(ADFRunID, cust_id, cust_name, notebook_name, notebook_RunID, endTime=str(datetime.now()))  # notebook end time

Here, write_log_detail is the function that writes the record into a delta table.

Unfortunately, I have not yet found a way to break down the VM cost to the pipeline level. I think there should be a way; it just needs more time. I will update later.

A real case of optimizing a Spark notebook

When I started optimizing, I looked for some standard principles that could help me quickly and smoothly. Although lots of information online indicates where Spark can be improved, none of it gives an easy end-to-end way to fix the key issues. So I summarized what I learned from online materials and applied some strategies to a Spark notebook that I suspected had performance issues.

  • Start with the slowest jobs; you can sort by the “Duration” column. Some guides go straight to the details of the slowest stages, but I suggest going to the related SQL query page to review the whole process at the query level. This helps us quickly spot common query performance issues, like duplicate scans without filters or inappropriate joins.
In the Jobs tab of the Spark UI, we can sort jobs by duration to find the slow ones.
On the job detail page, we click “Associated SQL Query” first.
  • One table read multiple times. If one table is scanned multiple times at the beginning, consider caching the temporary result in memory/local disk rather than reading it from remote storage again.
In my query DAG, there are two parallel table scans, but they read the same data from the same table: both show 884,688,530 records read.
  • Too many similar branches. Many similar branches may also be caused by a missing cache: the intermediate results have to be recomputed every time they are needed.
The red/orange lines show three copies of the data, but essentially they do the same processing. We should think about how to cache the intermediate results.
  • Duplicate slow stages in the Stages tab. These may be caused by a subtle code mistake. The case below shows the same dataframe being computed twice, even though the code looks perfectly normal in Python.
Two jobs took the same time and ran the same piece of code, which suggests a missing cache.
Back in the notebook, we found a dataframe that was executed twice.
Caching the table looks like this:
  • Find unexpected operations. This takes longer than the previous steps: we need to match the DAG to the SQL in the notebook, understand where the wide (shuffle) and narrow transformations are, and pick out the wide shuffles we didn't expect.
When I went through the notebook with the DAG, I found a “HashAggregate” I did not expect. Then I realized I had written “UNION” rather than “UNION ALL”; this mistake spent additional time removing duplicates (which should not exist at all).
  • Reduce data size by adding a projection that selects only the columns we need. I think this is the easiest way to improve performance, especially for big tables.
Simply listing the column names in the select query reduced the data read from 27.9GB to 23.5GB.
  • Reduce data size by filtering as early as possible. Check whether the filter is pushed down into the scan, or manually move the filter to where the table is read.
  • Partition the destination table correctly. When we use MERGE or replaceWhere, correct partitioning matters.
After we made reading_date the partition column, the time dropped from 2.49 mins to 6.67 seconds. Similar gains apply to MERGE and replaceWhere.
  • Reduce spill. When an executor runs out of memory, it spills to disk, which is much slower than memory. To solve this, we can do one or more of the following:
    • Increase executor memory by switching to a bigger cluster or setting spark.executor.memory (if possible).
    • Increase the fraction of the heap used for execution and storage memory (spark.memory.fraction, default 0.6).
    • Adjust spark.memory.storageFraction (default 0.5). Depending on where the spill occurs, in cache or in shuffle, tune the storage fraction.
    • Adjust spark.sql.shuffle.partitions (default 200). Increase the number of shuffle partitions so each partition is smaller and less likely to spill.
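To reason about the memory settings above, it helps to compute how Spark's unified memory manager splits the executor heap. The sketch below follows the formula in the Spark tuning guide. About 300 MB is reserved; spark.memory.fraction of the rest is shared by execution and storage; and spark.memory.storageFraction of that share is protected for cached blocks. It also computes the average shuffle partition size. The 8 GB heap and 100 GB shuffle are hypothetical, and actual numbers on Databricks also depend on overhead and off-heap settings.

```python
RESERVED_MB = 300  # heap reserved by Spark before the unified region is computed


def unified_memory_mb(executor_memory_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Approximate per-executor unified memory split, following the Spark tuning guide."""
    unified = (executor_memory_mb - RESERVED_MB) * memory_fraction  # execution + storage (M)
    protected_storage = unified * storage_fraction  # cached blocks here are not evicted by execution (R)
    execution_floor = unified - protected_storage  # execution can always reclaim at least this much
    return unified, protected_storage, execution_floor


def avg_shuffle_partition_mb(shuffle_data_mb, shuffle_partitions=200):
    """Average shuffle partition size for a given spark.sql.shuffle.partitions value."""
    return shuffle_data_mb / shuffle_partitions


unified, storage, execution = unified_memory_mb(8 * 1024)  # hypothetical 8 GB executor heap
print(round(unified, 1), round(storage, 1), round(execution, 1))  # 4735.2 2367.6 2367.6
print(avg_shuffle_partition_mb(100 * 1024))  # 512.0 MB with the default 200 partitions
print(avg_shuffle_partition_mb(100 * 1024, 800))  # 128.0 MB after raising the partition count
```

Raising spark.sql.shuffle.partitions shrinks each partition. That is why it helps when a shuffle spills even though the total data size stays the same.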


Spark Configuration

Performance Tuning

From Query Plan to Performance: Supercharging your Apache Spark Queries using the Spark UI SQL Tab

How to Read Spark DAGs | Rock the JVM

Optimize performance with caching

Some working performance/cost improvement tips applying to ADF and Databricks recently

When we switched to the cloud, we found some pipelines running slowly and costs increasing rapidly. To solve these problems, we took the following steps to optimize the pipelines and data structures. None of them is hard to implement.

1. Set different triggers for different recurrence periods.

Whatever the reason, it is very common for a data pipeline to be triggered on a recurring schedule. However, the sources of the pipeline may be refreshed at different frequencies. In that case, we can set two triggers for the pipeline, so that it doesn't have to run at the highest frequency for every source.

In the ForEach loop, there are two kinds of clients: one is updated every 6 hours, the other daily. So two triggers were created, both running the same pipeline.
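A quick way to see what the second trigger saves, as a sketch with hypothetical client names: compare the daily runs when every client follows the fastest schedule with the runs when each group of clients gets its own trigger.

```python
from collections import defaultdict


def runs_per_day(interval_hours):
    """Number of runs per day for a trigger that fires every `interval_hours` hours."""
    return 24 // interval_hours


def group_by_interval(clients):
    """clients: client name -> refresh interval in hours. One trigger per distinct interval."""
    groups = defaultdict(list)
    for name, hours in clients.items():
        groups[hours].append(name)
    return dict(groups)


def daily_runs(clients):
    """(runs with one trigger at the highest frequency, runs with one trigger per interval)."""
    single_trigger = len(clients) * runs_per_day(min(clients.values()))
    split_triggers = sum(runs_per_day(hours) for hours in clients.values())
    return single_trigger, split_triggers


clients = {"client_a": 6, "client_b": 6, "client_c": 6, "client_d": 24, "client_e": 24}
print(group_by_interval(clients))  # {6: ['client_a', 'client_b', 'client_c'], 24: ['client_d', 'client_e']}
print(daily_runs(clients))  # (20, 14)
```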

2. Set different pools for notebooks.

Databricks provides pools to efficiently reuse VMs in newly created clusters. However, one pool can only have one instance type. If the pool instances are too small, out-of-memory errors or slow processing occur, while if they are too big, we waste money.

To solve this, we set up two kinds of pools: a heavy one with 4-core, 28GB instances and a light one with 4-core, 8GB instances. For lightweight jobs, like ingesting source files into a bronze table, the light pool is a better choice, while the heavy one is used for aggregation work.

Two pools are created for the different weight processes in the notebooks.

3. Create a secondary index or archive folders for the source files.

When we first switched to the cloud, we put everything into a single folder. That is fine for Azure itself. But once we need to scan for specific files or get the location of a file, the huge number of files in this folder significantly hurts performance, and it gets worse as the number of files grows over time. We have three solutions:

  • Create a secondary index when a file moves into the folder. The secondary index is like a table that records the important information of each file, like name, modify date, and location in the data lake. Next time someone needs to scan the folder, they only need to scan this index rather than the whole blob directory.
An example of a secondary index
  • Archive the files. This may be the easiest one, because you only need to move the files you will no longer use into an archive folder, as long as you are not time sensitive about getting them back.
Part of the code that moves files into the archive folder.
  • Organize files by received date. This method is similar to archiving, but we create many folders by date.
Each file is reorganized into the date folder of the day it was received.
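The following sketch shows the secondary index and date folders, using the local file system as a stand-in for the data lake. On ADLS, the listing would come from the storage SDK or dbutils.fs.ls instead. Function names and record fields are hypothetical. The point is that lookups hit the small index instead of listing the huge folder again.

```python
import os
from datetime import datetime, timezone


def dated_folder(base, received_at, file_name):
    """Target path base/yyyy/mm/dd/file_name for a file received at `received_at`."""
    return "/".join([base.rstrip("/"), received_at.strftime("%Y/%m/%d"), file_name])


def build_index(folder):
    """Secondary index: one record per file with its name, modify time and location."""
    records = []
    with os.scandir(folder) as entries:
        for entry in entries:
            if entry.is_file():
                modified = datetime.fromtimestamp(entry.stat().st_mtime, tz=timezone.utc)
                records.append({"name": entry.name, "modified": modified, "location": entry.path})
    return records


def find(index, name_prefix):
    """Answer lookups from the index instead of listing the whole folder again."""
    return [r for r in index if r["name"].startswith(name_prefix)]


print(dated_folder("archive/", datetime(2021, 5, 3), "client_a.log"))  # archive/2021/05/03/client_a.log
```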

4. Optimize delta tables weekly.

Don't hesitate to run OPTIMIZE and VACUUM on your delta tables regularly. Small files kill the advantages of delta tables, especially for the MERGE operation. Below is an example that optimizes and vacuums a list of delta tables.

delta_table_list = ['customer','job','job-category','job_subscription','missing_file_check','parameter']
# compact small files in each table
for delta_table in delta_table_list:
  sql_string = '''OPTIMIZE delta.`/mnt/eus-metadata/{0}`'''.format(delta_table)
  spark.sql(sql_string)
# remove data files no longer referenced by the table (default 7-day retention)
for delta_table in delta_table_list:
  sql_string = '''VACUUM delta.`/mnt/eus-metadata/{0}`'''.format(delta_table)
  spark.sql(sql_string)

First Glance on GPU Accelerated Spark

Since I started playing with clusters, I have thought that no mission was impossible for a cluster: if there is one, add another node. However, apart from CUDA on a stand-alone machine, I have rarely touched GPU-accelerated clusters as a data engineer. Yes, Spark ML may utilize GPUs since Spark 3.0, but most data engineering jobs are related to data pipelines and data modeling. Then recently I googled “GPU ETL”, and SPARK-RAPIDS came up, which uses the RAPIDS libraries to accelerate processing on GPUs.

Easy to configure Spark to take advantage of the GPU

I changed almost nothing in my original Spark code (a mix of PySpark, Spark SQL, Python, and delta table access) to run it on a GPU cluster. That is a great thing! Everybody wants double the result with half the work. In my case, the run time dropped from 5.4 mins (24 cores, 84GB memory) to 3.1 mins (12 cores, 3 NVIDIA T4 GPUs, 84GB memory). I think the results would be better with bigger input chunks, like 500MB+ parquet files.

Here is my configuration on Databricks. You can also refer to this official document (however, I could not get the cluster running by following that document alone).

# cluster runtime version: 7.3 LTS ML (includes Apache Spark 3.0.1, GPU, Scala 2.12)
# node type: NCasT4_v3-series(Azure)

# spark config:
spark.task.resource.gpu.amount 0.1
spark.executorEnv.PYTHONPATH /databricks/jars/rapids-4-spark_2.12-0.5.0.jar:/databricks/spark/python
spark.plugins com.nvidia.spark.SQLPlugin
spark.locality.wait 0s
spark.rapids.sql.python.gpu.enabled true
spark.rapids.memory.pinnedPool.size 2G
spark.python.daemon.module rapids.daemon_databricks
spark.sql.adaptive.enabled false
spark.rapids.sql.concurrentGpuTasks 2

# initial script path

# create initial script on your notebook
sudo wget -O /databricks/jars/rapids-4-spark_2.12-0.5.0.jar
sudo wget -O /databricks/jars/cudf-0.19.2-cuda10-1.jar""", True)
# some key configurations and their meaning
--conf spark.executor.resource.gpu.amount=1 # one executor per GPU, enforced
--conf spark.task.resource.gpu.amount=0.5  # two tasks running on the same GPU
--conf spark.sql.files.maxPartitionBytes=512m  # bigger batches make the GPU more efficient, but not so big that they cause out-of-memory errors
--conf spark.sql.shuffle.partitions=30 # reduce the number of shuffle partitions from the default 200 to 30; larger partitions are more efficient on the GPU
--conf spark.rapids.sql.explain=NOT_ON_GPU # log why a Spark operation cannot run on the GPU
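The GPU resource settings interact in a simple way. The number of tasks Spark schedules per executor GPU is spark.executor.resource.gpu.amount divided by spark.task.resource.gpu.amount. spark.rapids.sql.concurrentGpuTasks then caps how many of those tasks use the GPU at once. This is a small sketch of that arithmetic, based on my reading of the docs rather than an official formula. It ignores the CPU-core limit on scheduling, and it assumes one GPU per executor for the Databricks config above.

```python
from fractions import Fraction


def tasks_per_executor_gpu(executor_gpu_amount, task_gpu_amount):
    """Tasks Spark can schedule on one executor's GPU(s), ignoring the CPU-core limit."""
    # Fractions avoid float surprises, e.g. 0.3 / 0.1 evaluates to 2.999... in floating point
    return int(Fraction(str(executor_gpu_amount)) / Fraction(str(task_gpu_amount)))


def tasks_on_gpu_at_once(executor_gpu_amount, task_gpu_amount, concurrent_gpu_tasks):
    """Scheduled tasks capped by spark.rapids.sql.concurrentGpuTasks."""
    return min(tasks_per_executor_gpu(executor_gpu_amount, task_gpu_amount), concurrent_gpu_tasks)


print(tasks_per_executor_gpu(1, 0.5))  # 2, as in the --conf example
print(tasks_on_gpu_at_once(1, 0.1, 2))  # 2, with the Databricks config above
```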

The only problem I have met so far: when a notebook includes a view built on delta tables, it pops up the error message “covert struct type to GPU is not supported”. Persisting the view to a table easily solves this. Overall, a very cool plugin.

The magic of spark-rapids

There are two major features that let Spark run on GPUs through RAPIDS.

  • Replacing CPU operators with GPU versions at the physical plan level.
  • Optimized Spark shuffle using RDMA and direct GPU-to-GPU communication.
How the RAPIDS Accelerator works in Spark: on the left, CPU ops are replaced with GPU versions based on data type and op type; on the right, Spark shuffle between GPUs is optimized.

Since the replacement happens at the physical plan level, we don't have to change the query. Even if an operation is not supported by RAPIDS, it still runs on the CPU.

A CPU Spark execution plan goes like: logical plan -> optimized by Catalyst through a series of phases -> physical plan -> executed on the cluster. A GPU Spark execution plan replaces operations in the physical plan with GPU versions and executes them on columnar batches.

If we look at the physical plans on GPU and CPU, we find that they match almost one to one, except for some GPU-CPU data exchange ops, like GpuColumnarToRow and GpuRowToColumnar.

parts of GPU physical plan

parts of CPU physical plan

Comparing the two physical plans, we find that many operations already have GPU versions. “Scan parquet” is replaced by “GpuScan parquet”; since the GPU needs columnar format, it skips the “ColumnarToRow” step that appears in the CPU version. Then “Project” is replaced by “GpuProject”, followed by “GpuColumnarToRow”, because the next step, “InMemoryRelation”, runs on the CPU. And so on.

Let’s talk about another great feature: Optimized spark shuffle.

Spark needs a shuffle for wide transformations, while narrow transformations are grouped into a single stage. Spark-RAPIDS implements a custom shuffle manager for shuffle operations on GPU clusters. It provides:

  • Spillable cache: it keeps data close to where it is produced, and it avoids out-of-memory failures by moving data along the chain GPU memory -> host memory -> disk.
Once the GPU is out of memory, the shuffle manager moves data to host memory; if host memory is not enough, it continues pushing data to local disk.
  • Transport (shuffle): handles block transfers between executors using the UCX libraries (see the picture below).
    • GPU0-GPU0: cached in GPU0, zero copy
    • GPU0-GPU1: NVLink
    • GPU0-GPUX (remote): RDMA
      • InfiniBand
      • RoCE
    • Disk-GPU: GPU Direct Storage (GDS)
It is no surprise that RDMA and NVLink are used in the cluster, since UCX is widely used in Open MPI in the HPC area. These features are included in the RAPIDS libraries.

Some helpful information from official Q&A:

What operators are best suited for the GPU?

  • Group by operations with high cardinality
    • Joins with a high cardinality
    • Sorts with a high cardinality
    • Aggregates with a high cardinality
  • Window operations, especially for large windows
  • Aggregate with lots of distinct operations
  • Complicated processing
  • Writing Parquet/ORC
  • Reading CSV
  • Transcoding (reading an input file and doing minimal processing before writing it out again, possibly in a different format, like CSV to Parquet)

What operators are not good for the GPU?

  • Small amounts of data (hundreds of MB)
  • Cache coherent processing
  • Data movement
    • Slow I/O
    • Back forth to CPU(UDFs)
    • Shuffle
  • Limited GPU Memory

What is RAPIDS

  • A suite of open-source software libraries and APIs for data science and data pipelines on GPUs.
  • Offers a GPU dataframe that is compatible with Apache Arrow
    • language-independent columnar memory format
    • zero-copy streaming messaging and interprocess communication without serialization overhead
  • Integrated with popular frameworks: PyTorch, Chainer, Apache MXNet; Spark, Dask

What is cuDF

  • GPU-accelerated data preparation and feature engineering
  • Python drop-in pandas replacement
  • features
    • CUDA
      • low-level library containing function implementations and a C/C++ API
      • importing/exporting Apache Arrow using the CUDA IPC mechanism
      • CUDA kernels to perform element-wise math operations on GPU dataframe columns
      • CUDA sort, join, groupby, and reduction operations on GPU dataframes
    • Python bindings
      • a Python library for GPU dataframes
      • Python interface to the CUDA C++ library with additional functionality
      • creating Apache Arrow from NumPy arrays, pandas DataFrames, and PyArrow Tables
      • JIT compilation of UDFs using Numba




Getting started with RAPIDS Accelerator on Databricks

Deep Dive into GPU Support in Apache Spark 3.x

Accelerating Apache Spark 3.0 with GPUs and RAPIDS

RAPIDS Shuffle Manager

Unified Communication X

Accelerating Apache Spark by Several Orders of Magnitude with GPUs

SCD II or Snapshot for Dimension

SCD II is widely used to process dimensional data with all historical information. Each change in a dimension is recorded as a new row with a validity period, usually at date granularity. Since SCD II only keeps the changes, it significantly reduces storage space in the database.

Understanding Slowly Changing Dimensions
An example of SCD II processing

Everything looks fine until big data arrives and amplifies a flaw: late-arriving dimensions. With SCD II, you have to go through complex steps to process both the dimension and the fact data for a late-arriving dimension change.

For the dimension (SCD II table), we have to change retroactive rows.

In the example above, for product ID=010, suppose there is a change before May-12-06, say on April-02-06, and this change arrives late, after SID=003 has been created. We have to:

  1. Scan the dimension table and find SID = 002 and 003
  2. Change End_DT of SID 002 to April-02-06
  3. Insert a new record, SID=004, Start_DT=April-02-06, End_DT=May-12-06
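The three dimension steps and the fact update described next can be sketched in plain Python. The sketch shows how much bookkeeping a single late-arriving change needs. Validity windows are treated as half-open [Start_DT, End_DT). The sample rows, including the start date of SID 002 and the reading of the dates as 2006, are hypothetical.

```python
from datetime import date

OPEN_END = date(9999, 12, 31)  # End_DT of the current version


def apply_late_change(dim_rows, facts, product_id, effective, new_sid, attrs):
    """Apply a late-arriving dimension change to an SCD II table and repoint affected facts.

    Validity windows are half-open: start <= day < end.
    """
    # 1. scan the dimension for the version whose window contains the late change
    #    (raises StopIteration if no version covers `effective`)
    target = next(
        r for r in dim_rows
        if r["product_id"] == product_id and r["start"] <= effective < r["end"]
    )
    old_end = target["end"]
    # 2. close that version at the change date
    target["end"] = effective
    # 3. insert the new version covering [effective, old_end)
    dim_rows.append({"sid": new_sid, "product_id": product_id,
                     "start": effective, "end": old_end, **attrs})
    # fact table: move facts in the new window from the old SID to the new one
    for fact in facts:
        if fact["sid"] == target["sid"] and effective <= fact["date"] < old_end:
            fact["sid"] = new_sid
    return dim_rows, facts


dim = [
    {"sid": "002", "product_id": "010", "start": date(2006, 1, 1), "end": date(2006, 5, 12)},
    {"sid": "003", "product_id": "010", "start": date(2006, 5, 12), "end": OPEN_END},
]
facts = [{"date": date(2006, 3, 1), "sid": "002"}, {"date": date(2006, 4, 10), "sid": "002"}]
apply_late_change(dim, facts, "010", date(2006, 4, 2), "004", {"product_descr": "late change"})
print([f["sid"] for f in facts])  # ['002', '004']
```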

For the fact table, we have to change multiple historical rows:

Update the SID foreign key in the fact table to 004 for all rows in the period between April-02-06 and May-12-06.

Running an “UPDATE” may not be a problem in a row-stored table, but in the big data area most storage formats are column-stored, which are great for reading and scanning but inefficient for updating. The good news is that we don't have to “UPDATE” data or detect changes in the dimension at all. All we need to do is snapshot the dimension data daily. Since we have a whole copy for each day, joining the fact table to the dimension also becomes much easier: a date-to-date mapping. That's all!

The example above becomes:

Date      | Source Product ID | Product Name | Product Descr
May-12-06 | 010               | 10 inch Box  | 10 inch Glued box
May-12-07 | 010               | 10 inch Box  | 10 inch pasted box
May-12-08 | 010               | 11 inch Box  | 11 inch pasted box
Snapshot daily product table

Would it create a lot of redundancy on disk? Not much. Since we use column-stored tables, repeated values within a column compress very well (for example, with dictionary and run-length encoding). Also, big data storage is much cheaper than a traditional database.

Would it slow down the join because the dimension table now holds much more data? No. There is another easy solution called “PARTITION”. We partition the data by date, so that each partition folder holds the data for that day.

An example for partitioned delta table.

“PARTITION” is transparent to the query. For example, when we execute this query:

Select * from product 
where product_name ='10 inch Box' and Date ='May-12-06'

It first goes to the folder “Date='May-12-06'” rather than scanning the whole table, and then looks for rows where product_name = '10 inch Box'. Partitioning is not limited to one level; you can create multiple levels.

Would it be faster to join the fact table? Yes. If the fact table is also partitioned by the same column, e.g., date, we can easily map the date columns of the dimension and fact tables, and the scan is very quick. Note that with some newer technologies, like delta tables, you have to specify the partition columns explicitly.
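With daily snapshots, the fact-to-dimension join is just a lookup on (date, product ID). The sketch below uses the snapshot rows from the table above and a hypothetical fact row. In Spark this would be an equi-join on the date and product columns; when both tables are partitioned by date, each date only touches its own partition.

```python
# Daily dimension snapshots from the table above, keyed by the table's own date labels
snapshots = [
    {"date": "May-12-06", "product_id": "010", "product_name": "10 inch Box"},
    {"date": "May-12-07", "product_id": "010", "product_name": "10 inch Box"},
    {"date": "May-12-08", "product_id": "010", "product_name": "11 inch Box"},
]


def build_lookup(snapshots):
    """Index snapshot rows by (date, product_id)."""
    return {(s["date"], s["product_id"]): s for s in snapshots}


def join_facts(facts, lookup):
    """Date-to-date join: each fact reads the dimension as it was on the fact's own date."""
    return [
        {**fact, "product_name": lookup[(fact["date"], fact["product_id"])]["product_name"]}
        for fact in facts
    ]


facts = [{"date": "May-12-08", "product_id": "010", "qty": 3}]  # hypothetical fact row
print(join_facts(facts, build_lookup(snapshots)))
```

There are no validity windows to search, so a late change only means rewriting the affected daily snapshots.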


Traditional SCD has existed for a long time, and the environment and prerequisites for applying it have changed a lot. I think snapshotting is a better, faster, and simpler solution for big data.


Functional Data Engineering – A Set of Best Practices | Lyft (talk video by Maxime Beauchemin)

Building A Modern Batch Data Warehouse Without UPDATEs | by Daniel Mateus Pires | Towards Data Science

Slowly Changing Dimensions (SCDs) In The Age of The Cloud Data Warehouse

Parallel and Redundancy

 “Anything that can go wrong will go wrong”. — Murphy’s Law

1990, engineers were fighting to optimize code performance and increase CPU speeds.

1994, MPI started to become the dominant model in high-performance computing.

2002, we started to leverage multi-core processors for parallel computing.

2006, Hadoop opened the door to reliable, scalable, distributed computing on clusters using simple programming models.

2007, NVIDIA released the first version of CUDA, which enabled GPUs for general-purpose processing.

2011, Kafka provided a streaming solution for handling real-time data feeds.

2014, Spark improved on Hadoop, providing an in-memory unified analytics engine with several libraries.

2019, Google scientists said they had achieved quantum supremacy. We jumped out of the world of 0/1.

When we look back at the evolution of these 30 years, two things play the major role: parallelism and redundancy. According to, parallel means two or more lines that never intersect, while redundancy means needlessly repeated. Why are they so important in the evolution of computing?

In the real world, an efficient team gets things done more easily than one person working alone, simply because there are more people: 100 farmers can work on a farm at the same time, one person per row. Similarly, computing achieves great improvements by splitting tasks across multiple parallel “workers”. Modern CPUs and GPUs split tasks across cores; big data platforms split tasks across the nodes of a cluster; even a quantum computer leverages superposition to let a single qubit process multiple states at the same time. In Rick and Morty, there is a character, the “time police”, who can exist in multiple parallel timelines.

The ugly red head is the “time police”, who exists in 4 timelines at the same time.

So, if parallelism is so powerful, why do we still need redundancy? According to Murphy's Law, “Anything that can go wrong will go wrong”. Fragility is common throughout the universe, and to be anti-fragile we need redundancy: the more fragile something is, the more redundancy it needs. We humans have two eyes, two lungs, and two kidneys. Most insects lay hundreds to tens of thousands of eggs so that some survive.

Before the age of big data, redundancy was already widely used in IT. The well-known cases are CRC and RAID 1. A cyclic redundancy check (CRC) is an error-detecting code commonly used in digital networks and storage devices to detect accidental changes to raw data, since signals are easily disturbed or lost, especially over wireless links or the internet. RAID 1 keeps an exact copy (mirror) of a set of data on two or more disks to avoid accidental data loss.

Error Detection in Computer Networks. From “GeeksforGeeks”
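CRC-based detection can be tried with Python's standard zlib.crc32. The sender transmits the checksum with the data, and the receiver recomputes it and compares. This minimal sketch (function names are illustrative) flips a single bit and shows the mismatch being caught.

```python
import zlib
from typing import Tuple


def send(payload: bytes) -> Tuple[bytes, int]:
    """Sender side: transmit the payload together with its CRC-32 checksum."""
    return payload, zlib.crc32(payload)


def is_intact(payload: bytes, checksum: int) -> bool:
    """Receiver side: recompute CRC-32 and compare it with the transmitted value."""
    return zlib.crc32(payload) == checksum


data, crc = send(b"parallel and redundancy")
corrupted = bytes([data[0] ^ 0x01]) + data[1:]  # one bit flipped in transit
print(is_intact(data, crc))       # True
print(is_intact(corrupted, crc))  # False
```

The checksum is a few redundant bytes, not a full copy: it detects the change but cannot repair it, which is where RAID-style mirroring comes in.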

Another case of redundancy is NoSQL. Unlike normalized (3NF) SQL designs, NoSQL aims for high read speed: it improves performance by adding redundancy so that we don't have to spend time joining tables.

MongoDB example: “car” is stored as a nested document rather than through a foreign key.

As we headed into the age of big data, more and more “distributed” technologies have been used to make work faster and more parallel. But a distributed system is not as solid as we think. It relies on unstable networks and easily broken storage devices, and sometimes a natural disaster can destroy all your data. So redundancy comes along as the brother of parallelism to make systems anti-fragile. It creates extra “backups” across the workers of the cluster where big data runs: we have not only distributed computing but also distributed storage. Even if we lose data on one worker, the same “backups” are immediately available on other workers.

Spark fault-tolerance by replicating data in memory
Kafka partitions are a good example of redundancy for anti-fragility in a streaming platform
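A simplified sketch of replica placement: each block is copied to `replication` distinct workers (3 is the HDFS default), so losing a worker does not lose data. The round-robin placement and the worker/block names are assumptions for illustration; real systems such as HDFS use rack-aware placement, and Kafka assigns replicas per partition rather than per block.

```python
def place_replicas(blocks, workers, replication=3):
    """Assign each block to `replication` distinct workers, round-robin."""
    if replication > len(workers):
        raise ValueError("need at least as many workers as replicas")
    return {
        block: [workers[(i + r) % len(workers)] for r in range(replication)]
        for i, block in enumerate(blocks)
    }

def survives(placement, failed_workers):
    """True if every block still has at least one copy on a healthy worker."""
    return all(
        any(w not in failed_workers for w in replicas)
        for replicas in placement.values()
    )

placement = place_replicas(["b0", "b1", "b2", "b3"], ["w1", "w2", "w3", "w4"])
print(placement["b0"])              # ['w1', 'w2', 'w3']
print(survives(placement, {"w2"}))  # True: every block has another copy
```

With three copies, any two workers can fail and every block is still readable; only losing all three holders of some block loses data.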

Redundancy can also accelerate parallel processing, especially hardware redundancy. Since we have replicas in two or more places, parallel requests can read the same data from different places, balancing the load and improving I/O. Two good examples: 1. Web access, where a load balancer decides which server each request connects to. 2. Reading data from Hadoop, where the pieces of a file are stored on different servers but can all be read at the same time.

Hadoop MapReduce process, showing distributed processing and storage (from “4 Easy Steps to Master Apache Hadoop Development”, Big Data Analytics News).
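The read side can be sketched the same way: when a block has several replicas, successive requests can be spread over them, skipping any worker that is down. This round-robin `ReplicaReader` is a hypothetical illustration of the idea, not how the Hadoop client or a real load balancer picks servers (HDFS, for example, prefers the closest replica).

```python
class ReplicaReader:
    """Spread reads of each block over its replicas in round-robin order."""

    def __init__(self, placement):
        self.placement = placement  # block -> list of workers holding a copy
        self.next_index = {block: 0 for block in placement}

    def pick(self, block, down=frozenset()):
        """Return the next live worker for `block`, skipping workers in `down`."""
        replicas = self.placement[block]
        for _ in range(len(replicas)):
            worker = replicas[self.next_index[block] % len(replicas)]
            self.next_index[block] += 1
            if worker not in down:
                return worker
        raise RuntimeError(f"no live replica for block {block!r}")

reader = ReplicaReader({"b0": ["w1", "w2", "w3"]})
print([reader.pick("b0") for _ in range(4)])  # ['w1', 'w2', 'w3', 'w1']
```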

Parallelism and redundancy are like twins. Many great new big data technologies are related to both concepts. The difference, as I see it, is where they implement the “distribution”: in the GPU, the CPU, or across clustered CPUs/GPUs/memory/disks…



Optimize concurrency for MERGE operations in Delta tables

Concurrency control is routine in OLTP workloads, but not really in OLAP, so I didn’t pay attention to it until I hit the error below:

com.databricks.sql.transaction.tahoe.ConcurrentAppendException: Files were added to partition [cust_id=000] by a concurrent update. Please try the operation again.

This error is caused by a write conflict between two MERGE operations on a single Delta table. Based on the conflict matrix provided by Databricks, even at the WriteSerializable isolation level, two MERGE operations can conflict.

To solve this problem, I did two steps:

  • Choose the right partition columns. In my example, I only partitioned by cust_id, but the MERGE operations in the two scripts update based on two columns: cust_id and report_type. So the first step is to partition by both columns.
  • Change the condition clause in the MERGE operations. I put both partition columns into the merge condition, like this:
A.cust_id = B.cust_id and A.report_type = B.report_type  # where A is the target table

But it still conflicted even after repartitioning. So I tried hard-coding the partition values as part of the condition. It worked: no conflict has happened since. My guess is that Delta cannot infer which partitions a MERGE touches from the join condition alone.

"A.cust_id = B.cust_id and A.report_type = B.report_type and A.cust_id = '{0}' and A.report_type = '{1}'".format(cust_id, report_type)
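The fix can be wrapped in a small helper that appends literal partition predicates to the join condition. This is a minimal sketch: the function name `build_merge_condition`, the aliases A/B and the sample values are hypothetical, and it assumes partition values are plain strings without quotes or backslashes (it rejects those rather than guessing at escaping rules).

```python
def build_merge_condition(keys, partition_values, target="A", source="B"):
    """Join condition on `keys`, plus literal predicates on the target's partitions.

    The literal predicates let Delta narrow the MERGE to specific partitions,
    which is what allows concurrent merges into other partitions to avoid conflicts.
    """
    clauses = [f"{target}.{k} = {source}.{k}" for k in keys]
    for column, value in partition_values.items():
        value = str(value)
        if "'" in value or "\\" in value:
            # Refuse rather than guess at escaping rules for the SQL string literal.
            raise ValueError(f"unsupported character in partition value: {value!r}")
        clauses.append(f"{target}.{column} = '{value}'")
    return " AND ".join(clauses)

condition = build_merge_condition(
    ["cust_id", "report_type"], {"cust_id": "000", "report_type": "daily"}
)
print(condition)
```

With the delta-spark Python API, the string would be passed as the merge condition, e.g. `DeltaTable.forName(spark, "reports").alias("A").merge(updates.alias("B"), condition).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()`, where `reports` and `updates` are placeholders. Quoting the values also keeps a string partition such as `000` from being compared as the number 0.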


Isolation levels in Delta Lake

Concurrency control

How to improve performance of Delta Lake MERGE INTO queries using partition pruning

Some features that need improvement in Azure data products

  • Azure Storage Explorer/Data Lake
    • Ghost file
      • In some rare cases, if you delete files in ASE and then call the APIs or use the browser-based data explorer, you will find a 0-byte file that should already have been deleted. I reported this to Microsoft last year and they said it was fixed, which held true until the ghost files came back last week. I suspect it is related to some sort of soft delete in Hadoop.
    • Page
      • If a folder contains thousands of files, it is a disaster: you can never easily find the files you want. In particular, ASE doesn’t load files into its cache until you click “Load more”, so until then you cannot see them.
    • Copy/Move files
      • ASE uses AzCopy to move/copy files, so it should be robust and asynchronous. But in my experience, when I copy a batch of files, it often shows an error and asks me to try again, while the same copy works fine through the API or the AzCopy command line.
    • Soft delete
      • Soft delete is a nice feature in case we delete files by mistake, but for ADLS Gen2 it only works down to the container level. To recover specific files, we have to know their file names and use “Restore-AzDataLakeStoreDeletedItem” to restore them. That is pretty hard for a distributed structure like a Delta table, whose data file names are randomly generated and tracked in JSON log files.
  • Data Factory
    • No version control when connecting to Databricks notebooks.
      • When you create a dev branch for both Data Factory and Databricks, you would naturally expect Data Factory to call the Databricks notebook on the same branch. In fact, it calls the notebook in the master (published) branch. I asked Microsoft whether there is a workaround, and the answer was that I could submit this as a feature request on the forum. What!!! 🙁
    • Unclear documentation about global parameters in CI/CD.
      • If we want to change a global parameter in a release, we have to use an ARM template. The details are in Microsoft’s documentation, but you have to find your own way (perhaps I should say “guess”) to figure out the syntax. For example, I spent half a day testing how to make the JSON template work for global parameters through CI/CD:

   "Microsoft.DataFactory/factories": {
       "properties": {
           "globalParameters": {
               "NotificationWebServiceURL": {
                   "value": "=:NotificationWebServiceURL:string"
               }
           }
       },
       "location": "="
   }

According to the official documentation, we should write "NotificationWebServiceURL": "=:NotificationWebServiceURL:string", but that is wrong: it generates a JSON object rather than a string.

  • Databricks
    • Since Databricks is a third-party product, there is not much Microsoft can do to improve it. Frankly, it is a very successful commercial product based on Spark, well integrated with Key Vault, Data Lake and Azure DevOps. My only complaint is that it can’t (at least as far as I know) debug line by line like Google Colaboratory.
    • Delta table schema evolution. This is a great feature for continuously ingesting data whose schema changes. The catch is that when we don’t know the incoming schema, we have to either use schema inference or read every field as a string, and neither way is perfect. Maybe the only real solution is a format that carries its schema in the file (such as Parquet or Avro) instead of CSV.
  • Azure Data Studio
    • Pretty good, except that it only works with SQL Server. Based on the forum, I think a MySQL connection extension is on the way.
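On the soft-delete complaint above: Delta file names are random, but a Delta table’s `_delta_log` folder records every data file it adds and removes, so the names of the files a table currently depends on can be recovered from it. A minimal stdlib sketch, assuming a local or mounted copy of the table directory and that the JSON commit files have not yet been cleaned up; it ignores Parquet checkpoint files, and `live_data_files` is a hypothetical helper name.

```python
import json
from pathlib import Path

def live_data_files(table_path):
    """Replay the JSON commits in _delta_log and return the data files still referenced."""
    live = set()
    log_dir = Path(table_path) / "_delta_log"
    # Commit files are zero-padded version numbers, so lexical order is version order.
    for commit in sorted(log_dir.glob("*.json")):
        for line in commit.read_text().splitlines():
            if not line.strip():
                continue
            action = json.loads(line)  # one action per line: add, remove, metaData, ...
            if "add" in action:
                live.add(action["add"]["path"])
            elif "remove" in action:
                live.discard(action["remove"]["path"])
    return live
```

The returned paths are relative to the table root (and may be URL-encoded), which is the form to look for among the soft-deleted items.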

I bought a Raspberry Pi 4 and tried to replace my PC >:<

Since I got an 8GB RP4 with a fan case from Micro Center, I tried to use it to replace my PC for daily use. Here I share what I found, in case you are interested.

I have another RP3, which I use as a NAS to share the contents of my USB HDD over the network.

OS: Ubuntu Server (ARM version) is installed on the RP3, which serves as an SMB server, so all devices connected to the same router can access the documents and videos stored on the USB hard disk. Raspberry Pi OS 32-bit is installed on the RP4. I didn’t use 64-bit because it lacked DRM support for Netflix and Hulu, which my family needs.

Enable the SMB service on the RP3: How to Setup a Raspberry Pi Samba Server – Pi My Life Up. Remember to enable the service at startup with sudo systemctl enable smbd (sudo systemctl restart smbd only restarts it to apply configuration changes).
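For reference, a share definition appended to /etc/samba/smb.conf might look like the fragment below. This is a sketch under assumptions: the share name `usbhdd` and the mount point `/mnt/usbhdd` are hypothetical, and the Pi My Life Up guide remains the reference for the full setup.

```ini
# Hypothetical share exposing the USB HDD mounted at /mnt/usbhdd
[usbhdd]
   path = /mnt/usbhdd
   browseable = yes
   read only = no
   guest ok = no
```

Running `testparm` checks the file for syntax errors, `sudo smbpasswd -a <user>` gives an existing Linux user a Samba password, and `sudo systemctl restart smbd` applies the change.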

Overclock the RP4: by default, the RP4 runs at only 1.5 GHz. We can easily overclock it to 2.147 GHz by editing /boot/config.txt. Remember: you must attach a fan to your RP4. How to Safely Overclock your Raspberry Pi 4 to 2.147GHz – Latest open tech from seeed studio
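As a sketch, the overclock comes down to a few lines at the end of /boot/config.txt. The values below are the ones commonly quoted for 2.147 GHz; treat them as an assumption and check them against the Seeed article, since not every board is stable at this setting.

```ini
# /boot/config.txt: overclock settings (assumed values for ~2.147 GHz)
over_voltage=6
arm_freq=2147
gpu_freq=750
```

After a reboot, `vcgencmd measure_clock arm` reports the current ARM clock and `vcgencmd measure_temp` the SoC temperature, which is worth watching with the fan attached.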


Handle DRM: Netflix and Hulu need DRM, which Chromium does not support by default, but there is a workaround: install Chromium Media Edition, which enables DRM. How to Stream Netflix, Fix YouTube on Raspberry Pi | Tom’s Hardware

curl -fsSL -o ventz-media-pi
sh ventz-media-pi

Enable GPU acceleration for Chromium: first, use raspi-config to increase GPU memory to at least 128 MB. Then follow this article to enable each option: Raspberry Pi 4: Hardware accelerated video decoding (GPU) in Chromium – LeMaRiva|tech. If you watch YouTube, you’d better install the “h264ify” extension in Chromium to force H.264 streams instead of VP9, which lowers CPU utilization.

After enabling GPU acceleration, you can open chrome://gpu to check the results.

HDMI audio output: in /boot/config.txt, find hdmi_drive and set its value to 2.
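The GPU memory and HDMI audio settings can also be written directly in /boot/config.txt. A minimal fragment, assuming the 128 MB GPU memory suggested for hardware video decoding; depending on your OS version and video driver, check the official config.txt documentation before relying on these legacy firmware options.

```ini
# /boot/config.txt
# Memory reserved for the GPU, in MB (the same setting raspi-config changes)
gpu_mem=128
# Normal HDMI mode: audio is sent over HDMI when the display supports it
hdmi_drive=2
```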

Other settings: it is better to enable SSH and VNC if you need to control the Pi remotely. You can find these settings through the raspi-config command or Raspberry Pi Configuration in the menu.

Other apps:
  • VS Code: works.
  • Teams: browser only.
  • Zoom: browser only, and very slow when many people are in the meeting.
  • Remmina (RDP remote client): works.
  • VLC (media player): 4K playback works.
  • Spotify: browser only.

Bottom line:

Would I use it for work? No. I would suggest an Apple M1 instead.

Would I use it for watching movies or surfing the web? Yes. At least Netflix and YouTube (1080p) play without problems.