How to build a data pipeline in Databricks

For a long term, I thought there was no pipeline concept in Databricks. For the most engineers they will write the whole script into one notebook rather than split into several activities like in Data factory. In this case, we have to rewirte everything in the script when the next pipeline coming.

But recently, I found there was indeed a way to achieve pipeline in databricks. Here is the document. And follows my understanding.

  • Create widgets as parameters in callee notebook
dbutils.widgets.text("input", "default") # define a text widget in callee notebook
......
dbutils.notebook.exit(defaultText) # set return variable
  • Call callee notebook through caller notebook(we can write our pipeline in this caller notebook, then call all following activity notebooks)
status = dbutils.notebook.run("pipeline",30,{"input":"pass success"}) # transfer input = “pass success” to a notebook called pipeline
  • Callee notebook can return a string to indicate the execution statues( you still remember dbutils.notebook.exit in callee notebook right?)
returned_string = dbutils.notebook.run("pipeline2", 60) 
  • The data exchange between two notebooks can be done with global tempview which locates in memory.
# in callee
sqlContext.range(10).toDF("value").createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

# in caller
returned_table = dbutils.notebook.run("pipeline2", 60)
global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))
  • We can also call function in another notebook, which means we can build a library for all notebook
%run ./aclaraLibary # in this notebook, there are two functions which are overloaded.
simple(1,2)
simple(1,2,3)

ADF tends to replace SSIS

Essentially, ADF(Azure Data Factory) only takes responsibility of Extraction and Load in the ELT pipeline. Then we have to use another tool, like databrick to write jupyter notebook to manipulate dataframe or RDD to complete transform activities. However, as Microsoft launched data “Data Flow” in ADF, it becomes more and more similar to SSIS. Most of ETL work can be done in ADF with nicely and friendly GUI. For ETL stuff, I copied a cheat sheet comparing the dataflows between ADF and SSIS. It maybe helpful.

Surprisingly, ADF didn’t provide SCD(slowing changing Dimension). You have to manually work on it. It won’t be hard although. Just set up some start and end time. Here is the article about it.

Anyway, for me. I like this way to complete some simple but repeatable actives with GUI, and let some complex operations in databrick.

Share some useful/special MS SQL tips as a data engineer





If you are a data scientist, you maybe never need to do the data preprocess work, like ETL/ELT, performance tunning or OLTP database design. Everything is already prepared in the structured data warehouse or flat file, it is beauty and nice. Regarding to data quality, all a data scientist need to do is handle some missing or wrong value, then clear the relationship and do the analysis. I didn’t say it is easy after preprocess, what I mean is data engineer really does lots of time-consuming work for the final success. So I wanna summary and share some of my experience, it maybe can save data engineer much time. And I am welcome if someone can correct me and add more information, please send me email: neo_aksa@hotmail.com

1. Incremental Loading. We get three method to do incremental loading.

A. Merge clause. It’s very simple. just the combination of update and insert.

MERGE INTO
target_table tg_table
USING source_table src_table
ON ( src_table.id = tg_table.id )
WHEN MATCHED
THEN UPDATE SET tg_table.name = src_table.name
WHEN NOT MATCHED
THEN INSERT ( tg_table.id, tg_table.name ) VALUES ( src_table.id, src_table.name );

B. CDC(change data capture) in SSIS. More information see my another topic: “Incremental Load DW by using CDC in SSIS

C.Lookup + conditional split in SSIS. Essentially it is as same as the method A. Not find goes to “Insert”, find goes to “update”.

2. CTE. Before CTE coming out, we write the SQL with many sub queries which is a little bit hard to read since the logic is reversed. Now with the help of CTE, we can make our codes more readable and get rid of function in group by.

-- return the customers who had over $10,000 in purchase for their first three transactions.
with OrderRank
as
( 
select custID, row_number() over(partition by custID order by orderID) as Rank, amount from SalesOrder
),
OrderOver
as
(
select custID, sum(amount) as totalAmount from OrderRank where rank<=3 group by custID
)
select custID, totalAmount from OrderOver where totalAmount>10000

3. Delete duplicate row. This is very common job as lots of data are manual input. Here we have two simple ways to handle it.
A. use “Sort” component in SSIS, check reduce duplication box.
B. Write script. Using CTE to mark the row number, then delete the row number greater than 1

With CTE RemoveDuplicate
AS
(
-- partition and order by columns which decide duplication 
select ROW_NUMBER() over (partition id,name.. order by id,name) as row id, column ....... from tablename
)
delete from RemoveDuplicate where row_id > 1

