The Spark UI on port 4040 already provides runtime monitoring metrics, so why write custom metrics to Redis?
1. The metrics can be integrated into your own data platform alongside your business data, which makes troubleshooting and email alerting easier.
2. You can add metric statistics of your own on top of what the Spark UI offers.
I. Spark's SparkListener
SparkListener is an abstract class that extends SparkListenerInterface and provides empty default implementations for every callback. To build custom monitoring, define your own listener class that extends SparkListener and override the methods you need; the method names map one-to-one onto the lifecycle events they describe, so to react to a particular phase you simply override the corresponding callback (a small example follows the listing below). These callbacks expose the data volumes and timings of each phase of a Spark run, which is exactly where our monitoring metrics come from.
abstract class SparkListener extends SparkListenerInterface {
// called when a stage completes
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { }
// called when a stage is submitted
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { }
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { }
override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit = { }
// called when a task ends
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { }
override def onJobStart(jobStart: SparkListenerJobStart): Unit = { }
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { }
override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = { }
override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = { }
override def onBlockManagerRemoved(
blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = { }
override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = { }
override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { }
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { }
override def onExecutorMetricsUpdate(
executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = { }
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { }
override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { }
override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }
override def onOtherEvent(event: SparkListenerEvent): Unit = { }
}
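For example, here is a minimal sketch (separate from the Redis listener developed below) that only logs how long each completed stage took; the field names follow Spark's StageInfo:

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

class StageTimeListener extends SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val info = stageCompleted.stageInfo
    // submissionTime and completionTime are Options, so only log when both are set
    for (start <- info.submissionTime; end <- info.completionTime) {
      println(s"stage ${info.stageId} '${info.name}' finished: ${end - start} ms, ${info.numTasks} tasks")
    }
  }
}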
1. Implement your own SparkListener and write the onTaskEnd metrics to Redis
(1) Create a MySparkAppListener class that extends SparkListener and override onTaskEnd.
(2) The callback signature: override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { }
The SparkListenerTaskEnd event class:
case class SparkListenerTaskEnd(
// the Spark stage ID
stageId: Int,
// ID of this stage attempt (a stage may be retried, so one stage can have several attempts)
stageAttemptId: Int,
taskType: String,
reason: TaskEndReason,
// task information
taskInfo: TaskInfo,
// task metrics
@Nullable taskMetrics: TaskMetrics)
extends SparkListenerEvent
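The reason field is also worth handling: it tells you whether the task succeeded or failed. A minimal sketch of failure alerting based on it (the println stands in for a real mail/IM hook; the pattern match follows Spark's TaskEndReason hierarchy):

import org.apache.spark.{ExceptionFailure, Success}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

class TaskFailureAlertListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = taskEnd.reason match {
    case Success =>
      // task finished normally, nothing to report
    case e: ExceptionFailure =>
      println(s"task ${taskEnd.taskInfo.taskId} on ${taskEnd.taskInfo.host} failed: ${e.description}")
    case other =>
      println(s"task ${taskEnd.taskInfo.taskId} ended abnormally: $other")
  }
}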
(3) Inside onTaskEnd, the following information is available through the taskInfo and taskMetrics members:
/**
 * 1. taskMetrics
 * 2. shuffle metrics
 * 3. task input / output
 * 4. taskInfo
 **/
(4) Monitoring information available from TaskMetrics
class TaskMetrics private[spark] () extends Serializable {
// Each metric is internally represented as an accumulator
private val _executorDeserializeTime = new LongAccumulator
private val _executorDeserializeCpuTime = new LongAccumulator
private val _executorRunTime = new LongAccumulator
private val _executorCpuTime = new LongAccumulator
private val _resultSize = new LongAccumulator
private val _jvmGCTime = new LongAccumulator
private val _resultSerializationTime = new LongAccumulator
private val _memoryBytesSpilled = new LongAccumulator
private val _diskBytesSpilled = new LongAccumulator
private val _peakExecutionMemory = new LongAccumulator
private val _updatedBlockStatuses = new CollectionAccumulator[(BlockId, BlockStatus)]
val inputMetrics: InputMetrics = new InputMetrics()
/**
* Metrics related to writing data externally (e.g. to a distributed filesystem),
* defined only in tasks with output.
*/
val outputMetrics: OutputMetrics = new OutputMetrics()
/**
* Metrics related to shuffle read aggregated across all shuffle dependencies.
* This is defined only if there are shuffle dependencies in this task.
*/
val shuffleReadMetrics: ShuffleReadMetrics = new ShuffleReadMetrics()
/**
* Metrics related to shuffle write, defined only in shuffle map stages.
*/
val shuffleWriteMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics()
// ... (remainder of TaskMetrics omitted)
}
(5) Implementation: collect the metrics in onTaskEnd and store them in Redis
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, TaskInfo}
import org.json4s.DefaultFormats
import org.json4s.jackson.Json
import redis.clients.jedis.Jedis

/**
 * Requirement 1: store the Spark job's runtime metrics in Redis so they can be
 * integrated into our own back-office monitoring UI.
 */
class MySparkAppListener extends SparkListener {
val redisConf = "jedisConfig.properties"
val jedis: Jedis = JedisUtil.getInstance().getJedis
// override the onTaskEnd callback from the parent class
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
// information available inside onTaskEnd:
/**
 * 1. taskMetrics
 * 2. shuffle metrics
 * 3. task input / output
 * 4. taskInfo
 **/
val currentTimestamp = System.currentTimeMillis()
// metrics that can be read from TaskMetrics:
/**
* private val _executorDeserializeTime = new LongAccumulator
* private val _executorDeserializeCpuTime = new LongAccumulator
* private val _executorRunTime = new LongAccumulator
* private val _executorCpuTime = new LongAccumulator
* private val _resultSize = new LongAccumulator
* private val _jvmGCTime = new LongAccumulator
* private val _resultSerializationTime = new LongAccumulator
* private val _memoryBytesSpilled = new LongAccumulator
* private val _diskBytesSpilled = new LongAccumulator
* private val _peakExecutionMemory = new LongAccumulator
* private val _updatedBlockStatuses = new CollectionAccumulator[(BlockId, BlockStatus)]
*/
val metrics = taskEnd.taskMetrics
val taskMetricsMap = scala.collection.mutable.HashMap(
"executorDeserializeTime" -> metrics.executorDeserializeTime, //executor的反序列化時間
"executorDeserializeCpuTime" -> metrics.executorDeserializeCpuTime, //executor的反序列化的 cpu時間
"executorRunTime" -> metrics.executorRunTime, //executoor的運行時間
"resultSize" -> metrics.resultSize, //結果集大小
"jvmGCTime" -> metrics.jvmGCTime, //
"resultSerializationTime" -> metrics.resultSerializationTime,
"memoryBytesSpilled" -> metrics.memoryBytesSpilled, //內存溢寫的大小
"diskBytesSpilled" -> metrics.diskBytesSpilled, //溢寫到磁盤的大小
"peakExecutionMemory" -> metrics.peakExecutionMemory //executor的最大內存
)
val jedisKey = s"taskMetrics_${currentTimestamp}"
// serialize the metrics map to JSON and keep it for one hour
jedis.set(jedisKey, Json(DefaultFormats).write(taskMetricsMap))
jedis.expire(jedisKey, 3600)
//====================== shuffle metrics ================================
val shuffleReadMetrics = metrics.shuffleReadMetrics
val shuffleWriteMetrics = metrics.shuffleWriteMetrics
// shuffleWriteMetrics: the shuffle write phase exposes these metrics
/**
* private[executor] val _bytesWritten = new LongAccumulator
* private[executor] val _recordsWritten = new LongAccumulator
* private[executor] val _writeTime = new LongAccumulator
*/
// shuffleReadMetrics: the shuffle read phase exposes these metrics
/**
* private[executor] val _remoteBlocksFetched = new LongAccumulator
* private[executor] val _localBlocksFetched = new LongAccumulator
* private[executor] val _remoteBytesRead = new LongAccumulator
* private[executor] val _localBytesRead = new LongAccumulator
* private[executor] val _fetchWaitTime = new LongAccumulator
* private[executor] val _recordsRead = new LongAccumulator
*/
val shuffleMap = scala.collection.mutable.HashMap(
"remoteBlocksFetched" -> shuffleReadMetrics.remoteBlocksFetched, //shuffle遠程拉取數據塊
"localBlocksFetched" -> shuffleReadMetrics.localBlocksFetched, //本地塊拉取
"remoteBytesRead" -> shuffleReadMetrics.remoteBytesRead, //shuffle遠程讀取的字節數
"localBytesRead" -> shuffleReadMetrics.localBytesRead, //讀取本地數據的字節
"fetchWaitTime" -> shuffleReadMetrics.fetchWaitTime, //拉取數據的等待時間
"recordsRead" -> shuffleReadMetrics.recordsRead, //shuffle讀取的記錄總數
"bytesWritten" -> shuffleWriteMetrics.bytesWritten, //shuffle寫的總大小
"recordsWritte" -> shuffleWriteMetrics.recordsWritten, //shuffle寫的總記錄數
"writeTime" -> shuffleWriteMetrics.writeTime
)
val shuffleKey = s"shuffleKey${currentTimestamp}"
jedis.set(shuffleKey, Json(DefaultFormats).write(shuffleMap))
jedis.expire(shuffleKey, 3600)
//================= input / output ========================
val inputMetrics = taskEnd.taskMetrics.inputMetrics
val outputMetrics = taskEnd.taskMetrics.outputMetrics
val input_output = scala.collection.mutable.HashMap(
"bytesRead" -> inputMetrics.bytesRead, //讀取的大小
"recordsRead" -> inputMetrics.recordsRead, //總記錄數
"bytesWritten" -> outputMetrics.bytesWritten,//輸出的大小
"recordsWritten" -> outputMetrics.recordsWritten//輸出的記錄數
)
val input_outputKey = s"input_outputKey${currentTimestamp}"
jedis.set(input_outputKey, Json(DefaultFormats).write(input_output))
jedis.expire(input_outputKey, 3600)
//####################taskInfo#######
val taskInfo: TaskInfo = taskEnd.taskInfo
val taskInfoMap = scala.collection.mutable.HashMap(
"taskId" -> taskInfo.taskId ,
"host" -> taskInfo.host ,
"speculative" -> taskInfo.speculative , //推測執行
"failed" -> taskInfo.failed ,
"killed" -> taskInfo.killed ,
"running" -> taskInfo.running
)
val taskInfoKey = s"taskInfo${currentTimestamp}"
jedis.set(taskInfoKey , Json(DefaultFormats).write(taskInfoMap))
jedis.expire(taskInfoKey , 3600)
}
}
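JedisUtil and jedisConfig.properties are not shown in this post; a minimal sketch of what such a singleton helper might look like (host and port are placeholders, in practice they would be read from jedisConfig.properties):

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

class JedisUtil private {
  // placeholder connection settings; load them from jedisConfig.properties in real code
  private val pool = new JedisPool(new JedisPoolConfig, "localhost", 6379)
  def getJedis: Jedis = pool.getResource
}

object JedisUtil {
  private val instance = new JedisUtil
  def getInstance(): JedisUtil = instance
}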
(6) Testing the program
Register the custom listener via sparkContext.addSparkListener:
sc.addSparkListener(new MySparkAppListener())
A simple word count job is enough to exercise it; a minimal driver is sketched below.
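A minimal sketch of such a driver (the local master and input path are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

object ListenerWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("listener-wordcount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    sc.addSparkListener(new MySparkAppListener()) // register the custom listener

    sc.textFile("data/words.txt")                 // placeholder input path
      .flatMap(_.split("\\s+"))
      .map((_, 1))
      .reduceByKey(_ + _)
      .collect()
      .foreach(println)

    sc.stop()
  }
}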

II. Real-time monitoring for Spark Streaming
1. StreamingListener is the listener trait for Spark Streaming. It exposes callbacks for receiver start, error, and stop, as well as batch submission, start, and completion; the principle is the same as with SparkListener above.
trait StreamingListener {
/** Called when a receiver has been started */
def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }
/** Called when a receiver has reported an error */
def onReceiverError(receiverError: StreamingListenerReceiverError) { }
/** Called when a receiver has been stopped */
def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { }
/** Called when a batch of jobs has been submitted for processing. */
def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }
/** Called when processing of a batch of jobs has started. */
def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }
/** Called when processing of a batch of jobs has completed. */
def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }
/** Called when processing of a job of a batch has started. */
def onOutputOperationStarted(
outputOperationStarted: StreamingListenerOutputOperationStarted) { }
/** Called when processing of a job of a batch has completed. */
def onOutputOperationCompleted(
outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }
}
2. Key callbacks and their uses
1. onReceiverError
Monitors receiver errors so that an email alert can be sent.
2. onBatchCompleted, called when a batch has completed
(1) Offset handling in Spark Streaming: persist the offsets only once the batch has finished processing. (Note that this still cannot rule out the case where the program dies after the results are written but before the offsets are committed.)
(2) If the batch processing time exceeds the batch interval, the job is falling behind; send an email alert. A sketch of such a listener follows.
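A minimal sketch of such a streaming listener (batchIntervalMs and the alert hook are assumptions; registration is shown at the end):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted, StreamingListenerReceiverError}

class MyStreamingListener(batchIntervalMs: Long) extends StreamingListener {

  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
    val info = receiverError.receiverInfo
    alert(s"receiver ${info.name} reported an error: ${info.lastErrorMessage}")
  }

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    // this is also the point at which the batch's offsets can safely be persisted
    info.processingDelay.filter(_ > batchIntervalMs).foreach { took =>
      alert(s"batch ${info.batchTime} took ${took} ms, longer than the ${batchIntervalMs} ms interval")
    }
  }

  // placeholder alerting hook; replace with a mail/IM integration
  private def alert(msg: String): Unit = println(msg)
}

// register it on the StreamingContext:
// ssc.addStreamingListener(new MyStreamingListener(batchIntervalMs = 5000))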
III. Parsing the metrics returned by the Spark / YARN web endpoints
1. Start a local Spark application
Visit http://localhost:4040/metrics/json/ to get a JSON payload; parsing its gauges object yields all of the information shown below.
{
"version": "3.0.0",
"gauges": {
"local-1581865176069.driver.BlockManager.disk.diskSpaceUsed_MB": {
"value": 0
},
"local-1581865176069.driver.BlockManager.memory.maxMem_MB": {
"value": 1989
},
"local-1581865176069.driver.BlockManager.memory.memUsed_MB": {
"value": 0
},
"local-1581865176069.driver.BlockManager.memory.remainingMem_MB": {
"value": 1989
},
"local-1581865176069.driver.DAGScheduler.job.activeJobs": {
"value": 0
},
"local-1581865176069.driver.DAGScheduler.job.allJobs": {
"value": 0
},
"local-1581865176069.driver.DAGScheduler.stage.failedStages": {
"value": 0
},
"local-1581865176069.driver.DAGScheduler.stage.runningStages": {
"value": 0
},
"local-1581865176069.driver.DAGScheduler.stage.waitingStages": {
"value": 0
}
},
"counters": {
"local-1581865176069.driver.HiveExternalCatalog.fileCacheHits": {
"count": 0
},
"local-1581865176069.driver.HiveExternalCatalog.filesDiscovered": {
"count": 0
},
"local-1581865176069.driver.HiveExternalCatalog.hiveClientCalls": {
"count": 0
},
"local-1581865176069.driver.HiveExternalCatalog.parallelListingJobCount": {
"count": 0
},
"local-1581865176069.driver.HiveExternalCatalog.partitionsFetched": {
"count": 0
}
},
"histograms": {
"local-1581865176069.driver.CodeGenerator.compilationTime": {
"count": 0,
"max": 0,
"mean": 0,
"min": 0,
"p50": 0,
"p75": 0,
"p95": 0,
"p98": 0,
"p99": 0,
"p999": 0,
"stddev": 0
},
"local-1581865176069.driver.CodeGenerator.generatedClassSize": {
"count": 0,
"max": 0,
"mean": 0,
"min": 0,
"p50": 0,
"p75": 0,
"p95": 0,
"p98": 0,
"p99": 0,
"p999": 0,
"stddev": 0
},
"local-1581865176069.driver.CodeGenerator.generatedMethodSize": {
"count": 0,
"max": 0,
"mean": 0,
"min": 0,
"p50": 0,
"p75": 0,
"p95": 0,
"p98": 0,
"p99": 0,
"p999": 0,
"stddev": 0
},
"local-1581865176069.driver.CodeGenerator.sourceCodeSize": {
"count": 0,
"max": 0,
"mean": 0,
"min": 0,
"p50": 0,
"p75": 0,
"p95": 0,
"p98": 0,
"p99": 0,
"p999": 0,
"stddev": 0
}
},
"meters": { },
"timers": {
"local-1581865176069.driver.DAGScheduler.messageProcessingTime": {
"count": 0,
"max": 0,
"mean": 0,
"min": 0,
"p50": 0,
"p75": 0,
"p95": 0,
"p98": 0,
"p99": 0,
"p999": 0,
"stddev": 0,
"m15_rate": 0,
"m1_rate": 0,
"m5_rate": 0,
"mean_rate": 0,
"duration_units": "milliseconds",
"rate_units": "calls/second"
}
}
}
Parse the JSON to extract the metric values:
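The snippets below assume a gauges JSONObject and an applicationId string (e.g. "local-1581865176069"). One way to obtain gauges, sketched here with fastjson as an assumed JSON library (any JSON parser will do):

import com.alibaba.fastjson.{JSON, JSONObject}
import scala.io.Source

// fetch the metrics endpoint and keep only the "gauges" section
def fetchGauges(url: String): JSONObject =
  JSON.parseObject(Source.fromURL(url, "UTF-8").mkString).getJSONObject("gauges")

val gauges = fetchGauges("http://localhost:4040/metrics/json/")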
val diskSpaceUsed_MB = gauges.getJSONObject(applicationId + ".driver.BlockManager.disk.diskSpaceUsed_MB").getLong("value") // disk space used (MB)
val maxMem_MB = gauges.getJSONObject(applicationId + ".driver.BlockManager.memory.maxMem_MB").getLong("value") // maximum BlockManager memory (MB)
val memUsed_MB = gauges.getJSONObject(applicationId + ".driver.BlockManager.memory.memUsed_MB").getLong("value") // memory in use (MB)
val remainingMem_MB = gauges.getJSONObject(applicationId + ".driver.BlockManager.memory.remainingMem_MB").getLong("value") // remaining (free) memory (MB)
//##################### jobs & stages ###################################
val activeJobs = gauges.getJSONObject(applicationId + ".driver.DAGScheduler.job.activeJobs").getLong("value") // currently running jobs
val allJobs = gauges.getJSONObject(applicationId + ".driver.DAGScheduler.job.allJobs").getLong("value") // total number of jobs
val failedStages = gauges.getJSONObject(applicationId + ".driver.DAGScheduler.stage.failedStages").getLong("value") // number of failed stages
val runningStages = gauges.getJSONObject(applicationId + ".driver.DAGScheduler.stage.runningStages").getLong("value") // running stages
val waitingStages = gauges.getJSONObject(applicationId + ".driver.DAGScheduler.stage.waitingStages").getLong("value") // waiting stages
//#####################StreamingMetrics###################################
val lastCompletedBatch_processingDelay = gauges.getJSONObject(applicationId + ".driver.query.StreamingMetrics.streaming.lastCompletedBatch_processingDelay").getLong("value") // processing delay of the last completed batch
val lastCompletedBatch_processingEndTime = gauges.getJSONObject(applicationId + ".driver.query.StreamingMetrics.streaming.lastCompletedBatch_processingEndTime").getLong("value") // end time of the last completed batch (milliseconds)
val lastCompletedBatch_processingStartTime = gauges.getJSONObject(applicationId + ".driver.query.StreamingMetrics.streaming.lastCompletedBatch_processingStartTime").getLong("value") // start time of the last completed batch (milliseconds)
// processing time of the last completed batch
val lastCompletedBatch_processingTime = (lastCompletedBatch_processingEndTime - lastCompletedBatch_processingStartTime)
val lastReceivedBatch_records = gauges.getJSONObject(applicationId + ".driver.query.StreamingMetrics.streaming.lastReceivedBatch_records").getLong("value") // records received in the last batch
val runningBatches = gauges.getJSONObject(applicationId + ".driver.query.StreamingMetrics.streaming.runningBatches").getLong("value") // batches currently running
val totalCompletedBatches = gauges.getJSONObject(applicationId + ".driver.query.StreamingMetrics.streaming.totalCompletedBatches").getLong("value") // total completed batches
val totalProcessedRecords = gauges.getJSONObject(applicationId + ".driver.query.StreamingMetrics.streaming.totalProcessedRecords").getLong("value") // total records processed
val totalReceivedRecords = gauges.getJSONObject(applicationId + ".driver.query.StreamingMetrics.streaming.totalReceivedRecords").getLong("value") // total records received
val unprocessedBatches = gauges.getJSONObject(applicationId + ".driver.query.StreamingMetrics.streaming.unprocessedBatches").getLong("value") // batches not yet processed
val waitingBatches = gauges.getJSONObject(applicationId + ".driver.query.StreamingMetrics.streaming.waitingBatches").getLong("value") // batches waiting to be processed
2. When the Spark application is submitted to YARN
val sparkDriverHost = sc.getConf.get("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES")
// the metrics page is reached through the YARN proxy: <cluster proxy address>/proxy/<application id>/metrics/json
val url = s"${sparkDriverHost}/metrics/json"
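With the hypothetical fetchGauges helper sketched earlier, the same parsing code then works against the YARN proxy endpoint, for example:

val gauges = fetchGauges(url) // url = s"${sparkDriverHost}/metrics/json"
val activeJobs = gauges.getJSONObject(applicationId + ".driver.DAGScheduler.job.activeJobs").getLong("value")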
3. Uses
1. The job-level data (endTime, applicationUniqueName, applicationId, sourceCount, costTime, countPerMillis) can be shown as a table for end-to-end pipeline statistics.
2. The disk and memory figures can feed pie charts for monitoring memory and disk usage.
3. The task run information can be shown as a table for monitoring the job.
