Windows Package Manager (winget)

As Microsoft has shifted its focus from Windows to the cloud over the last 10 years, it has become much more welcoming to open source, especially Linux. I have always disliked using Windows except for playing games: every reinstall means spending a long time reinstalling all my apps. Unlike Linux, which provides great package management tools such as dpkg, snapd and pacman, installing Windows apps involves far too much clicking.

Now there is a new tool named winget. Similar to dpkg or apt, we only need to specify the name of the software and it will be downloaded and installed from the network. Not all software is supported yet, but the most common packages can be found there. Below, I installed Anaconda as an example. You can easily find other packages by running winget search <name>.

The official documentation provides a small batch script that installs PowerToys and Windows Terminal in one go.

@echo off  
Echo Install Powertoys and Terminal  
REM Powertoys  
winget install Microsoft.Powertoys  
if %ERRORLEVEL% EQU 0 Echo Powertoys installed successfully.  
REM Terminal  
winget install Microsoft.WindowsTerminal  
if %ERRORLEVEL% EQU 0 Echo Terminal installed successfully.

Winget download page: https://github.com/microsoft/winget-cli/releases

Spark 3.0 new features – Learning from Dr. Kazuaki Ishizaki

Dr. Kazuaki Ishizaki gives a great summary of Spark 3.0 features in his presentation “SQL Performance Improvements at a Glance in Apache Spark 3.0”. It is very helpful for understanding how these new features work and where we can use them.

New explain format

Spark 3.0 provides new explain formats, including a terse one with detailed node information.

EXPLAIN [ EXTENDED | CODEGEN | COST | FORMATTED ] statement

There are five formats:

  1. default. Physical plan only.
  2. extended. Equivalent to df.explain(true) in Spark 2.4; it generates the parsed logical plan, analyzed logical plan, optimized logical plan and physical plan.
  3. codegen. Shows the Java code generated for the statement.
  4. cost. If plan statistics are available, it generates the logical plan and the statistics.
  5. formatted. This is the most useful one in my mind. It has two sections: a physical plan outline in a simple tree format, and node details.
-- example from spark document
-- Using Formatted
EXPLAIN FORMATTED select k, sum(v) from values (1, 2), (1, 3) t(k, v) group by k;
+----------------------------------------------------+
|                                                plan|
+----------------------------------------------------+
| == Physical Plan ==
 * HashAggregate (4)
 +- Exchange (3)
    +- * HashAggregate (2)
       +- * LocalTableScan (1)
   
   
 (1) LocalTableScan [codegen id : 1]
 Output: [k#19, v#20]
        
 (2) HashAggregate [codegen id : 1]
 Input: [k#19, v#20]
        
 (3) Exchange
 Input: [k#19, sum#24L]
        
 (4) HashAggregate [codegen id : 2]
 Input: [k#19, sum#24L]
|
+----------------------------------------------------+
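
The same modes are also exposed through the DataFrame API in Spark 3.0. A minimal PySpark sketch (the toy DataFrame is just a placeholder):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("explain-modes").getOrCreate()
df = spark.createDataFrame([(1, 2), (1, 3)], ["k", "v"]).groupBy("k").sum("v")

df.explain()                    # default: physical plan only
df.explain(mode="extended")     # parsed/analyzed/optimized logical plans + physical plan
df.explain(mode="codegen")      # generated Java code
df.explain(mode="cost")         # logical plan with statistics, when available
df.explain(mode="formatted")    # outline + node details, as shown above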

Since these operator names do not map one-to-one onto the query syntax, the following list helps explain the meaning of a Spark explain output (a short PySpark example follows the list).

  • scan. Basic file access. In Spark 3.0 it can perform some predicate work before loading the data:
    • ColumnPruning: read only the columns that are needed.
    • PartitionFilters: only read data from the relevant partitions.
    • PushedFilters: filters on fields that can be applied directly during the file scan (predicate pushdown).
  • filter. Due to predicate pushdown, a lot of filtering work has moved into the scan stage, so the filters shown in the explain output may not match the query exactly. Some operations, such as first and last, still have to be done in the filter stage.
    • Pushdown predicates.
    • Combine filters: combines two neighboring filter operations into one.
    • Infer filters from constraints: creates a new filter from a join condition. We will talk about this in the section “Dynamic partitioning pruning”.
    • Prune filters.
  • project. Column selection operations, such as select, drop and withColumn.
  • exchange. Shuffle operations, such as those used by sort-merge and shuffle hash joins.
  • HashAggregate. Data aggregation.
  • BroadcastHashJoin & BroadcastExchange. Broadcast join and the broadcast of the small table.
  • ColumnarToRow. A transition between columnar and row-based execution.
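
As a minimal PySpark sketch of where these operators show up (the dataset path, partition column dt and the other column names are hypothetical), explain a simple filtered read and look for PartitionFilters and PushedFilters in the scan node:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("scan-filters").getOrCreate()

# assumes a parquet dataset partitioned by `dt` with columns `order_id` and `amount`
df = (spark.read.parquet("/data/sales")      # scan
      .where("dt = '2020-08-01'")            # becomes a PartitionFilter
      .where("amount > 100")                 # becomes a PushedFilter
      .select("order_id", "amount"))         # column pruning / project

df.explain(mode="formatted")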

All types of join hints

Spark 2.4 only supports the broadcast hint, while Spark 3.0 supports hints for every join strategy.

Spark has two kinds of hints: partition hints and join hints. Since Spark 3.0, join hints cover all join strategies (a PySpark example follows the list below).

  • Broadcast join. The well-known strategy for joining a small table (dimension table) with a big table (fact table) while avoiding costly data shuffling.
    • a table smaller than 10 MB (by default) is broadcast to all nodes to avoid shuffling
    • two steps: broadcast –> hash join
    • spark.sql.autoBroadcastJoinThreshold
  • shuffle sort-merge join
    • Sort-merge join performs a sort first and then merges the datasets.
    • steps:
      • shuffle: the two big tables are repartitioned by the join keys.
      • sort: the data is sorted within each partition.
      • merge: the two sorted, partitioned datasets are joined.
    • works well when:
      • joining two big tables, as it doesn’t need to load all data into memory like a hash join
      • a highly scalable approach
  • shuffle hash join
    • Shuffle hash join shuffles the data based on the join key, so that rows with the same key from both tables end up on the same node, where the join is then performed.
    • works well when:
      • the dataframes are distributed evenly by key
      • the dataframes have enough distinct keys for parallelism
      • there is enough memory to build the hash table
    • supported for all join types except full outer join
    • spark.sql.join.preferSortMergeJoin = false
  • shuffle replicate NL (nested loop)
    • the Cartesian product of the two relations is computed to evaluate the join (like a SQL CROSS JOIN).
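
A minimal PySpark sketch of the four join hints available in Spark 3.0 (the table and key names are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("join-hints").getOrCreate()

dim = spark.range(100).withColumnRenamed("id", "k")         # small "dimension" table
fact = spark.range(1000000).withColumnRenamed("id", "k")    # large "fact" table

fact.join(dim.hint("broadcast"), "k")             # broadcast hash join
fact.join(dim.hint("merge"), "k")                 # shuffle sort-merge join
fact.join(dim.hint("shuffle_hash"), "k")          # shuffle hash join
fact.join(dim.hint("shuffle_replicate_nl"), "k")  # Cartesian product join

# the same hints work in SQL, e.g.
# SELECT /*+ BROADCAST(dim) */ * FROM fact JOIN dim ON fact.k = dim.k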

Adaptive query execution (AQE)

AQE is a feature that lets Spark adjust execution strategies automatically at runtime (a PySpark configuration sketch follows this list).

  • Dynamically coalescing shuffle partitions: set the number of reducers at runtime to avoid wasting memory and I/O resources.
    • spark.sql.adaptive.enabled=true
    • spark.sql.adaptive.coalescePartitions.enabled=true
AQE can merge several small partitions into one reducer to even out the load.
  • Selecting a better join strategy at runtime to improve performance.
    • dynamically chooses among the three join strategies. Broadcast has the best performance, but the static choice made at planning time is sometimes inaccurate.
    • spark.sql.adaptive.enabled=true
AQE gets the size of the join tables at runtime, so it can choose a broadcast join rather than a shuffle.
  • Optimizing skewed joins to avoid an imbalanced workload.
    • the large partition is split into multiple partitions
    • spark.sql.adaptive.skewJoin.enabled=true
AQE splits a skewed partition into multiple partitions.
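
A minimal PySpark sketch of turning these settings on (the configuration keys are the Spark 3.0 names quoted above):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("aqe-demo")
         .config("spark.sql.adaptive.enabled", "true")
         .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
         .config("spark.sql.adaptive.skewJoin.enabled", "true")
         .getOrCreate())

# the same settings can also be changed on a running session
spark.conf.set("spark.sql.adaptive.enabled", "true")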

Dynamic partitioning pruning

We already got a peek at this in the explain format section. Spark 3.0 is smart enough to avoid reading unnecessary partitions in a join by using the results of filter operations on another table. For example,

SELECT * FROM dim_iteblog
JOIN fact_iteblog
ON (dim_iteblog.partcol = fact_iteblog.partcol)
WHERE dim_iteblog.othercol > 10

In this case, Spark infers a pruning predicate from the filter on the dimension table and adds a new partition filter to the scan of the joined table “fact_iteblog”.
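
Dynamic partition pruning is on by default in Spark 3.0; here is a minimal PySpark sketch (it assumes the two tables from the query above are registered, and the flag name is the Spark 3.0 configuration key):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("dpp-demo").getOrCreate()

# enabled by default in Spark 3.0; set explicitly here only to make the switch visible
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")

spark.sql("""
    SELECT * FROM dim_iteblog
    JOIN fact_iteblog
      ON dim_iteblog.partcol = fact_iteblog.partcol
    WHERE dim_iteblog.othercol > 10
""").explain(mode="formatted")   # look for a dynamic pruning expression in the fact table scan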

Enhanced nested column pruning & pushdown

  • Nested column pruning can now be applied to all operators, such as limit and repartition (see the sketch after this list).
    • select col2._1 from (select col2 from tp limit 1000)
  • Parquet can apply pushdown filters on nested fields and read only the needed parts of the columns.
    • spark.read.parquet("filename").filter("col2._1 = 100")
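
A minimal PySpark sketch (the file name and the struct column col2 with field _1 are hypothetical; the nested schema pruning flag is on by default in Spark 3.0):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("nested-pruning").getOrCreate()

# on by default in Spark 3.0; set explicitly only to make the switch visible
spark.conf.set("spark.sql.optimizer.nestedSchemaPruning.enabled", "true")

df = spark.read.parquet("filename").filter("col2._1 = 100").select("col2._1")
df.explain(mode="formatted")   # the scan should read only col2._1, with the filter pushed down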

Improved aggregation code generation

  • Catalyst translates a given query into Java code, and the HotSpot compiler in OpenJDK translates that Java code into native code.
  • The HotSpot compiler gives up generating native code for methods with more than 8000 Java bytecode instructions.
  • In Spark 3.0, Catalyst splits a large Java method into smaller ones to allow HotSpot to generate native code.

New Scala and Java (infrastructure updates)

  • Java 11
  • Scala 2.12

Summary

I think the best summary is a screenshot from Dr. Kazuaki Ishizaki's presentation.

Reference

SQL Performance Improvements at a Glance in Apache Spark 3.0, https://www.iteblog.com/ppt/sparkaisummit-north-america-2020-iteblog/sql-performance-improvements-at-a-glance-in-apache-spark-30-iteblog.com.pdf

Spark 3.0.1 – Explain, http://spark.apache.org/docs/latest/sql-ref-syntax-qry-explain.html

Mastering Query Plans in Spark 3.0, https://towardsdatascience.com/mastering-query-plans-in-spark-3-0-f4c334663aa4

Fast Filtering with Spark PartitionFilters and PushedFilters, https://mungingdata.com/apache-spark/partition-filters-pushed-filters/

Spark 3.0.1 – Hints, https://spark.apache.org/docs/3.0.0/sql-ref-syntax-qry-select-hints.html

Columnstore index for MS SQL Server

Columnar storage is one of the most popular storage technologies in big data. You have probably already heard of Parquet and Delta Lake; both are columnar formats, bringing roughly 10x compression ratios and much faster query speeds for analytic work. SQL Server, one of the fastest-evolving relational databases, also provides columnstore indexes with multiple optimizations.

Figure 1: Loading into a clustered columnstore index. A columnstore index compresses the data column by column and is optimized with a deltastore; it can reach 10x compression and 100x query speed.

SQL Server provides clustered and non-clustered columnstore indexes. The deltastore is a clustered B-tree index used automatically alongside a columnstore index. It stores rows until their number reaches a threshold (~1,048,576 rows), then moves the data into the columnstore and changes the rowgroup state from Open to Closed. I will show this at the end of this article.

Clustered columnstore index

  • Primary storage method for the entire table
  • All columns are included; there are no key columns.
  • can only be combined with non-clustered B-tree indexes, to speed up:
    • queries that search for specific values or small ranges of values.
    • updates and deletes of specific rows
  • usually used for fact tables or large dimension tables

Non-clustered columnstore index

  • We can specify which columns to index, usually the most frequently used ones.
  • requires extra storage to keep a copy of the indexed columns (~10%)
  • can be combined with other indexes.

How to choose columnstore index?

Microsoft already provides guidance on this in its documentation.

Figure 2: Choose the best columnstore index for your needs

I would recommend using a columnstore index for most OLAP work, where we need fast queries without many delete/update tasks.

How to delete a large amount of data from a columnstore table

Although Microsoft doesn’t recommend deleting more than 10% of the data from a columnstore table, sometimes we have no choice. Here I summarize some of my experience.

| Deleting from a columnstore is a soft delete

If you delete rows from a columnstore table, the data is not actually removed; SQL Server marks the rows as deleted instead.

You can run the SQL below to find out the number of deleted rows and the deltastore state. Once too many rows are marked as deleted, you have to rebuild or reorganize the columnstore index. Remember, this is not like the OPTIMIZE command in Delta Lake, which is more like bin-packing of small files.

SELECT
       tables.name AS table_name,
       indexes.name AS index_name,
       partitions.partition_number,
       column_store_row_groups.row_group_id,
       column_store_row_groups.state_description,
       column_store_row_groups.total_rows,
       column_store_row_groups.size_in_bytes,
       column_store_row_groups.deleted_rows,
       internal_partitions.partition_id,
       internal_partitions.internal_object_type_desc,
       internal_partitions.rows
FROM sys.column_store_row_groups
INNER JOIN sys.indexes
ON indexes.index_id = column_store_row_groups.index_id
AND indexes.object_id = column_store_row_groups.object_id
INNER JOIN sys.tables
ON tables.object_id = indexes.object_id
INNER JOIN sys.partitions
ON partitions.partition_number = column_store_row_groups.partition_number
AND partitions.index_id = indexes.index_id
AND partitions.object_id = tables.object_id
LEFT JOIN sys.internal_partitions
ON internal_partitions.object_id = tables.object_id
AND column_store_row_groups.deleted_rows > 0
WHERE tables.name = 'table_name'
Figure 3: One delta rowgroup is open, and there are no deleted rows

| Steps to delete a large amount of data from a columnstore table

1. Drop the non-clustered B-tree indexes (rebuild them after the delete operation if needed)

If you look at the execution plan for a bulk delete, you will find that the B-tree index related operations take most of the time, not the columnstore index.

Figure 4: B-tree index related operations take 90% of the time.

2. Delete in small batches

One downside of Azure SQL is that we cannot set the recovery model to simple, so deleting from a large table takes a lot of transaction log time. The workaround is to delete in small batches, which keeps each log transaction smaller and quicker. Below is an example that deletes data with a chunk size of 1,000,000 rows.

deleteMore:
delete top(1000000) from table_name
where id%2=0
IF @@ROWCOUNT != 0
begin
       print current_timestamp
    goto deleteMore
end

3. Rebuild/reorganize the columnstore index if needed

Sometimes we have to rebuild or reorganize the index if we have deleted too much data and query performance is affected. Here is a snippet that shows how to achieve it.

-- rebuild or reorganize a columnstore index
alter index index_name on table_name rebuild      -- or: reorganize
alter index all on table_name rebuild             -- or: reorganize

-- check fragment
SELECT a.object_id, object_name(a.object_id) AS TableName,
    a.index_id, name AS IndexName, avg_fragmentation_in_percent
FROM sys.dm_db_index_physical_stats
    (DB_ID (N'database_name')
        , OBJECT_ID(N'table_name')
        , NULL
        , NULL
        , NULL) AS a
INNER JOIN sys.indexes AS b
    ON a.object_id = b.object_id
    AND a.index_id = b.index_id;
GO

Reference:

Choose the best columnstore index for your needs. https://docs.microsoft.com/en-us/sql/relational-databases/indexes/columnstore-indexes-design-guidance?view=sql-server-ver15#choose-the-best-columnstore-index-for-your-needs

Columnstore indexes – Query performance. https://docs.microsoft.com/en-us/sql/relational-databases/indexes/columnstore-indexes-query-performance?view=sql-server-ver15

How to efficiently delete rows while NOT using Truncate Table in a 500,000+ rows table. https://stackoverflow.com/questions/11230225/how-to-efficiently-delete-rows-while-not-using-truncate-table-in-a-500-000-rows

Hands-On with Columnstore Indexes: Part 1 Architecture. https://www.red-gate.com/simple-talk/sql/sql-development/hands-on-with-columnstore-indexes-part-1-architecture/

Enable GPU acceleration in WSL2 to support AI frameworks

Since Microsoft upgraded WSL to version 2, it has shipped a full Linux kernel and full VM management features. Besides the performance benefit of deep integration with Windows, WSL2 allows installing additional powerful apps like Docker and upgrading the Linux kernel whenever a new one is available.

Two months ago, Microsoft and NVIDIA brought GPU acceleration to WSL2. This new feature got me excited, because we no longer have to train our models on a separate Linux machine or set up a dual-boot system.

Figure 1: Stack diagram showing the layers involved when running AI frameworks in WSL2 containers. The container provides integration with CUDA-related components, and WSL2 communicates with the Windows host through a GPU paravirtualization protocol.

Before starting, I did some research on the basic ideas of virtualization and WSL2 GPU support. It helped me understand how GPU paravirtualization works in WSL2.

Types of virtualization

Figure 2: four major types of virtualization
  • Full virtualization. In full virtualization, there is an almost complete model of the underlying physical system resources, which allows any and all installed software to run without modification. There are two types of full virtualization:
Software-assisted full virtualization (binary translation), e.g. VMware Workstation (32-bit), Virtual PC, VirtualBox (32-bit). Issue: low performance.
Hardware-assisted full virtualization: eliminates binary translation and interacts directly with the hardware (Intel VT-x and AMD-V), e.g. KVM, VMware ESX, Hyper-V, Xen. Issue: the virtual context executes privileged instructions directly on the processor.
  • Paravirtualization. Paravirtualization (PV) is an enhancement of virtualization technology in which the guest operating system (guest OS) is modified prior to installation inside a virtual machine (VM), so that all guest OSes within the system can share resources and collaborate, rather than attempting to emulate an entire hardware environment. The guest is therefore aware that it has been virtualized. Products: Xen, IBM LPAR, Oracle VM for x86.
Xen supports both full virtualization and paravirtualization.
  • Hybrid virtualization (hardware virtualization with PV drivers). The virtual machine uses PV for specific hardware drivers (like I/O), and full virtualization for other features. Products: Oracle VM for x86, Xen.
VMware also combines paravirtual drivers with hardware full virtualization.
  • OS-level virtualization, aka containerization. No hypervisor overhead. Products: Docker, Linux LXC, AIX WPAR.
The difference between VM and container

Except for containerization, all virtualization types use a hypervisor to communicate with the host. Let's take a look at how hypervisors work below.

  • Hypervisor
    • Emulation. (software full virtualization)
      • emulates a specific piece of hardware, which is all the guest VM can see.
      • performance cost, since it targets a lowest common denominator.
      • needs to translate instructions.
      • wide compatibility
    • Paravirtualization
      • only supports certain hardware in certain configurations.
      • Direct hardware access is possible
      • Compatibility is limited
    • hardware pass-through(hardware full virtualization)
      • native performance, but need proper drivers for the real physical hardware
      • hardware specific images
      • GPU supported

GPU Virtualization on Windows

How it works on WSL

  • A new kernel driver “dxgkrnl” exposes a “/dev/dxg” device to user mode.
  • /dev/dxg mimics the native WDDM D3DKMT kernel service layer on Windows.
  • dxgkrnl communicates with its big brother on Windows through the VM Bus WDDM paravirtualization protocol.
Figure 3: There is no partitioning of resources between Linux and Windows, and no limit on Linux applications

DxCore & D3D12 on Linux

  • libd3d12.so is compiled from the same source code as d3d12.dll on Windows
  • except for the Present() function, everything else is the same as on Windows
  • libdxcore (DxCore) is a simplified version of DXGI
  • GPU manufacturer partners provide a UMD (user-mode driver) for Linux
Figure 4: D3D12 builds upon the /dev/dxg device

DirectML and AI Training

  • DirectML sits on top of the D3D12 API and provides a collection of compute operations.
  • TensorFlow is available with an integrated DirectML backend.
Figure 5: DirectML provides beginners with a basic ML framework

OpenGL, OpenCL & Vulkan

  • The Mesa library is the mapping layer that brings hardware acceleration for OpenCL and OpenGL.
  • Vulkan is not supported right now.
Figure 6: WSL2 only supports OpenGL and OpenCL right now

Nvidia CUDA

  • a version of CUDA that directly targets the WDDM 2.9 abstraction exposed by /dev/dxg
  • libcuda.so enables CUDA-X libraries such as cuDNN, cuBLAS and TensorRT
  • available on any glibc-based WSL distro
Figure 7: NVIDIA Docker tools are available (NVIDIA Container Toolkit), providing a container-like plugin and usage experience

GPU container in WSL

  • the libnvidia-container library is able to detect the presence of libdxcore.so at runtime and uses it to detect all the GPUs exposed to this interface
  • the driver store is a folder that contains all the driver libraries for both Linux and Windows
Figure 8: NVIDIA docker provides NVIDIA container toolkits along with lots of good images.

GUI application support is still under development.

How to enable GPU Acceleration in WSL

For the detailed steps, refer to https://docs.nvidia.com/cuda/wsl-user-guide/index.html. Here is a brief list of the key points:

  1. Windows build 20150 or above (Dev Channel)
  2. Enable WSL 2
  3. Install Ubuntu On WSL
  4. Install Windows Terminal
  5. Upgrade kernel to 4.19.121 or higher
  6. NVIDIA drivers for CUDA on WSL:  https://developer.nvidia.com/cuda/wsl/download
  7. Install docker in WSL:  
    • curl https://get.docker.com | sh
    • You can see the vmmem process in your Windows Task Manager; it is the process for the WSL2 virtual machine.
  8. Install the NVIDIA Container Toolkit (nvidia-docker2)
Figure 9: docker in WSL2 with the NVIDIA container toolkit

9. Start A TensorFlow Container

# test for docker
docker run --gpus all nvcr.io/nvidia/k8s/cuda-sample:nbody nbody -gpu -benchmark
# pull the tensorflow image and run it
docker run -it --gpus all -p 8888:8888 tensorflow/tensorflow:latest-gpu-py3-jupyter

After you pull the TensorFlow image and run it, you will see the following instructions:

Figure 10: Replace 127.0.0.1 with localhost, open this URL in your browser, and then you can use GPU acceleration in WSL2
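
Once the notebook is running, a quick check from Python (standard TensorFlow 2.x API, assuming the latest-gpu image) confirms the GPU is visible inside the container:

import tensorflow as tf

# lists the GPUs TensorFlow can see through the WSL2 /dev/dxg path
gpus = tf.config.list_physical_devices("GPU")
print("GPUs visible to TensorFlow:", gpus)

if gpus:
    # a tiny computation placed on the first GPU
    with tf.device("/GPU:0"):
        a = tf.random.normal([1000, 1000])
        b = tf.random.normal([1000, 1000])
        print(tf.reduce_sum(tf.matmul(a, b)))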

Reference:

Para virtualization vs Full virtualization vs Hardware assisted Virtualization, https://www.unixarena.com/2017/12/para-virtualization-full-virtualization-hardware-assisted-virtualization.html/

Emulation, paravirtualization, and pass-through: what you need to know for client hypervisors, https://searchvirtualdesktop.techtarget.com/opinion/Emulation-paravirtualization-and-pass-through-what-you-need-to-know-for-client-hypervisors

DirectX is coming to the Windows Subsystem for Linux, https://devblogs.microsoft.com/directx/directx-heart-linux/

NVIDIA Container Toolkit, https://github.com/NVIDIA/nvidia-docker

CUDA on WSL User Guide, https://docs.nvidia.com/cuda/wsl-user-guide/index.html

NVIDIA Drivers for CUDA on WSL, https://developer.nvidia.com/cuda/wsl/download

Tensorflow image on Docker, https://www.tensorflow.org/install/docker

Run your Linux on Android phone

Since Samsung canceled its “Linux on DeX” project, the only option left was to run Linux on an SBC like the Raspberry Pi, but that leaves us with an ugly setup and a lack of flexibility. Why shouldn't we use our phone to run Linux directly? That leaves two choices:

  1. A phone that runs Linux natively, e.g. the PinePhone. It's not expensive and very cool; you can buy it here. The downside is that there are not many apps you can install…
  2. Run Linux on an Android phone, so that you can install Android apps as well as a Linux environment. All you need is three apps:
  • bVNC Free: a VNC client to connect to the Linux GUI.
  • Termux: a terminal emulator to run Linux commands.
  • AndroNix: to download the Linux distro.

I used my Samsung S8, which supports big-screen projection through DeX. DeX is still an Android desktop, but it looks a lot like Windows. Then I followed the steps mentioned in this YouTube video.

After installing Linux (I chose Ubuntu KDE) and connecting to it, you can do whatever you want in it. Below is my screenshot after installing Visual Studio Code.

Visual Studio Code, ARM version

Let's summarize:

Pros:

  1. Flexibility. You don’t have to bring your laptop, but only have to find a monitor or TV.
  2. Apps. Run android and Linux apps at single phone.
  3. Very Geek.

Cons:

  1. It's better to have a high-end phone, like the S20. My S8 is still slow sometimes with its 3 GB of memory.
  2. Not native Linux. Its working pattern feels a lot like WSL on Windows, and performance may be an issue.

Let Linux make your daily life easier

If you spend most of your time on Windows, you may wonder why you should switch to Linux and why Linux has suddenly become so popular in the last decade, to the point where Microsoft has brought a Linux kernel into the latest Windows versions as WSL/WSL2 and runs about 70% of its cloud services on Linux, including SQL Server.

Why should we use Linux?

Although Windows is still the dominant leader, especially in the desktop market, more and more professional industries are moving to Linux. Emerging areas like IoT devices, cloud services and smartphones all use Linux. In my view, there are several areas where Linux is significantly better than Windows: reliability, software management, hardware compatibility and customization.

Reliability: How many times have you met a blue screen and a reboot on Windows? Install an application -> reboot; uninstall -> reboot; change a setting -> reboot; even a security update causes a reboot. On Linux you rarely have to worry about this, which significantly reduces server downtime.

Software management. On Windows, click-to-install may be easy for a normal user, but it is never a good way for a programmer or server admin. Linux adopts a repository methodology, a lot like an “Android store”. We only need to run a short command like “sudo apt install <appname>” or “sudo snap install <appname>”. Even better, we can put all those commands into one bash script, then leave it alone and have a cup of coffee.

Hardware compatibility. So many smart devices have come out in recent years. They have weaker computational power than a desktop or laptop, but we can put them anywhere and let them control home security, the TV, the freezer or the washer. You cannot install Windows on these devices since it is too “fat”; Linux is a good choice and has excellent compatibility with ARM devices.

Customization. Not everyone likes flexibility; my wife, for example, prefers an out-of-the-box experience, so I would never recommend Linux to her. But for most geeks and tech workers, Linux provides various distros and desktop environments, and in each environment you can customize deeply according to your needs.

How to set Linux for your daily life?

First things first, you need to choose a Linux distro. Linux has hundreds of distros, with many new branches coming out every year, but most of them are in parent-child relationships. Debian, Red Hat and Arch are the three most popular families. Here we look at the Debian and Arch families. For daily life we need something stable and easy to use: Debian, Ubuntu and Linux Mint are all good choices in the Debian family, while Manjaro and Arch are the most popular in the Arch family. Besides these, you can also find Android and Chrome OS in the distribution map on the left, but they are designed only for mobile or special devices.

If you are a Linux beginner and want to find answers quickly, Ubuntu LTS is a good choice. At the end of April 2020, Ubuntu 20.04 LTS is going to launch.

In the following steps, I will use Ubuntu as the example by default.

Second, how to install applications. As I mentioned before, Linux uses repositories to download and install applications. Basically, a repository is like a store where you can find a list of goods (applications). Different Linux distro families use different repositories. In Ubuntu, you can run the following simple commands to install an application.

# update repository as root
sudo apt update
# install application
sudo apt-get install <app name>
# find the install path of application
whereis <app name>

There is another popular repository called Snapcraft, which is my favorite. Installing snapd is super easy: https://snapcraft.io/docs/installing-snapd. In Ubuntu, you only need to run:

sudo apt update
sudo apt install snapd 

Then, on the application's page on Snapcraft, you can copy the installation command. I took Visual Studio Code as an example below.

“sudo snap install code --classic” is the installation command.

If even that command feels complex, you can install the Snap Store, which gives you an “Android store” like GUI and a click-to-install experience.

sudo snap install snap-store
Categories in Snap store

In some cases, you can only find the application on its official website, like Anaconda or Microsoft Teams. What should we do then?

For a .deb file (the package format for the Debian family), just click and install. If that fails, use the command below.

sudo dpkg -i <file name.deb>

For a shell file ending with “.sh”, like the Anaconda installer:

# download package
wget <download URL>
# add execution mode
chmod +x <file name.sh>
# execute file
sudo ./<file name.sh>

Most applications have a Linux version; for the rest, there are alternatives on Linux.

Windows application      Linux alternative
Google Drive             Insync
Evernote                 NixNote2
MS Office 365            LibreOffice or Office online
Check Point VPN          SNX
My workplace

Third, the command line. The terminal is the most important tool in Linux; once you are familiar with it, you will hardly ever touch your mouse, because the terminal saves you a lot of time. Here I list some common commands. For more information you can check: https://files.fosswire.com/2007/08/fwunixref.pdf.

# check command meaning
man <command name>

# file operation
ls # listing files and dir
cd <path> # change to the path
pwd # show current dir
mkdir <dir> # create dir
rm <file> # delete file
rm -r <dir> # delete dir recursively
cp -r <dir1> <dir2> # copy dir1 to dir2
touch <file> # create or update file
cat > <file> # places standard input into file
more <file> # output contents of file

# file permission
chmod # change file mode
chown # change owner of file

# ssh
ssh user@host

# system info
top/htop # task monitor
whereis app # app location
df # disk usage
w # who is online

# compression
tar cf file.tar files # create tar file
tar xzf file.tar.gz  # extract a tar using Gzip
tar xf file.tar # extract a tar file

# download
wget

# search 
grep <pattern>

# split terminal
tmux

# schedule
crontab

# back up 
sudo dd if=/dev/sda1 of=/media/c/back.img

What other benefits can Linux bring you?

For big data engineers, the most popular foundation is the Apache Software Foundation, which hosts many excellent projects like Spark, Hadoop, Kafka, Maven and Tomcat. All of these natively support Linux, and they recommend running on Linux as well.

For application developers, Python, Java and C# are already cross-platform, so you can choose whatever OS you like. As I said before, you won't run into the Windows problems on Linux, which saves you a lot of time.

For administrators, Linux provides stable and flexible solutions. You will send out far fewer emails explaining downtime for upgrade maintenance.

Who should not move to Linux?

If you really depend on certain software like the Adobe suite, you should stay away from Linux.

Data Factory CI/CD in Azure DevOps

Azure Pipelines consists of two parts: build pipelines and release pipelines. They represent CI and CD respectively.

  1. Build Pipeline – to build and test the code. The build creates an artifact that’s used by the rest of your pipeline to run tasks such as deploying to staging or production. 
  2. Release Pipeline – once the code is updated, built and packaged, it can be deployed to target services using Release Pipelines. 
Top half: CI pipeline; bottom half: CD pipeline

Let's talk about how to implement this process for Azure Data Factory. Before you start, I assume your current ADF already meets these requirements (if not, please refer to here):

  • You already have an Azure Repos repository.
  • ADF is integrated with this Git repository.
  • A Key Vault, which stores database and data lake connection information, plus all configuration parameters for the release.

The whole ADF CI/CD pipeline is like this:

Build Pipeline (CI)

  • Create a new pipeline and choose “adf_publish” as the default branch.
    • The “adf_publish” branch is created automatically by ADF after you click “Publish” in the ADF GUI.
The three JSON files are used to set parameters for the different environments.
  • Add a new agent job.
  • Search for “Data factory” and add a new publish artifacts task.
    • set the path to publish (the Git path)
    • set the artifact publish location to Azure Pipelines

Release Pipeline (CD)

Left part: CI; right part: CD

The next step is deploying the artifact into three environments: DEV, QA and PROD.

Use Key Vault to set the SQL Server connection in the pipeline
  • Import the Azure Key Vault where you stored all the connection information. We use this information in two places:
    • Release pipeline: to get the basic information for each environment, e.g. database name, data lake name, etc. See the screenshot above.
    • Template parameters used in adf_publish.
  • Set up the Data Factory deployment.
    • Add an “Azure Resource group deployment” task, where we need to focus on the template path.
We have to set the ARM template and parameters to point to the adf_publish branch.

You can also add trigger stop and start steps at the beginning and end of the data factory deployment.

# stop trigger, you have to add variables for each release environment
$triggersADF = Get-AzDataFactoryV2Trigger -DataFactoryName $(DataFactoryName) -ResourceGroupName $(ResourceGroupName)

$triggersADF | ForEach-Object { Stop-AzDataFactoryV2Trigger -ResourceGroupName $(ResourceGroupName) -DataFactoryName $(DataFactoryName) -Name $_.name -Force }
# restart triggers after deployment
$triggersADF = Get-AzDataFactoryV2Trigger -DataFactoryName $(DataFactoryName) -ResourceGroupName $(ResourceGroupName)

$triggersADF | ForEach-Object { Start-AzDataFactoryV2Trigger -ResourceGroupName $(ResourceGroupName) -DataFactoryName $(DataFactoryName) -Name $_.name -Force }
Stopping triggers is mandatory when releasing ADF.
  • How to set up adf_publish
    • go back to azure repos.
    • switch to adf_publish.
    • create a corresponding ARM parameter file for each environment. You can find an example below.
the three json files are used to set parameters in the different environments
{
	"$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentParameters.json#",
	"contentVersion": "1.0.0.0",
	"parameters": {
		"factoryName": {
			"value": "datafactory name"
		},
		"LS_AKV_KeyVault_properties_typeProperties_baseUrl": {
			"value": "https://XXXXkeyvault.vault.azure.net/"
		},
		"LS_SQL_ConfigDb_properties_typeProperties_connectionString_secretName": {
			"value": "sqlconnection"
		},
		"LS_DLS_Datalake_properties_typeProperties_url": {
			"value": "https://XXXXXdatalake.dfs.core.windows.net/"
		}
	}
}

These three parameters are used for dynamic linked services. They are defined in ARMTemplateForFactory, and their values are set in the separate JSON file for each environment. To define the parameters, you first have to go to the “Parameterization template” tab and edit it.

Here is an example where I defined a linked service for Azure Key Vault. For more information, please refer to: https://docs.microsoft.com/en-us/azure/data-factory/continuous-integration-deployment#use-custom-parameters-with-the-resource-manager-template

If we need to enable the continuous deployment trigger, we have to link it to the adf_publish branch as well.

Click the little icon on the release pipeline page.
Set it to adf_publish.

Reference:

Azure DevOps Pipelines. https://docs.microsoft.com/en-us/azure/devops/pipelines/?view=azure-devops

Continuous integration and delivery in Azure Data Factory. https://docs.microsoft.com/en-us/azure/data-factory/continuous-integration-deployment

Use Azure Key Vault to pass secure parameter value during deployment. https://docs.microsoft.com/en-us/azure/azure-resource-manager/templates/key-vault-parameter?tabs=azure-cli

Apache Kafka in Practice – 1

First things first, I should remind all visitors that I am not a Kafka master. Actually, I am just a beginner learning from the official Apache Kafka website and some free Udemy classes. There may be some mistakes; I will fix them once I find them.

I call this “Kafka in practice” because I want to introduce some basic skills such as launching a Kafka service, creating a producer and a consumer, UI tools, and the basic Python library.

Install and Launch Kafka

Kafka is based on Java and ZooKeeper (which is used to manage the cluster). I recommend using Linux or WSL on Windows.

sudo apt update
sudo apt install openjdk-8-jdk

# test 
java -version

Download Kafka and unzip it

# download from website
wget <URL of kafka.tgz>

# unzip
tar -xvf kafka.****.tgz
cd <kafka_folder>

# test, run following
bin/kafka-topics.sh

Add the commands to PATH (optional)

# need to restart the shell after these steps
nano ~/.bashrc
# add this line to the end of .bashrc:
export PATH="$PATH:/home/<your name>/<kafka folder>/bin"

# check under any folder
kafka-topics.sh

Start ZooKeeper

# change the zookeeper data directory
mkdir data
mkdir data/zookeeper
nano config/zookeeper.properties
# add the line below to zookeeper.properties:
dataDir=/home/name/data/zookeeper

# run zookeeper:
zookeeper-server-start.sh config/zookeeper.properties

Start a Kafka broker

mkdir data/kafka
nano config/server.properties

# modify server.properties to change log folder:
log.dirs=/home/name/data/kafka

# start kafka
kafka-server-start.sh config/server.properties

Right now you should have ZooKeeper and one broker running. Now let's do some operations with topics, producers and consumers.

Topic operations

# create a topic
# you cannot have a replication factor greater than the number of available brokers
kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic first_topic --create --partitions 3 --replication-factor 1


# list all topic
kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe


# delete topic
kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic second_topic --delete

# recover in case broker starting failed after deleting topic
1. delete the topic folder under the broker log directory, which you can find via log.dirs in server.properties
2. start zookeeper
3. enter zookeeper shell, zookeeper-shell.sh host:port
    3.1 list the topics using: ls /brokers/topics
    3.2 remove topic:rmr /brokers/topics/yourtopic
4. restart kafka server

Create a console producer

# console producer
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic

# add some properties
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic --producer-property acks=all

# if you produce to a topic that does not exist, kafka will create it with one partition and one replica by default; this can be changed in config/server.properties

Create a console consumer and Set consumer groups

# read from now on
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic
# read all messages
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning

# set consumers in group
# all the message sending to this group will be split into all consumers
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --group group1

# consumer groups
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group group1
# Lag: shows how many messages have not been consumed yet
# reset offsets ---> offsets are decided by the consumer
# --to-earliest / --shift-by n [offsets]
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092  --group group1 --reset-offsets --to-earliest --execute  --topic first_topic
# when a consumer leaves, a rebalance will happen

Add Keys to producer

# keys
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic --property parse.key=true --property key.separator=,
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning --property print.key=true --property key.separator=,

UI Tools

Kafka Tool is an easy-to-use UI for managing topics, brokers and consumers. You can install it on Linux or Windows. If you use WSL on Windows, the ZooKeeper port is also reachable from Windows.

# how to use in Linux
wget http://www.kafkatool.com/download2/kafkatool.sh
chmod +x kafkatool.sh
# after installation
cd ~/kafkatool2/
./kafkatool

Developing in Python

kafka-python provides the common functions for Kafka; we can find more information here.

Consumer API: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

# basic
from kafka import KafkaConsumer
consumer = KafkaConsumer('first_topic',bootstrap_servers=['localhost:9092'],auto_offset_reset='earliest')
for msg in consumer:
    print (msg.value)

# other key parameters
# group_id, key_deserializer, value_deserializer, auto_offset_reset ['earliest', 'latest', None]

# assign and seek offsets
consumer.assign(partitions)              # manually assign a list of TopicPartitions
consumer.seek(partition, offset)         # manually specify the fetch offset for a TopicPartition
consumer.assignment()                    # read the currently assigned partitions
consumer.beginning_offsets(partitions)   # earliest available offset per partition
consumer.end_offsets(partitions)         # latest offset per partition

Producer API: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

# basic
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],acks = 1)

for _ in range(10):
    future = producer.send('test-topic', b'some_message_bytes')
    # or add key to fix the partitions by ordering
    future = producer.send('test-topic', key=b'key', value=b'some_message_bytes')
    # result = future.get(timeout=60)  # do not block, it will kill performance
    # print(result)
# producer.flush()  # Block until all pending messages are at least put on the network
producer.close()  # make sure to .close() your producer before shutting down your application

Serialization and Deserialization

  • Serialization: the process of converting an object into a stream of bytes for the purpose of transmission
  • Deserialization: the opposite of Serialization
# Serialize json messages
import json
producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('fizzbuzz', {'foo': 'bar'})

# Deserialize msgpack-encoded values (requires the msgpack package)
import msgpack
consumer = KafkaConsumer(value_deserializer=msgpack.loads)  # same idea for key_deserializer
consumer.subscribe(['msgpackfoo'])
for msg in consumer:
     assert isinstance(msg.value, dict)

# Serialize string keys
producer = KafkaProducer(key_serializer=str.encode)
producer.send('flipflap', key='ping', value=b'1234')

# Compress messages
producer = KafkaProducer(compression_type='gzip')
for i in range(1000):
     producer.send('foobar', b'msg %d' % i)

Thread

The producer is thread safe; however, the consumer is not. It is recommended to use one consumer per process via multiprocessing (a minimal sketch follows).
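
A minimal sketch with kafka-python (topic and group names are placeholders): one KafkaConsumer per process, so consumers in the same group split the partitions between them.

from multiprocessing import Process
from kafka import KafkaConsumer

def consume(worker_id):
    # each process owns its own consumer instance
    consumer = KafkaConsumer('first_topic',
                             bootstrap_servers=['localhost:9092'],
                             group_id='group1')
    for msg in consumer:
        print(worker_id, msg.partition, msg.offset, msg.value)

if __name__ == '__main__':
    workers = [Process(target=consume, args=(i,)) for i in range(3)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()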

Client Compatibility

Always use the latest client library version, since older and newer clients can talk to any broker version.

Reference:

kafka-python: https://github.com/dpkp/kafka-python

Apache Kafka Series – Learn Apache Kafka for Beginners v2

Apache Kafka Website, https://kafka.apache.org/

Apache Kafka Concepts and Theory.

It's a little late to talk about Kafka, since this technology has been widely used for a long time. These days I finally have time to learn it and summarize the major concepts. In this nutshell, I will split the page into three parts: why we need it, basic concepts, and how it works.

Why do we need Kafka?

Before we use Kafka

As more and more systems are merged into a big network structure, each with its own protocols and communication methods, we have to spend more time dealing with an increasingly complex integration structure, and in the end the cost rises as well.

Kafka as a central messaging bus

Kafka provides a central messaging bus that is distributed, resilient and fault tolerant. Each source and target connects to the Kafka cluster as a producer or consumer. They use the same protocol to send and receive messages (key, value and timestamp). We can see it as a hybrid solution combining a messaging system with distributed retention.

Concepts

Producers publish data into Kafka; they are the source of the data, which could be sensors, laptops, IoT devices, logs, etc.

  • producers choose which records to assign to which partition within the topic (a topic is a stream of data, like a table in a database; a partition is a part of a topic; we will talk about them soon)
  • Producers use round-robin or a hash function on the key to choose the partition, achieving load balance between the nodes (brokers) in the cluster (a small sketch of the idea follows this list).
    • targetPartition=utils.abs(utils.murmur2(record.key()))%numPartitions
    • the default partitioner uses the murmur2 hash algorithm
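
A minimal Python sketch of the idea (using a generic hash for illustration, not Kafka's actual murmur2 partitioner):

from itertools import count

_round_robin = count()

def choose_partition(key, num_partitions):
    """Conceptual partition choice: keyed messages hash, unkeyed messages rotate."""
    if key is None:
        return next(_round_robin) % num_partitions   # round-robin for unkeyed messages
    return abs(hash(key)) % num_partitions           # same key -> same partition (within one run)

print(choose_partition(b"user-42", 3))
print(choose_partition(None, 3))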

Brokers are the nodes/servers in the cluster. They are key to Kafka features such as fault tolerance and high performance.

  • each broker is identified by an ID
  • each broker contains certain topic partitions
  • connecting to any broker = connecting to the entire cluster
  • typically 3 or more brokers are used to achieve redundancy
  • messages in a topic are spread across partitions on different brokers, and each partition can be replicated across multiple brokers

Consumers subscribe to messages from the Kafka cluster.

  • a consumer can be a single receiver or a distributed cluster of receivers called a consumer group
  • a consumer group can read from a topic in parallel

A topic is a particular stream of data identified by its name. Each topic is split into partitions, and we can set the partition count and replication factor for each topic. A partition is an ordered, immutable sequence of records (messages), each with an id called the offset.

  • offset order is only guaranteed within one partition
  • data is kept only for a limited time (the default is one week)
  • partitions are immutable
  • a consumer can choose any offset as its starting point, so each consumer does not impact the others

Connectors are used to connect topics with apps/databases. A source connector (with the help of source tasks) is responsible for getting data into Kafka, while a sink connector (with the help of sink tasks) is responsible for getting data out of Kafka.

  • stream an entire SQL database into Kafka
  • stream Kafka topics into HDFS
  • it is recommended to leverage the built-in connectors

How does it work?

Producer to kafka

  • The producer chooses the topic and partition when sending an inbound message into Kafka.
  • A topic is split into several partitions. Each partition has one broker as the leader and zero or more brokers as followers, called the ISR (in-sync replicas). Only the leader can receive and serve data for a partition; the other brokers sync the data from the leader.
  • A topic with replication factor N can tolerate up to N-1 server failures.

Kafka to consumers

  • As we mentioned above, a consumer can be a single node or part of a consumer group.
  • if there are more consumers than partitions, some consumers will be idle
  • if there are fewer consumers than partitions, some consumers will receive messages from multiple partitions
  • if the number of consumers equals the number of partitions, each consumer reads messages in order from exactly one partition

Data consistency and availability

  • Partition
    • Messages sent to a topic partition will be appended to the commit log in the order they are sent
    • a single consumer instance will see messages in the order they appear in the log (message order only guaranteed in a partition)
    • a message is ‘committed’ when all in sync replicas have applied it to their log
    • any committed message will not be lost, as long as at least one in sync replica is alive.
  • Producer options (a kafka-python sketch of these options follows this list)
    • wait for all in sync replicas to acknowledge the message
    • wait for only the leader to acknowledge the message
    • do not wait for acknowledgement
  • Consumer options
    • receive each message at most once
      • restart from the next offset without ever having processed the message
      • potential message loss
    • receive each message at least once
      • restart and process the message again; duplicate messages appear downstream
      • no data loss
      • (recommended; the downstream system handles duplicate messages)
    • receive each message exactly once
      • transactional level
      • re-read from the last committed transaction
      • no data loss and no data duplication
      • using transactions significantly decreases throughput
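
A minimal kafka-python sketch of the producer acknowledgement options and an at-least-once consumer (topic and group names are placeholders, and process() stands in for your own handler):

from kafka import KafkaProducer, KafkaConsumer

# producer acknowledgement options
p_no_wait = KafkaProducer(bootstrap_servers=['localhost:9092'], acks=0)      # do not wait
p_leader  = KafkaProducer(bootstrap_servers=['localhost:9092'], acks=1)      # leader only
p_all_isr = KafkaProducer(bootstrap_servers=['localhost:9092'], acks='all')  # all in-sync replicas

def process(msg):
    print(msg.value)   # placeholder for real processing logic

# at-least-once consumer: commit offsets only after processing succeeds
consumer = KafkaConsumer('first_topic',
                         bootstrap_servers=['localhost:9092'],
                         group_id='group1',
                         enable_auto_commit=False)
for msg in consumer:
    process(msg)
    consumer.commit()   # committing after processing gives at-least-once delivery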

Architecture

Message structure

Fundamental data flow

Kafka in Azure through HDInsight

Reference

Apache Kafka official website. https://kafka.apache.org/intro.

Kafka in a Nutshell. https://sookocheff.com/post/kafka/kafka-in-a-nutshell/

Apache Kafka Series – Learn Apache Kafka for Beginners v2, https://tpl.udemy.com/course/apache-kafka/learn/lecture/11566878?start=180#overview

What is Apache Kafka in Azure HDInsight, https://docs.microsoft.com/en-us/azure/hdinsight/kafka/apache-kafka-introduction