Apache Kafka in Practice – 1

First things first, I should remind all visitors that I am not a Kafka expert. I am just a beginner learning through the official Apache Kafka website and some free Udemy classes. There might be some mistakes; I will fix them once I find them.

I call this "Kafka in Practice" because I want to introduce some basic skills: launching a Kafka service, creating a producer and a consumer, UI tools, and the basic Python library.

Install and Launch Kafka

Kafka depends on Java and ZooKeeper (which is used to manage the cluster). I recommend using Linux or WSL on Windows.

sudo apt update
sudo apt install openjdk-8-jdk

# test 
java -version

Download Kafka and unzip it

# download from website
wget <URL of kafka.tgz>

# unzip
tar -xvf kafka.****.tgz
cd <kafka_folder>

# test, run following
bin/kafka-topics.sh

Add the commands to PATH (optional)

# restart your shell (or run `source ~/.bashrc`) after these steps
nano ~/.bashrc
# add this line at the end:
export PATH="$PATH:/home/<your name>/<kafka folder>/bin"

# now the command works from any folder
kafka-topics.sh

Start Zookeeper

# change the zookeeper data directory
mkdir data
mkdir data/zookeeper
nano config/zookeeper.properties
# add the line below to zookeeper.properties:
dataDir=/home/name/data/zookeeper

# run zookeeper:
zookeeper-server-start.sh config/zookeeper.properties

Start a Kafka broker

mkdir data/kafka
nano config/server.properties

# modify server.properties to change log folder:
log.dirs=/home/name/data/kafka

# start kafka
kafka-server-start.sh config/server.properties

Now you should have ZooKeeper and one broker running. Next, let's do some operations with topics, producers, and consumers.

Topic operations

# create a topic
# you cannot have a replication factor greater than the number of available brokers
kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic first_topic --create --partitions 3 --replication-factor 1


# list / describe all topics
kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe


# delete topic
kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic second_topic --delete

# recovery in case the broker fails to start after deleting a topic
1. delete the topic folder under the broker log directory (see log.dirs in server.properties)
2. start zookeeper
3. enter the zookeeper shell: zookeeper-shell.sh host:port
    3.1 list the topics: ls /brokers/topics
    3.2 remove the topic: rmr /brokers/topics/yourtopic
4. restart the kafka server

Create a console producer

# console producer
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic

# add some properties
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic --producer-property acks=all

# if you produce to a topic that does not exist, Kafka creates it automatically, with one partition and a replication factor of one by default; the defaults can be changed in config/server.properties

Create a console consumer and set consumer groups

# read from now on
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic
# read all messages
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning

# start consumers in a consumer group
# messages sent to the topic are split across all consumers in the group
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --group group1

# consumer groups
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group group1
# Lag: how many messages the group has not consumed yet (a Python version of this check is sketched after this block)
# reset offsets --> offsets are managed on the consumer side
# --to-earliest / --shift-by n [offsets]
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092  --group group1 --reset-offsets --to-earliest --execute  --topic first_topic
# when a consumer joins or leaves the group, a rebalance happens
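
The same lag check can be done from Python. Below is a hedged sketch using kafka-python (it assumes a kafka-python version recent enough to provide KafkaAdminClient, and reuses the group1 group created above); it is an illustration, not the official tooling.

from kafka import KafkaAdminClient, KafkaConsumer

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
committed = admin.list_consumer_group_offsets('group1')  # {TopicPartition: OffsetAndMetadata}

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
end_offsets = consumer.end_offsets(list(committed))      # latest offset per partition

for tp, meta in committed.items():
    # lag = latest offset in the partition minus the group's committed offset
    print(tp.topic, tp.partition, 'lag =', end_offsets[tp] - meta.offset)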

Add keys to the producer

# keys
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic --property parse.key=true --property key.separator=,
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning --property print.key=true --property key.separator=,

UI Tools

Kafka Tool is an easy-to-use UI tool for managing topics, brokers, and consumers. You can install it on Linux or Windows. If you use WSL on Windows, the ZooKeeper port is also accessible from Windows.

# how to install and run it on Linux
wget http://www.kafkatool.com/download2/kafkatool.sh
chmod +x kafkatool.sh
./kafkatool.sh   # run the installer
# after installation
cd ~/kafkatool2/
./kafkatool

Developing in Python

kafka-python provides the common client functions for working with Kafka from Python. More information can be found at the links below.

Consumer API: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

# basic
from kafka import KafkaConsumer
consumer = KafkaConsumer('first_topic', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest')
for msg in consumer:
    print(msg.value)

# other key parameters:
# group_id, key_deserializer, value_deserializer, auto_offset_reset ('earliest', 'latest', None)

# assign and seek offsets manually (use a consumer that has not subscribed to a topic)
from kafka import TopicPartition
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])
tp = TopicPartition('first_topic', 0)
consumer.assign([tp])              # manual assignment bypasses consumer-group rebalancing
consumer.seek(tp, 5)               # manually specify the fetch offset for a TopicPartition
consumer.assignment()              # the set of currently assigned partitions
consumer.beginning_offsets([tp])   # earliest available offset per partition
consumer.end_offsets([tp])         # latest offset per partition

Producer API: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

# basic
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], acks=1)

for _ in range(10):
    future = producer.send('test-topic', b'some_message_bytes')
    # or add a key so that messages with the same key always go to the same partition (preserving their order)
    future = producer.send('test-topic', key=b'key', value=b'some_message_bytes')
    # result = future.get(timeout=60)  # blocking on every send kills throughput
    # print(result)
# producer.flush()  # block until all pending messages are at least put on the network
producer.close()  # close the producer before shutting down your application

Serialization and Deserialization

  • Serialization: the process of converting an object into a stream of bytes for the purpose of transmission
  • Deserialization: the opposite of Serialization
# Serialize json messages
import json
producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('fizzbuzz', {'foo': 'bar'})

# Deserialize msgpack-encoded values (the same works for key_deserializer)
import msgpack
consumer = KafkaConsumer(value_deserializer=msgpack.loads)
consumer.subscribe(['msgpackfoo'])
for msg in consumer:
     assert isinstance(msg.value, dict)

# Serialize string keys
producer = KafkaProducer(key_serializer=str.encode)
producer.send('flipflap', key='ping', value=b'1234')

# Compress messages
producer = KafkaProducer(compression_type='gzip')
for i in range(1000):
     producer.send('foobar', b'msg %d' % i)

Thread

The producer is thread-safe; the consumer is not. For parallel consumption it is recommended to use multiprocessing (or one consumer per thread, never a shared one).
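
For example, here is a minimal sketch (my own illustration, not from the course) that runs one consumer per process in the same group, so the partitions of first_topic are split between the processes:

from multiprocessing import Process
from kafka import KafkaConsumer

def consume(worker_id):
    # each process creates its own consumer; a KafkaConsumer must never be shared between processes
    consumer = KafkaConsumer('first_topic',
                             bootstrap_servers=['localhost:9092'],
                             group_id='group1',
                             auto_offset_reset='earliest')
    for msg in consumer:
        print(worker_id, msg.partition, msg.offset, msg.value)

if __name__ == '__main__':
    workers = [Process(target=consume, args=(i,)) for i in range(3)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()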

Client Compatibility

Since Kafka 0.10.2, clients and brokers are bidirectionally compatible (an older client can talk to a newer broker and vice versa), so in general you should simply use the latest client library version.

Reference:

kafka-python: https://github.com/dpkp/kafka-python

Apache Kafka Series - Learn Apache Kafka for Beginners v2

Apache Kafka website: https://kafka.apache.org/

Apache Kafka Concepts and Theory

It's a little bit late to talk about Kafka, since this technology has been widely used for a long time. These days I finally have time to learn it and summarize the major concepts. In this nutshell, I will split the page into three parts: why we need it, basic concepts, and how it works.

Why do we need Kafka?

Figure: before we use Kafka

As more and more systems merge into one big network, each with its own protocols and communication methods, integration takes more time and the structure becomes ever more complex. In the end, the cost rises as well.

Figure: Kafka as a central messaging bus

Kafka provides a central messaging bus that is distributed, resilient, and fault tolerant. Each source and target connects to the Kafka cluster as a producer or consumer, and they all use the same protocol to send and receive messages (key, value, and timestamp). We can see it as a hybrid solution combining a messaging system with distributed retention.

Concepts

Producers publish data into Kafka; they are the source of the data, which could be sensors, laptops, IoT devices, logs, etc.

  • producers choose which records to assign to which partition within a topic (a topic is a stream of data, like a table in a database; a partition is a part of a topic; more on both soon)
  • producers use round-robin, or a hash function on the record key, to choose the partition and achieve load balancing across the nodes (brokers) in the cluster, as sketched after this list
    • targetPartition = utils.abs(utils.murmur2(record.key())) % numPartitions
    • the default partitioner uses the murmur2 hash algorithm
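
To make the idea concrete, here is a hedged Python illustration. It is not Kafka's actual partitioner: the real default partitioner hashes the key bytes with murmur2, while this sketch substitutes a generic hash (zlib.crc32) just to show that the same key always lands on the same partition.

import zlib

def choose_partition(key_bytes, num_partitions):
    # stand-in for Kafka's murmur2-based default partitioner
    if key_bytes is None:
        raise ValueError("keyless records use round-robin/sticky partitioning, not shown here")
    return zlib.crc32(key_bytes) % num_partitions

print(choose_partition(b"user-42", 3))  # the same key always maps to the same partition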

Brokers are the nodes/servers in the cluster. They are central to Kafka features such as fault tolerance and high performance.

  • each broker is identified by an ID
  • each broker holds certain topic partitions
  • connecting to any one broker means being connected to the entire cluster
  • typically 3 or more brokers are used to achieve redundancy
  • messages in a topic are spread across partitions on different brokers, and each partition can be replicated across multiple brokers

Consumers subscribe to messages from the Kafka cluster.

  • a consumer can be a single receiver or a distributed set of receivers called a consumer group
  • a consumer group can read from a topic in parallel

Topics are particular streams of data, each identified by its name. Each topic is split into partitions, and we can set the partition count and replication factor per topic. A partition is an ordered, immutable sequence of records (messages), each identified by an id called its offset.

  • offset order is only guaranteed within a single partition
  • data is kept only for a limited time (the default retention is one week)
  • partitions are immutable; once written, records cannot be changed
  • a consumer can choose any offset as its starting point, so consumers do not affect each other

Connectors are used to connect topics with apps/databases. A Source Connector (with the help of Source Tasks) is responsible for getting data into Kafka, while a Sink Connector (with the help of Sink Tasks) is responsible for getting data out of Kafka.

  • stream an entire SQL database into Kafka
  • stream Kafka topics into HDFS
  • it is recommended to leverage the built-in connectors

How does it work?

Producer to Kafka

  • The producer chooses the topic and partition to send each inbound message to.
  • A topic is split into several partitions. Each partition has one broker as its leader and zero or more brokers as followers, which together form the ISR (in-sync replicas). Only the leader receives and serves data for its partition; the other brokers sync the data from the leader.
  • A topic with replication factor N can tolerate up to N-1 broker failures.

Kafka to consumers

  • As mentioned above, consumers can be single nodes or consumer groups.
  • If there are more consumers than partitions, some consumers will be idle.
  • If there are fewer consumers than partitions, some consumers will receive messages from multiple partitions.
  • If the number of consumers equals the number of partitions, each consumer reads messages in order from exactly one partition.

Data consistency and availability

  • Partition
    • messages sent to a topic partition are appended to the commit log in the order they are sent
    • a single consumer instance sees messages in the order they appear in the log (message order is only guaranteed within a partition)
    • a message is "committed" when all in-sync replicas have applied it to their log
    • a committed message will not be lost as long as at least one in-sync replica is alive
  • Producer options
    • wait for all in-sync replicas to acknowledge the message (acks=all)
    • wait for only the leader to acknowledge the message (acks=1)
    • do not wait for acknowledgement (acks=0)
  • Consumer options
    • receive each message at most once
      • restart from the next offset without ever having processed the message
      • potential message loss
    • receive each message at least once (see the sketch after this list)
      • restart and process the message again, which can produce duplicate messages downstream
      • no data loss
      • (recommended; the downstream system must handle duplicate messages)
    • receive each message exactly once
      • transactional level
      • re-read from the last committed transaction
      • no data loss and no data duplication
      • significantly decreases throughput because of the transactions
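
Here is the at-least-once pattern as a hedged kafka-python sketch (the topic and group names reuse the ones from the first part; process() is a stand-in for real work): auto-commit is disabled and the offset is committed only after the message has been processed, so a crash causes a re-read rather than a loss.

from kafka import KafkaConsumer

def process(msg):
    print(msg.value)  # stand-in for real processing; it must tolerate duplicates

consumer = KafkaConsumer('first_topic',
                         bootstrap_servers=['localhost:9092'],
                         group_id='group1',
                         enable_auto_commit=False)  # commit offsets manually
for msg in consumer:
    process(msg)
    consumer.commit()  # committing only after processing gives at-least-once delivery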

Architecture

Figure: message structure

Figure: fundamental data flow

Figure: Kafka in Azure through HDInsight

Reference

Apache Kafka official website. https://kafka.apache.org/intro.

Kafka in a Nutshell. https://sookocheff.com/post/kafka/kafka-in-a-nutshell/

Apache Kafka Series – Learn Apache Kafka for Beginners v2, https://tpl.udemy.com/course/apache-kafka/learn/lecture/11566878?start=180#overview

What is Apache Kafka in Azure HDInsight, https://docs.microsoft.com/en-us/azure/hdinsight/kafka/apache-kafka-introduction

Delta Lake Step by Step (1)

Before we start to talk about Delta Lake, we have to take some time to look at the data lake and understand why we need it. Below is the best definition I have found.

A data lake is a repository for structured, unstructured, and semi-structured data. Data lakes are much different from data warehouses since they allow data to be in its rawest form without needing to be converted and analyzed first.

Devin Pickell 

A data lake stores everything (raw) in any format. In a nutshell, it is a repository; e.g. in Azure, Data Lake Storage Gen2 is a repository built on top of Blob storage.

What’s the difference between data lake and data warehouse?

Table 1. Difference between data warehouse and data lake

From Table 1, we can see that the data warehouse and the data lake serve different scenarios. A data warehouse is used for classic data analysis, with data stored in structured relational tables; a data lake is used for big data and ELT situations, where all raw data lands in the lake first and is processed afterwards. Since the data lake runs on a cloud cluster, it can handle raw data much faster.

What is the downside of data lake?

  • No Transaction.
  • Schema on read.
  • No version control.

These three downsides are critical when applying a data lake to a real project. They affect both data quality and efficiency, and they are a major reason why many big data projects fail: the data lake cannot provide stable data the way a database can.

Delta Lake comes out!

Figure 1. Delta Lake adds ACID transactions on top of Parquet.

Delta Lake provides capabilities that enable a stable data lake for analysis.

  • open-source storage layer based on Parquet files
  • ACID transactions: serializable isolation
  • scalable metadata handling: uses Spark's distributed processing
  • unified streaming and batch: a table works as a batch table and as a streaming source/sink
  • schema enforcement
  • time travel: data versioning

Let's see some simple samples. If you are familiar with Spark, you can switch to Delta Lake very quickly.

# set up
# upgrade pyspark
pip install --upgrade pyspark
# launch the pyspark shell with the delta-core package
pyspark --packages io.delta:delta-core_2.12:0.1.0


# write table
df = spark.read.csv("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv", header="true", inferSchema="true")
df.write.format("delta").save("/delta/diamonds")
# append
df.write.format("delta").mode("append").save("/delta/events")
# we can also partition the table
df.write.format("delta").partitionBy("date").save("/delta/events")
# update
data.write.format("delta").mode("overwrite").option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'").save("/tmp/delta-table")
# by default, overwrite does not change the table schema; it only does if option("overwriteSchema", "true") is set (schema enforcement is sketched after this block)
# add column automatically
df.write.format("delta").mode("append").option("mergeSchema", "true").save("/delta/events")
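
Schema enforcement, listed among the capabilities above, is the counterpart of mergeSchema. Below is a hedged sketch assuming the /delta/diamonds table written earlier: appending a DataFrame whose column type conflicts with the existing schema fails with an AnalysisException instead of silently corrupting the table.

from pyspark.sql.utils import AnalysisException

# 'carat' exists as a numeric column in the diamonds table; write it as a string on purpose
bad_df = spark.range(5).selectExpr("cast(id as string) as carat")
try:
    bad_df.write.format("delta").mode("append").save("/delta/diamonds")
except AnalysisException as e:
    print("schema enforcement blocked the write:", e)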


# read
df = spark.read.format("delta").load("/tmp/delta-table")
# or, in Spark SQL: SELECT * FROM delta.`/tmp/delta-table`

# time travel
# read a snapshot as of a timestamp
df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/delta/events")
# for timestamp_string, only date or timestamp strings are accepted
# read a snapshot as of a version number
df2 = spark.read.format("delta").option("versionAsOf", version).load("/delta/events")

# structured streaming
# "rate" is a built-in test source that generates rows with a `value` column
streamingDf = spark.readStream.format("rate").load()
stream = streamingDf.selectExpr("value as id").writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")
# limit the size of each micro-batch when reading a delta table as a stream
spark.readStream.format("delta").option("maxFilesPerTrigger", 1000).load("/tmp/delta-table")
# ignore updates and deletes with the "ignoreDeletes" and "ignoreChanges" options
# output modes: "append" adds new records only; "complete" rewrites the whole (aggregated) result each trigger
streamingDf.writeStream.format("delta").outputMode("append").option("checkpointLocation", "/tmp/ckpt-append").start("/tmp/delta-append")
streamingDf.groupBy().count().writeStream.format("delta").outputMode("complete").option("checkpointLocation", "/tmp/ckpt-agg").start("/tmp/delta-agg")

stream2 = spark.readStream.format("delta").load("/tmp/delta-table").writeStream.format("console").start()
stream.stop()

# checkpoint: every streaming write needs a checkpoint location, e.g.
.option("checkpointLocation", "/delta/eventsByCustomer/_checkpoints/streaming-agg")

In the next article, I will talk about when to use Delta Lake and how to optimize it in Databricks.

Here is one of my examples, which converts three small files into one Delta table: https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2499619125013057/2952616815059893/2608746070181068/latest.html

Reference:

  1. Delta Lake Document. https://delta.io
  2. What Is a Data Lake and Why Is It Essential for Big Data? https://learn.g2.com/what-is-a-data-lake
  3. Delta Lake Document for databricks. https://docs.databricks.com/delta
