Flink重點難點:Flink任務綜合調優(Checkpoint/反壓/內存)


在閱讀本文之前,你應該閱讀過的系列:

CheckPoint調優

我們在Flink重點難點:狀態(Checkpoint和Savepoint)容錯與兩階段提交一文中對Flink的Checkpoint做過詳細的介紹。

Flink中基於異步輕量級的分布式快照技術提供了Checkpoints容錯機制,Checkpoints可以將同一時間點作業/算子的狀態數據全局統一快照處理,包括前面提到的算子狀態和鍵值分區狀態。當發生了故障后,Flink會將所有任務的狀態恢復至最后一次Checkpoint中的狀態,並從那里重新開始執行。

對於Flink Checkpoint的優化至關重要。我們常見的優化 Checkpoint的手段如下:

一、設置最小時間間隔

當Flink應用開啟Checkpoint功能,並配置Checkpoint時間間隔,應用中就會根據指定的時間間隔周期性地對應用進行Checkpoint操作。默認情況下Checkpoint操作都是同步進行,也就是說,當前面觸發的Checkpoint動作沒有完全結束時,之后的Checkpoint操作將不會被觸發。在這種情況下,如果Checkpoint過程持續的時間超過了配置的時間間隔,就會出現排隊的情況。如果有非常多的Checkpoint操作在排隊,就會占用額外的系統資源用於Checkpoint,此時用於任務計算的資源將會減少,進而影響到整個應用的性能和正常執行。

在這種情況下,如果大狀態數據確實需要很長的時間來進行Checkpoint,那么只能對Checkpoint的時間間隔進行優化,可以通過Checkpoint之間的最小間隔參數進行配置,讓Checkpoint之間根據Checkpoint執行速度進行調整,前面的Checkpoint沒有完全結束,后面的Checkpoint操作也不會觸發。

streamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)

通過最小時間間隔參數配置,可以降低Checkpoint對系統的性能影響,但需要注意的事,對於非常大的狀態數據,最小時間間隔只能減輕Checkpoint之間的堆積情況。如果不能有效快速地完成Checkpoint,將會導致系統Checkpoint頻次越來越低,當系統出現問題時,沒有及時對狀態數據有效地持久化,可能會導致系統丟失數據。因此,對於非常大的狀態數據而言,應該對Checkpoint過程進行優化和調整,例如采用增量Checkpoint的方法等。

用戶也可以通過配置CheckpointConfig中setMaxConcurrentCheckpoints()方法設定並行執行的checkpoint數量,這種方法也能有效降低checkpoint堆積的問題,但會提高資源占用。同時,如果開始了並行checkpoint操作,當用戶以手動方式觸發savepoint的時候,checkpoint操作也將繼續執行,這將影響到savepoint過程中對狀態數據的持久化。

二、預估狀態容量

除了對已經運行的任務進行checkpoint優化,對整個任務需要的狀態數據量進行預估也非常重要,這樣才能選擇合適的checkpoint策略。對任務狀態數據存儲的規划依賴於如下基本規則:

1.正常情況下應該盡可能留有足夠的資源來應對頻繁的反壓。

2.需要盡可能提供給額外的資源,以便在任務出現異常中斷的情況下處理積壓的數據。這些資源的預估都取決於任務停止過程中數據的積壓量,以及對任務恢復時間的要求。

3.系統中出現臨時性的反壓沒有太大的問題,但是如果系統中頻繁出現臨時性的反壓,例如下游外部系統臨時性變慢導致數據輸出速率下降,這種情況就需要考慮給予算子一定的資源。

4.部分算子導致下游的算子的負載非常高,下游的算子完全是取決於上游算子的輸出,因此對類似於窗口算子的估計也將會影響到整個任務的執行,應該盡可能給這些算子留有足夠的資源以應對上游算子產生的影響。

三、異步Snapshot

默認情況下,應用中的checkpoint操作都是同步執行的,在條件允許的情況下應該盡可能地使用異步的snapshot,這樣講大幅度提升checkpoint的性能,尤其是在非常復雜的流式應用中,如多數據源關聯、co-functions操作或windows操作等,都會有較好的性能改善。

Flink提供了異步快照(Asynchronous Snapshot)的機制。當實際執行快照時,Flink可以立即向下廣播Checkpoint Barrier,表示自己已經執行完自己部分的快照。同時,Flink啟動一個后台線程,它創建本地狀態的一份拷貝,這個線程用來將本地狀態的拷貝同步到State Backend上,一旦數據同步完成,再給Checkpoint Coordinator發送確認信息。拷貝一份數據肯定占用更多內存,這時可以利用寫入時復制(Copy-on-Write)的優化策略。Copy-on-Write指:如果這份內存數據沒有任何修改,那沒必要生成一份拷貝,只需要有一個指向這份數據的指針,通過指針將本地數據同步到State Backend上;如果這份內存數據有一些更新,那再去申請額外的內存空間並維護兩份數據,一份是快照時的數據,一份是更新后的數據。

在使用異步快照需要確認應用遵循以下兩點要求:

1.首先必須是Flink托管狀態,即使用Flink內部提供的托管狀態所對應的數據結構,例如常用的有ValueState、ListState、ReducingState等類型狀態。

2.StateBackend必須支持異步快照,在Flink1.2的版本之前,只有RocksDB完整地支持異步的Snapshot操作,從Flink1.3版本以后可以在heap-based StateBackend中支持異步快照功能。

四、壓縮狀態數據

Flink中提供了針對checkpoint和savepoint的數據進行壓縮的方法,目前Flink僅支持通過用snappy壓縮算法對狀態數據進行壓縮,在未來的版本中Flink將支持其他壓縮算法。在壓縮過程中,Flink的壓縮算法支持key-group層面壓縮,也就是不同的key-group分別被壓縮成不同的部分,因此解壓縮過程可以並發執行,這對大規模數據的壓縮和解壓縮帶來非常高的性能提升和較強的可擴展性。Flink中使用的壓縮算法在ExecutionConfig中進行指定,通過將setUseSnapshotCompression方法中的值設定為true即可。

五、觀察checkpoint延遲時間

checkpoint延遲啟動時間並不會直接暴露在客戶端中,而是需要通過以下公式計算得出。如果改時間過長,則表明算子在進行barrier對齊,等待上游的算子將數據寫入到當前算子中,說明系統正處於一個反壓狀態下。checkpoint延遲時間可以通過整個端到端的計算時間減去異步持續的時間和同步持續的時間得出。

六、Checkpoint相關配置

默認情況下,Checkpoint機制是關閉的,需要調用env.enableCheckpointing(n)來開啟,每隔n毫秒進行一次Checkpoint。Checkpoint是一種負載較重的任務,如果狀態比較大,同時n值又比較小,那可能一次Checkpoint還沒完成,下次Checkpoint已經被觸發,占用太多本該用於正常數據處理的資源。增大n值意味着一個作業的Checkpoint次數更少,整個作業用於進行Checkpoint的資源更小,可以將更多的資源用於正常的流數據處理。同時,更大的n值意味着重啟后,整個作業需要從更長的Offset開始重新處理數據。

此外,還有一些其他參數需要配置,這些參數統一封裝在了CheckpointConfig里:

val cpConfig: CheckpointConfig = env.getCheckpointConfig

默認的Checkpoint配置是支持Exactly-Once投遞的,這樣能保證在重啟恢復時,所有算子的狀態對任一條數據只處理一次。用上文的Checkpoint原理來說,使用Exactly-Once就是進行了Checkpoint Barrier對齊,因此會有一定的延遲。如果作業延遲小,那么應該使用At-Least-Once投遞,不進行對齊,但某些數據會被處理多次。

// 使用At-Least-Once
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)

如果一次Checkpoint超過一定時間仍未完成,直接將其終止,以免其占用太多資源:

// 超時時間1小時
env.getCheckpointConfig.setCheckpointTimeout(3600*1000)

