Kafka基礎教程(二):Kafka安裝


  因為kafka是基於Zookeeper的,而Zookeeper一般都是一個分布式的集群,盡管kafka有自帶Zookeeper,但是一般不使用自帶的,都是使用外部安裝的,所以首先我們需要安裝Zookeeper,可以參考:Zookeeper基礎教程(二):Zookeeper安裝

  Zookeeper集群地址:  

  # 192.168.209.133 test1
  # 192.168.209.134 test2
  # 192.168.209.135 test3

  為了方便,我這里也使用這三個地址安裝部署kafka,注意,安裝kafka之前需確保已安裝了jdk!

  首先,前往Kafka官網下載Kafka的安裝包:http://kafka.apache.org/downloads.html

  

   可以現在windows瀏覽器上下載好,然后將文件發到linux服務器下,博主使用的linux服務器版本是Ubuntu16.04的server版

   下載完成之后,將這個tgz壓縮包傳到linux上去(可以使用xshell,或者filezilla也可以)  

   # 在tgz壓縮包所在目錄進行解壓,-C 表示解壓后的文件所存放的路徑,這里表示解壓后的文件放到/opt目錄下
  sudo tar -zxvf kafka_2.12-2.5.0.tgz -C /opt/
  # 進入kafka的配置目錄
  cd /opt/kafka_2.12-2.5.0/config
  # 其中server.properties是kafka的主要配置文件
  sudo vim server.properties

   server.properties常用參數介紹:  

  # 當前機器在集群中的唯一標識,和zookeeper的myid性質一樣,要求集群中每個broker.id都說不一樣的,可以從0開始遞增,也可以從1開始遞增
  broker.id=0
  # 監聽地址,需要提供外網服務的話,要設置本地的IP地址,當前kafka對外提供服務的端口默認是9092
  listeners=PLAINTEXT://test1:9092
  # 這個是borker進行網絡處理的線程數
  num.network.threads=3
  # 這個是borker進行I/O處理的線程數
  num.io.threads=8
  # 發送緩沖區buffer大小,數據不是一下子就發送的,先回存儲到緩沖區了到達一定的大小后在發送,能提高性能
  socket.send.buffer.bytes=102400
  # kafka接收緩沖區大小,當數據到達一定大小后在序列化到磁盤
  socket.receive.buffer.bytes=102400
  # 這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小
  socket.request.max.bytes=104857600
  # 消息存放的目錄,這個目錄可以配置為“,”逗號分割的表達式,上面的num.io.threads要大於這個目錄的個數這個目錄,
  # 如果配置多個目錄,新創建的topic他把消息持久化的地方是,當前以逗號分割的目錄中,那個分區數最少就放那一個
  log.dirs=/tmp/kafka-logs
  # 默認的分區數,一個topic默認1個分區數
  num.partitions=1
  # 每個數據目錄用來日志恢復的線程數目
  num.recovery.threads.per.data.dir=1
  # topic的offset的備份份數
  offsets.topic.replication.factor=1
  # 事務主題的復制因子(設置更高以確保可用性)。 內部主題創建將失敗,直到群集大小滿足此復制因素要求。
  transaction.state.log.replication.factor=1
  # 覆蓋事務主題的min.insync.replicas配置。
  transaction.state.log.min.isr=1   # 默認消息的最大持久化時間,168小時,7天   log.retention.hours=168
  # 日志達到刪除大小的閾值。每個topic下每個分區保存數據的最大文件大小;注意,這是每個分區的上限,因此這個數值乘以分區的個數就是每個topic保存的數據總量
  log.retention.bytes=1073741824   # 這個參數是:因為kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件   log.segment.bytes=1073741824   # 每隔300000毫秒去檢查上面配置的log失效時間   log.retention.check.interval.ms=300000   # 是否啟用log壓縮,一般不用啟用,啟用的話可以提高性能   log.cleaner.enable=true   # 設置zookeeper的連接端口,多個地址以逗號(,)隔開,后面可以跟一個kafka在Zookeeper中的根znode節點的路徑   zookeeper.connect=localhost:2181   # 設置zookeeper的連接超時時間   zookeeper.connection.timeout.ms=18000
  # 在執行第一次再平衡之前,group協調員將等待更多消費者加入group的時間
  group.initial.rebalance.delay.ms=0

  我們可以根據自己的情況配置,比如我這里只需要配置:  

  # 192.168.209.133 test1配置

  broker.id=0
  listeners=PLAINTEXT://test1:9092
  zookeeper.connect=test1:2181,test2:2181,test3:2181/kafka

  # 192.168.209.134 test2配置

  broker.id=1
  listeners=PLAINTEXT://test2:9092
  zookeeper.connect=test1:2181,test2:2181,test3:2181/kafka

   # 192.168.209.135 test3配置

  broker.id=2
  listeners=PLAINTEXT://test3:9092
  zookeeper.connect=test1:2181,test2:2181,test3:2181/kafka

   現在就可以啟動Kafka了,注意,啟動前保證我們的Zookeeper是正常運行的!  

   # 啟動 ,可以加上-daemon表示后台啟動
  sudo /opt/kafka_2.12-2.5.0/bin/kafka-server-start.sh -daemon /opt/kafka_2.12-2.5.0/config/server.properties  
  # 停止kafka
  sudo /opt/kafka_2.12-2.5.0/bin/kafka-server-stop.sh   

   等三台kafka都啟動之后,使用ZooInspector連接Zookeeper可以看到多了一個路徑為/kafka的znode節點,啟動/kafka/brokers/ids的自己點就是我們上面配置的broker.id:

    

   截圖中/kafka節點下的所有znode節點都是kafka所生成的,可以認為記錄的是kafka運行狀態的一些信息

  如果啟動過程中報錯:kafka.common.InconsistentClusterIdException: The Cluster ID XXXXXXXXXXXXXX doesn't match stored clusterId Some(XXXXXXXXXXXXXXX) in meta.properties

  可以前往server.properties中配置的log.dirs目錄下,找到meta.properties文件,將其中的cluster.id=XXXXXXXX給注釋掉,然后重新啟動kafka就可以了

  創建Topic

  使用kafka-topics.sh創建topic:  

   sudo /opt/kafka_2.12-2.5.0/bin/kafka-topics.sh --create --zookeeper test1:2181/kafka --replication-factor 3 --partitions 3 --topic test
  # --create  表示創建,--delete  表示刪除  --describe  表示獲取詳情  --list  表示列出所有的topic
  # --zookeeper  表示創建topic時連接Zookeeper所使用的的連接,注意,后面需要帶上根路徑名,這個和我們在server.properties中配置zookeeper.connect節點中的根路徑一致
  # --replication-factor   創建的topic副本數,不能大於broker的個數,否則會拋出異常:ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: M larger than available brokers: N.
  # --partitions  topic的分區數,最好是等於broker數
  # --topic  表示topic

  這時刷新ZooInspector可以看到

  

  發布消費消息

  發布消息使用kafka-console-producer.sh

   sudo /opt/kafka_2.12-2.5.0/bin/kafka-console-producer.sh --bootstrap-server test1:9092 --topic test
  # --bootstrap-server 表示連接的kafka的broker,可以在ZooInspector中查看

  執行命令后即進入命令行,可以輸入消息了:

  

    消費消息使用kafka-console-consumer.sh  

    sudo /opt/kafka_2.12-2.5.0/bin/kafka-console-consumer.sh --bootstrap-server test1:9092 --topic test
    # 可以增加--from-beginning參數表示從頭開始消費

  上述命令執行后,就開始接收消息了,當重新發送一條消息,就會打印出來了:

  

  

  使用kafkatool連接使用Kafka

  首先下載kafkatool可以前往官網:https://www.kafkatool.com/download.html

  或者在百度網盤下載:https://pan.baidu.com/s/1WVbRWW5thzJ9ZCGrimDekQ (提取碼: 3h88)

  安裝后打開,File=>New Connection創建一個新連接:

  

  kafkatool提供了兩種連接方式:

  第一種是指定kafka在Zookeeper中的節點信息,然后kafkatool根據Zookeeper上的信息去獲取各個Broker的信息。

  第二種方式是手動提供kafka的Broker地址和端口(不用提供全部Broker,部分也可以)。

  填好信息后,可以點擊右下角的Test測試連接是否正常,確認正常后點擊Add添加連接。

  連接上后,可以看到我們的Broker還有我們創建的Topic: 

   

    點擊指定的Topic可以查看這個Topic下的分區即消息等等:

  

  注:如果您也像我這樣使用了hosts文件做了一層ip:hostname映射,可能導致在windows中無法連接或者Topic和Comsumers等操作失敗,可以試試在windows的hosts中配置ip:hostname

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM