Apache Flink - 內存管理


JVM:

  • JAVA本身提供了垃圾回收機制來實現內存管理
  • 現今的GC(如Java和.NET)使用分代收集(generation collection),依照對象存活時間的長短使用不同的垃圾收集算法,以達到最好的收集性能。

    以Java為例,整個Java堆可以切割成為三個部分:

    1. Young:
      1. Eden:存放新生對象。
      2. Survivor:存放經過垃圾回收沒有被清除的對象。
      3. semi-Spaces:和Survivor做Copying collection。
    2. Tenured:對象多次回收沒有被清除,則移到該區塊。
    3. Perm:存放加載的類別還有方法對象。

    Java不同的世代使用不同的GC算法。

    1. Minor collection:
      YOUNG世代使用將Eden還有Survivor內的數據利用semi-space做復制收集(Copying collection),
      並將原本Survivor內經過多次垃圾收集仍然存活的對象移動到Tenured。
    2. Major collection則會進行Minor collection,Tenured世代則進行標記壓縮收集。
  • JVM存在的問題:
    1. Java 對象存儲密度低。一個只包含 boolean 屬性的對象占用了16個字節內存:對象頭占了8個,boolean 屬性占了1個,對齊填充占了7個。而實際上只需要一個bit。
    2. 在處理大量數據時會生成大量對象,Java GC可能會被反復觸發,其中Full GC或Major GC的開銷是非常大的,GC 會達到秒級甚至分鍾級。
    3. OOM 問題影響穩定性。OutOfMemoryError是分布式計算框架經常會遇到的問題,當JVM中所有對象大小超過分配給JVM的內存大小時,就會發生OutOfMemoryError錯誤,導致JVM崩潰,分布式框架的健壯性和性能都會受到影響。

Flink的內存管理:

  • Flink 並不是將大量對象存在堆上,而是將對象都序列化到一個預分配的內存塊上,這個內存塊叫做 MemorySegment,它代表了一段固定長度的內存(默認大小為 32KB),也是 Flink 中最小的內存分配單元,並且提供了非常高效的讀寫方法。每條記錄都會以序列化的形式存儲在一個或多個MemorySegment中。
  • Flink堆內存划分:
    • Network Buffers: 一定數量的32KB大小的緩存,主要用於數據的網絡傳輸。在 TaskManager 啟動的時候就會分配。默認數量是 2048 個,可以通過 taskmanager.network.numberOfBuffers 來配置
    • Memory Manager Pool: 這是一個由 MemoryManager 管理的,由眾多MemorySegment組成的超大集合。Flink 中的算法(如 sort/shuffle/join)會向這個內存池申請 MemorySegment,將序列化后的數據存於其中,使用完后釋放回內存池。默認情況下,池子占了堆內存的 70% 的大小。
    • Remaining (Free) Heap: 這部分的內存是留給用戶代碼以及 TaskManager 的數據結構使用的,可以把這里看成的新生代。
  • 序列化與反序列化可以理解為編碼與解碼的過程。序列化以后的數據希望占用比較小的空間,而且數據能夠被正確地反序列化出來。為了能正確反序列化,序列化時僅存儲二進制數據本身肯定不夠,需要增加一些輔助的描述信息。此處可以采用不同的策略,因而產生了很多不同的序列化方法。Java本身自帶的序列化和反序列化的功能,但是輔助信息占用空間比較大,在序列化對象時記錄了過多的類信息。
  • Flink實現了自己的序列化框架,Flink處理的數據流通常是一種類型,所以可以只保存一份對象Schema信息,節省存儲空間。又因為對象類型固定,所以可以通過偏移量存取。
  • Java支持任意Java或Scala類型,類型信息由 TypeInformation 類表示,TypeInformation 支持以下幾種類型:
    • BasicTypeInfo: 任意Java 基本類型或 String 類型。
    • BasicArrayTypeInfo: 任意Java基本類型數組或 String 數組。
    • WritableTypeInfo: 任意 Hadoop Writable 接口的實現類。
    • TupleTypeInfo: 任意的 Flink Tuple 類型(支持Tuple1 to Tuple25)。Flink tuples 是固定長度固定類型的Java Tuple實現。
    • CaseClassTypeInfo: 任意的 Scala CaseClass(包括 Scala tuples)。
    • PojoTypeInfo: 任意的 POJO (Java or Scala),例如,Java對象的所有成員變量,要么是 public 修飾符定義,要么有 getter/setter 方法。
    • GenericTypeInfo: 任意無法匹配之前幾種類型的類。
  • 針對前六種類型數據集,Flink皆可以自動生成對應的TypeSerializer,能非常高效地對數據集進行序列化和反序列化。對於最后一種數據類型,Flink會使用Kryo進行序列化和反序列化。每個TypeInformation中,都包含了serializer,類型會自動通過serializer進行序列化,然后用Java Unsafe接口寫入MemorySegments。如下圖展示 一個內嵌型的Tuple3<integer,double,person> 對象的序列化過程:

操縱二進制數據:

  • Flink 提供了如 group、sort、join 等操作,這些操作都需要訪問海量數據。以sort為例。
  • 首先,Flink 會從 MemoryManager 中申請一批 MemorySegment,用來存放排序的數據。

  • 這些內存會分為兩部分,一個區域是用來存放所有對象完整的二進制數據。另一個區域用來存放指向完整二進制數據的指針以及定長的序列化后的key(key+pointer)。將實際的數據和point+key分開存放有兩個目的。第一,交換定長塊(key+pointer)更高效,不用交換真實的數據也不用移動其他key和pointer。第二,這樣做是緩存友好的,因為key都是連續存儲在內存中的,可以增加cache命中。 排序會先比較 key 大小,這樣就可以直接用二進制的 key 比較而不需要反序列化出整個對象。訪問排序后的數據,可以沿着排好序的key+pointer順序訪問,通過 pointer 找到對應的真實數據。

Flink使用堆外內存:

  • 啟動超大內存(上百GB)的JVM需要很長時間,GC停留時間也會很長(分鍾級)。使用堆外內存可以極大地減小堆內存(只需要分配Remaining Heap),使得 TaskManager 擴展到上百GB內存不是問題。
  • 進行IO操作時,使用堆外內存可以zero-copy,使用堆內內存至少要復制一次。
  • 堆外內存在進程間是共享的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM