1. 介紹
原文來自linkedin的一篇PPT producer-performance-tuning-for-apache-kafka 。
2. 本文的一些前提
- 討論的kafka版本為0.10.0
- 沒有broker端的再壓縮
- 消息都有8字節的時間戳介紹信息
3. 優化目標
給定一個要發送的數據集,在滿足持久性、有序性的前提下優化以下兩點:
- 吞吐量
- 延遲
優化專注於優化平均性能,這樣對所有的producer都有效。
4. kafka producer原理回顧
4.1 生產者的關鍵配置
- batch.size: 基於大小的batching策略
- linger.ms: 基於時間的batching策略
- compression.type:壓縮的速度上lz4=snappy<gzip。
- max.in.flight.requests.per.connection (affects ordering,設置為1可以保證有序性,但是發送性能會受影響。不為1的時候,如果發生消息重發則會亂序)
- acks (affects durability)
PS: 更大的批次,意味着更好的壓縮率、更高的吞吐量。但是負面影響,就是延遲會高些。
4.2 生產者發送原理
這個之前在kafka生產者原理詳解一文中做了一些分析。現在來看看kafka的 committer如何來分析的發送者原理的。其分析相對更加簡明扼要。
發送者發送消息的過程簡單概括為:
- 序列化
- 根據topic的元信息對數據進行分區
- 分區數據經過壓縮器處理后放入batch,產生M和CB。分區數據按照batch在Record Accumulator里面組織(used和callback)。一個batch對象本身會占用一些空間,圖上的used和callbacks。
假設現在Record Accumulator中已經包含了如下的數據:
當一個batch准備完畢后,用戶線程就可以去執行具體的發送操作了。當滿足以下條件之一時,我們認為一個batch是已經“准備完畢的”:
- batch.size達到了
- linger.ms時間達到了
- 發現同一個broker的其他batch已經完畢了
- flush()和close()方法被調用
用戶線程獲取batch的過程如下:
- 從batch隊列中輪詢獲取batch
- 將batch根據leader broker來分組
- 將分完組的batches發送給broker
- 如果max.in.flight.requests.per.connection>1則在管道中排隊
PS: 接下來的說明,都假設max.in.flight.requests.per.connection=1
5. 生產者調優
5.1 調優工具
生產者調優,主要可以利用kafka-producer-perf-test.sh(org.apache.kafka.tools.ProducerPerformance)。通過測試不同的配置來對比發送效率。
使用方法例子:
./kafka-producer-perf-test.sh --num-records 1000000 --record-size 1000 --topic
becket_test_3_replicas_1_partition --throughput 1000000 --producer-props bootstrap. servers=localhost:9092 max.in.flight.requests.per.connection=1 batch.size=100000 compression.type=lz4
PS: kafka 0.8的版本還支持thread-num等選項,現在0.10.1中還沒有,不過已經有issue在解決了。相信馬上會有了。詳情見: KAFKA-3554
3554修復后會有如下功能:
- --num-threads: 發送消息的線程數
- --value-bound: The range of the random integer in the messages. This option is useful when compression is used.Different integer range simulates different compression ratio.
- producer metrics: 在使用ProducerPerformance的時候,還會打印一系列metrics。
關於第三點,是以前沒有的特性。這個對生產者調優十分重要。使用ProducerPerformance的時候,打印的度量信息有:
- Select_Rate_Avg (The rate that the sender thread runs to check if it can send some messages)
- Request_Rate_Avg
- Request_Latency_Avg (Not including the callback execution time)
- Request_Size_Avg (After compression)
- Batch_Size_Avg (After compression)
- Records_Per_Request_Avg
- Record_Queue_Time_Avg
- Compression_Rate_Avg
PS:以上度量信息,需要至少1分鍾運行時間才能保證穩定。
使用例子:
./kafka-producer-perf-test.sh --num-records 1000000 --record-size 1000 --topic becket_test_3_replicas_4_partition --throughput 100000 --num-threads 1 --value-bound 50000 --producer-props bootstrap.servers=localhost:9092 compression.type=gzip max.in.flight. requests.per.connection=1
5.2 用於調優的幾個公式
5.2.1 吞吐量計算公式
吞吐量可以用以下公式估算:
throughput_Avg(平均吞吐量) ~= Request_Rate_Avg (平均請求速率)* Request_Size_Avg(平均請求大小) / Compression_Rate_Avg (壓縮率)
估算的實際值會比實際值大一些,因為會有一些request overhead沒有考慮進去。
5.2.2 request_size_avg計算
平均請求大小的計算公式為:
Request_Size_Avg(平均請求大小) = Records_Per_Request_Avg (每個請求的消息數)Record_Size (消息大小) Compression_Rate_Avg(壓縮率) +Request_Overhead
request overhead取決於:
- topic和分區數量
- 一般都是從幾十字節到幾百字節
5.2.3 Request_Rate_Upper_Limit
5.2.4 平均延遲計算公式
5.3 調優工具使用示例
假設我們使用以下的生產者來測試:
5.2.1 測試結果分析
根據得到的結果,我們發現吞吐量為9.96MB/s,遠遠小於我們實際的網絡帶寬1Gbps。
request_rate_avg和理論上限差距不大,而壓縮率又是固定的。所以我們的目標為增大request_size_avg來增加吞吐量。增加吞吐量的方式主要有:
linger.ms與batch size、壓縮率以及吞吐量和延遲之間的關系:
5.2.2 batching與壓縮時間和吞吐量的關系
上圖看出來,batching增大之后,吞吐量反而變差了,而且壓縮率也只有少量增長。這種原因主要是:增大batch會顯著增加壓縮的耗時。
相關測試:
總結: 一般我們說增大批次,都有利於增加吞吐量(減少了網絡IO次數)。但是這里之所以行不通是因為增大批次帶來的好處無法抵消壓縮時間的增長。從上圖的實驗結果可以看到,采用16KB或者索性采用較大的256KB都是可以的。避免采用處在中間的batch size
5.2.3 線程數與吞吐量的關系
可見:發送者的線程數,不是越多越好,因為線程數過多會影響延遲,而且有時候會產生負面效果。但是一般線程數小於topic分區數都是沒啥問題的。
5.2.4 優化
通過增加分區數、線程數、batch size,使得吞吐量得到改善:
5.2.5 關於尋找吞吐量瓶頸的方式
5.3 acks=-1(all)時的延遲調優
5.3.1 原理回顧
acks=all的時候,瓶頸很有肯能發生在replication time。
高水位線的值變更需要等待下一次fetch過來之后才變更。所有ProduceRequest里面的高水位線全部抵達當前offset了,才會返回ProduceResponse。

第二個fetch過來的時候,partition0的高水位線移動到當前offset

假設broker1只有1個replication線程,則replicaiton time為
5.3.2 replication time優化
顯而易見的是增加num.replica.fetchers,從而使得並發的fetch來做復制。這樣的Replication time則為:
設置多少的replica fetchers合理?一般按照官方的生產建議設置成4就好了。
5.4 生產者RTT時間長優化
5.4.1 場景描述
有個跨洋的pipeline

5.4.2 優化方案
現有情況的計算,發現確實吞吐量比較低。

解決辦法是增加send和 recieve buffer。下圖可以看到增大吞吐量之后,最多能達到20MB/s的吞吐量。