如何學習kafka?


  本文是我學習kafka的一個思路和總結,希望對剛接觸kafka的你有所幫助。在學習kafka之前,最好能對kafka有一個簡單的了解,可以提出一些問題,帶着問題去學習,就會容易一些。

0 什么是kakfa1 kafka的版本2 kakfa中的術語3 Kafka消息模型4 kafka的結構5 使用kafka創建demo6 kakfa客戶端請求是如何被處理的7 kafka中的組件coordinatorcontroller8 位移提交與分區管理9 重平衡10 kakfa的參數(整理項,選讀)brokertopicproducerconsumer端

0 什么是kakfa

  kakfa是一個開源的消息引擎系統,提供了一組規范,企業可以利用這組規范在不同的系統中,傳遞語義准確的消息。(通俗的來講,就是系統A發消息給消息引擎,系統B從消息引擎中讀取系統A發送的消息)。

1 kafka的版本

  我們學習開源框架,一定要有版本意識。至少要了解每個版本有哪些大的改動,如果沒有版本意識,就很容易出現學習后,由於使用的版本不同,導致各種錯誤,甚至是調用的api不能通過編譯。
  接下來我就對kafka版本的改動點做一個總結:
  0.7: 只提供了最基礎的消息隊列的功能
  0.8: 引入了副本的機制,保證了系統的可靠性,此時kafka使用的還是老版本的客戶端api,需要指定zookeeper地址,0.8.2引入了新版本的producer,但是bug較多。
  0.9: 引入了kakfa connect組件,使用了java語言重寫了新版 consumer api,從這個版本開始,新版producer api相對穩定。
  0.10: 引入了kafka stream 使得kafka正式升級成為了分布式流處理平台,這個版本的consumer api開始相對穩定,0.10.22修復了可能會降低producer性能的bug,如果你使用的是這個版本的kafka,建議升級為0.10.22
  0.11: 引入了冪等性Producer 以及事務性,這個版本的kafka消息進行了重構,由v1升級成了v2,提高了壓縮的程度和效率。
  1.0/2.0:主要是對kafka Stream進行改進和優化

2 kakfa中的術語

  消息(record)、主題(topic)、分區(Partition)、位移(offset)、副本(replica)、生產者(producer)、消費者(consumer)、消費者組(consumer group)、重平衡(rebalance)

3 Kafka消息模型

  點對點模型:也叫做消息隊列模型,即系統A發送的消息只能系統B去接收,類似於打電話。
  發布/訂閱模型:這里有一個主題的概念,可以理解成消息的邏輯容器。消息的發布者向主題發布消息,訂閱者從它訂閱的主題中獲取消息。在這個過程中發布者和訂閱者都可能是多個,主題也可以是多個,類似於訂報紙。

4 kafka的結構


  注:topic只是一個邏輯容器,常用來區分不同的業務數據。

 

5 使用kafka創建demo

  kafka集群的創建過程以及使用java調用kafka客戶端api的demo,請參考本博客前2篇文章,有非常詳細的教程。

6 kakfa客戶端請求是如何被處理的

  kafka broker采用的React模型處理請求,React模式是事件驅動架構的一種實現方式,特被適用於處理多個客戶端並發向服務器端發送請求的場景。我們通過一幅圖來對React模式有一個初步的了解。


  客戶端的請求發送到Reactor,Reactor中有一個dispatch線程,負責分發請求,它將請求分發到工作線程中,由工作線程進行處理。映射到kafka中, SocketServer就上述中的Reactor, acceptor線程就是上述中的dispatch線程,kafka工作線程也有一個專屬的名字,叫 網絡線程池。但是kafka在此基礎上,進行了進一步的細化,接收用戶請求的線程(網絡線程池中的線程)不對用戶請求進行處理,而是將請求放到一個 共享請求隊列中,由 IO線程池中的線程進行處理。

  上圖中的purgatory組件用於緩存延時請求,比如我們配置了acks = all,當ISR中的其他副本還沒有寫入結果的時候,這個響應就會緩存在purgatory中,直到條件滿足,IO線程才會將結果返回到網絡響應隊列中。
  我們通過上圖可以知道,如果想提高kafka處理消息的能力,可以提高網絡線程數和io線程數,這兩個線程數分別對應着 num.network.threadsnum.io.threads兩個參數。

 

7 kafka中的組件

coordinator

  管理消費者組,與位移提交有關。
  coordinator是每個broker都有的組件,那么如何確定coordinator呢?
  確定位移主題是由哪個分區保存的:Math.abs(group.id % offsetTopicParitionCount);
  找出該分區leader對應的broker。

controller

  主題、分區管理、集群管理、數據緩存

8 位移提交與分區管理

  consumer會定期向kakfa提交自己的消費位移(offset),kafkaConsumer api提供了多種提交位移的方式,就用戶而言,位移提交分為手動提交和自動提交(auto.commit.offset.enable),就Consumer而言,提交位移分為同步提交(commitSync)和異步提交(CommitAsync),consumer會將位移信息提交到__consumer_offset這個主題中。
  kafka中的副本(replica),是在分區(Partition)層面上進行的,可以實現數據的冗余(replica.factor)。在同一個分區中,數據是有序的,kafka支持對消息設置,同一個鍵的消息會被發送到一個分區中,一個分區只能由一個Consumer進行消費。
  在kafka中只有leader 副本對客戶端提供服務,follower副本只是異步同步數據。不提供服務。

9 重平衡

  重平衡可以說是kafka中最重要的概念了,重平衡的本質是一種協議,它規定了一個consumer group下的所有consumer如何達成一致,對訂閱topic的partition進行分配
  那么為什我們要避免重平衡呢?重平衡的效率不高,在重平衡的過程中,當前consumer group的所有consumer會停止消費(可以類比java中的full gc),重平衡的影響范圍較廣,consumer group下的所有comsumer都會受到影響。
  重平衡的實現,需要借助Coordinator組件,消費者端的重平衡可以分為兩步:
  1.Consumer入組
  2.等待Leader consumer分配方案,分別對應着joinGroup和syncGroup兩類請求。
  首先當consumer加入consumer group的時候,會向coordinator發送join Group請求,這樣coordinator就知道了所有訂閱主題的消費者信息。一旦收集了所有consumer的信息,協調者就會選出一個comsumer leader。
  領導者消費者的任務就是收集所有成員的訂閱信息,然后根據信息,制定分區方案。
  最后領導者消費者會將分配方案發送給協調者,其他消費者也會發送請求給協調者,不過請求中沒有實際的內容。協調者會以響應的方式將方案返回給所有消費者,這樣消費者組內的成員就可以知道自己消費的分區了。

10 kakfa的參數(整理項,選讀)

  總結的都是個人認為比較重要的參數,但是篇幅有限,很難展開說明,其目的就是讓你有一個印象,下次見到這些參數的時候,要重點記憶

broker

  log.dirs:指定了broker使用的若干文件路徑。
  listeners:監聽器,告訴外部連接者通過什么協議訪問指定名稱和端口的kafka服務
  advertised.listeners:和listeners功能一致,不同的是它是對外部發布的
  auto.create.topics.enable:是否允許自動創建topic
  unclean.leader.election.enable:是否允許unclean的leader進行選舉
  auto.leader.rebalance.enabl:是否允許定期舉行leader選舉
  log.retention.{hours|minutes|ms}:消息被保存的時間
  message.max.bytes:broker最大能接收消息的大小
  replica.lag.time.max.ms:follower副本可以落后leader副本的最長時間間隔。

topic

  retention.ms:該topic中消息被保存的時間
  max.message.bytes:該topic最大能接收的消息大小
  replication.factor:消息冗余份數

producer

  bootstrap.server:用於與kafka集群中的broker創建連接,只需要配置部分broker即可
  key.serializer/value.serializer:鍵/值的序列化方式,必填
  acks:非常重要,用於控制producer產生消息的持久性,kafka只對“已提交”的消息做有有限度的持久化保證。而這個參數,就是定義“已提交”
  acks = 0:producer不用理睬broker端的處理結果,只要消息發送后,就認為是“已提交”
  acks = all或-1,不僅leader broker要將消息寫入本地日志,還要ISR集合中的所有副本都成功寫入各自的本地日志后,才發送響應消息給producer,認為消息“已提交”
  acks = 1折中方案,當leader broker將消息寫入本地日志,就返回響應給producer,認為消息“已提交”
  min.insync.replica:消息至少要被寫入多少個副本,才算寫入成功,這個參數和acks功能類似,不過它強調的是,acks = all時,強調的是所有副本。(比如ISR中只有一個replica,那么配置acks = 1即寫入一個broker即可,min.sync.replica = 2,即需要寫入2個broker,這條消息會寫入失敗)
  buffer.memory:指定了producer端用於緩存消息的緩沖區大小。kafka發送消息采用的是異步架構的方式,消息寫入緩沖區,由一個專屬線程從緩沖區中獲取消息,執行真正的發送
  compression.type:producer端壓縮方式,壓縮可以節省網絡傳輸中的帶寬,犧牲CPU使用率
  retries:失敗后的重試次數,非常重要的參數,用於實現kafka的處理語義
  retries = 0,即失敗后不會進行重試,實現至多一次的處理語義
  retries = n,即失敗后會重試n次,實現至少一次的處理語義
  (kafka 0.11后推出了精確一次的處理語義,即冪等性producer以及事務,相關參數:enable.idempotence = true)
  bacth.size:producer會將發往同一分區的消息,打成一個batch,當batch滿了后,producer會一次發送batch中的所有消息,這個參數控制者batch的大小。
  linger.ms:上面提到的batch,在batch沒滿的時候,也會進行發送,這其實是一種權衡,權衡的是吞吐量消息延時,linger.ms控制的就是消息的延時行為,默認值是0,表示消息會被立即發送,不管batch是否裝滿,我們可以改變這個參數,來修改發送消息的時間,即一條消息是否會被發送,取決於1、batch是否裝滿;2、有沒有達到linger.ms規定的時間。
  max.request.size:控制producer端最大可以發送消息的大小。
  request.timeout.ms:當producer發送消息給broker后,broker需要在指定時間內返回響應,否則producer就會認為該請求超時,並顯示拋出TimeoutException。

consumer端

  bootstrap.server:用於與kafka集群中的broker創建連接,只需要配置部分broker即可
  key.deserializer/value.deserializer:鍵/值的反序列化方式,必填
  group.id:consumer group的名字,必填
  session.time.out:coordinator檢測到consumer失活的時間,這個值設置的較小有利於coordinator更快的檢測consumer失活。
  max.poll.interval.ms:consumer處理邏輯的最大時間,如果一次poll()超過了這個時間,則coordinator會認為該consumer已經不可用,會將其踢出消費者組並進行重平衡。
  auto.offset.reset:制定了無位移或位移越界時,kafka的對應策略。取值:earliest:從最早位移進行消費、laster:從最新位移開始消費、none:拋出異常。
  auto.commit.enable:指定consumer是否自定義提交位移
  fetch.max.bytes:consumer端單次獲取數據的最大字節數
  max.poll.records:單次poll()返回的最大消息數
  heartbeat.interval.ms:心跳請求頻率
  connections.max.idle.ms:定期關閉空閑的tcp連接。

  最后,期待您的訂閱和點贊,專欄每周都會更新,希望可以和您一起進步,同時也期待您的批評與指正!

imageimage


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM