Essentially, ADF (Azure Data Factory) only takes responsibility for the Extract and Load parts of an ELT pipeline. We then have to use another tool, such as Databricks, to write notebooks that manipulate DataFrames or RDDs to complete the Transform activities. However, since Microsoft launched "Data Flow" in ADF, it has become more and more similar to SSIS: most ETL work can now be done in ADF with a friendly GUI. For ETL work, I copied a cheat sheet comparing the data flows of ADF and SSIS; it may be helpful.
Surprisingly, ADF doesn't provide SCD (Slowly Changing Dimension) handling out of the box. You have to build it manually, but it isn't hard: just maintain start and end dates for each row. Here is the article about it.
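For illustration, here is a minimal SCD Type 2 sketch in T-SQL, assuming hypothetical DimCustomer and StgCustomer tables with a single tracked attribute (City); the real column list and change-detection logic will differ per dimension.
-- close out the current version of rows whose tracked attribute changed
UPDATE d
SET    d.EndDate = GETDATE(), d.IsCurrent = 0
FROM   DimCustomer d
JOIN   StgCustomer s ON s.CustomerID = d.CustomerID
WHERE  d.IsCurrent = 1 AND d.City <> s.City;
-- insert a new open-ended version for changed rows and for brand-new customers
INSERT INTO DimCustomer (CustomerID, City, StartDate, EndDate, IsCurrent)
SELECT s.CustomerID, s.City, GETDATE(), NULL, 1
FROM   StgCustomer s
LEFT JOIN DimCustomer d ON d.CustomerID = s.CustomerID AND d.IsCurrent = 1
WHERE  d.CustomerID IS NULL;   -- no current version left: either new, or just closed above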
Anyway, for me, I like to handle simple but repeatable activities with the GUI, and leave the more complex operations to Databricks.
If you are a data scientist, you may never need to do data preprocessing work such as ETL/ELT, performance tuning or OLTP database design. Everything is already prepared in a structured data warehouse or flat files, which is nice. Regarding data quality, all a data scientist needs to do is handle some missing or wrong values, then work out the relationships and do the analysis. I am not saying it is easy after preprocessing; what I mean is that data engineers do a lot of time-consuming work for the final success. So I want to summarize and share some of my experience, which may save data engineers a lot of time. Corrections and additions are welcome; please send me an email: neo_aksa@hotmail.com
1. Incremental loading. We have three methods to do incremental loading.
A. MERGE clause. It's very simple: just a combination of UPDATE and INSERT.
MERGE INTO target_table tg_table
USING source_table src_table
ON ( src_table.id = tg_table.id )
WHEN MATCHED
    THEN UPDATE SET tg_table.name = src_table.name
WHEN NOT MATCHED
    -- the target column list cannot be alias-qualified in a MERGE insert
    THEN INSERT ( id, name ) VALUES ( src_table.id, src_table.name );
C. Lookup + Conditional Split in SSIS. Essentially this is the same as method A: rows not found by the lookup go to "Insert", rows that are found go to "Update".
2. CTE. Before CTEs came out, we wrote SQL with many nested subqueries, which are a bit hard to read since the logic is reversed. With CTEs we can make our code more readable and avoid putting functions in the GROUP BY.
-- return the customers who had over $10,000 in purchases across their first three transactions.
with OrderRank
as
(
    select custID, row_number() over(partition by custID order by orderID) as orderRank, amount
    from SalesOrder
),
OrderOver
as
(
    select custID, sum(amount) as totalAmount
    from OrderRank
    where orderRank <= 3
    group by custID
)
select custID, totalAmount from OrderOver where totalAmount > 10000
3. Delete duplicate rows. This is a very common job since lots of data is entered manually. Here we have two simple ways to handle it. A. Use the "Sort" component in SSIS and check the "remove rows with duplicate sort values" box. B. Write a script: use a CTE to mark the row number, then delete the rows whose row number is greater than 1.
-- partition and order by the columns that define a duplicate
with RemoveDuplicate
as
(
    select ROW_NUMBER() over (partition by id, name order by id, name) as row_id, *
    from tablename
)
delete from RemoveDuplicate where row_id > 1
4. Faster loading. SQL Server's default isolation level is "Read Committed", but in most cases we don't need it when we are only reading all the data out of an OLTP system. There are two ways to make loading faster without blocking other jobs. A. Use "WITH (NOLOCK)" at the statement level.
SELECT FirstName, LastName
FROM EmployeeInfo WITH(NOLOCK)
WHERE EmpID = 1;
B. Use "SET TRANSACTION ISOLATION LEVEL" at the session level to read uncommitted data.
SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;
5. Use a columnstore index in the data warehouse. A columnstore index is very helpful for SELECT performance since values of the same column are stored together, but it is bad for INSERT and UPDATE. For a fact table with many distinct values it works very well for full table scans. Just remember: a clustered columnstore index always includes all columns of the table, and on older versions (before SQL Server 2016) a clustered columnstore table could not also have a primary key or other B-tree indexes.
--BASIC EXAMPLE: Create a nonclustered index on a clustered columnstore table.
--Create the table
CREATE TABLE t_account (
AccountKey int NOT NULL,
AccountDescription nvarchar (50),
AccountType nvarchar(50),
UnitSold int
);
GO
--Store the table as a clustered columnstore.
CREATE CLUSTERED COLUMNSTORE INDEX taccount_cci ON t_account;
GO
--Add a nonclustered index for table seek.
CREATE UNIQUE INDEX taccount_nc1 ON t_account (AccountKey);
6. Bulk data load. We may find it very slow to bulk load huge tables into the data warehouse. This is usually because we skipped some steps around the load. A. Drop the indexes on the large table before loading. B. Recreate the indexes after loading. C. Update statistics.
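As a minimal sketch, assuming a hypothetical dbo.FactSales target with an index ix_FactSales_Date and a staging table dbo.StgSales, the pattern looks roughly like this:
-- drop the index before the bulk load
DROP INDEX ix_FactSales_Date ON dbo.FactSales;
-- bulk load, e.g. from the staging table (or BULK INSERT / bcp from a file)
INSERT INTO dbo.FactSales WITH (TABLOCK) (OrderDate, CustomerID, Amount)
SELECT OrderDate, CustomerID, Amount FROM dbo.StgSales;
-- recreate the index and refresh statistics after loading
CREATE INDEX ix_FactSales_Date ON dbo.FactSales (OrderDate);
UPDATE STATISTICS dbo.FactSales;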
7. Updatable views. Typically we use a view to hide the logic and tables behind it and make querying easier. But in some cases we need to update data through the view (we don't want to know the details of the view, we just need to update some data). SQL Server provides the ability to update a view directly or indirectly. A. If the view meets the following limitations, you can run DML against it directly: a. no subqueries, only SELECT; b. no DISTINCT or GROUP BY (no aggregation); c. no ORDER BY; d. if the view contains multiple tables, one statement can only insert/update one of them; e. use 'WITH CHECK OPTION', otherwise you may update data that falls outside the view's filter (a small sketch of this follows the trigger example below). B. Use an INSTEAD OF trigger to update the tables behind the view.
CREATE TRIGGER trigUnion ON vwUnionCustomerSupplier
INSTEAD OF UPDATE
AS
BEGIN
    SET NOCOUNT ON
    DECLARE @DelName nvarchar(50)

    IF (SELECT inserted.Type FROM inserted) IS NULL
        RETURN

    SELECT @DelName = deleted.CompanyName FROM deleted

    IF (SELECT inserted.Type FROM inserted) = 'Company'
        UPDATE Customers
        SET CompanyName = (SELECT CompanyName FROM inserted)
        WHERE Customers.CompanyName = @DelName
    ELSE
        UPDATE Suppliers
        SET CompanyName = (SELECT CompanyName FROM inserted)
        WHERE Suppliers.CompanyName = @DelName
END
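For option A, here is a minimal sketch of direct DML through a view, assuming a hypothetical Customers table; WITH CHECK OPTION rejects changes that would push rows outside the view's filter.
CREATE VIEW vwLondonCustomers
AS
SELECT CustomerID, CompanyName, City
FROM Customers
WHERE City = 'London'
WITH CHECK OPTION;
GO
-- allowed: the row still satisfies the view's WHERE clause
UPDATE vwLondonCustomers SET CompanyName = 'New Name' WHERE CustomerID = 1;
-- rejected by WITH CHECK OPTION: this would move the row out of the view
UPDATE vwLondonCustomers SET City = 'Paris' WHERE CustomerID = 1;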
8. Deadlocks or long-running queries. It's not normal, but if you find your ELT or ETL running for a very long time, it may be caused by blocking or a deadlock. Check it with sys.dm_tran_locks, or use sys.dm_exec_query_stats to get query runtime statistics.
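As a quick sketch, this query lists the slowest cached statements by average elapsed time, combining sys.dm_exec_query_stats with sys.dm_exec_sql_text:
-- top 10 cached statements by average elapsed time
SELECT TOP 10
       qs.execution_count,
       qs.total_elapsed_time / qs.execution_count AS avg_elapsed_time,
       SUBSTRING(st.text, (qs.statement_start_offset / 2) + 1,
                 ((CASE qs.statement_end_offset
                       WHEN -1 THEN DATALENGTH(st.text)
                       ELSE qs.statement_end_offset
                   END - qs.statement_start_offset) / 2) + 1) AS statement_text
FROM sys.dm_exec_query_stats qs
CROSS APPLY sys.dm_exec_sql_text(qs.sql_handle) st
ORDER BY avg_elapsed_time DESC;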
9. Use window functions for rolling aggregation. We can set the ROWS or RANGE option to control the frame of a running aggregation in SQL Server; RANGE is the default when only an ORDER BY is specified.
-- running total (default RANGE frame, spools to tempDB)
select custID, orderId, amount,
       sum(amount) over (order by orderId) as runningTotal
from sales_order
-- revised running total (ROWS frame, uses an in-memory spool)
select custID, orderId, amount,
       sum(amount) over (order by orderId rows unbounded preceding) as runningTotal
from sales_order
-- total over the whole partition, very useful for subtotals
select custID, orderId, amount,
       sum(amount) over (order by orderId rows between unbounded preceding and unbounded following) as totalAmount
from sales_order
-- 3-row moving total (e.g. a rolling 3-month total)
select custID, orderId, amount,
       sum(amount) over (order by orderId rows between 1 preceding and 1 following) as movingTotal
from sales_order
10. Covering index. An index that contains all the information required to resolve the query is known as a "covering index". If the fields in the SELECT list are not part of the nonclustered or clustered index, a "Key Lookup" will show up in the execution plan.
To make the index covering without moving new columns into the nonclustered index key, we can use "included columns": the extra columns are stored only in the leaf level of the index, not in the key.
CREATE NONCLUSTERED INDEX [ix_Customer_Email] ON [dbo].[Customers]
(
[Last_Name] ASC,
[First_Name] ASC
)
INCLUDE ( [Email_Address]) WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, SORT_IN_TEMPDB = OFF, IGNORE_DUP_KEY = OFF, DROP_EXISTING = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
11. Schema binding. Schema binding is used for views and functions. Objects referenced by a schema-bound object cannot have their definitions changed. It can also significantly increase the performance of user-defined functions.
CREATE FUNCTION dbo.GetProductStatusLabel
(
@StatusID tinyint
)
RETURNS nvarchar(32)
WITH SCHEMABINDING
AS
BEGIN
RETURN (SELECT Label FROM dbo.ProductStatus WHERE StatusID = @StatusID);
END
12. Table/index partitioning. If you are working on Azure or a cluster platform, you can skip this; HDFS and similar storage already handle this kind of thing for you. But if you still work on-prem, table partitioning can improve performance a lot. Essentially, table partitioning spreads a table over more than one filegroup to improve its I/O. There are four steps to partition a table or index. A. Add filegroups and files.
-- Adds two new filegroups to the AdventureWorks2012 database
ALTER DATABASE AdventureWorks2012
ADD FILEGROUP test1fg;
GO
ALTER DATABASE AdventureWorks2012
ADD FILEGROUP test2fg;
-- Adds one file for each filegroup.
ALTER DATABASE AdventureWorks2012
ADD FILE
(
NAME = test1dat1,
FILENAME = 'C:\Program Files\Microsoft SQL Server\MSSQL13.MSSQLSERVER\MSSQL\DATA\t1dat1.ndf',
SIZE = 5MB,
MAXSIZE = 100MB,
FILEGROWTH = 5MB
)
TO FILEGROUP test1fg;
ALTER DATABASE AdventureWorks2012
ADD FILE
(
NAME = test2dat2,
FILENAME = 'C:\Program Files\Microsoft SQL Server\MSSQL13.MSSQLSERVER\MSSQL\DATA\t2dat2.ndf',
SIZE = 5MB,
MAXSIZE = 100MB,
FILEGROWTH = 5MB
)
TO FILEGROUP test2fg;
GO
B. Add a partition function: defines how rows map to partitions based on a column's value.
-- Creates a partition function called myRangePF1 that will partition a table into two partitions
CREATE PARTITION FUNCTION myRangePF1 (int)
AS RANGE LEFT FOR VALUES (100) ;
GO
C. Add a partition scheme: maps the partitions of the partition function to filegroups.
-- Creates a partition scheme called myRangePS1 that applies myRangePF1 to the two filegroups created above
CREATE PARTITION SCHEME myRangePS1
AS PARTITION myRangePF1
TO (test1fg, test2fg) ;
GO
D. Create the table on the partition scheme; the partitioning column is the one the partition function is applied to.
-- Creates a partitioned table called PartitionTable that uses myRangePS1 to partition col1
CREATE TABLE PartitionTable (col1 int PRIMARY KEY, col2 char(10))
ON myRangePS1 (col1) ;
GO
13. Defragmentation. According to Microsoft's suggestion, if fragmentation is greater than 30% we should rebuild the index; if it is between 5% and 30%, we should reorganize it. We can use sys.dm_db_index_physical_stats to check avg_fragmentation_in_percent.
-- use sys.dm_db_index_physical_stats to check fragmentation
select * from sys.dm_db_index_physical_stats(DB_ID(N'AdventureWorks2017'), OBJECT_ID(N'AdventureWorks2017.Person.Person'), -1, null, 'detailed')
-- check avg_fragmentation_in_percent:
-- if it is > 5% and <= 30%, reorganize
ALTER INDEX ALL ON Person.Person REORGANIZE;
-- if it is > 30%, rebuild
ALTER INDEX ALL ON Person.Person REBUILD WITH (ONLINE = ON);
14. Other convenient code.
-- get all column names of a specific table
sp_columns table_name, table_owner
-- Object Dependencies
sp_depends table_name
-- convert, returning NULL if the conversion fails
TRY_CONVERT(data_type(length), expression [, style])
-- split a string by a delimiter
SELECT * FROM STRING_split('A,B,B',',')
select column1, column2 from table1
cross apply string_split(column3,',')
-- returns the last day of the month containing a specified date, with an optional offset.
EOMONTH ( start_date [, month_to_add ] )
-- check the object
IF OBJECT_ID('Sales.uspGetEmployeeSalesYTD', 'P') IS NOT NULL
-- dynamic SQL
-- use sp_executesql (the query string below is just a placeholder)
DECLARE @SQLString nvarchar(500) = N'SELECT * FROM HumanResources.Employee WHERE BusinessEntityID = @BusinessEntityID';
DECLARE @ParmDefinition nvarchar(500), @IntVariable int;
SET @ParmDefinition = N'@BusinessEntityID tinyint'; /* Execute the string with the first parameter value. */
SET @IntVariable = 197;
EXECUTE sp_executesql @SQLString, @ParmDefinition, @BusinessEntityID = @IntVariable;
-- use EXEC
DECLARE @columnList varchar(200), @city varchar(75), @sqlCommand varchar(1000);
SET @columnList = 'AddressID, AddressLine1, City'
SET @city = '''London'''
SET @sqlCommand = 'SELECT ' + @columnList + ' FROM Person.Address WHERE City = ' + @city
EXEC (@sqlCommand)
To load data from an OLTP system into the DW, we face a problem: how to balance time and cost. Since data grows faster and faster, we would have to keep upgrading hardware just to meet the time requirement, so incremental loading comes in to reduce the amount of data transferred significantly. There are three ways to achieve it: 1. use a datetime column; 2. Change Data Capture (CDC); 3. change tracking. For a long time I used the first way to capture changed data manually; it works, but requires a lot of development and testing effort. Here I want to introduce CDC. Basically, CDC is just a feature that uses the LSN (Log Sequence Number) and log tables to capture changed data, while SSIS provides native components to work with this feature easily.
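For comparison, the first (manual) approach is just a filter on a modified-date column. A minimal sketch, assuming a hypothetical Sales.Orders source with a ModifiedDate column and an etl.LoadControl table that stores the last load time:
-- read the watermark from the last successful load
DECLARE @LastLoadDate datetime2 = (SELECT MAX(LastLoadDate) FROM etl.LoadControl WHERE TableName = 'Sales.Orders');
-- pull only the rows changed since then
SELECT OrderID, CustomerID, Amount, ModifiedDate
FROM Sales.Orders
WHERE ModifiedDate > @LastLoadDate;
-- after a successful load, advance the watermark
UPDATE etl.LoadControl
SET LastLoadDate = SYSDATETIME()
WHERE TableName = 'Sales.Orders';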
Let’s build a CDC workflow in SSIS for example:
Enable CDC feature
enable CDC by executing sp_cdc_enable_db (disable it with sp_cdc_disable_db). Then check it with select name from sys.databases where is_cdc_enabled = 1
enable CDC for a specific table with sp_cdc_enable_table. Then we can find the CDC table in the System Tables folder, or use select name from sys.tables where is_tracked_by_cdc = 1 to check.
exec sys.sp_cdc_enable_table
    @source_schema = N'Person'
  , @source_name = N'Address'
  , @role_name = N'cdc_Admin'
  , @supports_net_changes = 1                    -- needed to read net changes (see the Net processing mode below)
  , @capture_column_list = N'column1, column2';  -- can track specific columns rather than the whole table
Control flow setting
add a CDC Control Task
set the CDC control operation to "Mark CDC start" and choose the connection and table for saving the CDC state
run this control task once; it will create a record in the cdc_states table
create two more CDC Control Tasks: one with the operation "Get processing range", the other with "Mark processed range"; they retrieve the changed-data range and update the CDC state respectively
put the data flow that is responsible for the ETL operation between these two CDC control tasks
Data flow setting into the staging tables
add a CDC Source that points to the CDC-enabled table, and choose the correct cdc_states table as well.
choose the "Net" CDC processing mode in the CDC Source (a query sketch for inspecting net changes directly follows this list).
add a CDC Splitter after the CDC Source, and create three Derived Column transformations for insert(0), update(2) and delete(1) data.
create a Union All transformation to union all the data and export it to the staging database.
if necessary, add a truncate script before the whole control flow to empty the staging database.
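Outside of SSIS, you can verify what the CDC source would return by querying the net-changes function directly. A hedged sketch, assuming the default capture instance name Person_Address generated by the enable call above:
-- get the valid LSN range for the capture instance
DECLARE @from_lsn binary(10) = sys.fn_cdc_get_min_lsn('Person_Address');
DECLARE @to_lsn   binary(10) = sys.fn_cdc_get_max_lsn();
-- net changes: one row per source row, with __$operation = 1 (delete), 2 (insert), 4 (update)
SELECT *
FROM cdc.fn_cdc_get_net_changes_Person_Address(@from_lsn, @to_lsn, N'all');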
Update the fact tables through the staging tables
create an OLE DB source connected to the staging database
use a Conditional Split to separate inserts from updates+deletes
for inserts, export directly; for updates+deletes, first delete the matching rows from the fact table by their identifier with an OLE DB Command transformation, then use another Conditional Split to export the update rows
if necessary, use a Lookup to replace some dimension columns
export to the fact (data warehouse) database; a set-based T-SQL equivalent is sketched below.
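If you prefer to apply the staged changes with set-based T-SQL instead of row-by-row OLE DB Commands, a minimal sketch with hypothetical stg.FactSales / dw.FactSales tables and a ChangeType column ('I'/'U'/'D') might look like this:
-- remove rows that were updated or deleted in the source
DELETE f
FROM dw.FactSales AS f
JOIN stg.FactSales AS s ON s.SalesOrderID = f.SalesOrderID
WHERE s.ChangeType IN ('U', 'D');
-- re-insert the updated rows together with the brand-new ones
INSERT INTO dw.FactSales (SalesOrderID, CustomerID, Amount)
SELECT s.SalesOrderID, s.CustomerID, s.Amount
FROM stg.FactSales AS s
WHERE s.ChangeType IN ('I', 'U');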