Spark Streaming job的生成及數據清理總結


關於這次總結還是要從一個bug說起。。。。。。。

場景描述:項目的基本處理流程為:從文件系統讀取每隔一分鍾上傳的日志並由Spark Streaming進行計算消費,最后將結果寫入InfluxDB中,然后在監控系統中進行展示,監控。這里的spark版本為2.2.1。

Bug:程序開發完成之后,每個batch處理時間在15~20s左右,上線之后一直在跑,監控系統中數據也沒有什么異常,sparkui中只關注了任務處理時間,其他並沒有在意。后來程序運行了2天18個小時之后,監控系統發出報警NO DATA,先去數據庫查數據,確實沒有數據,在去sparkui看程序並沒有結束,狀態還是RUNNING,但是不處理任務,就在那里卡住了,后來看日志發現報了內存溢出異常:

Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:271)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:271)
    at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
    at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
    at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205)
    at net.jpountz.lz4.LZ4BlockOutputStream.finish(LZ4BlockOutputStream.java:235)
    at net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStream.java:175)
    at java.io.ObjectOutputStream$BlockDataOutputStream.close(ObjectOutputStream.java:1828)
    at java.io.ObjectOutputStream.close(ObjectOutputStream.java:742)
    at org.apache.spark.serializer.JavaSerializationStream.close(JavaSerializer.scala:57)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$1.apply$mcV$sp(TorrentBroadcast.scala:278)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1346)
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:277)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:126)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1488)
    at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1006)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:930)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$submitWaitingChildStages$6.apply(DAGScheduler.scala:776)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$submitWaitingChildStages$6.apply(DAGScheduler.scala:775)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at org.apache.spark.scheduler.DAGScheduler.submitWaitingChildStages(DAGScheduler.scala:775)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1278)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1729)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

后來以為是程序里面的資源沒有回收,仔細排查了一遍代碼,也沒找出來問題,后來在本地跑,發現sparkUI中EXECUTOR中storage memory和RDD blocks會一直增加,雖然每個Batch后倆者會下降,但是每一個Batch之后和上一個batch比較還是增加的。

解決:由於是Storage memory和RDD blocks在增長,覺得和內存相關,用內存調優改了下參數還是不行,然后以為是contentcleaner問題,把它調成5分鍾一次也不行,后來在我的另一個電腦跑的時候發現沒有問題了,這個電腦上的spark 版本是2.3.0;現在可以確定是版本問題,就直接去官網2.2.2版本里面找關於內存溢出修復的bug,當時就下載了2.2.2,然后以跑程序還是和原來2.2.1一樣,再然后就心態崩了,也沒想着去看2.3.0的BUG修復了,當時我在一個知識星球提問過這個問題,后來星球的主人幫我解決了這個問題,原來這個問題在2.3.0里面才被解決,具體網址:https://issues.apache.org/jira/browse/SPARK-21357,原因是因為FileInputSream會重寫Dstream中的clearMetadata方法,但是在FileInputStream中claerMetadata方法只是清理了文件並沒有清理generatedRDDs,因此才會出現內存溢出。

總結:本次bug本來在確定了版本問題之后,理應很好解決,但是由於自身原因,多走了彎路,后來得高人相助才得以順利解決問題。由此也發現了自己的一些問題,遇到問題不能只能留在表面,要深入代碼,在了解原理的基礎上在了解具體實現,在遇到問題是才能快速定位問題,並找到解決辦法。下面就是這次bug之后翻看spark streaming源碼之后對出現這個bug的前因后果的分析。

bug分析:spark Streaming程序只要啟動就會一直的運轉,期間從數據源得到數據,然后消費,最后輸出,在每一個的batch里面,都會根據具體的業務邏輯生成對應的jod,然后spark就處理提交的job,這里只要明白了job的生成及生成之后對緩存的數據的處理,也就好理解這個bug的出現原因了。

StreamingContent在啟動之后,會啟動JobScheduler;在JobSchedluer里面會啟動JobGenerator和ReceiveTracker;JobGenerator負責job相關的處理,ReceiveTracker負責Receive分發和worker端的receive通信,並處理其發來的信息。

如下是JobSchedluer的start方法:

 def start(): Unit = synchronized {
    if (eventLoop != null) return // scheduler has already been started

    logDebug("Starting JobScheduler")
    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
      override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
    }
    eventLoop.start()

    // attach rate controllers of input streams to receive batch completion updates
    for {
      inputDStream <- ssc.graph.getInputStreams
      rateController <- inputDStream.rateController
    } ssc.addStreamingListener(rateController)

    listenerBus.start()
    receiverTracker = new ReceiverTracker(ssc)
    inputInfoTracker = new InputInfoTracker(ssc)

    val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
      case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
      case _ => null
    }

    executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
      executorAllocClient,
      receiverTracker,
      ssc.conf,
      ssc.graph.batchDuration.milliseconds,
      clock)
    executorAllocationManager.foreach(ssc.addStreamingListener)
    receiverTracker.start()
    jobGenerator.start()
    executorAllocationManager.foreach(_.start())
    logInfo("Started JobScheduler")
  }

