Kafka在1.1.0版本引入了fetch session的概念,旨在降低“無效”FETCH請求對集群帶寬資源的占用。故事的背景是這樣的:
眾所周知,Kafka的broker和consumer都會定期地向leader broker發送FETCH請求去獲取數據。對於分區數很多的topic而言,待發送的FETCH請求就會很大,從而整體上增加網絡帶寬占用。即使這些分區沒有任何新的數據到來,follower和consumer構造的FETCH請求都需要顯式地羅列出每個訂閱分區的詳細數據,這包括:分區號、該分區當前開始位移(log start offset)、位移以及能夠請求的最大字節數。
下面我們以1.0.2版本為例查看一下一個普通的FETCH請求有多大?首先啟動兩台broker服務器,然后創建一個replcation factor是2且有1000個分區的topic:test,之后觀察下列JMX指標的值,如下圖所示:
可以看出,對於這個有着1000個分區的topic而言,每台broker上接收到的FETCH請求大小固定是12031個字節,即大約12KB左右。即使我沒有給這個topic發送過任何消息(即consumer沒有可以消費的消息),FETCH請求都是這么大。
下面我們再用1.1.0版本重試一下這個測試,依然是啟動兩台1.1.0版本的broker服務器,然后創建一個replication-factor是2,有1000個分區的topic,然后再次查看FETCH請求的大小,如下圖所示:
這次我們發現FETCH請求大小的最大值可以達到12043字節,而之后一定穩定在最小值33字節上。從這兩組測試結果來看,顯然1.1.0版本在某些情況下極大地減少了FETCH請求的大小,節省了網絡帶寬(針對我們這個測試topic而言,網絡流量節省了將近365倍!)。那么這是怎么做到的呢?首先先從FETCH請求的協議格式開始說起。1.1.0之前最新的FETCH請求格式是V6版本,如下圖所示:
這里我不詳細展開各個字段的含義了,因為這與本文要討論的主題無關。不過我請各位再看下1.1.0版本引入的V7版本格式,如下圖所示:
V7與V6版本的差異我已經用紅框標識出來了,即在V7版本中新引入了3個主要的變化:session_id/epoch和forgetten_topics_data,其中session_id和epoch合稱為fetch session元數據。這里面的fetch session即是1.1.0版本關於FETCH請求的最大變化。一個FETCH SESSION本質上封裝了一個fetcher線程的狀態,broker端會緩存若干個session在內存中,然后通過FETCH response把相應的狀態發送給clients端,這樣clients就能知曉當前這個fetcher的狀態,從而避免每次FETCH請求中都重復性地請求無意義的數據。
在引入這個變化后,FETCH請求被分成了兩類:FULL FETCH請求和Incremental FETCH請求。在首次發送FETCH請求或當session狀態發生變化的時候,clients依然發送和以往類似的完整FETCH請求——也就是所謂的FULL FETCH請求,而一旦session穩定下來,且沒有變更,那么clients就能安全地發送“瘦身后”的FETCH請求,即增量式FETCH請求,從而起到節省帶寬的作用。
上面測試場景中第二張圖的MAX值就是FULL FETCH請求的大小(即12043B),如果我們和圖1的FETCH請求大小(12031B)做比較會發現V7版本中FULL FETCH請求比之前V6版本的多了12字節(12043 - 12031),這12個字節就是session_id(4B), epoch(4B)和空forgetten_topics_data數組(4B)所占用的內存空間。
FULL FETCH請求的完整格式如下:
{replica_id=1,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,session_id=0,epoch=0,topics=[{topic=test,partitions=[
{partition=0,fetch_offset=0,log_start_offset=0,max_bytes=1048576},
{partition=1,fetch_offset=0,log_start_offset=0,max_bytes=1048576},
...
{partition=999,fetch_offset=0,log_start_offset=0,max_bytes=1048576}
]}], forgotten_topics_data=[]}
由此可見在FULL FETCH請求中session_id和session的epoch都是0。一旦發現這些分區的FETCH session沒有變化,clients會自動切換成發送增量式FETCH請求,其格式如下:
{replica_id=1,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,session_id=1253124317,epoch=1,topics=[],forgotten_topics_data=[]}
如果我們驗證增量式FETCH請求的大小,可以很容易地計算出其大小=4B(replica_id) + 4B(max_wait_time) + 8B(min_bytes&max_bytes)+ 1B(isolation_level)+ 4B (session_id) + 4B(epoch) + 4B(空topics數組) + 4B(空forgotten_topics_data數組)=33字節,與圖2中穩定后的FETCH請求大小正好吻合!
綜上所說,Kafka在1.1.0版本引入了FETCH session的概念以期望減少FETCH請求對於網絡帶寬的占用,從實際使用角度而言,這種減少對於超大規模的集群是有明顯提升的,而對於一般的小集群其優化的效果則並非那么顯著。不過鑒於這是對用戶透明的性能提升,故總然是一件好事情~~