反壓(Back Pressure)機制主要用來解決流處理系統中,處理速度比攝入速度慢的情況。是控制流處理中批次流量過載的有效手段。
1 反壓機制原理
Spark Streaming中的反壓機制是Spark 1.5.0推出的新特性,可以根據處理效率動態調整攝入速率。
1.1 反壓定義
當批處理時間(Batch Processing Time)大於批次間隔(Batch Interval,即 BatchDuration)時,說明處理數據的速度小於數據攝入的速度,持續時間過長或源頭數據暴增,容易造成數據在內存中堆積,最終導致Executor OOM或任務奔潰。
1.2 反壓的數據源方式及限流處理
spark streaming的數據源方式有兩種:
- 若是基於Receiver的數據源,可以通過設置spark.streaming.receiver.maxRate來控制最大輸入速率;
- 若是基於Direct的數據源(如Kafka Direct Stream),則可以通過設置spark.streaming.kafka.maxRatePerPartition來控制最大輸入速率。
當然,在事先經過壓測,且流量高峰不會超過預期的情況下,設置這些參數一般沒什么問題。但最大值,不代表是最優值,最好還能根據每個批次處理情況來動態預估下個批次最優速率。
在Spark 1.5.0以上,就可通過背壓機制來實現。開啟反壓機制,即設置spark.streaming.backpressure.enabled為true,Spark Streaming會自動根據處理能力來調整輸入速率,從而在流量高峰時仍能保證最大的吞吐和性能。
1.3 反壓的實現原理
Spark Streaming的反壓機制中,有以下幾個重要的組件:
- RateController
- RateEstimator
- RateLimiter
主要是通過RateController組件來實現。RateController繼承自接口StreamingListener,並實現了onBatchCompleted方法。每一個Batch處理完成后都會調用此方法,具體如下:
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { val elements = batchCompleted.batchInfo.streamIdToInputInfo for { // 處理結束時間 processingEnd <- batchCompleted.batchInfo.processingEndTime // 處理時間,即`processingEndTime` - `processingStartTime` workDelay <- batchCompleted.batchInfo.processingDelay // 在調度隊列中的等待時間,即`processingStartTime` - `submissionTime` waitDelay <- batchCompleted.batchInfo.schedulingDelay // 當前批次處理的記錄數 elems <- elements.get(streamUID).map(_.numRecords) } computeAndPublish(processingEnd, elems, workDelay, waitDelay) }
可以看到,接着又調用的是computeAndPublish
方法,如下:
private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit = Future[Unit] { // 根據處理時間、調度時間、當前Batch記錄數,預估新速率 val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay) newRate.foreach { s => // 設置新速率 rateLimit.set(s.toLong) // 發布新速率 publish(getLatestRate()) } }
更深一層,具體調用的是rateEstimator.compute
方法來預估新速率,
def compute(
time: Long,
elements: Long,
processingDelay: Long,
schedulingDelay: Long): Option[Double]
RateEstimator是速率估算器,主要用來估算最大處理速率,默認的在2.2之前版本中只支持PIDRateEstimator,在以后的版本可能會支持使用其他插件,其源碼如下:
def create(conf: SparkConf, batchInterval: Duration): RateEstimator = conf.get("spark.streaming.backpressure.rateEstimator", "pid") match { case "pid" => val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0) val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2) val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0) val minRate = conf.getDouble("spark.streaming.backpressure.pid.minRate", 100) new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived, minRate) //默認的只支持pid,其他的配置拋出異常 case estimator => throw new IllegalArgumentException(s"Unknown rate estimator: $estimator") }
以上這兩個組件都是在Driver端用於更新最大速度的,而RateLimiter是用於接收到Driver的更新通知之后更新Executor的最大處理速率的組件。RateLimiter是一個抽象類,它並不是Spark本身實現的,而是借助了第三方Google的GuavaRateLimiter來產生的。
它實質上是一個限流器,也可以叫做令牌,如果Executor中task每秒計算的速度大於該值則阻塞,如果小於該值則通過,將流數據加入緩存中進行計算。這種機制也可以叫做令牌桶機制,圖示如下:
接收到的newRate進行比較,取兩者中的最小值來作為最大處理速率,如果沒有設置,直接設置為newRate。源碼如下:
private[receiver] def updateRate(newRate: Long): Unit = if (newRate > 0) { if (maxRateLimit > 0) { //如果設置了maxRateLimit則取兩者中的最小值 rateLimiter.setRate(newRate.min(maxRateLimit)) } else { rateLimiter.setRate(newRate) } }
spark 1.5引入的反壓機制架構圖如下:
2. 反壓機制相關參數
參數名稱 | 默認值 | 說明 |
spark.streaming.backpressure.enabled | false | 是否啟用反壓機制 |
spark.streaming.backpressure.initialRate | 無 | 初始最大接收速率。只適用於Receiver Stream,不適用於Direct Stream。 |
spark.streaming.backpressure.rateEstimator | pid | 速率控制器,Spark 默認只支持此控制器,可自定義。 |
spark.streaming.backpressure.pid.proportional | 1.0 | 只能為非負值。當前速率與最后一批速率之間的差值對總控制信號貢獻的權重。用默認值即可。 |
spark.streaming.backpressure.pid.integral | 0.2 | 只能為非負值。比例誤差累積對總控制信號貢獻的權重。用默認值即可 |
spark.streaming.backpressure.pid.derived | 0 | 只能為非負值。比例誤差變化對總控制信號貢獻的權重。用默認值即可 |
spark.streaming.backpressure.pid.minRate | 100 | 只能為正數,最小速率 |
3. 反壓機制的使用
//啟用反壓 conf.set("spark.streaming.backpressure.enabled","true") //最小攝入條數控制 conf.set("spark.streaming.backpressure.pid.minRate","1") //最大攝入條數控制 conf.set("spark.streaming.kafka.maxRatePerPartition","12")
注意:
- 反壓機制真正起作用時需要至少處理一個批:由於反壓機制需要根據當前批的速率,預估新批的速率,所以反壓機制真正起作用前,應至少保證處理一個批。
- 如何保證反壓機制真正起作用前應用不會崩潰:要保證反壓機制真正起作用前應用不會崩潰,需要控制每個批次最大攝入速率。若為Direct Stream,如Kafka Direct Stream,則可以通過spark.streaming.kafka.maxRatePerPartition參數來控制。此參數代表了 每秒每個分區最大攝入的數據條數。假設BatchDuration為10秒,spark.streaming.kafka.maxRatePerPartition為12條,kafka topic 分區數為3個,則一個批(Batch)最大讀取的數據條數為360條(3*12*10=360)。同時,需要注意,該參數也代表了整個應用生命周期中的最大速率,即使是背壓調整的最大值也不會超過該參數。
4. 查看日志
創建速率控制器
INFO PIDRateEstimator: Created PIDRateEstimator with proportional = 1.0, integral = 0.2, derivative = 0.0, min rate = 1.0
計算當前批次速率
// records 記錄數(對應WebUI: Input Size) // processing time 處理時間,毫秒(對應WebUI: Processing Time) // scheduling delay 調度時間,毫秒(對應WebUI: Scheduling Delay) TRACE PIDRateEstimator: time = 1558888897224, # records = 33, processing time = 24548, scheduling delay = 8
預估新批次速率
TRACE PIDRateEstimator: latestRate = -1.0, error = -2.344305035033404 latestError = -1.0, historicalError = 0.0010754440280267231 delaySinceUpdate = 1.558888897225E9, dError = -8.623482003280801E-10
第一次計算跳過速率估計
TRACE PIDRateEstimator: First run, rate estimation skipped
當前批次沒有記錄或沒有延遲則跳過速率估計
TRACE PIDRateEstimator: Rate estimation skipped
以新的預估速率運行
TRACE PIDRateEstimator: New rate = 1.0
WebUI
可以看到,開啟反壓后,攝入速率Input Rate
可以根據處理時間Processing Time
來調整,從而保證應用的穩定性。