如果兩次Checkpoint之間的間歇時間太短,那么正常的作業可能獲取的資源較少,更多的資源被用在了Checkpoint上。對這個參數進行合理配置能保證數據流的正常處理。比如,設置這個參數為60秒,那么前一次Checkpoint結束后60秒內不會啟動新的Checkpoint。這種模式只在整個作業最多允許1個Checkpoint時適用。

// 兩次Checkpoint的間隔為60秒
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(60*1000)

默認情況下一個作業只允許1個Checkpoint執行,如果某個Checkpoint正在進行,另外一個Checkpoint被啟動,新的Checkpoint需要掛起等待。

// 最多同時進行3個Checkpoint
env.getCheckpointConfig.setMaxConcurrentCheckpoints(3)

如果這個參數大於1,將與前面提到的最短間隔相沖突。

Checkpoint的初衷是用來進行故障恢復,如果作業是因為異常而失敗,Flink會保存遠程存儲上的數據;如果開發者自己取消了作業,遠程存儲上的數據都會被刪除。如果開發者希望通過Checkpoint數據進行調試,自己取消了作業,同時希望將遠程數據保存下來,需要設置為:

// 作業取消后仍然保存Checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

RETAIN_ON_CANCELLATION模式下,用戶需要自己手動刪除遠程存儲上的Checkpoint數據。

默認情況下,如果Checkpoint過程失敗,會導致整個應用重啟,我們可以關閉這個功能,這樣Checkpoint失敗不影響作業的運行。

env.getCheckpointConfig.setFailOnCheckpointingErrors(false)

反壓調優

我們在 Flink重點原理與機制 | 網絡流控及反壓機制一文中介紹過Flink中的反壓機制和現象。

Flink1.5之前是基於TCP流控+bounded buffer實現反壓。在Flink 1.5之后實現了自己托管的credit-based流控機制,在應用層模擬TCP的流控機制。

反壓的定位

當你的任務出現反壓時,如果你的上游是類似 Kafka 的消息系統,很明顯的表現就是消費速度變慢,Kafka 消息出現堆積。

如果你的業務對數據延遲要求並不高,那么反壓其實並沒有很大的影響。但是對於規模很大的集群中的大作業,反壓會造成嚴重的“並發症”。首先任務狀態會變得很大,因為數據大規模堆積在系統中,這些暫時不被處理的數據同樣會被放到“狀態”中。另外,Flink 會因為數據堆積和處理速度變慢導致 checkpoint 超時,而 checkpoint 是 Flink 保證數據一致性的關鍵所在,最終會導致數據的不一致發生。

那么我們應該如何發現任務是否出現反壓了呢?

Flink Web UI

Flink 的后台頁面是我們發現反壓問題的第一選擇。Flink 的后台頁面可以直觀、清晰地看到當前作業的運行狀態。

如上圖所示,是 Flink 官網給出的計算反壓狀態的案例。需要注意的是,只有用戶在訪問點擊某一個作業時,才會觸發反壓狀態的計算。在默認的設置下,Flink 的 TaskManager 會每隔 50 ms 觸發一次反壓狀態監測,共監測 100 次,並將計算結果反饋給 JobManager,最后由 JobManager 進行計算反壓的比例,然后進行展示。

這個比例展示邏輯如下:

  • OK: 0 <= Ratio <= 0.10,正常;
  • LOW: 0.10 < Ratio <= 0.5,一般;
  • HIGH: 0.5 < Ratio <= 1,嚴重。

官網同樣給出了不同反壓狀態下,Flink Web UI 中任務運行的狀態,如下圖所示:

Flink Metrics

如果你想對 Flink 做更為詳細的監控的話,Flink 本身提供了大量的 REST API 來獲取任務的各種狀態。

Flink 提供的所有系統監控指標你都點擊這里找到: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/metrics/

隨着版本的持續變更,截止 1.14.0 版本,Flink 提供的監控指標中與反壓最為密切的如下表所示:

我們逐個介紹一下這四個指標。

  • outPoolUsage

這個指標代表的是當前 Task 的數據發送速率,當一個 Task 的 outPoolUsage 很高,則代表着數據發送速度很快。但是當一個 Task 的 outPoolUsage 很低,那么就需要特別注意,有可能是下游的處理速度很低導致的,也有可能當前節點就是反壓節點,導致數據處理速度很慢。

  • inPoolUsage

