應用往Kafka寫數據的原因有很多:用戶行為分析、日志存儲、異步通信等。多樣化的使用場景帶來了多樣化的需求:消息是否能丟失?是否容忍重復?消息的吞吐量?消息的延遲?
kafka介紹
Kafka屬於Apache組織,是一個高性能跨語言分布式發布訂閱消息隊列系統[7]。它的主要特點有:
以時間復雜度O(1)的方式提供消息持久化能力,並對大數據量能保證常數時間的訪問性能;
高吞吐率,單台服務器可以達到每秒幾十萬的吞吐速率;
支持服務器間的消息分區,支持分布式消費,同時保證了每個分區內的消息順序;
輕量級,支持實時數據處理和離線數據處理兩種方式。
1.1. 主要功能
根據官網的介紹,ApacheKafka®是一個分布式流媒體平台,它主要有3種功能:
1:發布和訂閱消息流,這個功能類似於消息隊列,這也是kafka歸類為消息隊列框架的原因
2:以容錯的方式記錄消息流,kafka以文件的方式來存儲消息流
3:可以再消息發布的時候進行處理
1.2. 使用場景
1:在系統或應用程序之間構建可靠的用於傳輸實時數據的管道,消息隊列功能
2:構建實時的流數據處理程序來變換或處理數據流,數據處理功能
kafka生產者
首先,創建ProducerRecord必須包含Topic和Value,key和partition可選。然后,序列化key和value對象為ByteArray,並發送到網絡。
接下來,消息發送到partitioner。如果創建ProducerRecord時指定了partition,此時partitioner啥也不用做,簡單的返回指定的partition即可。如果未指定partition,partitioner會基於ProducerRecord的key生成partition。producer選擇好partition后,增加record到對應topic和partition的batch record。最后,專有線程負責發送batch record到合適的Kafka broker。
當broker收到消息時,它會返回一個應答(response)。如果消息成功寫入Kafka,broker將返回RecordMetadata對象(包含topic,partition和offset);相反,broker將返回error。這時producer收到error會嘗試重試發送消息幾次,直到producer返回error。
實例化producer后,接着發送消息。這里主要有3種發送消息的方法:
立即發送:只管發送消息到server端,不care消息是否成功發送。大部分情況下,這種發送方式會成功,因為Kafka自身具有高可用性,producer會自動重試;但有時也會丟失消息;
同步發送:通過send()方法發送消息,並返回Future對象。get()方法會等待Future對象,看send()方法是否成功;
異步發送:通過帶有回調函數的send()方法發送消息,當producer收到Kafka broker的response會觸發回調函數
以上所有情況,一定要時刻考慮發送消息可能會失敗,想清楚如何去處理異常。
通常我們是一個producer起一個線程開始發送消息。為了優化producer的性能,一般會有下面幾種方式:單個producer起多個線程發送消息;使用多個producer。
kafka消費者
kafka的消費模式總共有3種:最多一次,最少一次,正好一次。為什么會有這3種模式,是因為客戶端處理消息,提交反饋(commit)這兩個動作不是原子性。
1.最多一次:客戶端收到消息后,在處理消息前自動提交,這樣kafka就認為consumer已經消費過了,偏移量增加。
2.最少一次:客戶端收到消息,處理消息,再提交反饋。這樣就可能出現消息處理完了,在提交反饋前,網絡中斷或者程序掛了,那么kafka認為這個消息還沒有被consumer消費,產生重復消息推送。
3.正好一次:保證消息處理和提交反饋在同一個事務中,即有原子性。
本文從這幾個點出發,詳細闡述了如何實現以上三種方式。
At-most-once(最多一次)
設置enable.auto.commit為ture
設置 auto.commit.interval.ms為一個較小的時間間隔.
client不要調用commitSync(),kafka在特定的時間間隔內自動提交。
At-least-once(最少一次)
方法一
設置enable.auto.commit為false
client調用commitSync(),增加消息偏移;
方法二
設置enable.auto.commit為ture
設置 auto.commit.interval.ms為一個較大的時間間隔.
client調用commitSync(),增加消息偏移;
Exactly-once(正好一次)
3.1 思路
如果要實現這種方式,必須自己控制消息的offset,自己記錄一下當前的offset,對消息的處理和offset的移動必須保持在同一個事務中,例如在同一個事務中,把消息處理的結果存到mysql數據庫同時更新此時的消息的偏移。
3.2 實現
設置enable.auto.commit為false
保存ConsumerRecord中的offset到數據庫
當partition分區發生變化的時候需要rebalance,有以下幾個事件會觸發分區變化
1 consumer訂閱的topic中的分區大小發生變化
2 topic被創建或者被刪除
3 consuer所在group中有個成員掛了
4 新的consumer通過調用join加入了group
此時 consumer通過實現ConsumerRebalanceListener接口,捕捉這些事件,對偏移量進行處理。
consumer通過調用seek(TopicPartition, long)方法,移動到指定的分區的偏移位置。
參考:https://blog.csdn.net/laojiaqi/article/details/79034798
Broker
Kafka是一個高吞吐量分布式消息系統,采用Scala和Java語言編寫,它提供了快速、可擴展的、分布式、分區的和可復制的日志訂閱服務。它由Producer、Broker、Consumer三部分構成.
Producer向某個Topic發布消息,而Consumer訂閱某個Topic的消息。 一旦有某個Topic新產生的消息,Broker會傳遞給訂閱它的所有Consumer,每個Topic分為多個分區,這樣的設計有利於管理數據和負載均衡。
Broker:消息中間件處理結點,一個Kafka節點就是一個broker,多個broker可以組成一個Kafka集群。
Controller:中央控制器Control,負責管理分區和副本狀態並執行管理着這些分區的重新分配。(里面涉及到partition leader 選舉)
ISR:同步副本組
Topic
在Kafka中,消息是按Topic組織的.
Partition:topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的隊列。
Segment:partition物理上由多個segment組成
offset:每個partition都由一系列有序的、不可變的消息組成,這些消息被連續的追加到partition中. partition中的每個消息都有一個連續的序列號叫做offset,用於partition唯一標識一條消息.
topic中partition存儲分布
在Kafka文件存儲中,同一個topic下有多個不同partition,每個partition為一個目錄,partiton命名規則為topic名稱+有序序號,第一個partiton序號從0開始,序號最大值為partitions數量減1。
cleaner-offset-checkpoint:存了每個log的最后清理offset
meta.properties: broker.id 信息
recovery-point-offset-checkpoint:表示已經刷寫到磁盤的記錄。recoveryPoint以下的數據都是已經刷 到磁盤上的了。
replication-offset-checkpoint: 用來存儲每個replica的HighWatermark的(high watermark (HW),表示已經被commited的message,HW以下的數據都是各個replicas間同步的,一致的。)
partiton中文件存儲方式
每個partion(目錄)由多個大小相等segment(段)數據文件中。但每個段segment file消息數量不一定相等,這種特性方便old segment file快速被刪除。
每個partiton只需要支持順序讀寫就行了,segment文件生命周期由服務端配置參數決定。
partiton中segment文件存儲結構
Kafka消費者
消費組與分區重平衡
可以看到,當新的消費者加入消費組,它會消費一個或多個分區,而這些分區之前是由其他消費者負責的;另外,當消費者離開消費組(比如重啟、宕機等)時,它所消費的分區會分配給其他分區。這種現象稱為重平衡(rebalance)。重平衡是Kafka一個很重要的性質,這個性質保證了高可用和水平擴展。不過也需要注意到,在重平衡期間,所有消費者都不能消費消息,因此會造成整個消費組短暫的不可用。而且,將分區進行重平衡也會導致原來的消費者狀態過期,從而導致消費者需要重新更新狀態,這段期間也會降低消費性能。后面我們會討論如何安全的進行重平衡以及如何盡可能避免。
消費者通過定期發送心跳(hearbeat)到一個作為組協調者(group coordinator)的broker來保持在消費組內存活。這個broker不是固定的,每個消費組都可能不同。當消費者拉取消息或者提交時,便會發送心跳。
如果消費者超過一定時間沒有發送心跳,那么它的會話(session)就會過期,組協調者會認為該消費者已經宕機,然后觸發重平衡。可以看到,從消費者宕機到會話過期是有一定時間的,這段時間內該消費者的分區都不能進行消息消費;通常情況下,我們可以進行優雅關閉,這樣消費者會發送離開的消息到組協調者,這樣組協調者可以立即進行重平衡而不需要等待會話過期。
在0.10.1版本,Kafka對心跳機制進行了修改,將發送心跳與拉取消息進行分離,這樣使得發送心跳的頻率不受拉取的頻率影響。另外更高版本的Kafka支持配置一個消費者多長時間不拉取消息但仍然保持存活,這個配置可以避免活鎖(livelock)。活鎖,是指應用沒有故障但是由於某些原因不能進一步消費。
1.3. 詳細介紹
Kafka目前主要作為一個分布式的發布訂閱式的消息系統使用,下面簡單介紹一下kafka的基本機制
1.3.1 消息傳輸流程
Producer即生產者,向Kafka集群發送消息,在發送消息之前,會對消息進行分類,即Topic,上圖展示了兩個producer發送了分類為topic1的消息,另外一個發送了topic2的消息。
Topic即主題,通過對消息指定主題可以將消息分類,消費者可以只關注自己需要的Topic中的消息
Consumer即消費者,消費者通過與kafka集群建立長連接的方式,不斷地從集群中拉取消息,然后可以對這些消息進行處理。
從上圖中就可以看出同一個Topic下的消費者和生產者的數量並不是對應的。
1.3.2 kafka服務器消息存儲策略
談到kafka的存儲,就不得不提到分區,即partitions,創建一個topic時,同時可以指定分區數目,分區數越多,其吞吐量也越大,但是需要的資源也越多,同時也會導致更高的不可用性,kafka在接收到生產者發送的消息之后,會根據均衡策略將消息存儲到不同的分區中。
在每個分區中,消息以順序存儲,最晚接收的的消息會最后被消費。
kafka中的message以topic的形式存在,topic在物理上又分為很多的partition,partition物理上由很多segment組成,segment是存放message的真正載體。
下面具體介紹下segment文件:
(1) 每個partition(目錄)相當於一個巨型文件被平均分配到多個大小相等segment(段)數據文件中。但每個段segment file消息數量不一定相等,這種特性方便old segment file快速被刪除。
(2) 每個partiton只需要支持順序讀寫就行了,segment文件生命周期由服務端配置參數決定。
(3) segment file組成:由2大部分組成,分別為index file和data file,此2個文件一一對應,成對出現,后綴”.index”和“.log”分別表示為segment索引文件、數據文件.
(4) segment文件命名規則:partion全局的第一個segment從0開始,后續每個segment文件名為上一個segment文件最后一條消息的offset值。數值最大為64位long大小,19位數字字符長度,沒有數字用0填充。
segment中index<—->data file對應關系物理結構如下:
index與log映射關系
.index文件存放的是message邏輯相對偏移量(相對offset=絕對offset-base offset)與在相應的.log文件中的物理位置(position)。但.index並不是為每條message都指定到物理位置的映射,而是以entry為單位,每條entry可以指定連續n條消息的物理位置映射(例如:假設有20000~20009共10條消息,.index文件可配置為每條entry
指定連續10條消息的物理位置映射,該例中,index entry會記錄偏移量為20000的消息到其物理文件位置,一旦該條消息被定位,20001~20009可以很快查到。)。每個entry大小8字節,前4個字節是這個message相對於該log segment第一個消息offset(base offset)的相對偏移量,后4個字節是這個消息在log文件中的物理位置。
1.3.3 與生產者的交互
生產者在向kafka集群發送消息的時候,可以通過指定分區來發送到指定的分區中
也可以通過指定均衡策略來將消息發送到不同的分區中
如果不指定,就會采用默認的隨機均衡策略,將消息隨機的存儲到不同的分區中
1.3.4 與消費者的交互
在消費者消費消息時,kafka使用offset來記錄當前消費的位置
在kafka的設計中,可以有多個不同的group來同時消費同一個topic下的消息,如圖,我們有兩個不同的group同時消費,他們的的消費的記錄位置offset各不項目,不互相干擾。
對於一個group而言,消費者的數量不應該多余分區的數量,因為在一個group中,每個分區至多只能綁定到一個消費者上,即一個消費者可以消費多個分區,一個分區只能給一個消費者消費
因此,若一個group中的消費者數量大於分區數量的話,多余的消費者將不會收到任何消息。
轉自:https://blog.csdn.net/luanpeng825485697/article/details/81036028