1. 簡介
kafka (官網地址: http://kafka.apache.org)是一款分布式消息發布和訂閱的系統,具有高性能和高吞吐率。
i. 消息的發布(publish)稱作producer,消息的訂閱(subscribe)稱作consumer,中間的存儲陣列稱作broker。
ii. 多個broker協同合作,producer、consumer和broker三者之間通過zookeeper來協調請求和轉發。
iii. producer產生和推送(push)數據到broker,consumer從broker拉取(pull)數據並進行處理。
iv. broker端不維護數據的消費狀態,提升了性能。
v. 直接使用磁盤進行存儲,線性讀寫,速度快:避免了數據在JVM內存和系統內存之間的復制,減少耗性能的創建對象和垃圾回收。
vi. Kafka使用scala編寫,可以運行在JVM上。
2. 安裝:
a. 首先安裝JRE/JDK
Linux安裝JDK
b. 下載kafka
進入下載頁面: http://kafka.apache.org/downloads.html
選擇Binary downloads下載 (Source download需要編譯才能使用)
也可以直接在linux終端下載:
c. 解壓
目錄:
/bin 啟動和停止命令等。
/config 配置文件
/libs 類庫
d. 修改配置
Kafka默認開啟JVM壓縮指針,但只是在64位的HotSpot VM受支持,如果安裝了32位的HotSpot VM,需要修改 /bin/kafka-run-class.sh文件
找到如下行:
去除-XX:+UseCompressedOops參數
3. 啟動和停止
啟動Zookeeper server:
&是為了能退出命令行
啟動Kafka server:
停止Kafka server
停止Zookeeper server:
4. 單機連通性測試
運行producer:
早版本的Kafka,--broker-list localhost:9092需改為--zookeeper localhost:2181
運行consumer:
在producer端輸入字符串並回車,查看consumer端是否顯示。
5. 分布式連通性測試
Zookeeper Server, Kafka Server, Producer都放在服務器server1上,ip地址為192.168.1.10
Consumer放在服務器server2上,ip地址為192.168.1.12。
分別運行server1的producer和server2的consumer,
在producer的console端輸入字符串,consumer報Connection refused錯誤:
broker, producer和consumer都注冊到zookeeper上,producer和consumer的參數明確指定。問題出在broker的配置文件server.properties上:
host名稱沒有指定,就是127.0.0.1,consumer去broker拿數據就有問題。設置為192.168.1.10,重啟服務就好了。
kafka (官網地址: http://kafka.apache.org)是一款分布式消息發布和訂閱的系統,具有高性能和高吞吐率。

i. 消息的發布(publish)稱作producer,消息的訂閱(subscribe)稱作consumer,中間的存儲陣列稱作broker。
ii. 多個broker協同合作,producer、consumer和broker三者之間通過zookeeper來協調請求和轉發。
iii. producer產生和推送(push)數據到broker,consumer從broker拉取(pull)數據並進行處理。
iv. broker端不維護數據的消費狀態,提升了性能。
v. 直接使用磁盤進行存儲,線性讀寫,速度快:避免了數據在JVM內存和系統內存之間的復制,減少耗性能的創建對象和垃圾回收。
vi. Kafka使用scala編寫,可以運行在JVM上。
2. 安裝:
a. 首先安裝JRE/JDK
Linux安裝JDK
b. 下載kafka
進入下載頁面: http://kafka.apache.org/downloads.html
選擇Binary downloads下載 (Source download需要編譯才能使用)
也可以直接在linux終端下載:
- wget -q http://apache.fayea.com/apache-mirror/kafka/0.8.1/kafka_2.8.0-0.8.1.tgz
c. 解壓
- tar -xzvf kafka_2.8.0-0.8.1.tgz
- rm kafka_2.8.0-0.8.1.tgz
- cd kafka_2.8.0-0.8.1
目錄:

/bin 啟動和停止命令等。
/config 配置文件
/libs 類庫
d. 修改配置
Kafka默認開啟JVM壓縮指針,但只是在64位的HotSpot VM受支持,如果安裝了32位的HotSpot VM,需要修改 /bin/kafka-run-class.sh文件
- vi bin/kafka-run-class.sh
找到如下行:
- KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"
去除-XX:+UseCompressedOops參數
3. 啟動和停止
啟動Zookeeper server:
- bin/zookeeper-server-start.sh config/zookeeper.properties &
&是為了能退出命令行
啟動Kafka server:
- bin/kafka-server-start.sh config/server.properties &
停止Kafka server
- bin/kafka-server-stop.sh
停止Zookeeper server:
- bin/zookeeper-server-stop.sh
4. 單機連通性測試
運行producer:
- bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
早版本的Kafka,--broker-list localhost:9092需改為--zookeeper localhost:2181
運行consumer:
- bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
在producer端輸入字符串並回車,查看consumer端是否顯示。
5. 分布式連通性測試
Zookeeper Server, Kafka Server, Producer都放在服務器server1上,ip地址為192.168.1.10
Consumer放在服務器server2上,ip地址為192.168.1.12。
分別運行server1的producer和server2的consumer,
- bin/kafka-console-producer.sh --broker-list 192.168.1.10:9092 --topic test
- bin/kafka-console-consumer.sh --zookeeper 192.168.1.10:2181 --topic test --from-beginning
在producer的console端輸入字符串,consumer報Connection refused錯誤:

broker, producer和consumer都注冊到zookeeper上,producer和consumer的參數明確指定。問題出在broker的配置文件server.properties上:
- # Hostname the broker will bind to. If not set, the server will bind to all interfaces
- #host.name=localhost
host名稱沒有指定,就是127.0.0.1,consumer去broker拿數據就有問題。設置為192.168.1.10,重啟服務就好了。
轉載blog:http://czj4451.iteye.com/blog/2041096