轉載:https://my.oschina.net/u/2828172/blog/443419
背景說明
線上業務反應使用 Flink 消費上游 kafka topic 里的軌跡數據出現 backpressure,數據積壓嚴重。單次 bulk 的寫入量為:3000/50mb/30s,並行度為 48。針對該問題,為了避免影響線上業務申請了一個與線上集群配置相同的 ES 集群。本着復現問題進行優化就能解決的思路進行調優測試。
測試環境
-
Elasticsearch 2.3.3
-
Flink 1.6.3
-
flink-connector-elasticsearch 2_2.11
-
八台 SSD,56 核 :3 主 5 從
Rally 分布式壓測 ES 集群
-
從壓測結果來看,集群層面的平均寫入性能大概在每秒 10 w+ 的 doc。
Flink 寫入測試
-
配置文件
1config.put("cluster.name", ConfigUtil.getString(ES_CLUSTER_NAME, "flinktest"));
2config.put("bulk.flush.max.actions", ConfigUtil.getString(ES_BULK_FLUSH_MAX_ACTIONS, "3000"));
3config.put("bulk.flush.max.size.mb", ConfigUtil.getString(ES_BULK_FLUSH_MAX_SIZE_MB, "50"));
4config.put("bulk.flush.interval.ms", ConfigUtil.getString(ES_BULK_FLUSH_INTERVAL, "3000"));
-
執行代碼片段
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
initEnv(env);
Properties properties = ConfigUtil.getProperties(CONFIG_FILE_PATH);
//從kafka中獲取軌跡數據
FlinkKafkaConsumer010<String> flinkKafkaConsumer010 =
new FlinkKafkaConsumer010<>(properties.getProperty("topic.name"), new SimpleStringSchema(), properties);
//從checkpoint最新處消費
flinkKafkaConsumer010.setStartFromLatest();
DataStreamSource<String> streamSource = env.addSource(flinkKafkaConsumer010);
10//Sink2ES
streamSource.map(s -> JSONObject.parseObject(s, Trajectory.class))
.addSink(EsSinkFactory.createSinkFunction(new TrajectoryDetailEsSinkFunction())).name("esSink");
env.execute("flinktest");
-
運行時配置
任務容器數為 24 個 container,一共 48 個並發。savepoint 為 15 分鍾:
-
運行現象
(1)source 和 Map 算子均出現較高的反壓
(2)ES 集群層面,目標索引寫入速度寫入陡降
平均 QPS 為:12 k 左右。
(3)對比取消 sink 算子后的 QPS
streamSource.map(s -> JSONObject.parseObject(s, FurionContext.class)).name("withnosink");
平均QPS為:116 k 左右。
有無sink參照實驗的結論:
取消 sink 2 ES 的操作后,QPS 達到 110 k,是之前 QPS 的十倍。由此可以基本判定: ES 集群寫性能導致的上游反壓
優化方向
-
索引字段類型調整
bulk 失敗的原因是由於集群 dynamic mapping 自動監測,部分字段格式被識別為日期格式而遇到空字符串無法解析報錯。
解決方案:關閉索引自動檢測。
效果: ES 集群寫入性能明顯提高但 Flink operator 依然存在反壓:
-
降低副本數
curl -XPUT{集群地址}/{索引名稱}/_settings?timeout=3m -H "Content-Type: application/json" -d'{"number_of_replicas":"0"}'
-
提高 refresh_interval
針對這種 ToB、日志型、實時性要求不高的場景,我們不需要查詢的實時性,通過加大甚至關閉 refresh_interval 的參數提高寫入性能。
curl -XPUT{集群地址}/{索引名稱}/_settings?timeout=3m -H "Content-Type: application/json" -d '{ "settings": { "index": {"refresh_interval" : -1 } } }'
-
檢查集群各個節點 CPU 核數
在 Flink 執行時,通過 Grafana 觀測各個節點 CPU 使用率以及通過 Linux 命令查看各個節點 CPU 核數。發現 CPU 使用率高的節點 CPU 核數比其余節點少。為了排除這個短板效應,我們將在這個節點中的索引 shard 移動到 CPU 核數多的節點。
curl -XPOST {集群地址}/_cluster/reroute -d'{"commands":[{"move":{"index":"{索引名稱}","shard":5,"from_node":"源node名稱","to_node":"目標node名稱"}}]}' -H "Content-Type:application/json"
以上優化的效果:
經過以上的優化,我們發現寫入性能提升有限。因此,需要深入查看寫入的瓶頸點。
-
在 CPU 使用率高的節點使用 Arthas 觀察線程
-
打印阻塞的線程堆棧
"elasticsearch[ES-077-079][bulk][T#3]" Id=247 WAITING on java.util.concurrent.LinkedTransferQueue@369223fa
at sun.misc.Unsafe.park(Native Method)
- waiting on java.util.concurrent.LinkedTransferQueue@369223fa
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.LinkedTransferQueue.awaitMatch(LinkedTransferQueue.java:737)
at java.util.concurrent.LinkedTransferQueue.xfer(LinkedTransferQueue.java:647)
at java.util.concurrent.LinkedTransferQueue.take(LinkedTransferQueue.java:1269)
at org.elasticsearch.common.util.concurrent.SizeBlockingQueue.take(SizeBlockingQueue.java:161)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
從上面的線程堆棧我們可以看出線程處於等待狀態。
關於這個問題的討論詳情查看 https://discuss.elastic.co/t/thread-selection-and-locking/26051/3,這個 issue 討論大致意思是:節點數不夠,需要增加節點。於是我們又增加節點並通過設置索引級別的 total_shards_per_node 參數將索引 shard 的寫入平均到各個節點上。
-
線程隊列優化
ES 是將不同種類的操作(index、search…)交由不同的線程池執行,主要的線程池有三:index、search 和 bulk thread_pool。線程池隊列長度配置按照官網默認值,我覺得增加隊列長度而集群本身沒有很高的處理能力線程還是會 await(事實上實驗結果也是如此在此不必贅述),因為實驗節點機器是 56 核,對照官網:
因此修改 size 數值為 56。
經過以上的優化,我們發現在 kafka 中的 topic 積壓有明顯變少的趨勢:
-
index buffer size 的優化
參照官網:
indices.memory.index_buffer_size : 10%
-
translog 優化:
索引寫入 ES 的基本流程是:
-
數據寫入 buffer 緩沖和 translog;
-
每秒 buffer 的數據生成 segment 並進入內存,此時 segment 被打開並供 search 使用查詢;
-
buffer 清空並重復上述步驟 ;
-
buffer 不斷添加、清空 translog 不斷累加,當達到某些條件觸發 commit 操作,刷到磁盤;
ES 默認的刷盤操作為 Request 但容易部分操作比較耗時,在日志型集群、允許數據在刷盤過程中少量丟失可以改成異步 async。
另外一次 commit 操作是在 translog 達到某個閾值執行的,可以把大小(flush_threshold_size )調大,刷新間隔調大。
index.translog.durability : async
index.translog.flush_threshold_size : 1gb
index.translog.sync_interval : 30s
效果:
-
Flink 反壓從打滿 100% 降到 40%(output buffer usage):
-
kafka 消費組里的積壓明顯減少:
總結
當 ES 寫入性能遇到瓶頸時,我總結的思路應該是這樣:
-
看日志,是否有字段類型不匹配,是否有臟數據。
-
看 CPU 使用情況,集群是否異構
-
客戶端是怎樣的配置?使用的 bulk 還是單條插入
-
查看線程堆棧,查看耗時最久的方法調用
-
確定集群類型:ToB 還是 ToC,是否允許有少量數據丟失?
-
針對 ToB 等實時性不高的集群減少副本增加刷新時間
-
index buffer 優化 translog 優化,滾動重啟集群