sparksql 報錯Container killed by YARN for exceeding memory limits. xGB of x GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead or disabling


對此 提高了對外內存 spark.executor.memoryOverhead  = 4096m 

重新執行sql 改報下面的錯誤

19/12/25 15:49:02 ERROR ShuffleBlockFetcherIterator: Failed to get block(s) from bigdata-datanode:7339
io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 5721030656, max: 5726797824)
    at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:640)
    at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:594)
    at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:764)
    at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:740)
    at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:226)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:146)
    at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324)
    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)
    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176)
    at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:137)
    at io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:80)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:122)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)

從錯誤來看 是堆外內存溢出

Spark的shuffle部分使用了netty框架進行網絡傳輸,但netty會申請堆外內存緩存 Shuffle時,每個Reduce都需要獲取每個map對應的輸出,當一個reduce需要獲取的一個map數據比較大 超出配置的限制就報了這個錯。

我們看下spark2.4的源碼 發現新增了一個配置

 private[spark] val MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM =
    ConfigBuilder("spark.maxRemoteBlockSizeFetchToMem")
      .doc("Remote block will be fetched to disk when size of the block is above this threshold " +
        "in bytes. This is to avoid a giant request takes too much memory. We can enable this " +
        "config by setting a specific value(e.g. 200m). Note this configuration will affect " +
        "both shuffle fetch and block manager remote block fetch. For users who enabled " +
        "external shuffle service, this feature can only be worked when external shuffle" +
        "service is newer than Spark 2.2.")
      .bytesConf(ByteUnit.BYTE)
      // fetch-to-mem is guaranteed to fail if the message is bigger than 2 GB, so we might
      // as well use fetch-to-disk in that case.  The message includes some metadata in addition
      // to the block data itself (in particular UploadBlock has a lot of metadata), so we leave
      // extra room.
      .createWithDefault(Int.MaxValue - 512)

為避免shuffle從遠程拉取的塊過大 加載到內存中。超出這個配置的大小 溢寫到磁盤中

 val wrappedStreams = new ShuffleBlockFetcherIterator(
      context,
      blockManager.shuffleClient,
      blockManager,
      mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
      serializerManager.wrapStream,
      // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
      SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
      SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
      SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
      SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
      SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))

跟進去看下

    // Fetch remote shuffle blocks to disk when the request is too large. Since the shuffle data is
    // already encrypted and compressed over the wire(w.r.t. the related configs), we can just fetch
    // the data and write it to file directly.
    if (req.size > maxReqSizeShuffleToMem) {
      shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
        blockFetchingListener, this)
    } else {
      shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
        blockFetchingListener, null)
    }
  }

在spark-default.conf文件中 配置下

spark.maxRemoteBlockSizeFetchToMem 512m

重新執行sql試試

 

 

 

 

 

 可以看到shuffle 文件是很大的 難怪會失敗 通過給更多內存貌似也不行了 由於任務量太大 還是比較慢  有些時候報以下錯誤

FetchFailed(BlockManagerId(1958, bigdata-odatanode, 7339, None), shuffleId=18, mapId=1, reduceId=1, message=
org.apache.spark.shuffle.FetchFailedException: Connection from bigdata-datanode/ip:7339 closed
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:554)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:485)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
  at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage40.sort_addToSorter_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage40.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
  at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
  at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoinExec.scala:794)
  at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextOuterJoinRows(SortMergeJoinExec.scala:755)
  at org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceStream(SortMergeJoinExec.scala:917)
  at org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceNext(SortMergeJoinExec.scala:953)
  at org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage45.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:232)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:123)
  at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connection from bigdata-datanode/ip:7339 closed

這個錯誤是 心跳超時 連接斷開的問題 ,分析可能是excutor發現gc或者 文件太大導致 后觀察excutor沒有full gc那么就是文件太大 運行太慢。

繼續調大並行度 目前並行度設置 

spark.sql.shuffle.partitions 1000

將之改成5000

重新跑sql 快了很多

 

 

 

 上面的錯誤減少了


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM