Columnstore Index for MS SQL Server

Columnstore is one of the most popular storage technologies in big data. You have probably heard of Parquet and Delta Lake; both are column-oriented formats that bring roughly 10x compression and much faster query speeds for analytic workloads. SQL Server, one of the fastest-evolving relational databases, also provides columnstore indexes with multiple optimizations.

Loading into a clustered columnstore index
Figure 1: A columnstore index compresses the data column by column and is optimized by the deltastore. It can reach about 10x compression and up to 100x query speed.

SQL Server provides clustered and non-clustered columnstore indexes. The deltastore is a clustered B-tree structure used automatically alongside a columnstore index. It stores rows until their number reaches a threshold (~1,048,576 rows), then moves the data into the columnstore and changes the rowgroup state from Open to Closed. I will show this at the end of this article.

Clustered columnstore index

  • Primary storage method for the entire table
  • All columns are included; there are no key columns.
  • Can be combined only with a non-clustered B-tree index to speed up
    • queries that search for specific values or small ranges of values
    • updates and deletes of specific rows
  • Usually used for fact tables or large dimension tables

Non-clustered columnstore index

  • We can indicate which columns to index, usually the most frequently used ones.
  • Requires extra storage to keep a copy of the indexed columns (~10%).
  • Can be combined with other indexes.

How to choose a columnstore index?

Microsoft already provides guidance in its documentation.

Figure 2: Choose the best columnstore index for your needs

I would recommend columnstore indexes for most OLAP workloads, where we need fast queries and do not have many delete/update tasks.

How to delete a large amount of data from a columnstore table

Although Microsoft does not suggest deleting more than 10% of the data in a columnstore table, sometimes we still have to. For that case, I have summarized some of my experience.

| Delete from columnstore is a soft delete

If you delete rows from a columnstore table, the data is not actually removed; SQL Server just marks those rows as deleted.

You can run the SQL below to find out the number of deleted rows and the deltastore rowgroups. Once too many rows are marked as deleted, you have to rebuild or reorganize the columnstore index. Remember, this is not like the OPTIMIZE command in Delta Lake, which is closer to bin-packing for small files.

SELECT
       tables.name AS table_name,
       indexes.name AS index_name,
       partitions.partition_number,
       column_store_row_groups.row_group_id,
       column_store_row_groups.state_description,
       column_store_row_groups.total_rows,
       column_store_row_groups.size_in_bytes,
       column_store_row_groups.deleted_rows,
       internal_partitions.partition_id,
       internal_partitions.internal_object_type_desc,
       internal_partitions.rows
FROM sys.column_store_row_groups
INNER JOIN sys.indexes
ON indexes.index_id = column_store_row_groups.index_id
AND indexes.object_id = column_store_row_groups.object_id
INNER JOIN sys.tables
ON tables.object_id = indexes.object_id
INNER JOIN sys.partitions
ON partitions.partition_number = column_store_row_groups.partition_number
AND partitions.index_id = indexes.index_id
AND partitions.object_id = tables.object_id
LEFT JOIN sys.internal_partitions
ON internal_partitions.object_id = tables.object_id
AND column_store_row_groups.deleted_rows > 0
WHERE tables.name = 'table_name'
Figure 3: One deltastore rowgroup is open, and there are no deleted rows

| Steps to delete a large amount of data from a columnstore table

1. Drop the non-clustered B-tree indexes (after the delete operation, rebuild them if needed)

If you check the execution plan of a bulk delete, you will find that the B-tree index related operations take most of the time, not the columnstore index.

Figure 4: B-tree index related operations take 90% of the time.

2. Delete in small batches

One downside of Azure SQL is that we cannot set the recovery model to simple, so deleting from a large table takes a lot of transaction log time. The workaround is to delete in small batches, so that each log transaction is smaller and quicker. Below is an example that deletes data in chunks of 1,000,000 rows.

deleteMore:
-- delete one batch; the predicate id % 2 = 0 is just an example
DELETE TOP (1000000) FROM table_name
WHERE id % 2 = 0;
IF @@ROWCOUNT != 0
BEGIN
    PRINT CONVERT(varchar(30), CURRENT_TIMESTAMP, 121);  -- log progress after each batch
    GOTO deleteMore;
END
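
If the same chunked delete has to be driven from outside the database (for example from a scheduled job), the pattern can also be scripted in Python. Below is a minimal sketch assuming pyodbc, a hypothetical connection string, and the same illustrative predicate id % 2 = 0; adjust the batch size and filter to your case.

import pyodbc

# hypothetical connection string; replace server, database and credentials
conn = pyodbc.connect(
    "DRIVER={ODBC Driver 17 for SQL Server};"
    "SERVER=myserver.database.windows.net;DATABASE=mydb;UID=myuser;PWD=mypassword"
)
cursor = conn.cursor()

while True:
    # delete one small batch so each transaction (and its log usage) stays small
    cursor.execute("DELETE TOP (1000000) FROM table_name WHERE id % 2 = 0")
    deleted = cursor.rowcount
    conn.commit()
    print("deleted rows in this batch:", deleted)
    if deleted == 0:
        break

conn.close()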

3. Rebuild/reorganize the columnstore index if needed

Sometimes we have to rebuild or reorganize the index if we have deleted too much data and query performance suffers. Here is a snippet showing how to achieve it.

-- rebuild or reorganize a columnstore index
ALTER INDEX index_name ON table_name REBUILD;   -- or REORGANIZE
ALTER INDEX ALL ON table_name REBUILD;          -- or REORGANIZE

-- check fragment
SELECT a.object_id, object_name(a.object_id) AS TableName,
    a.index_id, name AS IndexName, avg_fragmentation_in_percent
FROM sys.dm_db_index_physical_stats
    (DB_ID(N'database_name')
        , OBJECT_ID(N'table_name')
        , NULL
        , NULL
        , NULL) AS a
INNER JOIN sys.indexes AS b
    ON a.object_id = b.object_id
    AND a.index_id = b.index_id;
GO

Reference:

Choose the best columnstore index for your needs. https://docs.microsoft.com/en-us/sql/relational-databases/indexes/columnstore-indexes-design-guidance?view=sql-server-ver15#choose-the-best-columnstore-index-for-your-needs

Columnstore indexes – Query performance. https://docs.microsoft.com/en-us/sql/relational-databases/indexes/columnstore-indexes-query-performance?view=sql-server-ver15

How to efficiently delete rows while NOT using Truncate Table in a 500,000+ rows table. https://stackoverflow.com/questions/11230225/how-to-efficiently-delete-rows-while-not-using-truncate-table-in-a-500-000-rows

Hands-On with Columnstore Indexes: Part 1 Architecture. https://www.red-gate.com/simple-talk/sql/sql-development/hands-on-with-columnstore-indexes-part-1-architecture/

Enable GPU Acceleration in WSL2 to Support AI Frameworks

Since Microsoft upgraded WSL to version 2, it has included a full Linux kernel and full VM management features. Besides the performance benefit of deep integration with Windows, WSL2 allows installing additional powerful apps like Docker and upgrading the Linux kernel whenever a new one is available.

Two months ago, Microsoft and NVIDIA brought GPU acceleration to WSL2. This new feature excited me, because we no longer have to train our models on a separate Linux machine or set up a dual-boot system.

Figure 1: Stack diagram showing the layers involved when running AI frameworks in WSL2 containers. The container provides integration with the CUDA-related components, and WSL2 communicates with the Windows host through the GPU paravirtualization protocol.

Before starting, I did some research on the basic ideas of virtualization and WSL2 GPU support, which helped me understand how GPU paravirtualization works in WSL2.

Types of virtualization

Figure 2: Four major types of virtualization
  • Full virtualization. There is an almost complete model of the underlying physical system resources, which allows any and all installed software to run without modification. There are two types of full virtualization:
    • Software-assisted full virtualization (binary translation), e.g. VMware Workstation (32-bit), Virtual PC, VirtualBox (32-bit). Issue: low performance.
    • Hardware-assisted full virtualization, which eliminates binary translation and interacts directly with the hardware (Intel VT-x and AMD-V), e.g. KVM, VMware ESX, Hyper-V, Xen. Issue: the virtual context executes privileged instructions directly on the processor.
  • Paravirtualization. Paravirtualization (PV) is an enhancement of virtualization technology in which a guest operating system (guest OS) is modified prior to installation inside a virtual machine (VM), so that all guest OSes within the system can share resources and collaborate rather than attempting to emulate an entire hardware environment; the guest is therefore aware that it has been virtualized. Products include Xen, IBM LPAR, and Oracle VM for x86. Xen supports both full virtualization and paravirtualization.
  • Hybrid virtualization (hardware-assisted virtualization with PV drivers). The virtual machine uses PV drivers for specific hardware (such as I/O) and full virtualization for other features, e.g. Oracle VM for x86, Xen, and VMware paravirtual devices on top of hardware full virtualization.
  • OS-level virtualization, a.k.a. containerization. No hypervisor overhead. Products include Docker, Linux LXC, AIX WPAR.
The difference between VM and container

Except for containerization, all virtualization types use a hypervisor to communicate with the host. Let's take a look at how hypervisors work below.

  • Hypervisor
    • Emulation (software full virtualization)
      • emulates a certain piece of hardware, which is the only hardware the guest VM can see
      • costs performance, since the emulated hardware is a "lowest common denominator"
      • needs to translate instructions
      • wide compatibility
    • Paravirtualization
      • only supports certain hardware in certain configurations
      • direct hardware access is possible
      • compatibility is limited
    • Hardware pass-through (hardware full virtualization)
      • native performance, but needs proper drivers for the real physical hardware
      • hardware-specific images
      • GPU supported

GPU Virtualization on Windows

How it works on WSL

  • a new kernel driver, dxgkrnl, exposes a /dev/dxg device to user mode
  • /dev/dxg mimics the native WDDM D3DKMT kernel service layer on Windows
  • dxgkrnl communicates with its counterpart on Windows through the VM bus, using the WDDM paravirtualization protocol
Figure 3: There is no partitioning of resources between Linux and Windows, and no limit on Linux applications

DxCore & D3D12 on Linux

  • libd3d12.so is compiled from the same source code as d3d12.dll on Windows
  • except for the Present() function, everything else is the same as on Windows
  • libdxcore (DxCore) is a simplified version of DXGI
  • GPU manufacturer partners provide a UMD (user-mode driver) for Linux
Figure 4: D3D12 builds upon the /dev/dxg device

DirectML and AI Training

  • DirectML sits on top of the D3D12 API and provides a collection of compute operations.
  • TensorFlow is available with an integrated DirectML backend.
Figure 5: DirectML provides beginners with a basic ML framework

OpenGL, OpenCL & Vulkan

  • The Mesa library is the mapping layer which brings hardware acceleration to OpenCL and OpenGL.
  • Vulkan is not supported right now.
Figure 6: WSL2 only supports OpenGL and OpenCL right now

Nvidia CUDA

  • a version of CUDA that directly targets the WDDM 2.9 abstraction exposed by /dev/dxg
  • libcuda.so enables CUDA-X libraries such as cuDNN, cuBLAS, and TensorRT
  • available on any glibc-based WSL distro
Figure 7: NVIDIA Docker tooling is available (the NVIDIA Container Toolkit), which gives us the familiar container plugin and usage experience

GPU container in WSL

  • The libnvidia-container library is able to detect the presence of libdxcore.so at runtime and uses it to enumerate all the GPUs exposed to this interface.
  • The driver store is a folder that contains all the driver libraries for both Linux and Windows.
Figure 8: NVIDIA Docker provides the NVIDIA Container Toolkit along with many good images.

GUI application support is still under development.

How to enable GPU Acceleration in WSL

For the detailed steps, refer to https://docs.nvidia.com/cuda/wsl-user-guide/index.html. Here I summarize some key points:

  1. Windows build 20150 or above (Dev Channel)
  2. Enable WSL 2
  3. Install Ubuntu on WSL
  4. Install Windows Terminal
  5. Upgrade the kernel to 4.19.121 or higher
  6. Install the NVIDIA drivers for CUDA on WSL: https://developer.nvidia.com/cuda/wsl/download
  7. Install Docker in WSL:
    • curl https://get.docker.com | sh
    • You will see a vmmem process in your Windows Task Manager; it is the process for the WSL2 virtual machine.
  8. Install the NVIDIA Container Toolkit (nvidia-docker2)
Figure 9: Docker in WSL2 with the NVIDIA Container Toolkit

9. Start a TensorFlow container

# test that docker can see the GPU
docker run --gpus all nvcr.io/nvidia/k8s/cuda-sample:nbody nbody -gpu -benchmark
# pull the tensorflow image and run it
docker run -it --gpus all -p 8888:8888 tensorflow/tensorflow:latest-gpu-py3-jupyter

After you pull the TensorFlow image and run it, you will see the following instructions:

Figure 10: Replace 127.0.0.1 with localhost and open this URL in your browser; then we can use GPU acceleration in our WSL2
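
Once the Jupyter notebook is open, a quick way to confirm that TensorFlow inside the container actually sees the GPU is a cell like the following (a minimal check, assuming a TensorFlow 2.x image):

import tensorflow as tf

# should list at least one GPU device if the WSL2 GPU path works
print(tf.config.list_physical_devices('GPU'))

# run a small op and let TensorFlow log which device it was placed on
tf.debugging.set_log_device_placement(True)
a = tf.random.uniform((1000, 1000))
b = tf.random.uniform((1000, 1000))
print(tf.reduce_sum(tf.matmul(a, b)))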

Reference:

Para virtualization vs Full virtualization vs Hardware assisted Virtualization, https://www.unixarena.com/2017/12/para-virtualization-full-virtualization-hardware-assisted-virtualization.html/

Emulation, paravirtualization, and pass-through: what you need to know for client hypervisors, https://searchvirtualdesktop.techtarget.com/opinion/Emulation-paravirtualization-and-pass-through-what-you-need-to-know-for-client-hypervisors

DirectX is coming to the Windows Subsystem for Linux, https://devblogs.microsoft.com/directx/directx-heart-linux/

NVIDIA Container Toolkit, https://github.com/NVIDIA/nvidia-docker

CUDA on WSL User Guide, https://docs.nvidia.com/cuda/wsl-user-guide/index.html

NVIDIA Drivers for CUDA on WSL, https://developer.nvidia.com/cuda/wsl/download

Tensorflow image on Docker, https://www.tensorflow.org/install/docker

Run Linux on your Android phone

After Samsung canceled its "Linux on DeX" project, the only remaining option was to run Linux on an SBC like the Raspberry Pi, but that leaves us with an ugly setup and a lack of flexibility. Why shouldn't we use our phones to run Linux directly? That leaves two choices:

  1. A phone that runs Linux natively, e.g. the PinePhone. It's not expensive and very cool; you can buy it here. The downside is that there are not many apps you can install…
  2. Run Linux on an Android phone, so that you can install Android apps as well as a Linux environment. All you need is three apps:
  • bVNC Free: a VNC client to connect to the Linux GUI.
  • Termux: a terminal emulator to install and run Linux commands.
  • AndroNix: downloads the Linux distro.

I was using my Samsung S8, which provides big-screen projection through DeX. DeX is still an Android desktop, but quite Windows-like. Then I followed the steps mentioned in this YouTube video.

After installing Linux (I chose Ubuntu KDE) and connecting to it, you can do whatever you want in it. Below is a screenshot after I installed Visual Studio Code.

Visual Studio Code, ARM version

Let's summarize:

Pros:

  1. Flexibility. You don't have to bring your laptop; you only have to find a monitor or TV.
  2. Apps. Run Android and Linux apps on a single phone.
  3. Very geeky.

Cons:

  1. It's better to have a high-end phone, like the S20. My S8 is still slow sometimes because of its 3 GB of memory.
  2. Not native Linux. Its working pattern feels a lot like WSL on Windows; performance may be an issue.

Let Linux make your daily life easier

If you spend most of your time working on Windows, you may be curious why you should switch to Linux and why Linux has suddenly become so popular in the recent decade: even Microsoft has brought a Linux kernel into the latest Windows versions as WSL/WSL2, and runs about 70% of its cloud services on Linux, including SQL Server.

Why should we use Linux?

Although Windows is still the dominant leader, especially in the desktop market, more and more professional industries are moving to Linux. Emerging areas like IoT devices, cloud services, and smartphones all use Linux. In my opinion, there are several areas where Linux is significantly better than Windows: reliability, software management, hardware compatibility, and customization.

Reliability. How many times have you hit a blue screen or a forced reboot on Windows? Install an application -> reboot; uninstall -> reboot; change a setting -> reboot; even a security update causes a reboot. On Linux you rarely have to worry about this, which significantly reduces server downtime.

Software management. On Windows, click-to-install may be easy for a normal user, but it is never a good way for a programmer or server admin. Linux adopts a repository methodology, a lot like an "Android store". We only need to run a very short command like "sudo apt install <appname>" or "sudo snap install <appname>". Even better, we can put all the commands together in a bash script, then leave it alone and have a cup of coffee.

Hardware compatibility. Many smart devices have come out in recent years. They have less computational power than a desktop or laptop, but we can put them anywhere and let them control home security, the TV, the freezer, or the washer. You cannot install Windows on these devices because it is too "fat"; Linux is a good choice and has nice compatibility with ARM devices.

Customization. Not everyone likes flexibility. My wife, for example, prefers things that work out of the box, so I would never recommend Linux to her. But for most geeks and tech workers, Linux provides various distros and desktop environments, and within each environment you can customize deeply according to your needs.

How to set Linux for your daily life?

First things first, you need to choose a Linux distro. Linux has hundreds of distros, with many new branches coming out each year, but most of them are in parent-child relationships. Debian, Red Hat, and Arch are the three most popular families. Here we look at the Debian and Arch families. For daily life we want something stable and easy to use: Debian, Ubuntu, and Linux Mint are all good choices in the Debian family, while Manjaro and Arch are the most popular in the Arch family. Besides these, you can also find Android and Chrome OS in the distribution map on the left, but they are designed only for mobile or special devices.

If you are a Linux beginner and want to find answers quickly, Ubuntu LTS would be a good choice. At the end of April 2020, Ubuntu 20.04 LTS is going to launch.

In the following steps, I will use Ubuntu as the example by default.

Second, how to install applications. As I mentioned before, Linux uses repositories to download and install applications. Basically, a repository is like a store where you can find a list of goods (applications). Different Linux distro families use different repositories. In Ubuntu, you can run the following simple commands to install an application.

# update repository as root
sudo apt update
# install application
sudo apt-get install <app name>
# find the install path of application
whereis <app name>

There is another popular repository called Snapcraft, which is my favorite. Installing it is super easy: https://snapcraft.io/docs/installing-snapd. In Ubuntu, you only need to run:

sudo apt update
sudo apt install snapd 

Then, on the application's page on Snapcraft, you can copy the installation command. I take Visual Studio Code as the example below.

"sudo snap install code --classic" is the installation command.

If even a command feels complex, you can install the Snap Store, which gives you an "Android store"-like GUI and a click-to-install experience.

sudo snap install snap-store
Categories in Snap store

In some cases, you can only find the application on its official website, like Anaconda or Microsoft Teams. What should we do then?

For a .deb file (the package format of the Debian family), just click and install. If that fails, use the command below.

sudo dpkg -i <file name.deb>

For a shell file ending with ".sh", like the Anaconda installer:

# download package
wget <download URL>
# add execution mode
chmod +x <file name.sh>
# execute file
sudo ./<file name.sh>

Most applications have a Linux version; for the rest, there are alternatives in Linux.

Windows application      Linux alternative
Google Drive             Insync
Evernote                 NixNote2
MS Office 365            LibreOffice or Office Online
Check Point VPN          SNX

My workplace

Third, the command line. The terminal is the most important tool in Linux. Once you are familiar with it, you will hardly touch your mouse, because the terminal saves you a lot of time. Here I list some common commands; for more information you can check https://files.fosswire.com/2007/08/fwunixref.pdf.

# check command meaning
man <command name>

# file operation
ls # listing files and dir
cd <path> # change to the path
pwd # show current dir
mkdir <dir> # create dir
rm <file> # delete file
rm -r <dir> # delete dir recursively
cp -r <dir1> <dir2> # copy dir1 to dir2
touch <file> # create or update file
cat > <file> # places standard input into file
more <file> # output contents of file

# file permission
chmod # change file mode
chown # change owner of file

# ssh
ssh user@host

# system info
top/htop # task monitor
whereis app # app location
df # disk usage
w # who is online

# compression
tar cf file.tar files # create tar file
tar xzf file.tar.gz  # extract a tar using Gzip
tar xf file.tar # extract a tar file

# download
wget

# search 
grep <pattern>

# split terminal
tmux

# schedule
crontab

# back up 
sudo dd if=/dev/sda1 of=/media/c/back.img

What other benefits can Linux bring you?

For big data engineers, the most popular foundation is the Apache Software Foundation, which provides many excellent projects like Spark, Hadoop, Kafka, Maven, and Tomcat. All of these natively support Linux, and they recommend using Linux as well.

For application developers, Python, Java, and C# are already cross-platform, so you can choose whatever OS you like. As I said before, you won't run into the Windows problems on Linux, which saves you a lot of time.

For administrators, Linux provides stable and flexible solutions. You will send out fewer emails explaining downtime for upgrade maintenance.

Who should not move to Linux?

If you are firmly attached to software like the Adobe suite, you should stay away from Linux.

Data Factory CI/CD in Azure DevOps

Azure Pipelines consists of two parts, pipeline and release, which represent CI and CD respectively.

  1. Build pipeline – builds and tests the code. The build creates an artifact that is used by the rest of your pipeline to run tasks such as deploying to staging or production.
  2. Release pipeline – once the code is updated, built, and packaged, it can be deployed to target services using release pipelines.
Top half: CI pipeline; bottom half: CD pipeline

Let's talk about how to implement this process for Azure Data Factory. Before you start, I assume your current ADF meets these requirements (if not, please refer to here):

  • You already have an Azure Repos repository.
  • ADF is integrated with this Git repository.
  • A Key Vault, which stores database and data lake connection information, plus all configuration parameters for the release.

The whole ADF CI/CD pipeline is like this:

Building Pipeline (CI)

  • Create a new pipeline and choose "adf_publish" as the default branch
    • The "adf_publish" branch is created by ADF automatically after you click "Publish" in the ADF GUI.
The three JSON files are used to set parameters in the different environments
  • Add a new agent job
  • Search for "Data factory" and add a new Publish Artifacts task
    • set the path to publish (the Git path)
    • set the artifact publish location to Azure Pipelines

Release Pipeline (CD)

Left part: CI; right part: CD

The next step is deploying the artifact into three environments: DEV, QA, and PROD.

Use Key Vault to set the SQL Server connection in the pipeline
  • Import the Azure Key Vault where you stored all the connection information. We use this information in two places:
    • Release pipeline: to get the basic information for each environment, e.g. database name, data lake name, etc. See the screenshot above.
    • Template parameters used in adf_publish.
  • Set up the Data Factory deployment.
    • Add an "Azure Resource Group Deployment" task, where we need to focus on the template path.
We have to set the ARM template and parameters, which point to the adf_publish branch
  • How to set up adf_publish
    • Go back to Azure Repos.
    • Switch to the adf_publish branch.
    • Create a corresponding ARM parameter template for each environment. You can find an example below.
The three JSON files are used to set parameters in the different environments
{
	"$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentParameters.json#",
	"contentVersion": "1.0.0.0",
	"parameters": {
		"factoryName": {
			"value": "datafactory name"
		},
		"LS_AKV_KeyVault_properties_typeProperties_baseUrl": {
			"value": "https://XXXXkeyvault.vault.azure.net/"
		},
		"LS_SQL_ConfigDb_properties_typeProperties_connectionString_secretName": {
			"value": "sqlconnection"
		},
		"LS_DLS_Datalake_properties_typeProperties_url": {
			"value": "https://XXXXXdatalake.dfs.core.windows.net/"
		}
	}
}

If we need to enable the continuous deployment trigger, we have to link it with the adf_publish branch as well.

Click the little logo on the release pipeline page.
Set it to adf_publish

Reference:

Azure DevOps Pipelines. https://docs.microsoft.com/en-us/azure/devops/pipelines/?view=azure-devops

Continuous integration and delivery in Azure Data Factory. https://docs.microsoft.com/en-us/azure/data-factory/continuous-integration-deployment

Use Azure Key Vault to pass secure parameter value during deployment. https://docs.microsoft.com/en-us/azure/azure-resource-manager/templates/key-vault-parameter?tabs=azure-cli

Apache Kafka in Practice – 1

First things first, I should remind all visitors that I am not a Kafka master. Actually, I am just a beginner learning from the official Apache Kafka website and some free Udemy classes. There might be some mistakes; I will fix them once I find them.

I call this "Kafka in practice" because I want to introduce some basic skills such as launching a Kafka service, creating producers and consumers, UI tools, and the basic Python library.

Install and Launch Kafka

Kafka is based on Java and ZooKeeper (which is used to manage the cluster). I recommend using Linux or WSL on Windows.

sudo apt update
sudo apt install openjdk-8-jdk

# test 
java -version

Download kafka, and unzip it

# download from website
wget <URL of kafka.tgz>

# unzip
tar -xvf kafka.****.tgz
cd <kafka_folder>

# test, run following
bin/kafka-topics.sh

Add the commands to PATH (optional)

# need to restart the shell after these steps
nano ~/.bashrc
# add this to the end of ~/.bashrc:
PATH="$PATH:/home/<your name>/<kafka folder>/bin"

# check under any folder
kafka-topics.sh

Start Zookeeper

# change the zookeeper data directory
mkdir data
mkdir data/zookeeper
nano config/zookeeper.properties
# add blew into zookeeper.properties:
dataDir=/home/name/data/zookeeper

# run zookeeper:
zookeeper-server-start.sh config/zookeeper.properties

Start a Kafka broker

mkdir data/kafka
nano config/server.properties

# modify server.properties to change log folder:
log.dirs=/home/name/data/kafka

# start kafka
kafka-server-start.sh config/server.properties

Right now you should have ZooKeeper and a broker running. Next, let's do some operations with topics, producers, and consumers.

Topic operations

# create a topic
# you can not have replication factor greater than available brokers
kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic first_topic --create --partitions 3 --replication-factor 1


# list all topic
kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe


# delete topic
kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic second_topic --delete

# recover in case broker starting failed after deleting topic
1. delete the topic folder under the broker log directory (see log.dirs in server.properties)
2. start zookeeper
3. enter zookeeper shell, zookeeper-shell.sh host:port
    3.1 list the topics using: ls /brokers/topics
    3.2 remove topic:rmr /brokers/topics/yourtopic
4. restart kafka server

Create a console producer

# console producer
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic

# add some properties
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic --producer-property acks=all

# if you produce to a topic that does not exist, kafka will create it with one partition and replication factor 1 by default; we can change this in config/server.properties

Create a console consumer and Set consumer groups

# read from now on
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic
# read all messages
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning

# set consumers in group
# all the message sending to this group will be split into all consumers
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --group group1

# consumer groups
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group group1
# Lag: shows how many messages have not been consumed yet
# reset offsets (offsets are tracked per consumer group)
# --to-earliest / --shift-by n [offsets]
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092  --group group1 --reset-offsets --to-earliest --execute  --topic first_topic
# when a consumer leaves or joins, a rebalance will happen

Add Keys to producer

# keys
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic --property parse.key=true --property key.separator=,
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning --property print.key=true --property key.separator=,

UI Tools

Kafka Tool is an easy-to-use UI tool for managing topics, brokers, and consumers. You can install it on Linux or Windows. If you use WSL on Windows, the ZooKeeper port is also reachable from Windows.

# how to use in Linux
wget http://www.kafkatool.com/download2/kafkatool.sh
chmod +x kafkatool.sh
# after installation
cd ~/kafkatool2/
./kafkatool

Developing in Python

kafka-python provides the common client functions for Kafka. We can find more information here.

Consumer API: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

# basic
from kafka import KafkaConsumer
consumer = KafkaConsumer('first_topic',bootstrap_servers=['localhost:9092'],auto_offset_reset='earliest')
for msg in consumer:
    print (msg.value)

# other key parameters
# group_id, key_deserializer, value_deserializer,auto_offset_reset[earliest, latest,None]

# assign and seek offsets
consumer.assign(partitions)
consumer.seek(partition, offset)  # manually specify the fetch offset for a TopicPartition
consumer.assignment()  # read the assigned partitions
consumer.beginning_offsets(partitions)
consumer.end_offsets(partitions)

Producer API: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

# basic
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],acks = 1)

for _ in range(10):
    future = producer.send('test-topic', b'some_message_bytes')
    # or add a key so that messages with the same key always go to the same partition (preserving order)
    future = producer.send('test-topic', key=b'key', value=b'some_message_bytes')
    # result = future.get(timeout=60)  # do not block, it will kill performance
    # print(result)
# producer.flush()  # Block until all pending messages are at least put on the network
producer.close()  # make sure to .close() your producer before shutting down your application

Serialization and Deserialization

  • Serialization: the process of converting an object into a stream of bytes for the purpose of transmission
  • Deserialization: the opposite of Serialization
# Serialize json messages
import json
producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('fizzbuzz', {'foo': 'bar'})

# Deserialize msgpack-encoded values (requires the msgpack package)
import msgpack
consumer = KafkaConsumer(value_deserializer=msgpack.loads)  # same idea for key_deserializer
consumer.subscribe(['msgpackfoo'])
for msg in consumer:
     assert isinstance(msg.value, dict)

# Serialize string keys
producer = KafkaProducer(key_serializer=str.encode)
producer.send('flipflap', key='ping', value=b'1234')

# Compress messages
producer = KafkaProducer(compression_type='gzip')
for i in range(1000):
     producer.send('foobar', b'msg %d' % i)

Thread

The producer is thread safe; however, the consumer is not. It is recommended to use multiprocessing instead, as sketched below.
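
Here is a minimal sketch of that idea, assuming kafka-python, a broker on localhost:9092, and the first_topic/group1 names used earlier: each process creates its own KafkaConsumer in the same consumer group, so partitions are split across processes rather than threads.

from multiprocessing import Process
from kafka import KafkaConsumer

def consume(worker_id):
    # each process owns its own consumer instance (KafkaConsumer is not thread safe)
    consumer = KafkaConsumer(
        'first_topic',
        bootstrap_servers=['localhost:9092'],
        group_id='group1',
    )
    for msg in consumer:
        print(worker_id, msg.partition, msg.offset, msg.value)

if __name__ == '__main__':
    workers = [Process(target=consume, args=(i,)) for i in range(3)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()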

Client Compatibility

Always use the latest client library version; since Kafka 0.10.2, older and newer clients can talk to older and newer brokers (bidirectional compatibility).

Reference:

kafka-python: https://github.com/dpkp/kafka-python

Apache Kafka Series – Learn Apache Kafka for Beginners v2

Apache Kafka Website, https://kafka.apache.org/

Apache Kafka Concepts and Theory.

It's a little late to talk about Kafka, since this technology has been widely used for a long time. These days I finally have time to learn it and summarize the major concepts. In this nutshell, I will split the page into three parts: why we need it, basic concepts, and how it works.

Why do we need Kafka?

Before we use Kafka

As more and more systems merge into a big network structure, and each system has its own protocols and communication methods, we have to spend more time and endure a very complex structure. In the end, the cost rises as well.

Kafka as a central messaging bus

Kafka provides a central messaging bus that is distributed, resilient, and fault tolerant. Each source and target connects to the Kafka cluster as a producer or consumer. They use the same protocol to send and receive messages (key, value, and timestamp). We can see it as a hybrid solution combining a messaging system with distributed retention.

Concepts

Producers publish data into Kafka; they are the source of the data, which could be sensors, laptops, IoT devices, logs, etc.

  • Producers choose which partition within the topic each record is assigned to (a topic is a stream of data, like a table in a database; a partition is a part of a topic; we will talk about them soon).
  • Producers use round-robin or a hash function on the record key to choose the partition, achieving load balancing between the nodes (brokers) in the cluster; see the sketch after this list.
    • targetPartition=utils.abs(utils.murmur2(record.key()))%numPartitions
    • the default partitioner uses the murmur2 algorithm
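
To illustrate the idea (not Kafka's exact murmur2 implementation), here is a small Python sketch that maps a record key to a partition with a deterministic hash; the real default partitioner hashes the serialized key bytes with murmur2 in the same spirit.

import zlib

def choose_partition(key_bytes, num_partitions):
    # deterministic hash of the key, modulo the partition count
    # (illustrative only; Kafka's default partitioner uses murmur2)
    return zlib.crc32(key_bytes) % num_partitions

print(choose_partition(b"sensor-42", 3))  # the same key always lands on the same partition
print(choose_partition(b"sensor-43", 3))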

Brokers are the nodes/servers in the cluster. They are the key to Kafka features like fault tolerance and high performance.

  • each broker is identified by an ID
  • each broker contains certain topic partitions
  • connecting to any broker means connecting to the entire cluster
  • typically 3 or more brokers are used to achieve redundancy
  • messages in a topic are spread across partitions on different brokers, and each partition can be replicated across multiple brokers

Consumers subscribe to messages from the Kafka cluster.

  • a consumer can be a single receiver or a distributed group of receivers called a consumer group
  • a consumer group can read from a topic in parallel

A topic is a particular stream of data identified by its name. Each topic is split into partitions; we can set the partition count and replication factor for each topic. A partition is an ordered, immutable sequence of records (messages), each with an id called an offset.

  • offset order is only guaranteed within one partition
  • data is kept only for a limited time (the default is one week)
  • partitions are immutable
  • a consumer can choose any offset as its starting point, so each consumer does not impact the others

Connectors are used to connect topics with apps/databases. A source connector (with the help of source tasks) is responsible for getting data into Kafka, while a sink connector (with the help of sink tasks) is responsible for getting data out of Kafka.

  • stream an entire SQL database into Kafka
  • stream Kafka topics into HDFS
  • it is recommended to leverage the built-in connectors

How does it work?

Producer to kafka

  • The producer chooses the topic and partition to which each inbound message is sent.
  • A topic is split into several partitions. Each partition has one broker as its leader and zero or more brokers as followers, forming the ISR (in-sync replicas). Only the leader can receive and serve data for a partition; the other brokers sync the data from the leader.
  • A topic with replication factor N can tolerate up to N-1 server failures.

kafka to consumers

  • As mentioned above, consumers can be single nodes or consumer groups.
  • If there are more consumers than partitions, some consumers will be idle.
  • If there are fewer consumers than partitions, some consumers will receive messages from multiple partitions.
  • If the number of consumers equals the number of partitions, each consumer reads messages in order from exactly one partition.

Data consistency and availability

  • Partition
    • Messages sent to a topic partition will be appended to the commit log in the order they are sent
    • a single consumer instance will see messages in the order they appear in the log (message order only guaranteed in a partition)
    • a message is ‘committed’ when all in sync replicas have applied it to their log
    • any committed message will not be lost, as long as at least one in sync replica is alive.
  • Producer options (see the sketch after this list)
    • wait for all in sync replicas to acknowledge the message
    • wait for only the leader to acknowledge the message
    • do not wait for acknowledgement
  • consumer options
    • receive each message at most once
      • restart from the next offset without ever having processed the message
      • potential message loss
    • receive each message at least once
      • restart and process the message again, so the downstream may see duplicate messages
      • no data loss
      • (recommended; the downstream has to handle duplicate messages)
    • receive each message exactly once
      • transactional level
      • re-read from the last committed transaction
      • no data loss and no data duplication
      • using transactions significantly decreases throughput
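
As a small illustration of the producer options above, here is a sketch (assuming kafka-python and a broker on localhost:9092) of how the three acknowledgement levels map to the producer's acks setting:

from kafka import KafkaProducer

# do not wait for acknowledgement (highest throughput, possible message loss)
p_none = KafkaProducer(bootstrap_servers=['localhost:9092'], acks=0)

# wait for the partition leader only
p_leader = KafkaProducer(bootstrap_servers=['localhost:9092'], acks=1)

# wait for all in-sync replicas (safest, lowest throughput)
p_all = KafkaProducer(bootstrap_servers=['localhost:9092'], acks='all')

p_all.send('first_topic', b'hello')
p_all.flush()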

Architecture

Message structure

Fundamental data flow

Kafka in Azure through HDInsight

Reference

Apache Kafka official website. https://kafka.apache.org/intro.

Kafka in a Nutshell. https://sookocheff.com/post/kafka/kafka-in-a-nutshell/

Apache Kafka Series – Learn Apache Kafka for Beginners v2, https://tpl.udemy.com/course/apache-kafka/learn/lecture/11566878?start=180#overview

What is Apache Kafka in Azure HDInsight, https://docs.microsoft.com/en-us/azure/hdinsight/kafka/apache-kafka-introduction

Eigenvectors from Eigenvalues

This is an unbelievable discovery from Peter B. Denton, Stephen J. Parke, Terence Tao, and Xining Zhang. You can find the original paper here. In a nutshell, we can get eigenvectors from eigenvalues alone. The identity looks like this:
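
For reference, here is the identity as stated in the paper, written in LaTeX. Here $A$ is an $n \times n$ Hermitian matrix with eigenvalues $\lambda_i(A)$ and normalized eigenvectors $v_i$ with components $v_{i,j}$, and $M_j$ is the $(n-1) \times (n-1)$ minor obtained by deleting the $j$-th row and column of $A$:

$$|v_{i,j}|^2 \prod_{\substack{k=1 \\ k \neq i}}^{n} \bigl(\lambda_i(A) - \lambda_k(A)\bigr) = \prod_{k=1}^{n-1} \bigl(\lambda_i(A) - \lambda_k(M_j)\bigr)$$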

We only need to know the eigenvalues of the original matrix and of its sub-matrices (minors), and we can calculate the squared components of its eigenvectors.

Currently I cannot imagine what the impact will be down the road, but eigenvalues and eigenvectors are fundamental to AI, dimensionality reduction, feature extraction, etc. It may also help us compute eigenvectors faster if we need to incrementally add data to a known matrix.

I wrote a very simple Python script to verify this formula (and indeed it holds). I think it could be implemented on a GPU as well.

import numpy as np
from numpy import linalg as LA

matrix_size = 6
org_x = np.diag(range(1, matrix_size + 1))
w, v = LA.eig(org_x)


print("original matrix: \n %s \n" % org_x)
print("eigenvalues: \n %s \n" % w)
print("normalized eigenvectors: \n %s \n" % v)

print("START NEW ALGORITHM \n")

result = []
for _ in range(matrix_size):
    result.append(0)

for n in range(matrix_size):
    for j in range(matrix_size):
        # build the minor M_j by deleting the j-th row and column
        sub_x = np.delete(org_x, j, axis=0)
        sub_x = np.delete(sub_x, j, axis=1)
        w1, v1 = LA.eig(sub_x)

        # apply the identity: result[j] = |v_{n,j}|^2, the squared j-th
        # component of the eigenvector belonging to eigenvalue w[n]
        numerator = 1
        denominator = 1
        for i in range(matrix_size - 1):
            numerator = numerator * (w[n] - w1[i])

        for i in range(matrix_size):
            if i != n:
                denominator = denominator * (w[n] - w[i])
        result[j] = numerator / denominator

    print("%s \n" % result)

result:

original matrix:
 [[1 0 0 0 0 0]
 [0 2 0 0 0 0]
 [0 0 3 0 0 0]
 [0 0 0 4 0 0]
 [0 0 0 0 5 0]
 [0 0 0 0 0 6]]

eigenvalues:
 [1. 2. 3. 4. 5. 6.]

normalized eigenvectors:
 [[1. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0.]
 [0. 0. 1. 0. 0. 0.]
 [0. 0. 0. 1. 0. 0.]
 [0. 0. 0. 0. 1. 0.]
 [0. 0. 0. 0. 0. 1.]]

START NEW ALGORITHM

[1.0, -0.0, -0.0, -0.0, -0.0, -0.0]

[0.0, 1.0, -0.0, -0.0, -0.0, -0.0]

[0.0, 0.0, 1.0, -0.0, -0.0, -0.0]

[0.0, 0.0, 0.0, 1.0, -0.0, -0.0]

[0.0, 0.0, 0.0, 0.0, 1.0, -0.0]

[0.0, 0.0, 0.0, 0.0, 0.0, 1.0]

Reference:

EIGENVECTORS FROM EIGENVALUES, https://arxiv.org/pdf/1908.03795.pdf

Tao’s Blog, https://terrytao.wordpress.com/2019/08/13/eigenvectors-from-eigenvalues/

A simple way to create a Gantt chart for Azure DevOps through Power BI

We usually use a Gantt chart to track project progress. Take the picture below for example: the dark part means completed, and the light part means still to be done.

To reach this result, there are two solutions. The steps in Azure DevOps are the same for both solutions.

  • Add a start date and end date for each task. The admin can change the layout to make these two fields visible on the edit page; otherwise you can only change them in the backlog view.
  • Add effort information: estimated, completed, and remaining hours. The picture below shows that I have completed 80% of the workload.
  • Create a view listing all user stories and tasks (a one-time effort for extracting the data).

Power BI steps:

After importing, you will find the Gantt chart visual
  • Import data through "Get Data – More – Online Services – Azure DevOps (Beta)", then choose the view you created in DevOps.
  • Set the task, start date, end date, and % complete fields for the Gantt chart.