1.前言
對於做實時計算的朋友來說,資源設置都是一個比較麻煩的問題。實時計算不同於離線計算,它的任務都是並行的,啟動就會一直占用集群資源,如果資源設置的過多會造成極大的浪費,設置的過少任務會不斷發生failover。這里說的資源主要指的就是內存資源,所以本文對Flink的內存設置提供一些思路,尤其是對於容器環境,內存的設置極為重要,否則會被頻繁的kill。
本文將分別介紹1.9版本之前和之后的兩類設置方法,這是由於這個版本之后Flink的內存模型發生了極大的變化。
本文是基於個人的一個使用經驗,部分代碼,和其他文章而成,里面可能存在錯誤,望指正。
2.Flink <= 1.9
2.1 例子
這里用Flink1.8為例,計算內存的代碼位於 org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters類的create方法。
按照計算步驟,以taskmanager.heap.size=6g為例子,其他參數保持不動,最終得到的參數如下:
-Xms4148m -Xmx4148m -XX:MaxDirectMemorySize=1996m
兩塊內存加起來是6144m = 6g jvm的設置符合參數。
Flink Dashboard上面顯示的是:
JVM Heap Size:3.95 GB Flink Managed Memory:2.74 GB
JVM (Heap/Non-Heap) Commit: Heap:3.95 GB Non-Heap:141 MB Total:4.09 GB
Outside JVM:Capacity:457 MB
NetWork: count: xxxxx
2.2 計算過程
設容器內存總大小是x
詳細看create方法:
1. cutoff:容器超過3g, 簡單可以記成 0.25x
flink為了防止內存溢出,計算的時候先切了一塊內存下來不參與后續計算,這塊就是cutoff
計算公式:
cutoff = Math.max(containerized.heap-cutoff-min, taskmanager.heap.size * containerized.heap-cutoff-ratio)
默認值是600和0.25,所以6g的時候=Math.max(600, 6144*0.25) = 1536m
剩余大小 0.75x6g = 4608m
2. networkBufMB:簡單記成 0.75*0.1x,最大1g
網絡buffer使用內存分成新舊版,這里只關注新版,涉及參數:
taskmanager.memory.segment-size:32kb
taskmanager.network.memory.fraction:0.1
taskmanager.network.memory.min:64mb
taskmanager.network.memory.max:1g
計算公式:
Math.min(taskmanager.network.memory.max,Math.max(taskmanager.network.memory.min, taskmanager.network.memory.fraction * (x - cutoff))
這里的結果就是:Math.min(1g, Math.max(64mb, 0.1 * 4608m) = 460.8m
3. heapSizeMB:0.75 * 0.9x
taskmanager.memory.off-heap默認為false,主要指的是Flink Managed Memory使用Heap還是Non-heap,默認使用Heap,如果開啟使用Non-heap將再減少一部分資源。
計算公式:x - cutoff - networkBufMB
這里就是:4147.2 (注意:這個就是-xmx 4148m)
4. offHeapSizeMB: x - heapSizeMB
就是1996m (注意:這個就是XX:MaxDirectMemorySize: 1996m)
后續:上面只是一個jvm的參數預估設置,實際設置還與運行中環境有關,TaskManagerServices.fromConfiguration
會計算一個 freeHeapMemoryWithDefrag,計算之前會手動觸發gc,然后用Jvm最大內存 - 總內存 + 空閑內存。
這個值可以認為是一個空運行的flink任務剩余的堆內存了。
后面將計算Flink管理的內存,這個指的是Flink Managed Memory Segment: taskmanager.memory.fraction默認是0.7,
被Flink管理的內存就是:freeHeapMemoryWithDefrag * 0.7
2.3 內存划分

所以雖然6g內存計算出來后,heap是4148,但是在dashbord中顯示不足4148, 為3.95G=4044.8, Flink managed內存小於 0.75*0.9*0.7 = 2903.04 , dashboard上顯示2.74g = 2805.76m
框架運行需要:4148 - 4044.8 = 103.2m,3.95 * 0.7 = 2.765 > 2.74。沒有相等,其他的內存使用暫時沒有探究了。
Flink Managed內存一般用於批處理作業,流處理作業可以調整 taskmanager.memory.fraction,使得這部分內存用於用戶代碼。
Non - heap空間一般用於 JVM 的棧空間、方法區等堆外開銷外,還包括網絡 buffer、batch 緩存、RocksDB
3. Flink >= 1.10
Flink 1.10對整個內存做了個大改版,需要參考官方文檔進行升級:https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/ops/memory/mem_migration.html
3.1 例子
這里設置單個taskmanager為14g,taskmanager.memory.managed.fraction為0.5,將會得到以下內容:
-Xmx5721030656 = 5456MB = 5.328g
-=1207959552 = 1152MB = 1.125g
-XX:MaxMetaspaceSize=100663296 = 96MB
可以發現,上面的加起來等於6704MB,遠遠不足14g,和1.8版本有很大的不同。
再看dashboard:
JVM Heap Size:5.19 GB Flink Managed Memory:6.45 GB
JVM (Heap/Non-Heap) : Heap:5.19 GB Non-Heap:1.33 GB Total:6.52 GB
Outside JVM:Capacity:1.01GB
NetWork: count: xxxxx
可以計算得到6.45+6.52+1.01 = 13.98 等於14
3.2 計算過程
taskmanager.memory.process.size 設置的是容器的內存大小,等於之前的 taskmanager.heap.size
計算過程在org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils中processSpecFromConfig方法,TaskExecutorProcessSpec類展示了1.10版本整個內存的組成。
計算方法分成3種:
1.指定了taskmanager.memory.task.heap.size和taskmanager.memory.managed.size 見方法:deriveProcessSpecWithExplicitTaskAndManagedMemory
2.指定了taskmanager.memory.flink.size 見方法:deriveProcessSpecWithTotalFlinkMemory
3.指定了taskmanager.memory.process.size(容器環境一般指定這個,決定全局容量)
totalProcessMemorySize = 設置的值 14g, jvmMetaspaceSize = taskmanager.memory.jvm-metaspace.size,默認96m, 這個對應參數-XX:MaxMetaspaceSize=100663296。
jvmOverheadSize:
taskmanager.memory.jvm-overhead.min 192m
taskmanager.memory.jvm-overhead.max 1g
taskmanager.memory.jvm-overhead.fraction 0.1
公式 14g * 0.1 = 1.4g 必須在[192m, 1g]之間,所以jvmOverheadSize的大小是1g
totalFlinkMemorySize = 14g - 1g - 96m = 13216m
frameworkHeapMemorySize:taskmanager.memory.framework.heap.size 默認128m
frameworkOffHeapMemorySize:taskmanager.memory.framework.off-heap.size 默認128m
taskOffHeapMemorySize:taskmanager.memory.task.off-heap.size 默認0
確定好上面這些參數后,就是最重要的三個指標的計算了:taskHeapMemorySize,networkMemorySize,managedMemorySize
計算分成確定了:taskmanager.memory.task.heap.size還是沒確定。
1)確定了taskmanager.memory.task.heap.size
taskHeapMemorySize = 設置值
managedMemorySize = 設置了使用設置值,否則使用 0.4 * totalFlinkMemorySize
如果 taskHeapMemorySize + taskOffHeapMemorySize + frameworkHeapMemorySize + frameworkOffHeapMemorySize + managedMemorySize > totalFlinkMemorySize異常
networkMemorySize 等於剩余的大小,之后還會check這塊內存是否充足,可以自己查看對應代碼
2)未設置heap大小
先確定 managedMemorySize = 設置了使用設置值,否則使用 0.4 * totalFlinkMemorySize,這里就是 0.5 * 13216m = 6608 = 6.45g (這里就是dashboard的顯示內容)
再確定network buffer大小,這個也是有兩種情況,不細說。 [64mb, 1g] 0.1 * totalFlinkMemorySize = 1321.6, 所以是1g
最后剩余的就是taskHeapMemorySize,不能為負數,這里等於 13216 - 6608 - 1024 - 128 - 128 = 5328 = 5.2g (這里約等於dashboard的顯示heap大小)
最后jvm的參數的計算過程:
jvmHeapSize = frameworkHeapSize + taskHeapSize = 5328 + 128 = 5456
jvmDirectSize = frameworkOffHeapMemorySize + taskOffHeapSize + networkMemSize = 128 + 1024 = 1152
jvmMetaspaceSize = 96m
3.3 內存划分
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html

從計算過程,結合上圖可以看出Flink 1.10中的一個內存划分了。
總內存 = Flink 內存 + JVM Metaspace (96m)+ JVM Overhead (計算為0.1 * 全局大小,結果必須在[192m, 1g]之間)
Flink內存被划分成6部分:框架運行需要的Heap和Non Heap,默認都是128m
任務需要的Heap和Non Heap(默認0), Heap是通過計算其他5部分內存,Flink內存剩余得到
網絡緩沖 (0.1 * Flink內存,結果必須在[64mb, 1g]之間)
Flink管理內存:0.4 * Flink內存

4. 總結
Flink 1.10之前對內存的划分比較簡單,主要就是Heap + Non-Heap,之后對內存做了更細致的切分。
Flink 1.8可以調整taskmanager.memory.fraction 減少Heap中的管理的內存,增大用戶代碼的內存使用,調整containerized.heap-cutoff-ratio,控制Non-heap空間,這個影響rocksdb。
Flink 1.10可以調整taskmanager.memory.managed.fraction 控制managed內存,這個影響rocksdb,也會影響taskHeap大小,需要衡量。
也可以看到Flink內存模型的變化managed內存位置也發生了變化,作用也有了些許變化。
JVM 主要划分 Heap 和 Non-Heap,Non-Heap又划分為Direct和Native等。
1.8的Non-Heap都是通過XX:MaxDirectMemorySize設置的
1.10的Network buffer在Direct里面,另一部分是Native(包括Managed Memory),主要用於rocksdb,如果使用的是Heap狀態后台,可以設置小點,也用於Batch。
