Flink 實時寫入數據到 ElasticSearch 性能調優
背景說明
線上業務反應使用Flink消費上游kafka topic里的軌跡數據出現backpressure,數據積壓嚴重。單次bulk的寫入量為:3000/50mb/30s,並行度為48。針對該問題,為了避免影響線上業務申請了一個與線上集群配置相同的ES集群。本着復現問題進行優化就能解決的思路進行調優測試。
測試環境
-
elasticsearch 2.3.3
-
flink 1.6.3
-
flink-connector-elasticsearch2_2.11
-
八台SSD,56核 :3主5從
Rally分布式壓測ES集群

-
從壓測結果來看,集群層面的平均寫入性能大概在每秒10w+的doc。
Flink寫入測試
-
配置文件
1config.put("cluster.name", ConfigUtil.getString(ES_CLUSTER_NAME, "flinktest"));
2config.put("bulk.flush.max.actions", ConfigUtil.getString(ES_BULK_FLUSH_MAX_ACTIONS, "3000"));
3config.put("bulk.flush.max.size.mb", ConfigUtil.getString(ES_BULK_FLUSH_MAX_SIZE_MB, "50"));
4config.put("bulk.flush.interval.ms", ConfigUtil.getString(ES_BULK_FLUSH_INTERVAL, "3000"));
-
執行代碼片段
1final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2initEnv(env);
3Properties properties = ConfigUtil.getProperties(CONFIG_FILE_PATH);
4//從kafka中獲取軌跡數據
5FlinkKafkaConsumer010<String> flinkKafkaConsumer010 =
6 new FlinkKafkaConsumer010<>(properties.getProperty("topic.name"), new SimpleStringSchema(), properties);
7//從checkpoint最新處消費
8flinkKafkaConsumer010.setStartFromLatest();
9DataStreamSource<String> streamSource = env.addSource(flinkKafkaConsumer010);
10//Sink2ES
11streamSource.map(s -> JSONObject.parseObject(s, Trajectory.class))
12 .addSink(EsSinkFactory.createSinkFunction(new TrajectoryDetailEsSinkFunction())).name("esSink");
13env.execute("flinktest");
-
運行時配置
任務容器數為24個container,一共48個並發。savepoint為15分鍾

-
運行現象
-
source和Map算子均出現較高的反壓

-
ES集群層面,目標索引寫入速度寫入陡降
平均QPS為:12k左右
-
對比取消sink算子后的QPS
1streamSource.map(s -> JSONObject.parseObject(s, FurionContext.class)).name("withnosink");

平均QPS為:116k左右
有無sink參照實驗的結論
取消sink2ES的操作后,QPS達到110k,是之前QPS的十倍。由此可以基本判定: ES集群寫性能導致的上游反壓
優化方向
-
索引字段類型調整

bulk失敗的原因是由於集群dynamic mapping自動監測,部分字段格式被識別為日期格式而遇到空字符串無法解析報錯。
解決方案:關閉索引自動檢測

效果: ES集群寫入性能明顯提高但flink operator 依然存在反壓:

-
降低副本數
1curl -XPUT{集群地址}/{索引名稱}/_settings?timeout=3m -H "Content-Type: application/json" -d'{"number_of_replicas":"0"}'
-
提高refresh_interval
針對這種ToB、日志型、實時性要求不高的場景,我們不需要查詢的實時性,通過加大甚至關閉refresh_interval的參數提高寫入性能。
1curl -XPUT{集群地址}/{索引名稱}/_settings?timeout=3m -H "Content-Type: application/json" -d '{ "settings": { "index": {"refresh_interval" : -1 } } }'
-
檢查集群各個節點CPU核數
在flink執行時,通過Grafana觀測各個節點CPU 使用率以及通過linux命令查看各個節點CPU核數。發現CPU使用率高的節點CPU核數比其余節點少。為了排除這個短板效應,我們將在這個節點中的索引shard移動到CPU核數多的節點。
1curl -XPOST {集群地址}/_cluster/reroute -d'{"commands":[{"move":{"index":"{索引名稱}","shard":5,"from_node":"源node名稱","to_node":"目標node名稱"}}]}' -H "Content-Type:application/json"
以上優化的效果:

