kafka 自帶zookeeper,如果不需要自帶的可以重新安裝
這里是自己安裝zookeeper 參考下面
https://www.cnblogs.com/zk1023/p/10865237.html
在重新裝個jdk
1.下載kafka
[root@zsls local]# cd /usr/local/src/
[root@zsls src]# wget http://mirror.bit.edu.cn/apache/kafka/2.3.1/kafka_2.11-2.3.1.tgz
2.解壓壓縮包
[root@zsls src]# tar -zxvf kafka_2.11-2.3.1.tgz -C /usr/local/
3.修改配置
[root@zsls kafka_2.11-2.3.1]# cd /usr/local/kafka_2.11-2.3.1/config/
[root@zsls config]# vim server.properties
//這是這台虛擬機上的值,在另外兩台虛擬機上應該是2或者3,這個值是唯一的,每台虛擬機或者叫服務器不能相同
broker.id=1
//設置本機IP和端口:這個IP地址也是與本機相關的,每台服務器上設置為自己的IP地址,端口號默認是9092,可以自己設置其他的 listeners=PLAINTEXT://192.168.93.116:9092
//在log.retention.hours=168下面新增下面三項
message.max.byte=5242880
// 單機版 復制因子默認1 如果是集群的化根據情況配置
default.replication.factor=1
replica.fetch.max.bytes=5242880
//指定日志位置
log.dirs=/data/kafka-logs
//設置日志刪除
log.cleanup.polict=delete
log.segment.delete.delay.ms=1000
log.cleanup.interval.mins=1
log.retention.check.interval.ms=1000
//設置zookeeper的連接端口,zookeeper.connect可以設置多個值,多個值之間用逗號分隔
zookeeper.connect=192.168.93.110:2181
=======================配置參數詳解===============================
broker.id=0 #當前機器在集群中的唯一標識,和zookeeper的myid性質一樣
port=9092 #當前kafka對外提供服務的端口默認是9092
host.name=192.168.93.110
#這個參數默認是關閉的,在0.8.1有個bug,DNS解析問題,失敗率的問題。
num.network.threads=3 #這個是borker進行網絡處理的線程數
num.io.threads=8 #這個是borker進行I/O處理的線程數
log.dirs=/data/kafka-logs #消息存放的目錄,這個目錄可以配置為“,”逗號分割的表達式,上面的num.io.threads要大於這個目錄的個數這個目錄,如果配置多個目錄,新創建的topic他把消息持久化的地方是,當前以逗號分割的目錄中,那個分區數最少就放那一個
socket.send.buffer.bytes=102400 #發送緩沖區buffer大小,數據不是一下子就發送的,先回存儲到緩沖區了到達一定的大小后在發送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收緩沖區大小,當數據到達一定大小后在序列化到磁盤
socket.request.max.bytes=104857600 #這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小
num.partitions=1 #默認的分區數,一個topic默認1個分區數
log.retention.hours=168 #默認消息的最大持久化時間,168小時,7天
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka保存消息的副本數,如果一個副本失效了,另一個還可以繼續提供服務
replica.fetch.max.bytes=5242880 #取消息的最大直接數
log.segment.bytes=1073741824 #這個參數是:因為kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 ),到目錄查看是否有過期的消息如果有,刪除
log.cleaner.enable=false #是否啟用log壓縮,一般不用啟用,啟用的話可以提高性能
zookeeper.connect=192.168.93.110:12181,192.168.93.111:12181,192.168.93.112:1218
#設置zookeeper的連接端口
advertised.listeners:發布到ZooKeeper上供客戶端使用的監聽器,若未配置,則使用配置的listeners屬性
listeners:逗號分隔的需要監聽的URL和協議
# 允許外部端口連接
listeners=PLAINTEXT://0.0.0.0:9092 協議://<公網 ip>:端口(或者 0.0.0.0:端口)
# 外部代理地址
advertised.listeners=PLAINTEXT://192.168.93.110:9092 協議://<宿主機ip>:<宿主機暴露的端口>
=======================配置參數詳解===============================
4.啟動
# cd /usr/local/kafka_2.11-2.3.1/bin/
# ./kafka-server-start.sh -daemon ../config/server.properties
5.創建topic來驗證是否啟動成功
[root@zsls bin]# ./kafka-topics.sh --create --zookeeper 192.168.93.110:2181 --replication-factor 1 --partitions 1 --topic my-topic
利用list命令列出所有創建了的topics,來產看剛才創建的topic是否存在
[root@zsls bin]#
./kafka-topics.sh --list --zookeeper 192.168.93.110
:2181
創建 producer(生產者): 測試生產消息
[root@zsls bin]# ./kafka-console-producer.sh --broker-list 192.168.93.116:9092 --topic my-topic
創建 consumer(消費者): 測試消費
[root@zsls bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.93.116:9092 -topic my-topic --from-beginning
刪除 topic
[root@zsls bin]#
.
/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
//外部訪問 開通端口
firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --reload
//程序代碼中如果需要kafka的groupId 這個是在Kafka安裝目錄的配置文件的comsume.properties里面配置
spring.kafka.consumer.group-id=test-consumer-group