KAFKA的部署與使用


kafka的部署模式

  1. 單節點Broker部署
  2. 單節點多Broker部署
  3. 集群部署(多節點多Broker部署)

 

  • 實際的生產環境中使用的是第3中方式,以集群的方式來部署kafka。kafka強依賴ZK,如果想要使用Kafka,就必須安裝ZK,kafka中的消息偏置信息、kafka集群、topic信息會被存儲在ZK中。有人可能會說在在使用kafka的時候就沒有安裝ZK,那是因為kafka內置了一個ZK,一般我們不使用它。
  • 單節點ZK部署

下載Zookeeper並解壓到指定目錄

$ wget http://www-eu.apache.org/dist/zookeeper/zookeeper-3.5.1-alpha/zookeeper-3.5.1-alpha.tar.gz
$ tar -zxvf zookeeper-3.5.1-alpha.tar.gz -c /opt/zookeeper

 進入Zookeeper的config目錄下

$ cd /opt/zookeeper/conf

拷貝zoo_sample.cfg文件重命名為zoo.cfg,然后修改dataDir屬性

# 數據的存放目錄
dataDir=/home/hadoop/zkdata
# 端口,默認就是2181
clientPort=2181

配置環境變量

# Zookeeper Environment Variable
export ZOOKEEPER_HOME=/opt/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin 

Zookeeper 啟動停止命令

$ zkServer.sh start
$ zkServer.sh stop
  • 單節點Broker部署及使用

  • Kafka中單節點部署又分為兩種,一種為單節點單Broker部署,一種為單節點多Broker部署,因為是節點的kafka,所以在安裝ZK的時候也只需要單節點即可

部署架構

 

 

 

 

 配置Kafka

 

vim $KAFKA_HOME/config/server.properties
# broker的全局唯一編號,不能重復 broker.id
=0 # 監聽 listeners=PLAINTEXT://:9092 # 日志目錄 log.dirs=/home/hadoop/kafka-logs # 配置zookeeper的連接(如果不是本機,需要該為ip或主機名) zookeeper.connect=localhost:2181

啟動Zookeeper

[hadoop@hadoop001]$zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

啟動Kafka

$ kafka-server-start.sh $KAFKA_HOME/config/server.properties

打印的日志信息沒有報錯,可以看到如下信息

[Kafka Server 0], started (kafka.server.KafkaServer)

但是並不能保證Kafka已經啟動成功,輸入jps查看進程,如果可以看到Kafka進程,表示啟動成功

[hadoop@hadoop001 ~]$ jps
17940 Jps
16966 Kafka
13789 QuorumPeerMain
[hadoop@hadoop001 ~]$ jps -m
17940 Jps -m 9173 Kafka /opt/kafka/config/server.properties 
13789 QuorumPeerMain /opt/zookeeper/bin/../conf/zoo.cfg

創建topic

[hadoop@hadoop001 bin]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".

查看所有的topic信息

[hadoop@hadoop001 bin]$ kafka-topics.sh --list --zookeeper localhost:2181
test
wsk_test

 啟動生產者

[hadoop@hadoop001 bin]$ kafka-console-producer.sh --broker-list localhost:9092 --topic test
kafaka
zz
zz
kkk
zzz
bbb

 啟動消費者

[hadoop@hadoop001 bin]$ kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginningkafka
kafaka
zz
zz
kkk
zzz
bbb

測試  

  • 生產者生產數據

 

  • 消費者消費數據

 我們在啟動一個消費者,去掉后面的參數–from-beginning,看有什么區別

  • 總結:不帶參數–from-beginning,數據的消費不會對歷史數據進行消費,就是說,在這個消費啟動之前的數據,都不會在這里被消費,只會消費這個消費啟動以后的數據。

 

Kafka 單節點多Broker部署及使用

  • 配置Kafka

拷貝server.properties三份

cd $KAFKA_HOME/config
cp server.properties server-1.properties

cp server.properties server-2.properties

cp server.properties server-3.properties

修改server-1.properties文件

# broker的全局唯一編號,不能重復
broker.id=1
# 監聽
listeners=PLAINTEXT://:9093
# 日志目錄
log.dirs=/home/hadoop/kafka-logs-1

修改server-2.properties文件

# broker的全局唯一編號,不能重復
broker.id=3
# 監聽
listeners=PLAINTEXT://:9094
# 日志目錄
log.dirs=/home/hadoop/kafka-logs-3

修改server-3.properties文件 

