Kafka強依賴ZK,如果想要使用Kafka,就必須安裝ZK,Kafka中的消費偏置信息、kafka集群、topic信息會被存儲在ZK中。有人可能會說我在使用Kafka的時候就沒有安裝ZK,那是因為Kafka內置了一個ZK,一般我們不使用它。
一、Kafka 單節點部署
Kafka中單節點部署又分為兩種,一種為單節點單Broker部署,一種為單節點多Broker部署。因為是單節點的Kafka,所以在安裝ZK時也只需要單節點即可。
ZooKeeper官網:http://zookeeper.apache.org/
下載Zookeeper並解壓到指定目錄
$ wget http://www-eu.apache.org/dist/zookeeper/zookeeper-3.5.1-alpha/zookeeper-3.5.1-alpha.tar.gz $ tar -zxvf zookeeper-3.5.1-alpha.tar.gz -c /opt/zookeeper
進入Zookeeper的config目錄下
$ cd /opt/zookeeper/conf
拷貝zoo_sample.cfg文件重命名為zoo.cfg,然后修改dataDir屬性
# 數據的存放目錄 dataDir=/home/hadoop/zkdata # 端口,默認就是2181 clientPort=2181
配置環境變量
# Zookeeper Environment Variable export ZOOKEEPER_HOME=/opt/zookeeper export PATH=$PATH:$ZOOKEEPER_HOME/bin
Zookeeper 啟動停止命令
$ zkServer.sh start
$ zkServer.sh stop
筆者在安裝完Zookeeper后,輸入命令啟動后,jps中並沒有查看到QuorumPeerMain進程,說明沒有啟動成功,進入Zookeeper的log目錄下查看日志,發現報了一個錯誤,如下
AdminServer$AdminServerException: Problem starting AdminServer on address 0.0.0.0, port 8080 and command URL /commands
原因:zookeeper的管理員端口被占用
解決:筆者使用的zookeeper的版本為3.5.1,該版本中有個內嵌的管理控制台是通過jetty啟動,會占用8080 端口,需要修改配置里的“admin.serverPort=8080”,默認8080沒有寫出來,只要改為一個沒使用的端口即可,例如:admin.serverPort=8181
1.Kafka 單節點單Broker部署及使用
部署架構
配置Kafka
參考官網:http://kafka.apache.org/quickstart
進入kafka的config目錄下,有一個server.properties,添加如下配置
# broker的全局唯一編號,不能重復 broker.id=0 # 監聽 listeners=PLAINTEXT://:9092 # 日志目錄 log.dirs=/home/hadoop/kafka-logs # 配置zookeeper的連接(如果不是本機,需要該為ip或主機名) zookeeper.connect=localhost:2181
啟動Zookeeper
[hadoop@Master ~]$ zkServer.sh start ZooKeeper JMX enabled by default Using config: /opt/zookeeper/bin/../conf/zoo.cfg Starting zookeeper ... STARTED
啟動Kafka
$ kafka-server-start.sh $KAFKA_HOME/config/server.properties
打印的日志信息沒有報錯,可以看到如下信息
[Kafka Server 0], started (kafka.server.KafkaServer)
但是並不能保證Kafka已經啟動成功,輸入jps查看進程,如果可以看到Kafka進程,表示啟動成功
[hadoop@Master ~]$ jps 9173 Kafka 9462 Jps 8589 QuorumPeerMain [hadoop@Master ~]$ jps -m 9472 Jps -m 9173 Kafka /opt/kafka/config/server.properties 8589 QuorumPeerMain /opt/zookeeper/bin/../conf/zoo.cfg
創建topic
[hadoop@Master ~]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
參數說明:
–zookeeper:指定kafka連接zk的連接url,該值和server.properties文件中的配置項{zookeeper.connect}一樣
–replication-factor:指定副本數量
–partitions:指定分區數量
–topic:主題名稱
查看所有的topic信息
[hadoop@Master ~]$ kafka-topics.sh --list --zookeeper localhost:2181 test
啟動生產者
[hadoop@Master ~]$ kafka-console-producer.sh --broker-list localhost:9092 --topic test
啟動消費者
[hadoop@Master ~]$ kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
測試
- 生產者生產數據
- 消費者消費數據
我們在啟動一個消費者,去掉后面的參數–from-beginning,看有什么區別
- 生產者消費數據
- 消費者1消費數據(含有–from-beginning參數)
- 消費者2消費數據(沒有–from-beginning參數)
總結:–from-beginning參數如果有表示從最開始消費數據,舊的和新的數據都會被消費,而沒有該參數表示只會消費新產生的數據
2.Kafka 單節點多Broker部署及使用
部署架構
配置Kafka
參考官網:http://kafka.apache.org/quickstart
拷貝server.properties三份
[hadoop@Master ~]$ cd /opt/kafka/config [hadoop@Master config]$ cp server.properties server-1.properties [hadoop@Master config]$ cp server.properties server-2.properties [hadoop@Master config]$ cp server.properties server-3.properties
修改server-1.properties文件
# broker的全局唯一編號,不能重復 broker.id=1 # 監聽 listeners=PLAINTEXT://:9093 # 日志目錄 log.dirs=/home/hadoop/kafka-logs-1
修改server-2.properties文件
# broker的全局唯一編號,不能重復 broker.id=2 # 監聽 listeners=PLAINTEXT://:9094 # 日志目錄 log.dirs=/home/hadoop/kafka-logs-2
修改server-3.properties文件
# broker的全局唯一編號,不能重復 broker.id=3 # 監聽 listeners=PLAINTEXT://:9094 # 日志目錄 log.dirs=/home/hadoop/kafka-logs-3
啟動Zookeeper
[hadoop@Master ~]$ zkServer.sh start ZooKeeper JMX enabled by default Using config: /opt/zookeeper/bin/../conf/zoo.cfg Starting zookeeper ... STARTED
啟動Kafka(分別啟動server1、2、3)
$ kafka-server-start.sh $KAFKA_HOME/config/server-1.properties $ kafka-server-start.sh $KAFKA_HOME/config/server-2.properties $ kafka-server-start.sh $KAFKA_HOME/config/server-3.properties
查看進程
[hadoop@Master ~]$ jps 11905 Kafka 11619 Kafka 8589 QuorumPeerMain 12478 Jps 12191 Kafka [hadoop@Master ~]$ jps -m 11905 Kafka /opt/kafka/config/server-2.properties 11619 Kafka /opt/kafka/config/server-1.properties 12488 Jps -m 8589 QuorumPeerMain /opt/zookeeper/bin/../conf/zoo.cfg 12191 Kafka /opt/kafka/config/server-3.properties
創建topic(指定副本數量為3)
[hadoop@Master ~]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic Created topic "my-replicated-topic".
查看所有的topic信息
[hadoop@Master ~]$ kafka-topics.sh --list --zookeeper localhost:2181 my-replicated-topic test
查看某個topic的詳細信息
[hadoop@Master ~]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
啟動生產者
$ kafka-console-producer.sh --broker-list localhost:9093,localhost:9094,localhost:9095 --topic my-replicated-topic
啟動消費者
$ kafka-console-consumer.sh --zookeeper localhost:2181 --topic my-replicated-topic --from-beginning
測試
- 生產者生產數據
- 消費者消費數據
單節點多borker容錯性測試
Kafka是支持容錯的,上面我們已經完成了Kafka單節點多Blocker的部署,下面我們來對Kafka的容錯性進行測試,測試步驟如下
(1).查看topic的詳細信息,觀察那個blocker的角色是leader,那些blocker的角色是follower
[hadoop@Master ~]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
(2).手工kill掉任意一個狀態是follower的borker,測試生成和消費信息是否正確
步驟1中可以看到 2 為leader,1 和 3為 follower,將follower為1的進程kill掉
啟動生產和消費者測試信息是否正確
結論:kill掉任意一個狀態是follower的broker,生成和消費信息正確,不受任何影響
(3).手工kill掉狀態是leader的borker,測試生產和消費的信息是否正確
borker2的角色為leader,將它kill掉,borker 3變成了leader
啟動生產和消費者測試信息是否正確
結論:kill掉狀態是leader的borker,生產和消費的信息正確
總結:不管當前狀態的borker是leader還是follower,當我們kill掉后,只要有一個borker能夠正常使用,則消息仍然能夠正常的生產和發送。即Kafka的容錯性是有保證的!
2.Kafka 集群搭建(多節點多Broker)
Kafka 集群方式部署,需要先安裝ZK集群,筆者是三個節點組成的集群,具體安裝配置請參考Hadoop HA 高可用集群搭建 中的ZK集群安裝,在這筆者主要介紹Kafka的集群安裝配置。
下載安裝包
wget http://mirror.bit.edu.cn/apache/kafka/0.9.0.0/kafka_2.10-0.9.0.0.tgz
解壓安裝包
tar -zxvf kafka_2.10-0.9.0.0.tgz -C ~/export/servers/
創建軟連接
ln -s kafka_2.10-0.9.0.0/ kafka
修改配置文件server.properties
############################# Server Basics ############################# # broker 的全局唯一編號,不能重復 broker.id=0 ############################# Socket Server Settings ############################# # 配置監聽,,默認 listeners=PLAINTEXT://:9092 # 用來監聽連接的端口,producer和consumer將在此端口建立連接,,默認 port=9092 # 處理網絡請求的線程數量,默認 num.network.threads=3 # 用來處理磁盤IO的線程數量,默認 num.io.threads=8 # 發送套接字的緩沖區大小,默認 socket.send.buffer.bytes=102400 # 接收套接字的緩沖區大小,默認 socket.receive.buffer.bytes=102400 # 請求套接字的緩沖區大小,默認 socket.request.max.bytes=104857600 ############################# Log Basics ############################# # kafka 運行日志存放路徑 log.dirs=/root/export/servers/logs/kafka # topic 在當前broker上的分片個數,默認為1 num.partitions=2 # 用來恢復和清理data下數據的線程數量,默認 num.recovery.threads.per.data.dir=1 ############################# Log Retention Policy ############################# # segment文件保留的最長時間,超時將被刪除,默認 log.retention.hours=168 # 滾動生成新的segment文件的最大時間,默認 log.roll.hours=168
配置環境變量
# Kafka Environment Variable export KAFKA_HOME=/root/export/servers/kafka export PATH=$PATH:$KAFKA_HOME/bin
分發安裝包
注意:分發安裝包,也要創建軟連接,配置環境變量
scp -r ~/export/servers/kafka_2.10-0.9.0.0/ storm02:~/export/servers scp -r ~/export/servers/kafka_2.10-0.9.0.0/ storm03:~/export/servers
再次修改配置文件
筆者的ZK集群,使用的節點的主機名分別為storm01、storm02、storm03
依次修改各服務器上配置文件server.properties 的 broker.id,分別是0,1,2不得重復
修改host.name分別為storm01,storm02,storm03
啟動Kafka集群
注意:在啟動Kafka集群前,確保ZK集群已經啟動且能夠正常運行
測試
- 創建topic
[root@storm01 ~]# kafka-topics.sh --create --zookeeper storm01:2181 --replication-factor 3 --partitions 2 --topic test Created topic "test". [root@storm01 ~]# kafka-topics.sh --describe --zookeeper storm01:2181 --topic test Topic:test PartitionCount:2 ReplicationFactor:3 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 Topic: test Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
- 啟動生產者
[root@storm01 ~]# kafka-console-producer.sh --broker-list storm01:9092,storm02:9092,storm03:9092 --topic test hello hello kafka cluster test hello storm
- 啟動兩個消費者,消費消息
[root@storm02 ~]# kafka-console-consumer.sh --zookeeper storm02:2181 --topic test --from-beginning hello hello kafka cluster test hello storm
[root@storm03 ~]# kafka-console-consumer.sh --zookeeper storm03:2181 --topic test --from-beginning hello hello kafka cluster test hello storm
Kafka集群模式(多節點多Broker)下Broker容錯性測試
Kafka 單節點多Broker中筆者已經做了Broker的容錯性測試,得出的結論是:不管當前狀態的borker是leader還是follower,當我們kill掉后,只要有一個borker能夠正常使用,則消息仍然能夠正常的生產和發送。即Kafka的容錯性是有保證的!
Kafka 集群中和單節點多Broker的測試相同,結果相同,請參考Kafka 單節點多Broker容錯性測試