inPoolUsage 表示當前 Task 的數據接收速率,通常會和 outPoolUsage 配合使用;如果一個節點的 inPoolUsage 很高而 outPoolUsage 很低,則這個節點很有可能就是反壓節點。

  • floatingBuffersUsage 和 exclusiveBuffersUsage

floatingBuffersUsage 表示處理節點緩沖池的使用率;exclusiveBuffersUsage 表示數據輸入通道緩沖池的使用率。

反壓問題處理

我們已經知道反壓產生的原因和監控的方法,當線上任務出現反壓時,需要如何處理呢?

主要通過以下幾個方面進行定位和處理:

  • 數據傾斜
  • GC
  • 代碼本身

數據傾斜

數據傾斜問題是我們生產環境中出現頻率最多的影響任務運行的因素,可以在 Flink 的后台管理頁面看到每個 Task 處理數據的大小。當數據傾斜出現時,通常是簡單地使用類似 KeyBy 等分組聚合函數導致的,需要用戶將熱點 Key 進行預處理,降低或者消除熱點 Key 的影響。

GC

垃圾回收問題也是造成反壓的因素之一。不合理的設置 TaskManager 的垃圾回收參數會導致嚴重的 GC 問題,我們可以通過 -XX:+PrintGCDetails 參數查看 GC 的日志。

代碼本身

開發者錯誤地使用 Flink 算子,沒有深入了解算子的實現機制導致性能問題。我們可以通過查看運行機器節點的 CPU 和內存情況定位問題。

內存調優

我們在 Flink重點難點:內存模型與內存結構這篇文章中詳細講解了Flink的內存模型。

Flink JVM 進程的進程總內存(Total Process Memory)包含了由 Flink 應用使用的內存(Flink 總內存)以及由運行 Flink 的 JVM 使用的內存。Flink 總內存(Total Flink Memory)包括 JVM 堆內存(Heap Memory)和堆外內存(Off-Heap Memory)。其中堆外內存包括直接內存(Direct Memory)和本地內存(Native Memory)。

配置 Flink 進程內存最簡單的方法是指定以下兩個配置項中的任意一個:

Flink有三種部署方式(這里不談Flink on k8s),一種為本地模式,一種為standalone模式,還有一種為yarn或者mesos模式,這三種模式中,用戶必須要選擇一種進行配置(本地模式除外),否則flink將無法啟動,這意味着,用戶需要從以下的無默認值的配置參數中選擇一個給出明確的配置。

不建議同時設置進程總內存和 Flink 總內存。這可能會造成內存配置沖突,從而導致部署失敗。額外配置其他內存部分時,同樣需要注意可能產生的配置沖突

配置TaskManager內存

Flink 的 TaskManager 負責執行用戶代碼。根據實際需求為 TaskManager 配置內存將有助於減少 Flink 的資源占用,增強作業運行的穩定性。

本篇內存配置文檔僅針對 TaskManager與JobManager相比,TaskManager 具有相似但更加復雜的內存模型。

配置總內存

Flink JVM 進程的進程總內存(Total Process Memory)包含了由 Flink 應用使用的內存(Flink 總內存)以及由運行 Flink 的 JVM 使用的內存。其中,Flink 總內存(Total Flink Memory)包括 JVM 堆內存(Heap Memory)、托管內存(Managed Memory)以及其他直接內存(Direct Memory)或本地內存(Native Memory)。

如果你是在本地運行 Flink(例如在 IDE 中)而非創建一個集群,那么本文介紹的配置並非所有都是適用的,詳情請參考本地執行

其他情況下,配置 Flink 內存最簡單的方法就是配置總內存。此外,Flink 也支持更細粒度的內存配置,比如說配置堆內存和托管內存

Flink 會根據默認值或其他配置參數自動調整剩余內存部分的大小。

配置堆內存和托管內存

