centos7安裝kafka單機版


kafka 自帶zookeeper,如果不需要自帶的可以重新安裝

這里是自己安裝zookeeper 參考下面

https://www.cnblogs.com/zk1023/p/10865237.html

在重新裝個jdk

1.下載kafka

[root@zsls local]# cd /usr/local/src/

[root@zsls src]# wget http://mirror.bit.edu.cn/apache/kafka/2.3.1/kafka_2.11-2.3.1.tgz

2.解壓壓縮包

[root@zsls src]# tar -zxvf kafka_2.11-2.3.1.tgz -C /usr/local/

3.修改配置

[root@zsls kafka_2.11-2.3.1]# cd /usr/local/kafka_2.11-2.3.1/config/

[root@zsls config]# vim server.properties

//這是這台虛擬機上的值,在另外兩台虛擬機上應該是2或者3,這個值是唯一的,每台虛擬機或者叫服務器不能相同

broker.id=1

//設置本機IP和端口:這個IP地址也是與本機相關的,每台服務器上設置為自己的IP地址,端口號默認是9092,可以自己設置其他的 listeners=PLAINTEXT://192.168.93.116:9092

//在log.retention.hours=168下面新增下面三項

message.max.byte=5242880

// 單機版 復制因子默認1 如果是集群的化根據情況配置

 

default.replication.factor=1

replica.fetch.max.bytes=5242880

//指定日志位置

log.dirs=/data/kafka-logs

//設置日志刪除

log.cleanup.polict=delete

log.segment.delete.delay.ms=1000

log.cleanup.interval.mins=1

log.retention.check.interval.ms=1000

//設置zookeeper的連接端口,zookeeper.connect可以設置多個值,多個值之間用逗號分隔

zookeeper.connect=192.168.93.110:2181

 

=======================配置參數詳解===============================

broker.id=0 #當前機器在集群中的唯一標識,和zookeeper的myid性質一樣

port=9092 #當前kafka對外提供服務的端口默認是9092

host.name=192.168.93.110 #這個參數默認是關閉的,在0.8.1有個bug,DNS解析問題,失敗率的問題。

num.network.threads=3 #這個是borker進行網絡處理的線程數

num.io.threads=8 #這個是borker進行I/O處理的線程數

log.dirs=/data/kafka-logs #消息存放的目錄,這個目錄可以配置為“,”逗號分割的表達式,上面的num.io.threads要大於這個目錄的個數這個目錄,如果配置多個目錄,新創建的topic他把消息持久化的地方是,當前以逗號分割的目錄中,那個分區數最少就放那一個

socket.send.buffer.bytes=102400 #發送緩沖區buffer大小,數據不是一下子就發送的,先回存儲到緩沖區了到達一定的大小后在發送,能提高性能

socket.receive.buffer.bytes=102400 #kafka接收緩沖區大小,當數據到達一定大小后在序列化到磁盤

socket.request.max.bytes=104857600 #這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小

num.partitions=1 #默認的分區數,一個topic默認1個分區數

log.retention.hours=168 #默認消息的最大持久化時間,168小時,7天

message.max.byte=5242880 #消息保存的最大值5M

default.replication.factor=2 #kafka保存消息的副本數,如果一個副本失效了,另一個還可以繼續提供服務

replica.fetch.max.bytes=5242880 #取消息的最大直接數

log.segment.bytes=1073741824 #這個參數是:因為kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件

log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 ),到目錄查看是否有過期的消息如果有,刪除

log.cleaner.enable=false #是否啟用log壓縮,一般不用啟用,啟用的話可以提高性能

zookeeper.connect=192.168.93.110:12181,192.168.93.111:12181,192.168.93.112:1218#設置zookeeper的連接端口

advertised.listeners:發布到ZooKeeper上供客戶端使用的監聽器,若未配置,則使用配置的listeners屬性

listeners:逗號分隔的需要監聽的URL和協議

# 允許外部端口連接
listeners=PLAINTEXT://0.0.0.0:9092   協議://<公網 ip>:端口(或者 0.0.0.0:端口)
# 外部代理地址
advertised.listeners=PLAINTEXT://192.168.93.110:9092  協議://<宿主機ip>:<宿主機暴露的端口>

 

=======================配置參數詳解===============================

4.啟動

# cd /usr/local/kafka_2.11-2.3.1/bin/

# ./kafka-server-start.sh -daemon ../config/server.properties

5.創建topic來驗證是否啟動成功
[root@zsls bin]# ./kafka-topics.sh --create --zookeeper 192.168.93.110:2181 --replication-factor 1 --partitions 1 --topic my-topic
利用list命令列出所有創建了的topics,來產看剛才創建的topic是否存在

[root@zsls bin]#./kafka-topics.sh --list --zookeeper 192.168.93.110:2181
創建 producer(生產者): 測試生產消息

[root@zsls bin]# ./kafka-console-producer.sh --broker-list 192.168.93.116:9092 --topic my-topic
創建 consumer(消費者): 測試消費
[root@zsls bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.93.116:9092 -topic my-topic --from-beginning
刪除 topic

[root@zsls bin]# ./kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

 
        
//外部訪問 開通端口
firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --reload

//程序代碼中如果需要kafka的groupId  這個是在Kafka安裝目錄的配置文件的comsume.properties里面配置
spring.kafka.consumer.group-id=test-consumer-group
 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM