借助flink來對es的寫入性能進行調優


轉載:https://my.oschina.net/u/2828172/blog/443419

 

背景說明

 

線上業務反應使用 Flink 消費上游 kafka topic 里的軌跡數據出現 backpressure,數據積壓嚴重。單次 bulk 的寫入量為:3000/50mb/30s,並行度為 48。針對該問題,為了避免影響線上業務申請了一個與線上集群配置相同的 ES 集群。本着復現問題進行優化就能解決的思路進行調優測試。

 

測試環境

 

  • Elasticsearch 2.3.3

  • Flink 1.6.3

  • flink-connector-elasticsearch 2_2.11

  • 八台 SSD,56 核 :3 主 5 從

 

Rally 分布式壓測 ES 集群

 

 

  • 從壓測結果來看,集群層面的平均寫入性能大概在每秒 10 w+ 的 doc。

 

Flink 寫入測試

 

  • 配置文件

 

1config.put("cluster.name", ConfigUtil.getString(ES_CLUSTER_NAME, "flinktest"));2config.put("bulk.flush.max.actions", ConfigUtil.getString(ES_BULK_FLUSH_MAX_ACTIONS, "3000"));3config.put("bulk.flush.max.size.mb", ConfigUtil.getString(ES_BULK_FLUSH_MAX_SIZE_MB, "50"));4config.put("bulk.flush.interval.ms", ConfigUtil.getString(ES_BULK_FLUSH_INTERVAL, "3000"));

 

  • 執行代碼片段

 

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); initEnv(env); Properties properties = ConfigUtil.getProperties(CONFIG_FILE_PATH); //從kafka中獲取軌跡數據 FlinkKafkaConsumer010<String> flinkKafkaConsumer010 = new FlinkKafkaConsumer010<>(properties.getProperty("topic.name"), new SimpleStringSchema(), properties); //從checkpoint最新處消費 flinkKafkaConsumer010.setStartFromLatest(); DataStreamSource<String> streamSource = env.addSource(flinkKafkaConsumer010);10//Sink2ESstreamSource.map(s -> JSONObject.parseObject(s, Trajectory.class))    .addSink(EsSinkFactory.createSinkFunction(new TrajectoryDetailEsSinkFunction())).name("esSink");env.execute("flinktest");

 

  • 運行時配置

 

任務容器數為 24 個 container,一共 48 個並發。savepoint 為 15 分鍾:

 

 

  • 運行現象

 

(1)source 和 Map 算子均出現較高的反壓

 

 

(2)ES 集群層面,目標索引寫入速度寫入陡降

 

平均 QPS 為:12 k 左右。

 

(3)對比取消 sink 算子后的 QPS

 

streamSource.map(s -> JSONObject.parseObject(s, FurionContext.class)).name("withnosink");

 

 

平均QPS為:116 k 左右。

 

有無sink參照實驗的結論:

 

取消 sink 2 ES 的操作后,QPS 達到 110 k,是之前 QPS 的十倍。由此可以基本判定: ES 集群寫性能導致的上游反壓

 

優化方向

 

  • 索引字段類型調整

 

 

bulk 失敗的原因是由於集群 dynamic mapping 自動監測,部分字段格式被識別為日期格式而遇到空字符串無法解析報錯。


解決方案:關閉索引自動檢測。

 

 

效果: ES 集群寫入性能明顯提高但 Flink operator 依然存在反壓:

 

 

  • 降低副本數

 

curl -XPUT{集群地址}/{索引名稱}/_settings?timeout=3m -H "Content-Type: application/json" -d'{"number_of_replicas":"0"}'

 

  • 提高 refresh_interval

 

針對這種 ToB、日志型、實時性要求不高的場景,我們不需要查詢的實時性,通過加大甚至關閉 refresh_interval 的參數提高寫入性能。

 

curl -XPUT{集群地址}/{索引名稱}/_settings?timeout=3m -H "Content-Type: application/json" -d '{ "settings": {  "index": {"refresh_interval" : -1   }   }  }'

 

  • 檢查集群各個節點 CPU 核數

 

在 Flink 執行時,通過 Grafana 觀測各個節點 CPU 使用率以及通過 Linux 命令查看各個節點 CPU 核數。發現 CPU 使用率高的節點 CPU 核數比其余節點少。為了排除這個短板效應,我們將在這個節點中的索引 shard 移動到 CPU 核數多的節點。

 

curl -XPOST {集群地址}/_cluster/reroute -d'{"commands":[{"move":{"index":"{索引名稱}","shard":5,"from_node":"源node名稱","to_node":"目標node名稱"}}]}' -H "Content-Type:application/json"

 

以上優化的效果:

 

 

經過以上的優化,我們發現寫入性能提升有限。因此,需要深入查看寫入的瓶頸點。

 

  • 在 CPU 使用率高的節點使用 Arthas 觀察線程

     

 

  • 打印阻塞的線程堆棧

 

 "elasticsearch[ES-077-079][bulk][T#3]" Id=247 WAITING on java.util.concurrent.LinkedTransferQueue@369223fa at sun.misc.Unsafe.park(Native Method)     -  waiting on java.util.concurrent.LinkedTransferQueue@369223fa at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.LinkedTransferQueue.awaitMatch(LinkedTransferQueue.java:737) at java.util.concurrent.LinkedTransferQueue.xfer(LinkedTransferQueue.java:647) at java.util.concurrent.LinkedTransferQueue.take(LinkedTransferQueue.java:1269) at org.elasticsearch.common.util.concurrent.SizeBlockingQueue.take(SizeBlockingQueue.java:161) at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

 

從上面的線程堆棧我們可以看出線程處於等待狀態。

 

關於這個問題的討論詳情查看 https://discuss.elastic.co/t/thread-selection-and-locking/26051/3,這個 issue 討論大致意思是:節點數不夠,需要增加節點。於是我們又增加節點並通過設置索引級別的 total_shards_per_node 參數將索引 shard 的寫入平均到各個節點上。

 

  • 線程隊列優化

 

ES 是將不同種類的操作(index、search…)交由不同的線程池執行,主要的線程池有三:index、search 和 bulk thread_pool。線程池隊列長度配置按照官網默認值,我覺得增加隊列長度而集群本身沒有很高的處理能力線程還是會 await(事實上實驗結果也是如此在此不必贅述),因為實驗節點機器是 56 核,對照官網:

 

 

因此修改 size 數值為 56。

 

 

經過以上的優化,我們發現在 kafka 中的 topic 積壓有明顯變少的趨勢:

 

 

  • index buffer size 的優化

 

參照官網:

 

 

indices.memory.index_buffer_size : 10%

 

  • translog 優化:

 

索引寫入 ES 的基本流程是:

 

  • 數據寫入 buffer 緩沖和 translog; 

  • 每秒 buffer 的數據生成 segment 並進入內存,此時 segment 被打開並供 search 使用查詢; 

  • buffer 清空並重復上述步驟 ;

  • buffer 不斷添加、清空 translog 不斷累加,當達到某些條件觸發 commit 操作,刷到磁盤;

 

ES 默認的刷盤操作為 Request 但容易部分操作比較耗時,在日志型集群、允許數據在刷盤過程中少量丟失可以改成異步 async。

 

另外一次 commit 操作是在 translog 達到某個閾值執行的,可以把大小(flush_threshold_size )調大,刷新間隔調大。

 

 

index.translog.durability : asyncindex.translog.flush_threshold_size : 1gbindex.translog.sync_interval : 30s

 

效果:

 

  • Flink 反壓從打滿 100% 降到 40%(output buffer usage):

     

 

  • kafka 消費組里的積壓明顯減少:

     

 

總結

 

當 ES 寫入性能遇到瓶頸時,我總結的思路應該是這樣:

 

  • 看日志,是否有字段類型不匹配,是否有臟數據。

  • 看 CPU 使用情況,集群是否異構

  • 客戶端是怎樣的配置?使用的 bulk 還是單條插入

  • 查看線程堆棧,查看耗時最久的方法調用

  • 確定集群類型:ToB 還是 ToC,是否允許有少量數據丟失?

  • 針對 ToB 等實時性不高的集群減少副本增加刷新時間

  • index buffer 優化 translog 優化,滾動重啟集群

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM