Flink內存設置思路


1.前言

  對於做實時計算的朋友來說,資源設置都是一個比較麻煩的問題。實時計算不同於離線計算,它的任務都是並行的,啟動就會一直占用集群資源,如果資源設置的過多會造成極大的浪費,設置的過少任務會不斷發生failover。這里說的資源主要指的就是內存資源,所以本文對Flink的內存設置提供一些思路,尤其是對於容器環境,內存的設置極為重要,否則會被頻繁的kill。

  本文將分別介紹1.9版本之前和之后的兩類設置方法,這是由於這個版本之后Flink的內存模型發生了極大的變化。

  本文是基於個人的一個使用經驗,部分代碼,和其他文章而成,里面可能存在錯誤,望指正。

2.Flink <= 1.9

2.1 例子

  這里用Flink1.8為例,計算內存的代碼位於 org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters類的create方法。

      按照計算步驟,以taskmanager.heap.size=6g為例子,其他參數保持不動,最終得到的參數如下:

    -Xms4148m  -Xmx4148m   -XX:MaxDirectMemorySize=1996m

            兩塊內存加起來是6144m = 6g jvm的設置符合參數。

      Flink Dashboard上面顯示的是:

    JVM Heap Size:3.95 GB    Flink Managed Memory:2.74 GB

              JVM (Heap/Non-Heap) Commit: Heap:3.95 GB  Non-Heap:141 MB  Total:4.09 GB

      Outside JVM:Capacity:457 MB

      NetWork: count:  xxxxx

2.2 計算過程

  設容器內存總大小是x

  詳細看create方法:

    1. cutoff:容器超過3g, 簡單可以記成 0.25x

      flink為了防止內存溢出,計算的時候先切了一塊內存下來不參與后續計算,這塊就是cutoff

      計算公式:

        cutoff = Math.max(containerized.heap-cutoff-min, taskmanager.heap.size * containerized.heap-cutoff-ratio)

      默認值是600和0.25,所以6g的時候=Math.max(600, 6144*0.25) = 1536m

          剩余大小 0.75x6g = 4608m

    2. networkBufMB:簡單記成 0.75*0.1x,最大1g

      網絡buffer使用內存分成新舊版,這里只關注新版,涉及參數:

        taskmanager.memory.segment-size:32kb

        taskmanager.network.memory.fraction:0.1

        taskmanager.network.memory.min:64mb

        taskmanager.network.memory.max:1g

      計算公式:

        Math.min(taskmanager.network.memory.max,Math.max(taskmanager.network.memory.min, taskmanager.network.memory.fraction * (x - cutoff))

      這里的結果就是:Math.min(1g,  Math.max(64mb, 0.1 * 4608m) = 460.8m

    3. heapSizeMB:0.75 * 0.9x

      taskmanager.memory.off-heap默認為false,主要指的是Flink Managed Memory使用Heap還是Non-heap,默認使用Heap,如果開啟使用Non-heap將再減少一部分資源。

        計算公式:x - cutoff - networkBufMB

      這里就是:4147.2    (注意:這個就是-xmx 4148m)

    4. offHeapSizeMB: x - heapSizeMB

      就是1996m       (注意:這個就是XX:MaxDirectMemorySize: 1996m)

      后續:上面只是一個jvm的參數預估設置,實際設置還與運行中環境有關,TaskManagerServices.fromConfiguration

       會計算一個 freeHeapMemoryWithDefrag,計算之前會手動觸發gc,然后用Jvm最大內存 - 總內存 + 空閑內存。

       這個值可以認為是一個空運行的flink任務剩余的堆內存了。

       后面將計算Flink管理的內存,這個指的是Flink Managed Memory Segment:  taskmanager.memory.fraction默認是0.7,

       被Flink管理的內存就是:freeHeapMemoryWithDefrag * 0.7

 

2.3 內存划分

  

   所以雖然6g內存計算出來后,heap是4148,但是在dashbord中顯示不足4148, 為3.95G=4044.8, Flink managed內存小於 0.75*0.9*0.7 = 2903.04 , dashboard上顯示2.74g = 2805.76m

   框架運行需要:4148 - 4044.8 = 103.2m,3.95 * 0.7 = 2.765 >  2.74。沒有相等,其他的內存使用暫時沒有探究了。

   Flink Managed內存一般用於批處理作業,流處理作業可以調整 taskmanager.memory.fraction,使得這部分內存用於用戶代碼。

   Non - heap空間一般用於 JVM 的棧空間、方法區等堆外開銷外,還包括網絡 buffer、batch 緩存、RocksDB

3. Flink >= 1.10

  Flink 1.10對整個內存做了個大改版,需要參考官方文檔進行升級:https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/ops/memory/mem_migration.html

3.1 例子

  這里設置單個taskmanager為14g,taskmanager.memory.managed.fraction為0.5,將會得到以下內容:

    -Xmx5721030656  = 5456MB = 5.328g

    -=1207959552  = 1152MB = 1.125g

    -XX:MaxMetaspaceSize=100663296 = 96MB

  可以發現,上面的加起來等於6704MB,遠遠不足14g,和1.8版本有很大的不同。

  再看dashboard:

    JVM Heap Size:5.19 GB   Flink Managed Memory:6.45 GB

              JVM (Heap/Non-Heap) : Heap:5.19 GB  Non-Heap:1.33 GB  Total:6.52 GB

      Outside JVM:Capacity:1.01GB

      NetWork: count:  xxxxx

  可以計算得到6.45+6.52+1.01 = 13.98 等於14

3.2 計算過程

  taskmanager.memory.process.size 設置的是容器的內存大小,等於之前的 taskmanager.heap.size

  計算過程在org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils中processSpecFromConfig方法,TaskExecutorProcessSpec類展示了1.10版本整個內存的組成。

  計算方法分成3種:

    1.指定了taskmanager.memory.task.heap.size和taskmanager.memory.managed.size   見方法:deriveProcessSpecWithExplicitTaskAndManagedMemory

    2.指定了taskmanager.memory.flink.size  見方法:deriveProcessSpecWithTotalFlinkMemory

    3.指定了taskmanager.memory.process.size(容器環境一般指定這個,決定全局容量)

      totalProcessMemorySize = 設置的值 14g,   jvmMetaspaceSize = taskmanager.memory.jvm-metaspace.size,默認96m,  這個對應參數-XX:MaxMetaspaceSize=100663296

      jvmOverheadSize:

        taskmanager.memory.jvm-overhead.min   192m

        taskmanager.memory.jvm-overhead.max  1g

        taskmanager.memory.jvm-overhead.fraction  0.1

        公式  14g * 0.1 = 1.4g  必須在[192m, 1g]之間,所以jvmOverheadSize的大小是1g

      totalFlinkMemorySize =  14g - 1g - 96m = 13216m

      frameworkHeapMemorySize:taskmanager.memory.framework.heap.size  默認128m

      frameworkOffHeapMemorySize:taskmanager.memory.framework.off-heap.size 默認128m

      taskOffHeapMemorySize:taskmanager.memory.task.off-heap.size 默認0

      確定好上面這些參數后,就是最重要的三個指標的計算了:taskHeapMemorySize,networkMemorySize,managedMemorySize

      計算分成確定了:taskmanager.memory.task.heap.size還是沒確定。

         1)確定了taskmanager.memory.task.heap.size

            taskHeapMemorySize = 設置值

            managedMemorySize = 設置了使用設置值,否則使用 0.4 * totalFlinkMemorySize

            如果 taskHeapMemorySize + taskOffHeapMemorySize + frameworkHeapMemorySize + frameworkOffHeapMemorySize + managedMemorySize > totalFlinkMemorySize異常

            networkMemorySize 等於剩余的大小,之后還會check這塊內存是否充足,可以自己查看對應代碼

         2)未設置heap大小

            先確定 managedMemorySize = 設置了使用設置值,否則使用 0.4 * totalFlinkMemorySize,這里就是 0.5 * 13216m = 6608 = 6.45g (這里就是dashboard的顯示內容)

            再確定network buffer大小,這個也是有兩種情況,不細說。 [64mb, 1g] 0.1 * totalFlinkMemorySize = 1321.6, 所以是1g

            最后剩余的就是taskHeapMemorySize,不能為負數,這里等於  13216 - 6608 - 1024 - 128 - 128 = 5328 = 5.2g (這里約等於dashboard的顯示heap大小)

      最后jvm的參數的計算過程:

            jvmHeapSize = frameworkHeapSize + taskHeapSize = 5328 + 128 = 5456

            jvmDirectSize = frameworkOffHeapMemorySize + taskOffHeapSize + networkMemSize = 128 + 1024 = 1152

            jvmMetaspaceSize = 96m