在JobScheduler的start方法里面,它首先創建了EventLoop[JobSchedulerEvent],它主要用來處理job的調度事件的,具體事件定義在processEvent里面:

 private def processEvent(event: JobSchedulerEvent) {
    try {
      event match {
        case JobStarted(job, startTime) => handleJobStart(job, startTime)
        case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
        case ErrorReported(m, e) => handleError(m, e)
      }
    } catch {
      case e: Throwable =>
        reportError("Error in job scheduler", e)
    }
  }

其后啟動了這個eventloop,在啟動之后會開啟一個線程來消費eventQueue發送的事件消息,eventQueue是LinkedBlockingDeque類型的。

private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }

  }

在這個事件的接收處理啟啟動之后,JobScheduler啟動了receiverTracker和jobGenerator,receiverTracker負責Receive分發和worker端的receive通信,並處理其發來的信息。接下來主要看jobGenerator.start的邏輯:

/** Start generation of jobs */
  def start(): Unit = synchronized {
    if (eventLoop != null) return // generator has already been started

    // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
    // See SPARK-10125
    checkpointWriter

    eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
      override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = {
        jobScheduler.reportError("Error in job generator", e)
      }
    }
    eventLoop.start()

    if (ssc.isCheckpointPresent) {
      restart()
    } else {
      startFirstTime()
    }
  }

在JobGenertor的start方法里面創建了EventLoop[JobGeneratorEvent],用來處理具體的關於job的操作,具體的定義在processEvent中:

/** Processes all events */
  private def processEvent(event: JobGeneratorEvent) {
    logDebug("Got event " + event)
    event match {
      case GenerateJobs(time) => generateJobs(time)
      case ClearMetadata(time) => clearMetadata(time)
      case DoCheckpoint(time, clearCheckpointDataLater) =>
        doCheckpoint(time, clearCheckpointDataLater)
      case ClearCheckpointData(time) => clearCheckpointData(time)
    }
  }

在啟動完eventloop之后,接下來會看檢查點,如果第一次運行就進入startFirstTime方法中:

/** Starts the generator for the first time */
  private def startFirstTime() {
    val startTime = new Time(timer.getStartTime())
    graph.start(startTime - graph.batchDuration)
    timer.start(startTime.milliseconds)
    logInfo("Started JobGenerator at " + startTime)
  }

在startFirstTime方法里面首先會設置一個startTime,其后啟動DstreamGraph,然后調用timer.start方法,timer的創建:

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

通過跟代碼可以確定最后在線程里面運行的是triggerActionForNextInterval方法

def start(startTime: Long): Long = synchronized {
    nextTime = startTime
    thread.start()
    logInfo("Started timer for " + name + " at time " + nextTime)
    nextTime
  }
private val thread = new Thread("RecurringTimer - " + name) {
    setDaemon(true)
    override def run() { loop }
  }
private def triggerActionForNextInterval(): Unit = {
    clock.waitTillTime(nextTime)
    callback(nextTime)
    prevTime = nextTime
    nextTime += period
    logDebug("Callback for " + name + " called at time " + prevTime)
  }

  /**
   * Repeatedly call the callback every interval.
   */
  private def loop() {
    try {
      while (!stopped) {
        triggerActionForNextInterval()
      }
      triggerActionForNextInterval()
    } catch {
      case e: InterruptedException =>
    }
  }
}

在triggerActionForNextInterval方法中調用的callback方法,即timer創建的時候的eventLoop.post(GenerateJobs(new Time(longTime))方法,這里的eventloop是EventLoop[JobGeneratorEvent]類型的,所以最后會調用generateJobs方法:

/** Generate jobs and perform checkpointing for the given `time`.  */
  private def generateJobs(time: Time) {
    // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
    // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
    ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
        PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }

這個方法里面先是調用jobScheduler.receiverTracker.allocateBlocksToBatch(time)方法將receive分配的block獲取到這個batch中,然后在調用graph.generateJobs(time)利用上面的block來生成具體的job。接下來看jobScheduler.submitJobSet方法:

def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
      logInfo("No jobs added for time " + jobSet.time)
    } else {
      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
      jobSets.put(jobSet.time, jobSet)
      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
      logInfo("Added jobs for time " + jobSet.time)
    }
  }

submitJobSet方法中會根據JobSet為每一個job新建JobHandler,放入job的線程池中,等待spark的調度處理。到此job在邏輯上已經完成。

下面是根據代碼畫的關於job流入線程池的時序圖:

接下來看JobHandler的run方法。

def run() {
      val oldProps = ssc.sparkContext.getLocalProperties
      try {
        ssc.sparkContext.setLocalProperties(SerializationUtils.clone(ssc.savedProperties.get()))
        val formattedTime = UIUtils.formatBatchTime(
          job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
        val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
        val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

        ssc.sc.setJobDescription(
          s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
        ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
        ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
        // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
        // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
        ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")

        // We need to assign `eventLoop` to a temp variable. Otherwise, because
        // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
        // it's possible that when `post` is called, `eventLoop` happens to null.
        var _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details.
          SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
            job.run()
          }
          _eventLoop = eventLoop
          if (_eventLoop != null) {
            _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
          }
        } else {
          // JobScheduler has been stopped.
        }
      } finally {
        ssc.sparkContext.setLocalProperties(oldProps)
      }
    }
  }

