下載了最新的版本0.8.0 Beta1 Release
下載地址:https://dist.apache.org/repos/dist/release/kafka/kafka-0.8.0-beta1-src.tgz
一、安裝
# tar xzvf kafka-0.8.0-beta1-src.tgz # cd kafka-0.8.0-beta1-src # ./sbt update # ./sbt package # ./sbt assembly-package-dependency
./sbt update這一步有點慢,因為需要下載很多依賴文件
安裝ok
二、啟動服務
首先啟動zookeeper服務,使用kafka提供的單節點腳本
# bin/zookeeper-server-start.sh config/zookeeper.properties &
“&”號是為了讓在后台運行,要不還要在手動放后台或者重新開啟一個終端窗口。
然后啟動kafka服務,同樣有現成的腳本
# bin/kafka-server-start.sh config/server.properties &
同樣放后台
三、創建topic
創建一個topic,名字叫test
# bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test
$bin/kafka-create-topic.sh Missing required argument "[topic]" Option Description ------ ----------- --partition <Integer: # of partitions> number of partitions in the topic (default: 1) --replica <Integer: replication factor> replication factor for each partitions in the topic (default: 1) --replica-assignment-list for manually assigning replicas to <broker_id_for_part1_replica1 : brokers (default: ) broker_id_for_part1_replica2, broker_id_for_part2_replica1 : broker_id_for_part2_replica2, ...> --topic <topic> REQUIRED: The topic to be created. --zookeeper <urls> REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.
查看創建的topic
# bin/kafka-list-topic.sh --zookeeper localhost:2181 2>/dev/null topic: test partition: 0 leader: 0 replicas: 0 isr: 0
“2>/dev/null”屏蔽一些線程的啟動信息等,只顯示想看到的結果。
四、收發消息
kafka發送的message可以是文件或者標准輸入的一行。
#bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test >/dev/null hahahahahaha
收取message
# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning 2>/dev/null hahahahahaha
五、多broker測試
之前是一台機器上當做一個node,現在嘗試在一台機器上放3個node,即broker
修改配置文件
# cp config/server.properties config/server-1.properties # cp config/server.properties config/server-2.properties config/server-1.properties: broker.id=1 port=9093 log.dir=/tmp/kafka-logs-1 config/server-2.properties: broker.id=2 port=9094 log.dir=/tmp/kafka-logs-2
啟動broker1和broker2
# JMX_PORT=9997 bin/kafka-server-start.sh config/server-1.properties & # JMX_PORT=9998 bin/kafka-server-start.sh config/server-2.properties &
需要設置JMX的端口,防止沖突
可以jps看一下任務
# jps 23398 Kafka 23300 QuorumPeerMain 24057 Kafka 23944 Kafka 28938 Jps
創建一個topic
# bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 3 --partition 1 --topic my-replicated-topic
list一下
# bin/kafka-list-topic.sh --zookeeper localhost:2181 topic: my-replicated-topic partition: 0 leader: 2 replicas: 2,0,1 isr: 2,0,1 topic: test partition: 0 leader: 0 replicas: 0 isr: 0
partition:同一個topic下可以設置多個partition,將topic下的message存儲到不同的partition下,目的是為了提高並行性
leader:負責此partition的讀寫操作,每個broker都有可能成為某partition的leader
replicas:副本,即此partition在那幾個broker上有備份,不管broker是否存活
isr:存活的replicas
測試:刪除一個broker之后
# pkill -9 -f server-1.properties # bin/kafka-list-topic.sh --zookeeper localhost:2181 topic: my-replicated-topic partition: 0 leader: 2 replicas: 2,0,1 isr: 2,0 topic: test partition: 0 leader: 0 replicas: 0 isr: 0
可以看到leader的broker.id是2,我們kill掉的是broker.id=1,所以leader不變,replicas也不變,isr則是2,0,因為1被殺掉了
# pkill -9 -f server-2.properties # bin/kafka-list-topic.sh --zookeeper localhost:2181 topic: my-replicated-topic partition: 0 leader: 0 replicas: 2,0,1 isr: 0 topic: test partition: 0 leader: 0 replicas: 0 isr: 0
此時leader變成了0,isr也為0