環境CentOS7.0,JDK1.8
一、下載安裝
在kafka官網 http://kafka.apache.org/downloads下載到最新的kafka安裝包
下載 2.0.0 release,解壓. > tar -xzf kafka_2.11-2.0.0.tgz > cd kafka_2.11-2.0.0
在kafka解壓目錄下下有一個config的文件夾,里面放置的是配置文件
consumer.properites 消費者配置,可以使用默認設置
producer.properties 生產者配置,可以使用默認設置
server.properties kafka服務器的配置,此配置文件用來配置kafka服務器,常用的配置
broker.id 申明當前kafka服務器在集群中的唯一ID,需配置為integer,並且集群中的每一個kafka服務器的id都應是唯一的,這里采用默認配置即可 listeners 申明此kafka服務器需要監聽的端口號,如果是在本機上跑虛擬機運行可以不用配置本項,默認會使用localhost的地址,如果是在遠程服務器上運行則必須配置,例如: listeners=PLAINTEXT:// 192.168.10.1:9092。並確保服務器的9092端口能夠訪問 zookeeper.connect 申明kafka所連接的zookeeper的地址 ,需配置為zookeeper的地址,如果使用自帶的zookeeper版本,使用默認配置即可zookeeper.connect=localhost:2181
二、啟動kafka服務
啟動kafka前需要先啟動zookeeper,如果設備上沒有安裝,可以使用kafka安裝包里自帶的zookeeper,這是一個單節點便捷版zookeeper。
啟動zookeeper... > bin/zookeeper-server-start.sh config/zookeeper.properties [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) ...
啟動kafka服務 > bin/kafka-server-start.sh config/server.properties [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties) [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties) ...
三、創建一個topic
Topic即主題,通過對消息指定主題可以將消息分類,消費者可以只關注自己需要的Topic中的消息。
物理上不同 Topic 的消息分開存儲,邏輯上一個 Topic 的消息雖然保存於一個或多個 broker 上,但用戶只需指定消息的 Topic 即可生產或消費數據而不必關心數據存於何處。
創建一個名為test的topic,只有一個分區,一份副本(即除了自己沒有其他副本)
創建topic > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看運行的topic > bin/kafka-topics.sh --list --zookeeper localhost:2181 test
四、創建一個消息的生產者
Producer,指消息的產生者,或者,消息的寫端,向Kafka集群發送消息,在發送消息之前,會對消息進行分類,即Topic
kafka包里提供了控制台的消息生產方式
創建對kafka服務(正在偵聽9092端口,在server.properties里配置)的test主題消息生產者 > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message This is another message
五、創建一個消息的消費者
Consumer,指消息的消費者,或者,消息的讀端,消費者通過與kafka集群建立長連接的方式,不斷地從集群中拉取(pull方式)消息,然后可以對這些消息進行處理。
Consumer Group:每個 Consumer 屬於一個特定的 Consumer Group(可為每個 Consumer 指定 group name,若不指定 group name 則屬於默認的 group)。
kafka包里提供了控制台的消息消費方式
創建消息消費者 > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning This is a message This is another message
到這里,一個簡單的kafka的服務搭建完成
六、kafka集群
在同一台設備搭建一個kafka集群,主要給kafka服務定義不同偵聽端口就可以。
先復制兩份服務配置文件 > cp config/server.properties config/server-1.properties > cp config/server.properties config/server-2.properties
然后修改兩個配置文件以下節點 config/server-1.properties: broker.id=1 listeners=PLAINTEXT://:9093 log.dirs=/tmp/kafka-logs-1 config/server-2.properties: broker.id=2 listeners=PLAINTEXT://:9094 log.dirs=/tmp/kafka-logs-2
broker,指消息服務器,Producer 產生的消息都是寫到這里,Consumer 讀消息也是從這里讀
broker.id 申明當前kafka服務器在集群中的唯一ID,需配置為integer,並且集群中的每一個kafka服務器的id都應是唯一的
確保zookeeper運行,並且之前啟動的單節點kafka服務也確保運行着,這樣,我們就有三個kafka服務。
啟動server-1和server-2 > bin/kafka-server-start.sh config/server-1.properties ... > bin/kafka-server-start.sh config/server-2.properties ...
創建一個使用三份副本的topic
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
這個時候我們可以看看這幾個broke都干啥,使用kafka-topics的--describe參數
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
- "leader" 簡單的說就是主節點,讀寫都是它負責。
- "replicas" 副本列表,表示副本分布在哪些代理上,且該列表第一個元素就是Leader副本所在代理。
- "isr" 副本子集,是指消息同步的集合,這個列表的副本都是存活的
現在可以試試發送和接收消息:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic ... my test message 1 my test message 2
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic ... my test message 1 my test message 2
七、測試一下集群的容錯性
現在leader是broke 1,我們找到它,然后殺死進程
> ps aux | grep server-1.properties 7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java... > kill -9 7564
然后通過參數--describe看看變化
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
leader已經變成broke 2,並且Isr里broke 1已經被排除,也就是數據不同步到broke 1
之前寫入的消息依然有效
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic ... my test message 1 my test message 2
然后,我們再重新啟動server-1,並且查看狀態
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0,1
broke 1又回到同步列表里。
八、常用命令
Kafka bin/ 目錄下的工具: bin/connect-distributed.sh bin/kafka-consumer-offset-checker.sh bin/kafka-replica-verification.sh bin/kafka-verifiable-producer.sh bin/connect-standalone.sh bin/kafka-consumer-perf-test.sh bin/kafka-run-class.sh bin/zookeeper-security-migration.sh bin/kafka-acls.sh bin/kafka-mirror-maker.sh bin/kafka-server-start.sh bin/zookeeper-server-start.sh bin/kafka-configs.sh bin/kafka-preferred-replica-election.sh bin/kafka-server-stop.sh bin/zookeeper-server-stop.sh bin/kafka-console-consumer.sh bin/kafka-producer-perf-test.sh bin/kafka-simple-consumer-shell.sh bin/zookeeper-shell.sh bin/kafka-console-producer.sh bin/kafka-reassign-partitions.sh bin/kafka-topics.sh bin/kafka-consumer-groups.sh bin/kafka-replay-log-producer.sh bin/kafka-verifiable-consumer.sh 常用的命令有以下幾個: bin/kafka-server-start.sh -daemon config/server.properties bin/kafka-topics.sh --describe --zookeeper 192.168.10.31:2181 --topic topic1 bin/kafka-topics.sh --list --zookeeper 192.168.10.31:2181 bin/kafka-topics.sh --delete --zookeeper 192.168.10.31:2181 --topic topic1 bin/kafka-topics.sh --create --zookeeper 192.168.10.31:2181 --replication-factor 3 --partitions 2 --topic topic1 bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.31:9092 --topic topic1 --from-beginning bin/kafka-console-producer.sh --broker-list 192.168.10.31:9092 --topic topic1
刪除主題: 在server.properties配置 delete.topic.enable=true #默認是false 刪除主題數據: 在server.properties中配置 log.retention.hours=168 //保留時間,單位小時 log.retention.check.interval.ms=300000 //保留時間檢查間隔,單位毫秒
九、其他
如果想繼續運行其他命令,不關閉當前窗口,可以加“&“,就可以后台運行了:
bin/zookeeper-server-start.sh config/zookeeper.properties &