Incremental Load of a DW Using CDC in SSIS

When we load data from an OLTP system into a DW, we face a trade-off between time and cost. Since data grows faster and faster, we would otherwise have to keep upgrading hardware to meet the time requirement, so incremental load exists to reduce the amount of data transferred significantly. There are three ways to achieve it: 1. use a datetime column; 2. Change Data Capture (CDC); 3. Change Tracking. For a long time I used the first way to capture changed data manually; it works, but it takes too much development and testing effort. Here I want to introduce CDC. Basically, CDC is just a SQL Server feature that uses the LSN (Log Sequence Number) and log-based change tables to capture changed data, while SSIS provides native components that make it easy to work with this feature.

Let’s build a CDC workflow in SSIS as an example.

Enable CDC feature

  • enable CDC at the database level by executing sp_cdc_enable_db (disable with sp_cdc_disable_db). Then verify it with select name from sys.databases where is_cdc_enabled = 1.
  • enable CDC for a specific table with sp_cdc_enable_table. We can then find the change table under the System Tables folder, or check with select name from sys.tables where is_tracked_by_cdc = 1:
``` sql
exec sys.sp_cdc_enable_table
    @source_schema = N'Person'
    , @source_name = N'Address'
    , @role_name = N'cdc_Admin'
    , @capture_column_list = N'column1, column2'; -- can track specific columns rather than the whole table
```
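
Under the hood, enabling CDC generates a change table plus LSN helper functions, and the SSIS CDC components essentially read a net-change range between two LSNs. As a rough illustration of the query involved, here is a small Python sketch using pyodbc; the capture instance name Person_Address (the default schema_table name) and the connection string are assumptions based on the example above, not part of the package itself.

``` python
import pyodbc

# A sketch of the kind of query the CDC Source issues, assuming the capture
# instance created above is named 'Person_Address' and that an ODBC
# driver/connection string like the one below is available.
conn = pyodbc.connect(
    "DRIVER={ODBC Driver 17 for SQL Server};SERVER=localhost;"
    "DATABASE=AdventureWorks;Trusted_Connection=yes;"
)
cursor = conn.cursor()

cursor.execute("""
    DECLARE @from_lsn binary(10) = sys.fn_cdc_get_min_lsn('Person_Address');
    DECLARE @to_lsn   binary(10) = sys.fn_cdc_get_max_lsn();
    -- __$operation in the result: 1 = delete, 2 = insert, 4 = update (net changes)
    SELECT *
    FROM cdc.fn_cdc_get_net_changes_Person_Address(@from_lsn, @to_lsn, 'all');
""")

for row in cursor.fetchall():
    print(row)
```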

Control flow setting

  • add a CDC Control Task.
  • set its CDC control operation to Mark CDC start and point it at the table/variable used for saving the CDC state.
  • run this control task once; it will create a record in the cdc_states table.
  • create two more CDC Control Tasks: one with the operation Get Processing Range and another with Mark Processed Range; they fetch the range of changed data and update the CDC state respectively.
  • put a Data Flow Task, which is responsible for the ETL work, between the two CDC control tasks.

Data flow settings for the staging table

  • add a CDC Source that points to the CDC-enabled table, and choose the correct cdc_states table as well.
  • choose the Net CDC processing mode in the CDC Source.
  • add a CDC Splitter after the CDC Source, and create three Derived Column transformations to tag insert (0), update (2), and delete (1) rows.
  • create a Union All transformation to combine the three paths and write the result to the staging database.
  • if necessary, add a truncate script ahead of the whole control flow to empty the staging database first.

Update the fact table through the staging table

  • create an OLE DB Source connected to the staging database.
  • use a Conditional Split to separate the inserts from the updates and deletes.
  • for inserts, export the rows directly; for updates and deletes, first delete the matching rows from the fact table by their identifier with an OLE DB Command transformation, then use another Conditional Split to pass only the update rows on for insertion (a sketch of the equivalent SQL follows this list).
  • if necessary, use a Lookup to resolve some dimension columns.
  • export to the fact database.
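
The same delete-then-insert logic can also be expressed as two plain SQL statements run outside SSIS. The sketch below is only an illustration under assumed names: a staging table stg.AddressChanges with an OpCode column holding the 0/2/1 codes assigned in the data flow above, and a fact table dw.FactAddress keyed by AddressID; none of these names come from the original package.

``` python
import pyodbc

# Assumed names: stg.AddressChanges holds the staged CDC rows with an OpCode
# column (0 = insert, 2 = update, 1 = delete, as assigned above); dw.FactAddress
# is the target fact table keyed by AddressID.
conn = pyodbc.connect(
    "DRIVER={ODBC Driver 17 for SQL Server};SERVER=localhost;"
    "DATABASE=DW;Trusted_Connection=yes;"
)
cursor = conn.cursor()

# 1. Remove fact rows whose key appears as an update or a delete.
cursor.execute("""
    DELETE f
    FROM dw.FactAddress AS f
    JOIN stg.AddressChanges AS s ON s.AddressID = f.AddressID
    WHERE s.OpCode IN (1, 2);
""")

# 2. Re-insert the new and updated rows (deleted rows are not re-inserted).
cursor.execute("""
    INSERT INTO dw.FactAddress (AddressID, City, PostalCode)
    SELECT AddressID, City, PostalCode
    FROM stg.AddressChanges
    WHERE OpCode IN (0, 2);
""")

conn.commit()
```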

Do we really know how water moves?

Close your eyes and think about what happens when the spray beats the shore, or when the water from the faucet hits your body.
Below is one of the simplest animations, showing how water changes its speed after it encounters a wall.

[animation: flow speed of water hitting a wall]

The color encodes the speed of the water, which flows in from the left side at 1 m/s. Can anyone simulate this in their head? I guess it is super hard unless you have seen thousands of similar pictures.
So if the scene becomes much more complex than this one, like the spray beating the shore I mentioned before, I guess there will be a big gap between our imagination and the real situation.

Learn Django with me (part 3)

Handling views and templates

A view consists of a set of functions that handle the different URL requests matched by specific URL patterns.
Each function returns an HttpResponse or raises an Http404.

Firstly, let’s update webapp/views.py:

``` python
from django.http import HttpResponse
from django.shortcuts import get_object_or_404, render

from .models import Question


def index(request):
    # get the latest 5 questions
    latest_question_list = Question.objects.order_by('-pub_date')[:5]
    # build the template context
    context = {'latest_question_list': latest_question_list}
    # render() is a shortcut that renders the template with the context
    return render(request, 'webapp/index.html', context)


def detail(request, question_id):
    # return a 404 page if the question does not exist
    question = get_object_or_404(Question, pk=question_id)
    return render(request, 'webapp/detail.html', {'question': question})


def results(request, question_id):
    response = "You're looking at the results of question %s."
    return HttpResponse(response % question_id)


def vote(request, question_id):
    return HttpResponse("You're voting on question %s." % question_id)
```

Here we use the template webapp/index.html, which lives at webapp/templates/webapp/index.html. So let’s create the templates folder and its webapp subfolder; the code of index.html is:

``` html
{# list the latest questions #}
{% if latest_question_list %}
    <ul>
    {% for question in latest_question_list %}
        {# 'webapp' is the URL namespace, 'detail' is the URL name #}
        <li><a href="{% url 'webapp:detail' question.id %}">{{ question.question_text }}</a></li>
    {% endfor %}
    </ul>
{% else %}
    <p>No polls are available.</p>
{% endif %}
```

The tricky point is that when we link to the detail page we use {% url 'webapp:detail' question.id %} instead of a hard-coded absolute path. Here webapp is the namespace and detail is the URL name; both can be found in the updated webapp/urls.py:

``` python
from django.urls import path

from . import views

# application namespace
app_name = 'webapp'

urlpatterns = [
    # ex: /webapp/
    path('', views.index, name='index'),
    # ex: /webapp/5/
    path('<int:question_id>/', views.detail, name='detail'),
    # ex: /webapp/5/results/
    path('<int:question_id>/results/', views.results, name='results'),
    # ex: /webapp/5/vote/
    path('<int:question_id>/vote/', views.vote, name='vote'),
]
```

Similar to the template `index.html`, we should add the template `webapp/templates/webapp/detail.html`:
``` html
<h1>{{ question.question_text }}</h1>
<ul>
{% for choice in question.choice_set.all %}
    <li>{{ choice.choice_text }}</li>
{% endfor %}
</ul>
```

The dynamic template tags in the HTML are easy to understand, so I won’t spend time explaining them.
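
For reference, the fields used above (question_text, pub_date, choice_set) assume the models built in the earlier parts of this series, following the standard Django polls tutorial. A sketch of what webapp/models.py is assumed to look like:

``` python
# webapp/models.py -- assumed from the earlier parts of this series (a sketch
# following the standard Django polls tutorial field names).
from django.db import models


class Question(models.Model):
    question_text = models.CharField(max_length=200)
    pub_date = models.DateTimeField('date published')


class Choice(models.Model):
    question = models.ForeignKey(Question, on_delete=models.CASCADE)
    choice_text = models.CharField(max_length=200)
    votes = models.IntegerField(default=0)
```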

Now you can access http://localhost:8000/webapp/ to see the results. The whole process can be described like this:
1. The browser sends a request to the server.
2. Django parses the URL using ROOT_URLCONF = 'mysite.urls' in mysite/settings.py, which points to mysite.urls.
3. According to the urlpatterns in mysite/urls.py, the request is handed off to the webapp app (see the sketch after this list).
4. The request is handled by webapp/urls.py, which maps it to one of the functions in webapp/views.py. Here the second parameter of, for example, results comes from the matched URL pattern.
5. The view handles the request and renders a template from templates/webapp/.
6. An HttpResponse (or Http404) is returned to the client.
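
mysite/urls.py is not shown in this part; for step 3 to work, it presumably contains an include() entry roughly like the sketch below. The 'webapp/' prefix and the project name mysite are assumptions carried over from the earlier parts.

``` python
# mysite/urls.py -- a minimal sketch; the app is assumed to be mounted
# under /webapp/ in a project called mysite.
from django.contrib import admin
from django.urls import include, path

urlpatterns = [
    path('webapp/', include('webapp.urls')),
    path('admin/', admin.site.urls),
]
```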

In a nutshell, urls.py matches URL patterns and dispatches requests to views.py; views.py calls models.py and the templates to build the response that is sent back.

Get Rid of ETL, Move to Spark

ETL is the most common tool in the process of building an EDW, and of course the first step in data integration. As big data emerges, we find more and more customers starting to use Hadoop and Spark. Personally, I agree with the idea that Spark will replace most ETL tools.

Background

  • Business Intelligence -> big data
  • Data warehouse -> data lake
  • Applications -> Microservices

ETL hell

  • Data gets out of sync; every copy is a risk.
  • Performance issues and wasted server resources (sized for peak load), even though ETL tools can do limited parallel work.
  • Plain-text code hidden inside stages (typically VB or Java).
  • CSV files are not type safe.
  • All-or-nothing approach in batch jobs.
  • Legacy code.

Spark for ETL

  • parallel processing is built in
  • streaming can be used to parallelize ETL
  • Hadoop is the data source, so we don’t need extra copies, which reduces risk
  • just one codebase (Scala or Python)
  • machine learning included
  • security, unit testing, performance measurement, exception handling, monitoring

Code Demo

  1. Simple one

``` python
(spark.read.json("/sourcepath")    # Extract
    .filter(...)                   # Transform (this step and the next)
    .agg(...)
    .write.mode("append")          # Load
    .parquet("/outputpath"))
```
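
To make the skeleton above concrete, here is a minimal self-contained version. The event schema (a status column and an amount per customer_id) and the paths are made-up assumptions for illustration, not taken from the original talk.

``` python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("etl-demo").getOrCreate()

# Extract: read raw JSON events (path and column names are made up)
events = spark.read.json("/sourcepath/events")

# Transform: keep completed events and sum the amount per customer
totals = (events
          .filter(F.col("status") == "completed")
          .groupBy("customer_id")
          .agg(F.sum("amount").alias("total_amount")))

# Load: append the result as Parquet
totals.write.mode("append").parquet("/outputpath/daily_totals")
```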

  2. Stream

``` python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# @param1: master, @param2: app name
sc = SparkContext("local[2]", "NetworkWordCount")
# @param1: spark context, @param2: batch interval in seconds
ssc = StreamingContext(sc, 1)
stream = ssc.textFileStream("path")
# do transform
# do load
ssc.start()
ssc.awaitTermination()
```
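
For completeness, here is what the "do transform / do load" placeholders might look like, following the classic word-count example from the streaming programming guide (reference 1). The socket source on localhost:9999 is the guide's convention and an assumption here, not something set up earlier in this post.

``` python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

# Source: lines of text arriving on a local socket (run `nc -lk 9999` to feed it)
lines = ssc.socketTextStream("localhost", 9999)

# Transform: split each line into words and count each word per 1-second batch
counts = (lines.flatMap(lambda line: line.split(" "))
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))

# Load: here we just print each batch; a real ETL job would write it out instead
counts.pprint()

ssc.start()
ssc.awaitTermination()
```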

References:
1. https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html
2. https://databricks.com/session/get-rid-of-traditional-etl-move-to-spark
3. https://www.slideshare.net/databricks/building-robust-etl-pipelines-with-apache-spark