如配置總內存中所述,另一種配置 Flink 內存的方式是同時設置任務堆內存和托管內存, 通過這種方式,用戶可以更好地掌控用於 Flink 任務的 JVM 堆內存及 Flink 的托管內存的大小。

Flink 會根據默認值或其他配置參數自動調整剩余內存部分的大小。關於各內存部分的更多細節,請參考后續的內存詳解。

注意:如果已經明確設置了任務堆內存和托管內存,建議不要再設置進程總內存或 Flink 總內存,否則可能會造成內存配置沖突。

任務(算子)堆內存

如果希望確保指定大小的 JVM 堆內存給用戶代碼使用,可以明確指定任務堆內存(taskmanager.memory.task.heap.size )指定的內存將被包含在總的 JVM 堆空間中,專門用於 Flink 算子及用戶代碼的執行。

托管內存

托管內存是由 Flink 負責分配和管理的本地(堆外)內存。以下場景需要使用托管內存:

  • 流處理作業中用於 RocksDB State Backend。
  • 批處理作業中用於排序、哈希表及緩存中間結果。
  • 流處理和批處理作業中用於「在Python進程中執行用戶自定義函數」。

可以通過以下兩種范式指定托管內存的大小:

  • 通過 taskmanager.memory.managed.size 明確指定其大小。
  • 通過 taskmanager.memory.managed.fraction 指定在Flink 總內存中的占比。

當同時指定二者時,會優先采用指定的大小(Size)。若二者均未指定,會根據默認占比進行計算。

消費者權重

對於包含不同種類的托管內存消費者的作業,可以進一步控制托管內存如何在消費者之間分配。通過taskmanager.memory.managed.consumer-weights可以為每一種類型的消費者指定一個權重,Flink 會按照權重的比例進行內存分配。目前支持的消費者類型包括:

  • DATAPROC:用於流處理中的 RocksDB State Backend 和批處理中的內置算法。
  • PYTHON:用戶 Python 進程。

例如,一個流處理作業同時使用到了 RocksDB State Backend 和 Python UDF,消費者權重設置為 DATAPROC:70,PYTHON:30,那么 Flink 會將 70% 的托管內存用於 RocksDB State Backend,30% 留給 Python 進程。

只有作業中包含某種類型的消費者時,Flink 才會為該類型分配托管內存。例如,一個流處理作業使用 Heap State Backend 和 Python UDF,消費者權重設置為 DATAPROC:70,PYTHON:30,那么 Flink 會將全部托管內存用於 Python 進程,因為 Heap State Backend 不使用托管內存。

提示對於未出現在消費者權重中的類型,Flink將不會為其分配托管內存。如果缺失的類型是作業運行所必須的,則會引發內存分配失敗。默認情況下,消費者權重中包含了所有可能的消費者類型。上述問題僅可能出現在用戶顯式地配置了消費者權重的情況下。

配置堆外內存(直接內存或本地內存)

你也可以調整框架堆外內存(Framework Off-heap Memory)。這是一個進階配置,建議僅在確定 Flink 框架需要更多的內存時調整該配置。

Flink 將框架堆外內存和任務堆外內存都計算在 JVM 的直接內存限制中。

內存詳解

TaskManager內存也包括堆內存和堆外內存。下表中列出了 Flink TaskManager內存模型的所有組成部分,以及影響其大小的相關配置參數。

我們可以看到,有些內存部分的大小可以直接通過一個配置參數進行設置,有些則需要根據多個參數進行調整。

框架內存

通常情況下,不建議對框架堆內存和框架堆外內存進行調整。除非你非常肯定 Flink 的內部數據結構及操作需要更多的內存。這可能與具體的部署環境及作業結構有關,例如非常高的並發度。此外,Flink 的部分依賴(例如 Hadoop)在某些特定的情況下也可能會需要更多的直接內存或本地內存。

提示:不管是堆內存還是堆外內存,Flink 中的框架內存和任務內存之間目前是沒有隔離的。對框架和任務內存的區分,主要是為了在后續版本中做進一步優化。

配置JobManager內存

配置 JobManager 內存最簡單的方法就是進程的配置總內存,本地模式下不需要為 JobManager 進行內存配置,配置參數將不會生效。

如上圖所示,下表中列出了 Flink JobManager 內存模型的所有組成部分,以及影響其大小的相關配置參數。

配置JVM堆內存

如配置總內存中所述,配置 JobManager 內存的方式是明確指定 JVM 堆內存的大小(jobmanager.memory.heap.size)。通過這種方式,用戶可以更好地掌控用於以下用途的 JVM 堆內存大小。

  • Flink 框架
  • 在作業提交時(例如一些特殊的批處理 Source)及 Checkpoint 完成的回調函數中執行的用戶代碼

Flink 需要多少 JVM 堆內存,很大程度上取決於運行的作業數量、作業的結構及上述用戶代碼的需求。

提示:如果已經明確設置了 JVM 堆內存,建議不要再設置進程總內存或 Flink 總內存,否則可能會造成內存配置沖突。

在啟動 JobManager 進程時,Flink 啟動腳本及客戶端通過設置 JVM 參數 -Xms 和 -Xmx 來管理 JVM 堆空間的大小。

配置堆外內存

堆外內存包括 JVM 直接內存 和 本地內存。可以通過配置參數 jobmanager.memory.enable-jvm-direct-memory-limit 設置是否啟用 JVM 直接內存限制。如果該配置項設置為 true,Flink 會根據配置的堆外內存大小設置 JVM 參數 -XX:MaxDirectMemorySize。

可以通過配置參數jobmanager.memory.off-heap.size設置堆外內存的大小。如果遇到 JobManager 進程拋出 "OutOfMemoryError: Direct buffer memory"的異常,可以嘗試調大這項配置。

以下情況可能用到堆外內存:

  • Flink 框架依賴(例如 Akka 的網絡通信)
  • 在作業提交時(例如一些特殊的批處理 Source)及 Checkpoint 完成的回調函數中執行的用戶代碼

提示:如果同時配置了 Flink 總內存和 JVM 堆內存,且沒有配置堆外內存,那么堆外內存的大小將會是 Flink 總內存減去JVM 堆內存。這種情況下,堆外內存的默認大小將不會生效。

如果你是在本地運行 Flink(例如在 IDE 中)而非創建一個集群,那么 JobManager 的內存配置將不會生效。

內存調優

獨立部署模式(Standalone Deployment)下的內存配置

獨立部署模式下,我們通常更關注 Flink 應用本身使用的內存大小。建議配置 Flink 總內存(taskmanager.memory.flink.size 或者 jobmanager.memory.flink.size)或其組成部分。此外,如果出現 Metaspace 不足的問題,可以調整 JVM Metaspace 的大小。

這種情況下通常無需配置進程總內存,因為不管是 Flink 還是部署環境都不會對 JVM 開銷 進行限制,它只與機器的物理資源相關。

容器(Container)的內存配置

在容器化部署模式(Containerized Deployment)下(Kubernetes、Yarn 或 Mesos),建議配置進程總內存(taskmanager.memory.process.size或者jobmanager.memory.process.size)。該配置參數用於指定分配給 Flink JVM 進程的總內存,也就是需要申請的容器大小。

提示:如果配置了 Flink 總內存,Flink 會自動加上 JVM 相關的內存部分,根據推算出的進程總內存大小申請容器。

注意:如果 Flink 或者用戶代碼分配超過容器大小的非托管的堆外(本地)內存,部署環境可能會殺掉超用內存的容器,造成作業執行失敗。

  • State Backend 的內存配置

執行無狀態作業或者使用 Heap State Backend(MemoryStateBackend 或 FsStateBackend)時,建議將托管內存設置為 0。這樣能夠最大化分配給 JVM 上用戶代碼的內存。

  • RocksDB State Backend

RocksDBStateBackend使用本地內存。默認情況下,RocksDB 會限制其內存用量不超過用戶配置的托管內存。因此,使用這種方式存儲狀態時,配置足夠多的托管內存是十分重要的。如果你關閉了 RocksDB 的內存控制,那么在容器化部署模式下如果 RocksDB 分配的內存超出了申請容器的大小(進程總內存),可能會造成 TaskExecutor 被部署環境殺掉。請同時參考如何調整 RocksDB 內存以及 state.backend.rocksdb.memory.managed。

  • SortMerge數據Shuffle內存配置