在run方法中,在調起job.run()方法運行job之后,會往evenloop發送post(JobCompleted(job, clock.getTimeMillis())這里的eventloop是EventLoop[JobSchedulerEvent],因此具體的處理方法是handleJobCompletion:

private def handleJobCompletion(job: Job, completedTime: Long) {
    val jobSet = jobSets.get(job.time)
    jobSet.handleJobCompletion(job)
    job.setEndTime(completedTime)
    listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
    logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
    if (jobSet.hasCompleted) {
      listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
    }
    job.result match {
      case Failure(e) =>
        reportError("Error running job " + job, e)
      case _ =>
        if (jobSet.hasCompleted) {
          jobSets.remove(jobSet.time)
          jobGenerator.onBatchCompletion(jobSet.time)
          logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
            jobSet.totalDelay / 1000.0, jobSet.time.toString,
            jobSet.processingDelay / 1000.0
          ))
        }
    }
  }

在這個方法里面根據job.result會調用(若無錯誤)jobGenerator.onBatchCompletion(jobSet.time)

 def onBatchCompletion(time: Time) {
    eventLoop.post(ClearMetadata(time))
  }

這個方法中eventloop發送了ClearMatadata信號,即清理元數據信號,這個信號會被EventLoop[JobGeneratorEvent]接收處理;調用claerMetadata方法

/** Clear DStream metadata for the given `time`. */
  private def clearMetadata(time: Time) {
    ssc.graph.clearMetadata(time)

    // If checkpointing is enabled, then checkpoint,
    // else mark batch to be fully processed
    if (shouldCheckpoint) {
      eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
    } else {
      // If checkpointing is not enabled, then delete metadata information about
      // received blocks (block data not saved in any case). Otherwise, wait for
      // checkpointing of this batch to complete.
      val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
      jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
      jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
      markBatchFullyProcessed(time)
    }
  }

這里調用了ssc.graph.clearMetadata(time)方法:

def clearMetadata(time: Time) {
    logDebug("Clearing metadata for time " + time)
    this.synchronized {
      outputStreams.foreach(_.clearMetadata(time))
    }
    logDebug("Cleared old metadata for time " + time)
  }

上面會根據每個outputStreams來調用clearMatadata方法,這個outputstreams在DstreamGraph中定義,在調用類似foreachrdd這類觸發job的算子的時候,會調用Dstream.register方法新增outputstream。最后會調用到Dstream的claerMetadata方法:

private[streaming] def clearMetadata(time: Time) {
    val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
    val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
    logDebug("Clearing references to old RDDs: [" +
      oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]")
    generatedRDDs --= oldRDDs.keys
    if (unpersistData) {
      logDebug(s"Unpersisting old RDDs: ${oldRDDs.values.map(_.id).mkString(", ")}")
      oldRDDs.values.foreach { rdd =>
        rdd.unpersist(false)
        // Explicitly remove blocks of BlockRDD
        rdd match {
          case b: BlockRDD[_] =>
            logInfo(s"Removing blocks of RDD $b of time $time")
            b.removeBlocks()
          case _ =>
        }
      }
    }
    logDebug(s"Cleared ${oldRDDs.size} RDDs that were older than " +
      s"${time - rememberDuration}: ${oldRDDs.keys.mkString(", ")}")
    dependencies.foreach(_.clearMetadata(time))
  }

這里清理了generatedRDDs中的RDD,最后還調用了dependencies.foreach(_.clearMetadata(time))來清理數據;這個dependencies是Dstream定義的def dependencies: List[DStream[_]],其實在Dstream的子類里面會重寫,對於inputstream由於其是依賴的第一個,因此list為空,在其他Dstream中,例如MappedDStream中,其定義是list(parent)指向父類,這樣依賴的關系就可以用dependencies來表示。

override def dependencies: List[DStream[_]] = List()

在項目里面用的textFileStream()方法接收數據,其具體的實現在FileInputDstream中,在FileInputDstream中就重寫了clearMetadata方法:

protected[streaming] override def clearMetadata(time: Time) {
    batchTimeToSelectedFiles.synchronized {
      val oldFiles = batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration))
      batchTimeToSelectedFiles --= oldFiles.keys
      recentlySelectedFiles --= oldFiles.values.flatten
      logInfo("Cleared " + oldFiles.size + " old files that were older than " +
        (time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
      logDebug("Cleared files are:\n" +
        oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
    }
    // Delete file mod times that weren't accessed in the last round of getting new files
    fileToModTime.clearOldValues(lastNewFileFindingTime - 1)
  }

上面是FileInputDstream重寫的方法,可以看到只是清理了file,但是並沒有針對generatedRDDs中的RDD進行操作,因此在每一個batch結束后,由於這里的數據清理不完全,導致內存一直增加,最后OOM。這個bug在2.3.0已經修改。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM