Spark job fails with java.io.EOFException: Premature EOF from inputStream


We are using Spark 2.4 and Spark 2.3 to replace our company's existing Hive jobs.

A few individual jobs fail in Spark with the following error:

java.io.EOFException: Premature EOF from inputStream
    at com.hadoop.compression.lzo.LzopInputStream.readFully(LzopInputStream.java:74)
    at com.hadoop.compression.lzo.LzopInputStream.readHeader(LzopInputStream.java:115)
    at com.hadoop.compression.lzo.LzopInputStream.<init>(LzopInputStream.java:54)
    at com.hadoop.compression.lzo.LzopCodec.createInputStream(LzopCodec.java:112)
    at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:129)
    at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:269)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:268)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:226)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:97)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:330)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:294)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:330)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:294)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:330)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:294)
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:330)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:294)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:330)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:294)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:330)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:294)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:330)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:294)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

After investigation, the error turns out to occur when Spark reads files of size 0.
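
To confirm which inputs are affected, one way is to list the zero-length files under the job's input directory with the Hadoop FileSystem API before running the job. Below is a minimal sketch; the directory path is a placeholder, not the real table location.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object FindEmptyLzoFiles {
      def main(args: Array[String]): Unit = {
        // Placeholder input directory; replace with the partition path of the failing job.
        val inputDir = new Path("/warehouse/mydb.db/mytable/dt=2019-01-01")
        val fs: FileSystem = inputDir.getFileSystem(new Configuration())

        // Keep only the zero-length .lzo files; these are the ones LzopInputStream
        // cannot read a header from.
        fs.listStatus(inputDir)
          .filter(s => s.isFile && s.getPath.getName.endsWith(".lzo") && s.getLen == 0)
          .foreach(s => println(s"zero-length file: ${s.getPath}"))
      }
    }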


We did not find an official fix for this bug in Spark,

so we patched the code by hand to filter out these files.

Modify HadoopRDD.scala at the corresponding location as shown below:

      // We get our input bytes from thread-local Hadoop FileSystem statistics.
      // If we do a coalesce, however, we are likely to compute multiple partitions in the same
      // task and in the same thread, in which case we need to avoid override values written by
      // previous partitions (SPARK-13071).
      private def updateBytesRead(): Unit = {
        getBytesReadCallback.foreach { getBytesRead =>
          inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
        }
      }

      private var reader: RecordReader[K, V] = null
      private val inputFormat = getInputFormat(jobConf)
      HadoopRDD.addLocalConfiguration(
        new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(createTime),
        context.stageId, theSplit.index, context.attemptNumber, jobConf)

      reader =
        try {
          if (split.inputSplit.value.getLength != 0) {  // read only when the split's file length is non-zero
            inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
          } else {
            logWarning(s"Skipped zero-length file: ${split.inputSplit}")
            finished = true                           // length is 0: mark finished and skip the file
            null
          }
        } catch {
          case e: FileNotFoundException if ignoreMissingFiles =>
            logWarning(s"Skipped missing file: ${split.inputSplit}", e)
            finished = true
            null
          // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
          case e: FileNotFoundException if !ignoreMissingFiles => throw e
          case e: IOException if ignoreCorruptFiles =>
            logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
            finished = true
            null
        }
      // Register an on-task-completion callback to close the input stream.
      context.addTaskCompletionListener[Unit] { context =>
        // Update the bytes read before closing is to make sure lingering bytesRead statistics in
        // this thread get correctly added.
        updateBytesRead()
        closeIfNeeded()
      }
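
Note that this patch only takes effect after rebuilding spark-core and redeploying the jar. If patching Spark is not practical, a similar effect can be achieved at the application level by dropping empty files before building the RDD. Below is a minimal sketch under the same placeholder-path assumption; it is a workaround in user code, not part of the patch above.

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.SparkSession

    object ReadNonEmptyLzoFiles {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("skip-empty-lzo-files").getOrCreate()
        val sc = spark.sparkContext

        // Placeholder input directory; replace with the real table/partition path.
        val inputDir = new Path("/warehouse/mydb.db/mytable/dt=2019-01-01")
        val fs = inputDir.getFileSystem(sc.hadoopConfiguration)

        // Keep only non-empty files so LzopInputStream never has to parse an empty stream.
        val nonEmptyPaths = fs.listStatus(inputDir)
          .filter(s => s.isFile && s.getLen > 0)
          .map(_.getPath.toString)

        // textFile accepts a comma-separated list of input paths.
        val lines = sc.textFile(nonEmptyPaths.mkString(","))
        println(s"record count: ${lines.count()}")

        spark.stop()
      }
    }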

