Failure Sensor Detection by Pattern Comparison on Time Series

Purpose

Find interesting patterns in time series data in order to detect failing sensors.

Process

  1. Time series data source. We leveraged Databricks and delta tables to store our source data, which includes sensor_id, timestamp, feature_id, and feature_value. To make the process run faster, you may aggregate the timestamps to the hour or day level.
  2. Vectorization. In order to vectorize the features, the features stored in rows have to be pivoted into columns.
  3. Clean outliers. Even if the data source is clean at the aggregation level, there may still be outliers, such as extremely high or low values. We flag values more than 5 standard deviations from the mean and remove these outliers per sensor ID.
  4. Normalization. Use a min-max function to normalize the selected columns. You can apply min-max normalization either across all sensors or per sensor, depending on how much the sensors vary from one another.
  5. Pattern comparison. We slide an existing reference pattern along the time series from the earliest to the latest time point. For example, the list [1,1,1,1,0,0,1,1,1,1] has 10 points; sliding it over a 30-point time series yields 21 comparison results (see the sketch after this list). There are two ways to compare two time series:
    • Pearson correlation.
    • Euclidean distance.
  6. Filters. The comparison result gives you a similarity value at every time point. You can set a threshold on this similarity value to filter out non-interesting points, and you can additionally set a threshold on how frequently the pattern occurs. Here is an example of thresholds:
    • corr_ep_threshold = 0.8 (similarity greater than 0.8)
    • freq_days_threshold = 20 (the pattern occurred more than 20 times across the whole time series)
    • freq_threshold = 40% (the pattern occurred on more than 40% of the days between its first and last occurrence)
  7. Persist the results into a database or table.
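
A minimal sketch of steps 4–6, assuming a single normalized feature for one sensor held in a pandas Series (names and thresholds are illustrative, not the production code):

import numpy as np
import pandas as pd

def min_max(series: pd.Series) -> pd.Series:
    # step 4: min-max normalization of one feature column
    return (series - series.min()) / (series.max() - series.min())

def slide_compare(ts: np.ndarray, pattern: np.ndarray, corr_threshold: float = 0.8):
    # steps 5/6: slide the reference pattern over the series and keep the
    # start positions whose Pearson correlation exceeds the threshold
    n, m = len(ts), len(pattern)
    hits = []
    for start in range(n - m + 1):
        window = ts[start:start + m]
        corr = np.corrcoef(window, pattern)[0, 1]   # Pearson correlation
        dist = np.linalg.norm(window - pattern)     # Euclidean distance (alternative measure)
        if corr > corr_threshold:
            hits.append((start, corr, dist))
    return hits

pattern = np.array([1, 1, 1, 1, 0, 0, 1, 1, 1, 1], dtype=float)
ts = min_max(pd.Series(np.random.rand(30))).to_numpy()
print(slide_compare(ts, pattern))   # a 10-point pattern over 30 points -> 21 comparisons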

Pros

  • Since the features are normalized, the comparison is based on shape rather than absolute values.
  • Statistical comparison is quick, especially when running on Spark.
  • The results are easy to understand and easy to filter with configurable thresholds.

Cons

  • Right now, the Pearson correlation and Euclidean distance only apply to one feature at a time. We need to figure out a way to calculate similarity in higher dimensions in order to run all features as a matrix.
  • Pattern discovery. Today we find reference patterns based on experience. However, there is an automatic way to discover patterns: Stumpy is a Python library that does so, and it can also run on a GPU. A minimal sketch follows below.
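
As a minimal sketch (assuming a 1-D numpy array for one normalized feature and a window length of 10), Stumpy's matrix profile can surface recurring patterns without a hand-crafted reference:

import numpy as np
import stumpy

ts = np.random.rand(1000)                             # one normalized feature for one sensor
m = 10                                                # subsequence (pattern) length

mp = stumpy.stump(ts, m)                              # matrix profile: column 0 = distance to nearest neighbor
motif_idx = int(np.argmin(mp[:, 0].astype(float)))    # start index of the most repeated subsequence (motif)
print(motif_idx, ts[motif_idx:motif_idx + m])
# stumpy.gpu_stump(ts, m) is the GPU-accelerated variant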

Future work

  • How to leverage streaming to find the pattern immediately once it happens, rather than using a batch process.
  • High-dimensional matrix comparison.
  • Integrate Stumpy to discover interesting patterns automatically, then apply the new patterns to the configured filters.

Service Principal on Databricks and Trivial zip File Ingestion

For security reasons, we have to use a service principal instead of a personal token to control the Databricks cluster and run notebooks and queries. Neither the Azure nor the Databricks documentation explains the steps very well; the products evolve so quickly that the documents have not kept up. After some digging, I found a way through the following steps.

  1. Create a service principal through Azure AD. In this case, we named it “DBR_Tableau”.
  2. Assign the newly created service principal “DBR_Tableau” to Databricks.
# assign two variables
export DATABRICKS_HOST="https://<instance_name>.azuredatabricks.net"
export DATABRICKS_TOKEN="<your personal token>"

curl -X POST \
${DATABRICKS_HOST}/api/2.0/preview/scim/v2/ServicePrincipals \
--header "Content-type: application/scim+json" \
--header "Authorization: Bearer ${DATABRICKS_TOKEN}" \
--data @add-service-principal.json \
| jq .


# create add-service-principal.json in the same folder as the command above.
# the group id used in the JSON below can be found through the group page URL.

{
  "applicationId": "<application_id>",
  "displayName": "DBR_Tableau",
  "entitlements": [
    {
      "value": "allow-cluster-create"
    }
  ],
"groups": [
    {
      "value": "8791019324147712"  # group id can be found easily through URL 
    }
 ],
  "schemas": [ "urn:ietf:params:scim:schemas:core:2.0:ServicePrincipal" ],
  "active": true
} 

  3. Create a Databricks API token, setting its lifetime.

# Databricks Token API
curl -X POST -H "Authorization: Bearer ${DATABRICKS_TOKEN}" \
--data '{ "comment": "databricks token", "lifetime_seconds": 77760000 }' \
https://adb-5988643963865686.6.azuredatabricks.net/api/2.0/token/create

The other thing I want to talk about is ingesting a large number of small files. In my case, we have to unzip archives containing hundreds to thousands of small files. Since Databricks doesn't support zip files natively, we have to unzip them with shell or Python. By default, we would unzip the files into the /dbfs folder and then use spark.read to ingest all of them. However, unzipping a large number of small files into /dbfs is quite slow, presumably because /dbfs is a cluster path that lives on a remote storage account. We figured out that we can unzip to the local path /databricks/driver, which is the driver's disk. Although unzipping is now much quicker, spark.read doesn't support local files at the moment, so we have to use some slightly tricky Python code to ingest the data first and then convert it into a Spark dataframe.
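
A minimal sketch of the unzip-to-driver step (the mount path and folder layout are placeholders); the ingestion snippet that follows then reads these driver-local files:

import glob
import os
import zipfile

local_dir = '/databricks/driver/' + cust_name + '/output'     # driver-local disk, much faster than /dbfs
os.makedirs(local_dir, exist_ok=True)

for zip_path in glob.glob('/dbfs/mnt/<mount_name>/' + cust_name + '/*.zip'):   # placeholder source path
    folder = os.path.splitext(os.path.basename(zip_path))[0]
    with zipfile.ZipFile(zip_path) as zf:
        zf.extractall(os.path.join(local_dir, folder))                         # one sub-folder per archive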

import glob
import os

binary_list = []
for binary_file in glob.glob('/databricks/driver/' + cust_name + '/output/*/*.dcu'):
  length = os.path.getsize(binary_file)
  with open(binary_file, "rb") as f:
    binary_data = f.read()
  dcu_file = binary_file.rsplit('/', 1)[-1]
  folder_name = binary_file.split('/')[5]
  binary_list.append((cust_id, cust_name, binary_data, dcu_file, folder_name + '.zip', binary_file, length))

# from the list of tuples of binary data, create a Spark dataframe with parallelize and createDataFrame
# (spark.read cannot be used here because it does not read driver-local files)
binary_rdd = spark.sparkContext.parallelize(binary_list, 24)
binary_df = spark.createDataFrame(binary_rdd, binary_schema)

During this conversion process, you may hit an error related to “spark.rpc.message.maxSize”. This error only happens when you use an RDD; simply raising the size to 1024 solves it. In most cases, though, I still recommend spark.read.
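
For reference, the setting goes into the cluster's Spark config (Cluster – Advanced Options – Spark) and takes effect when the cluster starts:

spark.rpc.message.maxSize 1024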

How to break down Databricks DBU cost to the pipeline level

One day we found that our Databricks DBU cost had surged, but we didn't know which ADF job caused it. I asked Azure support whether there is any way to track cost down to the pipeline level. Unfortunately, they told me: “Regarding your question, please note that you can only track cost against the resource group or workspace (cluster). Unfortunately, there is no way to track Databricks cost at a job or user level.” OK, that's fine.

However, when I opened the cost details exported from Cost Management in Azure, I found a Tags column on the DBU resource usage rows.

"ClusterId": "1210-XXXX-XXXX","DatabricksInstancePoolCreatorId": "XXXXXX","DatabricksInstancePoolId": "1025-204618-XXXXX-pool-XXXXXX","ClusterName": "job-8810145-run-1","JobId": "8810145","RunName": "ADF_df-eus-prod-01_Standard_Module_run_pipeline_935e1d13-02c5-4ae3-9441-3a414d1ad0eb","Creator": "XXXX@XXXX.com","Vendor": "Databricks","DatabricksInstanceGroupId": "-8346123XXXXX6640367"

Is this our Databricks information? Maybe we can do something with it!? After consulting my coworker Sai, I learned that the JobId corresponds to the notebook run ID. Then everything falls into place, since we already record everything when running a notebook, including the ADF pipeline ID, customer information, notebook name, notebook run ID (JobId), and start/end time.

With a simple SQL query joining the cost CSV table and the log table, we can categorize DBU cost by ADF pipeline.

select ProductName,t2.notebook_name,sum(t1.PreTaxCost)  from cost_df t1
left join (select distinct notebook_name,notebook_runID,cust_name from delta.`/mnt/eus-logging/job-log-details` ) t2 on t2.notebook_runID=get_jobid(Tags)
where ProductName='Azure Databricks - Premium - Jobs Compute - DBU' 
group by ProductName,t2.notebook_name
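
The query assumes a small helper, get_jobid, that extracts the JobId from the Tags column of the cost export (the tags arrive as comma-separated "key": "value" pairs, as shown above). A hypothetical sketch registered as a SQL UDF:

import json
from pyspark.sql.types import StringType

def get_jobid(tags):
    # wrap the comma-separated pairs in braces so they parse as JSON, then pull out JobId
    if tags is None:
        return None
    try:
        return json.loads('{' + tags + '}').get('JobId')
    except ValueError:
        return None

spark.udf.register("get_jobid", get_jobid, StringType())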

To make this work, you have to log the ADF pipeline and notebook runtime information when the notebook executes. I do this by adding the following calls at the start and end of the notebook.

# at beginning
notebook_name = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
notebook_RunID = getRunID()  # helper function defined in /Public/JOB_LOG
write_log_detail(ADFRunID,cust_id,cust_name,notebook_name,notebook_RunID,startTime=str(datetime.now())) # notebook start time

# at the end
write_log_detail(ADFRunID,cust_id,cust_name,notebook_name,notebook_RunID,endTime=str(datetime.now())) # notebook end time

Here, the function write_log_detail is the one that writes the log into a delta table.

Unfortunately, I have not yet found a way to break the VM cost down to the pipeline level. I think there should be a way; it just needs more time. I will update this later.

Some performance/cost improvement tips recently applied to ADF and Databricks

While switching to the cloud, we found some pipelines running slowly and costs increasing rapidly. To solve these problems, we took the following steps to optimize the pipelines and data structures. None of them is hard to implement.

1. Set different triggers for different recurring periods.

No matter the reason, it is very common for a data pipeline to be triggered on a recurring schedule. However, the sources of the pipeline may be refreshed at different frequencies. In that case, we can set two triggers for the same pipeline, so that it doesn't always have to run at the highest frequency.

In our ForEach loop, there are two kinds of clients: one is updated every 6 hours, the other daily. So two triggers were created, both running the same pipeline.

2. Set different pools for different notebooks.

Databricks provides pools to efficiently reuse the VMs of newly created clusters. However, one pool can only contain one instance type. If the pool's instance type is too small, out-of-memory errors or slow processing will occur, while if the instances are too big, we are wasting money.

To solve this problem, we set up two kinds of pools: a heavy one with 4-core, 28 GB instances and a light one with 4-core, 8 GB instances. For lightweight jobs, like ingesting source files into a bronze table, the light pool is the better choice, while the heavy pool can be used for aggregation work.

Two pools are created for the different workload weights in the notebooks.

3. Create a secondary index or archive folder for the source files.

At the beginning of our move to the cloud, we put everything into a single folder. That is fine for Azure, but once we need to scan for specific files or get the location of a file, the huge number of files in the folder significantly affects performance, and the situation gets worse as the file count grows over time. To solve this problem, we have three solutions:

  • Create a secondary index when a file moves into the folder. The secondary index is like a table which records the important information about the file, such as name, modify date, and location in the data lake. The next time someone needs to scan the folder, they only need to scan this secondary index rather than the whole blob directory (see the sketch after this list).
An example of a secondary index
  • Archive the files. This may be the easiest option, because you only need to move files that you will rarely use into an archive folder; retrieving them from the archive folder later is not time sensitive.
Part of the code used to move files into the archive folder.
  • Organize files by their receive date. This method is similar to the archive one, but we create many archive folders, one per date.
Each file is re-organized and put into the date folder corresponding to when the file was received.
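
A minimal sketch of the first option, maintaining the secondary index as a small delta table (the index path and schema are placeholders chosen for illustration):

import os
from datetime import datetime

def add_to_file_index(file_path):
    # append one file's metadata to the secondary index so later lookups
    # scan this small table instead of listing the whole folder
    row = [(os.path.basename(file_path),
            file_path,
            datetime.utcnow(),
            os.path.getsize('/dbfs' + file_path))]
    df = spark.createDataFrame(row, schema='file_name string, location string, modify_date timestamp, size long')
    df.write.format('delta').mode('append').save('/mnt/<mount_name>/file_index')   # placeholder index path

add_to_file_index('/mnt/<mount_name>/incoming/sample_file.csv')   # placeholder file path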

4. Optimize delta tables weekly.

Don’t hesitate to optimize/vacuum your delta tables regularly. Small files kill the advantages of a delta table, especially in “Merge” operations. The following is an example of iterating over a list of delta tables that need to be optimized.

delta_table_list = ['customer','job','job-category','job_subscription','missing_file_check','parameter']

# optimize the bronze tables to compact small files
for delta_table in delta_table_list:
  sql_string = '''optimize delta.`/mnt/eus-metadata/{0}`'''.format(delta_table)
  sqlContext.sql(sql_string)

# vacuum afterwards to remove the old, unreferenced files
for delta_table in delta_table_list:
  sql_string = '''VACUUM delta.`/mnt/eus-metadata/{0}`'''.format(delta_table)
  sqlContext.sql(sql_string)

Spark 3.0 new features – Learning from Dr. Kazuaki Ishizaki

Dr. Kazuaki Ishizaki gives a great summary of Spark 3.0 features in his presentation “SQL Performance Improvements at a Glance in Apache Spark 3.0”. It is very helpful for understanding how these new features work and where we can use them.

New explain format

Spark 3.0 provides a terse explain format with detailed information.

EXPLAIN [ EXTENDED | CODEGEN | COST | FORMATTED ] statement

There are five formats:

  1. default. Physical plan only.
  2. extended. Equivalent to df.explain(true) in Spark 2.4; it generates the parsed logical plan, analyzed logical plan, optimized logical plan, and physical plan.
  3. codegen. Generates the Java code for the statement.
  4. cost. If plan statistics are available, it generates the logical plan together with the statistics.
  5. formatted. The most useful one in my mind. It has two sections: a physical plan outline in a simple tree format, and node details.
-- example from spark document
-- Using Formatted
EXPLAIN FORMATTED select k, sum(v) from values (1, 2), (1, 3) t(k, v) group by k;
+----------------------------------------------------+
|                                                plan|
+----------------------------------------------------+
| == Physical Plan ==
 * HashAggregate (4)
 +- Exchange (3)
    +- * HashAggregate (2)
       +- * LocalTableScan (1)
   
   
 (1) LocalTableScan [codegen id : 1]
 Output: [k#19, v#20]
        
 (2) HashAggregate [codegen id : 1]
 Input: [k#19, v#20]
        
 (3) Exchange
 Input: [k#19, sum#24L]
        
 (4) HashAggregate [codegen id : 2]
 Input: [k#19, sum#24L]
|
+----------------------------------------------------+

As these terms do not exactly match Spark's query syntax, the following list helps to explain the meaning of the Spark explain output:

  • scan. Basic file access. In Spark 3.0, it can perform some predicate work before loading the data:
    • ColumnPruning: select only the columns that are needed.
    • PartitionFilters: only read data from certain partitions.
    • PushedFilters: filter on fields that can be pushed directly down to the file scan (predicate pushdown).
  • filter. Due to predicate pushdown, a lot of filter work has moved into the scan stage, so you may not find a filter node in the explain output that matches your query. Still, some operations, like first and last, need to be done in the filter stage:
    • Pushdown predicates.
    • Combine filters: combines two neighboring filter operations into one.
    • Infer filters from constraints: creates a new filter from a join condition; we will come back to this in the “Dynamic partitioning pruning” section.
    • Prune filters: removes redundant filters.
  • project. Column selection operations, like select, drop, withColumn.
  • exchange. Shuffle operations, like sort merge or shuffle hash.
  • HashAggregate. Data aggregation.
  • BroadcastHashJoin & BroadcastExchange. Broadcast join and its data exchange.
  • ColumnarToRow. A transition between columnar and row-based execution.

All types of join hints

Spark uses two types of hints: partition hints and join hints. Spark 2.4 only supports the broadcast join hint, while Spark 3.0 supports hints for all join strategies:

  • Broadcast join. The famous join for joining a small table (dimension table) with a big table (fact table) while avoiding costly data shuffling.
    • a table smaller than 10MB (the default threshold) is broadcast to all nodes to avoid shuffling
    • two steps: broadcast –> hash join
    • spark.sql.autoBroadcastJoinThreshold
  • shuffle merge join
    • Sort merge join performs a sort operation first and then merges the datasets.
    • steps:
      • shuffle. The two big tables are partitioned by the join keys across the partitions.
      • sort. Sort the data within each partition.
      • merge. Join the two sorted and partitioned datasets.
    • works well when
      • joining two big tables, as it doesn't need to load all data into memory like a hash join
      • a highly scalable approach is required
  • shuffle hash join
    • Shuffle hash join shuffles the data based on the join key, so that rows with the same keys from both tables are moved to the same node, and then performs the join.
    • works well when
      • the dataframes are distributed evenly by the keys
      • the dataframes have enough distinct keys for parallelism
      • there is enough memory for the hash join
    • supported for all joins except full outer join
    • spark.sql.join.preferSortMergeJoin = false
  • shuffle replicate nl
    • the cartesian product (similar to SQL) of the two relations is computed to evaluate the join.
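
As a quick, self-contained illustration (toy tables created with spark.range, not from the presentation), each strategy can be requested through DataFrame hints and verified with explain():

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
t1 = spark.range(1000000).withColumnRenamed("id", "key")
t2 = spark.range(100).withColumnRenamed("id", "key")

t1.join(t2.hint("broadcast"), "key").explain()              # broadcast hash join
t1.hint("shuffle_merge").join(t2, "key").explain()          # sort merge join
t1.hint("shuffle_hash").join(t2, "key").explain()           # shuffle hash join
t1.hint("shuffle_replicate_nl").join(t2, "key").explain()   # cartesian product (nested loop)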

Adaptive query execution(AQE)

AQE is a feature that automatically chooses better query strategies at runtime.

  • Dynamically coalescing shuffle partitions: set the number of reducers automatically to avoid wasting memory and I/O resources.
    • spark.sql.adaptive.enabled=true
    • spark.sql.adaptive.coalescePartitions.enabled=true
AQE can merge several small partitions into one reducer to even out the pressure.
  • Dynamically switching join strategies to improve performance.
    • dynamically choose from the 3 join strategies; broadcast has the best performance, but the static strategy choice is not always accurate.
    • spark.sql.adaptive.enabled=true
AQE gets the size of the join tables at runtime, so it can choose a broadcast rather than a shuffle operation.
  • Dynamically optimizing skew joins to avoid imbalanced workloads.
    • the large partition is split into multiple partitions.
    • spark.sql.adaptive.skewJoin.enabled=true
AQE splits a skewed partition into multiple partitions.
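
For convenience, the three switches above can be turned on at the start of a notebook session (spark is the predefined session in Databricks):

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")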

Dynamic partitioning pruning

We already saw a piece of this in the explain format section. Spark 3.0 is smart enough to avoid reading unnecessary partitions in a join operation by using the results of filter operations on the other table. For example,

SELECT * FROM dim_iteblog
JOIN fact_iteblog
ON (dim_iteblog.partcol = fact_iteblog.partcol)
WHERE dim_iteblog.othercol > 10

In this case, Spark performs the partition pruning and adds a new filter to the join table “fact_iteblog”.

Enhanced nested column pruning & pushdown

  • nested column pruning can be applied to all operators, like limits and repartition
    • select col2._1 from (select col2 from tp limit 1000)
  • parquet can apply pushdown filters and read only the needed columns
    • spark.read.parquet('filename').filter('col2._1 = 100')

Improved aggregation code generation

  • Catalyst translates a given query into Java code; the HotSpot compiler in OpenJDK then translates the Java code into native code.
  • The HotSpot compiler gives up generating native code for methods with more than 8000 Java bytecode instructions.
  • Catalyst now splits a large Java method into smaller ones to allow HotSpot to generate native code.

New Scala and Java (infrastructure updates)

  • Java 11
  • Scala 2.12

Summary

I think it is best to borrow the summary slide from Dr. Kazuaki Ishizaki's presentation.

Reference

SQL Performance Improvements at a Glance in Apache Spark 3.0, https://www.iteblog.com/ppt/sparkaisummit-north-america-2020-iteblog/sql-performance-improvements-at-a-glance-in-apache-spark-30-iteblog.com.pdf

Spark 3.0.1 – Explain, http://spark.apache.org/docs/latest/sql-ref-syntax-qry-explain.html

Mastering Query Plans in Spark 3.0, https://towardsdatascience.com/mastering-query-plans-in-spark-3-0-f4c334663aa4

Fast Filtering with Spark PartitionFilters and PushedFilters, https://mungingdata.com/apache-spark/partition-filters-pushed-filters/

Spark 3.0.1 – Hints, https://spark.apache.org/docs/3.0.0/sql-ref-syntax-qry-select-hints.html

Tableau 2019.3 starts to support Databricks

Since version 2019.3, Tableau supports Databricks with a native connection driver, so we no longer have to use some weird unknown product or a custom API to connect them. Here are the steps to use it.

  • Download the new version of Tableau through the pre-release page. (It will reach the official release soon.)
  • Download the supported Simba ODBC driver and install it.
  • After these two steps, you will find the Databricks option in your Tableau connectors.
Select the Databricks connector

The next step is figuring out the authorization information for Databricks.

  • Log in to Databricks.
  • Go to Cluster – <your selected cluster name> – Advanced Options – JDBC/ODBC.

Here you can find the Server Hostname and HTTP Path; copy them to the authorization page.

For the user name and password, you have to use a token.

  • Find the little logo in the upper right corner, press it, then choose User Settings.
  • Click “Generate New Token”. A popup window shows the token string; save it, because it is your PASSWORD.
  • Enter “token” as the username and the string you just saved as the password. Then you can see the tables you created in Databricks.

Saving tables to Databricks is also simple:

df_test = spark.read.format("parquet").load("/mnt/aclaradls01/test/dcu_health.parquet")
df_test.write.mode('overwrite').saveAsTable("df_test") 

Tips:

We can also use Spark SQL to connect to Databricks, and even use Databricks as an API to the data lake; it is pretty much a cluster running Spark. Here are the steps:

  1. Create a view in Databricks which links to the mounted data lake path: spark.sql("create view df_test_view as SELECT * FROM parquet.`/mnt/<mounted_name>/<folder_name>/`")
  2. Add “spark.hadoop.hive.server2.enable.doAs false” to your cluster Spark config.
  3. If you try to load a large dataset that takes a long time, add spark.executor.heartbeatInterval 10s and spark.network.timeout 9999s to your cluster Spark config as well.

Reference:

Tableau with databricks. https://docs.databricks.com/user-guide/bi/tableau.html

How to build a data pipeline in Databricks

For a long time, I thought there was no pipeline concept in Databricks. Most engineers write the whole workflow in one notebook rather than splitting it into several activities as in Data Factory, which means we have to rewrite everything in the script when the next pipeline comes along.

But recently, I found there is indeed a way to build a pipeline in Databricks. Here is the document, and below is my understanding of it.

  • Create widgets as parameters in the callee notebook
dbutils.widgets.text("input", "default") # define a text widget in the callee notebook
......
dbutils.notebook.exit(defaultText) # set the return value
  • Call the callee notebook from a caller notebook (we can write our pipeline in this caller notebook, then call all of the following activity notebooks)
status = dbutils.notebook.run("pipeline", 30, {"input": "pass success"}) # pass input = "pass success" to a notebook called pipeline
  • The callee notebook can return a string to indicate the execution status (you still remember dbutils.notebook.exit in the callee notebook, right?)
returned_string = dbutils.notebook.run("pipeline2", 60)
  • Data exchange between two notebooks can be done with a global temp view, which lives in memory.
# in callee
sqlContext.range(10).toDF("value").createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

# in caller
returned_table = dbutils.notebook.run("pipeline2", 60)
global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))
  • We can also call functions defined in another notebook, which means we can build a library shared by all notebooks.
%run ./aclaraLibary # in this notebook, there are two functions which are overloaded.
simple(1,2)
simple(1,2,3)
