最近看了kafka2.4新版本的一些功能特性,不得不說,在kafka2.0以后,kafka自身就比較少推出一些新的feature了,基本都是一些修修補補的東西。倒是kafka connect和kafka stream相關的開發工作做的比較多。可能kafka的野心也不局限於要當一個中間件,而是要實現一個流處理系統的生態了。
這次要介紹的是我覺得比較有意思的兩個特性,一個是kafka支持從follower副本讀取數據,當然這個功能並不是為了提供讀取性能,后面再詳細介紹。另一個則是新推出的sticky partitioner功能,我猜是從rebalance的StickyAssignor中得到靈感,發現producer的分區策略也可以這樣搞,233,這個feature主要作用是提高性能。
這兩個feature都是kafka2.4.0版本推出的,如果想使用這些新feature,那么不妨升級下吧~
follower副本讀取數據(consumer fetch from closest replica)
背景
在早先kafka的設計中,為了使consumer讀取數據能夠保持一致,是只允許consumer讀取leader副本的數據的。即follower replica只是單純地備份數據的作用。那推出follower replica fetch功能的背景是什么呢?
舉個比較常見的場景,kafka存在多個數據中心,不同數據中心存在於不同的機房,當其中一個數據中心需要向另一個數據中心同步數據的時候,由於只能從leader replica消費數據,那么它不得不進行跨機房獲取數據,而這些流量帶寬通常是比較昂貴的(尤其是雲服務器)。即無法利用本地性來減少昂貴的跨機房流量。
所以kafka推出這一個功能,就是幫助類似這種場景,節約流量資源。並且這種功能似乎還可以和新推出的mirror maker2相互配合,實現多個數據源的數據同步,不過我自己還沒測試過。
rack功能介紹
要說follower replica fetch,那就不得不先說rack功能,這個是kafka比較早就推出的功能,是Kafka對機架感知提供了的基本支持,可以將其用於控制副本的放置,詳細內容可以參閱這篇Kafka機架感知文章。
使用方式,其實就是一個broker端的參數,broker.rack,這個參數可以說明當前broker在哪個機房。
舉上面文章中的例子,如果一個數據中心的集群分布如下:
那么可以這樣配置:
- broker0 -> rack1
- broker1 -> rack1
- broker2 -> rack2
- broker3 -> rack2
這樣其實就是相當於給broker打一個標簽,當新建topic,比如新建一個兩個副本 & 兩個分區的topic,kafka至少會自動給rack1或rack2分配全部分區的一個副本。什么,你說要是創建兩個分區一個副本的topic該怎么分。。。抱歉,我給不了答案。等你自己實踐然后評論跟我說下答案 =。=
replica fetch功能測試
OK,上面介紹的rack功能,我們就能發現,這個其實跟跨機房讀數據的這種場景是很搭的。在跨機房多數據中心場景中,如果數據中心A,一個副本放在數據中心B的機房中,只要讓數據中心B的consumer能夠讀數據中心A的那個replica的數據(follower副本)讀數據,那不就萬事大吉。
社區也是這樣想的,所以就推出了這個功能。讓消費者可以指定rack id,然后可以不從消費者讀取數據。要實現這個目的,需要先配置兩個參數:
replica.selector.class
- broker端配置
- 配置名:replica.selector.class
- 解釋:ReplicaSelector實現類的全名,包括路徑 (比如 RackAwareReplicaSelector 即按 rack id 指定消費)
- 默認:從 Leader 消費的 LeaderSelector
為了支持這個功能,kafka修改了這部分的接口實現,源碼中新增一個ReplicaSelector
接口,如果用戶有自定義消費策略的需求,也可以繼承這個接口實現自己的功能。
目前這個接口有兩個實現類,一個是LeaderSelector
,即從leader副本讀數據。另一個則是RackAwareReplicaSelector
,會去到指定的rack id讀數據。
client.rack
- consumer端配置
- 配置名:client.rack
- 解釋:這個參數需要和broker端指定的
broker.rack
相同,表示去哪個rack中獲取數據。 - 默認:null
這個參數只有在上面的replica.selector.class
指定為RackAwareReplicaSelector
且broekr指定了broker.rack
才會生效。
這個功能要測試也挺簡單的,可以直接搭建一個兩個broker的kafka集群,配置broker.rack,然后使用consumer客戶端指定client.rack發送到非leader的節點查數據就行了。另外,可以使用這條命令查看網卡流量信息:
sar -n DEV 1 300
存在問題
從follower replica讀取數據肯定有問題,最可能的問題就是落后節點的問題,從這樣的節點讀取數據會面臨什么樣的情況呢?官方給出了幾種場景及解決辦法。先看看這張圖
主要有四種可能出現問題的情況,我們分別來看看應該如何解決:
Case 1(uncommitted offset)
這個場景是follower接收到數據但還未committed offset,這個時候,若消費者的offet消費到high watemark到log end offset之間的那段(Case 1黃色那段),會返回空數據,而不是一個錯誤信息。直到這段內容 committed。
case 2(unavailable offset)
這種場景應該發生於慢節點的情況下,滿節點的broker還未接收到實際數據,但已經跟leader通信知道有部分數據committed了(case 2黃色部分)。當遇到這種情況,consumer 消費到時候,會返回 OFFSET_NOT_AVAILABLE 錯誤信息。
case 3(offset too small)
這種情況可能出現在消費者指定了 offset 的情況。那么在指定不同auto.offset.reset
的時候有不同的情況。
- If the reset policy is "earliest," fetch the log start offset of the current replica that raised the out of range error.
- If the reset policy is "latest," fetch the log end offset from the leader.
- If the reset policy is "none," raise an exception.
case 4(offset too large)
遇到這種情況,會返回一個 broker 會返回一個 OFFSET_OUT_OF_RANGE 的錯誤。
但 OFFSET_OUT_OF_RANGE 遇到這種錯誤的時候也有多種可能,官方給出當 consumer 遇到這種問題的解決思路,
Use the OffsetForLeaderEpoch API to verify the current position with the leader.
- If the fetch offset is still valid, refresh metadata and continue fetching
- If truncation was detected, follow the steps in KIP-320 to either reset the offset or raise the truncation error
- Otherwise, follow the same steps above as in case 3.
sticky partitioner功能
背景
kafka producer發送數據並不是一個一個消息發送,而是取決於兩個producer端參數。一個是linger.ms
,默認是0ms,當達到這個時間后,kafka producer就會立刻向broker發送數據。另一個參數是batch.size
,默認是16kb,當產生的消息數達到這個大小后,就會立即向broker發送數據。
按照這個設計,從直觀上思考,肯定是希望每次都盡可能填滿一個batch再發送到一個分區。但實際決定batch如何形成的一個因素是分區策略(partitioner strategy)。在Kafka2.4版本之前,在producer發送數據默認的分區策略是輪詢策略(沒指定keyd的情況),這在我以前的文章有說到過詳細解析kafka之kafka分區和副本。如果多條消息不是被發送到相同的分區,它們就不能被放入到一個batch中。
所以如果使用默認的輪詢partition策略,可能會造成一個大的batch被輪詢成多個小的batch的情況。鑒於此,kafka2.4的時候推出一種新的分區策略,即Sticky Partitioning Strategy,Sticky Partitioning Strategy會隨機地選擇另一個分區並會盡可能地堅持使用該分區——即所謂的粘住這個分區。
鑒於小batch可能導致延時增加,之前對於無Key消息的分區策略效率很低。社區於2.4版本引入了黏性分區策略(Sticky Partitioning Strategy)。該策略是一種全新的策略,能夠顯著地降低給消息指定分區過程中的延時。
使用Sticky Partitioner有助於改進消息批處理,減少延遲,並減少broker的負載。
功能解析
sticky Partitioner實現的代碼是在UniformStickyPartitioner
里面。貼下代碼看看:
public class UniformStickyPartitioner implements Partitioner {
private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();
public void configure(Map<String, ?> configs) {}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
return stickyPartitionCache.partition(topic, cluster);
}
public void close() {}
public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
}
}
我們主要關注UniformStickyPartitioner#partition()
方法,可以看到,它是直接通過一個cache類獲取相同的分區,這表示新的record會一直發送到同一個分區中,除非生成新的batch,觸發了UniformStickyPartitioner#onNewBatch()
方法才會換分區。
可以看看RoundRobinPartitioner#partition()
方法(即輪詢分區策略)進行對比,就能發現比較明顯的對比。
這個sticky partitioner最大的好處就是性能較好,按照官方給出的測試結果,使用sticky partitioner測試可以減少50%的延時,吞吐也有相對應的提高。我自己測了下數據基本出入不大。
另外說明下,在kafka2.4以后,默認的partitioner分區策略,已經包含了sticky partitioner了,所以升級到kafka2.4以后,並不需要任何修改就能享受到性能到極大提升。這里可以看下kafka2.4版本的策略說明:
/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a partition based on a hash of the key
* <li>If no partition or key is present choose the sticky partition that changes when the batch is full.
*
* See KIP-480 for details about sticky partitioning.
*/
public class DefaultPartitioner implements Partitioner {
有一點挺奇怪到,在測試過程中(使用bin/kafka-producer-perf-test.sh測試),發現DefaultPartitioner
的性能要比UniformStickyPartitioner
的性能要好一些,不確定是什么原因,知道到小伙伴可以在評論區給出答案:)
參考:
KIP-392: Allow consumers to fetch from closest replica
KIP-480: Sticky Partitioner
以上~