經過以上的優化,我們發現寫入性能提升有限
。因此,需要深入查看寫入的瓶頸點
-
在CPU使用率高的節點使用Arthas觀察線程:

-
打印阻塞的線程堆棧
1"elasticsearch[ES-077-079][bulk][T#3]" Id=247 WAITING on java.util.concurrent.LinkedTransferQueue@369223fa
2 at sun.misc.Unsafe.park(Native Method)
3 - waiting on java.util.concurrent.LinkedTransferQueue@369223fa
4 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
5 at java.util.concurrent.LinkedTransferQueue.awaitMatch(LinkedTransferQueue.java:737)
6 at java.util.concurrent.LinkedTransferQueue.xfer(LinkedTransferQueue.java:647)
7 at java.util.concurrent.LinkedTransferQueue.take(LinkedTransferQueue.java:1269)
8 at org.elasticsearch.common.util.concurrent.SizeBlockingQueue.take(SizeBlockingQueue.java:161)
9 at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
10 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
11 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
12 at java.lang.Thread.run(Thread.java:745)
從上面的線程堆棧我們可以看出線程處於等待狀態。
關於這個問題的討論詳情查看https://discuss.elastic.co/t/thread-selection-and-locking/26051/3,這個issue討論大致意思是:節點數不夠,需要增加節點。於是我們又增加節點並通過設置索引級別的total_shards_per_node參數將索引shard的寫入平均到各個節點上)
-
線程隊列優化
ES是將不同種類的操作(index、search…)交由不同的線程池執行,主要的線程池有三:index、search和bulk thread_pool。線程池隊列長度配置按照官網默認值,我覺得增加隊列長度而集群本身沒有很高的處理能力線程還是會await(事實上實驗結果也是如此在此不必贅述),因為實驗節點機器是56核,對照官網,:

因此修改size數值為56。

經過以上的優化,我們發現在kafka中的topic積壓有明顯變少的趨勢:

-
index buffer size的優化
參照官網:

1indices.memory.index_buffer_size : 10%
-
translog優化:
索引寫入ES的基本流程是:1.數據寫入buffer緩沖和translog 2.每秒buffer的數據生成segment並進入內存,此時segment被打開並供search使用查詢 3.buffer清空並重復上述步驟 4.buffer不斷
添加、清空
translog不斷累加,當達到某些條件觸發commit操作,刷到磁盤。es默認的刷盤操作為request但容易部分操作比較耗時,在日志型集群、允許數據在刷盤過程中少量丟失可以改成異步async
另外一次commit操作是在translog達到某個閾值執行的,可以把大小(flush_threshold_size )調大,刷新間隔調大。
1index.translog.durability : async
2index.translog.flush_threshold_size : 1gb
3index.translog.sync_interval : 30s
效果:
-
flink反壓從打滿100%降到40%(output buffer usage):

-
kafka 消費組里的積壓明顯減少:

