kafka的通訊協議是基於tcp之上的二進制協議,所有類型的請求和響應都是結構化的,由不同的初始類型構成。kafka使用這組協議完成各個功能的實現。
單個kafka client通常需要同時連接多個broker服務器進行數據交互,但每個broker之上只需要維護一個Socket連接用於數據傳輸。clients可能會創建額外的socket連接用於其他任務,如元數據獲取以及組rebalance等。kafka自帶的java clients使用了類似於epoll的方式在單個連接上不停的輪訓以傳輸數據。
broker端某時刻只能處理一條請求的做法是為了保證不會出現請求亂序。clients端在實現時,需要自行保證請求發送順序。
3中請求發送流向:
1.clients給broker發送請求
2.controller也能夠給其他broker發送請求
3.follower副本所在的broker向leader副本所在broker發送請求。
請求/響應結構:
統一構建於多種初始類型之上:
初始類型:
所有的請求和響應都具有統一的格式,即size+Request/Response,其中的Size時int32表示的整數,表示了該請求或響應的長度信息。
請求=請求頭部+請求體,請求體隨類型變化,
請求頭固定:
api_key:請求類型,int16整數表示
api_version:請求版本號,以int16整數表示
correlation_id:與對應響應的關聯號,實際中用於關聯response與request,方便用戶調試和排錯。該字段以int32整數表示
client_id:表示發出此請求的client ID,實際場景中用於區分集群上不同clients發送的請求。該字段是一個非空字符串。
響應=響應頭部+響應體,響應體格式隨請求類型變化,
響應頭部固定:
corrlation_id:該字段值就是上面請求頭部中的correlation_id。有了該字段,用戶就能知道該請求對應於哪個請求了。
kafka推薦用戶總是指定client_id和correlation_id,這樣可以方便用戶后續定位問題和debug。
1.0.0版本38個請求類型
produce請求:事務id+ack+timeout+[topic數據]
produce響應結構:
2.FETCH請求,既包括clients向broker發送的fetch請求,也包括分區follower副本發送給leader副本的fetch請求。格式為:
replica_id+max_wait_time+min_bytes+max_bytes+isolation_level+[topics]
3.client向broker發送metadata請求以獲取指定topic的元數據信息。
請求處理流程: