Spark Streaming Job Delay Monitoring and Alerting


Overview

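StreamingListener is Spark Streaming's event-listening mechanism: it delivers a callback for each stage of a streaming job (streaming start, receiver lifecycle, batch submission, batch start, batch completion, and output operations).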

The StreamingListener interface

// To listen for events at any stage of a Spark Streaming job, implement the matching callback of this trait.
// The trait's own doc comments describe each callback.
trait StreamingListener {

  /** Called when the streaming has been started */
  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted) { }

  /** Called when a receiver has been started */
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }

  /** Called when a receiver has reported an error */
  def onReceiverError(receiverError: StreamingListenerReceiverError) { }

  /** Called when a receiver has been stopped */
  def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { }

  /** Called when a batch of jobs has been submitted for processing. */
  def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }

  /** Called when processing of a batch of jobs has started.  */
  def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }

  /** Called when processing of a batch of jobs has completed. */
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }

  /** Called when processing of a job of a batch has started. */
  def onOutputOperationStarted(
      outputOperationStarted: StreamingListenerOutputOperationStarted) { }

  /** Called when processing of a job of a batch has completed. */
  def onOutputOperationCompleted(
      outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }
}
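The batch callbacks hand over a BatchInfo whose delay fields are what a delay monitor cares about. As a rough illustration of how those delays relate to each other, here is a small stand-alone model. `BatchTimes` and `BatchTimesExample` are hypothetical names made up for this sketch (they are not Spark classes); the field names follow BatchInfo, and the arithmetic reflects my understanding that the total delay is the scheduling delay plus the processing delay.

```scala
// Simplified stand-in for the timing fields of a completed or pending batch.
// Hypothetical type for illustration only; all values are epoch milliseconds.
final case class BatchTimes(
    submissionTime: Long,
    processingStartTime: Option[Long],
    processingEndTime: Option[Long]) {

  // Time the batch waited in the queue before processing began.
  def schedulingDelay: Option[Long] =
    processingStartTime.map(_ - submissionTime)

  // Time spent running the batch's jobs.
  def processingDelay: Option[Long] =
    for (start <- processingStartTime; end <- processingEndTime) yield end - start

  // Queue wait plus run time; only defined once the batch has finished.
  def totalDelay: Option[Long] =
    for (s <- schedulingDelay; p <- processingDelay) yield s + p
}

object BatchTimesExample {
  // A batch submitted at t = 0 ms that starts after 5 s and ends after 12 s.
  val finished: BatchTimes = BatchTimes(0L, Some(5000L), Some(12000L))

  // A batch that is still waiting in the queue: no delays are known yet.
  val queued: BatchTimes = BatchTimes(0L, None, None)
}
```

Because every delay is an Option, a listener should not call `.get` on it blindly; a batch that has not finished simply has no total delay yet.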

A custom StreamingListener

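Purpose: monitor the batch processing delay and send an alert when it exceeds a threshold, leaving more than 2 minutes between consecutive alerts.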

import java.net.URLEncoder

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}
import org.slf4j.LoggerFactory

// appName:  key for the Redis timestamp, also used in the alert text
// duration: batch interval in seconds
// times:    alert when totalDelay >= times * duration
// RedisClusterClient and HttpClient are the project's own helper classes (not shown here).
class SparkStreamingDelayListener(private val appName: String, private val duration: Int, private val times: Int) extends StreamingListener {

  private val logger = LoggerFactory.getLogger("SparkStreamingDelayListener")

  // Runs each time a batch completes
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val batchInfo = batchCompleted.batchInfo
    val numRecords = batchInfo.numRecords
    // The delays are Option[Long] in milliseconds; treat a missing value as 0 instead of calling .get
    val processingDelay = batchInfo.processingDelay.getOrElse(0L)
    val totalDelay = batchInfo.totalDelay.getOrElse(0L)

    // Check the threshold first, so the Redis timestamp only moves when an alert is actually sent
    if (totalDelay >= times.toLong * duration * 1000L) {
      // The time of the last alert (epoch seconds) is stored in Redis to enforce a gap of more than 2 minutes
      val jedis = RedisClusterClient.getJedisClusterClient()
      val currentTime = System.currentTimeMillis / 1000
      val redisTime = jedis.get(appName)
      if (redisTime == null || currentTime - redisTime.toLong > 120) {
        jedis.set(appName, currentTime.toString)
        val monitorContent = appName + ": numRecords ->" + numRecords + ",processingDelay ->" + processingDelay / 1000 + " s,totalDelay -> " + totalDelay / 1000 + "s"
        logger.warn(monitorContent)
        val msg = "Streaming_" + appName + "_DelayTime:" + totalDelay / 1000 + "S"
        // URL-encode the message so characters such as ':' are safe in the query string
        val getURL = "http://node1:8002/message/weixin?msg=" + URLEncoder.encode(msg, "UTF-8")
        HttpClient.doGet(getURL)
      }
    }
  }
}
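The alert decision in the listener combines two conditions: the total delay has reached a multiple of the batch interval, and more than 2 minutes have passed since the last alert. One way to make that rule easy to test without Spark or Redis is to pull it out into pure functions. The sketch below is only an illustration; `AlertPolicy` and its method names are hypothetical and not part of the original code.

```scala
// Hypothetical helper for illustration: the listener's alert rule as pure functions.
object AlertPolicy {

  // Minimum gap between two alerts, in seconds (the 2 minutes used by the listener).
  val CooldownSeconds: Long = 120L

  // True when the total delay (ms) reaches `times` batch intervals.
  // A missing delay (None) never triggers an alert.
  def exceedsThreshold(totalDelayMs: Option[Long], batchSeconds: Int, times: Int): Boolean =
    totalDelayMs.exists(_ >= times.toLong * batchSeconds * 1000L)

  // True when no alert has been sent yet, or the last one is older than the cooldown.
  def cooldownElapsed(lastAlertSec: Option[Long], nowSec: Long): Boolean =
    lastAlertSec.forall(last => nowSec - last > CooldownSeconds)

  // Alert only when the delay is over the threshold and the cooldown has elapsed.
  def shouldAlert(
      totalDelayMs: Option[Long],
      batchSeconds: Int,
      times: Int,
      lastAlertSec: Option[Long],
      nowSec: Long): Boolean =
    exceedsThreshold(totalDelayMs, batchSeconds, times) &&
      cooldownElapsed(lastAlertSec, nowSec)
}
```

With this split, the listener would read the last alert time from Redis, call `shouldAlert`, and write the new timestamp back only when it returns true, so a batch that is within the threshold never resets the cooldown.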

Usage

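// A StreamingListener does not have to be set in the configuration; it can be added directly to the StreamingContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object My {
  def main(args: Array[String]): Unit = {
    val duration = 20 // batch interval in seconds
    val times = 3     // illustrative multiplier; the original does not show its value
    val sparkConf = new SparkConf()
    val ssc = new StreamingContext(sparkConf, Seconds(duration))
    ssc.addStreamingListener(new SparkStreamingDelayListener("Userid2Redis", duration, times))

    ....
  }
}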
