Kafka 單節點部署


       三種安裝Kafka的方式,分別為:單節點單Broker部署、單節點多Broker部署、集群部署(多節點多Broker)。實際生產環境中使用的是第三種方式,以集群的方式來部署Kafka。 

      Kafka強依賴ZK,如果想要使用Kafka,就必須安裝ZK,Kafka中的消費偏置信息、kafka集群、topic信息會被存儲在ZK中。有人可能會說我在使用Kafka的時候就沒有安裝ZK,那是因為Kafka內置了一個ZK,一般我們不使用它。

一、Kafka 單節點部署

Kafka中單節點部署又分為兩種,一種為單節點單Broker部署,一種為單節點多Broker部署。因為是單節點的Kafka,所以在安裝ZK時也只需要單節點即可。

ZooKeeper官網:http://zookeeper.apache.org/

下載Zookeeper並解壓到指定目錄

$ wget http://www-eu.apache.org/dist/zookeeper/zookeeper-3.5.1-alpha/zookeeper-3.5.1-alpha.tar.gz
$ tar -zxvf zookeeper-3.5.1-alpha.tar.gz -c /opt/zookeeper

 

進入Zookeeper的config目錄下

$ cd /opt/zookeeper/conf

 

拷貝zoo_sample.cfg文件重命名為zoo.cfg,然后修改dataDir屬性

# 數據的存放目錄
dataDir=/home/hadoop/zkdata
# 端口,默認就是2181
clientPort=2181

 

配置環境變量

# Zookeeper Environment Variable
export ZOOKEEPER_HOME=/opt/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin

 

Zookeeper 啟動停止命令

$ zkServer.sh start
$ zkServer.sh stop

 

筆者在安裝完Zookeeper后,輸入命令啟動后,jps中並沒有查看到QuorumPeerMain進程,說明沒有啟動成功,進入Zookeeper的log目錄下查看日志,發現報了一個錯誤,如下

AdminServer$AdminServerException: Problem starting AdminServer on address 0.0.0.0, port 8080 and command URL /commands

原因:zookeeper的管理員端口被占用
解決:筆者使用的zookeeper的版本為3.5.1,該版本中有個內嵌的管理控制台是通過jetty啟動,會占用8080 端口,需要修改配置里的“admin.serverPort=8080”,默認8080沒有寫出來,只要改為一個沒使用的端口即可,例如:admin.serverPort=8181

1.Kafka 單節點單Broker部署及使用

部署架構

這里寫圖片描述

配置Kafka

參考官網:http://kafka.apache.org/quickstart

進入kafka的config目錄下,有一個server.properties,添加如下配置

# broker的全局唯一編號,不能重復
broker.id=0
# 監聽
listeners=PLAINTEXT://:9092
# 日志目錄
log.dirs=/home/hadoop/kafka-logs
# 配置zookeeper的連接(如果不是本機,需要該為ip或主機名)
zookeeper.connect=localhost:2181

 

啟動Zookeeper

[hadoop@Master ~]$ zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

 

啟動Kafka

$ kafka-server-start.sh $KAFKA_HOME/config/server.properties

 

打印的日志信息沒有報錯,可以看到如下信息

[Kafka Server 0], started (kafka.server.KafkaServer)

 

但是並不能保證Kafka已經啟動成功,輸入jps查看進程,如果可以看到Kafka進程,表示啟動成功

[hadoop@Master ~]$ jps
9173 Kafka
9462 Jps
8589 QuorumPeerMain
[hadoop@Master ~]$ jps -m
9472 Jps -m
9173 Kafka /opt/kafka/config/server.properties
8589 QuorumPeerMain /opt/zookeeper/bin/../conf/zoo.cfg

 

創建topic

[hadoop@Master ~]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

 

參數說明:
–zookeeper:指定kafka連接zk的連接url,該值和server.properties文件中的配置項{zookeeper.connect}一樣
–replication-factor:指定副本數量
–partitions:指定分區數量
–topic:主題名稱

查看所有的topic信息

[hadoop@Master ~]$ kafka-topics.sh --list --zookeeper localhost:2181 test

 

啟動生產者

[hadoop@Master ~]$ kafka-console-producer.sh --broker-list localhost:9092 --topic test

 

啟動消費者

[hadoop@Master ~]$ kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

 

測試

  • 生產者生產數據

這里寫圖片描述

  • 消費者消費數據

這里寫圖片描述

我們在啟動一個消費者,去掉后面的參數–from-beginning,看有什么區別

  • 生產者消費數據

這里寫圖片描述

  • 消費者1消費數據(含有–from-beginning參數)

這里寫圖片描述

  • 消費者2消費數據(沒有–from-beginning參數)

這里寫圖片描述

總結:–from-beginning參數如果有表示從最開始消費數據,舊的和新的數據都會被消費,而沒有該參數表示只會消費新產生的數據

2.Kafka 單節點多Broker部署及使用

部署架構

這里寫圖片描述

配置Kafka

參考官網:http://kafka.apache.org/quickstart

拷貝server.properties三份

[hadoop@Master ~]$ cd /opt/kafka/config
[hadoop@Master config]$ cp server.properties server-1.properties 
[hadoop@Master config]$ cp server.properties server-2.properties 
[hadoop@Master config]$ cp server.properties server-3.properties 

 

修改server-1.properties文件

# broker的全局唯一編號,不能重復
broker.id=1
# 監聽
listeners=PLAINTEXT://:9093
# 日志目錄
log.dirs=/home/hadoop/kafka-logs-1

 

修改server-2.properties文件

# broker的全局唯一編號,不能重復
broker.id=2
# 監聽
listeners=PLAINTEXT://:9094
# 日志目錄
log.dirs=/home/hadoop/kafka-logs-2

 

修改server-3.properties文件

# broker的全局唯一編號,不能重復
broker.id=3
# 監聽
listeners=PLAINTEXT://:9094
# 日志目錄
log.dirs=/home/hadoop/kafka-logs-3

 

啟動Zookeeper

[hadoop@Master ~]$ zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

 

啟動Kafka(分別啟動server1、2、3)

 
         
$ kafka-server-start.sh $KAFKA_HOME/config/server-1.properties
$ kafka-server-start.sh $KAFKA_HOME/config/server-2.properties
$ kafka-server-start.sh $KAFKA_HOME/config/server-3.properties

 

查看進程

[hadoop@Master ~]$ jps
11905 Kafka
11619 Kafka
8589 QuorumPeerMain
12478 Jps
12191 Kafka
[hadoop@Master ~]$ jps -m
11905 Kafka /opt/kafka/config/server-2.properties
11619 Kafka /opt/kafka/config/server-1.properties
12488 Jps -m
8589 QuorumPeerMain /opt/zookeeper/bin/../conf/zoo.cfg
12191 Kafka /opt/kafka/config/server-3.properties

 

創建topic(指定副本數量為3)

[hadoop@Master ~]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Created topic "my-replicated-topic".

 

查看所有的topic信息

[hadoop@Master ~]$ kafka-topics.sh --list --zookeeper localhost:2181
my-replicated-topic
test

 

查看某個topic的詳細信息

[hadoop@Master ~]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 2,3,1 Isr: 2,3,1

 

啟動生產者

$ kafka-console-producer.sh --broker-list localhost:9093,localhost:9094,localhost:9095 --topic my-replicated-topic

 

啟動消費者

$ kafka-console-consumer.sh --zookeeper localhost:2181 --topic my-replicated-topic --from-beginning

 

測試

  • 生產者生產數據

這里寫圖片描述

  • 消費者消費數據

這里寫圖片描述

單節點多borker容錯性測試

Kafka是支持容錯的,上面我們已經完成了Kafka單節點多Blocker的部署,下面我們來對Kafka的容錯性進行測試,測試步驟如下

(1).查看topic的詳細信息,觀察那個blocker的角色是leader,那些blocker的角色是follower

[hadoop@Master ~]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 2,3,1 Isr: 2,3,1

 

(2).手工kill掉任意一個狀態是follower的borker,測試生成和消費信息是否正確

步驟1中可以看到 2 為leader,1 和 3為 follower,將follower為1的進程kill掉
這里寫圖片描述

啟動生產和消費者測試信息是否正確

這里寫圖片描述
這里寫圖片描述

結論:kill掉任意一個狀態是follower的broker,生成和消費信息正確,不受任何影響

(3).手工kill掉狀態是leader的borker,測試生產和消費的信息是否正確

borker2的角色為leader,將它kill掉,borker 3變成了leader
這里寫圖片描述

啟動生產和消費者測試信息是否正確

這里寫圖片描述

這里寫圖片描述

結論:kill掉狀態是leader的borker,生產和消費的信息正確

總結:不管當前狀態的borker是leader還是follower,當我們kill掉后,只要有一個borker能夠正常使用,則消息仍然能夠正常的生產和發送。即Kafka的容錯性是有保證的!

2.Kafka 集群搭建(多節點多Broker)

Kafka 集群方式部署,需要先安裝ZK集群,筆者是三個節點組成的集群,具體安裝配置請參考Hadoop HA 高可用集群搭建 中的ZK集群安裝,在這筆者主要介紹Kafka的集群安裝配置。

下載安裝包

wget http://mirror.bit.edu.cn/apache/kafka/0.9.0.0/kafka_2.10-0.9.0.0.tgz

 

解壓安裝包

tar -zxvf kafka_2.10-0.9.0.0.tgz -C ~/export/servers/

 

創建軟連接

ln -s kafka_2.10-0.9.0.0/ kafka

 

修改配置文件server.properties

############################# Server Basics #############################
# broker 的全局唯一編號,不能重復
broker.id=0

############################# Socket Server Settings #############################
# 配置監聽,,默認
listeners=PLAINTEXT://:9092

# 用來監聽連接的端口,producer和consumer將在此端口建立連接,,默認
port=9092

# 處理網絡請求的線程數量,默認
num.network.threads=3

# 用來處理磁盤IO的線程數量,默認
num.io.threads=8

# 發送套接字的緩沖區大小,默認
socket.send.buffer.bytes=102400

# 接收套接字的緩沖區大小,默認
socket.receive.buffer.bytes=102400

# 請求套接字的緩沖區大小,默認
socket.request.max.bytes=104857600

############################# Log Basics #############################
# kafka 運行日志存放路徑
log.dirs=/root/export/servers/logs/kafka

# topic 在當前broker上的分片個數,默認為1
num.partitions=2

# 用來恢復和清理data下數據的線程數量,默認
num.recovery.threads.per.data.dir=1

############################# Log Retention Policy #############################
# segment文件保留的最長時間,超時將被刪除,默認
log.retention.hours=168

# 滾動生成新的segment文件的最大時間,默認
log.roll.hours=168

 

配置環境變量

# Kafka Environment Variable
export KAFKA_HOME=/root/export/servers/kafka
export PATH=$PATH:$KAFKA_HOME/bin

 

分發安裝包
注意:分發安裝包,也要創建軟連接,配置環境變量

scp -r ~/export/servers/kafka_2.10-0.9.0.0/ storm02:~/export/servers
scp -r ~/export/servers/kafka_2.10-0.9.0.0/ storm03:~/export/servers

 

再次修改配置文件

筆者的ZK集群,使用的節點的主機名分別為storm01、storm02、storm03
依次修改各服務器上配置文件server.properties 的 broker.id,分別是0,1,2不得重復
修改host.name分別為storm01,storm02,storm03

這里寫圖片描述
這里寫圖片描述

啟動Kafka集群
注意:在啟動Kafka集群前,確保ZK集群已經啟動且能夠正常運行

這里寫圖片描述

測試

  • 創建topic
[root@storm01 ~]# kafka-topics.sh --create --zookeeper storm01:2181 --replication-factor 3 --partitions 2 --topic test
Created topic "test".
[root@storm01 ~]# kafka-topics.sh --describe --zookeeper storm01:2181 --topic test
Topic:test  PartitionCount:2    ReplicationFactor:3 Configs:
    Topic: test Partition: 0    Leader: 0   Replicas: 0,1,2 Isr: 0,1,2
    Topic: test Partition: 1    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0

 

  • 啟動生產者
[root@storm01 ~]# kafka-console-producer.sh --broker-list storm01:9092,storm02:9092,storm03:9092 --topic test
hello
hello kafka cluster
test
hello storm

 

  • 啟動兩個消費者,消費消息
[root@storm02 ~]# kafka-console-consumer.sh --zookeeper storm02:2181 --topic test --from-beginning
hello
hello kafka cluster
test
hello storm

 

[root@storm03 ~]# kafka-console-consumer.sh --zookeeper storm03:2181 --topic test --from-beginning
hello
hello kafka cluster
test
hello storm

 

Kafka集群模式(多節點多Broker)下Broker容錯性測試

Kafka 單節點多Broker中筆者已經做了Broker的容錯性測試,得出的結論是:不管當前狀態的borker是leader還是follower,當我們kill掉后,只要有一個borker能夠正常使用,則消息仍然能夠正常的生產和發送。即Kafka的容錯性是有保證的!

Kafka 集群中和單節點多Broker的測試相同,結果相同,請參考Kafka 單節點多Broker容錯性測試


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM