記錄spark streaming 中監聽器StreamingListener的相關信息
概述
StreamingListener 是針對spark streaming的各個階段的事件監聽機制
在用法上跟SparkListener很類似,但是有些細節區別
代碼記錄
//需要監聽spark streaming中各個階段的事件只需實現這個特質中對應的事件函數即可
//本身既有注釋說明
trait StreamingListener {
/** Called when the streaming has been started */
/** streaming 啟動的事件 */
def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted) { }
/** Called when a receiver has been started */
/** 接收啟動事件 */
def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }
/** Called when a receiver has reported an error */
def onReceiverError(receiverError: StreamingListenerReceiverError) { }
/** Called when a receiver has been stopped */
def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { }
/** Called when a batch of jobs has been submitted for processing. */
/** 每個批次提交的事件 */
def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }
/** Called when processing of a batch of jobs has started. */
/** 每個批次啟動的事件 */
def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }
/** Called when processing of a batch of jobs has completed. */
/** 每個批次完成的事件 */
def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }
/** Called when processing of a job of a batch has started. */
def onOutputOperationStarted(
outputOperationStarted: StreamingListenerOutputOperationStarted) { }
/** Called when processing of a job of a batch has completed. */
def onOutputOperationCompleted(
outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }
}
示例代碼
package z.cloud.test.listener
class MyTestStreamingListener(ssc : StreamingContext) extends StreamingListener with Logging{
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
val batchInfo = batchCompleted.batchInfo
val execTime = batchInfo.processingDelay.getOrElse(0L)
val schedulingTime = batchInfo.schedulingDelay.getOrElse(0L)
logInfo(s"執行時間: $execTime 調度延時 : $schedulingTime")
}
}
示例代碼應用
//streamingListener不需要在配置中設置,可以直接添加到streamingContext中
object My{
def main(args : Array[String]) : Unit = {
val sparkConf = new SparkConf()
val ssc = new StreamingContext(sparkConf,Seconds(20))
ssc.addStreamingListener(new MyTestStreamingListener(ssc))
....
}
}