Kafka是一個分布式、可分區、可復制的消息系統。Kafka將消息以topic為單位進行歸納;Kafka發布消息的程序稱為producer,也叫生產者;Kafka預訂topics並消費消息的程序稱為consumer,也叫消費者;當Kafka以集群的方式運行時,可以由一個服務或者多個服務組成,每個服務叫做一個broker,運行過程中producer通過網絡將消息發送到Kafka集群,集群向消費者提供消息。
Kafka客戶端和服務端基於TCP協議通信,並且提供了Java客戶端API,實際上Kafka對多種語言都有支持
接下來訪問Apache Kafka官網下載安裝包,http://kafka.apache.org/ 網站的界面非常的簡潔,下載和文檔等分類一目了然
點擊左邊的download按鈕,進入版本選擇,這里選擇0.8.2.2系列的基於Scala 2.9.2編寫的kafka_2.9.2-0.8.2.2.tgz包,
點擊會進入下載頁面,再次點擊下載即可,下載完畢上傳至服務器
和之前一樣安裝kafka集群之前,確保zookeeper服務已經正常運行,3台主機准備工作都以完成,並且通信正常,這些之前都詳細說過,這三台主機分別為:linux1,linux2,linux3,接下來在linux1主機上執行釋放:
1 $ tar -xvzf kafka_2.9.2-0.8.2.2.tgz 2 $ mv kafka_2.9.2-0.8.2.2 /usr/ 3 $ cd /usr/kafka_2.9.2-0.8.2.2
這里相當於把kafka安裝到了/usr目錄下,接下來編輯配置文件,執行:
vim config/server.properties
修改broker.id=1,默認是0
這個值是集群中唯一的一個整數,每台機器各不相同,這里linux1設置為1其他機器后來再更改
修改port=9091 默認為9092,就是為了方便后來的通信
這里設置log.dirs=/usr/kafka-logs默認為/tmp/kafka-logs,
這個目錄可以根據自己習慣定義,意思就是設置Kafka日志存放的目錄,這個目錄后來我們需要手動建立
設置log.retention.hours=5 默認是168,代表清理日志的時間,根據實際情況配置即可
設置log.cleaner.enable=true,默認為false,這里一定要設置為true,否則Kafka不會自動清理日志
設置zookeeper.connect=linux1:2181,linux2:2181,linux3:2181/kafka,配置zookeeper集群的列表,這里就是這三台主機,然后指定了Kafka在zookeeper上創建的目錄為/kafka,
最后一項配置,默認即可
表示連接zookeeper服務器的超時時間,以上設置都完畢,沒問題保存配置並退出,然后將安裝包通過網絡發送至其他主機,就避免再次配置了
$ scp -r kafka_2.9.2-0.8.2.2 linux2:/usr/ $ scp -r kafka_2.9.2-0.8.2.2 linux3:/usr/
這樣就發送到了linux2和linux3這兩台主機,然后分別修改linux2和linux3中config/server.properties配置文件中broker.id分別為2和3,保存
然后集群中三台主機都執行:
mkdir /usr/kafka-logs
這樣就創建了日志存放目錄,到這里就算配置完畢了,
然后可以啟動kafka服務,進入kafka_2.9.2-0.8.2.2目錄,運行命令啟動服務:
nohup bin/kafka-server-start.sh config/server.properties &
所有主機都要執行上面這條命令,到這里集群都已經啟動了Kafka服務
接下來測試Kafka服務是否正常使用,在linux1上創建一個topic消息隊列:
bin/kafka-topics.sh --create --replication-factor 2 --partitions 2 --topic dt_test --zookeeper linux1:2181,linux2:2181,linux3:2181/kafka
這里指定了2個副本,2個分區,topic名為dt_test,並且指定zookeeper分布,運行完這個稍等一下,如果卡住再次按回車即可回到命令行界面
然后在linux1上創建一個消費者consumer:
bin/kafka-console-consumer.sh --zookeeper linux1:2181,linux2:2181,linux3:2181/kafka --topic dt_test
這里用--topic指定了剛才剛剛創建的消息隊列,現在命令行進入等待,等待生產者生產
現在在linux2上面創建一個生產者producer:
bin/kafka-console-producer.sh --broker-list linux2:9091,linux3:9091 --topic dt_test
這里--broker-list指定建立生產者服務的節點,可以是本機也可以是指定多台機器,這里指定了linux2和linux3,--topic同樣指定消息作業名,需要注意的是端口號這是是9091必須和前面配置文件中的設置一致,否則無法通信,我們前面把端口號改成9091這里必須是9091,如果端口號默認,那么這里一定是9092才可以;命令執行后同樣會等待用戶輸入,我們輸入Hello Kafka!
回車之后查看linux1的等待窗口:
可以看到這里將之前的輸入消費輸出了,這說明Kafka服務運行正常
以上就是Kafka集群搭建和測試的最基本的內容,另外運行bin/kafka-server-stop.sh腳本可以停止Kafka服務,重啟Kafka時先停止再啟動即可