Spark2.2(三十八):Spark Structured Streaming2.4之前版本使用agg和dropduplication消耗內存比較多的問題(Memory issue with spark structured streaming)調研


在spark中《Memory usage of state in Spark Structured Streaming》講解Spark內存分配情況,以及提到了HDFSBackedStateStoreProvider存儲多個版本的影響;從stackoverflow上也可以看到別人遇到了structured streaming中內存問題,同時也對問題做了分析《Memory issue with spark structured streaming》;另外可以從spark的官網問題修復列表中查看到如下內容:

1)在流聚合中從值中刪除冗余密鑰數據(Split out min retain version of state for memory in HDFSBackedStateStoreProvider)

問題描述:

HDFSBackedStateStoreProvider has only one configuration for minimum versions to retain of state which applies to both memory cache and files. As default version of "spark.sql.streaming.minBatchesToRetain" is set to high (100), which doesn't require strictly 100x of memory, but I'm seeing 10x ~ 80x of memory consumption for various workloads. In addition, in some cases, requiring 2x of memory is even unacceptable, so we should split out configuration for memory and let users adjust to trade-off memory usage vs cache miss.

In normal case, default value '2' would cover both cases: success and restoring failure with less than or around 2x of memory usage, and '1' would only cover success case but no longer require more than 1x of memory. In extreme case, user can set the value to '0' to completely disable the map cache to maximize executor memory.

修復情況:

對應官網bug情況概述《[SPARK-24717][SS] Split out max retain version of state for memory in HDFSBackedStateStoreProvider #21700》、《Split out min retain version of state for memory in HDFSBackedStateStoreProvider

相關知識:

Spark Structrued Streaming源碼分析--(三)Aggreation聚合狀態存儲與更新

HDFSBackedStateStoreProvider存儲state的目錄結構在該文章中介紹的,另外這些文件是一個系列,建議可以多讀讀,下邊借用作者文章中的圖展示下state存儲目錄結構:

 

  備注:spark2.2到spark2.4 checkpoint下offset下文件內容變化:

  • Spark2.2
[MCP@CDH-143 ~]$ hadoop fs -cat /user/my/streaming/checkpoint/mycheckpoint2/offsets/99
v1
{
    "batchWatermarkMs":1553382000000,
    "batchTimestampMs":1553393700220,
    "conf":{"spark.sql.shuffle.partitions":"600"}
}
{
    "MyTopic2":
    {
    "23":40168,"17":36491,"8":42938,"11":44291,"20":48748,"2":44540,"5":44467,"14":43479,"13":41859,
    "4":41019,"22":42281,"16":45344,"7":43083,"1":41771,"10":42247,"19":43330,"18":41246,"9":45828,
    "21":40189,"3":46158,"12":48594,"15":44804,"6":43951,"0":49266
    }
}
  • Spark2.4
[MCP@CDH-143 ~]$ hadoop fs -cat /user/my/streaming/checkpoint_test/mycheckpoint1/offsets/1741
v1
{
    "batchWatermarkMs":1557741000000,
    "batchTimestampMs":1557747791249,
    "conf":
    {
        "spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider",
        "spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2",
        "spark.sql.streaming.multipleWatermarkPolicy":"min",
        "spark.sql.streaming.aggregation.stateFormatVersion":"2",
        "spark.sql.shuffle.partitions":"900"
    }
}
{
    "MyTopic1":
    {
        "17":180087368,"8":178758076,"11":177803667,"2":179003711,"5":179528808,"14":181918503,"13":179712700,"4":180465249,"16":179847967,
        "7":180236912,"1":182446091,"10":179839083,"19":176659272,"18":178425265,"9":179766656,"3":177935949,"12":179457233,"15":178173350,
        "6":179053634,"0":177830020
    }
}

2)Spark2.4下解決情況,在HDFSBackedStateStoreProvider中為內存分配最大保留版本的狀態(Remove redundant key data from value in streaming aggregation

問題描述:

Key/Value of state in streaming aggregation is formatted as below:

  • key: UnsafeRow containing group-by fields
  • value: UnsafeRow containing key fields and another fields for aggregation results

which data for key is stored to both key and value.

This is to avoid doing projection row to value while storing, and joining key and value to restore origin row to boost performance, but while doing a simple benchmark test, I found it not much helpful compared to "project and join". (will paste test result in comment)

So I would propose a new option: remove redundant in stateful aggregation. I'm avoiding to modify default behavior of stateful aggregation, because state value will not be compatible between current and option enabled.

修復情況:

對應官網bug情況概述《[SPARK-24763][SS] Remove redundant key data from value in streaming aggregation #21733》、《Remove redundant key data from value in streaming aggregation

可能能解決問題的另外辦法:在spark2.3版本下自定義StateStoreProviver《https://github.com/chermenin/spark-states》

3)Spark2.4下測試情況,以及問題

  • Spark程序業務:

  讀取kafka上topic,然后對流數據源進行聚合統計,之后輸出到kafka的另外一個topic。

  • 運行情況:
  • 1)在cdh環境下(spark2.2)使用spark structured streaming,經常會出現內存溢出問題(state存儲到內存版本數過多導致,這個bug已經在spark2.4修改),在spark2.2下解決方案:將executor內存設置大一些。我的提交腳本如下:
/spark-submit \
--conf “spark.yarn.executor.memoryOverhead=4096M”
--num-executors 15 \
--executor-memory 46G \
--executor-cores 3 \
--driver-memory 6G \

  這種情況下,可以保證spark程序可以長久的穩定運行,而且executor storage memory 低於10M(已經穩定運行超過20天)。
  但是,實際上這個spark程序處理的數據量不大,15分鍾觸發一次trigger,每次trigger處理記錄數30w條,每條記錄大概有2000個列。

  • 2)從spark2.4版本升級信息上,看到了spark2.4版本中解決了state存儲消耗內存問題得到了解決。

於是在CDH下將spark升級到SPARK2.4,嘗試去運行spark程序,發現使用內存卻是降低了,
但是一個問題卻出現了,隨着運行時間的增長,executor的storage memory在不斷增長(從Spark on Yarn Resource Manager UI上查看到"Executors->Storage Memory"),
這個程序已經運行了14天了(在SPARK2.2下以這個提交資源運行,正常運行時間不超過1天,就會出現Executor內存異常問題)。
spark2.4下程序提交腳本如下:

/spark-submit \
--conf “spark.yarn.executor.memoryOverhead=4096M”
--num-executors 15 \
--executor-memory 9G \
--executor-cores 2 \
--driver-memory 6G \

 Storage Memory跟隨時間增長情況統計:

Run-time(hour) Storage Memory size(MB) Memory growth rate(MB/hour)
23.5H 41.6MB/1.5GB 1.770212766
108.4H 460.2MB/1.5GB 4.245387454
131.7H 559.1MB/1.5GB 4.245254366
135.4H 575MB/1.5GB 4.246676514
153.6H 641.2MB/1.5GB 4.174479167
219H 888.1MB/1.5GB 4.055251142
263H 1126.4MB/1.5GB 4.282889734
309H 1228.8MB/1.5GB 3.976699029

跟進信息請參考《https://issues.apache.org/jira/browse/SPARK-27648

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM