As I have grown more familiar with Spark, it sometimes feels like a treasure chest: every so often an unexpected new capability turns up. Each newly discovered feature makes development a little more convenient, and can even turn operations that were impossible, or painful to implement, into something simple. Some of these features are explicitly exposed for users; others are used internally by the framework but expose public interfaces that users can call as well.
Today I want to share two listeners, SparkListener and StreamingListener. Thanks to these two listeners, a lot of functionality becomes much easier to develop, and many techniques become much lighter to implement.
Based on my own experience, these two listeners support two main usage patterns:
1. You can obtain any metric shown on the Spark UI. If you want those metrics for monitoring and alerting, or you plan to rebuild the Spark UI, you no longer need to scrape and parse its HTML pages to get at them.
2. You can react to the various events of a Spark job by embedding callback code.
For example: in SparkListener's onApplicationStart method you can initialize driver-side connection pools for third-party frameworks (connections usable only on the driver) and other shared variables, placing them in a common object so the driver can use them directly; in the onApplicationEnd method you can release those connections and collect and persist the variables. This hides the initialization work, and the whole thing can be packaged as a shared jar for others to use (a minimal sketch follows this list).
Or: in StreamingListener's onBatchStarted method you can capture the Kafka offsets and the number of records read, and in the onBatchCompleted method save that offset information to MySQL or ZooKeeper, elegantly hiding the fault-tolerance code. This too can be packaged as a shared jar for other projects.
These are all rather neat tricks, and there are plenty of other use cases still waiting to be discovered.
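To make the first example concrete, here is a minimal sketch of an application-lifecycle listener. ConnectionPool is a hypothetical placeholder for whatever pooling library you actually use (HikariCP, Jedis, and so on); only the listener wiring itself is real Spark API.

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart}

/* Hypothetical driver-side pool holder; swap in a real pool. */
object ConnectionPool {
  def init(): Unit = { /* open connections, set up shared variables */ }
  def close(): Unit = { /* release connections, persist collected state */ }
}

class LifecycleListener extends SparkListener {
  /* Runs on the driver when the application starts. */
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit =
    ConnectionPool.init()

  /* Runs when the application ends: release resources, persist state. */
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit =
    ConnectionPool.close()
}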
Performance analysis
One question you may well care about: how much impact does this metric collection have on the performance of a streaming job?
The answer is: very little, whether for streaming or for Spark Core. These metrics are generated regardless, because even if you do not collect them, the Spark UI does. The only cost is slightly higher overhead on the driver; in a streaming scenario you may need to give the driver more CPU and memory (for example via spark-submit's --driver-memory and --driver-cores options).
1. Defining and registering custom listeners (pseudocode below):
/* Pseudocode: assume an existing SparkSession and StreamingContext */
val spark: SparkSession = null
val ssc: StreamingContext = null

/* Register the StreamingListener */
ssc.addStreamingListener(new MyStreamingListener)
/* Register the SparkListener */
spark.sparkContext.addSparkListener(new MySparkListener)

/* Custom StreamingListener */
class MyStreamingListener extends StreamingListener {
  // TODO override the built-in callbacks
}

/* Custom SparkListener */
class MySparkListener extends SparkListener {
  // TODO override the built-in callbacks
}
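For completeness, here is a runnable version of that wiring, assuming a local two-core master and an arbitrary 5-second batch interval:

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ListenerWiring {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("listener-demo")
      .getOrCreate()

    /* Build the StreamingContext on top of the same SparkContext. */
    val ssc = new StreamingContext(spark.sparkContext, Seconds(5))

    spark.sparkContext.addSparkListener(new MySparkListener)
    ssc.addStreamingListener(new MyStreamingListener)

    // ... define your DStreams here, then:
    // ssc.start()
    // ssc.awaitTermination()
  }
}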
2. SparkListener built-in callbacks
As the overridden methods below show, SparkListener lets you collect in real time the metrics produced by the application, jobs, stages, tasks, executors, BlockManagers, and so on, and lets you embed code at the moment these events fire, for instance to trigger custom rules when some metric crosses an alert threshold. The listing below covers the Spark-level metrics and event callbacks; the next section lists the streaming-level monitoring points and metrics.
class MySparkListener extends SparkListener {

  /* Called when the application starts */
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
    /* appId; matches the application ID in spark-on-yarn mode */
    applicationStart.appId
    /* appName */
    applicationStart.appName
    /* driver logs; if log truncation is configured, this will not be the complete log */
    applicationStart.driverLogs
    /* the submitting user */
    applicationStart.sparkUser
    /* start time */
    applicationStart.time
  }

  /* Called when the application ends */
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    /* end time */
    applicationEnd.time
  }

  /* Called when a job starts */
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    /* jobId; matches the one shown on the Spark UI */
    jobStart.jobId
    /* configuration properties */
    jobStart.properties
    /* all stage IDs generated for this job from its wide/narrow dependencies */
    jobStart.stageIds
    /* job start time */
    jobStart.time
    /* per-stage info for this job */
    jobStart.stageInfos.foreach(stageInfo => {
      stageInfo.stageId
      /* stage submission time; not when tasks start executing, but when the stage begins to be turned into task descriptions */
      stageInfo.submissionTime
      /* time this stage completed */
      stageInfo.completionTime
      /* a failed stage is retried; the retry appends "_<retry count>" after the stage ID */
      stageInfo.attemptId
      /* detailed description of this stage */
      stageInfo.details
      /* intermediate accumulator values for this stage */
      stageInfo.accumulables
      /* failure reason if this stage failed; for log alerting, check for non-empty here and embed code to collect the error */
      stageInfo.failureReason
      stageInfo.name
      /* size of the TaskSet generated for this stage */
      stageInfo.numTasks
      /* IDs of the parent stages */
      stageInfo.parentIds
      stageInfo.rddInfos
      /* task metric collection */
      stageInfo.taskMetrics
      stageInfo.taskLocalityPreferences
    })
  }

  /* Called when a job ends */
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    jobEnd.jobId
    jobEnd.time
    jobEnd.jobResult
  }

  /* Called when a stage is submitted */
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
    stageSubmitted.properties
    stageSubmitted.stageInfo.stageId
    stageSubmitted.stageInfo.attemptId
    stageSubmitted.stageInfo.name
    stageSubmitted.stageInfo.numTasks
    stageSubmitted.stageInfo.details
    stageSubmitted.stageInfo.parentIds
    stageSubmitted.stageInfo.rddInfos
    stageSubmitted.stageInfo.accumulables
    stageSubmitted.stageInfo.submissionTime
    stageSubmitted.stageInfo.completionTime
    stageSubmitted.stageInfo.failureReason
    stageSubmitted.stageInfo.taskLocalityPreferences
    /* task metrics for this stage */
    val metrics = stageSubmitted.stageInfo.taskMetrics
    metrics.executorDeserializeTime
    metrics.executorDeserializeCpuTime
    metrics.executorRunTime
    metrics.executorCpuTime
    metrics.jvmGCTime
    metrics.peakExecutionMemory
    metrics.memoryBytesSpilled
    metrics.diskBytesSpilled
    metrics.resultSerializationTime
    metrics.resultSize
    metrics.updatedBlockStatuses
    metrics.inputMetrics.recordsRead
    metrics.inputMetrics.bytesRead
    metrics.outputMetrics.recordsWritten
    metrics.outputMetrics.bytesWritten
    metrics.shuffleReadMetrics.totalBytesRead
    metrics.shuffleReadMetrics.recordsRead
    metrics.shuffleReadMetrics.fetchWaitTime
    metrics.shuffleReadMetrics.localBlocksFetched
    metrics.shuffleReadMetrics.localBytesRead
    metrics.shuffleReadMetrics.remoteBlocksFetched
    metrics.shuffleWriteMetrics.bytesWritten
    metrics.shuffleWriteMetrics.recordsWritten
    metrics.shuffleWriteMetrics.writeTime
  }

  /* Called when a stage completes */
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    stageCompleted.stageInfo.stageId
    stageCompleted.stageInfo.attemptId
    stageCompleted.stageInfo.name
    stageCompleted.stageInfo.numTasks
    stageCompleted.stageInfo.parentIds
    stageCompleted.stageInfo.rddInfos
    stageCompleted.stageInfo.details
    stageCompleted.stageInfo.accumulables
    stageCompleted.stageInfo.submissionTime
    stageCompleted.stageInfo.completionTime
    stageCompleted.stageInfo.failureReason
    stageCompleted.stageInfo.taskMetrics
    stageCompleted.stageInfo.taskLocalityPreferences
  }

  /* Called when a task starts */
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    taskStart.stageId
    taskStart.stageAttemptId
    taskStart.taskInfo.taskId
    taskStart.taskInfo.id
    taskStart.taskInfo.index
    taskStart.taskInfo.attemptNumber
    taskStart.taskInfo.executorId
    taskStart.taskInfo.host
    taskStart.taskInfo.launchTime
    taskStart.taskInfo.finishTime
    taskStart.taskInfo.duration
    taskStart.taskInfo.taskLocality
    taskStart.taskInfo.speculative
    taskStart.taskInfo.gettingResult
    taskStart.taskInfo.gettingResultTime
    taskStart.taskInfo.accumulables
    taskStart.taskInfo.status
    taskStart.taskInfo.running
    taskStart.taskInfo.finished
    taskStart.taskInfo.successful
    taskStart.taskInfo.failed
    taskStart.taskInfo.killed
  }

  /* Called when the driver begins fetching a task's result */
  override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit = {
    taskGettingResult.taskInfo
  }

  /* Called when a task finishes */
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    taskEnd.stageId
    taskEnd.stageAttemptId
    taskEnd.taskType
    taskEnd.reason
    taskEnd.taskInfo.taskId
    taskEnd.taskInfo.id
    taskEnd.taskInfo.index
    taskEnd.taskInfo.executorId
    taskEnd.taskInfo.host
    taskEnd.taskInfo.duration
    taskEnd.taskInfo.taskLocality
    taskEnd.taskInfo.speculative
    taskEnd.taskInfo.finished
    taskEnd.taskInfo.running
    taskEnd.taskInfo.killed
    taskEnd.taskMetrics.executorDeserializeTime
    taskEnd.taskMetrics.executorDeserializeCpuTime
    taskEnd.taskMetrics.executorRunTime
    taskEnd.taskMetrics.executorCpuTime
    taskEnd.taskMetrics.jvmGCTime
    taskEnd.taskMetrics.peakExecutionMemory
    taskEnd.taskMetrics.memoryBytesSpilled
    taskEnd.taskMetrics.diskBytesSpilled
    taskEnd.taskMetrics.resultSerializationTime
    taskEnd.taskMetrics.resultSize
    taskEnd.taskMetrics.updatedBlockStatuses
    taskEnd.taskMetrics.inputMetrics
    taskEnd.taskMetrics.outputMetrics
    taskEnd.taskMetrics.shuffleReadMetrics
    taskEnd.taskMetrics.shuffleWriteMetrics
  }

  /* Called when a BlockManager is added */
  override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
    blockManagerAdded.blockManagerId.executorId
    blockManagerAdded.blockManagerId.host
    blockManagerAdded.blockManagerId.port
    blockManagerAdded.blockManagerId.hostPort
    blockManagerAdded.blockManagerId.topologyInfo
    blockManagerAdded.blockManagerId.isDriver
    blockManagerAdded.maxMem
    blockManagerAdded.time
  }

  /* Called when memory or disk managed by a BlockManager changes */
  override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
    blockUpdated.blockUpdatedInfo.blockId
    blockUpdated.blockUpdatedInfo.blockManagerId
    blockUpdated.blockUpdatedInfo.diskSize
    blockUpdated.blockUpdatedInfo.memSize
    blockUpdated.blockUpdatedInfo.storageLevel
  }

  /* Called when a BlockManager is removed */
  override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = {
    blockManagerRemoved.blockManagerId
    blockManagerRemoved.time
  }

  /* Called when the environment changes */
  override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = {
    environmentUpdate.environmentDetails
  }

  /* Called when an RDD is unpersisted */
  override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = {
    unpersistRDD.rddId
  }

  /* Catch-all for other events */
  override def onOtherEvent(event: SparkListenerEvent): Unit = {}

  /* Called when an executor is added */
  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
    executorAdded.executorId
    executorAdded.executorInfo.executorHost
    executorAdded.executorInfo.logUrlMap
    executorAdded.executorInfo.totalCores
    executorAdded.time
  }

  /* Called when executor metrics are updated */
  override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = {
    executorMetricsUpdate.execId
    executorMetricsUpdate.accumUpdates
  }

  /* Called when an executor is removed */
  override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
    executorRemoved.executorId
    executorRemoved.reason
    executorRemoved.time
  }
}
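To illustrate the alerting idea mentioned above, here is a minimal sketch of a listener that reacts when a stage fails. The alert() helper is a hypothetical stand-in for whatever notification channel you use (mail, IM, webhook, etc.).

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

class StageFailureAlertListener extends SparkListener {
  /* Hypothetical notification hook; replace with your real channel. */
  private def alert(message: String): Unit = println(s"[ALERT] $message")

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val info = stageCompleted.stageInfo
    /* failureReason is non-empty only when the stage failed. */
    info.failureReason.foreach { reason =>
      alert(s"Stage ${info.stageId} (${info.name}) failed: $reason")
    }
  }
}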
3. StreamingListener built-in callbacks
/* Requires import org.apache.spark.streaming.kafka010.OffsetRange for the offset handling in onBatchCompleted (or the equivalent class from the Kafka 0.8 integration). */
class MyStreamingListener extends StreamingListener {

  /* Called when the streaming computation starts */
  override def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit = {
    streamingStarted.time
  }

  /* Called when a batch is submitted */
  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
    batchSubmitted.batchInfo.streamIdToInputInfo.foreach(tuple => {
      val streamInputInfo = tuple._2
      streamInputInfo.inputStreamId
      streamInputInfo.numRecords
      streamInputInfo.metadata
      streamInputInfo.metadataDescription
    })
    batchSubmitted.batchInfo.numRecords
    batchSubmitted.batchInfo.outputOperationInfos
    batchSubmitted.batchInfo.batchTime
    batchSubmitted.batchInfo.submissionTime
    batchSubmitted.batchInfo.processingStartTime
    batchSubmitted.batchInfo.processingEndTime
    batchSubmitted.batchInfo.processingDelay
    batchSubmitted.batchInfo.schedulingDelay
    batchSubmitted.batchInfo.totalDelay
  }

  /* Called when a batch starts executing */
  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
    batchStarted.batchInfo.streamIdToInputInfo
    batchStarted.batchInfo.numRecords
    batchStarted.batchInfo.outputOperationInfos
    batchStarted.batchInfo.batchTime
    batchStarted.batchInfo.submissionTime
    batchStarted.batchInfo.processingStartTime
    batchStarted.batchInfo.processingEndTime
    batchStarted.batchInfo.processingDelay
    batchStarted.batchInfo.schedulingDelay
    batchStarted.batchInfo.totalDelay
  }

  /* Called when a batch completes */
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    batchCompleted.batchInfo.streamIdToInputInfo
    batchCompleted.batchInfo.numRecords
    batchCompleted.batchInfo.outputOperationInfos
    batchCompleted.batchInfo.batchTime
    batchCompleted.batchInfo.submissionTime
    batchCompleted.batchInfo.processingStartTime
    batchCompleted.batchInfo.processingEndTime
    batchCompleted.batchInfo.processingDelay
    batchCompleted.batchInfo.schedulingDelay
    batchCompleted.batchInfo.totalDelay
    /* read back the offsets and persist them to third-party storage */
    batchCompleted.batchInfo.streamIdToInputInfo.foreach(tuple => {
      /* the direct Kafka stream publishes its OffsetRange list under the "offsets" metadata key */
      tuple._2.metadata.get("offsets").foreach(offsets => {
        offsets.asInstanceOf[List[OffsetRange]].foreach(offsetRange => {
          val topicName = offsetRange.topic
          val partition = offsetRange.partition
          val minOffset = offsetRange.fromOffset
          val maxOffset = offsetRange.untilOffset
          // TODO persist this Kafka offset state to MySQL/Redis/ZooKeeper for fault tolerance
        })
      })
    })
  }

  /* Called when a receiver starts (receiver-based streaming, not direct streaming) */
  override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
    receiverStarted.receiverInfo.streamId
    receiverStarted.receiverInfo.name
    receiverStarted.receiverInfo.executorId
    receiverStarted.receiverInfo.location
    receiverStarted.receiverInfo.active
    receiverStarted.receiverInfo.lastError
    receiverStarted.receiverInfo.lastErrorMessage
    receiverStarted.receiverInfo.lastErrorTime
  }

  /* Called when a receiver stops (receiver-based streaming, not direct streaming) */
  override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {
    receiverStopped.receiverInfo.streamId
    receiverStopped.receiverInfo.name
    receiverStopped.receiverInfo.executorId
    receiverStopped.receiverInfo.location
    receiverStopped.receiverInfo.active
    receiverStopped.receiverInfo.lastError
    receiverStopped.receiverInfo.lastErrorMessage
    receiverStopped.receiverInfo.lastErrorTime
  }

  /* Called when a receiver errors (receiver-based streaming, not direct streaming) */
  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
    receiverError.receiverInfo.streamId
    receiverError.receiverInfo.name
    receiverError.receiverInfo.executorId
    receiverError.receiverInfo.location
    receiverError.receiverInfo.active
    receiverError.receiverInfo.lastError
    receiverError.receiverInfo.lastErrorMessage
    receiverError.receiverInfo.lastErrorTime
  }

  /* Called when an output operation starts */
  override def onOutputOperationStarted(outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {
    outputOperationStarted.outputOperationInfo.id
    outputOperationStarted.outputOperationInfo.name
    outputOperationStarted.outputOperationInfo.description
    outputOperationStarted.outputOperationInfo.batchTime
    outputOperationStarted.outputOperationInfo.startTime
    outputOperationStarted.outputOperationInfo.endTime
    outputOperationStarted.outputOperationInfo.duration
    outputOperationStarted.outputOperationInfo.failureReason
  }

  /* Called when an output operation completes */
  override def onOutputOperationCompleted(outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {
    outputOperationCompleted.outputOperationInfo.id
    outputOperationCompleted.outputOperationInfo.name
    outputOperationCompleted.outputOperationInfo.description
    outputOperationCompleted.outputOperationInfo.batchTime
    outputOperationCompleted.outputOperationInfo.startTime
    outputOperationCompleted.outputOperationInfo.endTime
    outputOperationCompleted.outputOperationInfo.duration
    outputOperationCompleted.outputOperationInfo.failureReason
  }
}
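To close the loop on the TODO in onBatchCompleted, here is one possible way to persist the offsets over plain JDBC. The offset_store table, its schema, and the connection settings are assumptions for illustration (REPLACE INTO is MySQL-specific); this is a sketch, not a hardened implementation.

import java.sql.DriverManager
import org.apache.spark.streaming.kafka010.OffsetRange

object OffsetStore {
  /* Hypothetical MySQL endpoint and schema:
     CREATE TABLE offset_store(topic VARCHAR(255), `partition` INT, until_offset BIGINT,
                               PRIMARY KEY(topic, `partition`)); */
  def save(ranges: List[OffsetRange]): Unit = {
    val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", "user", "password")
    try {
      val stmt = conn.prepareStatement(
        "REPLACE INTO offset_store(topic, `partition`, until_offset) VALUES (?, ?, ?)")
      ranges.foreach { r =>
        stmt.setString(1, r.topic)
        stmt.setInt(2, r.partition)
        stmt.setLong(3, r.untilOffset)
        stmt.addBatch()
      }
      /* Upsert one row per topic-partition in a single round trip. */
      stmt.executeBatch()
    } finally conn.close()
  }
}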