本博文所使用kafka版本2.5.0,操作系統centos8.
1)啟動zookeeper
演示用的話,直接啟動kafka自帶的zookeeper即可:
cd kafkaDirectory/kafka_2.12-2.5.0 bin/zookeeper-server-start.sh config/zookeeper.properties
生產上建議連接到zookeeper集群,需更改配置文件 config/server.properties 里更改zookeeper配置zookeeper.connect=localhost:2181
2) 啟動kafka server
cd kafkaDirectory/kafka_2.12-2.5.0 bin/kafka-server-start.sh config/server.properties
3) 主題topic
創建 test Topic,一個partitions分區,一個replication-factor副本:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
列出topic:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
往test Topic里發送數據:
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
從test Topic里面從頭開始消費數據:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
更多kafka Topic相關細節請移步至: 《kafka2.5.0 主題Topic》
4) 本機搭建kafka集群
復制多個server.properties文件:
cp config/server.properties config/server-1.properties cp config/server.properties config/server-2.properties
修改server-1.properties:
broker.id=1 listeners=PLAINTEXT://:9093 log.dirs=/tmp/kafka-logs-9093
修改server-2.properties:
broker.id=2 listeners=PLAINTEXT://:9094 log.dirs=/tmp/kafka-logs-9094
注意: broker.id 是每個節點唯一的id,並且永恆不變的。
5)查看Topic詳情
查看Topic詳情命令:
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test // 命令輸出:
Topic:
test
PartitionCount:1 ReplicationFactor:1 Configs:
Topic:
test
Partition: 0 Leader: 0 Replicas: 0 Isr: 0
6)生產者發送消息
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-replicated-topic my test message 1 my test message 2
7)消費者消費消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic my test message 1 my test message 2
end.
