在新版本的kafka中(從0.9開始),其實只有異步方式一種,是批量發送的方式
在producer端,存在2個線程,一個是producer主線程,用戶端調用send消息時,是在主線程執行的,數據被緩存到RecordAccumulator中,send方法即刻返回,也就是說此時並不能確定消息是否真正的發送到broker。另外一個是sender IO線程,其不斷輪詢RecordAccumulator,滿足一定條件后,就進行真正的網絡IO發送,使用的是異步非阻塞的NIO。主線程的send方法提供了一個用於回調的參數,當sender線程發送完后,回調函數將被調用,可以用來處理成功,失敗或異常的邏輯
在客戶端調用send方法時,需要先構造好ProducerRecord對象,其包含以下字段
如果在發送時指定了partition,則消息將被保存到指定的tp分區隊列,如果沒有指定分區,將對key散列后來計算分區,相同key的消息將被寫到同一個分區隊列中,如果key是null,且使用默認的分區器,則分區器將用輪詢的方法(Round Robin)將序列化后的消息均衡分布到不同的隊列中,sender線程從Accumulator中取出批量數據組成一個batch發送
如果想使用同步方式,其實是通過異步方式間接實現,因為異步方式返回的是一個future對象,在這對象上調用get方法,將被阻塞直到返回結果。
如下圖所示,在record被加入到accumulator時,會根據record所在的tp找到RecordBatch隊列,如果不存在,就新建一個隊列,在隊列中取出最后一個RecordBatch,如果這個batch還有空間,就把record新追加到緩存后面,這樣1個batch可能會有多個record,如果batch空間不夠,就新創建一個batch,重新分配一個Max(16k, recordsize)的buffer,如果這個record超過16k,則這個batch中只會保存這1個record
下圖詳細描述了數據發送到accumulator及sender線程從accumulator取出並發送到broker的過程,
紅色的粗虛線表示數據流向,紅色粗實線表示sender線程不斷的循環過程,在虛線旁帶有數字標號的是具體的執行步驟