1.前言
之前幾篇文章都是圍繞Rocksdb狀態后端引申出來的一系列問題,本文主要介紹一下Rocksdb作為狀態后端的一些技術細節,以及Flink的狀態抽象是如何設計的,為開發一個新的狀態后端做指導。
本文基於Flink 1.8.2,目前Flink版本處於一種快速變化的過程,所以可能不適用於高版本,但仍有借鑒意義。
2.結構
Flink的狀態定義都在flink-runtime包下,路徑是org.apache.flink.runtime.state,里面有若干子模塊,可以看見分成filesystem(文件狀態后端),heap(通用的內存操作結構),memory(內存狀態后端),metainfo(狀態的元數據描述),ttl(狀態過期清除的一些設置)。其他包的根節點類都是一些狀態的抽象定義,包括抽象狀態后端,抽象key狀態后端,抽象快照策略,checkpoint觸發listener,checkpoint存儲,checkpoint流等等,總的來說就是狀態后端,checkpoint保存,restore恢復,keyGroup和其他的一些相關內容。
RocksdbStateBackend不是Flink內置的狀態,需要在pom.xml中引入flink-statebackend-rocksdb_${scala.version},其包結構也十分簡單:操作rocksdb的迭代器封裝,恢復,快照和基本狀態后端實現類。
3.RocksDB
3.1 快照
快照就是當前瞬間的一個狀態保存,這個基本上就是配合checkpoint使用的了。
- 接口是:SnapshotStrategy,只有一個方法,snapshot,參數是checkpointId, timestamp, checkpointStreamFactory,checkpointOptions。
該方法調用地方主要是AbstractStreamOperator的snapshotState,看名稱可以理解是在某個時間點,保存某次checkpoint的數據,streamFactory提供了快照數據輸出的流,用於持久化數據,在CheckpointStorage.resolveCheckpointStorageLocation的時候,會創建這個對象,保證提供了輸出流。
- 抽象類是:AbstractSnapshotStrategy,其沒有做任何操作,只提供了日志輸出,日志輸出需要描述字段,限制了一下接口泛型。
- Rocksdb繼承了AbstractSnapshotStrategy,也是一個抽象類:RocksDBSnapshotStrategyBase,同時其實現了CheckpointListener接口。
構造函數需要父類的描述,rocksdb對象,key序列化器,keyGroup的范圍及前綴,recovery配置,和kvState的元數據,資源控制。其沒有什么復雜的邏輯,如果kvState沒有,直接返回成功,否則交給子類做處理。
結果對象是SnapshotResult,里面有jobManagerOwnedSnapshot和taskLocalSnapshot,都是繼承了StateObject
- 實現類有兩個,一個是RocksFullSnapshotStrategy,另一個是RocksIncrementalSnapshotStrategy。全量快照用於savepoint,以及未開啟增量模式的checkpoint,增量快照自然是用於增量模式的checkpoint。具體創建可以看方法RocksDBKeyedStateBackendBuilder.initializeSavepointAndCheckpointStrategies.
- RocksFullSnapshotStrategy的描述字段是Asynchronous incremental RocksDB snapshot,不知道是不是打錯了-.-. 和增量快照一樣。其notifyCheckpointComplete方法啥也沒做,具體快照步驟是:
- 如果允許本地恢復且類型不是SAVEPOINT(state.backend.local-recovery默認是false不允許),會調用CheckpointStreamWithResultProvider.createDuplicatingStream,否則調用CheckpointStreamWithResultProvider.createSimpleStream,后者一般是默認選擇。所以,使用本地恢復的區別在於兩種不同的方式,前者會輸出一份到本地恢復的文件夾,后者只會輸出到streamFactory提供的輸出目的地。
- 復制kvState的元數據,不同的元數據有不同的實現,類型看StateMetaInfoSnapshot,存在KEYVALUE,OPERATOR,BROADCAST,PRIORITY_QUEUE.
- 然后從資源控制獲取令牌,看ResourceGuard的邏輯可以看到,其要close必須所有的令牌都被close了,是一種保護措施了。
- 最后得到rocksdb的一個snapshot對象。
- 根據上面步驟得到的對象,生成SnapshotAsynchronousPartCallable,最后執行toAsyncSnapshotFutureTask,返回一個RunnableFuture對象,執行邏輯都在callInternal()方法里面
- 先創建KeyGroupOffset對象,獲取checkpoint輸出流提供者,然后注冊到snapshotCloseableRegistry,這樣最后關閉的時候會關閉流。然后調用writeSnapshotToOutputStream寫入數據,最后判斷關閉。
- 寫入數據流,先創建每個kvState對應的元數據,並提供編號,會為每個元數據創建一個rocksdb的列族迭代器,flink使用的rocksdb時,每個state都是一個列族。然后會寫入一些必要信息,操作都是在KeyedBackendSerializationProxy.write中,是否壓縮,寫入key之類的,最后寫入元數據的快照。第二步就是寫入狀態的真實數據了,用的就是第一步生成的每個rocksdb列族的迭代器,其會使用RocksStatesPerKeyGroupMergeIterator將所有的kvState數據按照順序包裝成一個迭代器,這樣操作方便,順序與之前寫入的元數據基本相同,除非沒有可迭代的內容。https://developer.aliyun.com/article/667562?spm=a2c6h.13262185.0.0.3d747e185enfna 這篇文章介紹了keyGroup的概念,寫入也會按組的方式設置相關數據。
- RocksIncrementalSnapshotStrategy是增量式快照,具體步驟如下:
- 准備本地臨時文件夾,然后使用rocksdb的checkpoint,寫入臨時文件,獲取狀態元數據備份
- 寫入全量元數據到目標輸出流,上傳當前此次checkpoint產生的sst文件。后面就沒啥內容了
上圖就是全量快照和增量快照的一個時序圖了。起點都是streamOperator觸發快照函數,選擇對應的快照策略。全量快照是通過rocksdb的snapshot進行的,會將其放入readOptions,先通過快照獲取rocksdb的迭代器和元數據信息,元數據直接寫入StreamFactory提供的輸出流,再通過迭代器寫出rocksdb的KV數據。增量快照是通過rocksdb提供的checkpoint寫入本地的磁盤文件夾,然后異步的將這些文件上傳到StreamFactory提供的輸出流中,對於增量快照而言,需要保證每次checkpoint的數據都上傳了,且沒有重復,所以多了一個lastCompletedCheckpoint的值,本次checkpoint會進行比較最后一次的sst文件,防止重復上傳。
3.2 恢復
恢復就是快照的一個逆過程了。
- 接口定義:RestoreOperation,只有一個方法restore,進行恢復,返回恢復結果。
- Rocksdb接口:RocksDBRestoreOperation,限制了返回類型。
- 抽象類:AbstractRocksDBRestoreOperation,主要提供了打開rocksdb,讀取元數據的方法,注冊列族的方法。
- RocksDBNoneRestoreOperation,最簡單的恢復方法,不恢復,從0開始積累數據,重新創建rocksdb數據庫
- RocksDBFullRestoreOperation,全量數據恢復方法,打開rocksdb,遍歷restoreStateHandles,一個個進行恢復。其實也就是逆過程。
- RocksDBIncrementalRestoreOperation,增量數據恢復,這里的邏輯有點復雜,判斷是否需要Rescaling,條件有兩個滿足一個即可,restoreStateHandles數量是否>1,第一個keyStateHandler的keygroup是否與恢復的相符。
- 不需要重新調整,判斷是從遠程恢復還是本地恢復了,先恢復最后一次checkpoint的狀態,遠程的會從遠處拉取文件到本地,然后再按本地的方式進行恢復。讀取元數據,獲取列族信息,將文件拷貝到rocksdb的數據庫路徑,打開rocksdb,注冊列族等
- 需要重新調整,會先判斷一個最合適用於恢復的KeyedStateHandle,判斷標准在RocksDBIncrementalCheckpointUtils的STATE_HANDLE_EVALUATOR中,需要目標的keyGroup數量占現有的keyedStateHandler的75%。如果找到了就用這個進行恢復,沒找到簡單的打開數據庫。然后就要開始修復缺少了的keyGroupRange數據了,找到目標的開始鍵前綴和結束鍵前綴。從遠程拉取所有文件,遍歷進行查找需要的key的數據,批量寫入rocksdb。增量式就這樣恢復了需要的內容。
上圖是全量恢復的圖了,StreamTaskStateInitializerImpl會提供一個需要恢復的KeyedStateHandle集合,然后創建想對應的狀態后端,創建過程中會打開Rocksdb數據庫,通過KeyedStateHandler提供的輸入流讀取數據,先恢復元數據,然后恢復寫入rocksdb的狀態。增量狀態恢復是差不多的,所以沒有使用時序圖,描述見上面,其需要判斷是否調整keygroup。重點是keyedStateHandler是哪里來的,其通過TaskStateManager綁定固定的operatorID,所以恢復的時候這個值需要固定。通過jobmanager獲取state和最后一次checkpoint信息,恢復狀態。
3.3 狀態過期
在文章《由Rocksdb狀態后端引出的Tree的應用》中簡單的介紹了LSM樹的一些特性,可以了解到LSM樹是無法修改數據的,意味着也無法刪除,唯一能夠操作的地方就是compact的時候,可以將不要的數據丟棄。
Flink中Rocksdb的狀態清除也十分簡單,主要邏輯在RocksDbTtlCompactFiltersManager中,通過類RocksDBOperationUtils調用。就是在每次創建列族描述的時候,開啟了ttl就為每個state注冊一個FlinkCompactionFilterFactory,這個類在rocksdb里面,是rocksdb的java版實現提供給flink設置的,后面會為每個state對應的FlinkCompactionFilterFactory配置ttl參數,這樣rocksdb就可以清理state了。
3.4 狀態流程
RocksDBStateBackend也沒有太多的內容了,剩下的類可以快速分個類:
- 核心類:
- AbstractRocksDBState
- RocksDBKeyedStateBackend
- RocksDBKeyedStateBackendBuilder
- RocksDBStateBackend
- RocksDBStateBackendFactory
- 配置類:
- DefaultConfigurableOptionsFactory:配置工廠類,顧名思義,專門生成配置對象的類
- PredefinedOptions:一些預定義的配置,方便快速設置:DEFAULT(默認使用),SPINNING_DISK_OPTIMIZED(機械硬盤優化),SPINNING_DISK_OPTIMIZED_HIGH_MEM(內存充足時的機械硬盤優化),FLASH_SSD_OPTIMIZED(固態硬盤優化)這四種。默認參數在RocksDBOptions中有顯示:state.backend.rocksdb.predefined-options
- RocksDBConfigurableOptions:一些可配置的參數
- RocksDBNativeMetricOptions:可開啟的rocksdb的監控項配置,默認全部未開啟
- RocksDBOptions:一些全局配置,並非詳細的配置項,更多與Flink有關
- RocksDBProperty:rocksdb的jni相關參數
- 需要實現的類:
- RocksDBAggregatingState
- RocksDBFoldingState
- RocksDBListState
- RocksDBMapState
- RocksDBReducingState
- RocksDBValueState
- 功能類:
- RocksDBCachingPriorityQueueSet:內存的優先隊列,於RocksDBPriorityQueueSetFactory創建。InternalTimeServiceManager使用。
- RocksDBPriorityQueueSetFactory:提供的rocksdb的TIMER_SERVICE_FACTORY,默認使用的是內存,選擇rocksdb這個生效,其創建的是RocksDBCachingPriorityQueueSet。
- RocksDBIncrementalCheckpointUtils:增量式狀態后端的工具類
- RocksDBKeySerializationUtils:處理鍵的序列化工具類
- RocksDBNativeMetricMonitor:rocksdb的監控指標管理
- RocksDBOperationUtils:操作rocksdb的工具類
- RocksDBSerializedCompositeKeyBuilder:封裝了序列化組合件的相關方法
- RocksDBSnapshotTransformFactoryAdaptor:一個裝飾器
- RocksDBStateDataTransfer:傳輸的線程配置
- RocksDBStateDownloader:下載保存的狀態文件(增量狀態)
- RocksDBStateUploader:上傳保存的狀態文件(增量狀態)
- RocksDBWriteBatchWrapper:批量寫入封裝
- RocksIteratorWrapper:rocksdb的迭代器封裝,做接口隔離
以上就是rocksdb的一個基本組成了,要學習主要是從statebackend需要實現的方法來看,然后這些方法具體在什么位置調用的。resolveCheckpoint就是由jobMaster操控的,要不是手動觸發,要不是停止任務時需要進行savepoint。createCheckpointStorage是在Task的run方法,invoke調用的,生成的是checkpointStorage對象,這個對象是用於上面快照提供StreamFactory用的。看rocksdb會被大量的文件路徑迷惑,只有createCheckpointStorage這個是提供給checkpoint使用的路徑(可以在StreamTask中查找到相關方法),其他的路徑是本地存儲rocksdb文件使用的,另外是臨時存儲jni文件使用的。operatorStatebackend使用的依舊是flink自帶的DefaultOperatorStateBackend,只有keyedStateBackend才是rocksdb開發的。
Rocksdb的狀態后端基本就到此結束,主要包含的內容和一些邏輯介紹的差不多了,剩下就是Flink是執行checkpoint的細節問題,這里留一個之前一直迷惑的問題:關於內存設置的迷惑,不清楚的可以先看《Flink內存設置思路》。Flink1.8按照查找到的資料來看,rocksdb使用的是non-heap內存,對於容器啟動1.8只有堆和非堆,兩個參數加起來占滿了整個容器。但是對於1.10而言,其除了堆和非堆,還有很大一塊未分配的內存,解釋是用於native內存,這塊不歸jvm管理,rocksdb使用的就是這塊內存。這就存在一個疑問了,1.8為什么使用的是非堆內存而1.10就是native內存呢,這兩者有什么變化嗎?
看MapState等的執行操作,發現依舊是使用native方法,將序列化后的字符串放入。但是看依賴可以發現依賴了不同版本的frocksdbjni.
可以看見1.10版本的rocksdb在jni層面提供了java可以干涉的配置項。實際1.10版本多了配置類RocksDBMemoryConfiguration,其都被RocksDBOperationUtils使用。對於1.10和1.8版本,可以發現1.10版本新增了allocateSharedCachesIfConfigured方法,其會生成一個RocksDBSharedResources對象,里面就包含了圖片中所說的writeBufferManager,然后這個對象會被包裝成resourceContainer,被keyedStateBackend持有。主要影響的位置在ResourceContainer的getColumnOptions方法,如果存在資源管控,就會設置columnOptions。也就是為column進行參數設置,其他的都沒有發生變化,在jni內部處理邏輯上產生的影響。剩下jni層為什么1.8是non-heap,1.10是native就沒有繼續深入了,上面是一個內存管控的問題,如果有哪位知道最初的那個問題,可以留言,或者說有其他解釋。