對於SortMerge數據Shuffle,每個ResultPartition需要的網絡緩沖區(Buffer)數目是由taskmanager.network.sort-shuffle.min-buffers這個配置決定的。它的 默認值是64,是比較小的。雖然64個網絡Buffer已經可以支持任意規模的並發,但性能可能不是最好的。對於大並發的作業,通 過增大這個配置值,可以提高落盤數據的壓縮率並且減少網絡小包的數量,從而有利於提高Shuffle性能。為了增大這個配置值, 你可能需要通過調整taskmanager.memory.network.fraction,taskmanager.memory.network.min和taskmanager.memory.network.max這三個參數來增大總的網絡內存大小從而避免出現insufficient number of network buffers錯誤。

除了網絡內存,SortMerge數據Shuffle還需要使用一些JVM Direct Memory來進行Shuffle數據的寫出與讀取。所以,為了使 用SortMerge數據Shuffle你可能還需要通過增大這個配置值taskmanager.memory.task.off-heap.size來為其來預留一些JVM Direct Memory。如果在你開啟 SortMerge數據Shuffle之后出現了Direct Memory OOM的錯誤,你只需要繼續加大上面的配置值來預留更多的Direct Memory 直到不再發生Direct Memory OOM的錯誤為止。

常見問題

IllegalConfigurationException

如果遇到從 TaskExecutorProcessUtils 或 JobManagerProcessUtils 拋出的 IllegalConfigurationException 異常,這通常說明您的配置參數中存在無效值(例如內存大小為負數、占比大於 1 等)或者配置沖突。請根據異常信息,確認出錯的內存部分的相關文檔及配置信息

OutOfMemoryError: Java heap space

該異常說明 JVM 的堆空間過小。可以通過增大總內存、TaskManager 的任務堆內存、JobManager的JVM堆內存等方法來增大JVM堆空間。

提示:也可以增大 TaskManager 的框架堆內存。這是一個進階配置,只有在確認是 Flink 框架自身需要更多內存時才應該去調整。

OutOfMemoryError: Direct buffer memory

該異常通常說明 JVM 的直接內存限制過小,或者存在直接內存泄漏(Direct Memory Leak)。請確認用戶代碼及外部依賴中是否使用了 JVM 直接內存,以及如果使用了直接內存,是否配置了足夠的內存空間。可以通過調整堆外內存來增大直接內存限制。

OutOfMemoryError: Metaspace

該異常說明 JVM Metaspace 限制過小。可以嘗試調整 TaskManager、JobManager 的 JVM Metaspace。

IOException: Insufficient number of network buffers

該異常僅與 TaskManager 相關。

該異常通常說明網絡內存過小。可以通過調整以下配置參數增大網絡內存:

  • taskmanager.memory.network.min
  • taskmanager.memory.network.max
  • taskmanager.memory.network.fraction

容器(Container)內存超用

如果 Flink 容器嘗試分配超過其申請大小的內存(Yarn、Mesos 或 Kubernetes),這通常說明 Flink 沒有預留出足夠的本地內存。可以通過外部監控系統或者容器被部署環境殺掉時的錯誤信息判斷是否存在容器內存超用。

對於 JobManager 進程,你還可以嘗試啟用 JVM 直接內存限制(jobmanager.memory.enable-jvm-direct-memory-limit),以排除 JVM 直接內存泄漏的可能性。

如果使用了 RocksDBStateBackend 且沒有開啟內存控制,也可以嘗試增大 TaskManager 的托管內存。

對於 JobManager 進程,你還可以嘗試啟用 JVM 直接內存限制(jobmanager.memory.enable-jvm-direct-memory-limit),以排除 JVM 直接內存泄漏的可能性。

如果使用了 RocksDBStateBackend 且沒有開啟內存控制,也可以嘗試增大 TaskManager 的托管內存。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM