在使用yarn cluster模式提交flink的任務時候,往往會涉及到很多內存參數的配置
例如下面的提交命令:
flink run -d -m yarn-cluster -yjm 512 -ytm 5028 -yD jobmanager.memory.off-heap.size=64m -yD jobmanager.memory.jvm-metaspace.size=128m -yD jobmanager.memory.jvm-overhead.min=64m -yD taskmanager.memory.jvm-metaspace.size=128m -yD taskmanager.memory.jvm-overhead.max=192m -yD taskmanager.memory.network.max=128m -yD taskmanager.memory.managed.size=64m -c com.xxx.xxx ./xxx.jar
我們需要知道Jobmanager和Taskmanager對應的內存模型,來分配相應的內存,讓內存利用率最大化。
JobManager內存模型:
參數設置:
- jobmanager.memory.process.size:對應到圖中的 Total Process Memory 。對應到 -yjm。
-
jobmanager.memory.flink.size:對應到圖中的Total Flink Memory,作業管理器的總Flink內存大小。這包括JobManager消耗的所有內存,除了JVM元空間和JVM開銷,它由JVM堆內存和堆外內存組成。
- jobmanager.memory.heap.size :對應到圖中的JVM Head:JobManager的JVM堆內存大小。
-
jobmanager.memory.off-heap.size:默認值:128mb,對應到圖中的Off-Heap Memory。JobManager的堆外內存。
-
jobmanager.memory.jvm-metaspace.size:默認值:256mb ,對應到圖中的JVM Metaspace。JobManager JVM 進程的 Metaspace。
-
JVM Overhead,是用於其他 JVM 開銷的本地內存,例如棧空間、垃圾回收空間等,JVM開銷的大小是為了彌補總進程內存中配置的部分。
-
jobmanager.memory.jvm-overhead.fraction:默認值 0.1(Total Process Memory的0.1)。
- jobmanager.memory.jvm-overhead.min:默認值192mb
-
jobmanager.memory.jvm-overhead.max:默認值1gb
- 按照比例算,如果內存大小小於或大於配置的最小或最大大小,則將使用最小或最大大小。可以通過將最小和最大大小設置為相同的值,可以顯式指定JVM開銷的確切大小。
-
那么如果設置了 -yjm 1024 ,JobManager的JVM的堆內存大小是多少呢?
首先,在沒有顯示的設置其它內存的情況下,所有的內存組成部分都用默認值
- JVM Metaspace:256mb
- JVM Overhead:1024 * 0.1 = 102.4 由於102.4 < 192 所以JVM Overhead的大小為192mb
- Total Flink Memory:1024 - 256 - 192 = 576
- Off-Heap Memory:128mb
- JVM Overhead:576 - 128 = 448mb
通過觀察啟動日志也可以發現:JVM Overhead的大小為448mb
有點時候,我們會希望運用到JVM Head的內存更多一點,那么就可以嘗試着把其它的部分調低,來給到JVM Head更多的內存。
TaskManager的內存模型:
參數設置:
- taskmanager.memory.process.size:對應到圖中的Total Process Memory,TaskExecutors的總進程內存大小。這包括TaskExecutor消耗的所有內存,包括總Flink內存、JVM元空間和JVM開銷。對應到 -ytm。
-
taskmanager.memory.flink.size:對應到圖中的Total Flink Memory,TaskExecutors的總Flink內存大小。它由框架堆內存、任務堆內存、任務堆外內存、托管內存和網絡內存組成。
-
taskmanager.memory.framework.heap.size:默認值128mb,對應到圖中的Framework Heap。用於 Flink 框架的 JVM 堆內存。
-
taskmanager.memory.task.heap.size:對應到圖中的Task Heap。用於 Flink 應用的算子及用戶代碼的 JVM 堆內存,
- 托管內存:對應到圖中的Managed Memory,流處理作業中使用 RocksDB State Backend,批處理作業中用於排序、哈希表及緩存中間結果
-
taskmanager.memory.managed.size
-
taskmanager.memory.managed.fraction :默認0.4
-
如果未明確指定托管內存大小,托管內存為Flink內存總量(Total Flink Memory)的0.4。
-
-
taskmanager.memory.framework.off-heap.size:默認值128mb,對應到圖中的Framework Off-Head。用於Flink框架的堆外內存。
-
taskmanager.memory.task.off-heap.size:默認0,對應到圖中的Task Off-Head 。用於 Flink 應用的算子及用戶代碼的堆外內存。
-
網絡內存(Network Memory),對應到圖中的 Network ,用於任務之間數據傳輸的直接內存(例如網絡傳輸緩沖)
- taskmanager.memory.network.min:默認值64mb
-
taskmanager.memory.network.max:默認值1gb
-
taskmanager.memory.network.fraction:默認值0.1(Total Flink Memory)
- 網絡內存是為ShuffleEnvironment保留的堆外內存(例如,網絡緩沖區)。按照比例算,如果內存大小小於/大於配置的最小/最大大小,則將使用最小/最大大小。通過將min/max設置為相同的值,可以明確指定網絡內存的確切大小。
-
taskmanager.memory.jvm-metaspace.size:默認值256mb,對應到圖中的JVM Metaspace,Flink JVM 進程的 Metaspace。
-
JVM 開銷:對應到JVM Overhead,用於其他 JVM 開銷的本地內存,例如棧空間、垃圾回收空間等。
-
taskmanager.memory.jvm-overhead.min:默認值192mb
-
taskmanager.memory.jvm-overhead.max:默認值1gb
-
taskmanager.memory.jvm-overhead.fraction:默認值0.1(Total Process Memory)
-
那么如果設置了 -ytm 1024 、Managed Memory 為100mb,TaskManager的JVM的堆內存大小是多少呢?
首先,在沒有顯示的設置其它內存的情況下,所有的內存組成部分都用默認值
- JVM Metaspace:256mb
- JVM Overhead:1024 * 0.1 = 102.4 由於102.4 < 192 所以JVM Overhead的大小為192mb
- Framework Heap:128mb
- Framework Off-Head:128mb
- Task Off-Head : 0
- Total Flink Memory = 1024 - 256 - 192 = 576mb
- Managed Memory = 100mb
- Network Memory = 576 * 0.1 < 64 所以Network Memory:64mb
- JVM Head:576 - 100 - 128 - 64 = 284mb
- Task Head:284 - 128 = 156mb
在啟動日志中也能夠看得到:
在設置內存的時候 如果托管內存(Managed Memory)用不到的話,可以設置為0,以至於能將更多的內存用於JVM Head內存上。
並行度和TaskManager的關系
- -ys 設置一個taskmanager的slot個數
- -p 設置任務的並行度
taskmanager的數量 = p / ys + 1
jobmanager的數量 = 1
消耗的container的數量 = TaskManager的數量+1,+1是因為還有JobManager這個進程。
消耗的vcore的數量 = TaskManager的數量 * solt + 1,+1理由同上
消耗的yarn內存數 = jobmanager的數量 * yjm + taskmanager的數量 * ytm