1.官網或 wget 下載 kafka_2.12-2.2.0.tgz 二進制代碼包
cd /home/tar
wget http://mirror.bit.edu.cn/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz
2.解壓安裝
tar -zxvf kafka_2.12-2.2.0.tgz
mv kafka_2.12-2.2.0 /usr/local/
3.編輯 server.propertie 配置文件(配置修改待完善)
備份文件
cd /usr/local/kafka_2.12-2.2.0/config
cp server.propertie server.propertie.bak
編輯文件
vi server.propertie
主要修改一下幾個地方:
broker.id=123 #每台服務器的broker.id都不能相同 host.name=xxx-xxx-xxx-xxx #主機名 listeners=PLAINTEXT://ip:9092 #監聽地址
log.dirs=/usr/local/kafka_2.12-2.2.0/logs #在log.retention.hours=168 下追加 message.max.byte=5242880 default.replication.factor=2 replica.fetch.max.bytes=5242880 #設置zookeeper的連接端口 zookeeper.connect=ip:12181
4.啟動服務
啟動命令
/usr/local/kafka_2.12-2.2.0/bin/kafka-server-start.sh -daemon ../config/server.propertie
在終端輸入“jps”,若啟動成功會得到如下結果:
7253 Jps 5850 ZooKeeperMain 6076 QuorumPeerMain 6093 Kafka
其中QuorumPeerMain是 zookeeper 的守護進程,kafka 是 kafka 的守護進程
5.常用命令
停止服務
/usr/local/kafka_2.12-2.2.0/bin/kafka-server-stop.sh
創建主題
/usr/local/kafka_2.12-2.2.0/bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test-topic
列出主題
/usr/local/kafka_2.12-2.2.0/bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
運行producer並發送消息
/usr/local/kafka_2.12-2.2.0/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test-topic
運行consumer並接收消息
/usr/local/kafka_2.12-2.2.0/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-topic --from-beginning
6.集群配置
待續
附:
1. server.propertie配置釋義
broker.id=0 #當前機器在集群中的唯一標識,和zookeeper的myid性質一樣,但是不管你怎么配,別配0就是,不然創建Topic的時候回報錯。 port=19092 #當前kafka對外提供服務的端口默認是9092 host.name=192.168.7.100 #這個參數默認是關閉的,在0.8.1有個bug,DNS解析問題,失敗率的問題。 num.network.threads=3 #這個是borker進行網絡處理的線程數 num.io.threads=8 #這個是borker進行I/O處理的線程數 log.dirs=/opt/kafka/kafkalogs/ #消息存放的目錄,這個目錄可以配置為“,”逗號分割的表達式,上面的num.io.threads要大於這個目錄的個數這個目錄,如果配置多個目錄,新創建的topic他把消息持久化的地方是,當前以逗號分割的目錄中,那個分區數最少就放那一個 socket.send.buffer.bytes=102400 #發送緩沖區buffer大小,數據不是一下子就發送的,先回存儲到緩沖區了到達一定的大小后在發送,能提高性能 socket.receive.buffer.bytes=102400 #kafka接收緩沖區大小,當數據到達一定大小后在序列化到磁盤 socket.request.max.bytes=104857600 #這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小 num.partitions=1 #默認的分區數,一個topic默認1個分區數 log.retention.hours=168 #默認消息的最大持久化時間,168小時,7天 message.max.byte=5242880 #消息保存的最大值5M default.replication.factor=2 #kafka保存消息的副本數,如果一個副本失效了,另一個還可以繼續提供服務 replica.fetch.max.bytes=5242880 #取消息的最大直接數 log.segment.bytes=1073741824 #這個參數是:因為kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件 log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 ),到目錄查看是否有過期的消息如果有,刪除 log.cleaner.enable=false #是否啟用log壓縮,一般不用啟用,啟用的話可以提高性能 zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #設置zookeeper的連接端口
2.啟動時內存不足異常解決
## There is insufficient memory for the Java Runtime Environment to continue. # Native memory allocation (malloc) failed to allocate 986513408 bytes for committing reserved memory. # An error report file with more information is saved as: # //hs_err_pid6500.logOpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000bad30000, 986513408, 0) failed; error='Cannot allocate memory' (errno=12)
原因:kafka 啟動腳本 kafka-server-start.sh 中指定了啟動時需要的最小內存,默認為1G
解決方法:
修改啟動腳本
cd /usr/local/kafka_2.12-2.2.0/bin/
cp kafka-server-start.sh kafka-server-start.sh.bak
vi kafka-server-start.sh
修改
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
為
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
3.啟動時出現oom異常解決
[ FATAL ] Fatal error during KafkaServerStable startup. Prepare to shutdown java.lang.OutOfMemoryError: Java heap space at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57) at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) at kafka.log.SkimpyOffsetMap.(OffsetMap.scala:42)
原因:kafka啟動時分配的內存國小導致
解決方法:同上
參考文章:
https://www.w3cschool.cn/apache_kafka/apache_kafka_installation_steps.html w3cschool
https://www.cnblogs.com/krockey/p/9068129.html kafka安裝
https://blog.csdn.net/qq982782662/article/details/82810415 常用命令
https://blog.csdn.net/yundanfengqingfeng/article/details/84781852 常用命令
https://blog.csdn.net/r02221/article/details/55225036 啟動異常解決
https://blog.csdn.net/wqh8522/article/details/79163467 集群配置
https://blog.csdn.net/zzq900503/article/details/83348419 集群配置