Kafka基本使用
官網地址 http://kafka.apache.org/ 一切應以官網文檔為准。
安裝
download里下載要安裝的版本。或者直接wget該網址。如wget http://mirrors.cnnic.cn/apache/kafka/0.8.2.1/kafka_2.11-0.8.2.1.tgz
quickstart里有安裝方法:
tar -zxvf kafka_2.11-0.8.2.1.tgz
config/server.properties下有各種各種配置
先要保證zookeeper以啟動,然后啟動kafka bin/kafka-server-start.sh config/server.properties
測試
可以先測試一下:這些命令都以官網最新的為准: 建topic(指定zookeeper factor partitions topic名) bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
看topic (指定zookeeper) bin/kafka-topics.sh --list --zookeeper localhost:2181
啟動consumer (指定zookeeper topic from開始) 啟動后不要關 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
再啟一個窗口 啟動producer (指定broker topic) 輸入內容回車,去consumer觀察 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
報錯
注意版本問題,如果javaApi producer版本高,想在客戶端consumer啟動低版本驗證,會不停的報錯:
Closing socket connection to/127,0,0,1.(kafka.network.Processor) 無法識別客戶端消息。
由於需要定時啟動Kafka consumer拉取數據,第一次啟動后,沒有關掉線程。
但Kafka consumer是非線程安全的,第二次消費數據時會報錯:
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
更多實時計算,Kafka等相關技術博文,歡迎關注實時流式計算