在閱讀本文之前,你應該閱讀過的系列:
前言
在介紹內存模型之前的基礎知識。
1. 堆內內存(on-heap memory)
1.1 什么是堆內內存
Java 虛擬機在執行Java程序的過程中會把它在主存中管理的內存部分划分成多個區域,每個區域存放不同類型的數據。下圖所示為java虛擬機運行的時候,主要的內存分區:
在這些分區中,占用內存空間最大的一部分叫做“堆(heap)”,也就是我們所說的堆內內存(on-heap memory)。java虛擬機中的“堆”主要是存放所有對象的實例。這一塊區域在java虛擬機啟動的時候被創建,被所有的線程所共享,同時也是垃圾收集器的主要工作區域,因此這一部分區域除了被叫做“堆內內存”以外,也被叫做“GC堆”(Garbage Collected Heap)。
1.2 堆內內存的垃圾回收
堆內內存是java垃圾收集器的主要工作區域,為了提高垃圾回收的效率,在堆內內存的內部又划分出了新生代、老年代和永久代。在新生代內存中又按照8:1:1的比例(java虛擬機默認分配比例為8:1:1,這個比例也可以自定義)划分出了Eden, Survivor1, Survivor2三個區域。
在執行垃圾回收算法的時候,不同的回收算法會對內存區域造成不一樣的影響。但是大部分的回收算法會造成堆內內存空間在物理上的不連續性。下面以最基本的垃圾回收算法“標記 - 清除算法”為例:
可以看到,內存區域在經過垃圾回收之后,產生大量不連續的內存空間。因此,java虛擬機中的堆內內存區域,只是邏輯上的連續,並不能保證物理上的連續性。所以,操作系統並不能直接得到堆內內存區域所存儲的數據在主存中的正確地址。在一些特定的時間點,Java虛擬機會進行一次徹底的垃圾回收(full gc)。徹底回收時,垃圾收集器會對所有分配的堆內內存進行完整的掃描,在掃描期間,絕大部分正在運行的java線程都會被暫時停止。這意味着:這樣一次垃圾收集對Java應用造成的影響,跟堆內內存所存儲的數據的多少是成正比的,過大的堆內內存會影響Java應用的性能。
2. 堆外內存(off-heap memory)
2.1 堆外內存的產生
為了解決堆內內存過大帶來的長時間的GC停頓的問題,以及操作系統對堆內內存不可知的問題,java虛擬機開辟出了堆外內存(off-heap memory)。堆外內存意味着把一些對象的實例分配在Java虛擬機堆內內存以外的內存區域,這些內存直接受操作系統(而不是虛擬機)管理。這樣做的結果就是能保持一個較小的堆,以減少垃圾收集對應用的影響。同時因為這部分區域直接受操作系統的管理,別的進程和設備(例如GPU)可以直接通過操作系統對其進行訪問,減少了從虛擬機中復制內存數據的過程。
2.2 堆外內存的分配
java 在NIO 包中提供了ByteBuffer類,對堆外內存進行訪問。下圖為NIO包中ByteBuffer的層次繼承關系
使用下面的方式,可以直接開辟指定大小的對外內存:
import sun.nio.ch.DirectBuffer;
import java.nio.ByteBuffer;
public class TestDirectByteBuffer {
public static void main(String[] args) throws Exception {
while (true) {
ByteBuffer buffer = ByteBuffer.allocateDirect(10 * 1024 * 1024);
}
}
}
這樣我們就開辟出了一塊大小為10M的堆外內存。
3. 堆外內存的優缺點以及與堆內內存聯系
3.1堆外內存的優缺點
優點 :
可以很方便的自主開辟很大的內存空間,對大內存的伸縮性很好
減少垃圾回收帶來的系統停頓時間
直接受操作系統控制,可以直接被其他進程和設備訪問,減少了原本從虛擬機復制的過程
特別適合那些分配次數少,讀寫操作很頻繁的場景
缺點 :
容易出現內存泄漏,並且很難排查
堆外內存的數據結構不直觀,當存儲結構復雜的對象時,會浪費大量的時間對其進行串行化。
3.2 堆內內存與堆外內存的聯系
雖然堆外內存本身不受垃圾回收算法的管轄,但是因為其是由ByteBuffer所創造出來的,因此這個buffer自身作為一個實例化的對象,其自身的信息(例如堆外內存在主存中的起始地址等信息)必須存儲在堆內內存中,具體情況如下圖所示。
當在堆內內存中存放的buffer對象實例被垃圾回收算法回收掉的時候,這個buffer對應的堆外內存區域同時也就被釋放掉了。
0 簡介
首先,Flink 使用自主的內存管理:
JVM 內存管理的不足
1)Java 對象存儲密度低。Java 的對象在內存中存儲包含 3 個主要部分:對象頭、實例數據、對齊填充部分。例如,一個只包含 boolean 屬性的對象占 16byte:對象頭占 8byte,boolean 屬性占 1byte,為了對齊達到 8 的倍數額外占 7byte。而實際上只需要一個 bit(1/8字節)就夠了。
2)Full GC 會極大地影響性能。尤其是為了處理更大數據而開了很大內存空間的 JVM來說,GC 會達到秒級甚至分鍾級。
3)OOM 問題影響穩定性。OutOfMemoryError 是分布式計算框架經常會遇到的問題,當JVM中所有對象大小超過分配給JVM的內存大小時,就會發生OutOfMemoryError錯誤,導致 JVM 崩潰,分布式框架的健壯性和性能都會受到影響。
4)緩存未命中問題。CPU 進行計算的時候,是從 CPU 緩存中獲取數據。現代體系的CPU 會有多級緩存,而加載的時候是以 Cache Line 為單位加載。如果能夠將對象連續存儲,這樣就會大大降低 Cache Miss。使得 CPU 集中處理業務,而不是空轉。(Java 對象在堆上存儲的時候並不是連續的,所以從內存中讀取 Java 對象時,緩存的鄰近的內存區域的數據往往不是 CPU 下一步計算所需要的,這就是緩存未命中。此時 CPU 需要空轉等待從內存中重新讀取數據。)
Flink 並不是將大量對象存在堆內存上,而是將對象都序列化到一個預分配的內存塊上,這個內存塊叫做 MemorySegment,它代表了一段固定長度的內存(默認大小為 32KB),也是 Flink 中最小的內存分配單元,並且提供了非常高效的讀寫方法,很多運算可以直接操作二進制數據,不需要反序列化即可執行。每條記錄都會以序列化的形式存儲在一個或多個MemorySegment 中。
1 內存模型
1.1 JobManager 內存模型
JobManagerFlinkMemory.java
在 1.10 中,Flink 統一了 TM 端的內存管理和配置,相應的在 1.11 中,Flink 進一步對 JM 端的內存配置進行了修改,使它的選項和配置方式與 TM 端的配置方式保持一致。
1.2 TaskManager 內存模型
Flink 1.10 對 TaskManager 的內存模型和 Flink 應用程序的配置選項進行了重大更改,讓用戶能夠更加嚴格地控制其內存開銷。
TaskExecutorFlinkMemory.java
JVM Heap:JVM 堆上內存
- Framework Heap Memory:Flink 框架本身使用的內存,即 TaskManager 本身所占用的堆上內存,不計入 Slot 的資源中。配置參數:taskmanager.memory.framework.heap.size=128MB,默認 128MB。
- Task Heap Memory:Task 執行用戶代碼時所使用的堆上內存。配置參數:taskmanager.memory.task.heap.size
Off-Heap Mempry:JVM 堆外內存
- DirectMemory:JVM 直接內存
1)Framework Off-Heap Memory:Flink框架本身所使用的內存,即TaskManager本身所占用的對外內存,不計入 Slot 資源。配置參數:taskmanager.memory.framework.off-heap.size=128MB,默認128MB。
2)Task Off-Heap Memory:Task 執行用戶代碼所使用的對外內存。配置參數:taskmanager.memory.task.off-heap.size=0,默認 0
3)Network Memory:網絡數據交換所使用的堆外內存大小,如網絡數據交換緩沖區
配置參數:
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb
Managed Memory:Flink 管理的堆外內存,用於排序、哈希表、緩存中間結果及RocksDB State Backend 的本地內存。
配置參數:
taskmanager.memory.managed.fraction=0.4
taskmanager.memory.managed.size
- JVM specific memory:JVM 本身使用的內存
- JVM metaspace:JVM 元空間
- JVM over-head 執行開銷:JVM 執行時自身所需要的內容,包括線程堆棧、IO、編譯緩存等所使用的內存。
配置參數:
taskmanager.memory.jvm-overhead.min=192mb
taskmanager.memory.jvm-overhead.max=1gb
taskmanager.memory.jvm-overhead.fraction=0.1
總體內存
總進程內存:Flink Java 應用程序(包括用戶代碼)和 JVM 運行整個進程所消耗的總內存。
總進程內存 = Flink 使用內存 + JVM 元空間 + JVM 執行開銷
配置項:taskmanager.memory.process.size: 1728m
Flink 總內存:僅 Flink Java 應用程序消耗的內存,包括用戶代碼,但不包括 JVM為其運行而分配的內存。
Flink 使用內存:框架堆內外 + task 堆內外 + network + manage。配置項:taskmanager.memory.flink.size: 1280m
說明:配置項詳細信息查看如下鏈接:https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#memory-configuration
1.3 內存分配
JobManager 內存分配
YarnClusterDescriptor.java
JobManagerProcessUtils.java
- TaskManager 內存分配
ActiveResourceManager.java
TaskExecutorProcessUtils.java
2 內存數據結構
內存段
內存段在 Flink 內部叫 MemorySegment,是 Flink 中最小的內存分配單元,默認大小 32KB。它即可以是堆上內存(Java 的 byte 數組),也可以是堆外內存(基於 Netty 的 DirectByteBuffer),同時提供了對二進制數據進行讀取和寫入的方法。
HeapMemorySegment:用來分配堆上內存
HybridMemorySegment:用來分配堆外內存和堆上內存,2017 年以后的版本實際上只使用了 HybridMemorySegment。
如下圖展示一個內嵌型的 Tuple3< Integer,Double,Person> 對象的序列化過程:
可以看出這種序列化方式存儲密度是相當緊湊的。其中 int 占 4 字節,double 占 8 字節,POJO 多個一個字節的 header,PojoSerializer 只負責將 header 序列化進去,並委托每個字段對應的 serializer 對字段進行序列化。
內存頁
內存頁是 MemorySegment 之上的數據訪問視圖,數據讀取抽象為 DataInputView,數據寫入抽象為 DataOutputView。使用時就無需關心 MemorySegment 的細節,會自 動處理跨 MemorySegment 的讀取和寫入。
Buffer
Task 算子之間在網絡層面上傳輸數據,使用的是 Buffer,申請和釋放由 Flink自行管理,實現類為 NetworkBuffer。1 個 NetworkBuffer 包裝了 1 個MemorySegment。同時繼承了 AbstractReferenceCountedByteBuf,是 Netty 中的抽象類。
Buffer 資源池
BufferPool 用來管理 Buffer,包含 Buffer 的申請、釋放、銷毀、可用 Buffer 通知等,實現類是 LocalBufferPool,每個 Task 擁有自己的 LocalBufferPool。
BufferPoolFactory 用 來 提 供 BufferPool 的 創 建 和 銷 毀 , 唯 一 的 實 現 類 是NetworkBufferPool , 每 個 TaskManager 只 有 一 個 NetworkBufferPool 。同一個TaskManager 上的 Task 共享 NetworkBufferPool,在 TaskManager 啟動的時候創建並分配內存。
3 內存管理器
MemoryManager 用來管理 Flink 中用於排序、Hash 表、中間結果的緩存或使用堆外內存的狀態后端(RocksDB)的內存。1.10 之前版本,負責 TaskManager 所有內存。1.10 版本開始,管理范圍是 Slot 級別。
堆外內存資源申請:
MemoryManager.java
MemorySegmentFactory.java
RocksDB 自己負責內存申請和釋放
RocksDBOperationUtils.java
MemoryManager.java
4 網絡傳輸中的內存管理
網絡上傳輸的數據會寫到 Task 的 InputGate(IG)中,經過 Task 的處理后,再由 Task寫到 ResultPartition(RS) 中。每個 Task 都包括了輸入和輸入,輸入和輸出的數據存在Buffer 中(都是字節數據)。Buffer 是 MemorySegment 的包裝類。
1)TaskManager(TM)在啟動時,會先初始化 NetworkEnvironment 對象,TM 中所有與網絡相關的東西都由該類來管理(如 Netty 連接),其中就包括 NetworkBufferPool。根據配置, Flink 會 在 NetworkBufferPool 中 生 成 一 定 數 量 ( 默 認 2048 ) 的 內 存 塊MemorySegment(關於 Flink 的內存管理,后續文章會詳細談到),內存塊的總數量就代表了網絡傳輸中所有可用的內存。NetworkEnvironment 和 NetworkBufferPool 是 Task 之間共享的,每個 TM 只會實例化一個。
2)Task 線程啟動時,會向 NetworkEnvironment 注冊,NetworkEnvironment 會為 Task的 InputGate(IG)和 ResultPartition(RP) 分別創建一個 LocalBufferPool(緩沖池)並設置可申請的 MemorySegment(內存塊)數量。IG 對應的緩沖池初始的內存塊數量與 IG 中InputChannel 數量一致,RP 對應的緩沖池初始的內存塊數量與 RP 中的 ResultSubpartition數量一致。不過,每當創建或銷毀緩沖池時,NetworkBufferPool 會計算剩余空閑的內存塊數量,並平均分配給已創建的緩沖池。注意,這個過程只是指定了緩沖池所能使用的內存塊數量,並沒有真正分配內存塊,只有當需要時才分配。為什么要動態地為緩沖池擴容呢?因為內存越多,意味着系統可以更輕松地應對瞬時壓力(如 GC),不會頻繁地進入反壓狀態,所以我們要利用起那部分閑置的內存塊。
3)在 Task 線程執行過程中,當 Netty 接收端收到數據時,為了將 Netty 中的數據拷貝到 Task 中,InputChannel(實際是 RemoteInputChannel)會向其對應的緩沖池申請內存塊(上圖中的①)。如果緩沖池中也沒有可用的內存塊且已申請的數量還沒到池子上限,則會向 NetworkBufferPool 申請內存塊(上圖中的②)並交給 InputChannel 填上數據(上圖中的③和④)。如果緩沖池已申請的數量達到上限了呢?或者 NetworkBufferPool 也沒有可用內存塊了呢?這時候,Task 的 Netty Channel 會暫停讀取,上游的發送端會立即響應停止發送,拓撲會進入反壓狀態。當 Task 線程寫數據到 ResultPartition 時,也會向緩沖池請求內存塊,如果沒有可用內存塊時,會阻塞在請求內存塊的地方,達到暫停寫入的目的。
4)當一個內存塊被消費完成之后(在輸入端是指內存塊中的字節被反序列化成對象了,在輸出端是指內存塊中的字節寫入到 Netty Channel 了),會調用 Buffer.recycle() 方法,會將內存塊還給 LocalBufferPool (上圖中的⑤)。如果 LocalBufferPool 中當前申請的數量超過了池子容量(由於上文提到的動態容量,由於新注冊的 Task 導致該池子容量變小),則LocalBufferPool 會將該內存塊回收給 NetworkBufferPool(上圖中的⑥)。如果沒超過池子容量,則會繼續留在池子中,減少反復申請的開銷。
反壓的過程
1)記錄“A”進入了 Flink 並且被 Task 1 處理。(這里省略了 Netty 接收、反序列化等過程)
2)記錄被序列化到 buffer 中。
3)該 buffer 被發送到 Task 2,然后 Task 2 從這個 buffer 中讀出記錄。
記錄能被 Flink 處理的前提是:必須有空閑可用的 Buffer。
結合上面兩張圖看:Task 1 在輸出端有一個相關聯的 LocalBufferPool(稱緩沖池 1),Task 2 在輸入端也有一個相關聯的 LocalBufferPool(稱緩沖池 2)。如果緩沖池 1 中有空閑可用的 buffer 來序列化記錄 “A”,我們就序列化並發送該 buffer。
注意兩個場景:
1)本地傳輸:如果 Task 1 和 Task 2 運行在同一個 worker 節點(TaskManager),該buffer 可以直接交給下一個 Task。一旦 Task 2 消費了該 buffer,則該 buffer 會被緩沖池 1回收。如果 Task 2 的速度比 1 慢,那么 buffer 回收的速度就會趕不上 Task 1 取 buffer的速度,導致緩沖池 1 無可用的 buffer,Task 1 等待在可用的 buffer 上。最終形成 Task 1的降速。
2)遠程傳輸:如果 Task 1 和 Task 2 運行在不同的 worker 節點上,那么 buffer 會在發送到網絡(TCP Channel)后被回收。在接收端,會從 LocalBufferPool 中申請 buffer,然后拷貝網絡中的數據到 buffer 中。如果沒有可用的 buffer,會停止從 TCP 連接中讀取數據。在輸出端,通過 Netty 的水位值機制來保證不往網絡中寫入太多數據(后面會說)。如果網絡中的數據(Netty 輸出緩沖中的字節數)超過了高水位值,我們會等到其降到低水位值以下才繼續寫入數據。這保證了網絡中不會有太多的數據。如果接收端停止消費網絡中的數據(由於接收端緩沖池沒有可用 buffer),網絡中的緩沖數據就會堆積,那么發送端也會暫停發送。另外,這會使得發送端的緩沖池得不到回收,writer 阻塞在向 LocalBufferPool 請求buffer,阻塞了 writer 往 ResultSubPartition 寫數據。
這種固定大小緩沖池就像阻塞隊列一樣,保證了 Flink 有一套健壯的反壓機制,使得Task 生產數據的速度不會快於消費的速度。我們上面描述的這個方案可以從兩個 Task 之間的數據傳輸自然地擴展到更復雜的 pipeline 中,保證反壓機制可以擴散到整個 pipeline
你好,我是王知無,一個大數據領域的硬核原創作者。
做過后端架構、數據中間件、數據平台&架構、算法工程化。
專注大數據領域實時動態&技術提升&個人成長&職場進階,歡迎關注。