Akka (20): Stream: Async, Batching Backpressure and Buffering


   In principle akka-stream is a push-model data stream. The push and pull models differ in the problems they favour: the push model suits a fast downstream subscriber, while the pull model suits a fast upstream publisher. In practice, upstream and downstream stages rarely run at the same speed, and a rate mismatch ultimately leads to data loss. If the downstream subscriber cannot keep up with everything the publisher pushes, then no matter how large the buffer is, it will eventually overflow and drop data. Conversely, if the upstream publisher cannot satisfy the downstream subscriber's read demand in time, the downstream is left waiting, which causes timeouts and may even lose downstream requests.

   Because pushing data too fast in a push-model stream causes data loss, the rate at which the publisher produces data must be controlled. Since akka-stream implements the Reactive Streams specification in every stage, upstream and downstream can interact: the downstream notifies the upstream of how much data it is ready to accept, and thereby throttles the upstream rate. This is what backpressure means. akka-stream's backpressure uses buffers to prefetch and replenish data in batches, which improves transfer efficiency. Moreover, when stages run in parallel via async, the upstream need not wait for the downstream's reaction: it can push an element into the buffer and immediately move on to the next element, so buffering is indispensable in the async mode. akka-stream lets us set the buffer size used by asynchronous stages in the following ways:

1. Set the default buffer in the configuration file:

akka.stream.materializer.max-input-buffer-size = 16
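
As a side note (a minimal sketch of my own; the system name is arbitrary), the same key can also be supplied programmatically through Typesafe Config when the ActorSystem is created:

  import akka.actor.ActorSystem
  import com.typesafe.config.ConfigFactory

  // override the default materializer input buffer for this ActorSystem only
  val config = ConfigFactory.parseString(
    "akka.stream.materializer.max-input-buffer-size = 16"
  ).withFallback(ConfigFactory.load())
  val system = ActorSystem("bufferSys", config)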

2. Set it globally via ActorMaterializerSettings:

  val materializer = ActorMaterializer(
    ActorMaterializerSettings(system)
      .withInputBuffer(initialSize = 64, maxSize = 64))

3. Set it via Attributes. Because Attributes preserve the stage hierarchy, an inputBuffer set through an Attribute also follows attribute inheritance:

  import Attributes._
  val nestedSource =
    Source.single(0)
      .map(_ + 1)
      .named("nestedSource") // Wrap, no inputBuffer set

  val nestedFlow =
    Flow[Int].filter(_ != 0)
      .via(Flow[Int].map(_ - 2).withAttributes(inputBuffer(4, 4))) // override
      .named("nestedFlow") // Wrap, no inputBuffer set

  val nestedSink =
    nestedFlow.to(Sink.fold(0)(_ + _)) // wire an atomic sink to the nestedFlow
      .withAttributes(name("nestedSink") and inputBuffer(3, 3)) // override

In the example above, nestedSource inherits the materializer's global inputBuffer attribute; nestedSink overrides the attribute; nestedFlow first inherits nestedSink's setting and then overrides it with its own inputBuffer attribute. We can use addAttributes to attach a new Attribute:

  val flow = Flow[Int].map(_ * 2).async
    .addAttributes(Attributes.inputBuffer(16, 16))
  val (_, fut) = flow.runWith(Source(1 to 10), Sink.foreach(println))
  fut.andThen { case _ => sys.terminate() }

The inputBuffer defined above has an initial and a maximum size and is used mainly for backpressure. In theory the inputBuffer could therefore be as small as a single element (initial = 1, max = 1): with backpressure in place there is no risk of overflow, but such a tiny buffer hurts transfer efficiency. That is why akka-stream's default buffer size is 16 elements, and why akka-stream's backpressure is batching backpressure.
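
To observe the batching behaviour (a sketch of my own; the println tracing is illustrative and an implicit materializer is assumed to be in scope), compare how far the producer runs ahead of a slow consumer across an async boundary when the input buffer is shrunk to one element versus the default 16:

  // with inputBuffer(1,1) the producer advances in lockstep with the consumer;
  // with the default 16 it runs roughly a batch ahead before being backpressured
  Source(1 to 100)
    .map { i => println(s"produced $i"); i }
    .async // asynchronous boundary with its own input buffer
    .addAttributes(Attributes.inputBuffer(initial = 1, max = 1))
    .runWith(Sink.foreach { i => Thread.sleep(100); println(s"consumed $i") })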

Because akka-stream is push-based, we can also attach a buffer to upstream stages such as Source and Flow to control the data they push:

  val source = Source(1 to 10).buffer(16, OverflowStrategy.dropTail)
  val sum = source.runFold(0)((acc, i) => i + acc)
  sum.map(println) //.andThen{case _ => sys.terminate()}

  val flow = Flow[Int].map(_ * 3).buffer(16, OverflowStrategy.dropNew)
  val (_, fut) = flow.runWith(Source(1 to 10), Sink.fold(0) { (acc, a) => acc + a })
  fut.map(println).andThen { case _ => sys.terminate() }

A buffer set upstream can handle data produced too quickly by the publisher with an overflow strategy, OverflowStrategy. The inputBuffer added via Attributes above defaults to OverflowStrategy.backpressure; the other OverflowStrategy options are listed below (with a usage sketch after the listing):

object OverflowStrategy {
  /**
   * If the buffer is full when a new element arrives, drops the oldest element from the buffer
   * to make space for the new element.
   */
  def dropHead: OverflowStrategy = DropHead

  /**
   * If the buffer is full when a new element arrives, drops the youngest element from the buffer
   * to make space for the new element.
   */
  def dropTail: OverflowStrategy = DropTail

  /**
   * If the buffer is full when a new element arrives, drops all the buffered elements to make
   * space for the new element.
   */
  def dropBuffer: OverflowStrategy = DropBuffer

  /**
   * If the buffer is full when a new element arrives, drops the new element.
   */
  def dropNew: OverflowStrategy = DropNew

  /**
   * If the buffer is full when a new element is available, this strategy backpressures the
   * upstream publisher until space becomes available in the buffer.
   */
  def backpressure: OverflowStrategy = Backpressure

  /**
   * If the buffer is full when a new element is available, this strategy completes the stream
   * with failure.
   */
  def fail: OverflowStrategy = Fail
}
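
As a quick illustration of the dropping strategies (my own sketch; throttle merely simulates a slow subscriber and an implicit materializer is assumed), dropHead keeps the newest buffered elements, whereas dropNew would keep the oldest:

  import scala.concurrent.duration._

  // a fast finite source feeding a small buffer; the consumer takes one element per 100ms
  Source(1 to 1000)
    .buffer(16, OverflowStrategy.dropHead) // overflow discards the oldest buffered element
    .throttle(1, 100.millis, 1, ThrottleMode.shaping)
    .runWith(Sink.foreach(println))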

When akka-stream has to exchange data with an external system, a rate mismatch between upstream and downstream becomes unavoidable, and if the external system does not support the Reactive Streams standard, data can be lost. akka-stream provides concrete remedies. If the external system is an upstream that produces data too quickly, conflateWithSeed can fold the data into a collection such as Seq and pass it downstream: if the downstream reads in time, Seq(item) holds exactly the element the upstream pushed; otherwise Seq(i1, i2, i3, ...) holds everything produced before the downstream reads again. Since a Seq can grow without bound, data loss can in theory be avoided. Here is the function's definition:

/**
 * Allows a faster upstream to progress independently of a slower subscriber by conflating
 * elements into a summary until the subscriber is ready to accept them. For example a conflate
 * step might average incoming numbers if the upstream publisher is faster.
 *
 * This version of conflate allows to derive a seed from the first element and change the
 * aggregated type to be different than the input type. See [[FlowOps.conflate]] for a simpler
 * version that does not change types.
 *
 * This element only rolls up elements if the upstream is faster, but if the downstream is
 * faster it will not duplicate elements.
 *
 * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
 *
 * '''Emits when''' downstream stops backpressuring and there is a conflated element available
 *
 * '''Backpressures when''' never
 *
 * '''Completes when''' upstream completes
 *
 * '''Cancels when''' downstream cancels
 *
 * @param seed Provides the first state for a conflated value using the first unconsumed element
 *             as a start
 * @param aggregate Takes the currently aggregated value and the current pending element to
 *                  produce a new aggregate
 *
 * See also [[FlowOps.conflate]], [[FlowOps.limit]], [[FlowOps.limitWeighted]],
 * [[FlowOps.batch]], [[FlowOps.batchWeighted]]
 */
def conflateWithSeed[S](seed: Out ⇒ S)(aggregate: (S, Out) ⇒ S): Repr[S] =
  via(Batch(1L, ConstantFun.zeroLong, seed, aggregate).withAttributes(DefaultAttributes.conflate))
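
Before the full conflateWithSeed example below, a simpler sketch (my own, assuming an implicit materializer) uses the non-seeded conflate, which keeps the element type unchanged; here it sums whatever piles up while the throttled downstream is busy:

  import scala.concurrent.duration._

  // the throttle stands in for a slow subscriber; conflate sums the pending elements
  Source(1 to 100)
    .conflate(_ + _)
    .throttle(1, 100.millis, 1, ThrottleMode.shaping)
    .runWith(Sink.foreach(println))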

Here is a usage example of conflateWithSeed:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.duration._

object StreamDemo1 extends App {
  implicit val sys = ActorSystem("streamSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(sys).withInputBuffer(1, 1)
  )

  case class Tick()

  RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    // this is the asynchronous stage in this graph
    val zipper = b.add(ZipWith[Tick, Seq[String], Seq[String]]((tick, count) => count).async)
    // this slows down the pipeline by 3 seconds
    Source.tick(initialDelay = 3.seconds, interval = 3.seconds, Tick()) ~> zipper.in0
    // faster producer with all elements passed inside a Seq
    Source.tick(initialDelay = 1.second, interval = 1.second, "item")
      .conflateWithSeed(Seq(_)) { (acc, elem) => acc :+ elem } ~> zipper.in1
    zipper.out ~> Sink.foreach(println)
    ClosedShape
  }).run()
}

In the example above we use the slow input port of ZipWith to pace the whole pipeline. The length of the Seq at the output then reflects how long ZipWith was delayed between reads. Note: the first three outputs seem to arrive without delay; this is caused by akka-stream's prefetch. Because we set InputBuffer(initial = 1, max = 1), the first element is prefetched and counts as consumed immediately.

If an external system that has not implemented the Reactive Streams standard is a slow upstream producer, the downstream may time out. akka-stream provides the expand function to solve this problem:

/**
 * Allows a faster downstream to progress independently of a slower publisher by extrapolating
 * elements from an older element until a new element comes from the upstream. For example an
 * expand step might repeat the last element for the subscriber until it receives an update from
 * upstream.
 *
 * This element will never "drop" upstream elements as all elements go through at least one
 * extrapolation step. This means that if the upstream is actually faster than the downstream it
 * will be backpressured by the downstream subscriber.
 *
 * Expand does not support [[akka.stream.Supervision.Restart]] and
 * [[akka.stream.Supervision.Resume]]. Exceptions from the `seed` or `extrapolate` functions
 * will complete the stream with failure.
 *
 * '''Emits when''' downstream stops backpressuring
 *
 * '''Backpressures when''' downstream backpressures or iterator runs empty
 *
 * '''Completes when''' upstream completes
 *
 * '''Cancels when''' downstream cancels
 *
 * @param seed Provides the first state for extrapolation using the first unconsumed element
 * @param extrapolate Takes the current extrapolation state to produce an output element and the
 *                    next extrapolation state.
 */
def expand[U](extrapolate: Out ⇒ Iterator[U]): Repr[U] = via(new Expand(extrapolate))

When the upstream cannot deliver the data requested by the downstream in time, we can use expand to push a fixed element (the last one seen) to satisfy the downstream temporarily:

  val lastFlow = Flow[Double]
    .expand(Iterator.continually(_))
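
To see the effect end to end, here is a runnable sketch of my own (the object and stream names are arbitrary): a producer emitting one number per second is expanded so that a consumer clocked five times faster always reads the latest value, repeated as needed:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.duration._

object ExpandDemo extends App {
  implicit val sys = ActorSystem("expandSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer()

  // slow producer: an incrementing number once per second
  val slow = Source.tick(0.seconds, 1.second, ()).scan(0)((n, _) => n + 1)
  // fast consumer clock: five ticks per second
  val fast = Source.tick(0.seconds, 200.millis, "tick")

  slow
    .expand(n => Iterator.continually(n)) // keep repeating the last value until a new one arrives
    .zip(fast)                            // the faster side now drives the emission rate
    .take(20)
    .runForeach(println)
    .andThen { case _ => sys.terminate() }
}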
