Kafka session.timeout.ms heartbeat.interval.ms參數的區別以及對數據存儲的一些思考
在計算機世界中經常需要與數據打交道,這也是我們戲稱CURD工程師的原因之一。寫了兩年代碼,接觸了不少存儲系統,Redis、MySQL、Kafka、Elasticsearch…慢慢地發現背后的一些公共的設計思想總是那么似曾相識,再深究一下,就會發現一些隱藏在這些系統背后的數學理論。
生活中產生的大量數據需要交由計算機來處理,根據處理方式的不同分為OLTP和OLAP兩大類應用。有些數據比如登錄流水、系統日志信息,源源不斷,先采集下來拋給消息中間件(Kafka);有些數據,比如一條描述用戶特征的記錄,就適合存儲到MySQL,並按日期建查詢索引。也就是說:面對大量的數據,把數據存儲起來只是一小步,重要的是如何把這些數據用起來,體現到存儲系統則是:有沒有一套方便的查詢接口能夠方便快速地查到我們想要的數據。如果將數據放到Kafka上了,那要怎么查?如果把數據放到MySQL上了,非常適合針對高cardinality列建B+樹索引查詢,而對於文本類的數據,放到ES上,則基於倒排索引這種數據結構,根據tf-idf、bm25等這些衡量文檔相似度的算法來快速地獲得想要的數據。
從這也可以看出,不同的存儲系統,為了滿足"查詢",它們背后的存儲原理(所采用的數據結構)是不同的。而對於這些存儲系統而言,都面臨着兩個問題:高可靠性和高可用性。可靠性,在我看來,是站在存儲系統本身來看,一般是討論單個實例如何保證數據的可靠。比如,一個正在運行的MySQL實例,它根據checkpoint機制,通過redo log 文件來保證持久性,另外還有double write buffer,保證數據頁的寫入是可靠的。類似地,在Elasticsearch里面也有translog機制,用來保證數據的可靠性。所以,想深入了解存儲系統,不妨對比一下它們之間的各種checkpoint機制。
數據為什么需要有可靠性呢?根本原因還是內存是一種易失性存儲,根據馮偌依曼體系結構,程序總是從內存中取數據交給CPU做運算。如果數據沒有fsync到磁盤,如果系統宕機那數據會不會丟?
而對於可用性,是從Client角度而言的。即我不管你背后是一個redis實例還是一個redis 集群,你只管正常地給我提供好讀寫服務就好了。這里為了避免SPOF,分布式集群就派上用場了,一台機器掛了,另一台機器頂上。在分布式系統中,需要管理好各個存儲實例,這時就需要節點的角色划分,比如master節點、controller節點之類的稱呼。畢竟管理是要有層級的嘛,大家角色都一樣,怎么管理呢?在這一點上,Redis集群與Kafka集群或者Elasticsearch集群有很大的不同,具體體現在Redis本質上是一個P2P結構的集群,而Elasticsearch和Kafka 采用的主從模型,為什么這么說呢?Redis雖然也有Master節點和Slave節點之分,但它的各個Master節點之間是平等的,Redis的數據分布方式是hash16384個槽到各個master節點上,每個master節點負責處理落在這些槽內的數據,這是從數據分布的角度來定義的Master節點,而Kafka中的Controller節點、Elasticsearch中的master節點並不是從數據分布的角度定義的,而是從集群元信息維護、集群管理的角度定義的,關於它們之間的具體區別我在這篇文章中也有過一些描述。另外,MySQL作為關系型數據庫,受數據完整性約束、事務支持的限制,在分布式集群能力上要弱一些。
最近碰到一個問題,多個業務往向一個Kafka topic發送消息,有些業務的消費量很大,有些業務的消息量很小。因Kafka尚未較好地支持按優先級來消費消息,導致某些業務的消息消費延時的問題。一種簡單的解決方案是再增加幾個Topic,面對一些系統遺留問題,增加Topic帶來的是生產者和消費者處理邏輯復雜性。一種方法是使用Kafka Standalone consumer,先使用consumer.partitionFor("TOPIC_NAME")
獲取topic下的所有分區信息,再使用consumer.assign(partitions)
顯示地為consumer指定消費分區。另一種方法是基於consumer group 自定義Kafka consumer的分區分配策略,那這時候就得對Kafka目前已有的分區分配策略有所了解,並且明白什么時候、什么場景下觸發rebalance?
Kafka consumer要消費消息,哪些的分區的消息交給哪個consumer消費呢?這是consumer的分區分配策略,默認有三個:range、round-robin、sticky。說到round-robin這個算法,真是無處不在,它經常用在一些需要負載均衡的場景。比如Elasticsearch client向ES Server發送搜索請求時,因為默認情況下每台ES節點都可做為coordinator節點接收用戶的查詢請求,而在coordinator節點上需要匯總所有分片的查詢結果,這需要消耗大量的內存和CPU,因此ES Client 也是基於round-robin算法選擇將查詢請求發送到哪個ES節點上。如果你仔細留意,會發現在Redis里面也會有這個算法的身影。再比如說:Redis LRU Cache中關於Key的access pattern,一般服從冪指分布(power-law distribution):具有某一特征的一小部分的Key訪問頻率遠遠大於其他的Key,正如這種訪問特性,LRU能達到很好的緩存效果。另外,Redis sorted set類型是基於skiplist實現,新的skipNode節點最大層數設置為多少合適呢?這也是個power-law distribution問題,其源碼注釋中:
Returns a random level for the new skiplist node we are going to create. The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL (both inclusive), with a powerlaw-alike distribution where higher levels are less likely to be returned.
其實,我想表達的是有些思想或者說是解決方案,它是通用的,應用於各個不同的存儲系統中,將它們對比起來看,能更好地理解系統背后的原理。
最近每次想寫一些筆記時,腦海里總是出現一些其他各種各樣的想法。這次本來主要是想寫kafka 中這兩個配置參數:session.timeout.ms 和 heartbeat.interval.ms的區別的,結果就先扯了一通數據存儲相關的東西。
下面繼續:
因為一個topic往往有多個分區,而我們又會在一個consumer group里面創建多個消費者消費這個topic,因此:就有了一個問題:哪些的分區的消息交給哪個consumer消費呢?這里涉及到三個概念:consumer group,consumer group里面的consumer,以及每個consumer group有一個 group coordinator。conusmer分區分配是通過組管理協議來實施的:具體如下:
consumer group里面的各個consumer都向 group coordinator發送JoinGroup請求,這樣group coordinator就有了所有consumer的成員信息,於是它從中選出一個consumer作為Leader consumer,並告訴Leader consumer說:你拿着這些成員信息和我給你的topic分區信息去安排一下哪些consumer負責消費哪些分區吧
接下來,Leader consumer就根據我們配置的分配策略(由參數partition.assignment.strategy指定)為各個consumer計算好了各自待消費的分區。於是,各個consumer向 group coordinator 發送SyncGroup請求,但只有Leader consumer的請求中有分區分配策略,group coordinator 收到leader consumer的分區分配方案后,把該方案下發給各個consumer。畫個圖,就是下面這樣的:
而在正常情況下 ,當有consumer進出consumer group時就會觸發rebalance,所謂rebalance就是重新制訂一個分區分配方案。而制訂好了分區分配方案,就得及時告知各個consumer,這就與 heartbeat.interval.ms參數有關了。具體說來就是:每個consumer 都會根據 heartbeat.interval.ms 參數指定的時間周期性地向group coordinator發送 hearbeat,group coordinator會給各個consumer響應,若發生了 rebalance,各個consumer收到的響應中會包含 REBALANCE_IN_PROGRESS 標識,這樣各個consumer就知道已經發生了rebalance,同時 group coordinator也知道了各個consumer的存活情況。
那為什么要把 heartbeat.interval.ms 與 session.timeout.ms 進行對比呢?session.timeout.ms是指:group coordinator檢測consumer發生崩潰所需的時間。一個consumer group里面的某個consumer掛掉了,最長需要 session.timeout.ms 秒檢測出來。舉個示例session.timeout.ms=10,heartbeat.interval.ms=3
session.timeout.ms是個"邏輯"指標,它指定了一個閾值---10秒,在這個閾值內如果coordinator未收到consumer的任何消息,那coordinator就認為consumer掛了。而heartbeat.interval.ms是個"物理"指標,它告訴consumer要每3秒給coordinator發一個心跳包,heartbeat.interval.ms越小,發的心跳包越多,它是會影響發TCP包的數量的,產生了實際的影響,這也是我為什么將之稱為"物理"指標的原因。
如果group coordinator在一個heartbeat.interval.ms周期內未收到consumer的心跳,就把該consumer移出group,這有點說不過去。就好像consumer犯了一個小錯,就一棍子把它打死了。事實上,有可能網絡延時,有可能consumer出現了一次長時間GC,影響了心跳包的到達,說不定下一個heartbeat就正常了。
而heartbeat.interval.ms肯定是要小於session.timeout.ms的,如果consumer group發生了rebalance,通過心跳包里面的REBALANCE_IN_PROGRESS,consumer就能及時知道發生了rebalance,從而更新consumer可消費的分區。而如果超過了session.timeout.ms,group coordinator都認為consumer掛了,那也當然不用把 rebalance信息告訴該consumer了。
在kafka0.10.1之后的版本中,將session.timeout.ms 和 max.poll.interval.ms 解耦了。也就是說:new KafkaConsumer對象后,在while true循環中執行consumer.poll拉取消息這個過程中,其實背后是有2個線程的,即一個kafka consumer實例包含2個線程:一個是heartbeat 線程,另一個是processing線程,processing線程可理解為調用consumer.poll方法執行消息處理邏輯的線程,而heartbeat線程是一個后台線程,對程序員是"隱藏不見"的。如果消息處理邏輯很復雜,比如說需要處理5min,那么 max.poll.interval.ms可設置成比5min大一點的值。而heartbeat 線程則和上面提到的參數 heartbeat.interval.ms有關,heartbeat線程 每隔heartbeat.interval.ms向coordinator發送一個心跳包,證明自己還活着。只要 heartbeat線程 在 session.timeout.ms 時間內 向 coordinator發送過心跳包,那么 group coordinator就認為當前的kafka consumer是活着的。
在kafka0.10.1之前,發送心跳包和消息處理邏輯這2個過程是耦合在一起的,試想:如果一條消息處理時長要5min,而session.timeout.ms=3000ms,那么等 kafka consumer處理完消息,group coordinator早就將consumer 移出group了,因為只有一個線程,在消息處理過程中就無法向group coordinator發送心跳包,超過3000ms未發送心跳包,group coordinator就將該consumer移出group了。而將二者分開,一個processing線程負責執行消息處理邏輯,一個heartbeat線程負責發送心跳包,那么:就算一條消息需要處理5min,只要底heartbeat線程在session.timeout.ms向group coordinator發送了心跳包,那consumer可以繼續處理消息,而不用擔心被移出group了。另一個好處是:如果consumer出了問題,那么在 session.timeout.ms內就能檢測出來,而不用等到 max.poll.interval.ms 時長后才能檢測出來。
一次kafka consumer 不斷地 rebalance 分析
明白了session.timeout.ms 和 max.poll.interval.ms 和 heartbeat.interval.ms三個參數的意義后,現在來實際分析一下項目中經常碰到的 consumer rebalance 錯誤。
一般我們是在一個線程(用戶線程)里面執行kafka consumer 的while true循環邏輯的,其實這里有2個線程:一個是用戶線程,另一個是心跳線程。心跳線程,我想就是根據heartbeat.interval.ms參數配置的值周期性向coordinator發送心跳包以證明consumer還活着。
如果消息處理邏輯過重,也即用戶線程需要執行很長的時間處理消息,然后再提交offset。咋一看,有一個后台心跳線程在不斷地發送心跳啊,那為什么group coordinator怎么還老是將consumer移出group,然后導致不斷地rebalance呢?
我想,問題應該是 max.poll.interval.ms這個參數引起的吧,因為在ERROR日志中,老是提示:消息處理邏輯花了太長的時間,要么減少max.poll.records值,要么增大session.timeout.ms的值。盡管有后台heartbeat 線程,但是如果consumer的消息處理邏輯時長超過了max.poll.interval.ms ,那么此consumer提交offset就會失敗:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
此外,在用戶線程中,一般會做一些失敗的重試處理。比如通過線程池的 ThreadPoolExecutor#afterExecute()方法捕獲到異常,再次提交Runnable任務重新訂閱kafka topic。本來消費處理需要很長的時間,如果某個consumer處理超時:消息處理邏輯的時長大於max.poll.interval.ms (或者消息處理過程中發生了異常),被coordinator移出了consumer組,這時由於失敗的重試處理,自動從線程池中拿出一個新線程作為消費者去訂閱topic,那么意味着有新消費者加入group,就會引發 rebalance,而可悲的是:新的消費者還是來不及處理完所有消息,又被移出group。如此循環,就發生了不停地 rebalance 的現象。
參考資料
原文:https://www.cnblogs.com/hapjin/p/10926882.html
最近碰到一些中文分詞的歸一化、分詞結果的准確度(分詞生成自定義的詞)、定制 ES Analyzer插件滿足特殊符號搜索、中文行業術語搜索 需求的問題...有時間再寫一篇。