kafka集群搭建


一、kafka優點

  • 高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒,每個topic可以分多個partition, consumer group 對partition進行consume操作。
  • 可擴展性:kafka集群支持熱擴展
  • 持久性、可靠性:消息被持久化到本地磁盤,並且支持數據備份防止數據丟失
  • 容錯性:允許集群中節點失敗(若副本數量為n,則允許n-1個節點失敗)
  • 高並發:支持數千個客戶端同時讀寫

二、kafka的名詞解釋

  • Broker:Kafka節點,一個Kafka節點就是一個broker,多個broker可以組成一個Kafka集群。
  • Topic:一類消息,消息存放的目錄即主題,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能夠同時負責多個topic的分發。
  • massage: Kafka中最基本的傳遞對象。
  • Partition:topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的隊列。
  • Segment:partition物理上由多個segment組成,每個Segment存着message信息。
  • Producer : 生產者,生產message發送到topic。
  • Consumer : 消費者,訂閱topic並消費message, consumer作為一個線程來消費。
  • Consumer Group:消費者組,一個Consumer Group包含多個consumer。
  • Offset:偏移量,理解為消息partition中的索引即可

三、kafka存儲策略

  • kafka以topic來進行消息管理,每個topic包含多個partition,每個partition對應一個邏輯log,有多個segment組成。
  • 每個segment中存儲多條消息(見下圖),消息id由其邏輯位置決定,即從消息id可直接定位到消息的存儲位置,避免id到位置的額外映射。
  • 每個part在內存中對應一個index,記錄每個segment中的第一條消息偏移。
  • 發布者發到某個topic的消息會被均勻的分布到多個partition上(或根據用戶指定的路由規則進行分布),broker收到發布消息往對應partition的最后一個segment上添加該消息,當某個segment上的消息條數達到配置值或消息發布時間超過閾值時,segment上的消息會被flush到磁盤,只有flush到磁盤上的消息訂閱者才能訂閱到,segment達到一定的大小后將不會再往該segment寫數據,broker會創建新的segment。

四、kafka集群搭建

  1. zookeeper集群搭建
  2. kafak下載
  3. 解壓、復制
tar -zvxf kafka_2.11-2.3.0.tgz
mv kafka_2.11-2.3.0/ kafka1/
cp -r kafka1/   kafka2/
cp -r kafka1/   kafka3/

4.kakfa配置參數詳解

參數 說明
delete.topic.enable=true 是否允許刪除topic,默認false不能手動刪除
broker.id=0 當前機器在集群中的唯一標識,和zookeeper的myid性質一樣
listeners = PLAINTEXT://192.168.100.151:9092 當前kafka服務偵聽的地址和端口,端口默認是9092
num.network.threads=3 這個是borker進行網絡處理的線程數
num.io.threads=8 這個是borker進行I/O處理的線程數
socket.send.buffer.bytes=102400 發送緩沖區buffer大小,數據不是一下子就發送的,先會存儲到緩沖區到達一定的大小后在發送,能提高性能
socket.receive.buffer.bytes=102400 kafka接收緩沖區大小,當數據到達一定大小后在序列化到磁盤
socket.request.max.bytes=104857600 這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小
log.dirs= 消息日志存放的路徑
num.partitions=1 默認的分區數,一個topic默認1個分區數
num.recovery.threads.per.data.dir=1 每個數據目錄用來日志恢復的線程數目
log.retention.hours=168 默認消息的最大持久化時間,168小時,7天
log.segment.bytes=1073741824 這個參數是:因為kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件
log.retention.check.interval.ms=300000 每隔300000毫秒去檢查上面配置的log失效時間
log.cleaner.enable=false 是否啟用log壓縮,一般不用啟用,啟用的話可以提高性能
zookeeper.connect=node1:2181,node2:2181,node3:2181 設置zookeeper的連接端口
broker.id=0 當前機器在集群中的唯一標識,和zookeeper的myid性質一樣
zookeeper.connection.timeout.ms=6000 設置zookeeper的連接超時時間

5.修改kafka的配置文件

節點1
broker.id=0
listeners=PLAINTEXT://192.168.100.151:9092
log.dirs=/yangk/kafka/kafka1/logs
zookeeper.connect=192.168.100.151:2181,192.168.100.151:2182,192.168.100.151:2183

節點2
broker.id=1
listeners=PLAINTEXT://192.168.100.151:9093
log.dirs=/yangk/kafka/kafka2/logs
zookeeper.connect=192.168.100.151:2181,192.168.100.151:2182,192.168.100.151:2183

節點3
broker.id=2
listeners=PLAINTEXT://192.168.100.151:9094
log.dirs=/yangk/kafka/kafka3/logs
zookeeper.connect=192.168.100.151:2181,192.168.100.151:2182,192.168.100.151:2183

6.啟動kafka
先啟動zookeeper集群。然后到每個kafka的目錄下啟動kafka

./bin/kafka-server-start.sh -daemon config/server.properties 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM