kafka 入門筆記 #1


kafka 入門筆記(#1)

單機測試

下載版本,解壓

tar -xzf kafka_2.11-0.10.1.1.tgz
cd kafka_2.11-0.10.1.1

啟動服務

Kafka用到了Zookeeper ,所以首先要啟動zookeeper,先啟動一個單實例的zk服務。

bin/zookeeper-server-start.sh config/zookeeper.properties &

啟動Kafka 服務

bin/kafka-server-start.sh config/server.properties

創建 topic

創建一個 名叫“test”的topic ,只有一個分區,一個副本

bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test

可以通過list 命令查看已創建的topic:

bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
test

發送消息

Kafka 使用一個簡單的命令行producer,從文件中或者從標准輸入中讀取消息並發送到服務端。默認的每條命令將發送一條消息

運行producer並在控制台中輸一些消息,這些消息將被發送到服務端:
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
This is a message /:Enter
This is another message /:Enter

Enter 鍵為push 一條記錄,Ctrl + c 退出發送

啟動consumer

Kafka也有一個命令行consumer可以讀取消息並輸出到標准輸出

bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test --from-beginning
This is a message
This is another message

如果開兩個命令終端,通過producer 輸入message,consumer 立刻就可以讀取出消息

搭建 多broker 集群

上面啟動了單broker,現在啟動3個broker 組成的集群(機器A,B,C)
機器A:192.168.56.129 ;機器B:192.168.56.131 ;機器C:192.168.56.132

配置

ABC 均照前面安裝好kafka
zookeeper單例:放到機器A(192.168.56.129:2181)
機器A 配置:

broker.id=0
advertised.host.name=192.168.56.129
zookeeper.connect=localhost:2181

修改機器B 中kafka 的配置文件

broker.id=1
advertised.host.name=192.168.56.131
zookeeper.connect=192.168.56.129:2181

修改機器C 中kafka 的配置文件

broker.id=1
advertised.host.name=192.168.56.132
zookeeper.connect=192.168.56.129:2181

為了避免出現 Should not set log end offset on partition 異常,需要將各個broker 的server.properties 中設置

advertised.host.name=192.168.xxx

啟動

機器A上,啟動zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties &

依次啟動A,B,C上的 kafka broker

bin/kafka-server-start.sh config/server.properties &

創建一個擁有 3個副本的topic

bin/kafka-topics.sh --create --zookeeper 192.168.56.129:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

檢測節點信息

bin/kafka-topics.sh --describe --zookeeper 192.168.56.129:2181 --topic my-replicated-topic

Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
Topic: my-replicated-topic	Partition: 0	Leader: 0	Replicas: 0,1,2	Isr: 0,1,2

Leader:負責處理消息的讀寫,leader 是從所有節點中隨機選擇的
Replicas:列出所有的副本節點,不管節點是否在服務中
Isr:是正在服務的節點

注意:broker 運行了3台,但是Isr檢測到 一直只有一台,而且無法寫入消息,結果發現是防火牆的問題

如 : Replicas: 0,1,2	Isr: 0
如producer 寫消息時:Error while fetching metadata with correlation id 11 : {my-replicated-topic2=LEADER_NOT_AVAILABLE}

這時需要把各台機器的 9092 端口打開
vi /etc/sysconfig/iptables
service iptables restart

消息讀寫

同單例;隨意挑選機器,做為producer 向my-replicated-topic 寫入消息,挑另外一台機器做consumer ,消費這些消息

bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic my-replicated-topic

bin/kafka-console-consumer.sh --zookeeper 192.168.56.129:2181 --from-beginning --topic my-replicated-topic

容錯能力

上面檢測節點信息時,broker.id=0 是作為Leader,現在kill Leader node

在幾秒后,就恢復正常,見其他broker 的日志:

	[2017-04-06 16:52:07,601] WARN [ReplicaFetcherThread-0-0], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@11acd8a (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 192.168.56.129:9092 (id: 0 rack: null) failed
	at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:83)
	at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:93)
	at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:248)
	at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238)
	at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2017-04-06 16:52:08,091] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2017-04-06 16:52:08,093] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2017-04-06 16:52:08,094] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2017-04-06 16:52:08,312] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions my-replicated-topic2-0 (kafka.server.ReplicaFetcherManager)
[2017-04-06 16:52:08,314] INFO [ReplicaFetcherThread-0-0], Shutting down (kafka.server.ReplicaFetcherThread)
[2017-04-06 16:52:08,315] INFO [ReplicaFetcherThread-0-0], Stopped  (kafka.server.ReplicaFetcherThread)
[2017-04-06 16:52:08,316] INFO [ReplicaFetcherThread-0-0], Shutdown completed (kafka.server.ReplicaFetcherThread)
[2017-04-06 16:52:08,363] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2017-04-06 16:52:14,877] INFO Partition [my-replicated-topic,0] on broker 1: Shrinking ISR for partition [my-replicated-topic,0] from 0,1,2 to 1,2 (kafka.cluster.Partition)

再次檢測 節點信息:

[root@localhost kafka_2.11-0.10.1.1]# bin/kafka-topics.sh --describe --zookeeper 192.168.56.129:2181 --topic my-replicated-topic2
Topic:my-replicated-topic2	PartitionCount:1	ReplicationFactor:3	Configs:
Topic: my-replicated-topic2	Partition: 0	Leader: 1	Replicas: 0,1,2	Isr: 1,2

剩下兩台機器,一個生產消息,一個消費消息,都沒有問題

按照 http://www.aboutyun.com/thread-12882-1-1.html 思路進行操作。集群從單機多端口,改用多機器演示


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM