第一部分:kafka概述
一、定義(消息引擎系統)
一句話概括kafka的核心功能就是:高性能的消息發送與高性能的消息消費。
kafka剛推出的時候是以消息引擎的身份出現的,它具有強大的消息傳輸效率和完備的分布式解決方案,隨着版本更新,在kafka0.10.0.0版推出了流式處理組件——Kafka Streams,使kafka交由下游數據處理平台做的事也可以自己做,自此kafka在消息引擎的基礎上正式成為了一個流式處理框架。但無論是消息引擎還是流式處理平台,kafka的處理架構從未質變,概括如下:
圖 kafka簡要架構圖
總結就是三句話:
- 生產者發送消息給kafka服務器;
- 消費者從kafka服務器讀取消息;
- kafka服務器依托zookeeper集群進行服務的協調管理。
說到消息引擎,和它類似的術語就是消息隊列和消息中間件,個人感覺稱kafka為消息引擎更合理。因為“消息隊列”名字給出了一個很不准確的暗示,仿佛它就是以隊列的形式實現的;而消息中間件有點過度強調了“中間件”之嫌,使其真實用途不夠明顯。
消息引擎系統既然是在不同應用之間傳輸消息的系統,那么在設計時需要重點考慮的關鍵因素就是:消息設計、傳輸協議設計和消息引擎范型。
消息設計
消息引擎系統在設計消息時一定要考慮語義的清晰和格式上的通用性,消息通常都采用結構化的方式進行設計,比如XML格式、JSON格式的消息等,而kafka的消息是用二進制方式來保存的,但依然是結構化的消息。
傳輸協議設計
廣義上的傳輸協議包括任何能夠在不同系統間傳輸消息或是執行語義操作的協議或框架,比如RPC及序列化框架、Google的ProtoBuffers、阿里系的Dubbo等,而kafka自己設計了一套二進制的消息傳輸協議。(后面再講)傳輸協議作為一個基礎構建塊,它服務於消息引擎系統實現的消息引擎范型。
消息引擎范型
最常見的兩種消息引擎范型是消息隊列模型和發布/訂閱模型。
消息隊列模型是基於隊列提供消息傳輸服務的,其定義了消息隊列、發送者、接收者,提供的是一種點對點的消息傳遞方式。一旦消息被消費就會從隊列中移除該消息,每條消息由一個發送者生產出來,且只被一個消費者處理——發送者和消費者是一對一的關系,類似生活中之前的電話接線生的工作。
發布/訂閱模型有主題的概念,一個主題可以理解為邏輯語義相近的消息的容器,該模型也定義了類似生產者、消費者的角色,即發布者(publisher)和訂閱者(subscriber)。發布者將消息生產出來發送到指定的topic中,所有訂閱了該topic的訂閱者都可以接受到該topic下的所有消息,類似生活中報紙的訂閱。
kafka通過引入消息組(consumer group)來同時支持這兩種模型。(后面再講)
二、概要設計
kafka的設計初衷就是為了解決互聯網公司超大量級數據的實時傳輸,概要設計關鍵點:吞吐量/延時、消息持久化、負載均衡和故障轉移、伸縮性。
1、吞吐量/延時
kafka的吞吐量就是指每秒能夠處理的消息數或者每秒能處理的字節數。kafka的延時可以表示客戶端發起請求與服務器處理請求並發送響應給客戶端之間的這段時間。在實際使用場景中,這兩個指標通常是一個矛盾體,但也不是等比例的此消彼長的關系。
kafka寫入端實現高吞吐量低延時的方法原理:利用操作系統的頁緩存和采用追加寫入消息的方式。
kafka會持久化所有數據到磁盤,但是本質上每次寫入操作都只是把數據寫入到操作系統的頁緩存中,然后由操作系統自行決定何時把頁緩存中的數據寫回磁盤上。正是得益於這種對磁盤的使用方式,使得kafka的寫入操作是很快的。
這樣設計有3個主要優勢:
- 操作系統頁緩存是在內存中分配的,所以消息寫入的速度非常快;
- kafka不必直接與底層的文件系統打交道,所有繁瑣的IO操作都交給操作系統來處理;
- kafka寫入操作采用追加寫入的方式,避免了磁盤的隨機寫操作(對於普通的物理磁盤(非固態硬盤)隨機讀/寫的吞吐量的確很慢,但是磁盤的順序讀/寫操作其實是很快的,速度甚至可以匹敵內存的隨機I/O速度)。
kafka在設計時采用了追加寫入消息的方式,即只能在日志文件末尾追加寫入新的消息,且不能修改已寫入的消息,因此它屬於典型的磁盤順序訪問型操作。
kafka消費端實現高吞吐量低延時的方法原理:kafka把消息寫入操作系統的頁緩存中,同樣地,kafka在讀取消息時會首先嘗試從操作系統的頁緩存中讀取,且大部分消息很可能依然存在於頁緩存中,如果命中就把消息經頁緩存直接發送到網絡的socket上,不用“穿透”到底層的物理磁盤上獲取消息,同時這個過程用到了大名鼎鼎的零拷貝(zero copy)技術。
補充說明:傳統的Linux操作系統中的I/O接口是依托於數據拷貝來是實現的,在零拷貝技術出現之前,一個I/O操作會將同一份數據進行多次拷貝,數據傳輸過程中還涉及到內核態與用戶態的上下文切換,CPU的開銷非常大,極大限制了操作系統高效進行數據傳輸的能力,而零拷貝技術很好的改善了這個問題。
【總結】kafka依靠下面4點達到了高吞吐量、低延時的設計目標:
- 大量使用操作系統頁緩存,內存操作速度快且命中率高;
- kafka不直接參與物理I/O操作,而是交由最擅長此事的操作系統來完成;
- 采用追加寫入方式,摒棄了緩慢的磁盤隨機讀/寫操作;
- 使用以sendfile為代表的零拷貝技術加強網絡間的數據傳輸效率;
2、消息持久化
kafka是要持久化消息到磁盤上的,這樣做的好處是:
- 解耦消息發送和消息消費:通過將消息持久化使得生產者方不再需要直接和消費者方耦合,它只是簡單的把消息生產出來並交由kafka服務器保存即可;
- 實現靈活的消息處理:可以很方便的實現消息重演,即對於已經處理過的消息可能在未來某個時間點需要重新處理一次。
普通系統在實現持久化時可能會先盡量使用內存,當內存資源耗盡時再一次性的把數據“刷盤”,而kafka則反其道而行之,所有數據都會立即被寫入文件系統的持久化日志中,之后kafka服務器才會返回結果給客戶端通知它們消息已被成功寫入。這樣能減少kafka程序對內存的消耗從而將節省出來的內存留給頁緩存使用,更進一步提升性能。
3、負載均衡和故障轉移
負載均衡就是指讓系統的負載根據一定的規則均衡地分配在所有參與工作的服務器上,從而最大限度的提升系統整體的運行效率。
對於kafka來說就是,每台服務器broker都有均等的機會為kafka的客戶提供服務,可以把負載分散到所有集群中的機器上。
kafka通過智能化的分區領導者選舉來實現負載均衡,kafka默認提供智能的leader選舉算法,可在集群的所有機器上以均等機會分散各個partition的leader,從而整體上實現負載均衡。
kafka的故障轉移是通過使用會話機制實現的,每台kafka服務器啟動后會以會話的形式把自己注冊到zookeeper服務器上。一旦該服務器運轉出現問題,與zookeeper的會話便不能維持從而超時失效,此時kafka集群會選舉出另一台服務器來完全替代這台服務器繼續提供服務。
4、伸縮性
伸縮性是指向分布式系統中增加額外的計算資源比如CPU、內存、存儲或帶寬等時吞吐量提升的能力。
如果一個CPU的運算能力是U,那么兩個CPU的運算能力我們自然希望是2U,即可以線性的擴容計算能力,但是由於很多隱藏的“單點”瓶頸導致實際中幾乎不可能達到。阻礙線性擴容的一個很常見的因素就是狀態的保存,因為無論哪類分布式系統,集群中的每台服務器一定會維護很多內部狀態,如果有服務器自己來保存這些狀態信息,則必須要處理一致性的問題。相反,若服務器是無狀態的,狀態的保存和管理交由專門的協調服務來做比如zookeeper,那么整個集群的服務器之間就無需繁重的狀態共享,就極大地降低了維護復雜度。倘若要擴容集群節點,只需簡單的啟動新的節點機器進行自動負載均衡就可以了。kafka正是采用上述思想,將每台kafka服務器上的狀態統一交由zookeeper保管,擴展kafka集群時只需啟動新的kafka服務器即可。說明:kafka服務器上並不是所有狀態都不保存,之保存了很輕量級的內部狀態,因此整個集群間維護狀態一致性的代價很低。
三、kafka基本概念和術語
1、消息
消息由消息頭部、key和value組成。kafka中的消息格式由很多字段組成,其中很多字段都是用於管理消息的元數據字段能,對用戶是透明的。V1版本的消息格式如下圖(不同版本可能會有稍微差異):
圖 消息的完整格式
kafka使用緊湊的二進制字節數組來保存字段,也就是沒有多余的比特位浪費。通常的Java堆上內存分配,即使有重排各個字段在內存的布局以減少內存使用量的優化措施,但仍有部分字節用於補齊之用。同時,運行Java的操作系統通常都默認開啟了頁緩存機制,也就是說堆上保存的對象很可能在頁緩存中還保留一份,這就造成了極大的資源浪費。kafka在消息設計時直接使用緊湊的二進制字節數組ByteBuffer而不是獨立的對象,避開了繁重的java堆上內存分配。因此,我們至少能夠訪問多一倍的可用內存。還有一點,大量使用頁緩存而非堆內存還有一個好處——數據不丟失,即當出現kafka broker進程崩潰時,堆內存上的數據也一並消失,但頁緩存的數據依然存在。
2、主題和分區即topic和partition:
topic只是一個邏輯概念,代表一類消息,也可以認為是消息被發送到的地方,通常我們可以使用topic來區分實際業務。
kafka中的topic通常都會被多個消費者訂閱,出於性能的考量,kafka並不是topic-message的兩級結構,而是采用topic-partition-message的三級結構來分散負載。topic與partition關系如下圖.
圖 topic和partition
kafka的partition實際上並沒有太多的業務含義,它的引入就是單純的為了提升系統的吞吐量。
topic是有多個partition組成的,而partition是不可修改的有序消息序列,也可以說是有序的消息日志。每個partition有自己專屬的partition號,通常是從0開始。用戶對partition唯一 能做的就是在消息序列的末尾追加寫入消息。partition上的每條消息都會被分配一個唯一的序列號——位移。位移值也是從0開始順序遞增的整數,通過位移信息可以唯一定位到某partition下的一條信息。
3、位移offset
topic partition下的每條消息都被分配一個位移值,而在kafka消費者端也有位移的概念,注意區分。每條消息在某個partition的位移是固定的,但消費該partition的消費者的位移會隨着消費進度不斷前移,但不會超過前者。因此,今后討論位移的時候一定給出清晰的上下文環境。
綜上,可以斷言kafka中的一條消息其實就是一個<topic, partition,offset>三元組。
4、replica副本、leader、follower
kafka中的分區partition是有序消息日志,那為了實現高可靠性,通過冗余機制——備份多份日志,而這些備份日志在kafka中被稱為副本(replica),它們存在的唯一目的就是防止數據丟失。
kafka中的replica分為兩個角色:領導者(leader)和追隨者(follower)(類似過去的主備的提法(Master-slave)),也即副本分為兩類:領導者副本(leader replica)和追隨者副本(follower replica)。follower replica是不能提供服務給客戶端的,也即不負責響應客戶端發來的消息寫入和消息消費請求,它只是被動地向領導者副本獲取數據,保持與leader的同步,follower存在的唯一價值就是充當leader的候補,一旦leader replica所在的broker宕機,kafka會從剩余的replica中選舉出新的leader繼續提供服務。
圖 kafka的leader-follower系統
5、ISR(與leader replica保持同步的replica集合)
比如一個partition可以配置N個replica,那么是否就以為着該partition可以容忍N-1個replica失效而不丟失數據呢?答案是“否”。
kafka為partition動態維護一個replica集合,該集合中的所有replica保存的消息日志都與leader replica保持同步狀態,只有這個集合中的replica才能被選舉為leader,也只有該集合中所有replica都接收到了同一條消息,kafka才會將該消息置於“已提交”狀態,即認為這條消息發送成功。kafka能保證只要ISR集合中至少存在一個replica,那些“已提交”狀態的消息就不會丟失——兩個關鍵點:第一,ISR中至少存在一個“活着的”replica;第二,“已提交”消息。
正常情況下,partition的所有replica都應該與leader replica保持同步,即所有的replica都在ISR中,但因各種原因,小部分replica可能開始落后於leader replica的進度,當其滯后到一定程度時,kafka會將這些replica“踢出”ISR。相反,當這些replica重新“追上”了leader replica的進度時,kafka又會將它們加回到ISR中。這些都是自動維護的,不需人工干預。
四、kafka使用場景
1、消息傳輸:替代傳統的消息總線等。
2、網站行為日志追蹤:鑒於點擊流數據量很大,kafka超強的吞吐量特性就有了用武之地。網站上的用戶操作以消息的形式發送到kafka的某個對應topic中,然后使用機器學習或其他實時處理框架來幫助收集並分析。
3、審計數據收集:從各個運維應用程序處實時匯總操作步驟信息進行集中式管理,同時支持持久化特性,方便后續離線審計。
4、日志收集:各個機器上的分散日志,通過kafka進行全量收集,並集中送往下游的分布式存儲如hdfs中。相對於其他主流的日志抽取框架比如flume,kafka有更好的性能,而且提供了完備的可靠性解決方案,同時還有低延時的特點。
5、流式處理:新版本kafka才推出的流式處理組件kafka streams,相對於典型的流式處理框架如Apache Storm、Apache Samza、Spark、Apache Flink等競爭力如何,讓時間給出答案吧。
五、版本注意事項
自1.0.0版本開始,kafka版本號正式從原來的四位升級到了現在的3位,格式是<major>.<minor>.<patch>。
在kafka世界中,通常把producer和consumer統稱為客戶端即clients,這是與服務器端即broker相對應的。
選擇kafka版本時要注意的幾個分界點為:0.8版本才加入了集群間的備份機制;0.9.0.0版本開始才支持kafka security功能;0.10.0.0(含)之后的版本才有了流式處理組件kafka streams;但建議選擇相對較新版本,功能更完善bug更少咯。
2014年kafka的創始人創辦了公司——Confluent.io,從事商業化Kafka工具開發以及提供實時流式處理方面的產品。另外,confluent還分為開源版本和企業版本,企業版本中提供了對底層kafka集群完整的可視化監控解決方案以及一些輔助系統幫助管理集群,而開源版本與Apache社區的kafka並無太大區別。
第二部分:kafka線上環境部署
一、環境部署說明
略
二、集群環境規划
1、操作系統選型
除了現狀的確是Linux服務器數量最多,單論它與kafka本身的相適性,Linux也要比Windows等操作系統更加適合部署kafka,能想到的原因有兩個:I/O模型的使用和數據網絡傳輸效率。
2、磁盤選型
使用機械硬盤完全可以滿足kafka集群的使用,當然SSD更好。
關於JBOD(一堆普通磁盤的意思)和RAID(磁盤陣列)的選擇,即使用一堆普通商用磁盤進行安裝還是搭建專屬的RAID呢?答案是具體問題具體分析。追求性價比的公司可以考慮使用JBOD.
3、磁盤容量規划
主要考慮以下因素:
- 新增消息數
- 消息留存時間
- 平均消息時間
- 副本數
- 是否啟用壓縮。
4、內存規划
kafka對於Java堆內存的使用不是很多,kafka將消息寫入頁緩存,一般情況下,broker所需的堆內存都不會超過6GB。
對於內存的規划建議如下:
- 盡量分配更多的內存給操作系統的page cache;
- 不要為broker設置過大的堆內存;
- page cache大小至少要大於一個日志段的大小(?)。
5、CPU規划
要追求多核而非高時鍾頻率。
6、帶寬選擇
規划建議為:
- 盡量使用高速網絡;
- 根據自身網絡條件和帶寬來評估Kafka集群機器數量;
- 盡量避免使用跨機房網絡。
7、kafka集群涉及的主要幾類參數:
- broker端參數
- topic級別參數
- GC配置參數
- JVM參數
- OS參數。
第三部分:producer開發
一、序言
kafka內置有Java版本producer,而當前Apache kafka支持的第三方clients庫有很多,這些第三方庫基本上都是由非Apache kafka社區的人維護的,用戶下載的是Apache kafka的話默認是不包含這些庫的,需要單獨下載對應的庫。
Apache kafka封裝了一套二進制通信協議,對於producer而言,用戶幾乎可以使用任意語言按照該協議進行編程,從而實現向kafka發送消息。
實際上內置的Java版本producer和上面列出的所有第三方庫在底層都是相同的實現原理,這組協議本質上為不同的協議類型分別定義了專屬的緊湊二進制字節數組格式,然后通過socket發送給合適的broker,之后等待broker處理完成后返回響應給producer。這樣設計的好處就是具有良好的統一性——即所有的協議類型都是統一格式的,並且由於是自定義的二進制格式,這套協議不依賴任何外部序列號框架,從而顯得輕量級也具有好的擴展性。
二、producer工作原理
說到producer,它的主要功能就是向某個topic的某個分區發送一條消息,所以它首先需要確定到底要向topic的哪個分區寫入消息——這就是分區器做的事。
kafka producer提供了一個默認的分區器,對於每條待發送的消息,如果該消息指定了key,那么partitioner會根據key的哈希值來選擇目標分區;若這條消息沒有指定key,則partitioner使用輪訓的方式確認目標分區,從而最大限度的保證消息在所有分區上的均勻性。
當然,producer提供了用戶自行指定目標分區的API,即用戶在消息發送時跳過partitioner直接指定要發送到的分區。另外,producer也允許用戶實現自定義的分區策略而不使用默認的分區器。
第二,確認了目標分區之后,producer要做的第二個事就是尋找這個分區對應的leader,也就是該分區leader副本所在的kafka broker。因此,在發送消息時,producer也就有了多種選擇來實現消息發送(比如不等待任何副本的響應便返回成功、只是等待leader副本響應寫入操作后再返回成功等)。
producer簡言之就是將用戶待發送的消息封裝成一個ProducerRecord對象,然后使用KafkaProducer.send方法進行發送。具體過程為:Producer首先使用一個線程(用戶主線程,也即用戶啟動Producer的線程)將待發送的消息封裝進一個ProducerRecord類實例,然后將其序列化之后發送給partitioner,再結合本地緩存的元數據信息由partitioner來確定目標分區后一同發送到位於producer程序中的一塊內存緩沖區中。而KafkaProducer中的另一個專門的sender I/O線程則負責實時地從該緩沖區中提取出准備就緒的消息封裝進一個批次(batch),統一發送給對應的broker。工作流程圖如下圖。
圖 Java版本producer的工作流程
三、構造producer(詳見Demo代碼)
1、構造producer實例大致步驟
1>構造一個java.util.Properties對象,然后至少指定bootstrap.servers 、key.serializer、value.serializer這三個屬性。
對於bootstrap.servers參數,若kafka集群中機器數很多,可只需指定部分broker即可,producer會通過該參數找到並發現集群中所有的broker。被發送到broker端的任何消息的格式必須是字節數組,因此消息的各個組件必須首先做序列化,然后才能發送到broker。一定注意的是,key.serializer和value.serializer兩個參數必須是全限定名。
2>使用上一步中創建的Properties實例構造KafkaProducer對象。
/** 創建producer的時候同時指定key和value的序列化類,則不需在Properties中指定了。*/Serializer<String> keySerializer = new StringSerializer();Serializer<String> valueSerializer = new StringSerializer();Producer<String, String> producer = new KafkaProducer<String, String>(props, keySerializer, valueSerializer);
3>構造待發送的消息對象ProducerRecord,指定消息要被發送到的topic、分區及對應的key和value。注意,分區和key信息可以不用指定,有kafka自行確定分區。
4>調用KafkaProducer的send方法發送消息。
通過Java提供的Future同時實現了同步發送和異步發送+回調(Callback)兩種發送方式。而上文代碼清單中的調用方式實現了第三種發送方式——fire and forget即發送之后不管發送結果,在實際中不被推薦使用。真是使用場景中,同步和異步發送方式才是最常見的兩種方式。
異步發送:實際上所有的寫入操作默認都是異步的。send方法提供了回調類參數來實現異步發送以及發送結果的響應,具體代碼如下:
/** 發送消息后的回調類Callback實際上是一個Java接口,用戶可以創建自定義的Callback實現類來處理消息發送后的邏輯,* 只要該類實現org.apache.kafka.clients.producer.Callback接口即可。*/producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {//兩個參數不會同時非空if(exception == null) {//消息發送成功}else {//執行錯誤處理邏輯
if(exception instanceof RetriableException) {
//處理可重試瞬時異常
}else {
//處理不可重試異常
}}}})
同步發送:調用Future.get()無限等待結果返回,即實現同步發送的效果,具體代碼如下:
producer.send(record).get();//使用Future.get會一直等待直至Kafka broker將發送結果返回給producer程序.
【說明】無論同步發送和異步發送都有可能失敗,當前kafka的錯誤類型包含兩類:可重試異常和不可重試異常。所有可重試異常都繼承自org.apache.kafka.common.errors.RetriableException抽象類。
5>關閉KafkaProducer。
producer程序結束時一定要關閉producer。提供有無參數的close方法和有超時參數close方法。在實際場景中,一定要慎用待超時參數的close方法。
2、producer的主要參數
acks:指定在給producer發送響應前,leader broker必須要確保已成功寫入該消息的副本數。有3個取值:0、1和all。
| acks | producer吞吐量 | 消息持久性 | 使用場景 |
| 0 | 最高 | 最差 | 1、完全不關心消息是否發送成功; 2、允許消息丟失(比如統計服務器日志等) |
| 1 | 適中 | 適中 | 一般場景即可 |
| all或-1 | 最差 | 最高 | 不能容忍消息丟失 |
buffer.memory:指定producer端用於緩存消息的緩沖區大小,單位是字節,我們幾乎可以認為該參數指定的內存大小就是producer程序使用的內存大小。
compression.type:指定是否壓縮消息,默認是none。若要壓縮直接指定壓縮類型,目前kafka支持3中壓縮算法:GZIP、Snappy和LZ4,根據實際使用經驗producer結合LZ4的性能最好。
四、消息自定義分區機制(詳見Demo代碼)
producer提供了默認的分區策略及對應的分區器供用戶使用,但有時候用戶可能想實現自己的分區策略,就需要用戶自定義實現。若要使用自定義分區機制,用戶需要做兩件事:
1、在producer程序中創建一個類,實現org.apache.kafka.clients.producer.Partitioner接口。主要分區邏輯在Partitioner.partition中實現;
2、在用於構造KafkaProducer的Properties對象中設置partitioner.class參數。
五、自定義序列化(詳見Demo代碼)
kafka支持用戶自定義消息序列化,需要完成的3件事:
1、定義數據對象格式;
2、創建自定義序列化類,實現org.apache.kafka.common.serialization.Serializer接口,在serializer方法中實現序列化邏輯;
3、在用於構造KafkaProducer的Properties對象中設置key.serializer或value.serializer.
六、producer攔截器(詳見Demo代碼)
實現定制化邏輯,實例實現了一個簡單的雙interceptor組成的攔截鏈。
七、無消息丟失配置
KafkaProducer.send方法僅僅把消息放入緩沖區中,由一個專屬I/O線程負責從緩沖區中提取消息並封裝進消息batch中,然后發送出去,而這個過程中存在着數據丟失的窗口:若I/O線程發送之前producer崩潰,則存在緩沖區中的消息全部丟失了。采用同步發送不會丟數據,但是性能會很差,實際場景中不推薦使用,因此最好能有一份配置,既使用異步方式還能有效避免數據丟失。
1、producer端配置
block.on.buffer.full = true
acks = all or –1
retries = Integer.MAX_VALUE
max.in.flight.requests.per.connection = 1
使用帶回調機制的send發送消息,即KafkaProducer.sent(record, callback)
Callback邏輯中顯式地立即關閉producer,使用close
2、broker端參數配置
unclean.leader.election.enable = false
replication.factor = 3
min.insync.replicas = 2
replication.factor > min.insync.replicas
enable.auto.commit = false
八、 producer多線程處理
存在兩種基本的使用方法:多線程單KafkaProducer實例 + 多線程多KafkaProducer實例。
兩種KafkaProducer使用方式比較
| 說明 | 優勢 | 劣勢 | |
| 單KafkaProducer實例 |
所有線程共享一個KafkaProducer實例 |
實現簡單,性能好 |
1、所有線程共享一個內存緩沖區,可能需要較多內存;2、一旦producer某個線程崩潰導致KafkaProducer實例被“破壞”,則所有用戶線程都無法工作。 |
| 多KafkaProducer實例 |
每個線程維護自己專屬的KafkaProducer實例 |
1、每個用戶線程擁有專屬的KafkaProducer實例、緩沖區空間及一組對應的配置參數,可以進行細粒度的調優;2、單個KafkaProducer崩潰不會影響其他producer線程工作 |
需要較大的內存分配開銷 |
【建議】如果是對分區數不多的Kafka集群而言,推薦使用第一種方法,即在多個producer用戶線程中共享一個KafkaProducer實例;若對那些擁有超多分區的集群而言,采用第二種方法具有較高的可控性,方便producer的后續管理。
第四部分:consumer開發
一、序言
1、版本對比
新舊版本consumer對比
|
|
編程語言 |
位移管理 |
API包名 |
主要使用類 |
|
| 新版本 |
使用消費者組(consumer group) |
Java |
新版本把位移提交到kafka的一個內部topic(__consumer_offsets)上。注意這個topic名字的前面有兩個下划線 |
org.apache.kafka.clients.consumer.* |
KafkaConsumer |
| 舊版本 | 使用low-level consumer,分high-level和low-level兩種API. | Scala | 舊版本把位移提交到zookeeper。 | kafka.consumer.* | ZookeeperConsumerConnector SimpleConsumer |
2、consumer分類
consumer分為兩類:消費者組(consumer group)和獨立消費者(standalone consumer),其中前者是由多個消費者實例(consumer instance)構成一個整體進行消費,而后者則單獨執行消費操作。我們在討論或開發consumer程序的時候,必須明確消費者上下文信息,即所使用的consumer的版本以及consumer的分類。
【消費者組】
- 一個consumer group可能有若干個consumer實例,當然一個group只有一個實例也是允許的;
- 對於同一個group而言,topic的每條消息只能被發送到group下的一個consumer實例上;
- topic消息可以被發送到多個group中。
kafka就是通過consumer group實現了對基於隊列和基於發布/訂閱兩種消息引擎模型的支持的:
- 所有consumer實例都屬於相同group——實現基於隊列的模型,每條消息只會被一個consumer實例處理;
- 所有consumer實例都屬於不同group——實現基於發布/訂閱的模型。
group.id唯一標示一個consumer group,一個consumer實例可以是一個線程或是運行在其他機器上的進程。
3、位移相關說明
這里的位移指的是consumer端的offset,與分區日志中的offset是不同的含義。
很多消息引擎是把消費端的offset保存在服務器端(broker),這樣做的好處是實現簡單,但會存在下面的問題:
- broker從此變成了有狀態的,增加了同步成本,影響伸縮性;
- 需要引入應答機制來確認消費成功;
- 由於要保存許多consumer的offset,故必然引入復雜的數據結構,從而造成不必要的資源浪費。
而kafka選擇讓consumer group保存offset,只需要保存一個長整型數據即可。當前kafka consumer在內部使用一個map來保存期訂閱topic所屬分區的offset。
新版本consumer把位移提交到kafka的一個內部topic(__consumer_offsets)上,用戶應盡量避免執行該topic的任何操作。
二、構造consumer(詳見Demo代碼)
1、構造consumer實例大致步驟
1、構造一個java.util.Properties對象,至少指定bootstrap.servers、key.deserializer、value.deserializer和group.id的值;
2、使用上一步創建的Properties實例構造KafkaConsumer對象;
3、調用KafkaConsumer.subscribe方法訂閱consumer group感興趣的topic列表;
注意subscribe方法不是增量式的,后續的subscribe調用會完全覆蓋之前的訂閱語句。
4、循環調用KafkaConsumer.poll方法獲取封裝在ConsumerRecord的topic消息;
poll函數的參數是一個超時設定,通常如果consumer拿到了足夠多的可用數據,那么它可立即從該方法返回;但若當前沒有足夠多的數據可供返回,consumer會處於阻塞狀態,這個超時參數即控制阻塞的最大時間。這個超時設定給予了用戶能夠在consumer消費的同時定期去執行其他任務(但不知道具體實現)。否則設定一個比較大的值甚至是Integer.MAX_VALUE是不錯的建議。
5、處理獲取到的ConsumerRecord對象;
拿到這些kafka消息后consumer通常都包含處理邏輯,也即consumer的目的不僅是要從kafka處讀取消息,還要對獲取到的消息進行有意義的業務級處理。從kafka consumer的角度來說,poll方法返回即認為consumer成功消費了消息,但我們用戶的觀點通常認為是執行完真正的業務級處理之后才算消費完畢。因此,對於“consumer處理太慢”的問題要從兩個方面定位明確瓶頸:第一,如果是poll返回消息的速度過慢,那么可以調節相應的參數來提升poll方法的效率;第二,若消息的業務級處理邏輯過慢,則應該考慮簡化處理邏輯或把處理邏輯放入單獨的線程執行。
6、關閉KafkaConsumer。
consumer腳本命令:目前來說,kafka所有命令行腳本表示相同含義的參數都不是統一的名字,比如consumer腳本中的名字是bootstrap-server,到了producer腳本中變成了broker-list,而在創建主題腳本中又變成了zookeeper。
三、訂閱topic
1、訂閱列表
在consumer group訂閱topic列表使用下面語句即可:
consumer.subscribe(Arrays.asList("topic1","topic2","topic3"));
在獨立consumer(standalone consumer),訂閱列表則使用下面語句實現手動訂閱:
TopicPartition tp1 = new TopicPartition("topic-name", 0);TopicPartition tp2 = new TopicPartition("topic-name", 1);consumer.assign(Arrays.asList(tp1, tp2));
2、基於正則表達式訂閱topic
使用基於正則表達式的訂閱必須指定ConsumerRebalanceListener,該類是一個回調接口,用戶需要通過實現這個接口來是吸納consumer分區分配方案發生變更時的邏輯。
如果用戶使用的是自動提交(即設置enable.auto.commit=true),則通常不用理會這個類,用下面實現類即可。
consumer.subscribe(Pattern.compile("kafka-.*"), new NoOpConsumerRebalanceListener());
但是當用戶手動提交位移的,則至少要在ConsumerRebalanceListener實現類的onPartitionsRevoked方法中處理分區分配方案變更時的位移提交。
四、consumer.poll方法剖析(詳見Demo代碼)
1、poll的內部原理
kafka的consumer是用來讀取消息的,且要能夠同時讀取多個topic的多個分區的消息。若要實現並行的消息讀取,一種方法是使用多線程的方式,為每個要讀取的分區都創建一個專有的線程去消費(這其實就是舊版本consumer采用的方式);另一種方法是采用類似Linux I/O模型的poll或select等,使用一個線程來同時管理多個socket連接,即同時與多個broker通信實現消息的並行讀取(這就是新版consumer最重要的設計改變)。
新版本Java consumer是一個多線程或說是一個雙線程的Java進程:創建KafkaConsumer的線程被稱為用戶主線程,同時consumer在后台會創建一個心跳線程。KafkaConsumer的poll方法在用戶主線程中運行,而一旦consumer訂閱了topic,所有的消費邏輯包括coordinator的協調、消費者組的rebalance以及數據的獲取都會在主邏輯poll方法的一次調用中被執行。
2、poll使用方法
KafkaConsumer.poll方法引入參數的作用:
- 第一,超時設定;
- 第二,是想讓consumer程序有機會定期“醒來”去做一些其他的事情,這是超時設定的最大意義。
poll的使用方法總結如下:
- consumer需要定期執行其他子任務:推薦poll(較小超時時間)+運行標識布爾變量的方式;
- consumer不需要定期執行子任務:推薦poll(MAX_VALUE)+捕獲WakeupException的方式。
需要定期執行的代碼:
try {while (isRunning){//將isRunning標示為volatile型,然后在其他線程中設置isRunning=false來控制consumer的結束。// 4、循環調用KafkaConsumer.poll方法獲取封裝在ConsumerRecord的topic消息;ConsumerRecords<String, String> records = consumer.poll(1000);// 5、處理獲取到的ConsumerRecord對象;for (ConsumerRecord<String, String> record : records) {LOG.info("topic = %s, partition = %d, offset = %d", record.topic(), record.partition(), record.offset());}}} finally {// 千萬不要忘記!!關閉KafkaConsumer。它不僅會清除consumer創建的各種socket資源,還會通知消費者組coordinator主動離組從而更快的開啟新一輪rebalance。consumer.close();}
不需要定期執行的代碼:
try {while (true){//設置為trueConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);//在consumer程序未獲取到足夠多數據時無限等待,然后通過捕獲WakeupException異常來判斷consumer是否結束。需要在另一個線程中調用consumer.wakeup()方法來觸發consumer的關閉。// 5、處理獲取到的ConsumerRecord對象;for (ConsumerRecord<String, String> record : records) {LOG.info("topic = %s, partition = %d, offset = %d", record.topic(), record.partition(), record.offset());}}} catch(WakeupException e) {
// 此處忽略此異常的處理
}finally {
// 千萬不要忘記!!關閉KafkaConsumer。它不僅會清除consumer創建的各種socket資源,還會通知消費者組coordinator主動離組從而更快的開啟新一輪rebalance。consumer.close();}
說明:KafkaConsumer不是線程安全的,但有一個例外就是wakeup方法,用戶可以安全地在另一個線程中調用consumer.wakeup(). 其他KafkaConsumer方法都不能同時在多線程中使用。
五、交付語義
offset對於consumer非常重要,因為它是實現消息交付語義保證的基石,常見的3種消息交付語義保證如下:
- 最多一次(at most once)處理語義:消息可能丟失,但不會被重復處理;
- 最少一次(at least once)處理語義:消息不會丟失,但可能被處理多次;
- 精確一次(exactly once)處理語義:消息一定會被處理且只會被處理一次。
六、自動提交和手動提交(詳見Demo代碼)
consumer默認是自動提交的,優勢是降低用戶的開發成本,劣勢是用戶不能細粒度地處理位移的提交。
所謂的手動位移提交就是用戶自行確定消息何時被真正處理完並可以提交位移。一個典型的應用場景是:用戶需要對poll方法返回的消息集合中的消息執行業務級處理,用戶想要確保只有消息被真正處理完成后再提交位移,如果使用自動位移提交則無法保證這種時序性,這種情況就必須使用手動位移提交。
設置使用手動位移提交的步驟:
- 在構建KafkaConsumer時設置enable.auto.commit=false;
- 然后調用commitSync或commitAsync方法即可。
自動提交和手動提交的比較
| 使用方法 | 優勢 | 劣勢 | 交付語義保證 | 使用場景 | |
| 自動提交 | 默認不用配置或顯示設置enable.auto.commit=true | 開發成本低,簡單易用 | 無法實現精確控制,位移提交失敗后不易處理 | 可能造成消息丟失,最多實現“最少一次”處理語義 | 對消息交付語義無需求,容忍一定的消息丟失 |
| 手動提交 | 設置enable.auto.commit=false;手動調用commitSync或commitAsync提交位移 | 可精確控制位移提交行為 | 額外的開發成本,須自行處理位移提交 | 易實現“最少一次”處理語義,依賴外部狀態鵝考實現“精確一次”處理語義 | 消息處理邏輯重,不允許消息消失,至少要求“最少一次”處理語義 |
手動提交位移API進一步細分為同步手動提交和異步手動提交,即commitSync和commitAsync方法。當用戶調用上面兩個方法時,consumer會為所有它訂閱的分區提交位移。它們還有帶參數的重載方法。用戶調用帶參數的方法時需要指定一個Map顯示地告訴kafka為哪些分區提交位移。consumer只對它所擁有的分區做提交時更合理的行為,因此跟推薦帶參數的重載方法。下面是一段典型的手動提交部分分區位移的代碼:
//下面代碼按照分區級別進行位移提交。它首先對poll方法返回的消息集合按照分區進行分組,然后每個分區下的消息待處理完成后構造一個Map對象統一提交位移,從而實現了細粒度控制位移提交。
try {
while (running) {ConsumerRecord<String, String> records = consumer.poll(1000);for (TopicPartition partition : records.partitions()) {List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);for (ConsumerRecord<String, String> record : partitionRecords) {System.out.println(record.offset() + ": " + record.value());}long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));}}} finally {consumer.close();}
七、rebalance監聽器(詳見Demo代碼)
新版本consumer默認是把位移提交到__consumer_offsets中,其實kafka也支持用戶把位移提交到外部存儲中,比如數據庫中。若要實現這個功能,用戶就必須使用rebalance監聽器。
【注意】使用rebalance監聽器的前提是用戶使用consumer group,若使用的是consumer或是直接手動分配分區,那么rebalance監聽器是無效的。
八、解序列化(詳見Demo代碼)
kafka consumer從broker端獲取消息的格式是字節數組。自定義解序列化的步驟(同自定義序列化類似):
- 定義或復用serializer的數據對象格式;
- 創建自定義deserializer類,令其實現org.apache.kafka.common.serialization.Deserializer接口,在deserializer方法中實現deserialize邏輯;
- 在構造kafkaConsumer的Properties對象中設置key.deserializer或value.deserializer為上一步的實現類全限定名。
九、多線程消費實例(詳見Demo代碼)
下面介紹兩種多線程消費的方法及實例代碼:
1、每個線程維護一個KafkaConsumer
在這個方法中用戶創建多個線程來消費topic數據,每個線程都會創建專屬於該線程的KafkaConsumer實例,如圖.
由圖可知,consumer group由多個線程的KafkaConsumer組成,每個線程負責消費固定數目的分區。
2、單KafkaConsumer實例+多worker線程
本方法將消息的獲取與消息的處理解耦,把后者放入單獨的工作者線程中,即所謂的woker線程中,同時在全局維護一個或若干個consumer實例執行消息獲取任務,如下圖。
本例使用全局的kafkaConsumer實例執行消息獲取,然后把獲取到的消息集合交給線程池中的worker線程執行工作,之后worker線程完成處理后上報位移狀態,由全局consumer提交位移。
第五部分:管理kafka集群
待補充。。。。
第六部分:監控kafka集群
待補充。。。。










