SparkListener記錄(spark-2.2.0)


這里記錄一下SparkListener一些常用的監聽使用方式

概述

    spark 提供了一系列整個任務生命周期中各個階段變化的事件監聽機制
    通過這一機制可以在任務的各個階段做一些自定義的各種動作
   
    SparkListener便是這些階段的事件監聽接口類
    通過實現這個類中的各種方法便可實現自定義的事件處理動作

代碼記錄

//SparkListener 下各個事件對應的函數名非常直白,即如字面所表達意思。
//想對哪個階段的事件做一些自定義的動作,變繼承SparkListener實現對應的函數即可

abstract class SparkListener extends SparkListenerInterface {
  //階段完成時觸發的事件
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { }

  //階段提交時觸發的事件
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { }

  //任務啟動時觸發的事件
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { }

  //下載任務結果的事件
  override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit = { }

  //任務結束的事件
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { }

  //job啟動的事件
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = { }

  //job結束的事件
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { }

  //環境變量被更新的事件
  override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = { }

  //塊管理被添加的事件
  override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = { }

  override def onBlockManagerRemoved(
      blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = { }

  //取消rdd緩存的事件
  override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = { }

  //app啟動的事件
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { }

  //app結束的事件 [以下各事件也如同函數名所表達各個階段被觸發的事件不在一一標注]
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { } 

  override def onExecutorMetricsUpdate(
      executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = { }

  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { }

  override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { }

  override def onExecutorBlacklisted(
      executorBlacklisted: SparkListenerExecutorBlacklisted): Unit = { }

  override def onExecutorUnblacklisted(
      executorUnblacklisted: SparkListenerExecutorUnblacklisted): Unit = { }

  override def onNodeBlacklisted(
      nodeBlacklisted: SparkListenerNodeBlacklisted): Unit = { }

  override def onNodeUnblacklisted(
      nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit = { }

  override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }

  override def onOtherEvent(event: SparkListenerEvent): Unit = { }
}

示例代碼

package org.apache.spark


import org.apache.spark.util.Utils
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart}

/**
  * Created by cloud on 18/1/19.
  */
class MySparkAppListener(val sparkConf: SparkConf) extends SparkListener with Logging{

  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
    val appId = applicationStart.appId
    logInfo(appId)
  }

  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    logInfo("app end time " + applicationEnd.time)
  }

}

示例代碼使用
//設置spark.extraListeners參數即可
//如果在代碼中設置,則一定要設置在sparkContext初始化之前
//因為監聽器是在sparkContext初始化的時候加載的
object MyObject {
    def main(args : Array[String]) : Unit = {
        val sparkConf=new SparkConf()

        sparkConf.set("spark.extraListeners","org.apache.spark.MySparkAppListener")

        val sc = new SparkContext(sparkConf)
        .....
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM