安裝及部署
一、環境配置
-
操作系統:Cent OS 7
-
Kafka版本:0.9.0.0
-
Kafka官網下載:請點擊
-
JDK版本:1.7.0_51
-
SSH Secure Shell版本:XShell 5
二、操作過程
1、下載Kafka並解壓
-
下載:
curl -L -O http://mirrors.cnnic.cn/apache/kafka/0.9.0.0/kafka_2.10-0.9.0.0.tgz
-
解壓:
tar zxvf kafka_2.10-0.9.0.0.tgz
2、Kafka目錄介紹
-
/bin 操作kafka的可執行腳本,還包含windows下腳本
-
/config 配置文件所在目錄
-
/libs 依賴庫目錄
-
/logs 日志數據目錄,目錄kafka把server端日志分為5種類型,分為:server,request,state,log-cleaner,controller
3、配置
-
配置zookeeper
請參考zookeeper
-
進入kafka安裝工程根目錄編輯config/server.properties
kafka最為重要三個配置依次為:broker.id、log.dir、zookeeper.connect,kafka server端config/server.properties參數說明和解釋如下:
4、啟動Kafka
-
啟動
進入kafka目錄,敲入命令 bin/kafka-server-start.sh config/server.properties &
-
檢測2181與9092端口
netstat -tunlp|egrep "(2181|9092)" tcp 0 0 :::2181 :::* LISTEN 19787/java tcp 0 0 :::9092 :::* LISTEN 28094/java
說明:
Kafka的進程ID為28094,占用端口為9092
QuorumPeerMain為對應的zookeeper實例,進程ID為19787,在2181端口監聽
5、單機連通性測試
啟動2個XSHELL客戶端,一個用於生產者發送消息,一個用於消費者接受消息。
-
運行producer,隨機敲入幾個字符,相當於把這個敲入的字符消息發送給隊列。
bin/kafka-console-producer.sh --broker-list 192.168.153.118:9092 --topic test
說明:早版本的Kafka,–broker-list 192.168.1.181:9092需改為–zookeeper 192.168.1.181:2181
-
運行consumer,可以看到剛才發送的消息列表。
bin/kafka-console-consumer.sh --zookeeper 192.168.153.118:2181 --topic test --from-beginning
-
注意:
producer,指定的Socket(192.168.1.181+9092),說明生產者的消息要發往kafka,也即是broker
consumer, 指定的Socket(192.168.1.181+2181),說明消費者的消息來自zookeeper(協調轉發)
上面的只是一個單個的broker,下面我們來實驗一個多broker的集群。
6、搭建一個多個broker的偽集群
剛才只是啟動了單個broker,現在啟動有3個broker組成的集群,這些broker節點也都是在本機上。
(1)為每一個broker提供配置文件
我們先看看config/server0.properties配置信息:
broker.id=0 listeners=PLAINTEXT://:9092 port=9092 host.name=192.168.1.181 num.network.threads=4 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=5 num.recovery.threads.per.data.dir=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 log.cleaner.enable=false zookeeper.connect=192.168.1.181:2181 zookeeper.connection.timeout.ms=6000 queued.max.requests =500 log.cleanup.policy = delete
-
說明:
broker.id為集群中唯一的標注一個節點,因為在同一個機器上,所以必須指定不同的端口和日志文件,避免數據被覆蓋。
在上面單個broker的實驗中,為什么kafka的端口為9092,這里可以看得很清楚。
kafka cluster怎么同zookeeper交互的,配置信息中也有體現。
那么下面,我們仿照上面的配置文件,提供2個broker的配置文件:
-
server1.properties:
-
server2.properties:
broker.id=2 listeners=PLAINTEXT://:9094 port=9094 host.name=192.168.1.181 num.network.threads=4 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs2 num.partitions=5 num.recovery.threads.per.data.dir=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 log.cleaner.enable=false zookeeper.connect=192.168.1.181:2181 zookeeper.connection.timeout.ms=6000 queued.max.requests =500 log.cleanup.policy = delete
(2)啟動所有的broker
命令如下:
bin/kafka-server-start.sh config/server0.properties & #啟動broker0 bin/kafka-server-start.sh config/server1.properties & #啟動broker1 bin/kafka-server-start.sh config/server2.properties & #啟動broker2
查看2181、9092、9093、9094端口
netstat -tunlp|egrep "(2181|9092|9093|9094)" tcp 0 0 :::9093 :::* LISTEN 29725/java tcp 0 0 :::2181 :::* LISTEN 19787/java tcp 0 0 :::9094 :::* LISTEN 29800/java tcp 0 0 :::9092 :::* LISTEN 29572/java
一個zookeeper在2181端口上監聽,3個kafka cluster(broker)分別在端口9092,9093,9094監聽。
(3)創建topic
bin/kafka-topics.sh --create --topic topic_1 --partitions 1 --replication-factor 3 \--zookeeper localhost:2181 bin/kafka-topics.sh --create --topic topic_2 --partitions 1 --replication-factor 3 \--zookeeper localhost:2181 bin/kafka-topics.sh --create --topic topic_3 --partitions 1 --replication-factor 3 \--zookeeper localhost:2181
查看topic創建情況:
bin/kafka-topics.sh --list --zookeeper localhost:2181 test topic_1 topic_2 topic_3 [root@atman081 kafka_2.10-0.9.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 Topic:test PartitionCount:1 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic:topic_1 PartitionCount:1 ReplicationFactor:3 Configs: Topic: topic_1 Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0 Topic:topic_2 PartitionCount:1 ReplicationFactor:3 Configs: Topic: topic_2 Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic:topic_3 PartitionCount:1 ReplicationFactor:3 Configs: Topic: topic_3 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
上面的有些東西,也許還不太清楚,暫放,繼續試驗。需要注意的是topic_1的Leader=2
(4)模擬客戶端發送,接受消息
-
發送消息
bin/kafka-console-producer.sh --topic topic_1 --broker-list 192.168.1.181:9092,192.168.1.181:9093,192.168.1.181:9094
-
接收消息
bin/kafka-console-consumer.sh --topic topic_1 --zookeeper 192.168.1.181:2181 --from-beginning
需要注意,此時producer將topic發布到了3個broker中,現在就有點分布式的概念了。
(5) kill some broker
kill broker(id=0)
首先,我們根據前面的配置,得到broker(id=0)應該在9092監聽,這樣就能確定它的PID了。
broker0沒kill之前topic在kafka cluster中的情況
bin/kafka-topics.sh --describe --zookeeper localhost:2181 Topic:test PartitionCount:1 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic:topic_1 PartitionCount:1 ReplicationFactor:3 Configs: Topic: topic_1 Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0 Topic:topic_2 PartitionCount:1 ReplicationFactor:3 Configs: Topic: topic_2 Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic:topic_3 PartitionCount:1 ReplicationFactor:3 Configs: Topic: topic_3 Partition: 0 Leader: 2 Replicas: 0,2,1 Isr: 2,1,0
kill之后,再觀察,做下對比。很明顯,主要變化在於Isr,以后再分析
bin/kafka-topics.sh --describe --zookeeper localhost:2181 Topic:test PartitionCount:1 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: -1 Replicas: 0 Isr: Topic:topic_1 PartitionCount:1 ReplicationFactor:3 Configs: Topic: topic_1 Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1 Topic:topic_2 PartitionCount:1 ReplicationFactor:3 Configs: Topic: topic_2 Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2 Topic:topic_3 PartitionCount:1 ReplicationFactor:3 Configs: Topic: topic_3 Partition: 0 Leader: 2 Replicas: 0,2,1 Isr: 2,1
測試下,發送消息,接受消息,是否收到影響。
-
發送消息
bin/kafka-console-producer.sh --topic topic_1 --broker-list 192.168.1.181:9092,192.168.1.181:9093,192.168.1.181:9094
-
接收消息
bin/kafka-console-consumer.sh --topic topic_1 --zookeeper 192.168.1.181:2181 --from-beginning
可見,kafka的分布式機制,容錯能力還是挺好的~
Kafka介紹
1、kafka有什么?
-
producer 消息的生成者,即發布消息
-
consumer 消息的消費者,即訂閱消息
-
broker Kafka以集群的方式運行,可以由一個或多個服務組成,服務即broker
-
zookeeper 協調轉發
2、kafka的工作圖
producers通過網絡將消息發送到Kafka集群,集群向消費者提供消息
kafka對消息進行歸納,即topic,也就是說producer發布topic,consumer訂閱topic