3.3 內存划分

  https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_detail.html

  

 

  從計算過程,結合上圖可以看出Flink 1.10中的一個內存划分了。

  總內存 =  Flink 內存 + JVM Metaspace (96m)+ JVM Overhead (計算為0.1 * 全局大小,結果必須在[192m, 1g]之間)

  Flink內存被划分成6部分:框架運行需要的Heap和Non Heap,默認都是128m

               任務需要的Heap和Non Heap(默認0), Heap是通過計算其他5部分內存,Flink內存剩余得到

               網絡緩沖 (0.1 * Flink內存,結果必須在[64mb, 1g]之間)

               Flink管理內存:0.4 * Flink內存

  

 

 4. 總結

  Flink 1.10之前對內存的划分比較簡單,主要就是Heap + Non-Heap,之后對內存做了更細致的切分。

  Flink 1.8可以調整taskmanager.memory.fraction 減少Heap中的管理的內存,增大用戶代碼的內存使用,調整containerized.heap-cutoff-ratio,控制Non-heap空間,這個影響rocksdb。

  Flink 1.10可以調整taskmanager.memory.managed.fraction 控制managed內存,這個影響rocksdb,也會影響taskHeap大小,需要衡量。

  也可以看到Flink內存模型的變化managed內存位置也發生了變化,作用也有了些許變化。

  JVM 主要划分 Heap 和 Non-Heap,Non-Heap又划分為Direct和Native等。

   1.8的Non-Heap都是通過XX:MaxDirectMemorySize設置的

   1.10的Network buffer在Direct里面,另一部分是Native(包括Managed Memory),主要用於rocksdb,如果使用的是Heap狀態后台,可以設置小點,也用於Batch。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2026 CODEPRJ.COM