這個 Kafka 的專題,我會從系統整體架構,設計到代碼落地。和大家一起杠源碼,學技巧,漲知識。希望大家持續關注一起見證成長!
我相信:技術的道路,十年如一日!十年磨一劍!
往期文章
前言
我們說 Kafka 是一個消息隊列,其實更加確切的說:是 Broker 這個核心部件。為何這么說?你會發現我們可以通過控制台、 Java 代碼、 C++ 代碼、甚至是 Socket 向 Broker 寫入消息,只要我們遵從了 Kafka 寫入消息的協議,就可以將消息發送到 Kafka 隊列中。
用專業一點的話術來說,Kafka 定義了一個應用層的網絡協議,只要我們基於傳輸層構造出符合這個協議的數據,就是合法的 Kafka 消息。
所以說我們寫入 Kafka 消息的只是一個生產者的客戶端,他的形式多種多樣,有 Java ,Python,C++ 等多種實現,那么我們每次發消息難道還需要自己去實現這套發送消息的協議么?顯然 Kafka 官方已經考慮到這個問題了,為了給我們提供 開箱即用
的消息隊列,官方已經幫我們寫好了各種語言的優質生產者實現,例如我們今天要討論的 Java 版本的實現。
思考
前面提到 Kafka 幫我們實現了各個版本的生產者代碼,其實他也可以完全不提供這份代碼,因為核心的隊列的功能已經實現了,這些客戶端的代碼也可以完全交由用戶自己實現。
那么假如沒有官方代碼,我們又該實現一些什么功能,有哪些接口,哪些方法,以及如何組織這些代碼呢。帶着這樣的問題我們一起來思考一下!一般對於這種帶有數據流轉的設計,我會從 由誰產生?
什么數據?
通往哪去?
如何保證通路可靠?
這幾個方面來考慮。
消息自然是通過應用程序構造出來並提供給生產者,生產者首先要知道需要將消息發送到哪個 Broker 的哪個 Topic,以及 Topic 的具體 Partition 。那么必然需要配置客戶端的 Broker集群地址
,需要發送的 Topic 名稱
,以及 消息的分區策略
,是指定到具體的分區還是通過某個 key hash 到不同的分區。
知道了消息要通往哪,還需要知道發送的是什么格式的消息,是字符串還是數字或是被序列化的二進制對象。 消息序列化
將需要消息序列化成字節數組才方便在網絡上傳輸,所以要配置生產者的消息序列化策略,最好是可以通過傳遞枚舉或者類名的方式自動構造序列化器,便於后續序列化過程的擴展。
從上面一篇文章 《Kafka 探險 - 架構簡介》 了解到:消息隊列常常用於多個系統之間的異步調用,那么這種調用關系就沒有強實時依賴。由於發消息到 Kafka 會產生 網絡 I/O
,相對來說比較耗時,那么消息發送這一動作除了同步調用, 是否也可以設置為異步,提高生產者的吞吐呢?
。並且大量消息發送場景, 我們可以設置一個窗口,窗口可以是時間維度也可以是消息數量維度,將消息積攢起來批次發送,減少網絡 I/O 次數,提高吞吐量。
最后呢為了保證消息可以最大程度的成功發送到 Broker ,我們還需要一些 失敗重試機制
,例如失敗后放到重試隊列中,隔一段時間嘗試再次發送。
理清思路
通過上面的分析,我們會有一個大致的認識,應該會有哪些方法,以及底層的大致的設計會分為哪幾個部分。但是不夠清楚,不夠明晰。
首先總結一下實現客戶端的幾個要點在於:
- 配置 Broker 基礎信息:集群地址、Topic、Partition
- 消息序列化,通過可擴展的序列化器實現
- 消息異步寫入緩沖區,網絡 I/O 線程實現消息發送
- 消息發送的失敗重試機制
話不多說,用一張圖畫出各個核心模塊以及他們之間的交互順序:
用戶設定 Kafka 集群信息,生產者從 Kafka Broker 上拉取 可用 Kafka 節點、Topic 以及 Partition 對應關系。緩存到生產者成員變量中,如果 Broker 集群有擴容,或者有機器下線需要重新獲取這些服務信息。
客戶端根據用戶設置的序列化器,對消息進行序列化,之后異步的將消息寫入到客戶端緩沖區。緩沖區內的消息到達一定的數量或者到達一個時間窗口后,網絡 I/O 線程將消息從緩沖區取走,發送到 Broker 。
以上就是我對於一個 Kafka 生產者實現的思考,接下來看看官方的代碼設計與我們的思路有何差別,他又是為什么這么設計。
官方設計
其實經過上面的思考和整理,我們的設計已經非常接近 Kafka 的官方設計了,官方的模塊拆分的更加細致,功能更加獨立。
核心組件
首先看一眼 KafkaProducer 類中有哪些成員變量,這些變量就是 Producer 的核心組件。
其中核心字段的解釋如下:
clinetId
:標識發送者Id
metric
:統計指標
partitioner
:分區器作用是決定消息發到哪個分區。有 key 則按照 key 的 hash ,否則使用 roundrobin
key/value Serializer
:消息 key/value 序列化器
interceptors
:發送之前/后對消息的統一處理
maxRequestSize
:可以發送的最大消息,默認值是1M,即影響一個消息 Record 的大小,此值在服務端也是有限制的。
maxBlockTimeMs
:buffer滿了或者等待metadata信息的,超時的補償機制
accumulator
:累積緩沖器
networkClient
:包裝的網絡層
sender
:網絡 I/O 線程
發送流程
發送一條消息的時候,數據又是怎樣在這些組件之間進行流轉的呢?
Producer調用 send 方法后,在從 Broker 獲取的 Metadata 有效情況下,經過攔截器和序列化后,被分區器放到了一個緩沖區的特定位置,緩沖區由一個 ConcurrentHashMap 構成,key 為主題分區,value 是一個 deque 存放消息緩存塊。從客戶端角度來看如果無需關心發送結果,發送流程就已經結束了。
接下來是獨立的Sender線程負責從緩沖中獲取足量的數據調用 Network Client 封裝層去真正發送數據,這里使用了 Java8 的 NIO 網絡模型發送數據。
可以看到整個邏輯的關鍵點在於 RecordAccumulator 如何進行消息緩存,一般的成熟框架和中間件中都會有一套自己的內存管理機制,比如 Netty 也有一套復雜而又精妙的內存管理抽象層,這里的緩沖區也是一樣的道理,主要需要去看看 Kafka 如何去做內存管理。
另外需要關注 Sender 從緩沖里以什么樣的邏輯獲取數據,來達到盡量少的網絡交互發送盡量多的數據。還有網絡失敗又是如何保證數據的可靠性的。這個地方也是我們的設計和官方實現的差距,對於網絡 I/O 的精心優化。
目前的篇幅已經比較長了,為了大家方便閱讀理解,本篇主要從和大家一起思考如何設計一個 Kafka Producer 以及官方是如何實現的,我們之間的差距是什么,更需要關注的點是什么。通過自己的思考和對比更加能認識到不足學習到新的點!
尾聲(嘮叨)
這篇文章從周內就開始了,后面斷斷續續每天寫了點,只是每天回去的確實有點晚,偶爾還給我整個失眠,精神狀態不太好,周五六點多飯都沒吃直接回家睡覺了,確實好困,希望下周能休息好。
這周的工作壓力也很大,主要是需要推動很多上下游協同,還需要定方案。經常在想怎么交涉?怎么修改方案大家會認同?怎樣說服他們? 是壓力也是鍛煉,說明這方面欠缺的較多,該補!
下篇文章主要會寫 KafkaProducer 的緩存內存管理機制,Meta 信息更新機制,以及網絡 I/O 模型的設計。敬請期待~
另外:大家也可以關注下我的微信公眾號哦~ 技術分享和個人思考都會第一時間同步!