spark 源碼分析之十七 -- Spark磁盤存儲剖析


上篇文章 spark 源碼分析之十六 -- Spark內存存儲剖析 主要剖析了Spark 的內存存儲。本篇文章主要剖析磁盤存儲。

總述

磁盤存儲相對比較簡單,相關的類關系圖如下:

 

 

我們先從依賴類 DiskBlockManager 剖析。

 

DiskBlockManager

文檔說明如下:

Creates and maintains the logical mapping between logical blocks and physical on-disk locations. 
One block is mapped to one file with a name given by its BlockId.
Block files are hashed among the directories listed in spark.local.dir (or in SPARK_LOCAL_DIRS, if it's set).

創建並維護邏輯block和block落地的物理文件的映射關系。一個邏輯block通過它的BlockId的name屬性映射到具體的文件。

 

類結構

其類結構如下:

可以看出,這個類主要用於創建並維護邏輯block和block落地文件的映射關系。保存映射關系,有兩個解決方案:一者是使用Map存儲每一條具體的映射鍵值對,二者是指定映射函數像分區函數等等,給定的key通過映射函數映射到具體的value。

成員變量

成員變量如下:

subDirsPerLocalDir:這個變量表示本地文件下有幾個文件,默認為64,根據參數 spark.diskStore.subDirectories 來調節。

subDirs:是一個二維數組表示本地目錄和子目錄名稱的組合關系,即 ${本地目錄1 ... 本地目錄n}/${子目錄1 ... 子目錄64}

localDirs:表示block落地本地文件根目錄,通過 createLocalDirs 方法獲取,方法如下:

思路:它先調用調用Utils的 getConfiguredLocalDirs 方法,獲取到配置的目錄集合,然后map每一個父目錄,調用Utils的createDirectory方法,在每一個子目錄下創建一個 以blockmgr 為前綴的目錄。其依賴方法 createDirectory 如下:

這個方法允許重試次數為10,目的是為了防止創建的目錄跟已存在的目錄重名。

 

getConfiguredLocalDirs 方法如下:

大多數生產情況下,都是使用yarn,我們直接看一下spark on yarn 環境下,目錄到底在哪里。直接來看getYarnLocalDirs方法:

LOCAL_DIRS的定義是什么?

任務是跑在yarn 上的,下面就去定位一下hadoop yarn container的相關源碼。

定位LOCAL_DIRS環境變量

在ContainerLaunch類的 sanitizeEnv 方法中,找到了如下語句:

 

addToMap 方法如下:

即,數據被添加到了envirment map變量和 nmVars set集合中了。

在ContainerLaunch 的 call 方法中調用了 sanitizeEnv 方法:

appDirs變量定義如下:

即每一個 appDir格式如下:${localDir}/usercache/${user}/appcache/${application-id}/

localDirs 定義如下:

dirHandler是一個 LocalDirsHandlerService 類型變量,這是一個服務,在其serviceInit方法中,實例化了 MonitoringTimerTask對象:

在 MonitoringTimerTask 構造方法中,發現了:

 NM_LOCAL_DIRS 常量定義如下:

 

即:yarn.nodemanager.local-dirs 參數,該參數定義在yarn-default.xml下。

即localDir如下:

${yarn.nodemanager.local-dirs}/usercache/${user}/appcache/${application-id}/

再結合createDirectory方法,磁盤存儲的本地目錄是:

 ${yarn.nodemanager.local-dirs}/usercache/${user}/appcache/${application-id}/blockmgr-隨機的uuid/

核心方法

根據文件內容創建File對象,如下:

思路:先根據filename即blockId的name字段生成正的hashcode(abs(hashcode))

dirId 是指的第幾個父目錄(從0開始數),subDirId是指的父目錄下的第幾個子目錄(從0開始數)。最后拼接父子目錄為一個新的父目錄subDir。

然后以subDir為父目錄,創建File對象,並返回之。

跟getFile 方法相關的方法如下:

比較簡單,不做過多說明。

 

創建一個臨時Block,包括臨時本地block 或 shuffle block,如下:

 

還有一個方法,是停止 DiskBlockManager之后的回調方法:

若deleteFilesOnStop 為 true,即DiskBlockManager停止時,是否需要清除本地存儲的block文件。

在 BlockManager 中初始化DiskBlockManager時,deleteFilesOnStop 通過構造方法傳入

 

總結:DiskBlockManager 是用來創建並維護邏輯block和落地后的block文件的映射關系的,它還負責創建用於shuffle或本地的臨時文件。

 下面看一下在DiskStore中可能會用到的類以及其相關類的說明。

CountingWritableChannel 

它主要對sink做了包裝,在寫入sink的同時,還記錄向sink寫的數據的總量。源碼如下:

代碼比較簡單,不做過多說明。

ManagedBuffer

類說明如下:

This interface provides an immutable view for data in the form of bytes. 
The implementation should specify how the data is provided:
- FileSegmentManagedBuffer: data backed by part of a file
- NioManagedBuffer: data backed by a NIO ByteBuffer
- NettyManagedBuffer: data backed by a Netty ByteBuf
The concrete buffer implementation might be managed outside the JVM garbage collector.
For example, in the case of NettyManagedBuffer, the buffers are reference counted.
In that case, if the buffer is going to be passed around to a different thread, retain/release should be called.

 

類結構如下:

 

EncryptedManagedBuffer

它是一個適配器,它將幾乎所以轉換的請求委托給了 blockData,下面來看一下這個類相關的剖析。

首先先看一下它的父類 -- BlockData

 

BlockData

接口說明如下:

它是一個接口,它定義了存儲方式以及如何提供不同的方式來讀去底層的block 數據。

定義方法如下:

方法說明如下:

toInputStream用於返回用於讀取該文件的輸入流。

toNetty用於返回netty對block數據的包裝類,方便netty包來讀取數據。

toChunkedByteBuffer用於將block包裝成ChunkedByteBuffer。

toByteBuffer 用於將block數據轉換為內存中直接讀取的 ByteBuffer 對象。

當對該block的操作執行完畢后,需要調用dispose來做后續的收尾工作。

size表示block文件的大小。

它有三個子類:DiskBlockData、EncryptedBlockData和ByteBufferBlockData。

即block的三種存在形式:磁盤、加密后的block和內存中的ByteBuffer

分別介紹如下:

 

DiskBlockData

該類主要用於將磁盤中的block文件轉換為指定的流或對象。

先來看其簡單的方法實現:

構造方法:

相關字段說明如下:

minMemoryMapBytes表示 磁盤block映射到內存塊中最小大小,默認為2MB,可以通過 spark.storage.memoryMapThreshold 進行調整。

maxMemoryMapBytes表示 磁盤block映射到內存塊中最大大小,默認為(Integer.MAX_VALUE - 15)B,可以通過 spark.storage.memoryMapLimitForTests 進行調整。

對應源碼如下:

比較簡單的方法如下: 

size方法直接返回block文件的大小。

dispose空實現。

open是一個私有方法,主要用於獲取讀取該block文件的FileChannel對象。

 

toByteBuffer方法實現如下:

 

Utils的tryWithResource方法如下,它先執行createResource方法,然后執行Function對象的apply方法,最終釋放資源,思路就是 創建資源 --使用資源-- 釋放資源三步曲:

即先獲取讀取block文件的FileChannel對象,若blockSize 小於 最小的內存映射字節大小,則將channel的數據讀取到buffer中,返回的是HeapByteBuffer對象,即數據被寫入到了堆里,即它是non-direct buffer,相當於數據被讀取到中間臨時內存中,否則使用FileChannelImpl的map方法返回 MappedByteBuffer 對象。

MappedByteBuffer文檔說明如下:

A direct byte buffer whose content is a memory-mapped region of a file.
Mapped byte buffers are created via the FileChannel.map method. This class extends the ByteBuffer class with operations that are specific to memory-mapped file regions.
A mapped byte buffer and the file mapping that it represents remain valid until the buffer itself is garbage-collected.
The content of a mapped byte buffer can change at any time, for example if the content of the corresponding region of the mapped file is changed by this program or another. Whether or not such changes occur, and when they occur, is operating-system dependent and therefore unspecified. 
All or part of a mapped byte buffer may become inaccessible at any time, for example if the mapped file is truncated. An attempt to access an inaccessible region of a mapped byte buffer will not change the buffer's content and will cause an unspecified exception to be thrown either at the time of the access or at some later time. It is therefore strongly recommended that appropriate precautions be taken to avoid the manipulation of a mapped file by this program, or by a concurrently running program, except to read or write the file's content.
Mapped byte buffers otherwise behave no differently than ordinary direct byte buffers.

 

它是direct buffer,即直接從磁盤讀數據,不經過中間臨時內存,可以參照ByteBuffer的文檔對Direct vs. non-direct buffers 的說明如下:

Direct vs. non-direct buffers
A byte buffer is either direct or non-direct. Given a direct byte buffer, the Java virtual machine will make a best effort to perform native I/O operations directly upon it. That is, it will attempt to avoid copying the buffer's content to (or from) an intermediate buffer before (or after) each invocation of one of the underlying operating system's native I/O operations.
A direct byte buffer may be created by invoking the allocateDirect factory method of this class. The buffers returned by this method typically have somewhat higher allocation and deallocation costs than non-direct buffers. The contents of direct buffers may reside outside of the normal garbage-collected heap, and so their impact upon the memory footprint of an application might not be obvious. It is therefore recommended that direct buffers be allocated primarily for large, long-lived buffers that are subject to the underlying system's native I/O operations. In general it is best to allocate direct buffers only when they yield a measureable gain in program performance.
A direct byte buffer may also be created by mapping a region of a file directly into memory. An implementation of the Java platform may optionally support the creation of direct byte buffers from native code via JNI. If an instance of one of these kinds of buffers refers to an inaccessible region of memory then an attempt to access that region will not change the buffer's content and will cause an unspecified exception to be thrown either at the time of the access or at some later time.
Whether a byte buffer is direct or non-direct may be determined by invoking its isDirect method. This method is provided so that explicit buffer management can be done in performance-critical code. 

 

toChunkedByteBuffer 方法如下:

首先,ChunkedByteBuffer對象里包含的是數據分成多個小的chunk,而不是連續的數組。

先把文件讀到內存中的 HeapByteBuffer 對象中即單個chunk,然后放入存放chunk的ListBuffer中,最終轉換為Array存入到ChunkedByteBuffer 對象中。

toNetty實現如下:

DefaultFileRegion說明請繼續向下看,先不做過多說明。

 

EncryptedBlockData

這個類主要是用於加密的block磁盤文件轉換為特定的流或對象。

構造方法如下:

file指block文件,blockSize指block文件大小,key是用於加密的密鑰。

先來看三個比較簡單的方法:

 

open方法不再直接根據FileInputStream獲取其 FileChannelImpl 對象了,而是獲取 FileChannelImpl 之后,再調用了 CryptoStreamUtils 的 createReadableChannel 方法,如下:

進一步將channel 對象封裝為 CryptoInputStream 對象,對ErrorHandlingReadableChannel的讀操作,實際上是讀的 CryptoInputStream,這個流內部有一個根據key來初始化的加密器,這個加密器負責對數據的解密操作。

 

toByteBuffer方法如下:

思路:如果block數據大小在整數范圍內,則直接將加密的block解密之后存放在內存中。

toChunkedByteBuffer方法除了解密操作外,跟DiskBlockData 中toChunkedByteBuffer方法無異,不做過多說明,代碼如下:

toNetty 方法,源碼如下:

ReadableChannelFileRegion類在下文介紹,先不做過多說明。

 

toInputStream方法,源碼如下:

思路:這個就不能直接open方法返回的獲取inputStream,因為 CryptoInputStream 是沒有獲取inputStream的接口的,Channels.newInputStream返回的是ChannelInputStream,ChannelInputStream對channel做了裝飾。

ByteBufferBlockData

整體比較簡單,主要來看一下dispose方法,ChunkedByteBuffer 方法的 dispose 如下:

即使用StorageUtils的dispose 方法去清理每一個chunk,StorageUtils的dispose 方法如下:

即獲取它的cleaner,然后調用cleaner的clean方法。我們以 DirectByteBufferR 為例,做進一步說明:

在其構造方法中初始化Cleaner,如下:

base是調用unsafe類的靜態方法allocateMemory分配指定大小內存后返回的內存地址,size是內存大小。

類聲明:

沒錯它是一個虛引用,隨時會被垃圾回收。

 

Cleaner的構造方法如下:

var1 是待清理的對象,var2 是執行清理任務的Runnable對象。

再看它的成員變量:

沒錯,它自己本身就是雙向鏈表上的一個節點,也是雙向鏈表。

 其create 方法如下:

思路:創建cleanr並把它加入到雙向鏈表中。

 

Cleaner的 clean方法如下:

它會先調用remove 方法,調用成功則執行內存清理任務,注意這里沒有異步任務同步調用Runnable的run方法。

remove 方法如下:

思路:從雙向鏈表中移除指定的cleaner。

Deallocator 類如下:

unsafe的allocateMemory方法使用了off-heap memory,這種方式的內存分配不是在堆里,不受GC的管理,使用Unsafe.freeMemory()來釋放它。

先調用 unsafe釋放內存,然后調用Bits的 unreserveMemory 方法:

至此,dispose 方法結束。

 

 

下面看一下,ReadableChannelFileRegion的繼承關系:

我們按繼承關系來看類: ReferenceCounted --> FileRegion --> AbstractReferenceCounted --> AbstractFileRegion --> ReadableChannelFileRegion。

ReferenceCounted

類說明如下:

A reference-counted object that requires explicit deallocation.
When a new ReferenceCounted is instantiated, it starts with the reference count of 1. 
retain() increases the reference count, and release() decreases the reference count.
If the reference count is decreased to 0, the object will be deallocated explicitly,
and accessing the deallocated object will usually result in an access violation. If an object that implements ReferenceCounted is a container of other objects that implement ReferenceCounted,
the contained objects will also be released via release() when the container's reference count becomes 0.

這是netty包下的一個接口。

它是一個引用計數對象,需要顯示調用deallocation。

ReferenceCounted對象實例化時,引用計數設為1,調用retain方法增加引用計數,release方法則釋放引用計數。

如果引用計數減少至0,對象會被顯示deallocation,訪問已經deallocation的對象會造成訪問問題。

如果一個對象實現了ReferenceCounted接口的容器包含了其他實現了ReferenceCounted接口的對象,當容器的引用減少為0時,被包含的對象也需要通過 release 方法釋放之,即引用減1。

主要有三類核心方法:

retain:Increases the reference count by 1 or the specified increment.

touch:Records the current access location of this object for debugging purposes. If this object is determined to be leaked, the information recorded by this operation will be provided to you via ResourceLeakDetector. This method is a shortcut to touch(null).

release:Decreases the reference count by 1 and deallocates this object if the reference count reaches at 0. Returns true if and only if the reference count became 0 and this object has been deallocated

refCnt:Returns the reference count of this object. If 0, it means this object has been deallocated.

FileRegion

它也是netty下的一個包,FileRegion數據通過支持零拷貝的channel將數據傳輸到目標channel。

A region of a file that is sent via a Channel which supports zero-copy file transfer .

 

注意:文件零拷貝傳輸對JDK版本和操作系統是有要求的:

FileChannel.transferTo(long, long, WritableByteChannel) has at least four known bugs in the old versions of Sun JDK and perhaps its derived ones. Please upgrade your JDK to 1.6.0_18 or later version if you are going to use zero-copy file transfer.
If your operating system (or JDK / JRE) does not support zero-copy file transfer, sending a file with FileRegion might fail or yield worse performance. For example, sending a large file doesn't work well in Windows.
Not all transports support it

 

接口結構如下:

下面對新增方法的解釋:

count:Returns the number of bytes to transfer.

position:Returns the offset in the file where the transfer began.

transferred:Returns the bytes which was transfered already.

transferTo:Transfers the content of this file region to the specified channel.

AbstractReferenceCounted

這個類是通過一個變量來記錄引用的增加或減少情況。

類結構如下:

先來看成員變量:

refCnt就是內部記錄引用數的一個volatile類型的變量,refCntUpdater是一個 AtomicIntegerFieldUpdater 類型常量,AtomicIntegerFieldUpdater 基於反射原子性更新某個類的 volatile 類型成員變量。

A reflection-based utility that enables atomic updates to designated volatile int fields of designated classes. 
This class is designed for use in atomic data structures in which several fields of the same node are independently subject to atomic updates. Note that the guarantees of the compareAndSet method in this class are weaker than in other atomic classes.
Because this class cannot ensure that all uses of the field are appropriate for purposes of atomic access,
it can guarantee atomicity only with respect to other invocations of compareAndSet and set on the same updater.

 

方法如下:

1. 設置或獲取 refCnt 變量

2. 增加引用:

3. 減少引用:

 

AbstractFileRegion

AbstractFileRegion 繼承了AbstractReferenceCounted, 但他還是一個抽象類,只是實現了部分的功能,如下:

DefaultFileRegion

 文檔說明如下:

Default FileRegion implementation which transfer data from a FileChannel or File. 
Be aware that the FileChannel will be automatically closed once refCnt() returns 0.

 

先來看一下它主要的成員變量:

f:是指要傳輸的源文件。

file:是指要傳輸的源FileChannel

position:傳輸開始的字節位置

count:總共需要傳輸的字節數量

transferred:指已經傳輸的字節數量

 

關鍵方法 transferTo 的源碼如下:

思路:先計算出剩余需要傳輸的字節的總大小。然后從 position 的相對位置開始傳輸到指定的target sink。

注意:position是指相對於position最初開始位置的大小,絕對位置為 this.position + position。

其中,open 方法如下,它返回一個隨機讀取文件的 FileChannel 對象。

其deallocate 方法如下:

思路:直接關閉,取消成員變量對於FileChannel的引用,便於垃圾回收時可以回收FileChannel,然后關閉FileChannel即可。

 

總結:它通過 RandomeAccessFile 獲取 可以支持隨機訪問 FileChannelImpl 的FileChannel,然后根據相對位置計算出絕對位置以及需要傳輸的字節總大小,最后將數據傳輸到target。

其引用計數的處理調用其父類 AbstractReferenceCounted的對應方法。

ReadableChannelFileRegion

其源碼如下:

其內部的buffer 的大小時 64KB,_traferred 變量記錄了已經傳輸的字節數量。ReadableByteChannel 是按順序讀的,所以pos參數沒有用。

 

下面,重點對DiskStore做一下剖析。 

DiskStore

它就是用來保存block 到磁盤的。

 構造方法如下:

它有三個成員變量:

blockSizes 記錄了每一個block 的blockId 和其大小的關系。可以通過get 方法獲取指定blockId 的block大小。如下:

 

putBytes方法如下:

putBytes將數據寫入到磁盤中;getBytes獲取的是BlockData數據,注意現在只是返回文件的引用,文件的內容並沒有返回,使得上文所講的多種多樣的BlockData轉換操作直接對接FileChannel,即本地文件,可以充分發揮零拷貝等特性,數據傳輸效率會更高。

其中put 方法如下:

思路很簡單,先根據diskManager獲取到block在磁盤中的文件的抽象 -- File對象,然后獲取到filechannel,調用回調函數將數據寫入到本地block文件中,最后記錄block和其block大小,最后關閉out channel。如果中途拋出異常,則格式化已寫入的數據,確保數據的寫入是原子化操作(要么全成功,要么全失敗)。

put方法依賴的方法如下:

openForWrite方法,先獲取filechannel,然后如果數據有加密,在創建加密的channel用來處理加密的數據

總結:本篇文章介紹了維護blockId和block物理文件的映射關系的DiskBlockManager;Hadoop yarn定位LOCAL_DIRS環境變量是如何定義的;定義了block的存儲方式以及轉換成流或channel或其他對象的BlockData接口以及它的三個具體的實現,順便介紹了directByteBuffer內存清理機制--Cleaner以及相關類的解釋;用作數據傳輸的DefaultFileRegion和ReadableChannelFileRegion類以及其相關類;最后介紹了磁盤存儲里的重頭戲--DiskStore,並重點介紹了其用於存儲數據和刪除數據的方法。

不足之處:本篇文章對磁盤IO中的nio以及netty中的相關類介紹的不是很詳細,可以閱讀相關文檔做進一步理解。畢竟如何高效地和磁盤打交道也是比較重要的技能。后面有機會可能會對java的集合io多線程jdk部分的源碼做一次徹底剖析,但那是后話了。目前打算先把spark中認為自己比較重要的梳理一遍。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM