背壓(Backpressure )與流控(Flow Control)


  Backpressure只是解決Flow Control的其中一個方案。
  什么是Flow Control,就像小學做的那道數學題:一個水池,有一個進水管和一個出水管。如果進水管水流更大,過一段時間水池就會滿(溢出)。這就是沒有Flow Control導致的結果。
  解決Flow Control有幾種思路:

  (1)Backpressure,就是消費者需要多少,生產者就生產多少。這有點類似於TCP里的流量控制,接收方根據自己的接收窗口的情況來控制接收速率,並通過反向的ACK包來控制發送方的發送速率。這種方案只對於cold Observable有效。 coldObservable是那些允許降低速率的發送源,比如兩台機器傳一個文件,速率可大可小,即使降低到每秒幾個字節,只要時間足夠長,還是能夠完成的。相反的例子就是音視頻直播,速率低於某個值整個功能就沒法用了(這種類似於hot Observable)。

  (2)節流(Throttling),說白了就是丟棄。消費不過來,就處理其中一部分,剩下的丟棄。至於處理哪些和丟棄哪些,就有不同的策略,也就是sample (or throttleLast)、throttleFirst、debounce (or throttleWithTimeout)這三種。還是舉音視頻直播的例子,在下游處理不過來的時候,就需要丟棄數據包。

  (3)打包(buffer和window)。buffer和window基本一樣,只是輸出格式不太一樣。它們是把上游多個小包裹打成大包裹,分發到下游。這樣下游需要處理的包裹的個數就減少了。

  (4)是一種特殊情況,阻塞住整個調用鏈(Callstackblocking)。之所以說這是一種特殊情況,是因為這種方式只適用於整個調用鏈都在一個線程上同步執行,這要求中間的各個operator都不能啟動新的線程。在平常使用中這種應該是比較少見的,因為我們經常使用subscribeOn或observeOn來切換執行線程,而且有些復雜的operator本身也會內部啟動新的線程來處理。另外,如果真的出現了完全同步的調用鏈,前面的(1)(2)(3)仍然有可能適用的,只不過這種阻塞的方式更簡單,不需要額外的支持。

舉個例子比較一下(1)和(4)。(4)相當於很多車行駛在盤山公路上,而公路只有一條車道。那么排在最前面的第一輛車就擋住了整條路,后面的車也只能排在后面。而(1)相當於銀行辦業務時的窗口叫號,窗口主動叫某個號過去(相當於請求),那個人才過去辦理。

  然后,從細的方面解釋一下sample,throttleFirst,debounce。以及onBackpressureBuffer,onBackpressureDrop,onBackpressureBlock和ConnectableObservable(multicast)。

  sample就是throttleLast,采樣。類比一下音頻采樣,8kHz的音頻就是每125微秒采一個值。sample可以配置成,比如每100毫秒采樣一個值,但100毫秒內上游可能過來很多值,選那個值呢,就是選最后那個值。所以它也叫throttleLast。

  throttleFirst跟sample類似,比如還是每100毫秒采樣一個值,但選這100毫秒內的第一個值。

  debounce,也叫throttleWithTimeout,名字里就包含一個例子。比如,一個網絡程序維護一個TCP連接,不停地收發數據,但中間沒數據可以收發的時候,就有間歇。這段間歇的時間,可以稱為idle time。當idle time超過一個預設值的時候,就算超時了(timeout),這個時候可能就需要把連接斷開了。實際上一些做server端的網絡程序就是這么工作的。每收發一個數據包之后,啟動一個計時器,等待idle time過去之后的超時,如果計時器到時之前,又有收發數據包的行為,那么計時器重置,等待一個新的idle time。當計時器到時了,就time out了,這個連接就可以關閉了。debounce的行為,跟這個非常類似,可以用它來找到連續的收發事件之后idle time超時后的timeout事件。

  最后還有一個新的問題需要說明。Backpressure有些Observable是支持的,有些不支持。但它們可以通過operator來轉化。

  onBackpressureBuffer,onBackpressureDrop,onBackpressureBlock就可以把一個不支持Backpressure的Observable轉成一個支持Backpressure的Observable(即支持request請求)。但轉完之后的策略不太相同。

  onBackpressureBuffer是不丟棄數據的處理方式。把上游收到的全部緩存下來,等下游來請求再發給下游。相當於一個水庫。但上游太快,就會buffer溢出。

  onBackpressureDrop就是當上游來數據的時候,看下游有沒有需求,有需求就發給下游,否則上游來的數據就丟掉。

  onBackpressureBlock也是看下游有沒有需求,下游沒有需求,不丟棄,但試圖堵住上游的入口(能不能真堵得住還得看上游的情況了),自己並不緩存。

  相反,有時候一些operator也能把一個支持Backpressure的Observable變成一個不支持Backpressure的Observable。比如,ConnectableObservable就是這樣。它類似於把一條河的主干,在下游分成若干支流(但不太一樣的是每條支流的水量都跟主干一樣,是拷貝的)。那么很好理解,下游某個支流想對上游產生背壓,是不太可能的,它阻止不了水流流向其它支流。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM