Spark Streaming反壓機制


反壓(Back Pressure)機制主要用來解決流處理系統中,處理速度比攝入速度慢的情況。是控制流處理中批次流量過載的有效手段。

1 反壓機制原理

Spark Streaming中的反壓機制是Spark 1.5.0推出的新特性,可以根據處理效率動態調整攝入速率。

1.1 反壓定義

當批處理時間(Batch Processing Time)大於批次間隔(Batch Interval,即 BatchDuration)時,說明處理數據的速度小於數據攝入的速度,持續時間過長或源頭數據暴增,容易造成數據在內存中堆積,最終導致Executor OOM或任務奔潰。

 

1.2 反壓的數據源方式及限流處理

spark streaming的數據源方式有兩種:

  1. 若是基於Receiver的數據源,可以通過設置spark.streaming.receiver.maxRate來控制最大輸入速率;
  2. 若是基於Direct的數據源(如Kafka Direct Stream),則可以通過設置spark.streaming.kafka.maxRatePerPartition來控制最大輸入速率。

當然,在事先經過壓測,且流量高峰不會超過預期的情況下,設置這些參數一般沒什么問題。但最大值,不代表是最優值,最好還能根據每個批次處理情況來動態預估下個批次最優速率。

在Spark 1.5.0以上,就可通過背壓機制來實現。開啟反壓機制,即設置spark.streaming.backpressure.enabled為true,Spark Streaming會自動根據處理能力來調整輸入速率,從而在流量高峰時仍能保證最大的吞吐和性能。

 

1.3 反壓的實現原理

Spark Streaming的反壓機制中,有以下幾個重要的組件:

  • RateController
  • RateEstimator
  • RateLimiter

主要是通過RateController組件來實現。RateController繼承自接口StreamingListener,並實現了onBatchCompleted方法。每一個Batch處理完成后都會調用此方法,具體如下:

 override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    val elements = batchCompleted.batchInfo.streamIdToInputInfo

    for {
      // 處理結束時間
      processingEnd <- batchCompleted.batchInfo.processingEndTime
      // 處理時間,即`processingEndTime` - `processingStartTime`
      workDelay <- batchCompleted.batchInfo.processingDelay
      // 在調度隊列中的等待時間,即`processingStartTime` - `submissionTime`
      waitDelay <- batchCompleted.batchInfo.schedulingDelay
      // 當前批次處理的記錄數
      elems <- elements.get(streamUID).map(_.numRecords)
    } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
  }

可以看到,接着又調用的是computeAndPublish方法,如下:

private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
    Future[Unit] {
      // 根據處理時間、調度時間、當前Batch記錄數,預估新速率
      val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
      newRate.foreach { s =>
      // 設置新速率
        rateLimit.set(s.toLong)
      // 發布新速率
        publish(getLatestRate())
      }
    }

更深一層,具體調用的是rateEstimator.compute方法來預估新速率,

 

def compute(
      time: Long,
      elements: Long,
      processingDelay: Long,
      schedulingDelay: Long): Option[Double]

RateEstimator是速率估算器,主要用來估算最大處理速率,默認的在2.2之前版本中只支持PIDRateEstimator,在以后的版本可能會支持使用其他插件,其源碼如下:

 

  def create(conf: SparkConf, batchInterval: Duration): RateEstimator =
    conf.get("spark.streaming.backpressure.rateEstimator", "pid") match {
      case "pid" =>
        val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0)
        val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2)
        val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0)
        val minRate = conf.getDouble("spark.streaming.backpressure.pid.minRate", 100)
        new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived, minRate)
    //默認的只支持pid,其他的配置拋出異常
      case estimator =>
        throw new IllegalArgumentException(s"Unknown rate estimator: $estimator")
    }

 

以上這兩個組件都是在Driver端用於更新最大速度的,而RateLimiter是用於接收到Driver的更新通知之后更新Executor的最大處理速率的組件。RateLimiter是一個抽象類,它並不是Spark本身實現的,而是借助了第三方Google的GuavaRateLimiter來產生的。

它實質上是一個限流器,也可以叫做令牌,如果Executor中task每秒計算的速度大於該值則阻塞,如果小於該值則通過,將流數據加入緩存中進行計算。這種機制也可以叫做令牌桶機制,圖示如下:

 

 接收到的newRate進行比較,取兩者中的最小值來作為最大處理速率,如果沒有設置,直接設置為newRate。源碼如下:

private[receiver] def updateRate(newRate: Long): Unit =
    if (newRate > 0) {
      if (maxRateLimit > 0) {
        //如果設置了maxRateLimit則取兩者中的最小值
        rateLimiter.setRate(newRate.min(maxRateLimit))
      } else {
        rateLimiter.setRate(newRate)
      }
    }

spark 1.5引入的反壓機制架構圖如下:

 

 

2. 反壓機制相關參數

參數名稱 默認值 說明
spark.streaming.backpressure.enabled false 是否啟用反壓機制
spark.streaming.backpressure.initialRate 初始最大接收速率。只適用於Receiver Stream,不適用於Direct Stream。
spark.streaming.backpressure.rateEstimator pid 速率控制器,Spark 默認只支持此控制器,可自定義。
spark.streaming.backpressure.pid.proportional 1.0 只能為非負值。當前速率與最后一批速率之間的差值對總控制信號貢獻的權重。用默認值即可。
spark.streaming.backpressure.pid.integral 0.2 只能為非負值。比例誤差累積對總控制信號貢獻的權重。用默認值即可
spark.streaming.backpressure.pid.derived 0 只能為非負值。比例誤差變化對總控制信號貢獻的權重。用默認值即可
spark.streaming.backpressure.pid.minRate 100 只能為正數,最小速率

 

3. 反壓機制的使用

//啟用反壓
conf.set("spark.streaming.backpressure.enabled","true")
//最小攝入條數控制
conf.set("spark.streaming.backpressure.pid.minRate","1")
//最大攝入條數控制
conf.set("spark.streaming.kafka.maxRatePerPartition","12")

注意:

  1. 反壓機制真正起作用時需要至少處理一個批:由於反壓機制需要根據當前批的速率,預估新批的速率,所以反壓機制真正起作用前,應至少保證處理一個批。
  2. 如何保證反壓機制真正起作用前應用不會崩潰:要保證反壓機制真正起作用前應用不會崩潰,需要控制每個批次最大攝入速率。若為Direct Stream,如Kafka Direct Stream,則可以通過spark.streaming.kafka.maxRatePerPartition參數來控制。此參數代表了 每秒每個分區最大攝入的數據條數。假設BatchDuration為10秒,spark.streaming.kafka.maxRatePerPartition為12條,kafka topic 分區數為3個,則一個批(Batch)最大讀取的數據條數為360條(3*12*10=360)。同時,需要注意,該參數也代表了整個應用生命周期中的最大速率,即使是背壓調整的最大值也不會超過該參數。

 4. 查看日志

創建速率控制器
INFO PIDRateEstimator: Created PIDRateEstimator with proportional = 1.0, integral = 0.2, derivative = 0.0, min rate = 1.0
計算當前批次速率
// records 記錄數(對應WebUI: Input Size)
// processing time 處理時間,毫秒(對應WebUI: Processing Time)
// scheduling delay 調度時間,毫秒(對應WebUI: Scheduling Delay)
TRACE PIDRateEstimator: 
time = 1558888897224, # records = 33, processing time = 24548, scheduling delay = 8
預估新批次速率
TRACE PIDRateEstimator: 
 latestRate = -1.0, error = -2.344305035033404
 latestError = -1.0, historicalError = 0.0010754440280267231
 delaySinceUpdate = 1.558888897225E9, dError = -8.623482003280801E-10
第一次計算跳過速率估計
TRACE PIDRateEstimator: First run, rate estimation skipped
當前批次沒有記錄或沒有延遲則跳過速率估計
TRACE PIDRateEstimator: Rate estimation skipped
以新的預估速率運行
TRACE PIDRateEstimator: New rate = 1.0

WebUI

 

 可以看到,開啟反壓后,攝入速率Input Rate可以根據處理時間Processing Time來調整,從而保證應用的穩定性。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM