Spark2.1.0——深入理解事件總線


Spark2.1.0——深入理解事件總線

概覽

  Spark程序在運行的過程中,Driver端的很多功能都依賴於事件的傳遞和處理,而事件總線在這中間發揮着至關重要的紐帶作用。事件總線通過異步線程,提高了Driver執行的效率。

       Spark定義了一個特質[1]ListenerBus,可以接收事件並且將事件提交到對應事件的監聽器。為了對ListenerBus有個直觀的理解,我們先來看看它的代碼實現,見代碼清單1。

代碼清單1        ListenerBus的定義

private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {

  private[spark] val listeners = new CopyOnWriteArrayList[L]

  final def addListener(listener: L): Unit = {
    listeners.add(listener)
  }

  final def removeListener(listener: L): Unit = {
    listeners.remove(listener)
  }

  final def postToAll(event: E): Unit = {
    val iter = listeners.iterator
    while (iter.hasNext) {
      val listener = iter.next()
      try {
        doPostEvent(listener, event)
      } catch {
        case NonFatal(e) =>
          logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
      }
    }
  }

  protected def doPostEvent(listener: L, event: E): Unit

  private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = {
    val c = implicitly[ClassTag[T]].runtimeClass
    listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq
  }

}

代碼清單1中展示了ListenerBus是個泛型特質,其泛型參數為 [L <: AnyRef, E],其中L是代表監聽器的泛型參數,可以看到ListenerBus支持任何類型的監聽器,E是代表事件的泛型參數。ListenerBus中各個成員的作用如下:

  • listeners:用於維護所有注冊的監聽器,其數據結構為CopyOnWriteArrayList[L];
  • addListener:向listeners中添加監聽器的方法,由於listeners采用CopyOnWriteArrayList來實現,所以addListener方法是線程安全的;
  • removeListener:從listeners中移除監聽器的方法,由於listeners采用CopyOnWriteArrayList來實現,所以removeListener方法是線程安全的;
  • postToAll:此方法的作用是將事件投遞給所有的監聽器。雖然CopyOnWriteArrayList本身是線程的安全的,但是由於postToAll方法內部引入了“先檢查后執行”的邏輯,因而postToAll方法不是線程安全的,所以所有對postToAll方法的調用應當保證在同一個線程中;
  • doPostEvent:用於將事件投遞給指定的監聽器,此方法只提供了接口定義,具體實現需要子類提供;
  • findListenersByClass:查找與指定類型相同的監聽器列表。

下面將分別對以下內容進行介紹:

  1. ListenerBus的繼承體系
  2. SparkListenerBus詳解
  3. LiveListenerBus詳解

[1] 特質是Scala語言中提供真正的多重繼承的語法特性,類似於Java的Interface,但是又可以實現方法。有關Scala特質的更多介紹請訪問Scala官網http://www.scala-lang.org。

ListenerBus的繼承體系

理解了ListenerBus的定義后,本小節一起來看看有哪些類繼承了它。ListenerBus的類繼承體系如圖1所示。

圖1  ListenerBus的類繼承體系

從圖1中可以看到有三種ListenerBus的具體實現,分別為:

  • SparkListenerBus:用於將SparkListenerEvent類型的事件投遞到SparkListenerInterface類型的監聽器;
  • StreamingQueryListenerBus:用於將StreamingQueryListener.Event類型的事件投遞到StreamingQueryListener類型的監聽器,此外還會將StreamingQueryListener.Event類型的事件交給SparkListenerBus;
  • StreamingListenerBus:用於將StreamingListenerEvent類型的事件投遞到StreamingListener類型的監聽器,此外還會將StreamingListenerEvent類型的事件交給SparkListenerBus。

SparkListenerBus也有兩種實現:

  • LiveListenerBus:采用異步線程將SparkListenerEvent類型的事件投遞到SparkListener類型的監聽器;
  • ReplayListenerBus:用於從序列化的事件數據中重播事件。

有了對事件總線的這些介紹,讀者已經在宏觀上對其有所認識。但是如果沒有具體的實現,ListenerBus本身也無法發揮作用。下一小節我們將選擇對SparkListenerBus從更加微觀的角度說明如何使用事件總線。

SparkListenerBus詳解

  有了上一節對ListenerBus類繼承體系的介紹,本小節將詳細介紹SparkListenerBus的實現,見代碼清單2。

代碼清單2         SparkListenerBus的實現

private[spark] trait SparkListenerBus
  extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {

  protected override def doPostEvent(
      listener: SparkListenerInterface,
      event: SparkListenerEvent): Unit = {
    event match {
      case stageSubmitted: SparkListenerStageSubmitted =>
        listener.onStageSubmitted(stageSubmitted)
      case stageCompleted: SparkListenerStageCompleted =>
        listener.onStageCompleted(stageCompleted)
      case jobStart: SparkListenerJobStart =>
        listener.onJobStart(jobStart)
      case jobEnd: SparkListenerJobEnd =>
        listener.onJobEnd(jobEnd)
      case taskStart: SparkListenerTaskStart =>
        listener.onTaskStart(taskStart)
      case taskGettingResult: SparkListenerTaskGettingResult =>
        listener.onTaskGettingResult(taskGettingResult)
      case taskEnd: SparkListenerTaskEnd =>
        listener.onTaskEnd(taskEnd)
      case environmentUpdate: SparkListenerEnvironmentUpdate =>
        listener.onEnvironmentUpdate(environmentUpdate)
      case blockManagerAdded: SparkListenerBlockManagerAdded =>
        listener.onBlockManagerAdded(blockManagerAdded)
      case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
        listener.onBlockManagerRemoved(blockManagerRemoved)
      case unpersistRDD: SparkListenerUnpersistRDD =>
        listener.onUnpersistRDD(unpersistRDD)
      case applicationStart: SparkListenerApplicationStart =>
        listener.onApplicationStart(applicationStart)
      case applicationEnd: SparkListenerApplicationEnd =>
        listener.onApplicationEnd(applicationEnd)
      case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
        listener.onExecutorMetricsUpdate(metricsUpdate)
      case executorAdded: SparkListenerExecutorAdded =>
        listener.onExecutorAdded(executorAdded)
      case executorRemoved: SparkListenerExecutorRemoved =>
        listener.onExecutorRemoved(executorRemoved)
      case blockUpdated: SparkListenerBlockUpdated =>
        listener.onBlockUpdated(blockUpdated)
      case logStart: SparkListenerLogStart => // ignore event log metadata
      case _ => listener.onOtherEvent(event)
    }
  }

}

我們看到SparkListenerBus已經實現了ListenerBus的doPostEvent方法,通過對SparkListenerEvent事件的匹配,執行SparkListenerInterface監聽器的相應方法。

這里的SparkListenerEvent其實是個特質,代碼清單2中列出的SparkListenerStageSubmitted、SparkListenerStageCompleted等都是繼承了SparkListenerEvent特質的樣例類[2]。為說明問題,這里僅僅摘選SparkListenerEvent及部分SparkListenerEvent子類的實現如下:

@DeveloperApi
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
trait SparkListenerEvent {
  protected[spark] def logEvent: Boolean = true
}

@DeveloperApi
case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null)
  extends SparkListenerEvent

@DeveloperApi
case class SparkListenerStageCompleted(stageInfo: StageInfo) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)
  extends SparkListenerEvent
// 省略其他SparkListenerEvent的實現
private[spark] case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent

  SparkListenerInterface也是一個特質,其中定義了所有SparkListener應當遵守的接口規范。由於SparkListenerInterface中定義了很多接口,為說明問題只摘抄SparkListenerInterface中的部分接口定義,代碼如下:

private[spark] trait SparkListenerInterface {
  def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit
  def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit
 // 省略其他接口方法
  def onOtherEvent(event: SparkListenerEvent): Unit
}

結合代碼清單2,我們知道以上代碼片段中的onStageCompleted和onStageSubmitted將在SparkListenerBus的doPostEvent方法中分別匹配到SparkListenerStageCompleted和SparkListenerStageSubmitted事件時執行,而對於doPostEvent中無法匹配的事件,都將執行onOtherEvent方法。

         在詳細介紹了ListenerBus及SparkListenerBus后,我們知道當有事件需要通知監聽器的時候,可以調用ListenerBus的postToAll方法,postToAll方法遍歷所有監聽器並調用SparkListenerBus實現的doPostEvent方法,doPostEvent方法對事件類型進行匹配后調用監聽器的不同方法。整個投遞事件的過程是通過方法調用實現的,所以這是一個同步調用。在監聽器比較多的時候這個過程會相對比較耗時(比如用於寫日志的EventLoggingListener在調度頻繁的時候,有可能導致寫入延遲,這將導致部分事件的丟失。此問題已在spark2.3.0版本中得到改進。),在Spark UI(在《Spark內核設計的藝術 架構設計與實現》一書的第4章中詳細介紹)中為了達到頁面的即時刷新 ,實現了SparkListenerBus的子類LiveListenerBus。下一小節將圍繞LiveListenerBus來詳細說明異步投遞消息的實現細節。


[2] 樣例類是Scala語言的語法特性。樣例類是一種特殊的類型,常用作事件、參數、模式匹配等。有關樣例類的更多介紹,請讀者閱讀Scala語言的相關資料。

LiveListenerBus詳解

  LiveListenerBus繼承了SparkListenerBus,並實現了將事件異步投遞給監聽器,達到實時刷新UI界面數據的效果。LiveListenerBus主要由以下部分組成:

  • eventQueue:是SparkListenerEvent事件的阻塞隊列,隊列大小可以通過Spark屬性spark.scheduler.listenerbus.eventqueue.size進行配置,默認為10000(Spark早期版本中屬於靜態屬性,固定為10000,這導致隊列堆滿時,只得移除一些最老的事件,最終導致各種問題與bug);
  • started:標記LiveListenerBus的啟動狀態的AtomicBoolean類型的變量;
  • stopped:標記LiveListenerBus的停止狀態的AtomicBoolean類型的變量;
  • droppedEventsCounter:使用AtomicLong類型對刪除的事件進行計數,每當日志打印了droppedEventsCounter后,會將droppedEventsCounter重置為0;
  • lastReportTimestamp:用於記錄最后一次日志打印droppedEventsCounter的時間戳;
  • processingEvent:用來標記當前正有事件被listenerThread線程處理;
  • logDroppedEvent:AtomicBoolean類型的變量,用於標記是否由於eventQueue已滿,導致新的事件被刪除;
  • eventLock:用於當有新的事件到來時釋放信號量,當對事件進行處理時獲取信號量;
  • listeners:繼承自LiveListenerBus的監聽器數組;
  • listenerThread:處理事件的線程。

異步事件處理線程

         listenerThread用於異步處理eventQueue中的事件,為了便於說明,這里將展示listenerThread及LiveListenerBus中的主要代碼片段,見代碼清單3。

代碼清單3         LiveListenerBus主要邏輯的代碼片段

  private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
  private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)

  private def validateAndGetQueueSize(): Int = {
    val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
    if (queueSize <= 0) {
      throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
    }
    queueSize
  }

  private val started = new AtomicBoolean(false)
  private val stopped = new AtomicBoolean(false)
  private val droppedEventsCounter = new AtomicLong(0L)
  @volatile private var lastReportTimestamp = 0L
  private var processingEvent = false
  private val logDroppedEvent = new AtomicBoolean(false)
  private val eventLock = new Semaphore(0)

  private val listenerThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
      LiveListenerBus.withinListenerThread.withValue(true) {
        while (true) {
          eventLock.acquire() // 獲取信號量
          self.synchronized {
            processingEvent = true
          }
          try {
            val event = eventQueue.poll //從eventQueue中獲取事件
            if (event == null) {
              // Get out of the while loop and shutdown the daemon thread
              if (!stopped.get) {
                throw new IllegalStateException("Polling `null` from eventQueue means" +
                  " the listener bus has been stopped. So `stopped` must be true")
              }
              return
            }
            postToAll(event) // 事件處理
          } finally {
            self.synchronized {
              processingEvent = false
            }
          }
        }
      }
    }
  }

通過分析代碼清單3,listenerThread的工作步驟為:

  1. 不斷獲取信號量(當可以獲取信號量時,說明還有事件未處理);
  2. 通過同步控制,將processingEvent設置為true;
  3. 從eventQueue中獲取事件;
  4. 調用超類ListenerBus的postToAll方法(postToAll方法對監聽器進行遍歷,並調用SparkListenerBus的doPostEvent方法對事件進行匹配后執行監聽器的相應方法);
  5. 每次循環結束依然需要通過同步控制,將processingEvent設置為false;

值得一提的是,listenerThread的run方法中調用了Utils的tryOrStopSparkContext,tryOrStopSparkContext方法可以保證當listenerThread的內部循環拋出異常后啟動一個新的線程停止SparkContext(SparkContext的內容將在第4章詳細介紹,tryOrStopSparkContext方法的具體實現請閱讀Utils工具類的實現)。

LiveListenerBus的消息投遞

         在解釋了異步線程listenerThread的工作內容后,還有一個要點沒有解釋:eventQueue中的事件是如何放進去的呢?由於eventQueue定義在LiveListenerBus中,因此ListenerBus和SparkListenerBus中並沒有操縱eventQueue的方法,要將事件放入eventQueue只能依靠LiveListenerBus自己了,其post方法就是為此目的而生的,見代碼清單4。

代碼清單4        向LiveListenerBus投遞SparkListenerEvent事件

  def post(event: SparkListenerEvent): Unit = {
    if (stopped.get) {
      logError(s"$name has already stopped! Dropping event $event")
      return
    }
    val eventAdded = eventQueue.offer(event) // 向eventQueue中添加事件
    if (eventAdded) {
      eventLock.release()
    } else {
      onDropEvent(event)
      droppedEventsCounter.incrementAndGet()
    }
    // 打印刪除事件數的日志
    val droppedEvents = droppedEventsCounter.get
    if (droppedEvents > 0) {
      if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
        if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
          val prevLastReportTimestamp = lastReportTimestamp
          lastReportTimestamp = System.currentTimeMillis()
          logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
            new java.util.Date(prevLastReportTimestamp))
        }
      }
    }
  }

從代碼清單4看到post方法的處理步驟如下:

  1. 判斷LiveListenerBus是否已經處於停止狀態;
  2. 向eventQueue中添加事件。如果添加成功,則釋放信號量進而催化listenerThread能夠有效工作。如果eventQueue已滿造成添加失敗,則移除事件,並對刪除事件計數器droppedEventsCounter進行自增;
  3. 如果有事件被刪除,並且當前系統時間距離上一次打印droppedEventsCounter超過了60秒則將droppedEventsCounter打印到日志。

LiveListenerBus與監聽器

        與LiveListenerBus配合使用的監聽器,並非是父類SparkListenerBus的類型參數SparkListenerInterface,而是繼承自SparkListenerInterface的SparkListener及其子類。圖2列出了Spark中監聽器SparkListener以及它的6種最常用的實現[3]

圖2     SparkListener的類繼承體系

SparkListener雖然實現了SparkListenerInterface中的每個方法,但是其實都是空實現,具體的實現需要交給子類去完成。

本文首先對事件總線的接口定義進行了一些介紹,之后選擇ListenerBus的子類SparkListenerBus與LiveListenerBus作為具體的實現例子進行分析,最后本文選擇LiveListenerBus作為具體的實現例子進行分析,這里將通過圖3更加直觀的展示ListenerBus、SparkListenerBus及LiveListenerBus的工作原理。

圖3     LiveListenerBus的工作流程圖

最后對於圖3作一些補充說明:圖中的DAGScheduler、SparkContext、BlockManagerMasterEndpoint、DriverEndpoint及LocalSchedulerBackend都是LiveListenerBus的事件來源,它們都是通過調用LiveListenerBus的post方法將消息交給異步線程listenerThread處理的。


[3] 除了本節列出的的六種SparkListener的子類外,還有很多其他的子類,這里就不一一列出了,感興趣的讀者可以查閱Spark相關文檔或閱讀源碼知曉。

 

關於《Spark內核設計的藝術 架構設計與實現》

經過近一年的准備,基於Spark2.1.0版本的《Spark內核設計的藝術 架構設計與實現》一書現已出版發行,圖書如圖:

Spark內核設計的藝術

 

紙質版售賣鏈接如下:

京東:https://item.jd.com/12302500.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM