1.前言
本文主要基於實踐過程中遇到的一系列問題,來詳細說明Flink的狀態后端是什么樣的執行機制,以理解自定義函數應該怎么寫比較合理,避免踩坑。
內容是基於Flink SQL的使用,主要說明自定義聚合函數的一些性能問題,狀態后端是rocksdb。
2.Flink State
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html
上面是官方文檔,這里按照個人思路快速理解一下重要內容:
在Flink中,最底層的接口是Function, 往上就是Stateful Function。函數的具體實現可以理解為operator,分為Keyed Operator和一般的Operator,簡單理解就是實時流數據需不需要分組處理。
正是因為有了兩大類算子,所以狀態也分成了Keyed State和Operator State。State是什么?個人理解就是一個臨時存儲的數據集,至於為什么需要臨時存儲很好理解:通常我們都需要實時統計一些結果,但是數據流是一條條處理的,必須保存中間狀態。比如sum函數,要從state中get之前的結果,加上本次的結果,再put到state中。又比如join操作,需要嘗試獲取join對象存不存在,保存自己的本次對象,便於其他數據進行join。
通過上面描述,可以看出一般聚合等涉及到多條數據的操作,都是需要保存狀態的,否則一條條記錄處理(比如提取某個字段的值),前后沒有關聯,自然不需要狀態了,前者就是Keyed State。那Operator State為什么存在?實際使用中,該狀態主要是用於保存source的消費位點,以便failover重新啟動的時候能夠找到正確的消費位置,這是Flink的一致性很重要的地方。
臨時的數據集臨時的原因在於:流是沒有邊界的,數據會不斷增大,不說內存,哪怕是磁盤容量,以及checkpoint操作性能問題,也無法做到無限狀態。所以每個state都需要設置ttl時間,判斷這個臨時的數據需要保存多久。比如你要統計每天的數據,那可能要保存24個小時以上,A數據0點出現一次,24點出現一次,保存的時間小於ttl,第一次的數據就會被清除,導致最后結果錯誤,24小時以上需要考慮數據延遲到達的問題。
被管理的狀態有以下幾種:ValueState,ListState,ReducingState,AggregatingState,FoldingState(廢棄),MapState。
Flink目前提供了3種狀態后端:Memory, Fs,Rocksdb。這些狀態后端必須實現上面所管理的狀態,所以新增狀態后端的時候一樣需要。
3.udaf與Flink序列化
Flink允許用戶編寫UDAF自定義聚合函數來滿足特殊的計算需求,這里就會存在一個令人疑惑的問題:用戶的代碼是無法控制的,那異常重啟的時候怎么恢復用戶代碼的數據呢?其實很簡單,將用戶的代碼生成對象,在checkpoint的時候一並序列化保存就好了。等到異常重啟的時候,反序列化就可以了。下面談談flink是如何序列化對象的。
3.1 Flink的類型與序列化
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/types_serialization.html
你可以在flink-core包中org.apache.flink.api.java.typeutils, org.apache.flink.api.common.typeutils中找到大量與之相關的內容
Flink實現了:
1.所有的java基礎類型,包括封裝類型,以及Void,String,Date,BigDecimal,BigInteger, org.apache.flink.api.common.typeutils
2.基本類型數組,及對象數組 org.apache.flink.api.common.typeutils
3.組合類型:Tuple, Row, Pojo, Scala。實現都在org.apache.flink.api.java.typeutils
4.輔助類型:List, Map等
5.一般類型:這些不會被Flink序列化,而是被Kryo。 所以不被flink識別的Class,以及沒有自定義序列化器可以匹配的時候,都會使用Kryo進行序列化。但是牢記,Kryo不是萬能的,所以最好自己定義。
如果你不想用kryo序列化,希望程序拋出異常,以便確定自定義序列化是否缺失,可以禁用kryo: env.getConfig().disableGenericTypes();
3.2 用戶自定義狀態序列化
導讀中也說明了,如果使用的是Flink自己的序列化器,本節可以忽略掉,故不作更多說明。
4. rocksdb狀態后端問題
通過上面的介紹,可以明白自定義的函數對象都是需要序列化和反序列化的,這樣確保了異常重啟后狀態可以恢復。但是實際上不同的狀態后端處理方式是不一樣的,這也是本文想說明的內容。
上面提到flink提供了三種狀態后端,分別是基於內存,文件和rocksdb,但只有rocksdb支持增量式存儲。這其中是有什么不同之處呢?本文不討論rocksdb如何實現增量的,主要集中在rocksdb狀態后端相比於FsStateBackend有什么區別,會引發什么問題。
FsStateBackend是基於文件、全量存儲的,簡單猜測一下就可以知道其所有數據都在內存中,等到checkpoint的時候全量序列化寫入文件。實際上也確實如此:createKeyedStateBackend創建的是HeapKeyedStateBackend,對應的都是HeapListState, HeapValueState等等內容,和MemoryStateBackend沒什么區別。以HeapValueState為例,其value和update方法沒有進行多余的操作,只是簡單的從statTable中獲取和放回。
RocksDBStateBackend可以用相同的邏輯查看,其用的是完全不同的體系:RocksDBKeyedStateBackend,RocksDBValueState,RocksDBListState等。以RocksDBValueState為例,它用來存儲的並不是stateTable,而是rocksdb對象,每次獲取都需要從rocksdb讀取,然后反序列化成相應的對象,更新都需要序列化,然后更新rocksdb里面的內容。
通過上面的描述就會發現問題,rocksdb的狀態每次使用都需要序列化和反序列化,如果對象狀態太大,必然會帶來性能問題。
4.1 udaf運行過程
我們都知道udaf都有一個accumulator,這個肯定是需要被Flink管理的,那么具體是如何做到的呢?通過程序斷點可以看見執行過程:
1.自定義的聚合函數都被封裝成了:GroupAggProcessFunction,執行processElement。
可以看見里面的調用邏輯,首先注冊狀態清除定時器,然后state.value()獲取當前的accumulator,沒有就會調用function的createAccumulators方法初始化。
然后調用accumulate方法計算,獲取計算結果,后面就是更新accumulator和其他數據,輸出本次計算結果了。
2.state.value()執行的是ValueState,這個取決於所使用的狀態后端,這里探討的就是RocksDBValueState。
其從rocksdb中獲取序列化后的字符串,然后將其反序列化。這個就是問題所在。
通過上述過程我們發現,使用rocksdb狀態后端的時候,執行每一條數據,其對象都是需要序列化和反序列化的,而FsStateBackend使用的是內存,不會做額外操作。
如果聚合函數狀態對象過大,這個地方就可能成為性能瓶頸。
4.2 distinct
按照上述描述distinct去重函數也應該會是一個大對象,需要收集所有數據才對,實際使用過程中並沒有感知到很慢,這是怎么做到的呢?
這里要介紹一個重點內容:MapView。Flink操作distinct是通過類DistinctAccumulator完成的,其內部使用的是MapView。
可以發現,MapView會被翻譯成RocksDBMapState,accumulator序列化的時候會忽略掉這個字段,使用的時候都是操作的RocksDBMapState,對單條數據進行操作。
所以聚合函數對象不要使用大對象,盡量拆分成小對象,充分利用前面提到的ListState,MapState操作,否則在rocksdb做狀態后端時會引發性能問題。
AggregationCodeGenerator這個就是用來包裝聚合相關代碼的了,其中有個函數addAccumulatorDataViews()會將MapView替換成StateMapView。
// create DataViews val descFieldTerm = s"${dataViewFieldTerm}_desc" val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName val descDeserializeCode = s""" | $descClassQualifier $descFieldTerm = ($descClassQualifier) | ${classOf[EncodingUtils].getCanonicalName}.decodeStringToObject( | "$serializedData", | $descClassQualifier.class, | $contextTerm.getUserCodeClassLoader()); |""".stripMargin val createDataView = if (dataViewField.getType == classOf[MapView[_, _]]) { s""" | $descDeserializeCode | $dataViewFieldTerm = new ${classOf[StateMapView[_, _]].getCanonicalName}( | $contextTerm.getMapState( | (${classOf[MapStateDescriptor[_, _]].getCanonicalName}) $descFieldTerm)); |""".stripMargin } else if (dataViewField.getType == classOf[ListView[_]]) { s""" | $descDeserializeCode | $dataViewFieldTerm = new ${classOf[StateListView[_]].getCanonicalName}( | $contextTerm.getListState( | (${classOf[ListStateDescriptor[_]].getCanonicalName}) $descFieldTerm)); |""".stripMargin } else { throw new CodeGenException(s"Unsupported dataview type: $dataViewTypeTerm") } reusableOpenStatements.add(createDataView)
這就是一個基本過程。
5. 尷尬的選擇 BloomFilter
這是本人所遇到的一個問題:
面對大量數據的去重操作,有時候我們並不需要過於精准,如果去重內容是整型,可以使用bitmap進行精確去重
但是很多時候數據都是字符串,比如設備號,如果像Kylin一樣存在類似Global dictionary,可以為設備號生成一一映射的整型id,使用精確去重,但大多數情況下,我們只能選擇bloomFilter或者hyperloglog。
這里僅對bloomFilter進行討論,因為hyperloglog的使用的內存太少了,狀態后端FsStateBackend足夠了。
BloomFilter不一樣,單個BloomFilter也可能達到500MB,如果有幾千個組的group by計算不同頁面,坑位的數據,如果使用FsStateBackend是無法接受的。
我看到網上大部分使用BloomFilter都是使用ValueState<BloomFilter>,像我所說的,如果只有十幾個組的,內存消耗也不過幾個G,FsStateBackend足夠勝任,但是幾千個就不太適合了。
此處說明一些坑點,以及尷尬之處:
1.Guava提供的BloomFilter使用rocksdb時有嚴重的性能問題,可能需要自定義序列化方式,沒有測試過,改為Stream-lib提供的
2.像上述描述的,BloomFilter其實是一個大狀態,每次序列化全量是無法接受的。bloom filter本質上是一個Long[],由於ListState不能通過下標來獲取對應的對象,所以使用MapState,鍵是index,值是對應的Long。
3.根據bloom原理,需要多次hash,導致讀寫放大了N倍,任務運行到后面越來越慢
4.改成FsStateBackend性能暴增,問題是checkpoint慢,內存消耗大,原本目的就是解決內存消耗問題,采取rocksdb的增量保存,使用FsStateBackend返回回到了起點。
通過上述描述:可以明白如果使用FsStateBackend,性能確實沒問題,但是是全量內存使用,還是那個問題,幾千個group,內存消耗還是過大。如果使用Rocksdb,會發現讀寫放大,memTable命中率不高,性能越往后越差。
上述已經嘗試將大對象改成Map,減少全量序列化,性能比未改之前提升幾十倍以上,但是還是很慢。直接原因就是多次hash對比DistinctAccumulator造成讀寫放大,實際上性能也是其的1/5不到。如果使用BloomFilter的FsStateBackend比Rocksdb下distinct耗費的內存更多(前提是distinct滿足性能要求),那得不償失,這就是目前我面臨的問題。最后,如果存在混合使用的場景(部分字段需要精確去重),使用FsStateBackend就更尷尬了,這導致distinct的也是全量在內存之中,這也是我沒有使用hyperloglog的原因之一,其在rocksdb狀態下性能也很差(也許我應該自己開發hyperloglog的flink實現 -。- 暫未嘗試,先解決bloomFilter的問題)。
后續優化測試中的內容:
1.stream-lib的bloom過濾器是可以merge的,只要hashcount相同。實驗可以發現,初始化元素個數10w-3000w錯誤率0.01得到的hashCount都是5,但是表現不同。10w個使用了更少的容量,數據標記位更集中。
考慮到數據分布的不均衡,可以對其做動態的擴容,而不是每個group都使用最大值的那個,這樣可以再提升一波性能。但是存在的問題是由於容量發生了改變,舊有的數據位置出現了變動,更容易發生誤判,需要權衡。
2.對rocksdb的參數調優
https://issues.apache.org/jira/browse/FLINK-10993 社區也討論過,不知道為什么后面就涼了。
另外,針對上面的這種場景,是否有更好的解決方法,請留言給我。
6.總結
如何寫好一個udaf?
1.定義的accumulator盡量小,否則在rocksdb情況下每次序列化會消耗大量時間
2.需要明確自定義的accumulator使用的序列化規則,否則默認會使用kryo,而kryo不是萬能的,在某些情況性能極差,當然大部分情況還是可以的。所以觀察到性能瓶頸的時候,考慮這個地方。
3.accumulator無法變小,考慮使用MapView最終生成的MapState盡量減少序列化的內容
4.FsStateBackend和RocksdbBackend在某些情況下很尷尬,互有不足,需要權衡,或者自己開發一個適應場景的高效工具
5.上面內容都是基於1.8版本的,之前之后的版本有什么坑不在討論范疇,本文僅提供思路,需要具體問題具體分析