flink spark storm的反壓機制(backpressure)


簡單介紹下flink、spark和storm的backpressure機制。

1、storm 反壓

實現原理

Storm 是通過監控 Bolt 中的接收隊列負載情況來實現反壓:

  • 如果一個executor發現recv queue負載超過高水位值(high watermark)就會通知反壓線程(backpressure thread)。
  • 反壓線程將反壓信息寫到 Zookeeper。
  • Zookeeper 上的 watch 會通知該拓撲(topo)的所有 Worker,該拓撲出現反壓。
  • Spout 減緩發送 tuple的速率。

問題:停止發送,等待系統回復,再次高速生產,然后再次停止發送,造成往數據流顛簸

2、spark 反壓

spark streaming 是以微批次模擬流式處理,設置batch interval時間:

val ssc = new StreamingContext(sparkConf, Seconds(3))

如果batch process time大於batch interval time=3s,程序的處理能力不足,積累的數據越來越多,最終會造成Executor的OOM。

解決辦法

靜態限速 (spark 1.5 以前)

sparkConf.set("spark.streaming.kafka.maxRatePerPartition","10")

動態反壓(spark 1.5 以后)

sparkConf.set("spark.streaming.backpressure.enabled","true")

實現原理

通過在Driver端進行速率估算,並將速率更新到Executor端的各個Receiver,從而實現反壓。

速率控制:

整個背壓機制的核心,就是Drvier端的RateContoller,它作為控制核心,繼承自StreamingListener,監聽Batch的完成情況,記錄下它們的關鍵延遲,然后傳遞給computeAndPublish方法,遍歷Executor並進行速率估算和更新。

速率估算:

PIDRateEstimator是目前RateEstimator的唯一官方實現。PID(Proportional Integral Derivative,比例積分差分控制算法)是工控領域中,經過多次的驗證是一種非常有效的工業控制器算法。Spark Streaming將它引入,作為根據最新的Rate,以及比例(Proportional) 積分(Integral)微分(Derivative)這3個變量,來確定最新的Rate。

速率更新:

計算完新Rate,就該把它發布出去了。

RateController通過ReceiverTracker,利用RPC消息,發布Rate到Receiver所在的Executor節點上,該節點上的ReceiverSupervisorImpl會接收消息,並把速率更新到BlockGenerator上,從而以控制每個批次的數據生成。

3、flink 反壓

每個子任務都有自己的本地緩存池,收到的數據以及發出的數據,都會序列化之后,放入到緩沖池里。然后,兩個TaskManager之間,只會建立一條物理鏈路(底層使用Netty通訊),所有子任務之間的通訊,都由這條鏈路承擔。

當任何一個子任務的發送緩存(不管是子任務自己的本地緩存,還是底層傳輸時Netty的發送緩存)耗盡時,發送方就會被阻塞,產生背壓;同樣,任何任務接收數據時,如果本地緩存用完了,都會停止從底層Netty那里讀取數據,這樣很快上游的數據很快就會占滿下游的底層接收緩存,從而背壓到發送端,形成對上游所有的任務的背壓。

很顯然,這種思路有個明顯的問題,任務一個下游子任務的產生背壓,都會影響整條TaskManager之間的鏈路,導致全鏈路所有子任務背壓。比如上圖的B.3子任務,此時還有處理能力,但也無法收到數據。

為了解決上節的單任務背壓影響全鏈路的問題,在Flink 1.5之后,引入了Credit-based Flow Control,基於信用點的流量控制。

這種方法,首先把每個子任務的本地緩存分為兩個部分,獨占緩存(Exclusive Buffers)和浮動緩存(Floating Buffers);

然后,獨占緩存的大小作為信用點發給數據發送方,發送方會按照不同的子任務分別記錄信用點,並發送盡可能多數據給接收方,發送后則降低對應信用點的大小;

當信用點為0時,則不再發送,起到背壓的作用。在發送數據的同時,發送方還會把隊列中暫存排隊的數據量發給接收方,接收方收到后,根據本地緩存的大小,決定是否去浮動緩存里請求更多的緩存來加速隊列的處理,起到動態控制流量的作用。整個過程參考上圖。

通過這樣的設計,就實現了任務級別的背壓:任意一個任務產生背壓,只會影響這個任務,並不會對TaskManger上的其它任務造成影響。

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM