kafka的部署模式
- 單節點Broker部署
- 單節點多Broker部署
- 集群部署(多節點多Broker部署)
- 實際的生產環境中使用的是第3中方式,以集群的方式來部署kafka。kafka強依賴ZK,如果想要使用Kafka,就必須安裝ZK,kafka中的消息偏置信息、kafka集群、topic信息會被存儲在ZK中。有人可能會說在在使用kafka的時候就沒有安裝ZK,那是因為kafka內置了一個ZK,一般我們不使用它。
-
單節點ZK部署
下載Zookeeper並解壓到指定目錄
$ wget http://www-eu.apache.org/dist/zookeeper/zookeeper-3.5.1-alpha/zookeeper-3.5.1-alpha.tar.gz $ tar -zxvf zookeeper-3.5.1-alpha.tar.gz -c /opt/zookeeper
進入Zookeeper的config目錄下
$ cd /opt/zookeeper/conf
拷貝zoo_sample.cfg文件重命名為zoo.cfg,然后修改dataDir屬性
# 數據的存放目錄 dataDir=/home/hadoop/zkdata # 端口,默認就是2181 clientPort=2181
配置環境變量
# Zookeeper Environment Variable export ZOOKEEPER_HOME=/opt/zookeeper export PATH=$PATH:$ZOOKEEPER_HOME/bin
Zookeeper 啟動停止命令
$ zkServer.sh start $ zkServer.sh stop
-
單節點Broker部署及使用
- Kafka中單節點部署又分為兩種,一種為單節點單Broker部署,一種為單節點多Broker部署,因為是節點的kafka,所以在安裝ZK的時候也只需要單節點即可
部署架構
配置Kafka
vim $KAFKA_HOME/config/server.properties
# broker的全局唯一編號,不能重復 broker.id=0 # 監聽 listeners=PLAINTEXT://:9092 # 日志目錄 log.dirs=/home/hadoop/kafka-logs # 配置zookeeper的連接(如果不是本機,需要該為ip或主機名) zookeeper.connect=localhost:2181
啟動Zookeeper
[hadoop@hadoop001]$zkServer.sh start ZooKeeper JMX enabled by default Using config: /opt/zookeeper/bin/../conf/zoo.cfg Starting zookeeper ... STARTED
啟動Kafka
$ kafka-server-start.sh $KAFKA_HOME/config/server.properties
打印的日志信息沒有報錯,可以看到如下信息
[Kafka Server 0], started (kafka.server.KafkaServer)
但是並不能保證Kafka已經啟動成功,輸入jps查看進程,如果可以看到Kafka進程,表示啟動成功
[hadoop@hadoop001 ~]$ jps
17940 Jps
16966 Kafka
13789 QuorumPeerMain
[hadoop@hadoop001 ~]$ jps -m
17940 Jps -m 9173 Kafka /opt/kafka/config/server.properties
13789 QuorumPeerMain /opt/zookeeper/bin/../conf/zoo.cfg
創建topic
[hadoop@hadoop001 bin]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
查看所有的topic信息
test
wsk_test
啟動生產者
[hadoop@hadoop001 bin]$ kafka-console-producer.sh --broker-list localhost:9092 --topic test kafaka zz zz kkk zzz bbb
啟動消費者
[hadoop@hadoop001 bin]$ kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginningkafka kafaka zz zz kkk zzz bbb
測試
- 生產者生產數據
- 消費者消費數據
我們在啟動一個消費者,去掉后面的參數–from-beginning,看有什么區別
- 總結:不帶參數–from-beginning,數據的消費不會對歷史數據進行消費,就是說,在這個消費啟動之前的數據,都不會在這里被消費,只會消費這個消費啟動以后的數據。
Kafka 單節點多Broker部署及使用
-
配置Kafka
拷貝server.properties三份
cd $KAFKA_HOME/config cp server.properties server-1.properties cp server.properties server-2.properties cp server.properties server-3.properties
修改server-1.properties文件
# broker的全局唯一編號,不能重復 broker.id=1 # 監聽 listeners=PLAINTEXT://:9093 # 日志目錄 log.dirs=/home/hadoop/kafka-logs-1
修改server-2.properties文件
# broker的全局唯一編號,不能重復 broker.id=3 # 監聽 listeners=PLAINTEXT://:9094 # 日志目錄 log.dirs=/home/hadoop/kafka-logs-3
修改server-3.properties文件
# broker的全局唯一編號,不能重復 broker.id=2 # 監聽 listeners=PLAINTEXT://:9095 # 日志目錄 log.dirs=/home/hadoop/kafka-logs-2
啟動Zookeeper
[hadoop@hadoop001]$zkServer.sh start ZooKeeper JMX enabled by default Using config: /opt/zookeeper/bin/../conf/zoo.cfg Starting zookeeper ... STARTED
啟動Kafka(分別啟動server1、2、3)
$ kafka-server-start.sh $KAFKA_HOME/config/server-1.properties $ kafka-server-start.sh $KAFKA_HOME/config/server-2.properties $ kafka-server-start.sh $KAFKA_HOME/config/server-3.properties
查看進程
[hadoop@hadoop001 ~]$ jps 19761 Kafka 4323 Kafka 5717 Jps 5051 Kafka 13789 QuorumPeerMain
[hadoop@hadoop001 ~]$ jps -m
12096 Jps -m
19761 Kafka /home/hadoop/app/kafka_2.11-0.10.0.0/config/server-1.properties
4323 Kafka /home/hadoop/app/kafka_2.11-0.10.0.0/config/server-2.properties
5051 Kafka /home/hadoop/app/kafka_2.11-0.10.0.0/config/server-3.properties
13789 QuorumPeerMain /home/hadoop/app/zookeeper-3.4.6/bin/../conf/zoo.cfg
創建topic(指定副本數量為3)
[hadoop@hadoop001 ~]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic Created topic "my-replicated-topic".
查看所有的topic信息
[hadoop@hadoop001 ~]$ kafka-topics.sh --list --zookeeper localhost:2181 my-replicated-topic test wsk_test
查看某個topic的詳細信息
hadoop@hadoop001 ~]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
啟動生產者
[hadoop@hadoop001 ~]$ kafka-console-producer.sh --broker-list localhost:9093,localhost:9094,localhost:9095 --topic my-replicated-topic
啟動消費者
[hadoop@hadoop001 ~]$ kafka-console-consumer.sh --zookeeper localhost:2181 --topic my-replicated-topic --from-beginning
- 生產者生產數據
- 消費者消費數據
單節點多borker容錯性測試
- Kafka是支持容錯的,上面我們已經完成了kafka單節點Blocker的部署,下面我們來對Kafka的容錯性進行測試,測試步驟如下
(1)查看topic的詳細信息,觀察那個blocker的角色是leader,那些blocker的角色follower
[hadoop@hadoop001 ~]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
[hadoop@hadoop001 ~]$ jps -m
12096 Jps -m
19761 Kafka /home/hadoop/app/kafka_2.11-0.10.0.0/config/server-1.properties
4323 Kafka /home/hadoop/app/kafka_2.11-0.10.0.0/config/server-2.properties
5051 Kafka /home/hadoop/app/kafka_2.11-0.10.0.0/config/server-3.properties
13789 QuorumPeerMain /home/hadoop/app/zookeeper-3.4.6/bin/../conf/zoo.cfg
(2)手工kill掉任意一個狀態是follower的borker,測試生成和消費信息是否正確
步驟1中可以看到 2 為leader,1 和 3為 follower,將follower為1的進程kill掉 kill -9 19761 (查看於步驟1的jps -m)
殺死以后繼續在生產者輸入數據
- 消費者的數據
結論:kill掉任意一個狀態是follower的broker,生成和消費信息正確,不受任何影響
(3)手工kill掉狀態是leader的borker,測試生產和消費的信息是否正確
borker2的角色為leader,將它kill掉,borker 3變成了leader
2,leader被殺死以后消費者會有大量的警告信息但是並沒有報錯
繼續在生產者輸入數據,消費者依舊能照常運轉消費。
[2019-07-15 14:50:25,788] WARN [ConsumerFetcherThread-console-consumer-80431_hadoop001-1563173078257-f10aadd9-0-2], Error in fetch kafka.consumer.ConsumerFetcherThread$FetchRequest@3df9da19 (kafka.consumer.ConsumerFetcherThread) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:98) at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
結論:kill掉狀態是leader的borker,生產和消費的信息正確
總結:不管當前狀態的borker是leader還是follower,當我們kill掉后,只要有一個borker能夠正常使用,則消息仍然能夠正常的生產和發送。即Kafka的容錯性是有保證的!