As you get to know Spark, it can feel like a treasure chest that keeps turning up surprising new features. Each one you dig up makes development a little more convenient, and can even turn operations that were impossible, or painful to implement, into simple ones. Some of these features are deliberately exposed to users; others are used internally by the framework but expose a public interface that users can call as well.
What I want to share today are two listeners, SparkListener and StreamingListener. Thanks to them, a lot of functionality becomes much easier to develop, and many techniques become much lighter to implement.
Based on my experience, these two listeners have two main uses:
1. You can obtain any metric shown on the Spark UI. If you want metrics for monitoring and alerting, or plan to rebuild the Spark UI, you no longer need to scrape and parse its web pages to get them.
2. You can hook into the various events of a Spark job and embed callback code.
For example: in SparkListener's onApplicationStart you can initialize driver-side connection pools for third-party frameworks (the connections are only usable on the driver) and other variables, and put them in a shared object for direct use on the driver; then in onApplicationEnd you can release the connections and collect and persist those variables. This hides the initialization work entirely and can be packaged as a shared jar for others to use (see the sketch after this list).
Another example: in StreamingListener's onBatchStarted you can obtain the Kafka offsets and the number of records read, and in onBatchCompleted save those offsets to MySQL/ZooKeeper, elegantly hiding the fault-tolerance code. This too can be packaged as a shared jar for other projects to use.
These are all very handy tricks, and plenty of other use cases are still waiting for you to discover.
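Here is a minimal sketch of the first pattern: a listener that owns a driver-side resource for the lifetime of the application. ConnectionPool is a hypothetical placeholder for whatever third-party client pool you actually use:

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart}

/* Hypothetical driver-side singleton; stands in for a real JDBC/Redis/HBase pool. */
object ConnectionPool {
  def init(): Unit = { /* build the pool; runs on the driver only */ }
  def close(): Unit = { /* release all connections and persist any collected state */ }
}

class LifecycleListener extends SparkListener {
  /* Initialize shared driver-side resources when the application starts. */
  override def onApplicationStart(start: SparkListenerApplicationStart): Unit =
    ConnectionPool.init()

  /* Release them when the application ends, so user code never sees the plumbing. */
  override def onApplicationEnd(end: SparkListenerApplicationEnd): Unit =
    ConnectionPool.close()
}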
Performance analysis
One question you may well care about when using these listeners: how much does the metrics collection affect streaming performance?
The answer is that the impact on streaming or Spark Core jobs is very small. These metrics are generated whether or not you collect them, since the Spark UI collects them anyway. The extra cost falls only on the driver, so in a streaming scenario you may need to increase the driver's CPU and memory.
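For instance, you might raise the driver's resources at submit time. The class and jar names below are placeholders, and --driver-cores only takes effect in cluster deploy modes:

spark-submit \
  --master yarn --deploy-mode cluster \
  --driver-memory 4g \
  --driver-cores 2 \
  --class com.example.MyStreamingApp my-streaming-app.jar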
1. Defining and registering custom listeners (pseudocode below):
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.scheduler.StreamingListener

/* Obtain the contexts (appName and batch interval are placeholders) */
val spark: SparkSession = SparkSession.builder().appName("listener-demo").getOrCreate()
val ssc: StreamingContext = new StreamingContext(spark.sparkContext, Seconds(10))

/* Register the StreamingListener */
ssc.addStreamingListener(new MyStreamingListener)

/* Register the SparkListener */
spark.sparkContext.addSparkListener(new MySparkListener)
/* Custom StreamingListener */
class MyStreamingListener extends StreamingListener {
  // TODO override the built-in callbacks you need
}

/* Custom SparkListener */
class MySparkListener extends SparkListener {
  // TODO override the built-in callbacks you need
}
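As an aside, a SparkListener can also be attached through configuration rather than code, via the spark.extraListeners setting. Spark instantiates the class by reflection, so it needs a zero-argument constructor (or one taking a SparkConf); the class name below is a placeholder:

val spark = SparkSession.builder()
  .appName("listener-demo")
  // registered during context startup, before any events are fired
  .config("spark.extraListeners", "com.example.MySparkListener")
  .getOrCreate()

Registering this way ensures the listener sees events fired while the context is starting up, such as onApplicationStart, which a listener added afterwards with addSparkListener may miss.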
2. SparkListener built-in methods
As the overridden methods below show, a SparkListener lets you collect, in real time, the metrics produced for the application, jobs, stages, tasks, executors, the BlockManager, and so on, and embed code at each of these events, for instance to trigger custom rules when some metric crosses an alert threshold. The listing below covers the Spark-level metrics and event callbacks; the next section covers the Streaming-level monitoring points and collected metrics.
class MySparkListener extends SparkListener {

  /* Called when the application starts */
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
    /* appId -- matches the appId you see under spark-on-yarn */
    applicationStart.appId
    /* appName */
    applicationStart.appName
    /* driver-side logs; if log truncation is configured you will not get the full log */
    applicationStart.driverLogs
    /* the submitting user */
    applicationStart.sparkUser
    /* start time */
    applicationStart.time
  }

  /* Called when the whole application ends */
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    /* end time */
    applicationEnd.time
  }

  /* Called when a job starts */
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    /* jobId -- matches the one on the Spark UI */
    jobStart.jobId
    /* configuration properties */
    jobStart.properties
    /* all stage IDs generated for this job from its wide/narrow dependencies */
    jobStart.stageIds
    /* job start time */
    jobStart.time
    /* the StageInfo abstraction for each stage of this job */
    jobStart.stageInfos.foreach(stageInfo => {
      stageInfo.stageId
      /* when the stage was submitted -- not when its tasks start running, but when the
         stage begins to be turned into task descriptions */
      stageInfo.submissionTime
      /* when this stage completed */
      stageInfo.completionTime
      /* if this stage hits an error it is retried; the retry is suffixed to the
         stage ID as "_<attempt number>" */
      stageInfo.attemptId
      /* detailed description of this stage */
      stageInfo.details
      /* intermediate accumulator values for this stage */
      stageInfo.accumulables
      /* failure reason if the stage failed; for log alerting you can check for
         non-empty here and embed code to collect the error log */
      stageInfo.failureReason
      stageInfo.name
      /* size of the TaskSet generated for this stage */
      stageInfo.numTasks
      /* IDs of the parent stages this stage depends on */
      stageInfo.parentIds
      stageInfo.rddInfos
      /* task metrics */
      stageInfo.taskMetrics
      stageInfo.taskLocalityPreferences
      /* note: stageFailed(reason) is a setter that marks the stage as failed --
         don't call it from a listener */
    })
  }

  /* Called when a job ends */
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    jobEnd.jobId
    jobEnd.time
    jobEnd.jobResult
  }

  /* Called when a stage is submitted */
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
    stageSubmitted.properties
    /* stage-level information */
    stageSubmitted.stageInfo.stageId
    stageSubmitted.stageInfo.name
    stageSubmitted.stageInfo.details
    stageSubmitted.stageInfo.numTasks
    stageSubmitted.stageInfo.attemptId
    stageSubmitted.stageInfo.parentIds
    stageSubmitted.stageInfo.rddInfos
    stageSubmitted.stageInfo.accumulables
    stageSubmitted.stageInfo.submissionTime
    stageSubmitted.stageInfo.completionTime
    stageSubmitted.stageInfo.failureReason
    stageSubmitted.stageInfo.taskLocalityPreferences
    /* task metric snapshots */
    stageSubmitted.stageInfo.taskMetrics.executorDeserializeTime
    stageSubmitted.stageInfo.taskMetrics.executorDeserializeCpuTime
    stageSubmitted.stageInfo.taskMetrics.executorRunTime
    stageSubmitted.stageInfo.taskMetrics.executorCpuTime
    stageSubmitted.stageInfo.taskMetrics.jvmGCTime
    stageSubmitted.stageInfo.taskMetrics.memoryBytesSpilled
    stageSubmitted.stageInfo.taskMetrics.diskBytesSpilled
    stageSubmitted.stageInfo.taskMetrics.peakExecutionMemory
    stageSubmitted.stageInfo.taskMetrics.resultSerializationTime
    stageSubmitted.stageInfo.taskMetrics.resultSize
    stageSubmitted.stageInfo.taskMetrics.updatedBlockStatuses
    /* input / output metrics */
    stageSubmitted.stageInfo.taskMetrics.inputMetrics.recordsRead
    stageSubmitted.stageInfo.taskMetrics.inputMetrics.bytesRead
    stageSubmitted.stageInfo.taskMetrics.outputMetrics.recordsWritten
    stageSubmitted.stageInfo.taskMetrics.outputMetrics.bytesWritten
    /* shuffle read metrics */
    stageSubmitted.stageInfo.taskMetrics.shuffleReadMetrics.totalBytesRead
    stageSubmitted.stageInfo.taskMetrics.shuffleReadMetrics.recordsRead
    stageSubmitted.stageInfo.taskMetrics.shuffleReadMetrics.fetchWaitTime
    stageSubmitted.stageInfo.taskMetrics.shuffleReadMetrics.localBlocksFetched
    stageSubmitted.stageInfo.taskMetrics.shuffleReadMetrics.localBytesRead
    stageSubmitted.stageInfo.taskMetrics.shuffleReadMetrics.remoteBlocksFetched
    /* shuffle write metrics */
    stageSubmitted.stageInfo.taskMetrics.shuffleWriteMetrics.bytesWritten
    stageSubmitted.stageInfo.taskMetrics.shuffleWriteMetrics.recordsWritten
    stageSubmitted.stageInfo.taskMetrics.shuffleWriteMetrics.writeTime
  }

  /* Called when a stage completes */
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    stageCompleted.stageInfo.stageId
    stageCompleted.stageInfo.attemptId
    stageCompleted.stageInfo.name
    stageCompleted.stageInfo.details
    stageCompleted.stageInfo.numTasks
    stageCompleted.stageInfo.parentIds
    stageCompleted.stageInfo.rddInfos
    stageCompleted.stageInfo.accumulables
    stageCompleted.stageInfo.submissionTime
    stageCompleted.stageInfo.completionTime
    stageCompleted.stageInfo.failureReason
    stageCompleted.stageInfo.taskMetrics
    stageCompleted.stageInfo.taskLocalityPreferences
  }

  /* Called when a task starts */
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    taskStart.stageId
    taskStart.stageAttemptId
    taskStart.taskInfo.taskId
    taskStart.taskInfo.id
    taskStart.taskInfo.index
    taskStart.taskInfo.attemptNumber
    taskStart.taskInfo.executorId
    taskStart.taskInfo.host
    taskStart.taskInfo.launchTime
    taskStart.taskInfo.finishTime
    taskStart.taskInfo.duration
    taskStart.taskInfo.taskLocality
    taskStart.taskInfo.speculative
    taskStart.taskInfo.accumulables
    taskStart.taskInfo.gettingResult
    taskStart.taskInfo.gettingResultTime
    taskStart.taskInfo.running
    taskStart.taskInfo.finished
    taskStart.taskInfo.successful
    taskStart.taskInfo.failed
    taskStart.taskInfo.killed
    taskStart.taskInfo.status
  }

  /* Called when the task result is being fetched */
  override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit = {
    taskGettingResult.taskInfo
  }

  /* Called when a task finishes */
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    taskEnd.stageId
    taskEnd.stageAttemptId
    taskEnd.taskType
    taskEnd.reason
    taskEnd.taskInfo.taskId
    taskEnd.taskInfo.id
    taskEnd.taskInfo.index
    taskEnd.taskInfo.executorId
    taskEnd.taskInfo.host
    taskEnd.taskInfo.duration
    taskEnd.taskInfo.taskLocality
    taskEnd.taskInfo.speculative
    taskEnd.taskInfo.running
    taskEnd.taskInfo.finished
    taskEnd.taskInfo.killed
    taskEnd.taskMetrics.executorDeserializeTime
    taskEnd.taskMetrics.executorDeserializeCpuTime
    taskEnd.taskMetrics.executorRunTime
    taskEnd.taskMetrics.executorCpuTime
    taskEnd.taskMetrics.jvmGCTime
    taskEnd.taskMetrics.memoryBytesSpilled
    taskEnd.taskMetrics.diskBytesSpilled
    taskEnd.taskMetrics.peakExecutionMemory
    taskEnd.taskMetrics.resultSerializationTime
    taskEnd.taskMetrics.resultSize
    taskEnd.taskMetrics.updatedBlockStatuses
    taskEnd.taskMetrics.inputMetrics
    taskEnd.taskMetrics.outputMetrics
    taskEnd.taskMetrics.shuffleReadMetrics
    taskEnd.taskMetrics.shuffleWriteMetrics
  }

  /* Called when a BlockManager is added */
  override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
    blockManagerAdded.blockManagerId.executorId
    blockManagerAdded.blockManagerId.host
    blockManagerAdded.blockManagerId.port
    blockManagerAdded.blockManagerId.hostPort
    blockManagerAdded.blockManagerId.topologyInfo
    blockManagerAdded.blockManagerId.isDriver
    blockManagerAdded.maxMem
    blockManagerAdded.time
  }

  /* Called when the memory or disk managed by a BlockManager changes */
  override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
    blockUpdated.blockUpdatedInfo.blockId
    blockUpdated.blockUpdatedInfo.blockManagerId
    blockUpdated.blockUpdatedInfo.diskSize
    blockUpdated.blockUpdatedInfo.memSize
    blockUpdated.blockUpdatedInfo.storageLevel
  }

  /* Called when a BlockManager is removed */
  override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = {
    blockManagerRemoved.blockManagerId
    blockManagerRemoved.time
  }

  /* Called when the environment changes */
  override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = {
    environmentUpdate.environmentDetails
  }

  /* Called when an RDD is unpersisted */
  override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = {
    unpersistRDD.rddId
  }

  override def onOtherEvent(event: SparkListenerEvent): Unit = {}

  /* Called when an executor is added */
  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
    executorAdded.executorId
    executorAdded.executorInfo.executorHost
    executorAdded.executorInfo.logUrlMap
    executorAdded.executorInfo.totalCores
    executorAdded.time
  }

  /* Called when executor metrics are updated */
  override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = {
    executorMetricsUpdate.execId
    executorMetricsUpdate.accumUpdates
  }

  /* Called when an executor is removed */
  override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
    executorRemoved.executorId
    executorRemoved.reason
    executorRemoved.time
  }
}
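To show how these callbacks turn into something actionable, here is a minimal alerting sketch built on onStageCompleted. The 30% GC threshold is arbitrary and the alert() helper is a placeholder for whatever monitoring channel you use:

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

class AlertingListener extends SparkListener {
  /* Hypothetical alert sink -- replace with email, IM, Prometheus, etc. */
  private def alert(msg: String): Unit = println(s"[ALERT] $msg")

  override def onStageCompleted(sc: SparkListenerStageCompleted): Unit = {
    val info = sc.stageInfo
    /* Fire when a stage failed, attaching the failure reason for log alerting. */
    info.failureReason.foreach(reason => alert(s"stage ${info.stageId} failed: $reason"))
    /* Fire when GC time dominates the stage's executor run time. */
    val metrics = info.taskMetrics
    if (metrics != null && metrics.executorRunTime > 0 &&
        metrics.jvmGCTime.toDouble / metrics.executorRunTime > 0.3) {
      alert(s"stage ${info.stageId} spent >30% of its run time in GC")
    }
  }
}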
3. StreamingListener built-in methods
class MyStreamingListener extends StreamingListener {

  /* Called when the streaming computation starts */
  override def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit = {
    streamingStarted.time
  }

  /* Called when a batch is submitted */
  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
    /* per-input-stream information for this batch */
    batchSubmitted.batchInfo.streamIdToInputInfo.foreach { case (_, streamInputInfo) =>
      streamInputInfo.inputStreamId
      streamInputInfo.numRecords
      streamInputInfo.metadata
      streamInputInfo.metadataDescription
    }
    batchSubmitted.batchInfo.numRecords
    batchSubmitted.batchInfo.outputOperationInfos
    batchSubmitted.batchInfo.batchTime
    batchSubmitted.batchInfo.submissionTime
    /* processingStartTime/processingEndTime are still None when the batch has only been submitted */
    batchSubmitted.batchInfo.processingStartTime
    batchSubmitted.batchInfo.processingEndTime
    batchSubmitted.batchInfo.processingDelay
    batchSubmitted.batchInfo.schedulingDelay
    batchSubmitted.batchInfo.totalDelay
  }

  /* Called when a batch starts executing */
  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
    batchStarted.batchInfo.streamIdToInputInfo
    batchStarted.batchInfo.numRecords
    batchStarted.batchInfo.outputOperationInfos
    batchStarted.batchInfo.batchTime
    batchStarted.batchInfo.submissionTime
    batchStarted.batchInfo.processingStartTime
    batchStarted.batchInfo.processingEndTime
    batchStarted.batchInfo.processingDelay
    batchStarted.batchInfo.schedulingDelay
    batchStarted.batchInfo.totalDelay
  }

  /* Called when a batch completes */
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    batchCompleted.batchInfo.streamIdToInputInfo
    batchCompleted.batchInfo.numRecords
    batchCompleted.batchInfo.outputOperationInfos
    batchCompleted.batchInfo.batchTime
    batchCompleted.batchInfo.submissionTime
    batchCompleted.batchInfo.processingStartTime
    batchCompleted.batchInfo.processingEndTime
    batchCompleted.batchInfo.processingDelay
    batchCompleted.batchInfo.schedulingDelay
    batchCompleted.batchInfo.totalDelay
    /* grab the Kafka offsets and persist them to an external store */
    batchCompleted.batchInfo.streamIdToInputInfo.foreach { case (_, inputInfo) =>
      inputInfo.metadata.get("offsets").foreach { offsets =>
        offsets.asInstanceOf[List[OffsetRange]].foreach { offsetRange =>
          val topicName = offsetRange.topic
          val partition = offsetRange.partition
          val fromOffset = offsetRange.fromOffset
          val untilOffset = offsetRange.untilOffset
          // TODO write this Kafka checkpoint info to mysql/redis/zk etc. for fault tolerance
        }
      }
    }
  }

  /* Called when a receiver starts (receiver-based streaming only, not direct streaming) */
  override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
    receiverStarted.receiverInfo.streamId
    receiverStarted.receiverInfo.name
    receiverStarted.receiverInfo.active
    receiverStarted.receiverInfo.location
    receiverStarted.receiverInfo.executorId
    receiverStarted.receiverInfo.lastError
    receiverStarted.receiverInfo.lastErrorMessage
    receiverStarted.receiverInfo.lastErrorTime
  }

  /* Called when a receiver stops (receiver-based streaming only) */
  override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {
    receiverStopped.receiverInfo.streamId
    receiverStopped.receiverInfo.name
    receiverStopped.receiverInfo.active
    receiverStopped.receiverInfo.location
    receiverStopped.receiverInfo.executorId
    receiverStopped.receiverInfo.lastError
    receiverStopped.receiverInfo.lastErrorMessage
    receiverStopped.receiverInfo.lastErrorTime
  }

  /* Called when a receiver hits an error (receiver-based streaming only) */
  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
    receiverError.receiverInfo.streamId
    receiverError.receiverInfo.name
    receiverError.receiverInfo.active
    receiverError.receiverInfo.location
    receiverError.receiverInfo.executorId
    receiverError.receiverInfo.lastError
    receiverError.receiverInfo.lastErrorMessage
    receiverError.receiverInfo.lastErrorTime
  }

  /* Called when an output operation starts */
  override def onOutputOperationStarted(outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {
    outputOperationStarted.outputOperationInfo.id
    outputOperationStarted.outputOperationInfo.name
    outputOperationStarted.outputOperationInfo.description
    outputOperationStarted.outputOperationInfo.batchTime
    outputOperationStarted.outputOperationInfo.startTime
    outputOperationStarted.outputOperationInfo.endTime
    outputOperationStarted.outputOperationInfo.duration
    outputOperationStarted.outputOperationInfo.failureReason
  }

  /* Called when an output operation completes */
  override def onOutputOperationCompleted(outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {
    outputOperationCompleted.outputOperationInfo.id
    outputOperationCompleted.outputOperationInfo.name
    outputOperationCompleted.outputOperationInfo.description
    outputOperationCompleted.outputOperationInfo.batchTime
    outputOperationCompleted.outputOperationInfo.startTime
    outputOperationCompleted.outputOperationInfo.endTime
    outputOperationCompleted.outputOperationInfo.duration
    outputOperationCompleted.outputOperationInfo.failureReason
  }
}
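As with the Spark-level listener, these callbacks are easy to turn into monitoring. Here is a minimal sketch that alerts when a batch's scheduling delay crosses a threshold; the threshold and the alert() helper are placeholders:

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class DelayAlertListener(maxSchedulingDelayMs: Long = 5000L) extends StreamingListener {
  /* Hypothetical alert sink -- swap in your own monitoring channel. */
  private def alert(msg: String): Unit = println(s"[ALERT] $msg")

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    /* schedulingDelay is how long the batch waited before processing started;
       a steadily growing value means the job cannot keep up with the input rate. */
    info.schedulingDelay.filter(_ > maxSchedulingDelayMs).foreach { delay =>
      alert(s"batch ${info.batchTime} waited ${delay}ms before processing " +
        s"(records: ${info.numRecords})")
    }
  }
}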