環境
- CentOS7
- kafka2.5.0
准備
先搭建zookeeper,參考:https://www.cnblogs.com/caroar/p/13172921.html
下載&配置
mkdir /kafka
cd /kafka
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar -zxvf kafka_2.12-2.5.0.tgz
mv kafka_2.12-2.5.0 kafka1
cd kafka1
vi config/server.properties
修改broker.id=1
修改listeners=PLAINTEXT://127.0.0.1:9091 #這個配置默認是注釋的,默認9092,如果在多台機器上搭建集群,這個可以不用動,單機搭得改,不然三個服務全都默認9092就沖突了
修改log.dirs=/kafka/kafka1/logs
修改zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
cd /kafka
cp -r kafka1 kafka2
cp -r kafka1 kafka 3
vi kafka2/config/server.properties
修改broker.id=2
修改listeners=PLAINTEXT://127.0.0.1:9092
修改log.dirs=/kafka/kafka2/logs
vi kafka3/config/server.properties
修改broker.id=3
修改listeners=PLAINTEXT://127.0.0.1:9093
修改log.dirs=/kafka/kafka3/logs
啟動
cd /kafka/kafka1
nohup sh bin/kafka-server-start.sh config/server.properties & #這里為什么用nohup因為不然啟動起來這個控制台就一直被占着了
cd ../kafka2
nohup sh bin/kafka-server-start.sh config/server.properties &
cd ../kafka3
nohup sh bin/kafka-server-start.sh config/server.properties &
創建Topic(1個分區1個備份)
cd /kafka/kafka1/bin
sh kafka-topics.sh --create --bootstrap-server 127.0.0.1:9091 --replication-factor 1 --partitions 1 --topic test
sh kafka-topics.sh --list --bootstrap-server 127.0.0.1:9091
發送消息
sh kafka-console-producer.sh --bootstrap-server 127.0.0.1:9091 --topic test
# 這里一行一行的輸入消息
接收消息
sh kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9091 --topic test --from-beginning
創建Topic(1個分區3個備份)
sh kafka-topics.sh --create --bootstrap-server 127.0.0.1:9091 --replication-factor 3 --partitions 1 --topic my-replicated-topic
sh kafka-topics.sh --list --bootstrap-server 127.0.0.1:9091
sh kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9091 --topic my-replicated-topic # 查看當前的leader等信息
備注
官方指引:http://kafka.apache.org/quickstart
問題
注意 0.8.2.1 版本中 consumer.pull 方法里面直接返回 null 的,也是很神奇,高版本里面是有代碼的。