First things first: I should remind all visitors that I am not a Kafka expert. I am just a beginner learning through the official Apache Kafka website and some free Udemy classes. There may be mistakes; I will fix them as I find them.
I call this post Kafka in Practice because I want to introduce some basic skills: launching a Kafka service, creating producers and consumers, using UI tools, and working with the basic Python library.
Install and Launch Kafka
Kafka runs on Java and depends on Zookeeper (which manages the cluster). I recommend using Linux, or WSL on Windows. First, install Java:
sudo apt update
sudo apt install openjdk-8-jdk
# test
java -version
Download Kafka and extract it
# download from website
wget <URL of kafka.tgz>
# unzip
tar -xvf kafka.****.tgz
cd <kafka_folder>
# to test, run the following
bin/kafka-topics.sh
Add Kafka commands to PATH (optional)
# restart your shell after these steps (or run: source ~/.bashrc)
nano ~/.bashrc
add this to the end:
PATH="$PATH:/home/<your name>/<kafka folder>/bin"
# verify: this should now work from any folder
kafka-topics.sh
Start Zookeeper
# change the zookeeper data directory
mkdir data
mkdir data/zookeeper
nano config/zookeeper.properties
# set the following in zookeeper.properties:
dataDir=/home/name/data/zookeeper
# run zookeeper:
zookeeper-server-start.sh config/zookeeper.properties
Start a Kafka broker
mkdir data/kafka
nano config/server.properties
# modify server.properties to change the log folder:
log.dirs=/home/name/data/kafka
# start kafka
kafka-server-start.sh config/server.properties
At this point you should have Zookeeper and a broker running. Now let's walk through some basic operations on topics, producers, and consumers.
Topic operations
# create a topic
# the replication factor cannot be greater than the number of available brokers
kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic first_topic --create --partitions 3 --replication-factor 1
# list and describe all topics
kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe
# delete topic
kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic second_topic --delete
# recovery, in case the broker fails to start after deleting a topic
1. delete the topic folder under the broker log directory (the log.dirs path set in server.properties)
2. start Zookeeper
3. enter the Zookeeper shell: zookeeper-shell.sh host:port
3.1 list the topics: ls /brokers/topics
3.2 remove the topic: rmr /brokers/topics/yourtopic
4. restart the Kafka server
Create a console producer
# console producer
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic
# add some properties
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic --producer-property acks=all
# if you produce to a topic that does not exist, Kafka creates it automatically,
# with one partition and a replication factor of one by default; these defaults
# can be changed in config/server.properties
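For reference, the broker-side settings that control auto-created topics live in config/server.properties; a sketch (the values here are illustrative, not the shipped defaults):
num.partitions=3
default.replication.factor=1
auto.create.topics.enable=true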
Create a console consumer and set consumer groups
# read only new messages from now on
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic
# read all messages from the beginning
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning
# put several consumers in a group
# messages sent to the topic are split across the consumers in the group
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --group group1
# consumer groups
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group group1
# Lag: shows how many messages have not been consumed yet
# reset offsets (the offset position is decided by the consumer)
# options include --to-earliest and --shift-by n [offsets]
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group group1 --reset-offsets --to-earliest --execute --topic first_topic
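For the --shift-by variant mentioned above, the command looks like this; it moves the group's offset by n, and a negative n rewinds (the -2 here is just an example):
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group group1 --reset-offsets --shift-by -2 --execute --topic first_topic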
# whenever a consumer joins or leaves the group, a rebalance happens
Add keys to the producer
# keys: messages with the same key always go to the same partition, which preserves per-key ordering
kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic --property parse.key=true --property key.separator=,
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning --property print.key=true --property key.separator=,
UI Tools
Kafka Tool is an easy-to-use UI tool for managing topics, brokers, and consumers. You can install it on Linux or Windows. If you use WSL on Windows, the Zookeeper port is also reachable from Windows.
# installation on Linux
wget http://www.kafkatool.com/download2/kafkatool.sh
chmod +x kafkatool.sh
./kafkatool.sh
# after installation, launch it
cd ~/kafkatool2/
./kafkatool
Developing in Python
kafka-python provides the common client functionality for Kafka; more information is in the API docs linked below.
Consumer API: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
# basic
from kafka import KafkaConsumer
consumer = KafkaConsumer('first_topic', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest')
for msg in consumer:
    print(msg.value)
# other key parameters:
# group_id, key_deserializer, value_deserializer, auto_offset_reset ('earliest', 'latest', or None)
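As a minimal sketch of those parameters in use (the topic and group names are just the ones from earlier in this post, and the deserializers assume UTF-8 payloads):
from kafka import KafkaConsumer
consumer = KafkaConsumer(
    'first_topic',
    bootstrap_servers=['localhost:9092'],
    group_id='group1',             # join the consumer group 'group1'
    auto_offset_reset='earliest',  # start from the beginning if the group has no committed offset
    key_deserializer=lambda k: k.decode('utf-8') if k else None,
    value_deserializer=lambda v: v.decode('utf-8'),
)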
# assign and seek offsets manually (use assign() instead of subscribing by topic)
from kafka import TopicPartition
partitions = [TopicPartition('first_topic', 0)]
consumer.assign(partitions)
consumer.seek(partitions[0], 5)         # manually specify the fetch offset for a TopicPartition
consumer.assignment()                   # the set of currently assigned partitions
consumer.beginning_offsets(partitions)  # earliest available offset per partition
consumer.end_offsets(partitions)        # latest available offset per partition
Producer API: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
# basic
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], acks=1)
for _ in range(10):
    future = producer.send('test-topic', b'some_message_bytes')
    # or add a key: all messages with the same key go to the same partition,
    # which keeps them in order (the key must be bytes unless a key_serializer is set)
    future = producer.send('test-topic', key=b'key', value=b'some_message_bytes')
    # result = future.get(timeout=60)  # blocks until the send completes; doing this per message kills throughput
    # print(result)
# producer.flush()  # block until all pending messages are at least put on the network
producer.close()  # make sure to close your producer before shutting down your application
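Instead of blocking on future.get(), you can attach callbacks to the future that send() returns; a minimal sketch (the handler names are my own):
def on_success(record_metadata):
    # the metadata tells you where the message landed
    print(record_metadata.topic, record_metadata.partition, record_metadata.offset)

def on_error(exc):
    print('send failed:', exc)

producer.send('test-topic', b'some_message_bytes').add_callback(on_success).add_errback(on_error)
producer.flush()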
Serialization and Deserialization
- Serialization: the process of converting an object into a stream of bytes for the purpose of transmission
- Deserialization: the opposite of Serialization
# Serialize json messages
import json
producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('fizzbuzz', {'foo': 'bar'})
# Deserialize msgpack-encoded values (requires the msgpack package)
import msgpack
consumer = KafkaConsumer(value_deserializer=msgpack.loads)  # key_deserializer works the same way
consumer.subscribe(['msgpackfoo'])
for msg in consumer:
    assert isinstance(msg.value, dict)
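A matching consumer for the JSON producer above would look like this (a sketch, reusing the fizzbuzz topic from that example):
import json
from kafka import KafkaConsumer
consumer = KafkaConsumer('fizzbuzz', value_deserializer=lambda v: json.loads(v.decode('utf-8')))
for msg in consumer:
    print(msg.value)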
# Serialize string keys
producer = KafkaProducer(key_serializer=str.encode)
producer.send('flipflap', key='ping', value=b'1234')
# Compress messages
producer = KafkaProducer(compression_type='gzip')
for i in range(1000):
    producer.send('foobar', b'msg %d' % i)
Threads
The producer is thread safe, but the consumer is not. For parallel consumption, the recommendation is to use multiprocessing, with one consumer per process, as sketched below.
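A minimal sketch of that approach, with one consumer per process (the topic, group, and worker count are made-up examples):
from multiprocessing import Process
from kafka import KafkaConsumer

def consume(worker_id):
    # each process owns its own KafkaConsumer; never share one across processes
    consumer = KafkaConsumer('first_topic',
                             bootstrap_servers=['localhost:9092'],
                             group_id='group1')
    for msg in consumer:
        print(worker_id, msg.value)

if __name__ == '__main__':
    workers = [Process(target=consume, args=(i,)) for i in range(3)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()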
Client Compatibility
Always use the latest client library version. Since Kafka 0.10.2, clients and brokers are bidirectionally compatible, so an older client can talk to a newer broker and vice versa.
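kafka-python normally probes the broker version when it connects; if probing fails you can pin the protocol version with the api_version parameter (a sketch; the tuple here is just an example):
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], api_version=(0, 10, 2))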
References:
kafka-python: https://github.com/dpkp/kafka-python
Apache Kafka Series – Learn Apache Kafka for Beginners v2 (Udemy course)
Apache Kafka Website, https://kafka.apache.org/