flink調優之RocksDB設置


一、開啟監控

RocksDB是基於LSM Tree實現的,寫數據都是先緩存到內存中,所以RocksDB的寫請求效率比較高。RocksDB使用內存結合磁盤的方式來存儲數據,每次獲取數據時,先從內存中blockcache中查找,如果內存中沒有再去磁盤中查詢。使用

RocksDB時,狀態大小僅受可用磁盤空間量的限制,性能瓶頸主要在於RocksDB對磁盤的讀請求,每次讀寫操作都必須對數據進行反序列化或者序列化。當處理性能不夠時。僅需要橫向擴展並行度即可提高整個Job的吞吐量。

flink1.13中引入了State訪問的性能監控,即latency tracking state、此功能不局限於State Backend的類型,自定義實現的State Backend也可以復用此功能。

 

state訪問的性能監控會產生一定的性能影響,所以默認每100次做一次抽樣sample,對不同的state Backend性能損失影響不同。

對於RocksDB State Backend,性能損失大概在1%左右

對於heap State Backend,性能損失最多可達10%(內存本身速度比較快,一點損失影響就很大)

關於性能監控的一些參數,正常開啟第一個參數即可,

state.backend.latency-track.keyed-state-enabled:true   //啟用訪問狀態的性能監控

state.backend.latency-track.sample-interval:100            //采樣間隔

state.backend.latency-track.histroy-size:128                  //保留的采樣數據個數,越大越精確

state.backend.latency-track.state-name-as-variable:true  //將狀態名作為變量

 

 0代表是任務編號,filter.visit-state是定義的狀態的變量名

 

 

 

有很多這種統計值可以查看,中位值,75分位值等。

二、RocksDB狀態優化

①開啟增量檢查點:

RocksDB是目前唯一可用於支持有狀態流處理應用程序增量檢查點的狀態后端,可以修改參數開啟增量檢查點:

state.backend.incremental:true  //默認false,可以改為true

或代碼中指定 new EmbededRocksDBStateBackend(true)

②開啟本地恢復:當flink任務失敗時,可以基於本地的狀態信息進行恢復任務。可能不需要從hdfs拉取數據。本地恢復目前僅涵蓋鍵值類型的狀態后端(RocksDB)。MemoryStateBackend不支持本地恢復並忽略此選項

state.backend.local-recovery:true

③如果你有多塊磁盤,可以考慮指定本地多目錄

state.backend.rocksdb.localdir:

/data1/flink/rocksdb,/data2/flink/rocksdb,/data3/flink/rocksdb

不要配置單塊磁盤的多個目錄,務必將目錄配置到多塊不同的磁盤上,讓多塊磁盤來分擔io壓力

三、增量檢查點優化效果案例

提交一個任務,具體參數如下

bin/flink run \

-t yarn-per-job \

-d \

-p 5 \

-Dyarn.application.queue=test \

-Djobmanager.memory.process.size=2048mb \

-Dtaskmanager.memory.process.size=4096mb \

-Dtaskmanager.numberOfTaskSlots=2 \

-Dstate.backend.latency-track.keyed-state-enabled=true \      //開啟狀態監控

-c com.xxx.xxx.Demo \

 

在flink ui查看狀態的監控

 

 

 

 然后重新提交任務,在提交時增加參數:

-Dstate.backend.incremental=true  \                      //開啟增量檢查點

-Dstate.backend.local-recovery=true \                    //開啟本地恢復

代碼中增加 env.setStateBackend(new EmbeddedRocksDBStateBackend())   //狀態后端使用RocksDB

 

 

查看兩張圖的checkpointed data size,可以發現,第一次任務(第一張圖)checkpoint時是全量備份,所以狀態是越來越大的,從1m+增加到了3m+, 而第二次任務它每次checkpoint的狀態大小是有大有小的,范圍在200kb-1.2m之間

再查看End to End Duration,第一次任務的狀態后端是內存存儲,而時間卻略大於第二次任務,說明增量的RocksDB的效果有可能好於全量的memory

四、調整RockSDB的預定義選項。

預定義選項就是一個選項集合,如果調整預定義選項達不到預期,再去調整block、writebuffer等參數。

當前支持的預定義選項有支持的選項有: 

DEFAULT

SPINING_DISK_OPTIMIZED

SPINNING_DISK_OPTIMIZED_HIGH_MEM

FLASH_SSD_OPTIMIZED  (有條件使用ssd的可以使用這個選項)

我們一般使用第三個SPINNING_DISK_OPTIMIZED_HIGH_MEM,設置為機械硬盤+內存模式

該模式下flink會幫我們設置一些它認為比較ok的參數(選項集合),具體如下:

可以在提交任務時指定

state.backend.rocksdb.predefined-options:SPINNING_DISK_OPTIMIZED_HIGH+MEN   

也可以在代碼中指定:

EmbededRocksDBStateBackend embededRocksDBStateBackend = new EmbededRocksDBStateBackend();

EmbededRocksDBStateBackend,setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);

env.setStateBackend(embeddedRocksDBStateBackend); 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM