Kafka是由scala和java編寫的一款高吞吐量分布式發布訂閱消息系統。
應用場景:
- 異步處理
- 應用解耦
- 流量削峰
- 日志處理
- 消息通訊
相關術語:
- Broker:在集群中的服務器,用於存儲消息,提供接口給生產者和消費者
- Topic:消息的一個自定義類別,每個消息都有一個topic,topic下有很多條消息,生產者和消費者通過用定義好的topic名來通訊
- Parittion:每個topic包含一個或多個分區,用於對消息進行排序,如果一個topic有多個分區,則消息的順序不能保證,如果需要嚴格保證順序,則需要將partition設置為1。同一topic的分區數只能增加不能減少。
- Producer:生產者,消息的投遞方
- Consumer:消費者,消息的接收方
- Consumer Group:不同消費組的消費者在訂閱同一個topic時,會拉到相同的消息,相同的消費族下的消費者在同一個topic的時候,會拉到不同分區的消息
- Leader:每個partition都有多個副本,其中一個會成為Leader,leader負責數據的讀寫
- Follower:Follower跟隨Leader,所有寫請求都需要先果果Leader,然后再廣播到所有Follower。如果Leader失效,則從Follower中選舉一個新的Leader,當Follower與Leader掛掉/卡住或者同步太慢,leader會把follower從ISR中刪除
- Zookeeper:負責維護和協調broker,但系統新增broker或者某個broker失效,有zookeeper通知生產者和消費者,
- AR:Assigned Replicas。所有的副本
- ISR:In of sync Replicas。已同步的副本
- OSR:Out of sync Replicas。沒有同步的副本
- LEO:LogEndOffset。分區最新的數據的offset。每次寫入,offset都會發生變化
- HW:HighWatermark。只有寫入數據被同步到所有的ISR中的副本后,數據才認為已提交,HW更新到該位置,在HW之前的數據才可以被消費,保證沒有同步完成的數據不會被消費者訪問到
數據流圖

HW和LEO

特性:
- 高吞吐量,低延遲,kafka每秒可以處理幾十萬條消息,延遲最低只有幾毫秒
- 可擴展:集群支持熱擴展
- 持久化,可靠性:消息被持久化到磁盤並支持數據備份
- 若錯性:允許集群中節點失敗(若副本數量為n,則允許n-1個節點失敗)
- 高並發
消息發送流程:
- 指定topic/key/value
- 序列化value
- 分區:通過hash(key)/value/自定義來確定分區
- 攔截:可以通過編寫攔截器,統一對消息進行格式轉換
消息發送類型:
- 同步
- 異步
生產者的其他參數
- acks:0代表不等任何寫入成功則馬上返回,如果出現故障,生產者無感知。用於高吞吐量場景;1代表集群的leader收到消息,如果沒有leader,則返回失敗,並重試;-1代表所有節點都同步完,最安全
- retries:如果分區找不到leader,則會返回失敗,並重試retries次,超過次數則放棄重試返回錯誤。
- batch.size:有很多消息要發送到同一分區時,生產者會把他們放到同一批次里,該參數代表內存可以容納的消息的多少,相當於緩沖區
- max.request.size:單個消息的最大值,需要跟broker可以接收消息的最大值一致message.max.size。要是大於,broker會拒絕接收數據
消費者
-
消費者可以訂閱多個topic,可以指定訂閱哪個分區
-
位移提交
分區內,每條消息都有一個offset,用於管理消息在分區的位置,當消費者讀取消息時,broker並不會更新offset,而是由消費者來commit位移
重復消費:
原因:
- 數據已經被消費,但是offset沒提交
場景:
- max.poll.interval.ms:消費者兩次poll操作允許的最大時間間隔,默認5分鍾,如果超過這個時間,kafka會認為消費者下線,kafka會進行rebalance,導致原來的消費者連接失效,無法提交offset,而新的消費者就會重復消費這條消息
- 不同組的消費者消費同一個topic
- 消費者使用自動提交模式,當還沒有提交,組內由新的消費者進來或者移除,發生rebalance,原來消費者失效,offset沒有提交,消費被重復消費
- 使用異步提交,並且在callback里寫了失敗重試,但是沒有注意順序。例如提交5的時候,發送網絡故障,由於是異步,程序繼續運行,再次提交10的時候,提交成功,此時正好運行到5的重試,並且成功。當發生了rebalance,又會重復消費了數據
- 自己手動設置offset
解決方法:
- 在redis中維持offset的記錄(key=topic+'-'+partition,value=offset)。每次新的消費者起來,先取出上次讀到的offset,然后用seek到上次的offset的位置,然后緊接着從kafka取記錄
數據丟失:
場景:
- ack=0,發送失敗,就丟失了
- ack=1,leader crash,follower沒來得及同步,丟失
- unclean.leader.election.enable 為 true,允許OSR的副本作為leader,當leader和ISR都crash了,OSR中的副本成為leader,數據會丟失
解決:
- ack=all/-1,retries>1,unclean.leader.election.enable=false
會影響吞吐量 - min.insync.replicas>1
生產者發送重復
原因
生產者發送消息但是沒有收到broker的響應,導致生產者重試
解決方法:
- 啟用冪等
- ack=0 不重試
生產者的冪等性
- 可用於解決生產者的重復發送的問題
- 原理:kafka會對每個生產者維護一個seq,每收到一條消息,seq會自增。當服務器收到seq小於當前最大的seq時,會拒絕這條消息
自動提交
消費者每次poll調用后,每隔5秒會自動向kafka提交offset
同步提交
消費者自己控制什么時候提交offset到kafka,同步等待方式,失敗會重試或者拋出異常
異步提交
消費者異步提交offset到kafka,不會阻塞,(ps:不要在提交失敗的回調是重試,會導致offset回退)
分組消費再平衡:
場景:
- consumer group中新增或者刪除某個consumer,導致其消費分區需要分配到組內的其他consumer
- consumer訂閱的topic發生變化,例如訂閱topic采用正則表達式匹配,而新增或刪除topic匹配正則,則會發生此topic的分區就需要分配到consumer
- consumer訂閱的topic增加分區
平衡策略:
- Round Robin:會按分區和消費者的字典序輪詢分配,會導致消費不均勻的情況,因為每個消費者可以特定指定自定擁有的分區,那么用輪詢分配,就可能造成這些分區有可能會分配到更多的分區
- Range:會根據分區和消費者的字典序輪詢分配,首先計算消費者可以得到的range是多少,然后輪詢分配,最后一輪,會把剩下的全部分配給前面幾個消費者。會導致分配不均勻
- Sticky:每次分配分區之前,都會對consumer根據所擁有的分區個數排序,個數小的排在前面,所以每次都會先從小到大的去分配。這樣做的好處是可以盡量平均的分配分區,而且保證原有的分區不會移動到其他consumer那里去
消費攔截器
可以定義同一的入口代碼,對消息進行修改或者屏蔽
Leader選舉
如果leader失效,則ISR中的節點會向zookeeper搶占leader的角色,誰先第一個搶到,誰就會成為leader
分區重新分配
場景:
- 集群擴容,需要把原有topic分區進行重新分配,否則新增節點不會負載已存在的topic
集群縮容
- pending
存儲結構:

-
每一個parition(文件夾)會平均分配到大小相同的segment文件中
-
每個文件僅需要順序讀寫
-
segment文件由index文件和data文件組成

日志清理:
- 定時清理
- 指定達到一定大小進行清理
事務
冪等性可以解決一個分區不重復,但是不能解決多個分區的運作,生產者可以通過事務對多個分區進行寫操作,並確保要么全部成功,要么全部失敗
控制器
集群中會有一個或者多個broker,其中一個會選舉為控制器(kafka controler),它負責整個集群所有分區和副本的狀態,當某個分區的leader出現故障,控制器負責該分區leader的選舉,當檢測到某個分區的ISR發生變化,由控制器通知所有broker更新元數據;當某個topic增加分區時,由控制器負責分區的重新分配
消息一致性

削峰限流例子

大量客戶端發送請求,服務器有可能資源不夠,導致大量請求失敗,並不能在短時間內處理大量的請求,可以用MQ做緩沖,客戶端把請求發送到MQ,server根據自己的能力拉取消息,並把response的消息推送到MQ,客戶端再拉取消息。
- 優點:可以支持大量的請求,不會出現大量請求失敗
- 缺點:使用MQ是用時間換成功率,時延會拉長
