Kafka集群環境搭建(2.9.2-0.8.2.2)


  Kafka是一個分布式、可分區、可復制的消息系統。Kafka將消息以topic為單位進行歸納;Kafka發布消息的程序稱為producer,也叫生產者;Kafka預訂topics並消費消息的程序稱為consumer,也叫消費者;當Kafka以集群的方式運行時,可以由一個服務或者多個服務組成,每個服務叫做一個broker,運行過程中producer通過網絡將消息發送到Kafka集群,集群向消費者提供消息。

  kafka消息傳遞

  Kafka客戶端和服務端基於TCP協議通信,並且提供了Java客戶端API,實際上Kafka對多種語言都有支持

  接下來訪問Apache Kafka官網下載安裝包,http://kafka.apache.org/ 網站的界面非常的簡潔,下載和文檔等分類一目了然

  點擊左邊的download按鈕,進入版本選擇,這里選擇0.8.2.2系列的基於Scala 2.9.2編寫的kafka_2.9.2-0.8.2.2.tgz包,

  

  點擊會進入下載頁面,再次點擊下載即可,下載完畢上傳至服務器

  和之前一樣安裝kafka集群之前,確保zookeeper服務已經正常運行,3台主機准備工作都以完成,並且通信正常,這些之前都詳細說過,這三台主機分別為:linux1,linux2,linux3,接下來在linux1主機上執行釋放:

1 $ tar -xvzf kafka_2.9.2-0.8.2.2.tgz
2 $ mv kafka_2.9.2-0.8.2.2 /usr/
3 $ cd /usr/kafka_2.9.2-0.8.2.2

  這里相當於把kafka安裝到了/usr目錄下,接下來編輯配置文件,執行:

vim config/server.properties 

  修改broker.id=1,默認是0

  

  這個值是集群中唯一的一個整數,每台機器各不相同,這里linux1設置為1其他機器后來再更改

  修改port=9091 默認為9092,就是為了方便后來的通信

  

  這里設置log.dirs=/usr/kafka-logs默認為/tmp/kafka-logs,

  

  這個目錄可以根據自己習慣定義,意思就是設置Kafka日志存放的目錄,這個目錄后來我們需要手動建立

  設置log.retention.hours=5 默認是168,代表清理日志的時間,根據實際情況配置即可

  

  設置log.cleaner.enable=true,默認為false,這里一定要設置為true,否則Kafka不會自動清理日志

  

  設置zookeeper.connect=linux1:2181,linux2:2181,linux3:2181/kafka,配置zookeeper集群的列表,這里就是這三台主機,然后指定了Kafka在zookeeper上創建的目錄為/kafka,

  

  最后一項配置,默認即可

  

  表示連接zookeeper服務器的超時時間,以上設置都完畢,沒問題保存配置並退出,然后將安裝包通過網絡發送至其他主機,就避免再次配置了

$ scp -r kafka_2.9.2-0.8.2.2 linux2:/usr/
$ scp -r kafka_2.9.2-0.8.2.2 linux3:/usr/

  這樣就發送到了linux2和linux3這兩台主機,然后分別修改linux2和linux3中config/server.properties配置文件中broker.id分別為2和3,保存

  然后集群中三台主機都執行:

mkdir /usr/kafka-logs

  這樣就創建了日志存放目錄,到這里就算配置完畢了,

  然后可以啟動kafka服務,進入kafka_2.9.2-0.8.2.2目錄,運行命令啟動服務:

nohup bin/kafka-server-start.sh config/server.properties &

  所有主機都要執行上面這條命令,到這里集群都已經啟動了Kafka服務

  接下來測試Kafka服務是否正常使用,在linux1上創建一個topic消息隊列:

bin/kafka-topics.sh --create --replication-factor 2 --partitions 2 --topic dt_test --zookeeper linux1:2181,linux2:2181,linux3:2181/kafka

  這里指定了2個副本,2個分區,topic名為dt_test,並且指定zookeeper分布,運行完這個稍等一下,如果卡住再次按回車即可回到命令行界面

  然后在linux1上創建一個消費者consumer:

bin/kafka-console-consumer.sh --zookeeper linux1:2181,linux2:2181,linux3:2181/kafka --topic dt_test

  這里用--topic指定了剛才剛剛創建的消息隊列,現在命令行進入等待,等待生產者生產

  現在在linux2上面創建一個生產者producer:

bin/kafka-console-producer.sh --broker-list linux2:9091,linux3:9091 --topic dt_test

  這里--broker-list指定建立生產者服務的節點,可以是本機也可以是指定多台機器,這里指定了linux2和linux3,--topic同樣指定消息作業名,需要注意的是端口號這是是9091必須和前面配置文件中的設置一致,否則無法通信,我們前面把端口號改成9091這里必須是9091,如果端口號默認,那么這里一定是9092才可以;命令執行后同樣會等待用戶輸入,我們輸入Hello Kafka!

  

  回車之后查看linux1的等待窗口:

  

  可以看到這里將之前的輸入消費輸出了,這說明Kafka服務運行正常

  以上就是Kafka集群搭建和測試的最基本的內容,另外運行bin/kafka-server-stop.sh腳本可以停止Kafka服務,重啟Kafka時先停止再啟動即可

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM