1. Kafka發送流程
kafka的發送流程可以簡單概括為如下的圖。這幅圖我們可以分為三部分來理解。中間的(深藍色矩形)部分的流程是發送的核心流程(同步發送);左邊(淡藍色)是異步發送時相關的額外流程,右邊(黃色)是客戶端更新元信息相關的流程。簡單概括為:
- 同步發送流程
- 異步發送流程
- 更新元信息流程
2.1 創建生產者
一般在生產者客戶端代碼中我們使用如下這樣的代碼來創建一個生產者。
Produce p = new Producer(new ProducerConfig());
實際上在運行該代碼后,我們是啟動了三個實例,同時也初始化了ProducerConfig類完成了生產者的配置。三個實例如下:
- Producer
- DefaultEventHandler
- ProducerPool: 連接不同kafka broker的生產者池,連接個數有broker.list參數決定
2.2 同步發送數據
- Producer實例調用其send方法
- 本質是調用了Handler的handle(message)
- handler序列化消息
- handler調用dispatchSerializedData方法來調度序列化后的消息
- dispatchSerializedData方法調用partitionAndCollate方法對topic的message進行分組(根據獲取的leaderBrokerId元數據來對消息分組)
- 從生產者池中獲取不同broker對應的生產者,來真正的發送消息
2.3 異步發送流程
異步發送可以結合同步發送的流程來看。異步發送流程就是在同步發送流程前面多進行了一些額外的流程,來達到異步批量發送的目的。
額外增加的流程為:
- 根據生產者API,采用異步方式,則先將消息寫入一個阻塞隊列
- DefaultEventHandler定期向阻塞隊列拉去消息
- 后面和同步發送流程相同
結合下圖的流程來理解同步發送流程和異步發送流程(區別可以看到就是多了一個阻塞隊列):
-
同步發送流程
2.異步發送流程