總結
當ES寫入性能遇到瓶頸時,我總結的思路應該是這樣:
-
看日志,是否有字段類型不匹配,是否有臟數據。
-
看CPU使用情況,集群是否異構
-
客戶端是怎樣的配置?使用的bulk 還是單條插入
-
查看線程堆棧,查看耗時最久的方法調用
-
確定集群類型:ToB還是ToC,是否允許有少量數據丟失?
-
針對ToB等實時性不高的集群減少副本增加刷新時間
-
index buffer優化 translog優化,滾動重啟集群
作者:張劉毅
原文鏈接:https://blog.csdn.net/dtzly/article/details/101006064
END
關注我
公眾號(zhisheng)里回復 面經、ES、Flink、 Spring、Java、Kafka、監控 等關鍵字可以查看更多關鍵字對應的文章
Flink 實戰
1、《從0到1學習Flink》—— Apache Flink 介紹
2、《從0到1學習Flink》—— Mac 上搭建 Flink 1.6.0 環境並構建運行簡單程序入門
3、《從0到1學習Flink》—— Flink 配置文件詳解
4、《從0到1學習Flink》—— Data Source 介紹
5、《從0到1學習Flink》—— 如何自定義 Data Source ?
6、《從0到1學習Flink》—— Data Sink 介紹
7、《從0到1學習Flink》—— 如何自定義 Data Sink ?
8、《從0到1學習Flink》—— Flink Data transformation(轉換)
9、《從0到1學習Flink》—— 介紹 Flink 中的 Stream Windows
10、《從0到1學習Flink》—— Flink 中的幾種 Time 詳解
11、《從0到1學習Flink》—— Flink 讀取 Kafka 數據寫入到 ElasticSearch
12、《從0到1學習Flink》—— Flink 項目如何運行?
13、《從0到1學習Flink》—— Flink 讀取 Kafka 數據寫入到 Kafka
14、《從0到1學習Flink》—— Flink JobManager 高可用性配置
15、《從0到1學習Flink》—— Flink parallelism 和 Slot 介紹
16、《從0到1學習Flink》—— Flink 讀取 Kafka 數據批量寫入到 MySQL
17、《從0到1學習Flink》—— Flink 讀取 Kafka 數據寫入到 RabbitMQ
18、《從0到1學習Flink》—— 你上傳的 jar 包藏到哪里去了
19、大數據“重磅炸彈”——實時計算框架 Flink
20、《Flink 源碼解析》—— 源碼編譯運行
21、為什么說流處理即未來?
22、OPPO數據中台之基石:基於Flink SQL構建實數據倉庫
23、流計算框架 Flink 與 Storm 的性能對比
24、Flink狀態管理和容錯機制介紹
25、原理解析 | Apache Flink 結合 Kafka 構建端到端的 Exactly-Once 處理
26、Apache Flink 是如何管理好內存的?
27、《從0到1學習Flink》——Flink 中這樣管理配置,你知道?
28、《從0到1學習Flink》——Flink 不可以連續 Split(分流)?
29、Flink 從0到1學習—— 分享四本 Flink 的書和二十多篇 Paper 論文
30、360深度實踐:Flink與Storm協議級對比
31、Apache Flink 1.9 重大特性提前解讀
32、如何基於Flink+TensorFlow打造實時智能異常檢測平台?只看這一篇就夠了
33、美團點評基於 Flink 的實時數倉建設實踐
34、Flink 靈魂兩百問,這誰頂得住?
35、一文搞懂 Flink 的 Exactly Once 和 At Least Once
36、你公司到底需不需要引入實時計算引擎?
37、Flink 從0到1學習 —— 如何使用 Side Output 來分流?
38、一文讓你徹底了解大數據實時計算引擎 Flink
39、基於 Flink 實現的商品實時推薦系統(附源碼)
40、如何使用 Flink 每天實時處理百億條日志?
41、Flink 在趣頭條的應用與實踐
42、Flink Connector 深度解析
43、滴滴實時計算發展之路及平台架構實踐
44、Flink Back Pressure(背壓)是怎么實現的?有什么絕妙之處?
45、Flink 實戰 | 貝殼找房基於Flink的實時平台建設
46、如何使用 Kubernetes 部署 Flink 應用
47、一文徹底搞懂 Flink 網絡流控與反壓機制
Flink 源碼解析

知識星球里面可以看到下面文章
長按二維碼向我轉賬

受蘋果公司新規定影響,微信 iOS 版的贊賞功能被關閉,可通過二維碼轉賬支持公眾號。