# broker的全局唯一編號,不能重復
broker.id=2
# 監聽
listeners=PLAINTEXT://:9095
# 日志目錄
log.dirs=/home/hadoop/kafka-logs-2

啟動Zookeeper

[hadoop@hadoop001]$zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

啟動Kafka(分別啟動server1、2、3)

$ kafka-server-start.sh $KAFKA_HOME/config/server-1.properties
$ kafka-server-start.sh $KAFKA_HOME/config/server-2.properties
$ kafka-server-start.sh $KAFKA_HOME/config/server-3.properties

查看進程

[hadoop@hadoop001 ~]$ jps
19761 Kafka
4323 Kafka
5717 Jps
5051 Kafka
13789 QuorumPeerMain

[hadoop@hadoop001 ~]$ jps -m
12096 Jps -m
19761 Kafka /home/hadoop/app/kafka_2.11-0.10.0.0/config/server-1.properties
4323 Kafka /home/hadoop/app/kafka_2.11-0.10.0.0/config/server-2.properties
5051 Kafka /home/hadoop/app/kafka_2.11-0.10.0.0/config/server-3.properties
13789 QuorumPeerMain /home/hadoop/app/zookeeper-3.4.6/bin/../conf/zoo.cfg

創建topic(指定副本數量為3)

[hadoop@hadoop001 ~]$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Created topic "my-replicated-topic".

查看所有的topic信息

[hadoop@hadoop001 ~]$ kafka-topics.sh --list --zookeeper localhost:2181
my-replicated-topic
test
wsk_test

 查看某個topic的詳細信息 

hadoop@hadoop001 ~]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: my-replicated-topic	Partition: 0	Leader: 2	Replicas: 2,1,3	Isr: 2,1,3

啟動生產者

[hadoop@hadoop001 ~]$ kafka-console-producer.sh --broker-list localhost:9093,localhost:9094,localhost:9095 --topic my-replicated-topic

啟動消費者

[hadoop@hadoop001 ~]$  kafka-console-consumer.sh --zookeeper localhost:2181 --topic my-replicated-topic --from-beginning
  • 生產者生產數據

  • 消費者消費數據

單節點多borker容錯性測試

  • Kafka是支持容錯的,上面我們已經完成了kafka單節點Blocker的部署,下面我們來對Kafka的容錯性進行測試,測試步驟如下

(1)查看topic的詳細信息,觀察那個blocker的角色是leader,那些blocker的角色follower

[hadoop@hadoop001 ~]$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: my-replicated-topic	Partition: 0	Leader: 2	Replicas: 2,1,3	Isr: 2,1,3

[hadoop@hadoop001 ~]$ jps -m
12096 Jps -m
19761 Kafka /home/hadoop/app/kafka_2.11-0.10.0.0/config/server-1.properties
4323 Kafka /home/hadoop/app/kafka_2.11-0.10.0.0/config/server-2.properties
5051 Kafka /home/hadoop/app/kafka_2.11-0.10.0.0/config/server-3.properties
13789 QuorumPeerMain /home/hadoop/app/zookeeper-3.4.6/bin/../conf/zoo.cfg



 (2)手工kill掉任意一個狀態是follower的borker,測試生成和消費信息是否正確

步驟1中可以看到 2 為leader,1 和 3為 follower,將follower為1的進程kill掉 

kill -9 19761   (查看於步驟1的jps -m)

殺死以后繼續在生產者輸入數據

  • 消費者的數據

結論:kill掉任意一個狀態是follower的broker,生成和消費信息正確,不受任何影響

 (3)手工kill掉狀態是leader的borker,測試生產和消費的信息是否正確

borker2的角色為leader,將它kill掉,borker 3變成了leader 

 

 2,leader被殺死以后消費者會有大量的警告信息但是並沒有報錯

繼續在生產者輸入數據,消費者依舊能照常運轉消費。

[2019-07-15 14:50:25,788] WARN [ConsumerFetcherThread-console-consumer-80431_hadoop001-1563173078257-f10aadd9-0-2], Error in fetch kafka.consumer.ConsumerFetcherThread$FetchRequest@3df9da19 (kafka.consumer.ConsumerFetcherThread)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:98)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)

 

 

 

結論:kill掉狀態是leader的borker,生產和消費的信息正確

 

 總結:不管當前狀態的borker是leader還是follower,當我們kill掉后,只要有一個borker能夠正常使用,則消息仍然能夠正常的生產和發送。即Kafka的容錯性是有保證的!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM