To address this, we raised the off-heap memory overhead:

spark.executor.memoryOverhead = 4096m
Re-ran the SQL; this time it failed with the error below:
19/12/25 15:49:02 ERROR ShuffleBlockFetcherIterator: Failed to get block(s) from bigdata-datanode:7339
io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 5721030656, max: 5726797824)
    at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:640)
    at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:594)
    at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:764)
    at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:740)
    at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:226)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:146)
    at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324)
    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)
    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176)
    at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:137)
    at io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:80)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:122)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
From the stack trace, this is an off-heap (direct) memory overflow.
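The numbers in the log bear this out: 5,721,030,656 bytes of direct memory were already in use, and the requested 16,777,216-byte (16 MiB) chunk would have brought the total to 5,737,807,872 bytes, just past the cap of 5,726,797,824 bytes (about 5.3 GiB), so the allocation failed.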
Spark's shuffle uses the Netty framework for network transport, and Netty buffers incoming data in off-heap (direct) memory. During a shuffle, every reduce task fetches the corresponding output of every map task; when one of those map outputs is large, the direct buffers needed to receive it push usage past the configured cap and this error is thrown.
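A side note: spark.executor.memoryOverhead only raises what the resource manager budgets for the container; it does not by itself lift the direct-memory cap that Netty enforces, which normally derives from -XX:MaxDirectMemorySize (falling back to roughly the executor heap size, consistent with the ~5.3 GiB cap in the log). One could brute-force a larger cap through the executor JVM options, e.g. (a sketch only; the 6g figure is an arbitrary assumption, not a recommendation):

spark.executor.extraJavaOptions -XX:MaxDirectMemorySize=6g

But a better fix is to stop buffering huge remote blocks in memory in the first place.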
Looking at the Spark 2.4 source, we find a newer configuration for exactly this situation:
private[spark] val MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM =
  ConfigBuilder("spark.maxRemoteBlockSizeFetchToMem")
    .doc("Remote block will be fetched to disk when size of the block is above this threshold " +
      "in bytes. This is to avoid a giant request takes too much memory. We can enable this " +
      "config by setting a specific value(e.g. 200m). Note this configuration will affect " +
      "both shuffle fetch and block manager remote block fetch. For users who enabled " +
      "external shuffle service, this feature can only be worked when external shuffle" +
      "service is newer than Spark 2.2.")
    .bytesConf(ByteUnit.BYTE)
    // fetch-to-mem is guaranteed to fail if the message is bigger than 2 GB, so we might
    // as well use fetch-to-disk in that case. The message includes some metadata in addition
    // to the block data itself (in particular UploadBlock has a lot of metadata), so we leave
    // extra room.
    .createWithDefault(Int.MaxValue - 512)
Its purpose is to avoid pulling an oversized remote shuffle block entirely into memory: any block above the threshold is spilled to disk instead. Note the default is Int.MaxValue - 512 bytes, i.e. just under 2 GB, so fetch-to-disk effectively never triggers until you set the property yourself. In the shuffle reader, the value is wired into ShuffleBlockFetcherIterator:
val wrappedStreams = new ShuffleBlockFetcherIterator(
  context,
  blockManager.shuffleClient,
  blockManager,
  mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
  serializerManager.wrapStream,
  // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
  SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
  SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
  SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
  SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
  SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))
That threshold arrives here as the maxReqSizeShuffleToMem argument; following it into ShuffleBlockFetcherIterator, we can see how it is used:
// Fetch remote shuffle blocks to disk when the request is too large. Since the shuffle data is
// already encrypted and compressed over the wire(w.r.t. the related configs), we can just fetch
// the data and write it to file directly.
if (req.size > maxReqSizeShuffleToMem) {
  shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
    blockFetchingListener, this)
} else {
  shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
    blockFetchingListener, null)
}

When the request size exceeds maxReqSizeShuffleToMem, fetchBlocks receives this as its final argument (the iterator acts as the download-file manager), so the fetched block is streamed to a temporary file on disk; otherwise null is passed and the block is buffered in memory as before.
So we add the threshold to spark-defaults.conf:
spark.maxRemoteBlockSizeFetchToMem 512m
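The same threshold can also be set per application instead of cluster-wide; a minimal sketch (the app name is a made-up placeholder):

import org.apache.spark.sql.SparkSession

// Remote shuffle blocks larger than 512 MB will be streamed to disk
// instead of being buffered in Netty direct memory.
val spark = SparkSession.builder()
  .appName("shuffle-tuning")  // hypothetical app name
  .config("spark.maxRemoteBlockSizeFetchToMem", "512m")
  .getOrCreate()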
Re-ran the SQL and watched the job in the Spark UI.
The shuffle files turn out to be huge, so it is no surprise the job kept failing; at this point simply granting more memory no longer helps. Because the workload is so large, the job still runs slowly, and it sometimes fails with the following error:
FetchFailed(BlockManagerId(1958, bigdata-odatanode, 7339, None), shuffleId=18, mapId=1, reduceId=1, message=
org.apache.spark.shuffle.FetchFailedException: Connection from bigdata-datanode/ip:7339 closed
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:554)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:485)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage40.sort_addToSorter_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage40.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
    at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoinExec.scala:794)
    at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextOuterJoinRows(SortMergeJoinExec.scala:755)
    at org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceStream(SortMergeJoinExec.scala:917)
    at org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceNext(SortMergeJoinExec.scala:953)
    at org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage45.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:232)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connection from bigdata-datanode/ip:7339 closed
This error means the connection was closed after a heartbeat timeout. The usual causes are executor GC pauses or shuffle files that are too large; after watching the executors and seeing no full GC, the remaining explanation is that the files are too big and the job simply runs too slowly.
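To check the GC side, one way (a sketch, assuming Java 8 executors) is to turn on GC logging via the executor JVM options and then scan the executor logs, or just watch the GC Time column on the Spark UI's Executors page:

spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps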
So we keep increasing the shuffle parallelism. The current setting:
spark.sql.shuffle.partitions 1000
Change it to 5000:

spark.sql.shuffle.partitions 5000
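Since this is a SQL conf, it can also be changed at runtime without restarting the application; a minimal sketch assuming an existing SparkSession named spark:

// Takes effect for subsequently executed queries.
spark.conf.set("spark.sql.shuffle.partitions", "5000")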
Re-ran the SQL; it is much faster now.

And the errors above occur far less often.
