A real case of optimizing a Spark notebook

When I started optimizing, I looked for some standard principles that could help me quickly and smoothly. Although plenty of information online points out where Spark code can be improved, none of it gives an easy, end-to-end way to fix the key issues. So I summarized what I learned from online material and applied some strategies to a Spark notebook that I suspected had performance issues.

  • Start with the slow jobs, which you can find by sorting on the “Duration” column. Some guides go straight into the details of the slowest stages, but here I suggest opening the related SQL Query page first to review the whole process at the query level. It helps us quickly spot common query performance issues, such as duplicate scans without filters or inappropriate joins.
In the Jobs tab of the Spark UI, we can sort by the duration of each job to find the slow ones.
On the job detail page, we click “Associated SQL Query” first.
  • One table read multiple times. If a table is scanned multiple times at the beginning, we may consider caching it so that the intermediate result is kept in memory or on local disk rather than read again from remote storage.
In my query DAG, there are two table scans running in parallel, but they are on the same table and read the same data. The number of records read is 884,688,530 for both.
  • Too many similar branches. If we see many similar branches, a missing cache may be the cause as well: the intermediate results have to be recomputed each time they are needed.
The red/orange lines show three copies of the data, but essentially they all go through the same process. We have to think about how to cache the intermediate results.
  • Duplicate slow stages in the Stages tab. These may be caused by an easily overlooked coding mistake. The case below shows the same DataFrame being computed twice, even though the code looks perfectly normal in Python.
Two jobs that take the same time and run the same piece of code suggest a missing cache.
Back in the notebook, we found a DataFrame that was executed twice.
Caching a table looks like this:
sqlContext.cacheTable('mtu_dcu_trans_reading')
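For a DataFrame (rather than a registered table) that feeds more than one action, the same idea can be sketched roughly as follows. This is only a minimal sketch: the function name `count_rows_and_distinct` and the two counts are hypothetical stand-ins for whatever the notebook actually runs twice, and `df` is assumed to be a PySpark DataFrame. `cache()`, `count()`, `distinct()` and `unpersist()` are the standard DataFrame methods.

```python
def count_rows_and_distinct(df):
    """Run two actions on the same DataFrame while computing it only once.

    `df` is assumed to be a pyspark.sql.DataFrame; the two counts are
    placeholder actions standing in for the notebook's real work.
    """
    df.cache()  # lazy: nothing is materialized until the first action below
    try:
        total = df.count()                # first action fills the cache
        distinct = df.distinct().count()  # second action reads from the cache
    finally:
        df.unpersist()  # release executor memory once both actions are done
    return total, distinct
```

In a notebook this could be called as `count_rows_and_distinct(spark.table('mtu_dcu_trans_reading'))`. After caching, the SQL tab should show an InMemoryTableScan node for the second action in place of a second file scan.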
  • Figure out strange operations. This step takes longer than the previous two. We need to match the DAG to the SQL in the notebook and understand wide and narrow transformations (wide ones cause a shuffle). Then we pick out the wide operations we didn’t expect.
When I went through the notebook alongside the DAG, I found a “HashAggregate” that I did not expect. Then I realized I had written “Union” rather than “Union All”; this mistake cost additional time to remove duplicates (which should not exist at all).
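To make the difference concrete, here is a small sketch that builds the query with UNION ALL explicitly. The helper name `union_all_query` and the table and column names in the usage note are hypothetical, not taken from my notebook.

```python
def union_all_query(tables, columns):
    """Build a SELECT ... UNION ALL ... statement over several tables.

    UNION ALL simply concatenates the inputs; plain UNION would add a
    de-duplication step (the unexpected HashAggregate in the DAG).
    """
    if not tables:
        raise ValueError("at least one table is required")
    column_list = ", ".join(columns)
    selects = [f"SELECT {column_list} FROM {table}" for table in tables]
    return "\nUNION ALL\n".join(selects)
```

Something like `spark.sql(union_all_query(['readings_a', 'readings_b'], ['meter_id', 'reading_date'])).explain()` can then be used to check that no aggregate sits on top of the Union node. Note that the DataFrame method `union()` already behaves like UNION ALL; duplicates are only removed there if `.distinct()` is called explicitly.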
  • Reduce data size by adding a projection that selects only the columns we need. I think this is the easiest way to improve performance, especially for big tables.
Simply listing the column names in the select query reduced the data read from 27.9GB to 23.5GB.
  • Reduce data size by filtering as early as possible. We can check whether the filter is pushed down into the scan, or manually move the filter to where the table is read.
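The two bullets above (select fewer columns, filter early) can be sketched together. This is an illustrative helper, not the code from my notebook: `read_slim` is a made-up name, `spark` is assumed to be the notebook's SparkSession, and the column names in the usage note are assumptions apart from reading_date.

```python
def read_slim(spark, table, columns, condition):
    """Read only the needed rows and columns from a table.

    The filter and the projection are applied right where the table is read,
    so Spark has the chance to push both down into the scan.
    """
    return spark.table(table).where(condition).select(*columns)
```

For example, `read_slim(spark, 'mtu_dcu_trans_reading', ['meter_id', 'reading_date'], "reading_date >= '2021-01-01'").explain()` prints the physical plan; for file-based sources the scan node lists `PushedFilters` and `ReadSchema`, which is where we can confirm that the filter and the column pruning actually reached the scan.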
  • Correctly partition the destination table. When we use Merge or ReplaceWhere, it is better to partition the table correctly, ideally on a column that the merge or replaceWhere condition filters on.
After we made reading_date the partition column, the time cost dropped from 2.49 mins to 6.67 seconds. Merge and replaceWhere operations benefit in a similar way.
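As a rough sketch of what this can look like with Delta Lake, the snippet below creates a table partitioned by reading_date and then overwrites a single day with the `replaceWhere` option. The function names, the one-day granularity and the assumption that `df` is a PySpark DataFrame are mine for illustration; the real pipeline may differ.

```python
from datetime import date, timedelta


def day_predicate(column, day):
    """Return a predicate that selects exactly one day of a date column."""
    next_day = day + timedelta(days=1)
    return f"{column} >= '{day.isoformat()}' AND {column} < '{next_day.isoformat()}'"


def create_partitioned(df, path, column="reading_date"):
    """Write a new Delta table at `path`, partitioned by `column`."""
    df.write.format("delta").partitionBy(column).save(path)


def overwrite_one_day(df, path, day, column="reading_date"):
    """Overwrite one day's data in the Delta table at `path`.

    `df` is assumed to contain only rows for `day`; because the table is
    partitioned by `column`, only that partition has to be rewritten.
    """
    (df.write.format("delta")
        .mode("overwrite")
        .option("replaceWhere", day_predicate(column, day))
        .save(path))
```

Building the predicate in one place keeps the replaceWhere condition aligned with the partition column, which is what lets Delta skip the untouched partitions.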
  • Reduce spill. When an executor runs out of memory, it spills data to disk, which is much slower than memory. To solve this issue, we can take one or more of the following steps:
    • Increase executor memory by switching to a bigger cluster or by setting spark.executor.memory (if workable).
    • Increase the fraction of memory used for execution and storage (spark.memory.fraction, default 0.6).
    • Adjust spark.memory.storageFraction (default 0.5). Depending on where the spill occurs, in cache or in shuffle, we can raise or lower the storage fraction.
    • Adjust spark.sql.shuffle.partitions (default 200). Try increasing the number of shuffle partitions so that each partition is smaller and fits in memory.
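The settings above can be sketched as follows. The values are assumed examples, not recommendations, and the 128 MB target per shuffle partition is a common rule of thumb that I am assuming here rather than something measured in this case. The first three settings are read when the executors start, so they belong in the cluster's Spark config; only spark.sql.shuffle.partitions can be changed from inside the notebook.

```python
import math

# Assumed example values; set these in the cluster's Spark config,
# because they cannot be changed once the executors are running.
CLUSTER_LEVEL_CONF = {
    "spark.executor.memory": "16g",
    "spark.memory.fraction": "0.7",
    "spark.memory.storageFraction": "0.3",
}


def suggest_shuffle_partitions(shuffle_bytes,
                               target_partition_bytes=128 * 1024**2,
                               minimum=200):
    """Suggest a shuffle partition count so each partition is near the target size.

    `shuffle_bytes` is the shuffle size of the spilling stage as read from
    the Spark UI; the result never drops below `minimum` (Spark's default of 200).
    """
    needed = math.ceil(shuffle_bytes / target_partition_bytes)
    return max(minimum, needed)
```

In the notebook the result can be applied with `spark.conf.set("spark.sql.shuffle.partitions", str(suggest_shuffle_partitions(shuffle_bytes)))` before the query that spills.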

References:

Spark Configuration, https://spark.apache.org/docs/2.3.0/configuration.html

Performance Tuning, https://spark.apache.org/docs/latest/sql-performance-tuning.html#performance-tuning

From Query Plan to Performance: Supercharging your Apache Spark Queries using the Spark UI SQL Tab, https://www.youtube.com/watch?v=_Ne27JcLnEc&t=553s

How to Read Spark DAGs | Rock the JVM, https://www.youtube.com/watch?v=LoFN_Q224fQ

Optimize performance with caching, https://docs.microsoft.com/en-us/azure/databricks/delta/optimizations/delta-cache

Some practical performance and cost improvement tips we recently applied to ADF and Databricks

While switching to the cloud, we found that some pipelines ran slowly and costs increased rapidly. To solve these problems, we took the following steps to optimize the pipelines and data structures. None of them is hard to implement.

1. Set different triggers for different recurrence periods.

Whatever the reason, it is very common for a data pipeline to be triggered on a recurring schedule. However, the sources of the pipeline may be refreshed at different frequencies. In that case, we can set two triggers for the pipeline so that it doesn’t have to process every source at the highest frequency.

In the ForEach loop there are two kinds of clients: one is updated every 6 hours and the other daily. So two triggers were created, both running the same pipeline.
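As an illustration only, the two trigger definitions could look roughly like the dictionaries built below, which follow the shape of ADF's schedule trigger JSON. The trigger names, the pipeline name `ingest_clients`, the `client_group` parameter and the start time are all hypothetical; the real pipeline would use that parameter to decide which clients the ForEach loop processes.

```python
def schedule_trigger(name, pipeline, frequency, interval, parameters,
                     start_time="2021-01-01T00:00:00Z"):
    """Build a schedule trigger definition that runs `pipeline` with `parameters`.

    All names and the start time are illustrative placeholders.
    """
    return {
        "name": name,
        "properties": {
            "type": "ScheduleTrigger",
            "typeProperties": {
                "recurrence": {
                    "frequency": frequency,  # e.g. "Hour" or "Day"
                    "interval": interval,
                    "startTime": start_time,
                    "timeZone": "UTC",
                }
            },
            "pipelines": [
                {
                    "pipelineReference": {
                        "type": "PipelineReference",
                        "referenceName": pipeline,
                    },
                    "parameters": parameters,
                }
            ],
        },
    }


# Same pipeline, two schedules, each passing a different client group.
triggers = [
    schedule_trigger("every_6_hours", "ingest_clients", "Hour", 6,
                     {"client_group": "six_hourly"}),
    schedule_trigger("daily", "ingest_clients", "Day", 1,
                     {"client_group": "daily"}),
]
```

The point of the design is that there is still only one pipeline to maintain; the triggers differ only in their recurrence and in the parameter value they pass.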

2. Set different pools for notebooks.

Databricks provides pools to efficiently reuse VMs in newly created clusters. However, one pool can only have one instance type. If the pool instances are too small, jobs run out of memory or process slowly; if they are too big, we waste money.

To solve this problem, we set up two pools: a heavy one with instances of 4 cores and 28GB of memory, and a light one with instances of 4 cores and 8GB of memory. For lightweight jobs, such as ingesting source files into a bronze table, the light pool is the better choice, while the heavy pool can be used for aggregation work.

Two pools were created for the different weights of processing in the notebooks.
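A minimal sketch of how a job could pick its pool is shown below. The pool IDs, the runtime version and the set of "heavy" step names are placeholders I made up; `instance_pool_id`, `num_workers` and `spark_version` are the fields a Databricks cluster specification uses to attach a new cluster to a pool.

```python
# Placeholder pool IDs; the real ones come from the Databricks workspace.
POOLS = {
    "light": "<light-pool-id>",  # 4 cores, 8GB instances
    "heavy": "<heavy-pool-id>",  # 4 cores, 28GB instances
}

# Hypothetical step names that are treated as memory-hungry.
HEAVY_STEPS = {"aggregate", "merge"}


def cluster_spec(step, num_workers=2, spark_version="<runtime-version>"):
    """Return a new-cluster spec that draws its VMs from the matching pool."""
    pool = "heavy" if step in HEAVY_STEPS else "light"
    return {
        "spark_version": spark_version,
        "instance_pool_id": POOLS[pool],
        "num_workers": num_workers,
    }
```

With this, an ingestion step such as `cluster_spec("ingest")` lands on the light pool, while `cluster_spec("aggregate")` gets the larger instances.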

3. Create a secondary index or archive folder for the source files.

When we first switched to the cloud, we put everything into a single folder. That is fine for Azure storage itself. But once we need to scan for specific files or look up a file’s location, the huge number of files in the folder significantly affects performance, and the situation gets worse as the number of files grows over time. To solve this problem, we have three solutions:

  • Create a secondary index when a file moves into the folder. The secondary index is like a table that records the important information about each file, such as its name, modified date, and location in the data lake. The next time someone needs to scan the folder, they only need to scan this index rather than the whole blob directory.
An example of a secondary index
  • Archive the files. This may be the easiest one, because you only need to move the files you will never use again into an archive folder. Retrieving files from the archive folder is then not time-sensitive.
Part of the code that leverages workers to move files into the Archive folder.
  • Organize files by received date. This method is similar to archiving, but we create many folders, one per date.
Each file is reorganized into the date folder for the day it was received.
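The index record and the date-folder layout can be sketched in a few lines of plain Python. The function names, the `/mnt/source` root in the usage note and the exact fields of the record are assumptions for illustration; our real index has its own columns.

```python
import posixpath
from datetime import datetime


def dated_path(root, file_name, received):
    """Return the target location of a file inside a year/month/day folder."""
    return posixpath.join(root, f"{received:%Y/%m/%d}", file_name)


def index_record(file_name, modified, location):
    """Build one row of the secondary index for a file."""
    return {
        "name": file_name,
        "modified": modified.isoformat(),
        "location": location,
    }
```

For a file received on 5 March 2021, `dated_path('/mnt/source', 'a.csv', datetime(2021, 3, 5))` gives `/mnt/source/2021/03/05/a.csv`. On Databricks the move itself can be done with `dbutils.fs.mv(old_location, new_location)`, and the record is then appended to the index table so later lookups never have to list the folder.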

4. Optimize Delta tables weekly.

Don’t hesitate to optimize and vacuum your Delta tables regularly. Small files kill the advantage of Delta tables, especially in the “Merge” operation. The following example creates a list of Delta tables that need to be optimized and then processes each one.

delta_table_list = ['customer', 'job', 'job-category', 'job_subscription', 'missing_file_check', 'parameter']

# optimize bronze tables: compact many small files into fewer, larger ones
for delta_table in delta_table_list:
  sql_string = 'OPTIMIZE delta.`/mnt/eus-metadata/{0}`'.format(delta_table)
  sqlContext.sql(sql_string)

# vacuum the same tables: delete unreferenced files older than the retention period
for delta_table in delta_table_list:
  sql_string = 'VACUUM delta.`/mnt/eus-metadata/{0}`'.format(delta_table)
  sqlContext.sql(sql_string)