This article targets Spark 2.0+.
Overview
Spark provides an event-listener mechanism that covers every phase of an application's lifecycle, so you can hook custom behavior into each of those phases. SparkListener is the listener class for these lifecycle events: override its methods to implement your own handling for the phases you care about.
Custom listener example
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart}

/**
 * Created by silent on 2019/1/11.
 */
class MySparkAppListener extends SparkListener with Logging {

  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
    val appId = applicationStart.appId
    logInfo("***************************************************" + appId.get)
  }

  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    logInfo("************************ app end time ************************ " + applicationEnd.time)
  }
}
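The same pattern extends to the task-level callbacks. Below is a minimal sketch (TaskStatsListener is an illustrative name, not part of the original example) that counts finished tasks and logs each task's runtime from onTaskEnd; it uses an SLF4J logger rather than Spark's internal Logging trait:

import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.slf4j.LoggerFactory

// Sketch only: count finished tasks and log each task's runtime.
class TaskStatsListener extends SparkListener {

  private val log = LoggerFactory.getLogger(getClass)
  private val finishedTasks = new AtomicLong(0)

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val total = finishedTasks.incrementAndGet()
    val info = taskEnd.taskInfo
    log.info(s"task ${info.taskId} of stage ${taskEnd.stageId} finished in ${info.duration} ms " +
      s"(successful=${info.successful}, tasks finished so far=$total)")
  }
}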
Running it from a main program
import org.apache.spark.sql.SparkSession

object Main extends App {

  val spark = SparkSession.builder()
    .appName("main")
    .master("local[2]")
    .config("spark.extraListeners", "com.moxiu.silent.SparkListenerDemo.MySparkAppListener")
    .getOrCreate()

  // spark.sparkContext.addSparkListener(new MySparkAppListener)

  spark.stop()
}
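Note that the commented-out addSparkListener call is not quite equivalent to the spark.extraListeners setting: the application-start event is posted while the SparkContext is being initialized, so a listener added only after getOrCreate() returns will generally miss onApplicationStart. Registering through spark.extraListeners attaches the listener before that event is fired.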
Notes:
There are two ways to register a custom SparkListener:
Method 1: specify it in the configuration
// Before Spark 2.0: set the property on SparkConf
val sparkConf = new SparkConf()
sparkConf.set("spark.extraListeners", "org.apache.spark.MySparkAppListener")

// Spark 2.0+: set it through the SparkSession builder
val spark = SparkSession.builder()
  .appName("main")
  .master("local[2]")
  .config("spark.extraListeners", "com.moxiu.silent.SparkListenerDemo.MySparkAppListener")
  .getOrCreate()
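spark.extraListeners accepts a comma-separated list of class names; each class needs either a zero-argument constructor or a constructor taking a single SparkConf. As a sketch, Spark's built-in org.apache.spark.scheduler.StatsReportListener can be registered alongside the custom listener:

import org.apache.spark.sql.SparkSession

// Sketch: register two listeners at once via a comma-separated list.
val spark = SparkSession.builder()
  .appName("main")
  .master("local[2]")
  .config("spark.extraListeners",
    "com.moxiu.silent.SparkListenerDemo.MySparkAppListener," +
      "org.apache.spark.scheduler.StatsReportListener")
  .getOrCreate()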
Method 2: register it on the SparkContext
// Before Spark 2.0: register on the SparkContext directly
val sc = new SparkContext(sparkConf)
sc.addSparkListener(new MySparkAppListener)

// Spark 2.0+: reach the SparkContext through the SparkSession
spark.sparkContext.addSparkListener(new MySparkAppListener)
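For completeness, here is a hedged end-to-end sketch that combines this registration style with the TaskStatsListener sketched earlier (the object and listener names are illustrative); running any action makes the job, stage, and task events fire:

import org.apache.spark.sql.SparkSession

object ListenerDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("listener-demo")
      .master("local[2]")
      .getOrCreate()

    // Programmatic registration on the live SparkContext.
    spark.sparkContext.addSparkListener(new TaskStatsListener)

    // Any action triggers job/stage/task events for the listener to observe.
    spark.sparkContext.parallelize(1 to 1000, 4).map(_ * 2).count()

    spark.stop()
  }
}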
The SparkListener class, for reference
// The callbacks in SparkListener are named after the events they handle, so the names
// are self-explanatory. To react to a particular phase, extend SparkListener and
// override the corresponding method.
abstract class SparkListener extends SparkListenerInterface {

  // Fired when a stage completes
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { }

  // Fired when a stage is submitted
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { }

  // Fired when a task starts
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { }

  // Fired when the driver begins fetching a task's result
  override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit = { }

  // Fired when a task ends
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { }

  // Fired when a job starts
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = { }

  // Fired when a job ends
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { }

  // Fired when environment properties are updated
  override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = { }

  // Fired when a block manager is added
  override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = { }

  override def onBlockManagerRemoved(
      blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = { }

  // Fired when an RDD is unpersisted
  override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = { }

  // Fired when the application starts
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { }

  // Fired when the application ends
  // (the remaining callbacks likewise fire at the lifecycle points their names describe)
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { }

  override def onExecutorMetricsUpdate(
      executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = { }

  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { }

  override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { }

  override def onExecutorBlacklisted(
      executorBlacklisted: SparkListenerExecutorBlacklisted): Unit = { }

  override def onExecutorUnblacklisted(
      executorUnblacklisted: SparkListenerExecutorUnblacklisted): Unit = { }

  override def onNodeBlacklisted(nodeBlacklisted: SparkListenerNodeBlacklisted): Unit = { }

  override def onNodeUnblacklisted(nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit = { }

  override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }

  override def onOtherEvent(event: SparkListenerEvent): Unit = { }
}
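As one more illustration of the stage-level callbacks, here is a hedged sketch (StageTimingListener is an illustrative name, not from the original post) that logs each stage's runtime, input record count, and failure reason from onStageCompleted:

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}
import org.slf4j.LoggerFactory

// Sketch: log each completed stage's runtime, input record count, and failure reason.
class StageTimingListener extends SparkListener {

  private val log = LoggerFactory.getLogger(getClass)

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val info = stageCompleted.stageInfo

    // Runtime is only known if both timestamps were recorded; -1 means unknown.
    val runtimeMs = (for {
      start <- info.submissionTime
      end   <- info.completionTime
    } yield end - start).getOrElse(-1L)

    // taskMetrics may not be populated in every code path, so guard against null.
    val recordsRead = Option(info.taskMetrics).map(_.inputMetrics.recordsRead).getOrElse(0L)

    log.info(s"stage ${info.stageId} (${info.name}) completed: " +
      s"runtime=$runtimeMs ms, recordsRead=$recordsRead, " +
      s"failureReason=${info.failureReason.getOrElse("none")}")
  }
}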
