Spark Shuffle Off-Heap Memory Overflow: Problem and Solution (How Shuffle Communication Works)
http://xiguada.org/spark-shuffle-direct-buffer-oom/
Problem Description
Spark 1.6.0 was released in January. To check its performance, I ran a set of large SQL queries on it. Some of them failed during shuffle with the following stack trace:
16/02/17 15:36:36 WARN server.TransportChannelHandler: Exception in connection from /10.196.134.220:7337
java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:658)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:645)
at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:228)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:212)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:132)
at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:271)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:146)
at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:107)
at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:744)
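The trace shows that the executor ran out of off-heap (direct buffer) memory. Why does that happen?
Spark's shuffle uses the Netty framework for network transfer, and Netty allocates its buffers from off-heap memory (PooledByteBufAllocator, AbstractByteBufAllocator). During a shuffle, each reduce task has to fetch its share of every map task's output. When the data a reduce task needs from a single map is large (say 1 GB), a 1 GB off-heap buffer is requested for it. Off-heap memory is capped (on HotSpot by -XX:MaxDirectMemorySize), so the allocation fails with an out-of-memory error.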
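The arithmetic behind this failure can be shown with a toy Python model. It is a sketch under stated assumptions, not Netty's real allocator: a memory pool with a fixed cap (the 512 MB figure and all names are hypothetical) and a fetch that must reserve a whole block in one go.

```python
class DirectMemoryOOM(Exception):
    """Stands in for java.lang.OutOfMemoryError: Direct buffer memory."""


class BoundedDirectMemory:
    """Toy model of a capped off-heap pool (hypothetical; not Netty's allocator)."""

    def __init__(self, limit_bytes: int) -> None:
        self.limit = limit_bytes
        self.reserved = 0

    def allocate(self, size: int) -> None:
        # Refuse any request that would push the total past the cap.
        if self.reserved + size > self.limit:
            raise DirectMemoryOOM(
                f"requested {size} bytes, {self.reserved} reserved, limit {self.limit}")
        self.reserved += size

    def release(self, size: int) -> None:
        self.reserved -= size


MB = 1024 * 1024
GB = 1024 * MB


def fetch_whole_block(pool: BoundedDirectMemory, block_size: int) -> bool:
    """Fetch-as-one-buffer: the entire map output must be reserved at once."""
    try:
        pool.allocate(block_size)
    except DirectMemoryOOM:
        return False
    pool.release(block_size)  # block consumed by the reducer: give the memory back
    return True


pool = BoundedDirectMemory(limit_bytes=512 * MB)  # hypothetical cap
print(fetch_whole_block(pool, 64 * MB))  # True: a 64 MB block fits
print(fetch_whole_block(pool, 1 * GB))   # False: one 1 GB block exceeds the cap
```

However many small blocks succeed one after another, a single block larger than the cap can never be fetched this way.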
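Making Shuffle Avoid Off-Heap Memory
Adding -Dio.netty.noUnsafe=true to the executor's JVM options (for example through spark.executor.extraJavaOptions) stops shuffle from using off-heap memory. However, the same job still ran out of memory, this time on the Java heap, so this approach does not solve the problem: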
java.lang.OutOfMemoryError: Java heap space
at io.netty.buffer.PoolArena$HeapArena.newUnpooledChunk(PoolArena.java:607)
at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:237)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:215)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:132)
at io.netty.buffer.PooledByteBufAllocator.newHeapBuffer(PooledByteBufAllocator.java:256)
at io.netty.buffer.AbstractByteBufAllocator.heapBuffer(AbstractByteBufAllocator.java:136)
at io.netty.buffer.AbstractByteBufAllocator.heapBuffer(AbstractByteBufAllocator.java:127)
at io.netty.buffer.CompositeByteBuf.allocBuffer(CompositeByteBuf.java:1347)
at io.netty.buffer.CompositeByteBuf.consolidateIfNeeded(CompositeByteBuf.java:276)
at io.netty.buffer.CompositeByteBuf.addComponent(CompositeByteBuf.java:116)
at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:148)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:82)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
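Moving the buffer to the heap does not help because a length-prefixed frame decoder has to keep every byte of a frame until the frame is complete, as the CompositeByteBuf growing inside TransportFrameDecoder.decodeNext in the trace above suggests. The toy decoder below assumes a simplified frame (an 8-byte big-endian length that counts itself, followed by the body); Spark's real frames also carry a message type and header. Its peak memory equals the block size, whether that memory lives on or off the heap.

```python
import struct

LENGTH_SIZE = 8  # assumption: 8-byte big-endian frame length that counts itself


def encode_frame(body: bytes) -> bytes:
    """Prefix a body with its frame length (length field included)."""
    return struct.pack(">q", LENGTH_SIZE + len(body)) + body


class ToyFrameDecoder:
    """Buffers incoming bytes until a complete frame is present, then emits it."""

    def __init__(self) -> None:
        self.buffer = bytearray()
        self.peak_buffered = 0  # largest number of bytes held at once

    def feed(self, data: bytes) -> list:
        self.buffer.extend(data)
        self.peak_buffered = max(self.peak_buffered, len(self.buffer))
        frames = []
        while len(self.buffer) >= LENGTH_SIZE:
            frame_len = struct.unpack(">q", self.buffer[:LENGTH_SIZE])[0]
            if len(self.buffer) < frame_len:
                break  # incomplete frame: everything received so far stays in memory
            frames.append(bytes(self.buffer[LENGTH_SIZE:frame_len]))
            del self.buffer[:frame_len]
        return frames


block = b"x" * 10_000  # stands in for one large shuffle block
wire = encode_frame(block)
decoder = ToyFrameDecoder()
received = []
for i in range(0, len(wire), 1024):  # the socket delivers 1 KB reads
    received.extend(decoder.feed(wire[i:i + 1024]))
print(len(received), decoder.peak_buffered)  # 1 10008
```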
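Can Large Data Be Written Straight to Disk?
MapReduce does this: when the shuffle data fetched for a reducer is too large to keep in memory, it is written to disk.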
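A simplified Python model of that MapReduce-style policy: an output above a per-copy limit goes straight to disk, and a smaller one is kept in memory if there is room. The 0.25 fraction is an assumption modelled on mapreduce.reduce.shuffle.memory.limit.percent, and the logic is a sketch rather than Hadoop's exact merge-manager code.

```python
from dataclasses import dataclass


@dataclass
class ToyMergeManager:
    """Simplified MapReduce-style reservation for one fetched map output."""
    memory_limit: int                      # bytes available for in-memory shuffle data
    single_shuffle_fraction: float = 0.25  # assumed per-copy fraction of memory_limit
    used: int = 0

    def reserve(self, size: int) -> str:
        if size >= self.memory_limit * self.single_shuffle_fraction:
            return "disk"    # too large for any single in-memory copy: write it to disk
        if self.used + size > self.memory_limit:
            return "wait"    # would fit later, but memory is busy right now
        self.used += size
        return "memory"


mm = ToyMergeManager(memory_limit=1000)
print(mm.reserve(100))  # memory
print(mm.reserve(300))  # disk (300 >= 250)
```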
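Spark Shuffle Communication Mechanism
The figure above shows how shuffle communication works.
The server side runs the shuffle service (Shuffle_Service), which by default listens on port 7337 (spark.shuffle.service.port), the port seen in the first stack trace.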
(1) Client-side call stack
BlockStoreShuffleReader.read
ShuffleBlockFetcherIterator.sendRequest
ExternalShuffleClient.fetchBlocks
OneForOneBlockFetcher.start
TransportClient.sendRpc
Sends an RpcRequest(OpenBlocks) message
(2) Server-side call stack
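The OpenBlocks message can be pictured as below. The field names mirror OpenBlocks (appId, execId, blockIds), the block-id format follows the shuffle_ShuffleId_MapId_ReduceId naming used on the server side, and the wire encoding is omitted. The helper functions and the app/executor ids are hypothetical.

```python
from dataclasses import dataclass
from typing import Iterable, Tuple


@dataclass(frozen=True)
class OpenBlocks:
    """Fields of the OpenBlocks RPC (appId, execId, blockIds); wire encoding omitted."""
    app_id: str
    exec_id: str
    block_ids: Tuple[str, ...]


def shuffle_block_id(shuffle_id: int, map_id: int, reduce_id: int) -> str:
    # Naming used by the server side: shuffle_ShuffleId_MapId_ReduceId
    return f"shuffle_{shuffle_id}_{map_id}_{reduce_id}"


def build_open_blocks(app_id: str, exec_id: str, shuffle_id: int,
                      map_ids: Iterable[int], reduce_id: int) -> OpenBlocks:
    """Group the map outputs this reducer needs from one remote executor into one request."""
    ids = tuple(shuffle_block_id(shuffle_id, m, reduce_id) for m in map_ids)
    return OpenBlocks(app_id, exec_id, ids)


req = build_open_blocks("app-1", "3", shuffle_id=0, map_ids=[2, 5, 7], reduce_id=4)
print(req.block_ids)  # ('shuffle_0_2_4', 'shuffle_0_5_4', 'shuffle_0_7_4')
```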
TransportRequestHandler.processRpcRequest
ExternalShuffleBlockHandler.receive
ExternalShuffleBlockHandler.handleMessage
ExternalShuffleBlockResolver.getBlockData(shuffle_ShuffleId_MapId_ReduceId)
ExternalShuffleBlockResolver.getSortBasedShuffleBlockData
FileSegmentManagedBuffer
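handleMessage wraps every block that must be fetched from one executor of the given appId into a List<ManagedBuffer>, registers that list as a stream, and returns the streamId together with the number of blocks. The reply sent to the client is RpcResponse(StreamHandle(streamId, msg.blockIds.length)).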
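A sketch of that server-side step: each block is resolved to a file segment (a path, offset and length in a map output file, with no data read yet), the segments are registered under a new stream id, and the client gets back a handle with the chunk count. ToyStreamManager, the sequential stream ids, the file paths and the resolve callback are assumptions for illustration, not Spark's OneForOneStreamManager.

```python
import itertools
from dataclasses import dataclass
from typing import Callable, Dict, List, Sequence, Tuple


@dataclass(frozen=True)
class FileSegment:
    """Stand-in for FileSegmentManagedBuffer: describes bytes in a file, holds none of them."""
    path: str
    offset: int
    length: int


@dataclass(frozen=True)
class StreamHandle:
    stream_id: int
    num_chunks: int


class ToyStreamManager:
    def __init__(self) -> None:
        self._next_id = itertools.count(1)
        self.streams: Dict[int, Tuple[str, List[FileSegment]]] = {}

    def register_stream(self, app_id: str, buffers: List[FileSegment]) -> int:
        stream_id = next(self._next_id)
        self.streams[stream_id] = (app_id, buffers)
        return stream_id


def handle_open_blocks(manager: ToyStreamManager, app_id: str, block_ids: Sequence[str],
                       resolve: Callable[[str], FileSegment]) -> StreamHandle:
    """Resolve each block, register all of them as one stream, reply with a handle."""
    buffers = [resolve(b) for b in block_ids]
    stream_id = manager.register_stream(app_id, buffers)
    return StreamHandle(stream_id, len(block_ids))


manager = ToyStreamManager()
segments = {  # hypothetical index lookup results
    "shuffle_0_2_4": FileSegment("/tmp/map2.data", 4096, 1000),
    "shuffle_0_5_4": FileSegment("/tmp/map5.data", 8192, 250),
}
handle = handle_open_blocks(manager, "app-1", list(segments), segments.__getitem__)
print(handle)  # StreamHandle(stream_id=1, num_chunks=2)
```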
(3) Client
After receiving the RpcResponse, the client calls, for each blockId:
TransportClient.fetchChunk
Sends ChunkFetchRequest(StreamChunkId(streamId, chunkIndex))
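Because chunk i of the stream is the i-th requested block, the client can generate one request per chunk and keep the mapping back to block ids. A minimal sketch with a hypothetical helper; the message classes are named after the ones in the call stack but are plain Python stand-ins.

```python
from dataclasses import dataclass
from typing import List, Sequence, Tuple


@dataclass(frozen=True)
class StreamChunkId:
    stream_id: int
    chunk_index: int


@dataclass(frozen=True)
class ChunkFetchRequest:
    stream_chunk_id: StreamChunkId


def chunk_requests(stream_id: int,
                   block_ids: Sequence[str]) -> List[Tuple[str, ChunkFetchRequest]]:
    """Chunk i of the stream is block_ids[i], so each reply can be matched to its block."""
    return [(block_id, ChunkFetchRequest(StreamChunkId(stream_id, i)))
            for i, block_id in enumerate(block_ids)]


for block_id, request in chunk_requests(42, ["shuffle_0_2_4", "shuffle_0_5_4"]):
    print(block_id, request.stream_chunk_id.chunk_index)
```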
(4) Server
TransportRequestHandler.processFetchRequest
OneForOneStreamManager.getChunk
Returns respond(new ChunkFetchSuccess(req.streamChunkId, buf)) to the client, where buf is the FileSegmentManagedBuffer of one blockId.
(5) Client
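A toy model of the server side of a chunk fetch, assuming it behaves as OneForOneStreamManager does in Spark 1.6: chunks must be requested in order, and the stream is dropped once its last chunk has been handed out. All names are hypothetical, and errors are returned as a ChunkFetchFailure instead of being thrown.

```python
from dataclasses import dataclass
from typing import Dict, List, Union


@dataclass
class StreamState:
    buffers: List[object]   # e.g. file segments registered by OpenBlocks
    cur_chunk: int = 0      # next chunk index the client is allowed to ask for


@dataclass(frozen=True)
class ChunkFetchSuccess:
    stream_id: int
    chunk_index: int
    buf: object


@dataclass(frozen=True)
class ChunkFetchFailure:
    stream_id: int
    chunk_index: int
    error: str


class ToyChunkServer:
    def __init__(self) -> None:
        self.streams: Dict[int, StreamState] = {}

    def get_chunk(self, stream_id: int, chunk_index: int) -> object:
        state = self.streams.get(stream_id)
        if state is None:
            raise LookupError(f"unknown stream {stream_id}")
        if chunk_index != state.cur_chunk:
            raise LookupError(
                f"out-of-order chunk index {chunk_index} (expected {state.cur_chunk})")
        buf = state.buffers[chunk_index]
        state.cur_chunk += 1
        if state.cur_chunk == len(state.buffers):
            del self.streams[stream_id]  # last chunk handed out: forget the stream
        return buf

    def process_fetch_request(
            self, stream_id: int, chunk_index: int) -> Union[ChunkFetchSuccess, ChunkFetchFailure]:
        try:
            buf = self.get_chunk(stream_id, chunk_index)
        except LookupError as e:
            return ChunkFetchFailure(stream_id, chunk_index, str(e))
        return ChunkFetchSuccess(stream_id, chunk_index, buf)


server = ToyChunkServer()
server.streams[7] = StreamState(["segment-0", "segment-1"])
print(server.process_fetch_request(7, 0))  # ChunkFetchSuccess(stream_id=7, chunk_index=0, buf='segment-0')
```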
OneForOneBlockFetcher.ChunkCallback.onSuccess
listener.onBlockFetchSuccess(blockIds[chunkIndex], buffer)
ShuffleBlockFetcherIterator.sendRequest.BlockFetchingListener.onBlockFetchSuccess
results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf))
On another client thread (the one consuming the iterator):
ShuffleBlockFetcherIterator.next
returns (result.blockId, new BufferReleasingInputStream(buf.createInputStream(), this))
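The hand-off between the Netty callback thread and the consuming thread is a producer/consumer queue. A minimal Python sketch, assuming a simple blocking queue in place of Spark's results queue; the class and method names and the host address are hypothetical, and the result size is taken from the buffer rather than from sizeMap. Each buf is already entirely in memory when it is queued, which is why a single huge block is a problem.

```python
import io
import queue
import threading
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class SuccessFetchResult:
    block_id: str
    address: str
    size: int
    buf: bytes  # already fully in memory when the callback fires


class ToyFetcherIterator:
    """Callback threads put results; the consuming thread takes them in __next__."""

    def __init__(self, num_blocks: int) -> None:
        self.results: "queue.Queue[SuccessFetchResult]" = queue.Queue()
        self.remaining = num_blocks

    def on_block_fetch_success(self, block_id: str, address: str, buf: bytes) -> None:
        self.results.put(SuccessFetchResult(block_id, address, len(buf), buf))

    def __iter__(self) -> "ToyFetcherIterator":
        return self

    def __next__(self) -> Tuple[str, io.BytesIO]:
        if self.remaining == 0:
            raise StopIteration
        result = self.results.get()  # blocks until some fetch callback delivers a block
        self.remaining -= 1
        return result.block_id, io.BytesIO(result.buf)


def deliver(fetcher: ToyFetcherIterator) -> None:
    """Plays the network callback thread: each fetched block arrives fully buffered."""
    for m in range(3):
        fetcher.on_block_fetch_success(f"shuffle_0_{m}_4", "host:7337", b"data%d" % m)


it = ToyFetcherIterator(num_blocks=3)
producer = threading.Thread(target=deliver, args=(it,))
producer.start()
blocks = {block_id: stream.read() for block_id, stream in it}
producer.join()
```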
How File Download Communication Works
There is also a stream protocol. The client first builds a StreamRequest, which contains the URL of the file to download.
(1) Client-side call stack
Executor.updateDependencies...
org.apache.spark.util.Utils.fetchFile
org.apache.spark.util.Utils.doFetchFile
NettyRpcEnv.openChannel
TransportClient.stream
Sends StreamRequest(streamId), where streamId is the path of the file.
(2) Server-side processing
TransportRequestHandler.handle
TransportRequestHandler.processStreamRequest
OneForOneStreamManager.openStream
Returns new StreamResponse(req.streamId, buf.size(), buf)
(3) Client-side processing
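A sketch of the server side under simple assumptions: the stream id is taken as a path under a hypothetical root directory, the reply announces the byte count up front, and the body is produced piece by piece instead of being loaded whole. Spark's real lookup of stream ids and its zero-copy file transfer are not modelled, and the path handling has no security checks.

```python
import os
import tempfile
from dataclasses import dataclass
from typing import Iterator, Tuple


@dataclass(frozen=True)
class StreamResponse:
    stream_id: str
    byte_count: int


def process_stream_request(stream_id: str, root_dir: str,
                           piece_size: int = 64 * 1024) -> Tuple[StreamResponse, Iterator[bytes]]:
    """Reply with a header announcing the size, then yield the file body piece by piece."""
    # Hypothetical mapping: the stream id is a path relative to root_dir.
    path = os.path.join(root_dir, stream_id.lstrip("/"))
    size = os.path.getsize(path)

    def body() -> Iterator[bytes]:
        with open(path, "rb") as f:
            while True:
                piece = f.read(piece_size)
                if not piece:
                    break
                yield piece

    return StreamResponse(stream_id, size), body()


root = tempfile.mkdtemp()
with open(os.path.join(root, "app.jar"), "wb") as f:
    f.write(b"0123456789")
header, body = process_stream_request("/app.jar", root, piece_size=4)
pieces = list(body)
print(header, pieces)
```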
TransportResponseHandler.handle
TransportFrameDecoder.channelRead
TransportFrameDecoder.feedInterceptor
StreamInterceptor.handle
callback.onData, i.e. NettyRpcEnv.FileDownloadCallback.onData
The channel fed by client.stream(parsedUri.getPath(), callback) is then returned to Utils.doFetchFile, and org.apache.spark.util.Utils.downloadFile finally writes the data to the target file.
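The key property of the stream path is that the client never holds the whole file: an interceptor takes exactly the announced number of bytes off the connection and hands each piece to the callback, leaving any extra bytes for the next frame. A toy version, modelled on the role StreamInterceptor plays here; the names and signatures are hypothetical.

```python
import io
from typing import Callable, Tuple


class ToyStreamInterceptor:
    """Takes exactly byte_count body bytes off the connection, passing each piece to on_data."""

    def __init__(self, byte_count: int, on_data: Callable[[bytes], object],
                 on_complete: Callable[[], object]) -> None:
        self.byte_count = byte_count
        self.bytes_read = 0
        self.on_data = on_data
        self.on_complete = on_complete

    def handle(self, data: bytes) -> Tuple[bool, bytes]:
        """Return (still_active, leftover); leftover bytes belong to the next frame."""
        to_read = min(len(data), self.byte_count - self.bytes_read)
        if to_read:
            self.on_data(data[:to_read])  # e.g. append to the target file, then drop the piece
        self.bytes_read += to_read
        if self.bytes_read == self.byte_count:
            self.on_complete()
            return False, data[to_read:]
        return True, b""


sink = io.BytesIO()  # stands in for the downloaded file
done = []
interceptor = ToyStreamInterceptor(10, sink.write, lambda: done.append(True))
print(interceptor.handle(b"0123"))        # (True, b'')
print(interceptor.handle(b"456789NEXT"))  # (False, b'NEXT')
```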
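Problem analysis:
Spark shuffle currently uses the fetch protocol, which keeps each fetched block in off-heap memory, so fetching a very large map output easily triggers an off-heap OOM. The allocation happens inside Netty's own code, which we cannot modify.
On the other hand, Stream is a protocol for downloading whole files and needs the file's URL, whereas shuffle reads only a segment of a file and has no URL for it, so the Stream interface cannot be used as-is.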
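Solution:
Add a new FetchStream protocol. In OneForOneBlockFetcher, a block smaller than 100 MB (spark.shuffle.max.block.size.inmemory) is fetched the original way, while a larger block uses the new FetchStream protocol. The server handles a FetchStreamRequest differently from a FetchRequest in that it returns a data stream: the client writes the returned data to a local temporary file as it arrives, then constructs a FileSegmentManagedBuffer over that file for the rest of the processing.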
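A minimal sketch of the client-side decision, assuming the reducer knows each block's approximate size in advance (as the sizeMap in step (5) suggests), and that fetch_chunk and fetch_stream are hypothetical callables for the existing fetch path and the new FetchStream path. FileSegment stands in for FileSegmentManagedBuffer.

```python
import os
import tempfile
from dataclasses import dataclass
from typing import Callable, Iterable, Optional, Union

MAX_BLOCK_SIZE_IN_MEMORY = 100 * 1024 * 1024  # the 100 MB threshold from the text


@dataclass(frozen=True)
class FileSegment:
    """Stand-in for FileSegmentManagedBuffer over the local temporary file."""
    path: str
    offset: int
    length: int


def fetch_block(block_id: str, expected_size: int,
                fetch_chunk: Callable[[str], bytes],
                fetch_stream: Callable[[str], Iterable[bytes]],
                threshold: int = MAX_BLOCK_SIZE_IN_MEMORY,
                tmp_dir: Optional[str] = None) -> Union[bytes, FileSegment]:
    if expected_size < threshold:
        return fetch_chunk(block_id)  # original path: the whole block in one buffer
    # Large block: write pieces to a temp file as they arrive, so memory use stays bounded.
    fd, path = tempfile.mkstemp(prefix=block_id + "-", dir=tmp_dir)
    written = 0
    with os.fdopen(fd, "wb") as out:
        for piece in fetch_stream(block_id):
            out.write(piece)
            written += len(piece)
    return FileSegment(path, 0, written)


big = fetch_block("shuffle_0_2_4", 300, fetch_chunk=lambda b: b"",
                  fetch_stream=lambda b: (b"x" * 100 for _ in range(3)), threshold=100)
print(big.length)  # 300
os.remove(big.path)  # clean up the demo file
```

A production version would also delete the temporary file once the block has been consumed, or when the stream fails part-way.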