Kafka是由Apache軟件基金會開發的一個開源流處理平台,由Scala和Java編寫。Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者在網站中的所有動作流數據。 這些數據通常是由於吞吐量的要求而通過處理日志和日志聚合來解決。 對於像Hadoop一樣的日志數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行加載機制來統一線上和離線的消息處理,也是為了通過集群來提供實時的消息。Kafka是一種高吞吐量的分布式發布訂閱消息系統:
(1)通過O的磁盤數據結構提供消息的持久化,夠保持長時間的穩定性能。
(2)高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒數百萬的消息。
(3)支持通過Kafka服務器和消費機集群來分區消息。
(4)支持Hadoop並行數據加載。
本博客主要以:單節點單Broker部署、單節點多Broker部署、集群部署(多節點多Broker)來講解。在實際生產環境中常用的是第三種方式,以集群的方式來部署Kafka。Kafka比較依賴zookeeper集群,如果想要使用Kafka,就必須部署zookeeper集群,Kafka中的消費偏置信息、kafka集群、topic信息會被存儲在ZK中。
在部署集群前,需要部署部署zookeeper集群,直接按照zookeeper3.5.5集群部署部署並啟動。
一、Kafka 單節點部署
准備:
關閉防火牆:systemctl stop firewalld.service
禁止防火牆自動啟動:systemctl disable firewalld.service
關閉selinux : setenforce 0
禁止selinux啟動:vim /etc/selinux/config
SELINUX=disabled
各節點的host解析:
10.0.0.11 node01
10.0.0.12 node02 10.0.0.13 node03
1.Kafka 單節點單Broker部署及使用
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.1/kafka_2.12-2.2.1.tgz
安裝
tar xf kafka_2.12-2.2.1.tgz -C /usr/local/src ln -s /usr/local/src/kafka_2.12-2.2.1 /usr/local/kafka
配置kafka
參考官網:http://kafka.apache.org/quickstart
mkdir -p /data/kafka/logs
添加環境變量
echo -e 'export KAFKA_HOME=/usr/local/kafka\nexport PATH=$KAFKA_HOME/bin:$PATH' >>/etc/profile
source /etc/profile
進入kafka的config目錄下,在server.properties文件,添加如下配置
vim /usr/local/kafka/config/server.properties
#broker id 全局唯一 broker.id=0 #監聽 port=9092 #日志目錄 log.dirs=/data/kafka/logs #配置zookeeper的連接 zookeeper.connect=node01:2181
啟動kafka
kafka-server-start.sh $KAFKA_HOME/config/server.properties
打印的日志信息沒有報錯,可以看到如下信息
[2019-06-16 17:44:14,757] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
但是並不能保證Kafka已經啟動成功,輸入jps查看進程,如果可以看到Kafka進程,表示啟動成功
[root@node01 ~]# jps 8577 QuorumPeerMain 11000 Jps 1899 -- process information unavailable 10637 Kafka [root@node01 ~]# jps -m 8577 QuorumPeerMain /usr/local/zookeeper/bin/../conf/zoo.cfg 1899 -- process information unavailable 10637 Kafka config/server.properties 11022 Jps -m
創建topic
kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 1 --topic test
參數說明:
--zookeeper:指定kafka連接zk的連接url,該值和server.properties文件中的配置項{zookeeper.connect}一樣
--replication-factor:指定副本數量
--partitions:指定分區數量
--topic:主題名稱
刪除主題
kafka-topics.sh --delete --zookeeper node01:2181 --topic test
查看所有的topic信息
kafka-topics.sh --list --zookeeper node01:2181 test
啟動生產者
kafka-console-producer.sh --broker-list node01:9092 --topic test
啟動消費者
kafka-console-consumer.sh --bootstrap-server node01:9092 --from-beginning --topic test

備注
--from-beginning:如果有表示從最開始消費數據,舊的和新的數據都會被消費,而沒有該參數表示只會消費新產生的數據。
2.Kafka 單節點多Broker部署及使用
配置Kafka
參考官網:http://kafka.apache.org/quickstart
拷貝server.properties三份
cd /usr/local/kafka/config/ cp server.properties server-1.properties cp server.properties server-2.properties cp server.properties server-3.properties
修改server-1.properties
vim /usr/local/kafka/config/server-1.properties
#broker id 全局唯一 broker.id=1 #監聽 port=9093 #日志目錄 log.dirs=/data/kafka/logs-1 #配置zookeeper的連接 zookeeper.connect=node01:2181
修改server-2.properties
vim /usr/local/kafka/config/server-2.properties
#broker id 全局唯一 broker.id=2 #監聽 port=9094 #日志目錄 log.dirs=/data/kafka/logs-2 #配置zookeeper的連接 zookeeper.connect=node01:2181
修改server-3.properties
vim /usr/local/kafka/config/server-3.properties
#broker id 全局唯一 broker.id=3 #監聽 port=9095 #日志目錄 log.dirs=/data/kafka/logs-3 #配置zookeeper的連接 zookeeper.connect=node01:2181
創建日志文件夾
mkdir -p /data/kafka/{logs-1,logs-2,logs-3}
啟動Kafka(分別啟動server1、2、3)
kafka-server-start.sh $KAFKA_HOME/config/server-1.properties kafka-server-start.sh $KAFKA_HOME/config/server-2.properties kafka-server-start.sh $KAFKA_HOME/config/server-3.properties
查看進程
[root@node01 ~]# jps 8577 QuorumPeerMain 18353 Jps 17606 Kafka 1899 -- process information unavailable 17243 Kafka 17964 Kafka [root@node01 ~]# jps -m 8577 QuorumPeerMain /usr/local/zookeeper/bin/../conf/zoo.cfg 17606 Kafka /usr/local/kafka/config/server-2.properties 1899 -- process information unavailable 17243 Kafka /usr/local/kafka/config/server-1.properties 18363 Jps -m 17964 Kafka /usr/local/kafka/config/server-3.properties
創建topic(指定副本數量為3)
kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 3 --partitions 1 --topic wzxmt
查看所有的topic信息
kafka-topics.sh --list --zookeeper node01:2181

查看某個topic的詳細信息
kafka-topics.sh --describe --zookeeper node01:2181 --topic wzxmt

啟動生產者
kafka-console-producer.sh --broker-list node01:9093,node01:9094,node01:9095 --topic wzxmt
啟動消費者
kafka-console-consumer.sh --bootstrap-server node01:2181 --from-beginning --topic wzxmt

單節點多borker容錯性測試
Kafka是支持容錯的,上面我們已經完成了Kafka單節點多Blocker的部署,下面我們來對Kafka的容錯性進行測試,測試步驟如下
(1).查看topic的詳細信息,觀察那個blocker的角色是leader,那些blocker的角色是follower
kafka-topics.sh --describe --zookeeper node01:2181 --topic wzxmt

(2).手工kill掉任意一個狀態是follower的borker,測試生成和消費信息是否正確
步驟1中可以看到 1為leader,2 和 3為 follower,將follower為2的進程kill掉

啟動生產和消費者測試信息是否正確

結論:kill掉任意一個狀態是follower的broker,生成和消費信息正確,不受任何影響.
(3).手工kill掉狀態是leader的borker,測試生產和消費的信息是否正確
borker1的角色為leader,將它kill掉,borker 3變成了leader .

啟動生產和消費者測試信息是否正確.

結論:kill掉狀態是leader的borker,生產和消費的信息正確
總結:不管當前狀態的borker是leader還是follower,當我們kill掉后,只要有一個borker能夠正常使用,則消息仍然能夠正常的生產和發送。即Kafka的容錯性是有保證的!
二、Kafka 多節點部署(多節點多borker)
在多個節點進行安裝部署,部署方式與前面單節點一樣。
修改配置文件server.properties
vim /usr/local/kafka/config/server.properties
#broker的全局唯一編號,不能重復,只能是數字
broker.id=1
#此處的host.name為本機IP(重要),如果不改,則客戶端會拋出:Producerconnection to node01:9092 unsuccessful 錯誤!
host.name=10.0.0.11
#用來監聽鏈接的端口,producer或consumer將在此端口建立連接
port=9092
#處理網絡請求的線程數量
num.network.threads=3
#用來處理磁盤IO的線程數量
num.io.threads=8
#發送套接字的緩沖區大小
socket.send.buffer.bytes=102400
#接受套接字的緩沖區大小
socket.receive.buffer.bytes=102400
#請求套接字的緩沖區大小
socket.request.max.bytes=104857600
#kafka消息存放的路徑(持久化到磁盤)
log.dirs=/data/kafka/logs
#topic在當前broker上的分片個數
num.partitions=2
#用來恢復和清理data下數據的線程數量
num.recovery.threads.per.data.dir=1
#segment文件保留的最長時間,超時將被刪除
log.retention.hours=168
#滾動生成新的segment文件的最大時間
log.roll.hours=168
#日志文件中每個segment的大小,默認為1G
log.segment.bytes=1073741824
#周期性檢查文件大小的時間
log.retention.check.interval.ms=300000
#日志清理是否打開
log.cleaner.enable=true
#broker需要使用zookeeper保存meta數據
zookeeper.connect=node01:2181,node02:2181,node03:2181
#zookeeper鏈接超時時間
zookeeper.connection.timeout.ms=6000
#partion buffer中,消息的條數達到閾值,將觸發flush到磁盤
log.flush.interval.messages=10000
#消息buffer的時間,達到閾值,將觸發flush到磁盤
log.flush.interval.ms=3000
#刪除topic需要server.properties中設置delete.topic.enable=true否則只是標記刪除
delete.topic.enable=true
#延遲初始使用者重新平衡的時間(生產用3)
group.initial.rebalance.delay.ms=0
#broker能接收消息的最大字節數
message.max.bytes=2000000000
#broker可復制的消息的最大字節數
replica.fetch.max.bytes=2000000000
#消費者端的可讀取的最大消息
fetch.message.max.bytes=2000000000
不同節點之間只需要修改server.properties 的 broker.id,broker.id不能相同。
保證zookeeper集群運行正常情況下,啟動服務
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
#備注”&“或”-daemon"在后台運行,不占用當前窗口
查看Kafka是否啟動成功,輸入jps查看進程,如果可以看到Kafka進程,表示啟動成功

創建topic
kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 3 --partitions 2 --topic www
查看topic詳細信息
kafka-topics.sh --describe --zookeeper node01:2181 --topic www

啟動生產者
[root@node01 ~]# kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic www
啟動2個消費者
[root@node02 bin]# kafka-console-consumer.sh --bootstrap-server node02:9092 --from-beginning --topic www [root@node03 bin]# kafka-console-consumer.sh --bootstrap-server node03:9092 --from-beginning --topic www

Kafka 集群與單節點多Broker是測試相同,結果相同;從Kafka 單節點多Broker與多節點多Broker的容錯性測試,得出的結論是:
不管當前狀態的borker是leader還是follower,當我們kill掉后,只要有一個borker能夠正常使用,則消息仍然能夠正常的生產和發送。即Kafka的容錯性是有保證!
