Data Factory CI/CD in Azure DevOps

Azure Pipelines consists of two parts: build pipelines and release pipelines. They
cover CI and CD respectively.

  1. Build Pipeline – to build and test the code. The build creates an artifact that’s used by the rest of your pipeline to run tasks such as deploying to staging or production. 
  2. Release Pipeline – once the code is updated, built and packaged, it can be deployed to target services using Release Pipelines. 
[Figure: top half, CI pipeline; bottom half, CD pipeline]

Let’s walk through how to implement this process for Azure Data Factory (ADF). Before you start, I assume your current ADF setup already meets these requirements (if not, please refer to here):

  • You already have an Azure Repos repository.
  • ADF is integrated with this Git repository.
  • An Azure Key Vault, which stores the database and data lake connection information plus all configuration parameters for the release.

The whole ADF CI/CD pipeline is like this:

Build Pipeline (CI)

  • Create a new pipeline and choose “adf_publish” as the default branch.
    • The “adf_publish” branch is created by ADF automatically after you click “Publish” in the ADF GUI.
[Figure: the three JSON files are used to set parameters in the different environments]
  • Add a new agent job.
  • Search for “Data factory” and add a new “Publish build artifacts” task.
    • Set the path to publish (the Git path).
    • Set the artifact publish location to Azure Pipelines.

Release Pipeline (CD)

[Figure: left part, CI; right part, CD]

The next step is to deploy the artifact to three environments: DEV, QA and PROD.

[Figure: using Key Vault to set the SQL Server connection in the pipeline]
  • Import the Azure Key Vault where you stored all the connection information. We use this information in two places:
    • Release pipeline: to get the basic information for each environment, e.g. database name, data lake name, etc. See the screenshot above.
    • Template parameters used in adf_publish.
  • Set up the data factory deployment.
    • Add an “Azure resource group deployment” task, paying particular attention to the template path.
[Figure: the ARM template and parameters must point to the adf_publish branch]

You can also stop the triggers before the data factory deployment and start them again after it.

# stop triggers; you have to add these variables for each release environment
# $(...) is Azure DevOps macro syntax and is replaced before the script runs
$triggersADF = Get-AzDataFactoryV2Trigger -DataFactoryName "$(DataFactoryName)" -ResourceGroupName "$(ResourceGroupName)"

$triggersADF | ForEach-Object { Stop-AzDataFactoryV2Trigger -ResourceGroupName "$(ResourceGroupName)" -DataFactoryName "$(DataFactoryName)" -Name $_.Name -Force }
# start triggers
$triggersADF = Get-AzDataFactoryV2Trigger -DataFactoryName "$(DataFactoryName)" -ResourceGroupName "$(ResourceGroupName)"

$triggersADF | ForEach-Object { Start-AzDataFactoryV2Trigger -ResourceGroupName "$(ResourceGroupName)" -DataFactoryName "$(DataFactoryName)" -Name $_.Name -Force }
Stopping the triggers is mandatory when releasing ADF: the deployment can fail if it tries to update an active trigger.
  • How to set up adf_publish
    • Go back to Azure Repos.
    • Switch to the adf_publish branch.
    • Create a corresponding ARM parameter file for each environment. You can find an example below.
{
	"$schema": "",
	"contentVersion": "",
	"parameters": {
		"factoryName": {
			"value": "datafactory name"
		},
		"LS_AKV_KeyVault_properties_typeProperties_baseUrl": {
			"value": ""
		},
		"LS_SQL_ConfigDb_properties_typeProperties_connectionString_secretName": {
			"value": "sqlconnection"
		},
		"LS_DLS_Datalake_properties_typeProperties_url": {
			"value": ""
		}
	}
}

The three LS_* parameters are used for the dynamic linked services. They are defined in ARMTemplateForFactory, and their values are set in each separate JSON file. To define the parameters, you first have to go to the “Parameterization template” tab and edit it.
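
Because the per-environment parameter files share the same keys and differ only in their values, they can also be generated instead of edited by hand. The Python sketch below is one possible way to do that and is not part of the ADF tooling. The parameter keys and the DEV/QA/PROD environments come from this article; the resource names, URLs, output file names and the helpers `build_parameters` and `render_all` are hypothetical, and the schema URL and content version are the standard values for ARM deployment parameter files rather than values taken from the original post.

```python
import json

# Standard ARM deployment-parameters schema and version (not taken from the article).
SCHEMA = "https://schema.management.azure.com/schemas/2015-01-01/deploymentParameters.json#"
CONTENT_VERSION = "1.0.0.0"

# Hypothetical per-environment values; replace them with your own resources.
ENVIRONMENTS = {
    "dev": {"factory": "adf-demo-dev", "vault": "https://kv-demo-dev.vault.azure.net/",
            "lake": "https://dlsdemodev.dfs.core.windows.net/"},
    "qa": {"factory": "adf-demo-qa", "vault": "https://kv-demo-qa.vault.azure.net/",
           "lake": "https://dlsdemoqa.dfs.core.windows.net/"},
    "prod": {"factory": "adf-demo-prod", "vault": "https://kv-demo-prod.vault.azure.net/",
             "lake": "https://dlsdemoprod.dfs.core.windows.net/"},
}


def build_parameters(env):
    """Return the ARM parameter document for one environment as a dict."""
    cfg = ENVIRONMENTS[env]
    values = {
        "factoryName": cfg["factory"],
        "LS_AKV_KeyVault_properties_typeProperties_baseUrl": cfg["vault"],
        # The secret name is the same everywhere; only the vault differs.
        "LS_SQL_ConfigDb_properties_typeProperties_connectionString_secretName": "sqlconnection",
        "LS_DLS_Datalake_properties_typeProperties_url": cfg["lake"],
    }
    return {
        "$schema": SCHEMA,
        "contentVersion": CONTENT_VERSION,
        "parameters": {name: {"value": value} for name, value in values.items()},
    }


def render_all():
    """Return a mapping of (hypothetical) file name to JSON text for every environment."""
    return {
        f"parameters-{env}.json": json.dumps(build_parameters(env), indent=2)
        for env in ENVIRONMENTS
    }
```

Writing each entry of `render_all()` to the adf_publish branch would give one parameter file per environment with identical keys, which keeps the files from drifting apart.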

Here is an example in which I defined a linked service for Azure Key Vault. For more information, please refer to:

If we need to enable the continuous deployment trigger, we have to link it to the adf_publish branch as well.

Click the small trigger icon on the release pipeline page.
Set the branch to adf_publish.



Some useful/special MS SQL tips from a data engineer

If you are a data scientist, you may never need to do data preprocessing work such as ETL/ELT, performance tuning or OLTP database design. Everything is already prepared in a structured data warehouse or flat file, neat and tidy. As for data quality, all a data scientist needs to do is handle some missing or wrong values, then clarify the relationships and do the analysis. I am not saying the work after preprocessing is easy; my point is that data engineers do a lot of time-consuming work behind the final success. So I want to summarize and share some of my experience, which may save other data engineers some time. Corrections and additions are welcome; please send me an email:

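1. Incremental loading. There are three ways to do incremental loading.

A. MERGE statement. It’s very simple: just the combination of update and insert.

MERGE target_table AS tg_table
USING source_table AS src_table
ON (tg_table.id = src_table.id)  -- id and col1 are placeholder column names
WHEN MATCHED THEN UPDATE SET tg_table.col1 = src_table.col1
WHEN NOT MATCHED THEN INSERT (id, col1) VALUES (src_table.id, src_table.col1);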

B. CDC (Change Data Capture) in SSIS. For more information, see my other post: “Incremental Load DW by using CDC in SSIS”.

C. Lookup + Conditional Split in SSIS. Essentially it is the same as method A: rows that are not found go to “insert”, rows that are found go to “update”.
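
Methods A and C share the same logic: matched rows are updated and unmatched rows are inserted. The sketch below illustrates that logic in Python with the standard `sqlite3` module. SQLite is only a stand-in here because it has no MERGE statement, so the merge is written as an explicit UPDATE followed by an INSERT; the `id` and `col1` columns, the sample rows and the function names are hypothetical.

```python
import sqlite3


def merge_source_into_target(conn):
    """Emulate MERGE: update matched rows, then insert unmatched ones."""
    # Matched rows: overwrite the target value with the source value.
    conn.execute("""
        UPDATE target_table
        SET col1 = (SELECT s.col1 FROM source_table s WHERE s.id = target_table.id)
        WHERE id IN (SELECT id FROM source_table)
    """)
    # Unmatched rows: insert the rows that exist only in the source.
    conn.execute("""
        INSERT INTO target_table (id, col1)
        SELECT s.id, s.col1 FROM source_table s
        WHERE NOT EXISTS (SELECT 1 FROM target_table t WHERE t.id = s.id)
    """)
    conn.commit()


def demo():
    """Build two small tables, merge them, and return the target rows."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE target_table (id INTEGER PRIMARY KEY, col1 TEXT);
        CREATE TABLE source_table (id INTEGER PRIMARY KEY, col1 TEXT);
        INSERT INTO target_table VALUES (1, 'old'), (2, 'keep');
        INSERT INTO source_table VALUES (1, 'new'), (3, 'added');
    """)
    merge_source_into_target(conn)
    return conn.execute("SELECT id, col1 FROM target_table ORDER BY id").fetchall()
```

In `demo()`, row 1 is updated, row 2 is left alone because the source does not mention it, and row 3 is inserted.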

2. CTE. Before CTEs were available, we wrote SQL with many nested subqueries, which is a little hard to read because the logic runs inside out. With CTEs, the code reads from top to bottom and we no longer need to repeat functions in GROUP BY.

-- return the customers who had over $10,000 in purchases for their first three transactions.
with OrderRank as (
    select custID, row_number() over(partition by custID order by orderID) as rn, amount from SalesOrder
),
OrderOver as (
    select custID, sum(amount) as totalAmount from OrderRank where rn <= 3 group by custID
)
select custID, totalAmount from OrderOver where totalAmount > 10000;
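
To see the chained CTEs run end to end, here is a small Python sketch that executes the same query shape against an in-memory SQLite database. SQLite stands in for SQL Server (it needs SQLite 3.25 or later for window functions), and the sample orders and the `customers_over_threshold` helper are made up for illustration.

```python
import sqlite3

QUERY = """
WITH OrderRank AS (
    -- number each customer's orders in the order they were placed
    SELECT custID, amount,
           ROW_NUMBER() OVER (PARTITION BY custID ORDER BY orderID) AS rn
    FROM SalesOrder
),
OrderOver AS (
    -- total of the first three orders per customer
    SELECT custID, SUM(amount) AS totalAmount
    FROM OrderRank
    WHERE rn <= 3
    GROUP BY custID
)
SELECT custID, totalAmount FROM OrderOver WHERE totalAmount > 10000
"""

# Hypothetical sample data: (orderID, custID, amount)
SAMPLE = [
    (1, 1, 5000), (2, 1, 4000), (3, 1, 2000), (4, 1, 50000),
    (5, 2, 1000), (6, 2, 2000), (7, 2, 3000), (8, 2, 20000),
]


def customers_over_threshold(rows):
    """Load the rows into a temporary table and run the CTE query."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE SalesOrder (orderID INTEGER PRIMARY KEY, custID INTEGER, amount INTEGER)"
    )
    conn.executemany("INSERT INTO SalesOrder VALUES (?, ?, ?)", rows)
    return conn.execute(QUERY).fetchall()
```

With the sample data, only customer 1 qualifies: the first three orders add up to 11000, while customer 2's large fourth order is ignored.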

3. Delete duplicate rows. This is a very common job, as a lot of data is entered manually. Here are two simple ways to handle it.
A. Use the Sort transformation in SSIS and tick the “Remove rows with duplicate sort values” box.
B. Write a script: use a CTE to assign row numbers, then delete the rows whose row number is greater than 1.

With RemoveDuplicate as (
    -- partition and order by the columns which decide duplication (id, name, ...)
    select ROW_NUMBER() over (partition by id, name order by id, name) as row_id from tablename
)
delete from RemoveDuplicate where row_id > 1;
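
The same row-number technique can be tried out locally with Python and SQLite. This is only an approximation of the T-SQL above: SQLite cannot delete through a CTE, so the sketch deletes by `rowid` instead, and the `people` table, its columns and the `remove_duplicates` helper are hypothetical.

```python
import sqlite3

DEDUP = """
DELETE FROM people
WHERE rowid IN (
    SELECT rid FROM (
        -- row_id restarts at 1 for every distinct (id, name) pair
        SELECT rowid AS rid,
               ROW_NUMBER() OVER (PARTITION BY id, name ORDER BY rowid) AS row_id
        FROM people
    )
    WHERE row_id > 1
)
"""


def remove_duplicates(rows):
    """Insert rows, delete every duplicate after the first, return what is left."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE people (id INTEGER, name TEXT)")
    conn.executemany("INSERT INTO people VALUES (?, ?)", rows)
    conn.execute(DEDUP)
    return conn.execute("SELECT id, name FROM people ORDER BY id, name").fetchall()
```

The columns in PARTITION BY decide what counts as a duplicate, so (2, 'b') and (2, 'c') are both kept.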

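4. Faster loading. SQL Server’s default isolation level is “Read Committed”. In most cases we don’t need it, because we only load all the data from the OLTP system. There are two ways to make loading faster without blocking other jobs. Both allow dirty reads (uncommitted data), so use them only where that is acceptable.
A. Use “WITH (NOLOCK)” at the statement level.

SELECT FirstName, LastName
FROM dbo.Employee WITH (NOLOCK)  -- dbo.Employee is a placeholder table name
WHERE EmpID = 1;

B. Use “SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED” at the session level.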


5. Use columnstore indexes in the data warehouse. A columnstore index greatly improves SELECT performance because the values of each column are stored together, but it is bad for single-row inserts and updates. For a fact table with many different values, it is very good for full table scans. Just remember that a clustered columnstore index always includes all columns of the table, and that before SQL Server 2016 such a table could not have a primary key; from SQL Server 2016 on you can add nonclustered B-tree indexes to it, as in the example below.

--BASIC EXAMPLE: Create a nonclustered index on a clustered columnstore table.
--Create the table
CREATE TABLE t_account (
    AccountKey int NOT NULL,
    AccountDescription nvarchar (50),
    AccountType nvarchar(50),
    UnitSold int
);
--Store the table as a clustered columnstore.
CREATE CLUSTERED COLUMNSTORE INDEX taccount_cci ON t_account;
--Add a nonclustered index for table seek.
CREATE UNIQUE INDEX taccount_nc1 ON t_account (AccountKey);

6. Bulk data load. We may find that bulk loading huge tables into the data warehouse is very slow. This is often because we skipped some steps around the load:
A. Drop the clustered index on the large table before loading.
B. Recreate the index on the large table after loading.
C. Update statistics.

7. Update a view. Typically, we use a view to hide the logic and the tables behind it, and to make loading easier. But in some cases we need to update through the view (we don’t want to know the details of the view, we just need to update some data). SQL Server lets us update a view directly or indirectly.
A. If the view meets the following restrictions, you can run DML against it directly:
   a. it contains only a plain SELECT, with no subquery;
   b. no DISTINCT or GROUP BY (no aggregation);
   c. no ORDER BY;
   d. if the view contains multiple tables, one statement can only insert into or update one of them;
   e. use “WITH CHECK OPTION”, otherwise an update can move rows outside the view’s filter, beyond what you expect.
B. Use an INSTEAD OF trigger to update the tables behind the view.

-- assumes a single-row update of the view
CREATE TRIGGER trigUnion ON vwUnionCustomerSupplier
INSTEAD OF UPDATE
AS
BEGIN
  SET NOCOUNT ON
  DECLARE @DelName nvarchar(50)

  IF (SELECT inserted.Type FROM inserted) Is Null
    RETURN

  SELECT @DelName = deleted.CompanyName FROM deleted

  IF (SELECT inserted.Type FROM inserted) = 'Company'
    UPDATE Customers
    SET CompanyName =
      (SELECT CompanyName
      FROM inserted)
      WHERE Customers.CompanyName = @DelName
  ELSE
    UPDATE Suppliers
    SET CompanyName =
      (SELECT CompanyName
      FROM inserted)
      WHERE Suppliers.CompanyName = @DelName
END

8. Deadlocks or long-running queries. This is not normal, but if you find your ELT or ETL job running for a long time, it may be caused by blocking or a deadlock. Check the locks with sys.dm_tran_locks, or use sys.dm_exec_query_stats to get query execution statistics.

9. Use window functions for rolling aggregation. We can set the ROWS or RANGE option to get a running aggregation in MS SQL. When ORDER BY is specified without a frame, RANGE is the default.

-- running total (default RANGE frame, spooled to tempdb)
select customerId, orderId, amount, sum(amount) over (order by orderId) as runningTotal from sales_order;
-- revised running total (ROWS frame, in memory)
select customerId, orderId, amount, sum(amount) over (order by orderId rows unbounded preceding) as runningTotal from sales_order;
-- grand total on every row (in memory); very useful with PARTITION BY for subtotals
select customerId, orderId, amount, sum(amount) over (order by orderId rows between unbounded preceding and unbounded following) as total from sales_order;
-- running 3-row total (in memory); a 3-month total if there is one row per month
select customerId, orderId, amount, sum(amount) over (order by orderId rows between 1 preceding and 1 following) as rolling3 from sales_order;
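
The difference between the default RANGE frame and an explicit ROWS frame only shows up when the ORDER BY column has ties. The Python sketch below demonstrates this with an in-memory SQLite database, which uses the same default frame (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) and needs SQLite 3.25 or later. The `orderDate` column, the sample rows and the `frame_comparison` helper are hypothetical.

```python
import sqlite3

QUERY = """
SELECT orderId,
       -- default frame: RANGE, so rows with the same orderDate share one total
       SUM(amount) OVER (ORDER BY orderDate) AS range_total,
       -- explicit ROWS frame with a unique tiebreaker: one step per row
       SUM(amount) OVER (ORDER BY orderDate, orderId
                         ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rows_total,
       -- previous, current and next row
       SUM(amount) OVER (ORDER BY orderId
                         ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS rolling3
FROM sales_order
ORDER BY orderId
"""

# Hypothetical sample data: orders 2 and 3 fall on the same date.
SAMPLE = [
    (1, "2020-01-01", 10),
    (2, "2020-01-02", 20),
    (3, "2020-01-02", 30),
    (4, "2020-01-03", 40),
]


def frame_comparison(rows):
    """Return (orderId, range_total, rows_total, rolling3) for each order."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE sales_order (orderId INTEGER PRIMARY KEY, orderDate TEXT, amount INTEGER)"
    )
    conn.executemany("INSERT INTO sales_order VALUES (?, ?, ?)", rows)
    return conn.execute(QUERY).fetchall()
```

For orders 2 and 3 the RANGE total is 60 on both rows, because peers with the same date are summed together, while the ROWS total steps from 30 to 60.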

10. Covering index. An index that contains all the information required to resolve a query is known as a “covering index”. If the columns in the SELECT list are not in the nonclustered index being used (or in the clustered index), a “Key Lookup” appears in the execution plan.

To get a covering index without adding new columns to the nonclustered index key, we can use “included columns”, which store non-key columns in the leaf level of the index.

CREATE NONCLUSTERED INDEX [ix_Customer_Email] ON [dbo].[Customers]
(
            [Last_Name] ASC,
            [First_Name] ASC
)
INCLUDE ([Email]);  -- [Email] is an assumed column name

11. Schema binding. Schema binding is used for views and functions.
Objects that are referenced by schema-bound objects cannot have their definitions changed. In some cases it can also significantly improve the performance of user-defined functions.

CREATE FUNCTION dbo.GetProductStatusLabel (@StatusID tinyint)
RETURNS nvarchar(32)
WITH SCHEMABINDING
AS
BEGIN
  RETURN (SELECT Label FROM dbo.ProductStatus WHERE StatusID = @StatusID);
END

12. Table/index partitioning. If you are working on Azure or a cluster platform, you can skip this: HDFS already does something similar for you. But if you still work on-prem, table partitioning can improve performance a lot. Essentially, table partitioning spreads a table across more than one filegroup to improve I/O. There are four steps to partition a table or index.
A. Add filegroups and files

-- Adds two new filegroups to the AdventureWorks2012 database
ALTER DATABASE AdventureWorks2012
ADD FILEGROUP test1fg;
ALTER DATABASE AdventureWorks2012
ADD FILEGROUP test2fg;
-- Adds one file for each filegroup.
ALTER DATABASE AdventureWorks2012
ADD FILE
(
    NAME = test1dat1,
    FILENAME = 'C:\Program Files\Microsoft SQL Server\MSSQL13.MSSQLSERVER\MSSQL\DATA\t1dat1.ndf',
    SIZE = 5MB,
    MAXSIZE = 100MB
)
TO FILEGROUP test1fg;
ALTER DATABASE AdventureWorks2012
ADD FILE
(
    NAME = test2dat2,
    FILENAME = 'C:\Program Files\Microsoft SQL Server\MSSQL13.MSSQLSERVER\MSSQL\DATA\t2dat2.ndf',
    SIZE = 5MB,
    MAXSIZE = 100MB
)
TO FILEGROUP test2fg;

B. Add a partition function: it defines how rows map to partitions based on a column’s value.

-- Creates a partition function called myRangePF1 that will partition a table into two partitions
-- (the boundary value 100 is only an example)
CREATE PARTITION FUNCTION myRangePF1 (int)
    AS RANGE LEFT FOR VALUES (100) ;

C. Add a partition scheme: it maps the partitions of the partition function to filegroups.

-- Creates a partition scheme called myRangePS1 that applies myRangePF1 to the two filegroups created above
CREATE PARTITION SCHEME myRangePS1
    AS PARTITION myRangePF1
    TO (test1fg, test2fg) ;

D. Create the table on the partition scheme and name the partitioning column, which the partition function uses to place each row.

-- Creates a partitioned table called PartitionTable that uses myRangePS1 to partition col1
CREATE TABLE PartitionTable (col1 int PRIMARY KEY, col2 char(10))
    ON myRangePS1 (col1) ;
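
To make the role of the partition function more concrete, the following Python sketch mimics how a range partition function assigns a value to a partition number. It is a model of the RANGE LEFT and RANGE RIGHT rules, not SQL Server code; the boundary values used in the usage note and the `partition_number` helper are illustrative assumptions.

```python
from bisect import bisect_left, bisect_right


def partition_number(value, boundaries, range_left=True):
    """Return the 1-based partition number for value.

    boundaries must be sorted ascending, like the FOR VALUES list.
    RANGE LEFT:  a boundary value belongs to the partition on its left.
    RANGE RIGHT: a boundary value belongs to the partition on its right.
    """
    locate = bisect_left if range_left else bisect_right
    return locate(boundaries, value) + 1
```

With boundaries `[1, 100, 1000]` there are four partitions. Under RANGE LEFT the value 100 falls in partition 2 (values greater than 1 and up to 100), while under RANGE RIGHT it falls in partition 3 (values from 100 up to, but not including, 1000).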

13. Defragmentation. According to Microsoft’s guidance, if fragmentation is greater than 30% we should rebuild the index; if it is between 5% and 30% we should reorganize it. We can use sys.dm_db_index_physical_stats to check avg_fragmentation_in_percent.

-- use sys.dm_db_index_physical_stats to check fragmentation
select * from sys.dm_db_index_physical_stats(DB_ID(N'AdventureWorks2017'), OBJECT_ID(N'AdventureWorks2017.Person.Person'), -1, null, 'detailed');
-- Check avg_fragmentation_in_percent
-- if this percent is
-- > 5% and <= 30%: ALTER INDEX ... REORGANIZE
-- > 30%:           ALTER INDEX ... REBUILD
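
If you script index maintenance, the thresholds above turn into a small decision function. The Python sketch below only builds the statement text from a fragmentation percentage; it does not connect to SQL Server, and the helper names as well as the index and table names in the usage note are hypothetical.

```python
def maintenance_action(avg_fragmentation_in_percent):
    """Map a fragmentation percentage to REBUILD, REORGANIZE or None."""
    if avg_fragmentation_in_percent > 30:
        return "REBUILD"
    if avg_fragmentation_in_percent > 5:
        return "REORGANIZE"
    return None  # 5% or less: leave the index alone


def maintenance_statement(index_name, table_name, avg_fragmentation_in_percent):
    """Return the ALTER INDEX statement to run, or None if nothing is needed."""
    action = maintenance_action(avg_fragmentation_in_percent)
    if action is None:
        return None
    return f"ALTER INDEX [{index_name}] ON {table_name} {action};"
```

For example, `maintenance_statement("IX_Person_LastName", "Person.Person", 42.0)` yields a REBUILD statement, while the same call with 12.5 yields REORGANIZE.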

14. Other convenient code.

-- get all column names of a specific table
EXEC sp_columns @table_name = N'table_name', @table_owner = N'table_owner';

-- object dependencies
EXEC sp_depends @objname = N'table_owner.table_name';

-- convert, returning NULL if the conversion fails
TRY_CONVERT(data_type(length), expression, style)

-- split a string by a delimiter
SELECT * FROM STRING_SPLIT('A,B,B', ',');
SELECT column1, column2, value FROM table1
CROSS APPLY STRING_SPLIT(column3, ',');

-- returns the last day of the month containing a specified date, with an optional offset
EOMONTH ( start_date [, month_to_add ] )

-- check whether an object exists
IF OBJECT_ID('Sales.uspGetEmployeeSalesYTD', 'P') IS NOT NULL
    PRINT 'Procedure exists';  -- example action

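-- dynamic SQL
-- use sp_executesql
DECLARE @IntVariable int, @SQLString nvarchar(500), @ParmDefinition nvarchar(500);
-- the query text is only an example
SET @SQLString = N'SELECT BusinessEntityID, JobTitle FROM HumanResources.Employee WHERE BusinessEntityID = @BusinessEntityID';
SET @ParmDefinition = N'@BusinessEntityID tinyint';
/* Execute the string with the first parameter value. */
SET @IntVariable = 197;
EXECUTE sp_executesql @SQLString, @ParmDefinition, @BusinessEntityID = @IntVariable;
-- use exec
DECLARE @columnList varchar(75), @city varchar(75), @sqlCommand varchar(1000);
SET @columnList = 'AddressID, AddressLine1, City';
SET @city = '''London''';
SET @sqlCommand = 'SELECT ' + @columnList + ' FROM Person.Address WHERE City = ' + @city;
EXEC (@sqlCommand);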
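
The two dynamic SQL styles above differ in one important way: sp_executesql passes the value as a parameter, while EXEC runs whatever string was concatenated. The Python sketch below shows the same contrast with the standard `sqlite3` module as a stand-in for SQL Server; the `Address` table, its rows and the helper functions are hypothetical.

```python
import sqlite3


def make_db():
    """Create a tiny in-memory Address table for the comparison."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE Address (AddressID INTEGER, City TEXT)")
    conn.executemany("INSERT INTO Address VALUES (?, ?)", [(1, "London"), (2, "Paris")])
    return conn


def by_city_bound(conn, city):
    """Parameterized query: the input is always treated as a value."""
    return conn.execute("SELECT AddressID FROM Address WHERE City = ?", (city,)).fetchall()


def by_city_concatenated(conn, city):
    """String concatenation: the input becomes part of the SQL text."""
    sql = "SELECT AddressID FROM Address WHERE City = '" + city + "'"
    return conn.execute(sql).fetchall()
```

A crafted input such as `x' OR '1'='1` matches nothing when bound as a parameter, but returns every row when concatenated, which is why the parameterized form is usually the safer default for values that come from outside.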

Incremental Load DW by using CDC in SSIS

To load data from an OLTP system into a DW, we face a problem: how to balance time and cost. As data grows faster and faster, we would have to keep upgrading hardware to meet the load window. Incremental loading addresses this by significantly reducing the amount of data transferred. There are three ways to achieve it: 1. use a datetime column; 2. Change Data Capture (CDC); 3. Change Tracking. For a long time I used the first way to capture changed data manually; it works, but it takes a lot of development and testing effort. Here I want to introduce CDC. Basically, CDC is a feature that uses the LSN (Log Sequence Number) and change tables to capture changed data, and SSIS provides native components that make it easy to work with.

Let’s build a CDC workflow in SSIS as an example:

Enable CDC feature

  • Enable CDC for the database by executing sys.sp_cdc_enable_db (disable it with sys.sp_cdc_disable_db). Then check it with select name from sys.databases where is_cdc_enabled = 1
  • Enable CDC for a specific table with sys.sp_cdc_enable_table; we can then find the CDC table in the System Tables folder, or check with select name from sys.tables where is_tracked_by_cdc = 1
exec sys.sp_cdc_enable_table
@source_schema = N'Person'
, @source_name = N'Address'
, @role_name = N'cdc_Admin'
, @captured_column_list = N'column1, column2'; -- can track specific columns rather than the whole table

Control flow setting

  • Add a CDC Control Task.
  • Set its CDC control operation to “Mark CDC start” and configure where the CDC state is saved.
  • Run this control task; it creates a record in the table cdc_states.
  • Create two more CDC Control Tasks, one with the operation “Get processing range” and the other with “Mark processed range”; they get the changed data range and update the CDC state respectively.
  • Put the data flow that is responsible for the ETL operation between these two CDC Control Tasks.

Data flow setting into staging table

  • Add a CDC Source that points to the CDC-enabled table, and choose the correct cdc_states table as well.
  • Choose the “Net” CDC processing mode in the CDC Source.
  • Add a CDC Splitter after the CDC Source, and create three Derived Column transformations to flag insert (0), update (2) and delete (1) rows.
  • Create a Union All transformation to combine all the data and write it to the stage database.
  • If necessary, add a truncate script before the whole control flow to delete everything in the stage database.

Update fact table through staging tables

  • Create an OLE DB source that connects to the stage database.
  • Use a Conditional Split to separate inserts from updates and deletes.
  • Inserts are written directly. For updates and deletes, first delete the rows from the fact table by identifier with an OLE DB Command transformation, then use a Conditional Split to write the updated rows.
  • If necessary, use a Lookup to replace some dimensional columns.
  • Write the result to the fact database.
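
The steps above boil down to a simple rule: insert rows are written directly, update and delete rows are first removed from the fact table by identifier, and update rows are then written back with their new values. The Python sketch below models that rule with a dictionary in place of the fact table. The operation flags 0, 2 and 1 are the ones used in the data flow above; the row layout and the `apply_staged_changes` helper are hypothetical.

```python
# Flags assigned by the Derived Column transformations in the data flow.
INSERT, DELETE, UPDATE = 0, 1, 2


def apply_staged_changes(fact, staged):
    """Apply staged net changes to fact, a dict keyed by row identifier.

    Each staged row is a dict with keys "op", "id" and "data".
    """
    inserts = [row for row in staged if row["op"] == INSERT]
    changed = [row for row in staged if row["op"] in (UPDATE, DELETE)]

    # Updates and deletes: remove the old version of the row first.
    for row in changed:
        fact.pop(row["id"], None)

    # Inserts go in directly; updates are written back with the new values.
    for row in inserts + [row for row in changed if row["op"] == UPDATE]:
        fact[row["id"]] = row["data"]
    return fact
```

For example, starting from `{1: "a", 2: "b", 3: "c"}`, a staged update of row 2, delete of row 3 and insert of row 4 leaves rows 1, 2 (new value) and 4 in the fact table.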