4. Faster Loading. SQL Server defaults isolation level is “Read committed”. But in most of case, we don’t need it as we only need to load all the data from OLTP. There are two ways to make loading faster and not lock another jobs.
A. use “WITH(NOLOCK)” in statement level.

SELECT FirstName, LastName
FROM EmployeeInfo WITH(NOLOCK)
WHERE EmpID = 1;

B. use “Set Transaction ISOLATION LEVEL” to read uncommitted.

SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;

5. Use Column stored index in data warehouse. Column stored index is very helpful to increase the select performance since the column in the same page, but bad for insert or update. For the fact table with many different values, it is very good for full table scan. Just remember, if we create clustered columnstore index, we cannot create primary key, and all columns should be included into this clustered columnstore index.

--BASIC EXAMPLE: Create a nonclustered index on a clustered columnstore table.  
--Create the table  
CREATE TABLE t_account (  
    AccountKey int NOT NULL,  
    AccountDescription nvarchar (50),  
    AccountType nvarchar(50),  
    UnitSold int  
);  
GO  
  
--Store the table as a clustered columnstore.  
CREATE CLUSTERED COLUMNSTORE INDEX taccount_cci ON t_account;  
GO  
  
--Add a nonclustered index for table seek.
CREATE UNIQUE INDEX taccount_nc1 ON t_account (AccountKey);

6. Bulk data load. We maybe find its very slow to bulk load huge tables into data warehouse. This is because we missed some steps before loading.
A. drop the clustered index for large table before loading.
B. recreate index for the large table after loading.
C. Update statistics.

7. Update view. Typically, we use view to hide the logic and table behead, and make loading more easier. But in some cases, we need to update view( yes, we dont want to know the detail of view, we just need to update some data). SQL SERVER provides ability to update view directly and indirectly.
A. If the view match following limitation, you can do DML operation directly. a. no subquery and only select b. no distinct or group by(aggregation=NO) c. No order by d. if view contains multiple tables, you can only insert/update one table. e. use ‘with check option‘, otherwise, you will update the data out of you exception.
B. Use instead of trigger to update tables which related to view.

CREATE TRIGGER trigUnion ON vwUnionCustomerSupplier
INSTEAD OF UPDATE
AS
BEGIN
SET NOCOUNT ON
DECLARE @DelName nvarchar(50)

IF (SELECT inserted.Type FROM inserted) Is Null
RETURN

SELECT @DelName = deleted.CompanyName FROM deleted

IF (SELECT inserted.Type FROM inserted) = 'Company'
UPDATE Customers
SET CompanyName =
  (SELECT CompanyName
  FROM inserted)
  WHERE Customers.CompanyName =
  @DelName
ELSE
UPDATE Suppliers
SET CompanyName =
  (SELECT CompanyName
  FROM inserted)
  WHERE Suppliers.CompanyName =
  @DelName
END

8. Deadlock or long running Query. It’s not normal. but if you find your ELT or ETL is running for a long time. It may be caused by deadlock. check it by sys.dm_tran_lock. or we can use sys.dm_exec_query_stats to get the query running information.

9. Use windows function for rolling aggregation. We can set the row or range option to achieve running aggregation in MS SQL. By default, Range is default option.

-- running total
select customer id, orderId, amount, sum(amount) over (order by orderid) runningtotal from sales_order (in tempDB)
-- revised running total 
select customer id, orderId, amount, sum(amount) over (order by orderid rows unbounded preceding) runningtotal from saels_order (in memory) running total
-- runningtotal from sales order(in memory) all sum, very useful in partition with subtotal
select customer id, orderId, amount, sum(amount) over (order by orderid rows between unbounded preceding and unbounded following) 
-- running 3 month total from sales (in memory)
select customer id, orderId, amount, sum(amount) over (order by orderid rows between 1 preceding and 1 following) 

10. Covering Index. An index that contains all information required to resolve the query is known as a “Covering Index” . If the fields from “select” are not in non-cluster or cluster index, the “key lookup” will happen in execution plan.

To meet the covering Index, but we don’t want move new column into non-clustered index, we can use “Included columns“. It will keep non index in the leaf node of the index.

CREATE NONCLUSTERED INDEX [ix_Customer_Email] ON [dbo].[Customers]
(
            [Last_Name] ASC,
            [First_Name] ASC
)
INCLUDE ( [Email_Address]) WITH (PAD_INDEX  = OFF, STATISTICS_NORECOMPUTE  = OFF, SORT_IN_TEMPDB = OFF, IGNORE_DUP_KEY = OFF, DROP_EXISTING = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS  = ON, ALLOW_PAGE_LOCKS  = ON) ON [PRIMARY]

11. Schema Binding. schema binding is used for view and function.
Objects that are referenced by schema bound objects cannot have their definition changed. it can also significantly increase the performance of user defined functions

CREATE FUNCTION dbo.GetProductStatusLabel
(
  @StatusID tinyint
)
RETURNS nvarchar(32)
WITH SCHEMABINDING
AS
BEGIN
  RETURN (SELECT Label FROM dbo.ProductStatus WHERE StatusID = @StatusID);
END

12. Table/Index partitioning. If you are working on Azure or cluster platform, please skip this. The HDFS has already helps you to complete similar thing. But if you still work on-prem, table partitioning will help to improve performance a lot. Essentially, table partitioning is creating more than one filegroup to improve its I/O. There are four steps to create partition for table or index.
A. Add filegroups and files

-- Adds four new filegroups to the AdventureWorks2012 database  
ALTER DATABASE AdventureWorks2012  
ADD FILEGROUP test1fg;  
GO  
ALTER DATABASE AdventureWorks2012  
ADD FILEGROUP test2fg;  
-- Adds one file for each filegroup.  
ALTER DATABASE AdventureWorks2012   
ADD FILE   
(  
    NAME = test1dat1,  
    FILENAME = 'C:\Program Files\Microsoft SQL Server\MSSQL13.MSSQLSERVER\MSSQL\DATA\t1dat1.ndf',  
    SIZE = 5MB,  
    MAXSIZE = 100MB,  
    FILEGROWTH = 5MB  
)  
TO FILEGROUP test1fg;  
ALTER DATABASE AdventureWorks2012   
ADD FILE   
(  
    NAME = test2dat2,  
    FILENAME = 'C:\Program Files\Microsoft SQL Server\MSSQL13.MSSQLSERVER\MSSQL\DATA\t2dat2.ndf',  
    SIZE = 5MB,  
    MAXSIZE = 100MB,  
    FILEGROWTH = 5MB  
)  
TO FILEGROUP test2fg;  
GO  

B. Add partition function: how map to the partitions based on column’s value

-- Creates a partition function called myRangePF1 that will partition a table into four partitions  
CREATE PARTITION FUNCTION myRangePF1 (int)  
    AS RANGE LEFT FOR VALUES (100) ;  
GO  

C. Add partition scheme: map the partition function to filegourps.

-- Creates a partition scheme called myRangePS1 that applies myRangePF1 to the four filegroups created above  
CREATE PARTITION SCHEME myRangePS1  
    AS PARTITION myRangePF1  
    TO (test1fg, test2fg) ;  
GO  

D. participating column: partition function uses it to perform partition

-- Creates a partitioned table called PartitionTable that uses myRangePS1 to partition col1  
CREATE TABLE PartitionTable (col1 int PRIMARY KEY, col2 char(10))  
    ON myRangePS1 (col1) ;  
GO

13. Defragmentation. According to Mircorsoft suggestion, if fragment greater than 30%, we need to rebuild index, if between 5% – 30%, we need to reorganize index. We can use sys.dm_db_index_physical_stats to check the avg_fragmentation_in_percent.

-- use sys.dm_db_index_pysical_stats to check fregmanet
select * from   sys.dm_db_index_physical_stats(DB_ID(N'AdventureWorks2017'),OBJECT_id(N'AdventureWorks2017.Person.Person'),-1,null,'detailed')
-- Check avg_fragmentation_in_percent
-- if this percent 
--> 5% and < = 30%
ALTER INDEX REORGANIZE
--> 30%
ALTER INDEX REBUILD WITH (ONLINE = ON) 1

14. Other convenient code.

-- get all column names of spec table
sp_coulumns table_name, table_owner

-- Object Dependencies
sp_depends table_name, table_owner

-- convert if fail
Try_Convert(data_ype(length), expression, style)

-- split string by demilation.
SELECT * FROM STRING_split('A,B,B',',')
select column1, column2 from table1
cross apply string_split(column3,',')

--  returns the last day of the month containing a specified date, with an optional offset.
EOMONTH ( start_date [, month_to_add ] ) 

-- check the object
IF OBJECT_ID('Sales.uspGetEmployeeSalesYTD', 'P') IS NOT NULL

