Spark Shuffle 堆外內存溢出問題與解決(Shuffle通信原理)


 

Spark Shuffle 堆外內存溢出問題與解決(Shuffle通信原理)

http://xiguada.org/spark-shuffle-direct-buffer-oom/

 

問題描述

Spark-1.6.0已經在一月份release,為了驗證一下它的性能,我使用了一些大的SQL驗證其性能,其中部分SQL出現了Shuffle失敗問題,詳細的堆棧信息如下所示:

16/02/17 15:36:36 WARN server.TransportChannelHandler: Exception in connection from /10.196.134.220:7337

java.lang.OutOfMemoryError: Direct buffer memory

    at java.nio.Bits.reserveMemory(Bits.java:658)

    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)

    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)

    at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:645)

    at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:228)

    at io.netty.buffer.PoolArena.allocate(PoolArena.java:212)

    at io.netty.buffer.PoolArena.allocate(PoolArena.java:132)

    at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:271)

    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155)

    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:146)

    at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:107)

    at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)

    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)

    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)

    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)

    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)

    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)

    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)

    at java.lang.Thread.run(Thread.java:744)

 

從失敗信息可以看出,是堆外內存溢出問題,為什么會出現堆外內存溢出呢?

  Sparkshuffle部分使用了netty框架進行網絡傳輸,但netty會申請堆外內存緩存(PooledByteBufAllocator ,AbstractByteBufAllocator);Shuffle時,每個Reduce都需要獲取每個map對應的輸出,當一個reduce需要獲取的一個map數據比較大(比如1G),這時候就會申請一個1G的堆外內存,而堆外內存是有限制的,這時候就出現了堆外內存溢出。

Shuffle不使用堆外內存

Executor增加配置-Dio.netty.noUnsafe=true,就可以讓shuffle不使用堆外內存,但相同的作業還是出現了OOM,這種方式沒辦法解決問題。

java.lang.OutOfMemoryError: Java heap space

        at io.netty.buffer.PoolArena$HeapArena.newUnpooledChunk(PoolArena.java:607)

        at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:237)

        at io.netty.buffer.PoolArena.allocate(PoolArena.java:215)

        at io.netty.buffer.PoolArena.allocate(PoolArena.java:132)

        at io.netty.buffer.PooledByteBufAllocator.newHeapBuffer(PooledByteBufAllocator.java:256)

        at io.netty.buffer.AbstractByteBufAllocator.heapBuffer(AbstractByteBufAllocator.java:136)

        at io.netty.buffer.AbstractByteBufAllocator.heapBuffer(AbstractByteBufAllocator.java:127)

        at io.netty.buffer.CompositeByteBuf.allocBuffer(CompositeByteBuf.java:1347)

        at io.netty.buffer.CompositeByteBuf.consolidateIfNeeded(CompositeByteBuf.java:276)

        at io.netty.buffer.CompositeByteBuf.addComponent(CompositeByteBuf.java:116)

        at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:148)

        at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:82)

        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)

        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)

        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)

        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)

        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)

        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)

        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)

 

 

當數據量大時能否直接寫磁盤

MapReduceShuffle數據量大時,會把Shuffle數據寫到磁盤。

 

Spark Shuffle通信機制

 

 

 

上圖顯示了Shuffle的通信原理。

服務端會啟動Shuffle_Service

1)客戶端代碼調用堆棧

BlockStoreShuffleReader.read

ShuffleBlockFetcherIterator.sendRequest

ExternalShuffleClient.fetchBlocks

OneForOneBlockFetcher.start

TransportClient.sendRpc

發送RpcRequest(OpenBlocks)信息

2)服務端代碼調用堆棧

TransportRequestHandler.processRpcRequest

ExternalShuffleBlockHandler.receive

ExternalShuffleBlockHandler.handleMessage

ExternalShuffleBlockResolver.getBlockData(shuffle_ShuffleId_MapId_ReduceId)

ExternalShuffleBlockResolver.getSortBasedShuffleBlockData

FileSegmentManagedBuffer

handleMessage會把所需的appid的一個executor需要被fetchblock全部封裝成List<ManagedBuffer>,然后注冊為一個Stream,然后把streamIdblockid的個數返回給客戶端,最后返回給客戶端的信息為RpcResponse(StreamHandle(streamId, msg.blockIds.length))

(3)客戶端

客戶端接收到RpcResponse后,會為每個blockid調用:

TransportClient.fetchChunk

Send ChunkFetchRequest(StreamChunkId(streamId, chunkIndex))

 

4)服務端

TransportRequestHandler.processFetchRequest

OneForOneStreamManager.getChunk

返回respond(new ChunkFetchSuccess(req.streamChunkId, buf))給客戶端,buf就是某一個blockid的FileSegmentManagedBuffer。

 

5)客戶端

OneForOneBlockFetcher.ChunkCallback.onSuccess

listener.onBlockFetchSuccess(blockIds[chunkIndex], buffer)

ShuffleBlockFetcherIterator.sendRequest.BlockFetchingListener.onBlockFetchSuccess

results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf))

客戶端的另外一個線程

ShuffleBlockFetcherIterator.next

(result.blockId, new BufferReleasingInputStream(buf.createInputStream(), this))

 

Download文件的通信原理

另外還有一個stream通信協議,客戶端首先需要構造StreamRequest請求,StreamRequest中包含待下載文件的URL

1)客戶端調用堆棧

Executor.updateDependencies...

org.apache.spark.util.Utils.fetchFile

org.apache.spark.util.Utils.doFetchFile

NettyRpcEnv.openChannel

TransportClient.stream

Send StreamRequest(streamId) streamId為文件的目錄。

 

2)服務端處理流程

TransportRequestHandler.handle

TransportRequestHandler.processStreamRequest

OneForOneStreamManager.openStream

返回new StreamResponse(req.streamId, buf.size(), buf)

 

3)客戶端處理流程

TransportResponseHandler.handle

TransportFrameDecoder.channelRead

TransportFrameDecoder.feedInterceptor

StreamInterceptor.handle

callback.onData即NettyRpcEnv.FileDownloadCallback.onData

然后返回client.stream(parsedUri.getPath(), callback)Utils.doFetchFile,最后org.apache.spark.util.Utils.downloadFile

 

 

問題分析:

  當前spark shuffle時使用Fetch協議,由於使用堆外內存存儲Fetch的數據,當Fetch某個map的數據特別大時,容易出現堆外內存的OOM。而申請內存部分在Netty自帶的代碼中,我們無法修改。

另外一方面,Stream是下載文件的協議,需要提供文件的URL,而Shuffle只會獲取文件中的一段數據,並且也不知道URL,因此不能直接使用Stream接口。

解決方案:

  新增一個FetchStream通信協議,在OneForOneBlockFetcher中,如果一個block小於100Mspark.shuffle.max.block.size.inmemory)時,使用原有的方式Fetch數據,如果大於100M時,則使用新增的FetchStream協議,服務端在處理FetchStreamRequestFetchRequest的區別在於,FetchStreamRequest返回數據流,客戶端根據返回的數據量寫到本地臨時文件,然后構造FileSegmentManagedBuffer給后續處理流程。

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM