概述
上一篇隨筆主要介紹了kafka的基本使用包括集群參數,生產者基本使用,consumer基本使用,現在來介紹一下kafka的使用技巧。
分區機制
我們在使用 Apache Kafka 生產和消費消息的時候,肯定是希望能夠將數據均勻地分配到所有服務器上。比如很多公司使用 Kafka 收集應用服務器的日志數據,這種數據都是很多的,特別是對於那種大批量機器組成的集群環境,每分鍾產生的日志量都能以 GB 數,因此如何將這么大的數據量均勻地分配到 Kafka 的各個 Broker 上,就成為一個非常重要的問題。Kafka 有主題(Topic)的概念,它是承載真實數據的邏輯容器,而在主題之下還分為若干個分區,也就是說 Kafka 的消息組織方式實際上是三級結構:主題 - 分區 - 消息。主題下的每條消息只會保存在某一個分區中,而不會在多個分區中被保存多份。官網上的這張圖非常清晰地展示了 Kafka 的三級結構,如下所示:
現在我拋出一個問題你可以先思考一下:你覺得為什么 Kafka 要做這樣的設計?為什么使用分區的概念而不是直接使用多個主題呢?其實分區的作用就是提供負載均衡的能力,或者說對數據進行分區的主要原因,就是為了實現系統的高伸縮性(Scalability)。不同的分區能夠被放置到不同節點的機器上,而數據的讀寫操作也都是針對分區這個粒度而進行的,這樣每個節點的機器都能獨立地執行各自分區的讀寫請求處理。並且,我們還可以通過添加新的節點機器來增加整體系統的吞吐量。
分區策略
下面我們說說 Kafka 生產者的分區策略。所謂分區策略是決定生產者將消息發送到哪個分區的算法。Kafka 為我們提供了默認的分區策略,同時它也支持你自定義分區策略。
如果要自定義分區策略,你需要顯式地配置生產者端的參數partitioner.class。這個參數該怎么設定呢?方法很簡單,在編寫生產者程序時,你可以編寫一個具體的類實現org.apache.kafka.clients.producer.Partitioner接口。這個接口也很簡單,只定義了兩個方法:partition()和close(),通常你只需要實現最重要的 partition 方法。我們來看看這個方法的方法簽名:
1 int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
這里的topic、key、keyBytes、value和valueBytes都屬於消息數據,cluster則是集群信息(比如當前 Kafka 集群共有多少主題、多少 Broker 等)。Kafka 給你這么多信息,就是希望讓你能夠充分地利用這些信息對消息進行分區,計算出它要被發送到哪個分區中。只要你自己的實現類定義好了 partition 方法,同時設置partitioner.class參數為你自己實現類的 Full Qualified Name,那么生產者程序就會按照你的代碼邏輯對消息進行分區。雖說可以有無數種分區的可能,但比較常見的分區策略也就那么幾種,下面我來詳細介紹一下。
輪訓策略:
也稱 Round-robin 策略,即順序分配。比如一個主題下有 3 個分區,那么第一條消息被發送到分區 0,第二條被發送到分區 1,第三條被發送到分區 2,以此類推。當生產第 4 條消息時又會重新開始,即將其分配到分區 0,輪詢策略有非常優秀的負載均衡表現,它總是能保證消息最大限度地被平均分配到所有分區上,故默認情況下它是最合理的分區策略,也是我們最常用的分區策略之一。
隨機策略:
也稱 Randomness 策略。所謂隨機就是我們隨意地將消息放置到任意一個分區上。本質上看隨機策略也是力求將數據均勻地打散到各個分區,但從實際表現來看,它要遜於輪詢策略,所以如果追求數據的均勻分布,還是使用輪詢策略比較好。事實上,隨機策略是老版本生產者使用的分區策略,在新版本中已經改為輪詢了。
按消息建保存策略:
Kafka 允許為每條消息定義消息鍵,簡稱為 Key。這個 Key 的作用非常大,它可以是一個有着明確業務含義的字符串,比如客戶代碼、部門編號或是業務 ID 等;也可以用來表征消息元數據。特別是在 Kafka 不支持時間戳的年代,在一些場景中,工程師們都是直接將消息創建時間封裝進 Key 里面的。一旦消息被定義了 Key,那么你就可以保證同一個 Key 的所有消息都進入到相同的分區里面,由於每個分區下的消息處理都是有順序的,故這個策略被稱為按消息鍵保序策略。
其他分區策略:
上面這幾種分區策略都是比較基礎的策略,除此之外你還能想到哪些有實際用途的分區策略?其實還有一種比較常見的,即所謂的基於地理位置的分區策略。當然這種策略一般只針對那些大規模的 Kafka 集群,特別是跨城市、跨國家甚至是跨大洲的集群。
壓縮算法
說起壓縮(compression),我相信你一定不會感到陌生。它秉承了用時間去換空間的經典 trade-off 思想,具體來說就是用 CPU 時間去換磁盤空間或網絡 I/O 傳輸量,希望以較小的 CPU 開銷帶來更少的磁盤占用或更少的網絡 I/O 傳輸。在 Kafka 中,壓縮也是用來做這件事的。今天我就來跟你分享一下 Kafka 中壓縮的那些事兒。
怎么壓縮
Kafka 是如何壓縮消息的呢?要弄清楚這個問題,就要從 Kafka 的消息格式說起了。目前 Kafka 共有兩大類消息格式,社區分別稱之為 V1 版本和 V2 版本。V2 版本是 Kafka 0.11.0.0 中正式引入的。不論是哪個版本,Kafka 的消息層次都分為兩層:消息集合(message set)以及消息(message)。一個消息集合中包含若干條日志項(record item),而日志項才是真正封裝消息的地方。Kafka 底層的消息日志由一系列消息集合日志項組成。Kafka 通常不會直接操作具體的一條條消息,它總是在消息集合這個層面上進行寫入操作。
那么社區引入 V2 版本的目的是什么呢?V2 版本主要是針對 V1 版本的一些弊端做了修正,和我們今天討論的主題相關的修正有哪些呢?先介紹一個,就是把消息的公共部分抽取出來放到外層消息集合里面,這樣就不用每條消息都保存這些信息了。
我來舉個例子。原來在 V1 版本中,每條消息都需要執行 CRC 校驗,但有些情況下消息的 CRC 值是會發生變化的。比如在 Broker 端可能會對消息時間戳字段進行更新,那么重新計算之后的 CRC 值也會相應更新;再比如 Broker 端在執行消息格式轉換時(主要是為了兼容老版本客戶端程序),也會帶來 CRC 值的變化。鑒於這些情況,再對每條消息都執行 CRC 校驗就有點沒必要了,不僅浪費空間還耽誤 CPU 時間,因此在 V2 版本中,消息的 CRC 校驗工作就被移到了消息集合這一層。
V2 版本還有一個和壓縮息息相關的改進,就是保存壓縮消息的方法發生了變化。之前 V1 版本中保存壓縮消息的方法是把多條消息進行壓縮然后保存到外層消息的消息體字段中;而 V2 版本的做法是對整個消息集合進行壓縮。顯然后者應該比前者有更好的壓縮效果。
何時壓縮
在 Kafka 中,壓縮可能發生在兩個地方:生產者端和 Broker 端。
生產者程序中配置 compression.type 參數即表示啟用指定類型的壓縮算法。比如下面這段程序代碼展示了如何構建一個開啟 GZIP 的 Producer 對象:
1 Properties props = new Properties();
2 props.put("bootstrap.servers", "localhost:9092"); 3 props.put("acks", "all"); 4 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 5 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 6 // 開啟GZIP壓縮 7 props.put("compression.type", "gzip"); 8 9 Producer<String, String> producer = new KafkaProducer<>(props);
在生產者端啟用壓縮是很自然的想法,那為什么我說在 Broker 端也可能進行壓縮呢?其實大部分情況下 Broker 從 Producer 端接收到消息后僅僅是原封不動地保存而不會對其進行任何修改,但這里的“大部分情況”也是要滿足一定條件的。有兩種例外情況就可能讓 Broker 重新壓縮消息。
情況一:Broker 端指定了和 Producer 端不同的壓縮算法。
你看,這種情況下 Broker 接收到 GZIP 壓縮消息后,只能解壓縮然后使用 Snappy 重新壓縮一遍。如果你翻開 Kafka 官網,你會發現 Broker 端也有一個參數叫 compression.type,和上面那個例子中的同名。但是這個參數的默認值是 producer,這表示 Broker 端會“尊重”Producer 端使用的壓縮算法。可一旦你在 Broker 端設置了不同的 compression.type 值,就一定要小心了,因為可能會發生預料之外的壓縮 / 解壓縮操作,通常表現為 Broker 端 CPU 使用率飆升。
情況二:Broker 端發生了消息格式轉換。
所謂的消息格式轉換主要是為了兼容老版本的消費者程序。還記得之前說過的 V1、V2 版本吧?在一個生產環境中,Kafka 集群中同時保存多種版本的消息格式非常常見。為了兼容老版本的格式,Broker 端會對新版本消息執行向老版本格式的轉換。這個過程中會涉及消息的解壓縮和重新壓縮。一般情況下這種消息格式轉換對性能是有很大影響的,除了這里的壓縮之外,它還讓 Kafka 喪失了引以為豪的 Zero Copy 特性。
何時解壓縮
有壓縮必有解壓縮!通常來說解壓縮發生在消費者程序中,也就是說 Producer 發送壓縮消息到 Broker 后,Broker 照單全收並原樣保存起來。當 Consumer 程序請求這部分消息時,Broker 依然原樣發送出去,當消息到達 Consumer 端后,由 Consumer 自行解壓縮還原成之前的消息。
那么現在問題來了,Consumer 怎么知道這些消息是用何種壓縮算法壓縮的呢?其實答案就在消息中。Kafka 會將啟用了哪種壓縮算法封裝進消息集合中,這樣當 Consumer 讀取到消息集合時,它自然就知道了這些消息使用的是哪種壓縮算法。如果用一句話總結一下壓縮和解壓縮,那么我希望你記住這句話:Producer 端壓縮、Broker 端保持、Consumer 端解壓縮。除了在 Consumer 端解壓縮,Broker 端也會進行解壓縮。注意了,這和前面提到消息格式轉換時發生的解壓縮是不同的場景。每個壓縮過的消息集合在 Broker 端寫入時都要發生解壓縮操作,目的就是為了對消息執行各種驗證。我們必須承認這種解壓縮對 Broker 端性能是有一定影響的,特別是對 CPU 的使用率而言。
無消息丟失配置
一句話概括,Kafka 只對“已提交”的消息(committed message)做有限度的持久化保證。
第一個核心要素是“已提交的消息”。什么是已提交的消息?當 Kafka 的若干個 Broker 成功地接收到一條消息並寫入到日志文件后,它們會告訴生產者程序這條消息已成功提交。此時,這條消息在 Kafka 看來就正式變為“已提交”消息了。那為什么是若干個 Broker 呢?這取決於你對“已提交”的定義。你可以選擇只要有一個 Broker 成功保存該消息就算是已提交,也可以是令所有 Broker 都成功保存該消息才算是已提交。不論哪種情況,Kafka 只對已提交的消息做持久化保證這件事情是不變的。
第二個核心要素就是“有限度的持久化保證”,也就是說 Kafka 不可能保證在任何情況下都做到不丟失消息。舉個極端點的例子,如果地球都不存在了,Kafka 還能保存任何消息嗎?顯然不能!倘若這種情況下你依然還想要 Kafka 不丟消息,那么只能在別的星球部署 Kafka Broker 服務器了。現在你應該能夠稍微體會出這里的“有限度”的含義了吧,其實就是說 Kafka 不丟消息是有前提條件的。假如你的消息保存在 N 個 Kafka Broker 上,那么這個前提條件就是這 N 個 Broker 中至少有 1 個存活。只要這個條件成立,Kafka 就能保證你的這條消息永遠不會丟失。總結一下,Kafka 是能做到不丟失消息的,只不過這些消息必須是已提交的消息,而且還要滿足一定的條件。當然,說明這件事並不是要為 Kafka 推卸責任,而是為了在出現該類問題時我們能夠明確責任邊界。
生產端丟失消息
目前 Kafka Producer 是異步發送消息的,也就是說如果你調用的是 producer.send(msg) 這個 API,那么它通常會立即返回,但此時你不能認為消息發送已成功完成。這種發送方式有個有趣的名字,叫“fire and forget”,翻譯一下就是“發射后不管”。這個術語原本屬於導彈制導領域,后來被借鑒到計算機領域中,它的意思是,執行完一個操作后不去管它的結果是否成功。調用 producer.send(msg) 就屬於典型的“fire and forget”,因此如果出現消息丟失,我們是無法知曉的。這個發送方式挺不靠譜吧,不過有些公司真的就是在使用這個 API 發送消息。
實際上,解決此問題的方法非常簡單:Producer 永遠要使用帶有回調通知的發送 API,也就是說不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。不要小瞧這里的 callback(回調),它能准確地告訴你消息是否真的提交成功了。一旦出現消息提交失敗的情況,你就可以有針對性地進行處理。
消費者丟失消息
Consumer 端丟失數據主要體現在 Consumer 端要消費的消息不見了。Consumer 程序有個“位移”的概念,表示的是這個 Consumer 當前消費到的 Topic 分區的位置。下面這張圖來自於官網,它清晰地展示了 Consumer 端的位移數據。
比如對於 Consumer A 而言,它當前的位移值就是 9;Consumer B 的位移值是 11。Kafka 中 Consumer 端的消息丟失就是這么一回事。要對抗這種消息丟失,辦法很簡單:維持先消費消息(閱讀),再更新位移(書簽)的順序即可。這樣就能最大限度地保證消息不丟失。
如果Consumer 程序自動地向前更新位移。假如其中某個線程運行失敗了,它負責的消息沒有被成功處理,但位移已經被更新了,因此這條消息對於 Consumer 而言實際上是丟失了。這里的關鍵在於 Consumer 自動提交位移,與你沒有確認書籍內容被全部讀完就將書歸還類似,你沒有真正地確認消息是否真的被消費就“盲目”地更新了位移。這個問題的解決方案也很簡單:如果是多線程異步處理消費消息,Consumer 程序不要開啟自動提交位移,而是要應用程序手動提交位移。在這里我要提醒你一下,單個 Consumer 程序使用多線程來消費消息說起來容易,寫成代碼卻異常困難,因為你很難正確地處理位移的更新,也就是說避免無消費消息丟失很簡單,但極易出現消息被消費了多次的情況。
最佳實踐
看完這兩個案例之后,我來分享一下 Kafka 無消息丟失的配置,每一個其實都能對應上面提到的問題。
不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。記住,一定要使用帶有回調通知的 send 方法。
設置 acks = all。acks 是 Producer 的一個參數,代表了你對“已提交”消息的定義。如果設置成 all,則表明所有副本 Broker 都要接收到消息,該消息才算是“已提交”。這是最高等級的“已提交”定義。
設置 retries 為一個較大的值。這里的 retries 同樣是 Producer 的參數,對應前面提到的 Producer 自動重試。當出現網絡的瞬時抖動時,消息發送可能會失敗,此時配置了 retries > 0 的 Producer 能夠自動重試消息發送,避免消息丟失。
設置 unclean.leader.election.enable = false。這是 Broker 端的參數,它控制的是哪些 Broker 有資格競選分區的 Leader。如果一個 Broker 落后原先的 Leader 太多,那么它一旦成為新的 Leader,必然會造成消息的丟失。故一般都要將該參數設置成 false,即不允許這種情況的發生。
設置 replication.factor >= 3。這也是 Broker 端的參數。其實這里想表述的是,最好將消息多保存幾份,畢竟目前防止消息丟失的主要機制就是冗余。
設置 min.insync.replicas > 1。這依然是 Broker 端參數,控制的是消息至少要被寫入到多少個副本才算是“已提交”。設置成大於 1 可以提升消息持久性。在實際環境中千萬不要使用默認值 1。
確保 replication.factor > min.insync.replicas。如果兩者相等,那么只要有一個副本掛機,整個分區就無法正常工作了。我們不僅要改善消息的持久性,防止數據丟失,還要在不降低可用性的基礎上完成。推薦設置成 replication.factor = min.insync.replicas + 1。
確保消息消費完成再提交。Consumer 端有個參數 enable.auto.commit,最好把它設置成 false,並采用手動提交位移的方式。就像前面說的,這對於單 Consumer 多線程處理的場景而言是至關重要的。
攔截器
Kafka 攔截器分為生產者攔截器和消費者攔截器。生產者攔截器允許你在發送消息前以及消息提交成功后植入你的攔截器邏輯;而消費者攔截器支持在消費消息前以及提交位移后編寫特定邏輯。值得一提的是,這兩種攔截器都支持鏈的方式,即你可以將一組攔截器串連成一個大的攔截器,Kafka 會按照添加順序依次執行攔截器邏輯。舉個例子,假設你想在生產消息前執行兩個“前置動作”:第一個是為消息增加一個頭信息,封裝發送該消息的時間,第二個是更新發送消息數字段,那么當你將這兩個攔截器串聯在一起統一指定給 Producer 后,Producer 會按順序執行上面的動作,然后再發送消息。
當前 Kafka 攔截器的設置方法是通過參數配置完成的。生產者和消費者兩端有一個相同的參數,名字叫 interceptor.classes,它指定的是一組類的列表,每個類就是特定邏輯的攔截器實現類。拿上面的例子來說,假設第一個攔截器的完整類路徑是 com.yourcompany.kafkaproject.interceptors.AddTimeStampInterceptor,第二個類是 com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor,那么你需要按照以下方法在 Producer 端指定攔截器:
1 Properties props = new Properties();
2 List<String> interceptors = new ArrayList<>(); 3 interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 攔截器1 4 interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 攔截器2 5 props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); 6 ……
現在問題來了,我們應該怎么編寫 AddTimeStampInterceptor 和 UpdateCounterInterceptor 類呢?其實很簡單,這兩個類以及你自己編寫的所有 Producer 端攔截器實現類都要繼承 org.apache.kafka.clients.producer.ProducerInterceptor 接口。該接口是 Kafka 提供的,里面有兩個核心的方法。
onSend:該方法會在消息發送之前被調用。如果你想在發送之前對消息“美美容”,這個方法是你唯一的機會。
onAcknowledgement:該方法會在消息成功提交或發送失敗之后被調用。還記得我在上一期中提到的發送回調通知 callback 嗎?onAcknowledgement 的調用要早於 callback 的調用。值得注意的是,這個方法和 onSend 不是在同一個線程中被調用的,因此如果你在這兩個方法中調用了某個共享可變對象,一定要保證線程安全哦。還有一點很重要,這個方法處在 Producer 發送的主路徑中,所以最好別放一些太重的邏輯進去,否則你會發現你的 Producer TPS 直線下降。
同理,指定消費者攔截器也是同樣的方法,只是具體的實現類要實現 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口,這里面也有兩個核心方法。
onConsume:該方法在消息返回給 Consumer 程序之前調用。也就是說在開始正式處理消息之前,攔截器會先攔一道,搞一些事情,之后再返回給你。
onCommit:Consumer 在提交位移之后調用該方法。通常你可以在該方法中做一些記賬類的動作,比如打日志等。
一定要注意的是,指定攔截器類時要指定它們的全限定名,即 full qualified name。通俗點說就是要把完整包名也加上,不要只有一個類名在那里,並且還要保證你的 Producer 程序能夠正確加載你的攔截器類。
典型使用場景
Kafka 攔截器都能用在哪些地方呢?其實,跟很多攔截器的用法相同,Kafka 攔截器可以應用於包括客戶端監控、端到端系統性能檢測、消息審計等多種功能在內的場景。
今天 Kafka 默認提供的監控指標都是針對單個客戶端或 Broker 的,你很難從具體的消息維度去追蹤集群間消息的流轉路徑。同時,如何監控一條消息從生產到最后消費的端到端延時也是很多 Kafka 用戶迫切需要解決的問題。從技術上來說,我們可以在客戶端程序中增加這樣的統計邏輯,但是對於那些將 Kafka 作為企業級基礎架構的公司來說,在應用代碼中編寫統一的監控邏輯其實是很難的,畢竟這東西非常靈活,不太可能提前確定好所有的計算邏輯。另外,將監控邏輯與主業務邏輯耦合也是軟件工程中不提倡的做法。現在,通過實現攔截器的邏輯以及可插拔的機制,我們能夠快速地觀測、驗證以及監控集群間的客戶端性能指標,特別是能夠從具體的消息層面上去收集這些數據。這就是 Kafka 攔截器的一個非常典型的使用場景。我們再來看看消息審計(message audit)的場景。設想你的公司把 Kafka 作為一個私有雲消息引擎平台向全公司提供服務,這必然要涉及多租戶以及消息審計的功能。作為私有雲的 PaaS 提供方,你肯定要能夠隨時查看每條消息是哪個業務方在什么時間發布的,之后又被哪些業務方在什么時刻消費。一個可行的做法就是你編寫一個攔截器類,實現相應的消息審計邏輯,然后強行規定所有接入你的 Kafka 服務的客戶端程序必須設置該攔截器。
kafka如何管理TCP連接
Apache Kafka 的所有通信都是基於 TCP 的,而不是基於 HTTP 或其他協議。無論是生產者、消費者,還是 Broker 之間的通信都是如此。你可能會問,為什么 Kafka 不使用 HTTP 作為底層的通信協議呢?其實這里面的原因有很多,但最主要的原因在於 TCP 和 HTTP 之間的區別。從社區的角度來看,在開發客戶端時,人們能夠利用 TCP 本身提供的一些高級功能,比如多路復用請求以及同時輪詢多個連接的能力。
所謂的多路復用請求,即 multiplexing request,是指將兩個或多個數據流合並到底層單一物理連接中的過程。TCP 的多路復用請求會在一條物理連接上創建若干個虛擬連接,每個虛擬連接負責流轉各自對應的數據流。其實嚴格來說,TCP 並不能多路復用,它只是提供可靠的消息交付語義保證,比如自動重傳丟失的報文。
更嚴謹地說,作為一個基於報文的協議,TCP 能夠被用於多路復用連接場景的前提是,上層的應用協議(比如 HTTP)允許發送多條消息。不過,我們今天並不是要詳細討論 TCP 原理,因此你只需要知道這是社區采用 TCP 的理由之一就行了。除了 TCP 提供的這些高級功能有可能被 Kafka 客戶端的開發人員使用之外,社區還發現,目前已知的 HTTP 庫在很多編程語言中都略顯簡陋。基於這兩個原因,Kafka 社區決定采用 TCP 協議作為所有請求通信的底層協議。
Java生產者程序管理tcp
Kafka 的 Java 生產者 API 主要的對象就是 KafkaProducer。通常我們開發一個生產者的步驟有 4 步。
第 1 步:構造生產者對象所需的參數對象。
第 2 步:利用第 1 步的參數對象,創建 KafkaProducer 對象實例。
第 3 步:使用 KafkaProducer 的 send 方法發送消息。
第 4 步:調用 KafkaProducer 的 close 方法關閉生產者並釋放各種系統資源。
上面這 4 步寫成 Java 代碼的話大概是這個樣子:
1 Properties props = new Properties ();
2 props.put(“參數1”, “參數1的值”); 3 props.put(“參數2”, “參數2的值”); 4 …… 5 try (Producer<String, String> producer = new KafkaProducer<>(props)) { 6 producer.send(new ProducerRecord<String, String>(……), callback); 7 …… 8 }
這段代碼使用了 Java 7 提供的 try-with-resource 特性,所以並沒有顯式調用 producer.close() 方法。無論是否顯式調用 close 方法,所有生產者程序大致都是這個路數。現在問題來了,當我們開發一個 Producer 應用時,生產者會向 Kafka 集群中指定的主題(Topic)發送消息,這必然涉及與 Kafka Broker 創建 TCP 連接。那么,Kafka 的 Producer 客戶端是如何管理這些 TCP 連接的呢?
要回答上面這個問題,我們首先要弄明白生產者代碼是什么時候創建 TCP 連接的。
首先,生產者應用在創建 KafkaProducer 實例時是會建立與 Broker 的 TCP 連接的。其實這種表述也不是很准確,應該這樣說:在創建 KafkaProducer 實例時,生產者應用會在后台創建並啟動一個名為 Sender 的線程,該 Sender 線程開始運行時首先會創建與 Broker 的連接。我截取了一段測試環境中的日志來說明這一點:
你也許會問:怎么可能是這樣?如果不調用 send 方法,這個 Producer 都不知道給哪個主題發消息,它又怎么能知道連接哪個 Broker 呢?難不成它會連接 bootstrap.servers 參數指定的所有 Broker 嗎?嗯,是的,Java Producer 目前還真是這樣設計的。我在這里稍微解釋一下 bootstrap.servers 參數。它是 Producer 的核心參數之一,指定了這個 Producer 啟動時要連接的 Broker 地址。請注意,這里的“啟動時”,代表的是 Producer 啟動時會發起與這些 Broker 的連接。因此,如果你為這個參數指定了 1000 個 Broker 連接信息,那么很遺憾,你的 Producer 啟動時會首先創建與這 1000 個 Broker 的 TCP 連接。在實際使用過程中,我並不建議把集群中所有的 Broker 信息都配置到 bootstrap.servers 中,通常你指定 3~4 台就足以了。因為 Producer 一旦連接到集群中的任一台 Broker,就能拿到整個集群的 Broker 信息,故沒必要為 bootstrap.servers 指定所有的 Broker。
針對 TCP 連接何時創建的問題,目前我們的結論是這樣的:TCP 連接是在創建 KafkaProducer 實例時建立的。那么,我們想問的是,它只會在這個時候被創建嗎?當然不是!TCP 連接還可能在兩個地方被創建:一個是在更新元數據后,另一個是在消息發送時。為什么說是可能?因為這兩個地方並非總是創建 TCP 連接。當 Producer 更新了集群的元數據信息之后,如果發現與某些 Broker 當前沒有連接,那么它就會創建一個 TCP 連接。同樣地,當要發送消息時,Producer 發現尚不存在與目標 Broker 的連接,也會創建一個。
接下來,我們來看看 Producer 更新集群元數據信息的兩個場景。
場景一:當 Producer 嘗試給一個不存在的主題發送消息時,Broker 會告訴 Producer 說這個主題不存在。此時 Producer 會發送 METADATA 請求給 Kafka 集群,去嘗試獲取最新的元數據信息。
場景二:Producer 通過 metadata.max.age.ms 參數定期地去更新元數據信息。該參數的默認值是 300000,即 5 分鍾,也就是說不管集群那邊是否有變化,Producer 每 5 分鍾都會強制刷新一次元數據以保證它是最及時的數據。
說完了 TCP 連接的創建,我們來說說它們何時被關閉。Producer 端關閉 TCP 連接的方式有兩種:一種是用戶主動關閉;一種是 Kafka 自動關閉。
我們先說第一種。這里的主動關閉實際上是廣義的主動關閉,甚至包括用戶調用 kill -9 主動“殺掉”Producer 應用。當然最推薦的方式還是調用 producer.close() 方法來關閉。第二種是 Kafka 幫你關閉,這與 Producer 端參數 connections.max.idle.ms 的值有關。默認情況下該參數值是 9 分鍾,即如果在 9 分鍾內沒有任何請求“流過”某個 TCP 連接,那么 Kafka 會主動幫你把該 TCP 連接關閉。用戶可以在 Producer 端設置 connections.max.idle.ms=-1 禁掉這種機制。一旦被設置成 -1,TCP 連接將成為永久長連接。當然這只是軟件層面的“長連接”機制,由於 Kafka 創建的這些 Socket 連接都開啟了 keepalive,因此 keepalive 探活機制還是會遵守的。值得注意的是,在第二種方式中,TCP 連接是在 Broker 端被關閉的,但其實這個 TCP 連接的發起方是客戶端,因此在 TCP 看來,這屬於被動關閉的場景,即 passive close。被動關閉的后果就是會產生大量的 CLOSE_WAIT 連接,因此 Producer 端或 Client 端沒有機會顯式地觀測到此連接已被中斷。
Java消費者程序管理tcp
我們先從消費者創建 TCP 連接開始討論。消費者端主要的程序入口是 KafkaConsumer 類。和生產者不同的是,構建 KafkaConsumer 實例時是不會創建任何 TCP 連接的,也就是說,當你執行完 new KafkaConsumer(properties) 語句后,你會發現,沒有 Socket 連接被創建出來。這一點和 Java 生產者是有區別的,主要原因就是生產者入口類 KafkaProducer 在構建實例的時候,會在后台默默地啟動一個 Sender 線程,這個 Sender 線程負責 Socket 連接的創建。從這一點上來看,我個人認為 KafkaConsumer 的設計比 KafkaProducer 要好。就像我在第 13 講中所說的,在 Java 構造函數中啟動線程,會造成 this 指針的逃逸,這始終是一個隱患。如果 Socket 不是在構造函數中創建的,那么是在 KafkaConsumer.subscribe 或 KafkaConsumer.assign 方法中創建的嗎?嚴格來說也不是。我還是直接給出答案吧:TCP 連接是在調用 KafkaConsumer.poll 方法時被創建的。再細粒度地說,在 poll 方法內部有 3 個時機可以創建 TCP 連接。
1.發起 FindCoordinator 請求時。
還記得消費者端有個組件叫協調者(Coordinator)嗎?它駐留在 Broker 端的內存中,負責消費者組的組成員管理和各個消費者的位移提交管理。當消費者程序首次啟動調用 poll 方法時,它需要向 Kafka 集群發送一個名為 FindCoordinator 的請求,希望 Kafka 集群告訴它哪個 Broker 是管理它的協調者。不過,消費者應該向哪個 Broker 發送這類請求呢?理論上任何一個 Broker 都能回答這個問題,也就是說消費者可以發送 FindCoordinator 請求給集群中的任意服務器。在這個問題上,社區做了一點點優化:消費者程序會向集群中當前負載最小的那台 Broker 發送請求。負載是如何評估的呢?其實很簡單,就是看消費者連接的所有 Broker 中,誰的待發送請求最少。當然了,這種評估顯然是消費者端的單向評估,並非是站在全局角度,因此有的時候也不一定是最優解。不過這不並影響我們的討論。總之,在這一步,消費者會創建一個 Socket 連接。
2.連接協調者時。
Broker 處理完上一步發送的 FindCoordinator 請求之后,會返還對應的響應結果(Response),顯式地告訴消費者哪個 Broker 是真正的協調者,因此在這一步,消費者知曉了真正的協調者后,會創建連向該 Broker 的 Socket 連接。只有成功連入協調者,協調者才能開啟正常的組協調操作,比如加入組、等待組分配方案、心跳請求處理、位移獲取、位移提交等。
3.消費數據時。
消費者會為每個要消費的分區創建與該分區領導者副本所在 Broker 連接的 TCP。舉個例子,假設消費者要消費 5 個分區的數據,這 5 個分區各自的領導者副本分布在 4 台 Broker 上,那么該消費者在消費時會創建與這 4 台 Broker 的 Socket 連接。
通常來說,消費者程序會創建 3 類 TCP 連接:確定協調者和獲取集群元數據。連接協調者,令其執行組成員管理操作。執行實際的消息獲取。
和生產者類似,消費者關閉 Socket 也分為主動關閉和 Kafka 自動關閉。主動關閉是指你顯式地調用消費者 API 的方法去關閉消費者,具體方式就是手動調用 KafkaConsumer.close() 方法,或者是執行 Kill 命令,不論是 Kill -2 還是 Kill -9;而 Kafka 自動關閉是由消費者端參數 connection.max.idle.ms 控制的,該參數現在的默認值是 9 分鍾,即如果某個 Socket 連接上連續 9 分鍾都沒有任何請求“過境”的話,那么消費者會強行“殺掉”這個 Socket 連接。
不過,和生產者有些不同的是,如果在編寫消費者程序時,你使用了循環的方式來調用 poll 方法消費消息,那么上面提到的所有請求都會被定期發送到 Broker,因此這些 Socket 連接上總是能保證有請求在發送,從而也就實現了“長連接”的效果。針對上面提到的三類 TCP 連接,你需要注意的是,當第三類 TCP 連接成功創建后,消費者程序就會廢棄第一類 TCP 連接,之后在定期請求元數據時,它會改為使用第三類 TCP 連接。也就是說,最終你會發現,第一類 TCP 連接會在后台被默默地關閉掉。對一個運行了一段時間的消費者程序來說,只會有后面兩類 TCP 連接存在。
從理論上說,Kafka Java 消費者管理 TCP 資源的機制我已經說清楚了,但如果仔細推敲這里面的設計原理,還是會發現一些問題。我們剛剛講過,第一類 TCP 連接僅僅是為了首次獲取元數據而創建的,后面就會被廢棄掉。最根本的原因是,消費者在啟動時還不知道 Kafka 集群的信息,只能使用一個“假”的 ID 去注冊,即使消費者獲取了真實的 Broker ID,它依舊無法區分這個“假”ID 對應的是哪台 Broker,因此也就無法重用這個 Socket 連接,只能再重新創建一個新的連接。為什么會出現這種情況呢?主要是因為目前 Kafka 僅僅使用 ID 這一個維度的數據來表征 Socket 連接信息。這點信息明顯不足以確定連接的是哪台 Broker,也許在未來,社區應該考慮使用 < 主機名、端口、ID> 三元組的方式來定位 Socket 資源,這樣或許能夠讓消費者程序少創建一些 TCP 連接。
冪等生產者和事務生產者
所謂的消息交付可靠性保障,是指 Kafka 對 Producer 和 Consumer 要處理的消息提供什么樣的承諾。常見的承諾有以下三種:
最多一次(at most once):消息可能會丟失,但絕不會被重復發送。
至少一次(at least once):消息不會丟失,但有可能被重復發送。
精確一次(exactly once):消息不會丟失,也不會被重復發送。
目前,Kafka 默認提供的交付可靠性保障是第二種,即至少一次。我們說過消息“已提交”的含義,即只有 Broker 成功“提交”消息且 Producer 接到 Broker 的應答才會認為該消息成功發送。不過倘若消息成功“提交”,但 Broker 的應答沒有成功發送回 Producer 端(比如網絡出現瞬時抖動),那么 Producer 就無法確定消息是否真的提交成功了。因此,它只能選擇重試,也就是再次發送相同的消息。這就是 Kafka 默認提供至少一次可靠性保障的原因,不過這會導致消息重復發送。Kafka 也可以提供最多一次交付保障,只需要讓 Producer 禁止重試即可。這樣一來,消息要么寫入成功,要么寫入失敗,但絕不會重復發送。我們通常不會希望出現消息丟失的情況,但一些場景里偶發的消息丟失其實是被允許的,相反,消息重復是絕對要避免的。此時,使用最多一次交付保障就是最恰當的。無論是至少一次還是最多一次,都不如精確一次來得有吸引力。大部分用戶還是希望消息只會被交付一次,這樣的話,消息既不會丟失,也不會被重復處理。或者說,即使 Producer 端重復發送了相同的消息,Broker 端也能做到自動去重。在下游 Consumer 看來,消息依然只有一條。那么問題來了,Kafka 是怎么做到精確一次的呢?簡單來說,這是通過兩種機制:冪等性(Idempotence)和事務(Transaction)。它們分別是什么機制?兩者是一回事嗎?要回答這些問題,我們首先來說說什么是冪等性。
冪等性
“冪等”這個詞原是數學領域中的概念,指的是某些操作或函數能夠被執行多次,但每次得到的結果都是不變的。我來舉幾個簡單的例子說明一下。比如在乘法運算中,讓數字乘以 1 就是一個冪等操作,因為不管你執行多少次這樣的運算,結果都是相同的。再比如,取整函數(floor 和 ceiling)是冪等函數,那么運行 1 次 floor(3.4) 和 100 次 floor(3.4),結果是一樣的,都是 3。相反地,讓一個數加 1 這個操作就不是冪等的,因為執行一次和執行多次的結果必然不同。
在計算機領域中,冪等性的含義稍微有一些不同:在命令式編程語言(比如 C)中,若一個子程序是冪等的,那它必然不能修改系統狀態。這樣不管運行這個子程序多少次,與該子程序關聯的那部分系統狀態保持不變。在函數式編程語言(比如 Scala 或 Haskell)中,很多純函數(pure function)天然就是冪等的,它們不執行任何的 side effect。
冪等性有很多好處,其最大的優勢在於我們可以安全地重試任何冪等性操作,反正它們也不會破壞我們的系統狀態。如果是非冪等性操作,我們還需要擔心某些操作執行多次對狀態的影響,但對於冪等性操作而言,我們根本無需擔心此事。
在 Kafka 中,Producer 默認不是冪等性的,但我們可以創建冪等性 Producer。它其實是 0.11.0.0 版本引入的新功能。在此之前,Kafka 向分區發送數據時,可能會出現同一條消息被發送了多次,導致消息重復的情況。在 0.11 之后,指定 Producer 冪等性的方法很簡單,僅需要設置一個參數即可,即 props.put(“enable.idempotence”, ture),或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)。
enable.idempotence 被設置成 true 后,Producer 自動升級成冪等性 Producer,其他所有的代碼邏輯都不需要改變。Kafka 自動幫你做消息的重復去重。底層具體的原理很簡單,就是經典的用空間去換時間的優化思路,即在 Broker 端多保存一些字段。當 Producer 發送了具有相同字段值的消息后,Broker 能夠自動知曉這些消息已經重復了,於是可以在后台默默地把它們“丟棄”掉。當然,實際的實現原理並沒有這么簡單,但你大致可以這么理解。
看上去,冪等性 Producer 的功能很酷,使用起來也很簡單,僅僅設置一個參數就能保證消息不重復了,但實際上,我們必須要了解冪等性 Producer 的作用范圍。首先,它只能保證單分區上的冪等性,即一個冪等性 Producer 能夠保證某個主題的一個分區上不出現重復消息,它無法實現多個分區的冪等性。其次,它只能實現單會話上的冪等性,不能實現跨會話的冪等性。這里的會話,你可以理解為 Producer 進程的一次運行。當你重啟了 Producer 進程之后,這種冪等性保證就喪失了。那么你可能會問,如果我想實現多分區以及多會話上的消息無重復,應該怎么做呢?答案就是事務(transaction)或者依賴事務型 Producer。這也是冪等性 Producer 和事務型 Producer 的最大區別!
事務
Kafka 的事務概念類似於我們熟知的數據庫提供的事務。在數據庫領域,事務提供的安全性保障是經典的 ACID,即原子性(Atomicity)、一致性 (Consistency)、隔離性 (Isolation) 和持久性 (Durability)。
當然,在實際場景中各家數據庫對 ACID 的實現各不相同。特別是 ACID 本身就是一個有歧義的概念,比如對隔離性的理解。大體來看,隔離性非常自然和必要,但是具體到實現細節就顯得不那么精確了。通常來說,隔離性表明並發執行的事務彼此相互隔離,互不影響。經典的數據庫教科書把隔離性稱為可串行化 (serializability),即每個事務都假裝它是整個數據庫中唯一的事務。
提到隔離級別,這種歧義或混亂就更加明顯了。很多數據庫廠商對於隔離級別的實現都有自己不同的理解,比如有的數據庫提供 Snapshot 隔離級別,而在另外一些數據庫中,它們被稱為可重復讀(repeatable read)。好在對於已提交讀(read committed)隔離級別的提法,各大主流數據庫廠商都比較統一。所謂的 read committed,指的是當讀取數據庫時,你只能看到已提交的數據,即無臟讀。同時,當寫入數據庫時,你也只能覆蓋掉已提交的數據,即無臟寫。Kafka 自 0.11 版本開始也提供了對事務的支持,目前主要是在 read committed 隔離級別上做事情。它能保證多條消息原子性地寫入到目標分區,同時也能保證 Consumer 只能看到事務成功提交的消息。下面我們就來看看 Kafka 中的事務型 Producer。
事務型Producer
事務型 Producer 能夠保證將消息原子性地寫入到多個分區中。這批消息要么全部寫入成功,要么全部失敗。另外,事務型 Producer 也不懼進程的重啟。Producer 重啟回來后,Kafka 依然保證它們發送消息的精確一次處理。設置事務型 Producer 的方法也很簡單,滿足兩個要求即可:和冪等性 Producer 一樣,開啟 enable.idempotence = true。設置 Producer 端參數 transactional. id。最好為其設置一個有意義的名字。此外,你還需要在 Producer 代碼中做一些調整,如這段代碼所示:
1 producer.initTransactions();
2 try { 3 producer.beginTransaction(); 4 producer.send(record1); 5 producer.send(record2); 6 producer.commitTransaction(); 7 } catch (KafkaException e) { 8 producer.abortTransaction(); 9 }
這段代碼能夠保證 Record1 和 Record2 被當作一個事務統一提交到 Kafka,要么它們全部提交成功,要么全部寫入失敗。實際上即使寫入失敗,Kafka 也會把它們寫入到底層的日志中,也就是說 Consumer 還是會看到這些消息。因此在 Consumer 端,讀取事務型 Producer 發送的消息也是需要一些變更的。修改起來也很簡單,設置 isolation.level 參數的值即可。當前這個參數有兩個取值:read_uncommitted:這是默認值,表明 Consumer 能夠讀取到 Kafka 寫入的任何消息,不論事務型 Producer 提交事務還是終止事務,其寫入的消息都可以讀取。很顯然,如果你用了事務型 Producer,那么對應的 Consumer 就不要使用這個值。read_committed:表明 Consumer 只會讀取事務型 Producer 成功提交事務寫入的消息。當然了,它也能看到非事務型 Producer 寫入的所有消息。
消費者組
消費者組,即 Consumer Group,應該算是 Kafka 比較有亮點的設計了。那么何謂 Consumer Group 呢?用一句話概括就是:Consumer Group 是 Kafka 提供的可擴展且具有容錯性的消費者機制。既然是一個組,那么組內必然可以有多個消費者或消費者實例(Consumer Instance),它們共享一個公共的 ID,這個 ID 被稱為 Group ID。組內的所有消費者協調在一起來消費訂閱主題(Subscribed Topics)的所有分區(Partition)。當然,每個分區只能由同一個消費者組內的一個 Consumer 實例來消費。個人認為,理解 Consumer Group 記住下面這三個特性就好了。
Consumer Group 下可以有一個或多個 Consumer 實例。這里的實例可以是一個單獨的進程,也可以是同一進程下的線程。在實際場景中,使用進程更為常見一些。
Group ID 是一個字符串,在一個 Kafka 集群中,它標識唯一的一個 Consumer Group。
Consumer Group 下所有實例訂閱的主題的單個分區,只能分配給組內的某個 Consumer 實例消費。這個分區當然也可以被其他的 Group 消費。
Consumer Group 之間彼此獨立,互不影響,它們能夠訂閱相同的一組主題而互不干涉。再加上 Broker 端的消息留存機制,Kafka 的 Consumer Group 完美地規避了上面提到的伸縮性差的問題。可以這么說,Kafka 僅僅使用 Consumer Group 這一種機制,卻同時實現了傳統消息引擎系統的兩大模型:如果所有實例都屬於同一個 Group,那么它實現的就是消息隊列模型;如果所有實例分別屬於不同的 Group,那么它實現的就是發布 / 訂閱模型。
在了解了 Consumer Group 以及它的設計亮點之后,你可能會有這樣的疑問:在實際使用場景中,我怎么知道一個 Group 下該有多少個 Consumer 實例呢?理想情況下,Consumer 實例的數量應該等於該 Group 訂閱主題的分區總數。舉個簡單的例子,假設一個 Consumer Group 訂閱了 3 個主題,分別是 A、B、C,它們的分區數依次是 1、2、3,那么通常情況下,為該 Group 設置 6 個 Consumer 實例是比較理想的情形,因為它能最大限度地實現高伸縮性。你可能會問,我能設置小於或大於 6 的實例嗎?當然可以!如果你有 3 個實例,那么平均下來每個實例大約消費 2 個分區(6 / 3 = 2);如果你設置了 8 個實例,那么很遺憾,有 2 個實例(8 – 6 = 2)將不會被分配任何分區,它們永遠處於空閑狀態。因此,在實際使用過程中一般不推薦設置大於總分區數的 Consumer 實例。設置多余的實例只會浪費資源,而沒有任何好處。
Kafka 有新舊客戶端 API 之分,那自然也就有新舊 Consumer 之分。老版本的 Consumer 也有消費者組的概念,它和我們目前討論的 Consumer Group 在使用感上並沒有太多的不同,只是它管理位移的方式和新版本是不一樣的。老版本的 Consumer Group 把位移保存在 ZooKeeper 中。Apache ZooKeeper 是一個分布式的協調服務框架,Kafka 重度依賴它實現各種各樣的協調管理。將位移保存在 ZooKeeper 外部系統的做法,最顯而易見的好處就是減少了 Kafka Broker 端的狀態保存開銷。現在比較流行的提法是將服務器節點做成無狀態的,這樣可以自由地擴縮容,實現超強的伸縮性。Kafka 最開始也是基於這樣的考慮,才將 Consumer Group 位移保存在獨立於 Kafka 集群之外的框架中。
不過,慢慢地人們發現了一個問題,即 ZooKeeper 這類元框架其實並不適合進行頻繁的寫更新,而 Consumer Group 的位移更新卻是一個非常頻繁的操作。這種大吞吐量的寫操作會極大地拖慢 ZooKeeper 集群的性能,因此 Kafka 社區漸漸有了這樣的共識:將 Consumer 位移保存在 ZooKeeper 中是不合適的做法。於是,在新版本的 Consumer Group 中,Kafka 社區重新設計了 Consumer Group 的位移管理方式,采用了將位移保存在 Kafka 內部主題的方法。這個內部主題就是讓人既愛又恨的 __consumer_offsets。我會在專欄后面的內容中專門介紹這個神秘的主題。不過,現在你需要記住新版本的 Consumer Group 將位移保存在 Broker 端的內部主題中。最后,我們來說說 Consumer Group 端大名鼎鼎的重平衡,也就是所謂的 Rebalance 過程。我形容其為“大名鼎鼎”,從某種程度上來說其實也是“臭名昭著”,因為有關它的 bug 真可謂是此起彼伏,從未間斷。這里我先賣個關子,后面我會解釋它“遭人恨”的地方。我們先來了解一下什么是 Rebalance。
Rebalance 本質上是一種協議,規定了一個 Consumer Group 下的所有 Consumer 如何達成一致,來分配訂閱 Topic 的每個分區。比如某個 Group 下有 20 個 Consumer 實例,它訂閱了一個具有 100 個分區的 Topic。正常情況下,Kafka 平均會為每個 Consumer 分配 5 個分區。這個分配的過程就叫 Rebalance。那么 Consumer Group 何時進行 Rebalance 呢?Rebalance 的觸發條件有 3 個。
組成員數發生變更。比如有新的 Consumer 實例加入組或者離開組,抑或是有 Consumer 實例崩潰被“踢出”組。
訂閱主題數發生變更。Consumer Group 可以使用正則表達式的方式訂閱主題,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明該 Group 訂閱所有以字母 t 開頭、字母 c 結尾的主題。在 Consumer Group 的運行過程中,你新創建了一個滿足這樣條件的主題,那么該 Group 就會發生 Rebalance。
訂閱主題的分區數發生變更。Kafka 當前只能允許增加一個主題的分區數。當分區數增加時,就會觸發訂閱該主題的所有 Group 開啟 Rebalance。
下面來說說reblance“遭人恨”的地方,首先,Rebalance 過程對 Consumer Group 消費過程有極大的影響。如果你了解 JVM 的垃圾回收機制,你一定聽過萬物靜止的收集方式,即著名的 stop the world,簡稱 STW。在 STW 期間,所有應用線程都會停止工作,表現為整個應用程序僵在那邊一動不動。Rebalance 過程也和這個類似,在 Rebalance 過程中,所有 Consumer 實例都會停止消費,等待 Rebalance 完成。這是 Rebalance 為人詬病的一個方面。其次,目前 Rebalance 的設計是所有 Consumer 實例共同參與,全部重新分配所有分區。其實更高效的做法是盡量減少分配方案的變動。例如實例 A 之前負責消費分區 1、2、3,那么 Rebalance 之后,如果可能的話,最好還是讓實例 A 繼續消費分區 1、2、3,而不是被重新分配其他的分區。這樣的話,實例 A 連接這些分區所在 Broker 的 TCP 連接就可以繼續用,不用重新創建連接其他 Broker 的 Socket 資源。最后,Rebalance 實在是太慢了。曾經,有個國外用戶的 Group 內有幾百個 Consumer 實例,成功 Rebalance 一次要幾個小時!這完全是不能忍受的。最悲劇的是,目前社區對此無能為力,至少現在還沒有特別好的解決方案。所謂“本事大不如不攤上”,也許最好的解決方案就是避免 Rebalance 的發生吧。
避免消費者組平衡
具體來講,Consumer 端應用程序在提交位移時,其實是向 Coordinator 所在的 Broker 提交位移。同樣地,當 Consumer 應用啟動時,也是向 Coordinator 所在的 Broker 發送各種請求,然后由 Coordinator 負責執行消費者組的注冊、成員管理記錄等元數據管理操作。
所有 Broker 在啟動時,都會創建和開啟相應的 Coordinator 組件。也就是說,所有 Broker 都有各自的 Coordinator 組件。那么,Consumer Group 如何確定為它服務的 Coordinator 在哪台 Broker 上呢?答案就在Kafka 內部位移主題 __consumer_offsets 身上。
目前,Kafka 為某個 Consumer Group 確定 Coordinator 所在的 Broker 的算法有 2 個步驟。
第 1 步:確定由位移主題的哪個分區來保存該 Group 數據:partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。
第 2 步:找出該分區 Leader 副本所在的 Broker,該 Broker 即為對應的 Coordinator。
好了,我們說回 Rebalance。既然我們今天要討論的是如何避免 Rebalance,那就說明 Rebalance 這個東西不好,或者說至少有一些弊端需要我們去規避。那么,Rebalance 的弊端是什么呢?總結起來有以下 3 點:
Rebalance 影響 Consumer 端 TPS。總之就是,在 Rebalance 期間,Consumer 會停下手頭的事情,什么也干不了。
Rebalance 很慢。如果你的 Group 下成員很多,就一定會有這樣的痛點。還記得那個國外用戶的例子吧?他的 Group 下有幾百個 Consumer 實例,Rebalance 一次要幾個小時。在那種場景下,Consumer Group 的 Rebalance 已經完全失控了。
Rebalance 效率不高。當前 Kafka 的設計機制決定了每次 Rebalance 時,Group 下的所有成員都要參與進來,而且通常不會考慮局部性原理,但局部性原理對提升系統性能是特別重要的。
關於第 3 點,我們來舉個簡單的例子。比如一個 Group 下有 10 個成員,每個成員平均消費 5 個分區。假設現在有一個成員退出了,此時就需要開啟新一輪的 Rebalance,把這個成員之前負責的 5 個分區“轉移”給其他成員。顯然,比較好的做法是維持當前 9 個成員消費分區的方案不變,然后將 5 個分區隨機分配給這 9 個成員,這樣能最大限度地減少 Rebalance 對剩余 Consumer 成員的沖擊。遺憾的是,目前 Kafka 並不是這樣設計的。在默認情況下,每次 Rebalance 時,之前的分配方案都不會被保留。就拿剛剛這個例子來說,當 Rebalance 開始時,Group 會打散這 50 個分區(10 個成員 * 5 個分區),由當前存活的 9 個成員重新分配它們。顯然這不是效率很高的做法。基於這個原因,社區於 0.11.0.0 版本推出了 StickyAssignor,即有粘性的分區分配策略。所謂的有粘性,是指每次 Rebalance 時,該策略會盡可能地保留之前的分配方案,盡量實現分區分配的最小變動。不過有些遺憾的是,這個策略目前還有一些 bug,而且需要升級到 0.11.0.0 才能使用,因此在實際生產環境中用得還不是很多。
就我個人經驗而言,在真實的業務場景中,很多 Rebalance 都是計划外的或者說是不必要的。我們應用的 TPS 大多是被這類 Rebalance 拖慢的,因此避免這類 Rebalance 就顯得很有必要了。下面我們就來說說如何避免 Rebalance。要避免 Rebalance,還是要從 Rebalance 發生的時機入手。
我們在前面說過,Rebalance 發生的時機有三個:
組成員數量發生變化
訂閱主題數量發生變化
訂閱主題的分區數發生變化
Consumer 程序時,實際上就向這個 Group 添加了一個新的 Consumer 實例。此時,Coordinator 會接納這個新實例,將其加入到組中,並重新分配分區。通常來說,增加 Consumer 實例的操作都是計划內的,可能是出於增加 TPS 或提高伸縮性的需要。總之,它不屬於我們要規避的那類“不必要 Rebalance”。我們更在意的是 Group 下實例數減少這件事。如果你就是要停掉某些 Consumer 實例,那自不必說,關鍵是在某些情況下,Consumer 實例會被 Coordinator 錯誤地認為“已停止”從而被“踢出”Group。如果是這個原因導致的 Rebalance,我們就不能不管了。Coordinator 會在什么情況下認為某個 Consumer 實例已掛從而要退組呢?這個絕對是需要好好討論的話題,我們來詳細說說。
當 Consumer Group 完成 Rebalance 之后,每個 Consumer 實例都會定期地向 Coordinator 發送心跳請求,表明它還存活着。如果某個 Consumer 實例不能及時地發送這些心跳請求,Coordinator 就會認為該 Consumer 已經“死”了,從而將其從 Group 中移除,然后開啟新一輪 Rebalance。Consumer 端有個參數,叫 session.timeout.ms,就是被用來表征此事的。該參數的默認值是 10 秒,即如果 Coordinator 在 10 秒之內沒有收到 Group 下某 Consumer 實例的心跳,它就會認為這個 Consumer 實例已經掛了。可以這么說,session.timeout.ms 決定了 Consumer 存活性的時間間隔。
除了這個參數,Consumer 還提供了一個允許你控制發送心跳請求頻率的參數,就是 heartbeat.interval.ms。這個值設置得越小,Consumer 實例發送心跳請求的頻率就越高。頻繁地發送心跳請求會額外消耗帶寬資源,但好處是能夠更加快速地知曉當前是否開啟 Rebalance,因為,目前 Coordinator 通知各個 Consumer 實例開啟 Rebalance 的方法,就是將 REBALANCE_NEEDED 標志封裝進心跳請求的響應體中。除了以上兩個參數,Consumer 端還有一個參數,用於控制 Consumer 實際消費能力對 Rebalance 的影響,即 max.poll.interval.ms 參數。它限定了 Consumer 端應用程序兩次調用 poll 方法的最大時間間隔。它的默認值是 5 分鍾,表示你的 Consumer 程序如果在 5 分鍾之內無法消費完 poll 方法返回的消息,那么 Consumer 會主動發起“離開組”的請求,Coordinator 也會開啟新一輪 Rebalance。
kafka管理位移的主題
__consumer_offsets 在 Kafka 源碼中有個更為正式的名字,叫位移主題,即 Offsets Topic。老版本 Consumer 的位移管理是依托於 Apache ZooKeeper 的,它會自動或手動地將位移數據提交到 ZooKeeper 中保存。當 Consumer 重啟后,它能自動從 ZooKeeper 中讀取位移數據,從而在上次消費截止的地方繼續消費。這種設計使得 Kafka Broker 不需要保存位移數據,減少了 Broker 端需要持有的狀態空間,因而有利於實現高伸縮性。
但是,ZooKeeper 其實並不適用於這種高頻的寫操作,因此,Kafka 社區自 0.8.2.x 版本開始,就在醞釀修改這種設計,並最終在新版本 Consumer 中正式推出了全新的位移管理機制,自然也包括這個新的位移主題。新版本 Consumer 的位移管理機制其實也很簡單,就是將 Consumer 的位移數據作為一條條普通的 Kafka 消息,提交到 __consumer_offsets 中。可以這么說,__consumer_offsets 的主要作用是保存 Kafka 消費者的位移信息。它要求這個提交過程不僅要實現高持久性,還要支持高頻的寫操作。顯然,Kafka 的主題設計天然就滿足這兩個條件,因此,使用 Kafka 主題來保存位移這件事情,實際上就是一個水到渠成的想法了。
雖說位移主題是一個普通的 Kafka 主題,但它的消息格式卻是 Kafka 自己定義的,用戶不能修改,也就是說你不能隨意地向這個主題寫消息,因為一旦你寫入的消息不滿足 Kafka 規定的格式,那么 Kafka 內部無法成功解析,就會造成 Broker 的崩潰。事實上,Kafka Consumer 有 API 幫你提交位移,也就是向位移主題寫消息。你千萬不要自己寫個 Producer 隨意向該主題發送消息。你可能會好奇,這個主題存的到底是什么格式的消息呢?所謂的消息格式,你可以簡單地理解為是一個 KV 對。Key 和 Value 分別表示消息的鍵值和消息體,在 Kafka 中它們就是字節數組而已。
位移主題的 Key 中應該保存 3 部分內容:<GROUP ID,主題名,分區號>。接下來,我們再來看看消息體的設計。也許你會覺得消息體應該很簡單,保存一個位移值就可以了。實際上,社區的方案要復雜得多,比如消息體還保存了位移提交的一些其他元數據,諸如時間戳和用戶自定義的數據等。保存這些元數據是為了幫助 Kafka 執行各種各樣后續的操作,比如刪除過期位移消息等。但總體來說,我們還是可以簡單地認為消息體就是保存了位移值。
位移主題是 Kafka 自動創建的,那么該主題的分區數是 50,副本數是 3。創建位移主題當然是為了用的,那么什么地方會用到位移主題呢?我們前面一直在說 Kafka Consumer 提交位移時會寫入該主題,那 Consumer 是怎么提交位移的呢?目前 Kafka Consumer 提交位移的方式有兩種:自動提交位移和手動提交位移。
事實上,很多與 Kafka 集成的大數據框架都是禁用自動提交位移的,如 Spark、Flink 等。這就引出了另一種位移提交方式:手動提交位移,即設置 enable.auto.commit = false。一旦設置了 false,作為 Consumer 應用開發的你就要承擔起位移提交的責任。Kafka Consumer API 為你提供了位移提交的方法,如 consumer.commitSync 等。當調用這些方法時,Kafka 會向位移主題寫入相應的消息。如果你選擇的是自動提交位移,那么就可能存在一個問題:只要 Consumer 一直啟動着,它就會無限期地向位移主題寫入消息。
Kafka 是怎么刪除位移主題中的過期消息的呢?答案就是 Compaction。國內很多文獻都將其翻譯成壓縮,我個人是有一點保留意見的。在英語中,壓縮的專有術語是 Compression,它的原理和 Compaction 很不相同,我更傾向於翻譯成壓實,或干脆采用 JVM 垃圾回收中的術語:整理。
不管怎么翻譯,Kafka 使用 Compact 策略來刪除位移主題中的過期消息,避免該主題無限期膨脹。那么應該如何定義 Compact 策略中的過期呢?對於同一個 Key 的兩條消息 M1 和 M2,如果 M1 的發送時間早於 M2,那么 M1 就是過期消息。Compact 的過程就是掃描日志的所有消息,剔除那些過期的消息,然后把剩下的消息整理在一起。我在這里貼一張來自官網的圖片,來說明 Compact 過程。
Kafka 提供了專門的后台線程定期地巡檢待 Compact 的主題,看看是否存在滿足條件的可刪除數據。這個后台線程叫 Log Cleaner。很多實際生產環境中都出現過位移主題無限膨脹占用過多磁盤空間的問題,如果你的環境中也有這個問題,我建議你去檢查一下 Log Cleaner 線程的狀態,通常都是這個線程掛掉了導致的。
consumer位移提交
Consumer 端有個位移的概念,它和消息在分區中的位移不是一回事兒,雖然它們的英文都是 Offset。今天我們要聊的位移是 Consumer 的消費位移,它記錄了 Consumer 要消費的下一條消息的位移。這可能和你以前了解的有些出入,不過切記是下一條消息的位移,而不是目前最新消費消息的位移。提交位移主要是為了表征 Consumer 的消費進度,這樣當 Consumer 發生故障重啟之后,就能夠從 Kafka 中讀取之前提交的位移值,然后從相應的位移處繼續消費,從而避免整個消費過程重來一遍。換句話說,位移提交是 Kafka 提供給你的一個工具或語義保障,你負責維持這個語義保障,即如果你提交了位移 X,那么 Kafka 會認為所有位移值小於 X 的消息你都已經成功消費了。
這一點特別關鍵。因為位移提交非常靈活,你完全可以提交任何位移值,但由此產生的后果你也要一並承擔。假設你的 Consumer 消費了 10 條消息,你提交的位移值卻是 20,那么從理論上講,位移介於 11~19 之間的消息是有可能丟失的;相反地,如果你提交的位移值是 5,那么位移介於 5~9 之間的消息就有可能被重復消費。所以,我想再強調一下,位移提交的語義保障是由你來負責的,Kafka 只會“無腦”地接受你提交的位移。你對位移提交的管理直接影響了你的 Consumer 所能提供的消息語義保障。鑒於位移提交甚至是位移管理對 Consumer 端的巨大影響,Kafka,特別是 KafkaConsumer API,提供了多種提交位移的方法。從用戶的角度來說,位移提交分為自動提交和手動提交;從 Consumer 端的角度來說,位移提交分為同步提交和異步提交。
我們先來說說自動提交和手動提交。所謂自動提交,就是指 Kafka Consumer 在后台默默地為你提交位移,作為用戶的你完全不必操心這些事;而手動提交,則是指你要自己提交位移,Kafka Consumer 壓根不管。開啟自動提交位移的方法很簡單。Consumer 端有個參數 enable.auto.commit,把它設置為 true 或者壓根不設置它就可以了。因為它的默認值就是 true,即 Java Consumer 默認就是自動提交位移的。如果啟用了自動提交,Consumer 端還有個參數就派上用場了:auto.commit.interval.ms。它的默認值是 5 秒,表明 Kafka 每 5 秒會為你自動提交一次位移。為了把這個問題說清楚,我給出了完整的 Java 代碼。這段代碼展示了設置自動提交位移的方法。有了這段代碼做基礎,今天后面的講解我就不再展示完整的代碼了。
1 Properties props = new Properties();
2 props.put("bootstrap.servers", "localhost:9092"); 3 props.put("group.id", "test"); 4 props.put("enable.auto.commit", "true"); 5 props.put("auto.commit.interval.ms", "2000"); 6 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 7 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 8 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 9 consumer.subscribe(Arrays.asList("foo", "bar")); 10 while (true) { 11 ConsumerRecords<String, String> records = consumer.poll(100); 12 for (ConsumerRecord<String, String> record : records) 13 System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); 14 }
上面的第 3、第 4 行代碼,就是開啟自動提交位移的方法。總體來說,還是很簡單的吧。和自動提交相反的,就是手動提交了。開啟手動提交位移的方法就是設置 enable.auto.commit 為 false。但是,僅僅設置它為 false 還不夠,因為你只是告訴 Kafka Consumer 不要自動提交位移而已,你還需要調用相應的 API 手動提交位移。最簡單的 API 就是 KafkaConsumer#commitSync()。該方法會提交 KafkaConsumer#poll() 返回的最新位移。從名字上來看,它是一個同步操作,即該方法會一直等待,直到位移被成功提交才會返回。如果提交過程中出現異常,該方法會將異常信息拋出。下面這段代碼展示了 commitSync() 的使用方法:
1 while (true) {
2 ConsumerRecords<String, String> records =
3 consumer.poll(Duration.ofSeconds(1)); 4 process(records); // 處理消息 5 try { 6 consumer.commitSync(); 7 } catch (CommitFailedException e) { 8 handle(e); // 處理提交失敗異常 9 } 10 }
可見,調用 consumer.commitSync() 方法的時機,是在你處理完了 poll() 方法返回的所有消息之后。如果你莽撞地過早提交了位移,就可能會出現消費數據丟失的情況。那么你可能會問,自動提交位移就不會出現消費數據丟失的情況了嗎?它能恰到好處地把握時機進行位移提交嗎?為了搞清楚這個問題,我們必須要深入地了解一下自動提交位移的順序。一旦設置了 enable.auto.commit 為 true,Kafka 會保證在開始調用 poll 方法時,提交上次 poll 返回的所有消息。從順序上來說,poll 方法的邏輯是先提交上一批消息的位移,再處理下一批消息,因此它能保證不出現消費丟失的情況。但自動提交位移的一個問題在於,它可能會出現重復消費。
在默認情況下,Consumer 每 5 秒自動提交一次位移。現在,我們假設提交位移之后的 3 秒發生了 Rebalance 操作。在 Rebalance 之后,所有 Consumer 從上一次提交的位移處繼續消費,但該位移已經是 3 秒前的位移數據了,故在 Rebalance 發生前 3 秒消費的所有數據都要重新再消費一次。雖然你能夠通過減少 auto.commit.interval.ms 的值來提高提交頻率,但這么做只能縮小重復消費的時間窗口,不可能完全消除它。這是自動提交機制的一個缺陷。反觀手動提交位移,它的好處就在於更加靈活,你完全能夠把控位移提交的時機和頻率。但是,它也有一個缺陷,就是在調用 commitSync() 時,Consumer 程序會處於阻塞狀態,直到遠端的 Broker 返回提交結果,這個狀態才會結束。在任何系統中,因為程序而非資源限制而導致的阻塞都可能是系統的瓶頸,會影響整個應用程序的 TPS。當然,你可以選擇拉長提交間隔,但這樣做的后果是 Consumer 的提交頻率下降,在下次 Consumer 重啟回來后,會有更多的消息被重新消費。
鑒於這個問題,Kafka 社區為手動提交位移提供了另一個 API 方法:KafkaConsumer#commitAsync()。從名字上來看它就不是同步的,而是一個異步操作。調用 commitAsync() 之后,它會立即返回,不會阻塞,因此不會影響 Consumer 應用的 TPS。由於它是異步的,Kafka 提供了回調函數(callback),供你實現提交之后的邏輯,比如記錄日志或處理異常等。下面這段代碼展示了調用 commitAsync() 的方法:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); process(records); // 處理消息 consumer.commitAsync((offsets, exception) -> { if (exception != null) handle(exception); }); }
commitAsync 是否能夠替代 commitSync 呢?答案是不能。commitAsync 的問題在於,出現問題時它不會自動重試。因為它是異步操作,倘若提交失敗后自動重試,那么它重試時提交的位移值可能早已經“過期”或不是最新值了。因此,異步提交的重試其實沒有意義,所以 commitAsync 是不會重試的。顯然,如果是手動提交,我們需要將 commitSync 和 commitAsync 組合使用才能到達最理想的效果,原因有兩個:
我們可以利用 commitSync 的自動重試來規避那些瞬時錯誤,比如網絡的瞬時抖動,Broker 端 GC 等。因為這些問題都是短暫的,自動重試通常都會成功,因此,我們不想自己重試,而是希望 Kafka Consumer 幫我們做這件事。
我們不希望程序總處於阻塞狀態,影響 TPS。
我們來看一下下面這段代碼,它展示的是如何將兩個 API 方法結合使用進行手動提交。
1 try {
2 while(true) { 3 ConsumerRecords<String, String> records = 4 consumer.poll(Duration.ofSeconds(1)); 5 process(records); // 處理消息 6 commitAysnc(); // 使用異步提交規避阻塞 7 } 8 } catch(Exception e) { 9 handle(e); // 處理異常 10 } finally { 11 try { 12 consumer.commitSync(); // 最后一次提交使用同步阻塞式提交 13 } finally { 14 consumer.close(); 15 } 16 }
CommitFailedException異常
我相信用過 Kafka Java Consumer 客戶端 API 的你一定不會感到陌生。所謂 CommitFailedException,顧名思義就是 Consumer 客戶端在提交位移時出現了錯誤或異常,而且還是那種不可恢復的嚴重異常。如果異常是可恢復的瞬時錯誤,提交位移的 API 自己就能規避它們了,因為很多提交位移的 API 方法是支持自動錯誤重試的。
我們就來討論下該異常是什么時候被拋出的。從源代碼方面來說,CommitFailedException 異常通常發生在手動提交位移時,即用戶顯式調用 KafkaConsumer.commitSync() 方法時。從使用場景來說,有兩種典型的場景可能遭遇該異常。
場景一
我們先說說最常見的場景。當消息處理的總時間超過預設的 max.poll.interval.ms 參數值時,Kafka Consumer 端會拋出 CommitFailedException 異常。這是該異常最“正宗”的登場方式。你只需要寫一個 Consumer 程序,使用 KafkaConsumer.subscribe 方法隨意訂閱一個主題,之后設置 Consumer 端參數 max.poll.interval.ms=5 秒,最后在循環調用 KafkaConsumer.poll 方法之間,插入 Thread.sleep(6000) 和手動提交位移,就可以成功復現這個異常了。
如果要防止這種場景下拋出異常,你需要簡化你的消息處理邏輯。具體來說有 4 種方法。
1,縮短單條消息處理的時間。比如,之前下游系統消費一條消息的時間是 100 毫秒,優化之后成功地下降到 50 毫秒,那么此時 Consumer 端的 TPS 就提升了一倍。
2,增加 Consumer 端允許下游系統消費一批消息的最大時長。這取決於 Consumer 端參數 max.poll.interval.ms 的值。在最新版的 Kafka 中,該參數的默認值是 5 分鍾。如果你的消費邏輯不能簡化,那么提高該參數值是一個不錯的辦法。值得一提的是,Kafka 0.10.1.0 之前的版本是沒有這個參數的,因此如果你依然在使用 0.10.1.0 之前的客戶端 API,那么你需要增加 session.timeout.ms 參數的值。不幸的是,session.timeout.ms 參數還有其他的含義,因此增加該參數的值可能會有其他方面的“不良影響”,這也是社區在 0.10.1.0 版本引入 max.poll.interval.ms 參數,將這部分含義從 session.timeout.ms 中剝離出來的原因之一。
3,減少下游系統一次性消費的消息總數。這取決於 Consumer 端參數 max.poll.records 的值。當前該參數的默認值是 500 條,表明調用一次 KafkaConsumer.poll 方法,最多返回 500 條消息。可以說,該參數規定了單次 poll 方法能夠返回的消息總數的上限。如果前兩種方法對你都不適用的話,降低此參數值是避免 CommitFailedException 異常最簡單的手段。
4,下游系統使用多線程來加速消費。這應該算是“最高級”同時也是最難實現的解決辦法了。具體的思路就是,讓下游系統手動創建多個消費線程處理 poll 方法返回的一批消息。之前你使用 Kafka Consumer 消費數據更多是單線程的,所以當消費速度無法匹及 Kafka Consumer 消息返回的速度時,它就會拋出 CommitFailedException 異常。如果是多線程,你就可以靈活地控制線程數量,隨時調整消費承載能力,再配以目前多核的硬件條件,該方法可謂是防止 CommitFailedException 最高檔的解決之道。事實上,很多主流的大數據流處理框架使用的都是這個方法,比如 Apache Flink 在集成 Kafka 時,就是創建了多個 KafkaConsumerThread 線程,自行處理多線程間的數據消費。不過,凡事有利就有弊,這個方法實現起來並不容易,特別是在多個線程間如何處理位移提交這個問題上,更是極容易出錯。在專欄后面的內容中,我將着重和你討論一下多線程消費的實現方案。
除了調整 max.poll.interval.ms 之外,你還可以選擇調整 max.poll.records 值,減少每次 poll 方法返回的消息數。還拿剛才的例子來說,你可以設置 max.poll.records 值為 150,甚至更少,這樣每批消息的總消費時長不會超過 300 秒(150*2=300),即 max.poll.interval.ms 的默認值 5 分鍾。這種減少 max.poll.records 值的做法就屬於上面提到的方法 3。
場景二
Kafka Java Consumer 端還提供了一個名為 Standalone Consumer 的獨立消費者。它沒有消費者組的概念,每個消費者實例都是獨立工作的,彼此之間毫無聯系。不過,你需要注意的是,獨立消費者的位移提交機制和消費者組是一樣的,因此獨立消費者的位移提交也必須遵守之前說的那些規定,比如獨立消費者也要指定 group.id 參數才能提交位移。你可能會覺得奇怪,既然是獨立消費者,為什么還要指定 group.id 呢?沒辦法,誰讓社區就是這么設計的呢?總之,消費者組和獨立消費者在使用之前都要指定 group.id。
現在問題來了,如果你的應用中同時出現了設置相同 group.id 值的消費者組程序和獨立消費者程序,那么當獨立消費者程序手動提交位移時,Kafka 就會立即拋出 CommitFailedException 異常,因為 Kafka 無法識別這個具有相同 group.id 的消費者實例,於是就向它返回一個錯誤,表明它不是消費者組內合法的成員。
消費者消費進度監控
對於 Kafka 消費者來說,最重要的事情就是監控它們的消費進度了,或者說是監控它們消費的滯后程度。這個滯后程度有個專門的名稱:消費者 Lag 或 Consumer Lag。所謂滯后程度,就是指消費者當前落后於生產者的程度。比方說,Kafka 生產者向某主題成功生產了 100 萬條消息,你的消費者當前消費了 80 萬條消息,那么我們就說你的消費者滯后了 20 萬條消息,即 Lag 等於 20 萬。通常來說,Lag 的單位是消息數,而且我們一般是在主題這個級別上討論 Lag 的,但實際上,Kafka 監控 Lag 的層級是在分區上的。如果要計算主題級別的,你需要手動匯總所有主題分區的 Lag,將它們累加起來,合並成最終的 Lag 值。
既然消費進度這么重要,我們應該怎么監控它呢?簡單來說,有 3 種方法。
使用 Kafka 自帶的命令行工具 kafka-consumer-groups 腳本。
使用 Kafka Java Consumer API 編程。
使用 Kafka 自帶的 JMX 監控指標。
接下來,我們分別來討論下這 3 種方法。
kafka自帶命令
我們先來了解下第一種方法:使用 Kafka 自帶的命令行工具 bin/kafka-consumer-groups.sh(bat)。kafka-consumer-groups 腳本是 Kafka 為我們提供的最直接的監控消費者消費進度的工具。當然,除了監控 Lag 之外,它還有其他的功能。今天,我們主要討論如何使用它來監控 Lag。使用 kafka-consumer-groups 腳本很簡單。該腳本位於 Kafka 安裝目錄的 bin 子目錄下,我們可以通過下面的命令來查看某個給定消費者的 Lag 值:
1 $ bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker連接信息> --describe --group <group名稱>
Kafka 連接信息就是 < 主機名:端口 > 對,而 group 名稱就是你的消費者程序中設置的 group.id 值。我舉個實際的例子來說明具體的用法,請看下面這張圖的輸出:
在運行命令時,我指定了 Kafka 集群的連接信息,即 localhost:9092。另外,我還設置了要查詢的消費者組名:testgroup。kafka-consumer-groups 腳本的輸出信息很豐富。首先,它會按照消費者組訂閱主題的分區進行展示,每個分區一行數據;其次,除了主題、分區等信息外,它會匯報每個分區當前最新生產的消息的位移值(即 LOG-END-OFFSET 列值)、該消費者組當前最新消費消息的位移值(即 CURRENT-OFFSET 值)、LAG 值(前兩者的差值)、消費者實例 ID、消費者連接 Broker 的主機名以及消費者的 CLIENT-ID 信息。
Kafka Java Consumer API
下面這段代碼展示了如何利用 Consumer 端 API 監控給定消費者組的 Lag 值:
1 public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {
2 Properties props = new Properties();
3 props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
4 try (AdminClient client = AdminClient.create(props)) {
5 ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
6 try {
7 Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
8 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自動提交位移
9 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
10 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
11 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
12 try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
13 Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());
14 return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),
15 entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));
16 }
17 } catch (InterruptedException e) {
18 Thread.currentThread().interrupt();
19 // 處理中斷異常
20 // ...
21 return Collections.emptyMap();
22 } catch (ExecutionException e) {
23 // 處理ExecutionException
24 // ...
25 return Collections.emptyMap();
26 } catch (TimeoutException e) {
27 throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);
28 }
29 }
30 }
這段代碼送給你,你可以將 lagOf 方法直接應用於你的生產環境,以實現程序化監控消費者 Lag 的目的。不過請注意,這段代碼只適用於 Kafka 2.0.0 及以上的版本,2.0.0 之前的版本中沒有 AdminClient.listConsumerGroupOffsets 方法。
Kafka JMX 監控指標
上面這兩種方式,都可以很方便地查詢到給定消費者組的 Lag 信息。但在很多實際監控場景中,我們借助的往往是現成的監控框架。如果是這種情況,以上這兩種辦法就不怎么管用了,因為它們都不能集成進已有的監控框架中,如 Zabbix 或 Grafana。下面我們就來看第三種方法,使用 Kafka 默認提供的 JMX 監控指標來監控消費者的 Lag 值。當前,Kafka 消費者提供了一個名為 kafka.consumer:type=consumer-fetch-manager-metrics,client-id=“{client-id}”的 JMX 指標,里面有很多屬性。和我們今天所講內容相關的有兩組屬性:records-lag-max 和 records-lead-min,它們分別表示此消費者在測試窗口時間內曾經達到的最大的 Lag 值和最小的 Lead 值。
試想一下,監控到 Lag 越來越大,可能只會給你一個感受,那就是消費者程序變得越來越慢了,至少是追不上生產者程序了,除此之外,你可能什么都不會做。畢竟,有時候這也是能夠接受的。但反過來,一旦你監測到 Lead 越來越小,甚至是快接近於 0 了,你就一定要小心了,這可能預示着消費者端要丟消息了。
為什么?我們知道 Kafka 的消息是有留存時間設置的,默認是 1 周,也就是說 Kafka 默認刪除 1 周前的數據。倘若你的消費者程序足夠慢,慢到它要消費的數據快被 Kafka 刪除了,這時你就必須立即處理,否則一定會出現消息被刪除,從而導致消費者程序重新調整位移值的情形。這可能產生兩個后果:一個是消費者從頭消費一遍數據,另一個是消費者從最新的消息位移處開始消費,之前沒來得及消費的消息全部被跳過了,從而造成丟消息的假象。
總結
以后關於kafka系列的總結大部分來自Geek Time的課件,大家可以自行關鍵字搜索。