A simple mistake occurred when leveraging Spark with Python multiprocessing

Look at this snippet first:

import multiprocessing
from concurrent.futures import ThreadPoolExecutor

def processEachCustomer(client_id):
    df_test = sql('')  # placeholder query per client
    df_test.write.format('delta').save(path)

num_cores = multiprocessing.cpu_count()
with ThreadPoolExecutor(max_workers=num_cores) as executor:
    executor.map(processEachCustomer, customer_list)

It looks fine at first glance. However, after validation, the output in the delta table was incomplete. In the end, the issue came from df_test: in the real notebook it was not a local variable inside the function, so when the code ran on multiple threads, df_test was overwritten by concurrent threads.

The best way to avoid this issue is to use PySpark code only. If you have to combine Spark with Python multiprocessing in the same notebook, here is a possible workaround.

df_test = {}  # keep one dataframe per client so threads do not overwrite each other

def processEachCustomer(client_id):
    df_test[client_id] = sql('')
    df_test[client_id].write.format('delta').save(path)

num_cores = multiprocessing.cpu_count()
with ThreadPoolExecutor(max_workers=num_cores) as executor:
    executor.map(processEachCustomer, customer_list)

AI makes my photo better (and easier)

I shot some pictures this morning during an early morning hike. The photos were as ordinary as on other days; the scenery was nice but not striking. However, after I got back home, Adobe released a new beta version of Photoshop supporting AI generation with prompts. Here are some results mixing my real photos with AI work.

Remember, except for the AI-generated animals, everything else is exactly the same as in my original photos. I am confident that AI will get increasingly involved in how we produce photos. Copyright? I don’t know. I would claim the copyright belongs to humans.

Failing Sensor Detection by Pattern Comparison on Time Series

Purpose

Find interesting patterns in the time series data in order to detect failing sensors.

Process

  1. Time series data source. We leveraged Databricks and delta tables to store our source data, which includes sensor_id, timestamp, feature_id, feature_value. To make it run quicker, you may aggregate the time to the hour or day level.
  2. In order to vectorize the features, the features stored in rows have to be pivoted into columns.
  3. Clean outliers. Even if the data source is clean at the aggregation level, there may still be some outliers, such as extremely high or low values. We used a threshold of 5 standard deviations to identify and remove these outliers per sensor ID.
  4. Normalization. Use a max-min function to normalize the selected columns. You can apply max-min normalization to the features across all sensors or per sensor, depending on how much the sensors vary from each other.
  5. Pattern comparison. We use an existing reference pattern and compare it against the time series from the earliest to the latest time point. For example, the list [1,1,1,1,0,0,1,1,1,1] has 10 points; sliding it over a 30-point time series gives 21 comparison results. There are two ways to compare two time series (see the sketch after this list).
    • Pearson correlation.
    • Euclidean distance.
  6. Filters. The comparison result gives you a similarity score at every time point. You can set a threshold on this similarity value to filter out non-interesting points. You can also set a threshold on how frequently the pattern occurs to filter further. Here is an example of thresholds.
    • corr_ep_thredhold = 0.8 (similarity greater than 0.8)
    • freq_days_thredhold = 20 (the pattern occurred more than 20 times across the whole time series)
    • freq_thredhold = 40% (from where the pattern starts to where it ends, it occurred on more than 40% of the days)
  7. Persist the results into a database or table.
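
To make step 5 concrete, here is a minimal local sketch of the sliding-window comparison using NumPy. The function and variable names are made up for illustration; the distributed version follows the same logic per sensor on Spark.

import numpy as np

def min_max_normalize(x):
    # scale values into [0, 1]; assumes the series is not constant
    x = np.asarray(x, dtype=float)
    return (x - x.min()) / (x.max() - x.min())

def sliding_similarity(series, pattern):
    # slide `pattern` over `series` and return Pearson correlation and
    # Euclidean distance for every window position
    series = min_max_normalize(series)
    pattern = np.asarray(pattern, dtype=float)
    n, m = len(series), len(pattern)
    results = []
    for start in range(n - m + 1):   # 10-point pattern over 30 points -> 21 windows
        window = series[start:start + m]
        corr = np.corrcoef(window, pattern)[0, 1]   # nan if the window is constant
        dist = np.linalg.norm(window - pattern)
        results.append((start, corr, dist))
    return results

# hypothetical reference pattern and one sensor's feature series
pattern = [1, 1, 1, 1, 0, 0, 1, 1, 1, 1]
series = np.random.rand(30)
matches = [r for r in sliding_similarity(series, pattern) if r[1] > 0.8]  # corr_ep_thredhold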

Pros

  • Since the features are normalized, the comparison is based on the shape rather than the absolute values.
  • Statistical comparison is quick, especially when running on Spark.
  • The result is easy to understand and to apply to configured filters.

Cons

  • Right now, the Pearson correlation and Euclidean distance only apply to one feature at a time. We need to figure out a way to calculate similarity in higher dimensions in order to run all features in one matrix.
  • Pattern discovery. To find the existing patterns, we rely on experience. However, there is an automatic way to discover patterns: Stumpy is a Python library that does this, and it can also run on GPU.

Future work

  • How to leverage streaming to find the pattern immediately once it happens, rather than in a batch process.
  • High-dimensional matrix comparison.
  • Integrate Stumpy to find interesting patterns automatically, then apply the new patterns to the configured filters.

Service Principal on Databricks and Small-File Zip Ingestion

For security reasons, we had to use a service principal instead of a personal token to control the Databricks cluster and run notebooks and queries. Neither the Azure nor the Databricks documentation explains the steps very well; they evolve the products so quickly that the documents haven’t kept up. After some digging, I found the way with the following steps.

  1. Create a service principal through Azure AD. In this case, we named it “DBR_Tableau”.
  2. Assign the newly created service principal “DBR_Tableau” to Databricks.
# assign two variables
export DATABRICKS_HOST="https://<instance_name>.azuredatabricks.net"
export DATABRICKS_TOKEN="<your personal token>"

curl -X POST \
${DATABRICKS_HOST}/api/2.0/preview/scim/v2/ServicePrincipals \
--header "Content-type: application/scim+json" \
--header "Authorization: Bearer ${DATABRICKS_TOKEN}" \
--data @add-service-principal.json \
| jq .


# please create add-service-principal.json in the same folder as the command above.
# the group id referenced in it can be found in the group page URL of the Databricks admin console.

{
  "applicationId": "<application_id>",
  "displayName": "DBR_Tableau",
  "entitlements": [
    {
      "value": "allow-cluster-create"
    }
  ],
"groups": [
    {
      "value": "8791019324147712"  # group id can be found easily through URL 
    }
 ],
  "schemas": [ "urn:ietf:params:scim:schemas:core:2.0:ServicePrincipal" ],
  "active": true
} 

3. Create a Databricks API token, setting its lifetime.

# Databricks Token API
curl -X POST -H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
--data '{ "comment": "databricks token", "lifetime_seconds": 77760000 }' \
https://adb-5988643963865686.6.azuredatabricks.net/api/2.0/token/create

Another thing I want to talk about is small file ingestion. In my case, we have to unzip archives containing hundreds to thousands of small files. Since Databricks doesn’t support the zip format natively, we have to unzip with shell or Python. By default, we would unzip the files into the /dbfs folder and then use spark.read to ingest all of them. However, unzipping a large number of small files into /dbfs is quite slow, presumably because /dbfs is a cluster path that lives on a remote storage account. We figured out that we could unzip to the local path /databricks/driver, which is on the driver’s disk. Although unzipping is much quicker this way, spark.read doesn’t support local files at the moment, so we have to use some tricky Python code to ingest the data first and then convert it into a Spark dataframe.

import glob
import os

binary_list = []
for binary_file in glob.glob('/databricks/driver/' + cust_name + '/output/*/*.dcu'):
    with open(binary_file, "rb") as f:
        binary_data = f.read()
    length = os.path.getsize(binary_file)
    dcu_file = binary_file.rsplit('/', 1)[-1]   # file name only
    folder_name = binary_file.split('/')[5]     # sub-folder name, used to recover the zip name
    binary_list.append((cust_id, cust_name, binary_data, dcu_file,
                        folder_name + '.zip', binary_file, length))

"""from the list of the tupple of binary data creating a spark data frame with parallelize and create DataFrame"""
binary_rd=spark.sparkContext.parallelize(binary_list,24)
binary_df=spark.createDataFrame(binary_rd,binary_schema)

In this conversion process, you may hit an error related to “spark.rpc.message.maxSize”. This error only happens when you use RDDs. Simply increasing the size to 1024 solves it, but I recommend using spark.read in most cases.
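
For reference, this is a cluster-level setting; assuming a Databricks cluster, the line below goes into the cluster’s Spark config (the value is in MB) and takes effect after a restart:

spark.rpc.message.maxSize 1024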

Summary@202205

It is getting harder and harder to write a long journal focused on a single topic, since most of my time these days is spent burning fat. Maybe it is time to list some problems we solved, or that are still open, after half a year without updates.

Issue 1. Too many job clusters launched in pipelines.

An example of the basic pipeline for a single client.

One year ago, we implemented the Bronze, Silver, Gold structure with delta tables. It is very convenient for processing big data with the correct partitions. However, there are some issues in our pipelines:

  • Too many resources are needed, and cluster launch time is wasted. The screenshot above shows 3 clusters requested, and there is a loop outside of these 3 clusters to parallelize clients (usually 5 running at the same time), so the total number of clusters is 3x5=15. Depending on the cluster size, let’s say 4 VMs per cluster, we need at least 15*4=60 VMs. Since the workers are spot VMs, they need extra time to be requested, and re-requested if lost. The example above is the simplest case; some pipelines are more complex and may need hundreds of VMs.
  • The cluster on the Bronze tier is poorly utilized due to the use of the zip format. I will talk about it later.

Solution:

Option 1: Create two High Concurrency clusters: one for Bronze and Silver, another for Gold.

  • Pro: Cluster start time is significantly reduced. The on-demand cluster only launches once within the batch period. Based on our test, the overall pipeline running time was reduced by 50%, and the cost was reduced by 10-20%.
  • Con: All pipelines run on the same cluster. You have to monitor cluster size and memory usage to reallocate cluster sizes, although auto-scaling and round-robin assignment help a lot to prevent crashes. You also need to optimize the code to reduce pressure on the driver; we found lots of delays in this mode due to bad code running on the driver.

Option 2: Move the pipelines into Databricks workflows.

  • Pro: a workflow in Databricks can reuse the job cluster, so Bronze, Silver, and Gold can run on the same cluster per client.
  • Con: We need to move the pipelines from ADF to Databricks workflows. Even if that is only a matter of time, there is another issue: unbalanced usage across the 3 tiers within the same workflow. Maybe we can leverage auto-scaling to fix the imbalance between Gold and the other two tiers, but auto-scaling takes time too, and that time may be longer than the code itself.

Issue 2. The zip format is not supported well in Databricks.

We have used the zip format to collect data from client servers since the on-premise days. We kept this format on the cloud, using AzCopy to upload the same files to the staging area and then processing them in ADF pipelines. But Databricks doesn’t support the zip format well; the unzip command can only run on the driver.

Solution:

Option 1: Directly upload the log files that used to be compressed as zip files into the data lake through AzCopy. Since there are lots of logs, organize the folders by timestamp, like yyyy/mm/dd/hh/MM/ss, then use Auto Loader to load them.

  • Pro: Simple solution, easy to implement on Azure.
  • Con: The number and size of the files may affect cost and network usage, and small files hurt the performance of the data lake when we read them.

Option 2: Compress the log files into a single parquet file rather than a zip archive. Upload the parquet files to the data lake and use Auto Loader to ingest them.

  • Pro: Keeps the same logic as before on the client server; quicker ingestion into the delta table; no small-file issue.
  • Con: Need to query the delta table or leverage other tools to recover the original log files.
import pyarrow as pa
import pyarrow.parquet as pq
import glob
import os

folder_path = 'E:/Test/Upload/*.log'   # the folder includes all .dcu files
parquet_path = 'E:/Test/test.parquet'  # single output file
if os.path.exists(parquet_path):
    os.remove(parquet_path)

schema = pa.schema([
    ('file_name', pa.string()),
    ('content', pa.binary())
])
pqwriter = pq.ParquetWriter(parquet_path, schema, compression='SNAPPY', write_statistics=False)

# list all files and append each one as a row (file name + binary content)
file_list = glob.glob(folder_path)
for file_path in file_list:
    with open(file_path, mode='rb') as f:   # b is important -> binary
        file_content = f.read()
    pa_content = pa.array([file_content])
    pa_filename = pa.array([file_path])
    table = pa.Table.from_arrays([pa_filename, pa_content], names=['file_name', 'content'])
    pqwriter.write_table(table)

# close the parquet writer
if pqwriter:
    pqwriter.close()

Option 3: Use a streaming solution. We can create an Event Hubs capture to receive logs from the client servers and connect it with Auto Loader.

https://www.rakirahman.me/event-hub-capture-with-autoloader/

  • Pro: Event-driven solution; a client log can be processed immediately once it is generated.
  • Con: Need to change code on the client server, and not every client supports streaming due to security requirements.

Issue 3. Need a more flexible solution to trigger pipelines.

We used to create scheduled triggers / tumbling window triggers in ADF. But the trigger itself doesn’t support reading metadata and dynamically adjusting the trigger time or parameters. We need a metadata table so we can easily maintain the triggers for each client + report/pipeline.

Option 1: Use an Azure Function to call the ADF REST API (a sketch follows the pros and cons below). The Azure Function can get the metadata from any source.

https://docs.microsoft.com/en-us/rest/api/datafactory/pipelines/create-run

  • Pro: Easy to implement and very flexible with code.
  • Con: If we put too much logic into triggering different pipelines, it is not as clear as doing it in ADF. And we lose some monitoring features which are helpful in ADF, unless we rebuild them ourselves.
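
As a rough illustration of Option 1, here is a sketch of the REST call the Azure Function would make. The subscription, resource group, factory and pipeline names are placeholders, and the bearer token is assumed to come from the function’s managed identity.

import requests

def trigger_pipeline(token, pipeline_name, parameters):
    # Data Factory "Pipelines - Create Run" REST API (api-version 2018-06-01)
    url = (
        "https://management.azure.com/subscriptions/<sub_id>"
        "/resourceGroups/<rg_name>/providers/Microsoft.DataFactory"
        f"/factories/<factory_name>/pipelines/{pipeline_name}/createRun"
        "?api-version=2018-06-01"
    )
    resp = requests.post(
        url,
        headers={"Authorization": f"Bearer {token}"},
        json=parameters,         # pipeline parameters read from the metadata table
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()["runId"]  # handy for logging and monitoring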

Option 2: Leverage Airflow to control pipelines.

  • Pro: TBD
  • Con: TBD

Issue 4. Legacy Python code on Databricks.

There is some Python code that processes the data row by row and converts each row into multiple rows; the logic varies depending on the content of each row. It is tempting to wrap this old Python code directly in a UDF, but that is very inefficient, especially when the UDF contains lots of subfunctions.

Option 1: Use Azure Batch as the cluster. We can split all rows into multiple tasks, then push these tasks to workers to process. Each worker still runs the legacy Python code and outputs its results, which are then merged together.

Azure Batch: split tasks into workers
  • Pro: No need to change the python code.
  • Con: We need an efficient way to split tasks across workers, and Python may be too slow.

Option 2: Use Databricks-native PySpark. We need to split the incoming dataset into several small subsets, then use different logic to process each of them (a sketch follows the pros and cons below).

Databricks: split dataset by message type into small subsets
  • Pro: Native PySpark/Scala is much quicker than a UDF.
  • Con: Need a smart solution to handle the different logic and avoid long if-else chains. (TBD)
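
A minimal sketch of the idea in Option 2, assuming a hypothetical message_type column, an existing raw_df dataframe and two per-type transformation functions:

from pyspark.sql import functions as F

# per-type logic kept as native dataframe transformations (illustrative only)
def process_type_a(df):
    return df.withColumn("parsed", F.split("payload", ","))

def process_type_b(df):
    return df.withColumn("parsed", F.split("payload", "\\|"))

processors = {"A": process_type_a, "B": process_type_b}

# split the incoming dataset into small subsets by message type,
# apply the matching logic, then union the results back together
subsets = [fn(raw_df.filter(F.col("message_type") == mtype))
           for mtype, fn in processors.items()]
result_df = subsets[0]
for s in subsets[1:]:
    result_df = result_df.unionByName(s)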

How to break down Databricks DBU cost to the pipeline level

One day we found that the Databricks DBU cost had surged, but we didn’t know which ADF job caused it. I asked Azure support whether there was any way to track cost down to the pipeline level. Unfortunately, they told me: “Regarding your question, please note that you can only track cost against the resource group or workspace (cluster). Unfortunately, there is no way to track Databricks cost at a job or user level.” Ok, that’s fine.

However, when I opened the cost details exported from Cost Management in Azure, I found a tag column in the DBU resource usage.

"ClusterId": "1210-XXXX-XXXX","DatabricksInstancePoolCreatorId": "XXXXXX","DatabricksInstancePoolId": "1025-204618-XXXXX-pool-XXXXXX","ClusterName": "job-8810145-run-1","JobId": "8810145","RunName": "ADF_df-eus-prod-01_Standard_Module_run_pipeline_935e1d13-02c5-4ae3-9441-3a414d1ad0eb","Creator": "XXXX@XXXX.com","Vendor": "Databricks","DatabricksInstanceGroupId": "-8346123XXXXX6640367"

Is this our Databricks information? Maybe we can do something with it!? After consulting my coworker Sai, he told me the JobId is similar to the notebook run ID. Then everything could be solved, since we already record everything when running a notebook, including the ADF pipeline ID, customer information, notebook name, notebook run ID (JobId) and start/end time.

With a simple SQL query joining the cost CSV table and the log table, we can break down DBU cost by ADF pipeline.

select ProductName,t2.notebook_name,sum(t1.PreTaxCost)  from cost_df t1
left join (select distinct notebook_name,notebook_runID,cust_name from delta.`/mnt/eus-logging/job-log-details` ) t2 on t2.notebook_runID=get_jobid(Tags)
where ProductName='Azure Databricks - Premium - Jobs Compute - DBU' 
group by ProductName,t2.notebook_name
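
The get_jobid used above is our own helper. Here is a minimal sketch of one possible implementation, assuming the Tags column holds the JSON-like string shown earlier:

import json
from pyspark.sql.types import StringType

def get_jobid(tags):
    # the exported Tags value looks like the string above; wrap it in braces
    # if the export drops them, then pull out the JobId field
    if not tags:
        return None
    text = tags.strip()
    if not text.startswith("{"):
        text = "{" + text + "}"
    try:
        return json.loads(text).get("JobId")
    except ValueError:
        return None

# register it so the SQL join above can call get_jobid(Tags)
spark.udf.register("get_jobid", get_jobid, StringType())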

To make this work, you have to log the ADF pipeline and notebook runtime information when you execute the notebook. I did this by adding the following calls at the start and end of the notebook.

# at beginning
notebook_name = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
notebook_RunID = getRunID()  # this is a function in /Public/JOB_LOG
write_log_detail(ADFRunID,cust_id,cust_name,notebook_name,notebook_RunID,startTime=str(datetime.now())) # notebook start time

# at the end
write_log_detail(ADFRunID,cust_id,cust_name,notebook_name,notebook_RunID,endTime=str(datetime.now())) # notebook end time

Here, write_log_detail is the function that writes into the delta table.

Unfortunately, I have not found a way to break the VM cost down to the pipeline level yet. I think there should be a way; it just needs more time. I will update later.

A real case of optimizing a Spark notebook

At the beginning of my optimization, I tried to find some standard principles that could quickly and smoothly help me. Although there is lots of information online about where Spark can be improved, none of it gives an easy end-to-end way to fix the key issues. So I summarize here what I understood from the online material and the strategies I applied to a Spark notebook of mine that seemed to have performance issues.

  • Start with some slow jobs; you can order by the “Duration” column. Some guides go straight to the details of the slowest stages, but here I suggest going to the related SQL Query page first to review the whole process at the query level. It helps to quickly spot common query performance issues, like duplicate scans without filters or inappropriate joins, etc.
In the Jobs tab of the Spark UI, we can order by duration to find the slow jobs.
On the job detail page, click “Associated SQL Query” first.
  • One table read multiple times. If we find that one table is scanned multiple times at the beginning, we may consider caching it to keep the intermediate result in memory/local disk rather than going back to remote storage.
In my query DAG, there are two table scans running in parallel, but they are from the same table and read the same data; records read are both 884,688,530.
  • Too many similar branches. If we see many similar branches, this may also be caused by a lack of caching. The intermediate results have to be recomputed whenever the plan reaches the point where they are needed.
Red/orange lines show three copies of the data, but essentially they are doing the same work. We have to think about how to cache the intermediate results.
  • Duplicate slow stages in the Stages tab. This may be caused by an unnoticed code mistake. The case below shows the same dataframe being computed twice, even though this kind of code looks perfectly normal in Python.
Two jobs spent the same time on the same piece of code; we may consider this a missing cache.
Back in the notebook, we found a dataframe that is executed twice.
Caching a table looks like this:
sqlContext.cacheTable('mtu_dcu_trans_reading')
  • Figure out strange operations. This step takes longer than the previous ones. We need to match the DAG to the SQL in the notebook and understand wide/narrow shuffles, then pick out the wide shuffles we didn’t expect.
When I went through the notebook with the DAG, I found a “HashAggregate” I did not expect. Then I realized I had written “UNION” rather than “UNION ALL”; this mistake took additional time to remove duplicates (which could not exist in the first place).
  • Reduce data size by adding a projection to select only the columns we need (see the sketch after this list). I think this is the easiest way to improve performance, especially for big tables.
Simply adding the column names to the select query reduced the data read from 27.9GB to 23.5GB.
  • Reduce data size by filtering as early as possible. We can check whether the filter is pushed down into the scan, or manually move the filter to where the table is read.
  • Correctly partition the destination table. When we use Merge or replaceWhere, it is better to partition correctly.
After we created reading_date as the partition column, the time cost dropped from 2.49 minutes to 6.67 seconds. Similar improvements happen with merge and replaceWhere.
  • Reduce spill. When an executor runs out of memory, it spills to disk, which is much slower than memory. To solve this issue, we can do one or more of the following:
    • Increase executor memory by choosing a bigger cluster or setting spark.executor.memory (if workable).
    • Increase the fraction of execution and storage memory (spark.memory.fraction, default 0.6).
    • Adjust spark.memory.storageFraction (default 0.5). Depending on whether the spill occurs in caching or shuffling, we can adjust the storage fraction.
    • Adjust spark.sql.shuffle.partitions (default 200). Try increasing the number of shuffle partitions so each partition is smaller and less likely to spill.
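
As a small, self-contained illustration of the cache, column-pruning and early-filter points above (the column names and filter value are made up):

from pyspark.sql import functions as F

# read only the columns we need and filter as early as possible,
# so less data is scanned and shuffled
readings = (spark.table("mtu_dcu_trans_reading")
            .select("sensor_id", "reading_date", "reading_value")
            .filter(F.col("reading_date") >= "2022-01-01"))

# this dataframe is reused by several downstream queries, so cache it once
readings.cache()

daily_avg = readings.groupBy("sensor_id", "reading_date").agg(F.avg("reading_value"))
daily_max = readings.groupBy("sensor_id", "reading_date").agg(F.max("reading_value"))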

Reference:

Spark Configuration, https://spark.apache.org/docs/2.3.0/configuration.html

Performance Tuning, https://spark.apache.org/docs/latest/sql-performance-tuning.html#performance-tuning

From Query Plan to Performance: Supercharging your Apache Spark Queries using the Spark UI SQL Tab, https://www.youtube.com/watch?v=_Ne27JcLnEc&t=553s

How to Read Spark DAGs | Rock the JVM, https://www.youtube.com/watch?v=LoFN_Q224fQ

Optimize performance with caching, https://docs.microsoft.com/en-us/azure/databricks/delta/optimizations/delta-cache

Some working performance/cost improvement tips applied to ADF and Databricks recently

While switching to the cloud, we found some pipelines running slowly and the cost increasing rapidly. To solve these problems, we took the following steps to optimize the pipelines and data structures. None of them is hard to implement.

1. Set the different triggers for different recurring periods.

No matter the reason, it is very common for one data pipeline to be triggered recurrently. However, the sources of this data pipeline may be refreshed at different frequencies. In this case, we can set two triggers for the pipeline, so that it doesn’t have to run at the highest frequency.

In the ForEach loop, there are two kinds of clients: one is updated every 6 hours, the other is updated daily. So two triggers were created, but they run the same pipeline.

2. Set the different pools for notebooks.

Databricks provides pools to efficiently reuse the VMs in newly created clusters. However, one pool can only have one instance type. If the pool instance is too small, out-of-memory errors or slow processing will occur, while if the pool instances are too big, we are wasting money.

To solve this problem, we set up two kinds of pools. The heavy one uses instances with 4 cores and 28GB of memory; the light one uses instances with 4 cores and 8GB. For lightweight jobs, like ingesting source files into the bronze table, the light pool is the better choice, while the heavy one can be used for aggregation work.

Two pools were created for the different-weight processes in the notebooks.

3. Create a secondary index or archive folders for the source files.

At the beginning of our move to the cloud, we put everything into a single folder. That is fine for Azure, but once we need to scan for specific files or get a file’s location, the huge number of files in this folder significantly hurts performance, and the situation gets worse as the file count grows over time. To solve this problem, we have three solutions:

  • Create a secondary index when a file moves into the folder. The secondary index is like a table that records the important information about the file, such as name, modify date and location in the data lake. Next time someone needs to scan the folder, they only need to scan this secondary index rather than the whole blob directory (a sketch follows this list).
An example of a secondary index
  • Archive the files. This is maybe the easiest option, because you only need to move the files you will rarely use into an archive folder, and getting files back from the archive folder is not time-sensitive.
Part of the code that uses workers to move files into the archive folder.
  • Organize files by their receive date. This method is similar to archiving, but we create many archive folders by date.
Each file is re-organized and put into the date folder when it is received.
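
A minimal sketch of the secondary index idea mentioned in the first bullet, assuming a hypothetical landing folder and index table path:

from datetime import datetime

def index_landing_folder(folder, index_path):
    # one row of metadata per file currently in the landing folder;
    # later lookups scan this small index instead of listing the whole blob directory
    rows = [(f.name, f.path, f.size, datetime.now())
            for f in dbutils.fs.ls(folder)]
    if not rows:
        return
    df = spark.createDataFrame(rows, ["file_name", "location", "size", "indexed_at"])
    df.write.format("delta").mode("append").save(index_path)

index_landing_folder("/mnt/landing/incoming/", "/mnt/eus-metadata/file_index")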

4. Optimize delta tables weekly.

Don’t hesitate to OPTIMIZE and VACUUM your delta tables regularly. Small files kill the advantages of delta tables, especially in the “Merge” operation. Following is an example that iterates over a list of delta tables that need to be optimized.

delta_table_list = ['customer','job','job-category','job_subscription','missing_file_check','parameter']
                  
# optimize, then vacuum, each delta table in the list
for delta_table in delta_table_list:
  sql_string = '''optimize delta.`/mnt/eus-metadata/{0}`'''.format(delta_table)
  sqlContext.sql(sql_string)
for delta_table in delta_table_list:
  sql_string = '''VACUUM delta.`/mnt/eus-metadata/{0}`'''.format(delta_table)
  sqlContext.sql(sql_string)

First Glance at GPU-Accelerated Spark

Since I started to play with clusters, I have thought there is no mission that cannot be completed by a cluster; if there is, add another node. However, except for CUDA on a stand-alone machine, I had rarely touched GPU-accelerated clusters as a data engineer. Yes, Spark ML can utilize GPUs since Spark 3.0, but most DE work is data pipeline and data modeling related. Then recently I googled “GPU ETL” and found SPARK-RAPIDS, which leverages the RAPIDS libraries to accelerate processing with GPUs.

Easy to configure Spark to take advantage of the GPU

I changed almost nothing in my original Spark code (mixed PySpark, Spark SQL, Python and delta table access) to let it run on a GPU cluster. That is a great thing! Everybody wants to get double the result with half the work. My run sped up to 3.1 minutes (12 cores, 3 NVIDIA T4 GPUs, 84GB memory) from 5.4 minutes (24 cores, 84GB memory). I think I could get better results if my inputs were bigger chunks of data, like 500MB+ parquet files.

Here is my configuration on Databricks. You can also refer to the official document (however, I didn’t get the cluster running by following that document).

# cluster runtime version: 7.3 LTS ML (includes Apache Spark 3.0.1, GPU, Scala 2.12)
# node type: NCasT4_v3-series(Azure)

# spark config:
spark.task.resource.gpu.amount 0.1
spark.databricks.delta.preview.enabled true
spark.executorEnv.PYTHONPATH /databricks/jars/rapids-4-spark_2.12-0.5.0.jar:/databricks/spark/python
spark.plugins com.nvidia.spark.SQLPlugin
spark.locality.wait 0s
spark.rapids.sql.python.gpu.enabled true
spark.rapids.memory.pinnedPool.size 2G
spark.python.daemon.module rapids.daemon_databricks
spark.sql.adaptive.enabled false
spark.databricks.delta.optimizeWrite.enabled false
spark.rapids.sql.concurrentGpuTasks 2

# initial script path
dbfs:/databricks/init_scripts/init.sh

# create initial script on your notebook
dbutils.fs.mkdirs("dbfs:/databricks/init_scripts/")
dbutils.fs.put("/databricks/init_scripts/init.sh","""
#!/bin/bash
sudo wget -O /databricks/jars/rapids-4-spark_2.12-0.5.0.jar https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/0.5.0/rapids-4-spark_2.12-0.5.0.jar
sudo wget -O /databricks/jars/cudf-0.19.2-cuda10-1.jar https://repo1.maven.org/maven2/ai/rapids/cudf/0.19.2/cudf-0.19.2-cuda10-1.jar""", True)
# some key configuration meanings
--conf spark.executor.resource.gpu.amount=1 # one executor per GPU, enforced
--conf spark.task.resource.gpu.amount=0.5  # two tasks running on the same GPU
--conf spark.sql.files.maxPartitionBytes=512m  # a bigger batch is better for GPU efficiency, but not so big that it leads to out-of-memory issues
--conf spark.sql.shuffle.partitions=30 # reduce the shuffle partition count from the default 200 to 30; larger partitions are better for GPU efficiency
--conf spark.rapids.sql.explain=NOT_ON_GPU # log messages explaining why a spark operation is not able to run on the GPU

The only problem I have met so far: when a notebook includes a view composed of delta tables, it pops up the error message “convert struct type to GPU is not supported”. Simply persisting the view as a table solves this. Overall, a very cool plugin.
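
What worked for me was roughly the following: materialize the view as a real table before the GPU query touches it (the table and view names here are placeholders):

# persist the problematic view (built on delta tables) as a delta table,
# then query the table instead of the view on the GPU cluster
spark.sql("""
  CREATE OR REPLACE TABLE my_report_table
  USING DELTA
  AS SELECT * FROM my_report_view
""")
df = spark.table("my_report_table")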

The magic of spark-rapids

There are two major features that let Spark run on GPUs through RAPIDS.

  • Replacement of CPU ops with GPU versions at the physical plan level.
  • Optimized Spark shuffle using RDMA and direct GPU-to-GPU communication.
How the RAPIDS accelerator works in Spark: left side, replace CPU ops with GPU versions based on data type & op type; right side, optimized Spark shuffle between GPUs.

Since the replacement happens at the physical plan level, we don’t have to change the query. Even if some operations are not supported by RAPIDS, they can still run on the CPU.


A CPU Spark execution plan looks like: logical plan -> optimized by Catalyst in a series of phases -> physical plan -> executed on the cluster. A GPU Spark execution plan replaces operations in the physical plan with GPU versions and executes them on columnar batches.

If we look at the GPU and CPU physical plans, we will find they almost match one to one, except for some GPU-CPU data exchange ops like GpuColumnarToRow and GpuRowToColumnar.

parts of GPU physical plan

parts of CPU physical plan

Comparing the two physical plans, we can see that lots of operations already have GPU versions. “Scan parquet” is replaced by “GpuScan parquet”; since the GPU needs columnar format, the “ColumnarToRow” step from the CPU version is skipped. Then “Project” is replaced by “GpuProject”, followed by “GpuColumnarToRow” because the next step, “InMemoryRelation”, runs on the CPU. And so on.

Let’s talk about another great feature: Optimized spark shuffle.

Spark needs a shuffle for wide transformations, while narrow transformations are grouped into a single stage. Spark-RAPIDS implements a custom Spark shuffle manager for shuffle operations on its GPU clusters. It provides:

  • Spillable cache: it keeps the data close to where it is produced, and it avoids out-of-memory issues by moving data following the rule: GPU memory -> host memory -> disk.
Once the GPU is out of memory, the shuffle manager puts data into host memory; if host memory is not enough, it keeps pushing data onto the local disk.
  • Transport (shuffle): handles block transfers between executors leveraging UCX libraries (see the picture below).
    • GPU0-GPU0: cache in GPU0 , zero copy
    • GPU0-GPU1: NVLink
    • GPU0-GPUX(remote): RDMA
      • Infiniband
      • RoCE
    • Disk- GPU: GPU direct storage(GDS)
It is not surprising that RDMA and NVLink are used in the cluster, since UCX is widely used by OpenMPI in the HPC area. These features are included in the RAPIDS libraries.

Some helpful information from official Q&A:

What operators are best suited for the GPU?

  • Group by operations with high cardinality
    • Joins with a high cardinality
    • Sorts with a high cardinality
    • Aggregates with a high cardinality
  • Window operations, especially for large windows
  • Aggregate with lots of distinct operations
  • Complicated processing
  • Writing Parquet/ORC
  • Reading CSV
  • Transcoding (reading an input file and doing minimal processing before writing it out again, possibly in a different format, like CSV to Parquet)

What operators are not good for the GPU?

  • Small amounts of data (hundreds of MB)
  • Cache coherent processing
  • Data movement
    • Slow I/O
    • Back and forth to the CPU (UDFs)
    • Shuffle
  • Limited GPU Memory

What is RAPIDS

  • a suite of open-source software libraries and APIs for data science and data pipelines on GPUs
  • offers a GPU dataframe that is compatible with Apache Arrow
    • language-independent columnar memory format
    • zero-copy streaming messaging and interprocess communication without serialization overhead
  • integrated with popular frameworks: PyTorch, Chainer, Apache MXNet; Spark, Dask

What is cuDF

  • GPU-accelerated data preparation and feature engineering
  • a Python drop-in pandas replacement
  • features
    • CUDA
      • low-level library containing function implementations and a C/C++ API
      • importing/exporting Apache Arrow using the CUDA IPC mechanism
      • CUDA kernels to perform element-wise math operations on GPU dataframe columns
      • CUDA sort, join, groupby and reduction operations on GPU dataframes
    • Python bindings
      • a Python library for GPU dataframes
      • a Python interface to CUDA C++ with additional functionality
      • creating Apache Arrow from NumPy arrays, pandas DataFrames, PyArrow Tables
      • JIT compilation of UDFs using Numba

Reference:

Spark-Rapids, https://nvidia.github.io/spark-rapids/

ACCELERATING APACHE SPARK 3.X, https://www.nvidia.com/en-us/deep-learning-ai/solutions/data-science/apache-spark-3/ebook-sign-up/

Getting started with RAPIDS Accelerator on Databricks, https://nvidia.github.io/spark-rapids/docs/get-started/getting-started-databricks.html#getting-started-with-rapids-accelerator-on-databricks

Deep Dive into GPU Support in Apache Spark 3.x, https://databricks.com/session_na20/deep-dive-into-gpu-support-in-apache-spark-3-x

Accelerating Apache Spark 3.0 with GPUs and RAPIDS, https://developer.nvidia.com/blog/accelerating-apache-spark-3-0-with-gpus-and-rapids/

RAPIDS Shuffle Manager, https://nvidia.github.io/spark-rapids/docs/additional-functionality/rapids-shuffle.html

UCX-PYTHON: A FLEXIBLE COMMUNICATION LIBRARY FOR PYTHON APPLICATIONS, https://developer.download.nvidia.com/video/gputechconf/gtc/2019/presentation/s9679-ucx-python-a-flexible-communication-library-for-python-applications.pdf

Unified Communication X, https://www.openucx.org/

Accelerating Apache Spark by Several Orders of Magnitude with GPUs, https://www.youtube.com/watch?v=Qw-TB6EHmR8&t=1017s

SCD II or Snapshot for Dimension

SCD II is widely used to process dimensional data with full history. Each change in a dimension is recorded as a new row configured with a validity period, usually at date granularity. Since SCD II only keeps the changes, it significantly reduces the storage space needed in the database.

Understanding Slowly Changing Dimensions
An example of SCD II processing

Everything looks fine until big data comes along and a flaw gets amplified: late-arriving dimensions. In SCD II, you have to go through complex steps to process both dimension and fact data for late-arriving dimensions.

For the dimension (SCD II table), we have to change historical rows.

In the example above, for product ID=010, suppose a change dated April-02-06 (before May-12-06) arrives after SID=003 has already been written. We have to:

  1. Scan the dimension table and find SID = 002 and 003.
  2. Change End_DT of SID 002 to April-02-06.
  3. Insert a new record: SID=004, Start_DT=April-02-06, End_DT=May-12-06.

For the fact table, we have to change multiple historical rows:

Update all SID foreign keys in the fact table to 004 where the date falls between April-02-06 and May-12-06.

It may not be a problem to execute an “UPDATE” in a row-stored table, but in the big data world most storage formats are column-stored, which is great for scanning but inefficient for updating. The good news is that we don’t have to “UPDATE” data, nor detect the changes in the dimension. The only thing we need to do is snapshot the daily dimension data. Since we have a full copy for each day, it is also much easier for the fact table to join the dimension: a date-to-date mapping. That’s all!

The example above becomes:

Date      | Source Product ID | Product Name | Product descr
May-12-06 | 010               | 10 inch Box  | 10 inch Glued box
May-12-07 | 010               | 10 inch Box  | 10 inch pasted box
May-12-08 | 010               | 11 inch Box  | 11 inch pasted box
Snapshot of the daily product table

Would it create a lot of redundancy on disk? No. Since we use a column-stored format, duplicated data in the same column is physically saved only once. Also, big data storage is much cheaper than a traditional database.

Would it slow down the join because of the extra data in the dimension table? No. There is another easy solution called “PARTITION”. Basically, we can partition the data by date, so each partition folder holds the set of data for that day.

An example for partitioned delta table.

“PARTITION” is transparent to the query. For example, when we execute this query:

Select * from product 
where product_name ='10 inch Box' and Date ='May-12-06'

It will first go to the folder “Date=’May-12-06′” rather than scanning the whole table, and then find the rows where “product_name=’10 inch Box'”. Partitioning is not limited to one level; you can create multiple levels.

Would it be faster to join the fact table? Yes. Once the fact table is also partitioned by the same column, e.g. date, we can easily map the two date columns in the dimension and the fact table, and the scan is super quick. * With some newer tech, like delta tables, you have to indicate the partition manually (a sketch follows).
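
A minimal sketch of the snapshot approach with a date-partitioned delta table (the dataframe, path and column names are illustrative):

from pyspark.sql import functions as F

# append today's full copy of the product dimension, partitioned by snapshot date
snapshot = product_df.withColumn("Date", F.current_date())
(snapshot.write
         .format("delta")
         .mode("append")
         .partitionBy("Date")
         .save("/mnt/dim/product_snapshot"))

# a query filtering on the partition column only touches that day's folder
df = (spark.read.format("delta").load("/mnt/dim/product_snapshot")
      .filter("Date = '2006-05-12' AND product_name = '10 inch Box'"))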

Conclusion

Traditional SCD has existed for a long time, but the environment and prerequisites for applying it have changed a lot. I think snapshots are a better, faster and simpler solution for big data.

Reference:

Functional Data Engineering – A Set of Best Practices | Lyft, talk by Maxime Beauchemin, ConferenceCast.tv

Building A Modern Batch Data Warehouse Without UPDATEs, by Daniel Mateus Pires, Towards Data Science

Slowly Changing Dimensions (SCDs) In The Age of The Cloud Data Warehouse, holistics.io