-- dynamic SQL
-- use sp_executesql
SET @ParmDefinition = N'@BusinessEntityID tinyint'; /* Execute the string with the first parameter value. */ 
SET @IntVariable = 197; 
EXECUTE sp_executesql @SQLString, @ParmDefinition, @BusinessEntityID = @IntVariable;
-- use exec
SET @columnList = 'AddressID, AddressLine1, City'SET @city = '''London'''
SET @sqlCommand = 'SELECT ' + @columnList + ' FROM Person.Address WHERE City = ' + @city
EXEC (@sqlCommand)

Incremental Load DW by using CDC in SSIS

To load data from OLTP system to DW, we have to face a problem: how to balance time and cost. Since data raises faster and faster, we need to increase our hardware ability to match the time requirement. So, incremental load coming out to reduce the data transmission significantly. There are three way to achieve it: 1. use datetime column 2. Changed data capture 3. changed data tracking. For a long time, I use the first way to capture the changed data manual, it is good, but too many works in development and testing. Here I want to introduce the CDC. Basically, CDC just a feature to utilize LSN(Log Sequence Number) and log tables to capture changed data, while SSIS itself provide native components to easy work with this new feature.

Let’s build a CDC workflow in SSIS for example:

Enable CDC feature

  • enable CDC by executing sp_cdc_enable_db(disable by sp_cdc_disable_db). Then check it with select name from sys.databaseswhere is_cdc_enabled=1
  • enable CDC for spec table with sp_cdc_enable_table then we can find the CDC table in systemtable folder or using select namefrom sys.tables tabwhere is_tracked_by_cdc=1 to check.
exec sys.sp_cdc_enable_table
@source_schema = N'Person'
, @source_name = N'Address'
, @role_name = N'cdc_Admin'
, @capture_column_list = N'column1, column2'; //can track spec columns, rather than the whole table

Control flow setting

  • add CDC control task
  • set CDC control operation to mark cdc start and set the cdc states for saving cdc states
  • run this control task, it will create a record in tablecdc_states
  • create two CDC control tasks , one set operation to Get Processing Range , another for Mark process range, they will get changed data and update CDC states respectively.
  • put a dataflow which is response for ETL operation, between two CDC control tasks.

Data flow setting into staging table

  • add CDC source which points to the table enabled CDC and choose the correct cdc_states table as well.
  • choose Net CDC processing mode in CDC source.
  • add CDC splinter after CDC source, create three Derived Column transformation for insert(0), update(2) and delete(1) data.
  • create a Union All transformation to union all data and export to stage database.
  • if necessary, we need to add a truncate script before all control flow to delete everything in stage database.
    img

Update fact table through staging tables

  • create a oledb source to connect to stage database
  • use conditional split to split insert and update+delete
  • for insert, we directly export; for update+delete we need to delete from fact table by identifier by OLE DB Command transformation, and use conditional split to export update data.
  • if necessary, use lookup to replace some dimensional columns
  • export to fact database.

Get Rid of ETL , Move to Spark.

ETL is the most common tool in the process of building EDW, of course the first step in data integration. As big data emerging, we would find more and more customer starting using hadoop and spark. Personally, I agree the idea that spark will replace most ETL tools.

Background

  • Business Intelligence -> big data
  • Data warehouse -> data lake
  • Applications -> Micro services

ETL hell

  • Data getting out of sync, each copy is a risk.
  • Performance issues and waste of server resource(peek Performance), although ETL can do limited parallel work.
  • Plain-text code in hidden stages(VB or java typical)
  • CSV files are not type safe
  • all or nothing approach in batch jobs.
  • legacy code

Spark for ETL

  • parallel processing in build in
  • using steaming to parallel ETL
  • Hadoop which is data source, we don’t need copy and reduce risk
  • just one code(scala or python)
  • Machine learning included
  • security, unit testing, Performance measurement , excepting handling, monitoring

Code Demo

  1. Simple one
spark.read.json("/sourcepath") #extract
.filter(...)   # Transform and blew
.agg(...)
.write.mode("append")  # Load
.parquet("/outputpath")

2.Steam

# @param1: master
# @param2: appname
sc = SparkContext("local[2]", "NetworkWordCount")
# @param1: spark context
# @param2: seconds
ssc = StreamingContext(sc, 1)
steam = ssc.textFileStream("path")
# do transform
# do load
ssc.start()
ssc.awaitTermination()

reference:
1. https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html
2.https://databricks.com/session/get-rid-of-traditional-etl-move-to-spark
3.https://www.slideshare.net/databricks/building-robust-etl-pipelines-with-apache-spark