Akka(23): Stream: Custom stream components - Custom defined stream processing stages


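    Broadly speaking, akka-stream is built from three framework-level stream components: the data source Source, the pass-through node Flow, and the data sink Sink. Source and Sink are the two independent endpoints of a stream. The Flow between them may consist of several pipe-like nodes, each representing some transformation of the stream elements, and the order in which those nodes are connected represents the overall processing pipeline. A complete stream (a runnable stream) must be closed: seen from outside, its two ends must be attached to a Source and a Sink. The simplest runnable stream is obtained by connecting a Sink directly to a Source: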

  Source(1 to 10).runWith(Sink.foreach(println))

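Seen from another angle, akka-stream consists of two parts: the stream graph (Graph) and the materializer (Materializer). The Graph is the blueprint of the computation. The Materializer prepares the environment and places the Graph into the actor system, where it actually runs, produces effects and yields its result. Every akka-stream computation therefore needs a Graph that describes its functionality and flow, and each Graph can in turn be composed of sub-graphs with smaller functions. A runnable stream must be represented by a closed graph, which is made up of sub-graphs that each perform a different transformation. Customizing stream functionality therefore means defining custom Graphs that provide the required behavior.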
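The following minimal sketch makes the blueprint/materializer split concrete. It assumes Akka 2.5.x (akka-stream) on Scala 2.12, and the object name BlueprintDemo is hypothetical. The RunnableGraph is only a description. Each call to run() hands it to the materializer, which starts a fresh, independent execution and returns that run's materialized value (here, the Future of the sum).

```scala
// Assumes Akka 2.5.x (akka-stream) on Scala 2.12.
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object BlueprintDemo {
  // A closed graph: a pure description, nothing runs when it is defined.
  val sumGraph: RunnableGraph[Future[Int]] =
    Source(1 to 10).toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.right)

  // Materializes the same blueprint twice and waits for both results.
  def runTwice(): (Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("blueprint-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val first = Await.result(sumGraph.run(), 3.seconds)
      val second = Await.result(sumGraph.run(), 3.seconds)
      (first, second)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(runTwice()) // prints (55,55)
}
```

The blueprint holds no state of its own, so running it twice gives two independent results instead of a shared one.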

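A Graph can be described by two parts: its shape and its GraphStage. The shape describes how many input and output ports the Graph has, and the GraphStage describes how data is transformed as it flows through. Let's look at shapes first. Their base class is Shape: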

/**
 * A Shape describes the inlets and outlets of a [[Graph]]. In keeping with the
 * philosophy that a Graph is a freely reusable blueprint, everything that
 * matters from the outside are the connections that can be made with it,
 * otherwise it is just a black box.
 */
abstract class Shape {
  /**
   * Scala API: get a list of all input ports
   */
  def inlets: immutable.Seq[Inlet[_]]

  /**
   * Scala API: get a list of all output ports
   */
  def outlets: immutable.Seq[Outlet[_]]

  /**
   * Create a copy of this Shape object, returning the same type as the
   * original; this constraint can unfortunately not be expressed in the
   * type system.
   */
  def deepCopy(): Shape
  ...
}

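Subclasses of Shape must implement the three abstract members above. akka-stream provides some basic shapes out of the box, including SourceShape, FlowShape and SinkShape. The first two are shown here: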

/**
 * A Source [[Shape]] has exactly one output and no inputs, it models a source
 * of data.
 */
final case class SourceShape[+T](out: Outlet[T @uncheckedVariance]) extends Shape {
  override val inlets: immutable.Seq[Inlet[_]] = EmptyImmutableSeq
  override val outlets: immutable.Seq[Outlet[_]] = out :: Nil

  override def deepCopy(): SourceShape[T] = SourceShape(out.carbonCopy())
}
object SourceShape {
  /** Java API */
  def of[T](outlet: Outlet[T @uncheckedVariance]): SourceShape[T] =
    SourceShape(outlet)
}

/**
 * A Flow [[Shape]] has exactly one input and one output, it looks from the
 * outside like a pipe (but it can be a complex topology of streams within of
 * course).
 */
final case class FlowShape[-I, +O](in: Inlet[I @uncheckedVariance], out: Outlet[O @uncheckedVariance]) extends Shape {
  override val inlets: immutable.Seq[Inlet[_]] = in :: Nil
  override val outlets: immutable.Seq[Outlet[_]] = out :: Nil

  override def deepCopy(): FlowShape[I, O] = FlowShape(in.carbonCopy(), out.carbonCopy())
}
object FlowShape {
  /** Java API */
  def of[I, O](inlet: Inlet[I @uncheckedVariance], outlet: Outlet[O @uncheckedVariance]): FlowShape[I, O] =
    FlowShape(inlet, outlet)
}
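Every built-in component exposes its shape, so you can check the port counts of SourceShape, FlowShape and SinkShape directly. This small sketch assumes Akka 2.5.x, and ShapeDemo is a hypothetical name. No materializer is needed because nothing is run.

```scala
// Assumes Akka 2.5.x (akka-stream).
import akka.stream.scaladsl.{Flow, Sink, Source}

object ShapeDemo {
  val source = Source(1 to 3)     // SourceShape: one outlet
  val flow = Flow[Int].map(_ * 2) // FlowShape: one inlet, one outlet
  val sink = Sink.ignore          // SinkShape: one inlet

  // (number of inlets, number of outlets) for each component's shape
  val portCounts: List[(Int, Int)] =
    List(source.shape, flow.shape, sink.shape).map(s => (s.inlets.size, s.outlets.size))

  def main(args: Array[String]): Unit = println(portCounts) // List((0,1), (1,1), (1,0))
}
```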

There is also a slightly more complex bidirectional shape, BidiShape:

//#bidi-shape
/**
 * A bidirectional flow of elements that consequently has two inputs and two
 * outputs, arranged like this:
 *
 * {{{
 *        +------+
 *  In1 ~>|      |~> Out1
 *        | bidi |
 * Out2 <~|      |<~ In2
 *        +------+
 * }}}
 */
final case class BidiShape[-In1, +Out1, -In2, +Out2](
  in1:  Inlet[In1 @uncheckedVariance],
  out1: Outlet[Out1 @uncheckedVariance],
  in2:  Inlet[In2 @uncheckedVariance],
  out2: Outlet[Out2 @uncheckedVariance]) extends Shape {
  //#implementation-details-elided
  override val inlets: immutable.Seq[Inlet[_]] = in1 :: in2 :: Nil
  override val outlets: immutable.Seq[Outlet[_]] = out1 :: out2 :: Nil

  /**
   * Java API for creating from a pair of unidirectional flows.
   */
  def this(top: FlowShape[In1, Out1], bottom: FlowShape[In2, Out2]) = this(top.in, top.out, bottom.in, bottom.out)

  override def deepCopy(): BidiShape[In1, Out1, In2, Out2] =
    BidiShape(in1.carbonCopy(), out1.carbonCopy(), in2.carbonCopy(), out2.carbonCopy())

  //#implementation-details-elided
}
//#bidi-shape
object BidiShape {
  def fromFlows[I1, O1, I2, O2](top: FlowShape[I1, O1], bottom: FlowShape[I2, O2]): BidiShape[I1, O1, I2, O2] =
    BidiShape(top.in, top.out, bottom.in, bottom.out)

  /** Java API */
  def of[In1, Out1, In2, Out2](
    in1:  Inlet[In1 @uncheckedVariance],
    out1: Outlet[Out1 @uncheckedVariance],
    in2:  Inlet[In2 @uncheckedVariance],
    out2: Outlet[Out2 @uncheckedVariance]): BidiShape[In1, Out1, In2, Out2] =
    BidiShape(in1, out1, in2, out2)
}
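The two directions of BidiShape, in1 ~> out1 and in2 ~> out2, are easiest to see with BidiFlow, whose shape is a BidiShape. The sketch below assumes Akka 2.5.x, and the names BidiDemo, codec and appendZero are hypothetical. It builds a codec from two functions with BidiFlow.fromFunctions. It then closes the loop with join, which connects out1 to a Flow and feeds that Flow's output back into in2.

```scala
// Assumes Akka 2.5.x (akka-stream) on Scala 2.12.
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BidiFlow, Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object BidiDemo {
  // in1 ~> out1 encodes Int => String, in2 ~> out2 decodes String => Int
  val codec: BidiFlow[Int, String, String, Int, NotUsed] =
    BidiFlow.fromFunctions((i: Int) => i.toString, (s: String) => s.toInt)

  // Hypothetical "other side" that only deals with strings
  val appendZero: Flow[String, String, NotUsed] = Flow[String].map(_ + "0")

  // join wires out1 -> appendZero -> in2, leaving a Flow from in1 to out2
  val roundTrip: Flow[Int, Int, NotUsed] = codec.join(appendZero)

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("bidi-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try Await.result(Source(1 to 3).via(roundTrip).runWith(Sink.seq), 3.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run().mkString(", ")) // 10, 20, 30
}
```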

There are also the one-to-many UniformFanOutShape and the many-to-one UniformFanInShape. Below is a custom many-to-many Shape of our own:

  case class TwoThreeShape[I, I2, O, O2, O3](
    in1: Inlet[I],
    in2: Inlet[I2],
    out1: Outlet[O],
    out2: Outlet[O2],
    out3: Outlet[O3]) extends Shape {
    override def inlets: immutable.Seq[Inlet[_]] = in1 :: in2 :: Nil
    override def outlets: immutable.Seq[Outlet[_]] = out1 :: out2 :: out3 :: Nil
    override def deepCopy(): Shape = TwoThreeShape(
      in1.carbonCopy(),
      in2.carbonCopy(),
      out1.carbonCopy(),
      out2.carbonCopy(),
      out3.carbonCopy()
    )
  }

This shape has two inputs and three outputs. All we have to implement are the three members inlets, outlets and deepCopy.
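A quick way to check a custom shape is to build one with concrete ports and inspect it. This sketch assumes Akka 2.5.x, and TwoThreeShapeDemo is a hypothetical name. It reuses the TwoThreeShape above but narrows deepCopy's return type, which Scala allows for an overriding method, so the copy's fields stay accessible without a cast. deepCopy builds fresh ports with carbonCopy, so the copy does not share port objects with the original.

```scala
// Assumes Akka 2.5.x (akka-stream).
import akka.stream.{Inlet, Outlet, Shape}
import scala.collection.immutable

// The article's TwoThreeShape, with deepCopy returning the concrete type.
case class TwoThreeShape[I, I2, O, O2, O3](
    in1: Inlet[I], in2: Inlet[I2],
    out1: Outlet[O], out2: Outlet[O2], out3: Outlet[O3]) extends Shape {
  override def inlets: immutable.Seq[Inlet[_]] = in1 :: in2 :: Nil
  override def outlets: immutable.Seq[Outlet[_]] = out1 :: out2 :: out3 :: Nil
  // carbonCopy creates new port objects, so the copy shares no ports with the original
  override def deepCopy(): TwoThreeShape[I, I2, O, O2, O3] =
    TwoThreeShape(in1.carbonCopy(), in2.carbonCopy(),
      out1.carbonCopy(), out2.carbonCopy(), out3.carbonCopy())
}

object TwoThreeShapeDemo {
  val shape = TwoThreeShape(
    Inlet[Int]("in1"), Inlet[String]("in2"),
    Outlet[Int]("out1"), Outlet[String]("out2"), Outlet[Boolean]("out3"))
  val copy = shape.deepCopy()

  def main(args: Array[String]): Unit = {
    println(s"${shape.inlets.size} inlets, ${shape.outlets.size} outlets") // 2 inlets, 3 outlets
    println(copy.in1 ne shape.in1) // true
  }
}
```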

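A GraphStage describes the behavior of a stream component. It defines what the component does through the way elements flow into and out of it and how they are transformed along the way. Here is the definition of GraphStage: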

/**
 * A GraphStage represents a reusable graph stream processing stage. A GraphStage consists of a [[Shape]] which describes
 * its input and output ports and a factory function that creates a [[GraphStageLogic]] which implements the processing
 * logic that ties the ports together.
 */
abstract class GraphStage[S <: Shape] extends GraphStageWithMaterializedValue[S, NotUsed] {
  final override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, NotUsed) =
    (createLogic(inheritedAttributes), NotUsed)

  @throws(classOf[Exception])
  def createLogic(inheritedAttributes: Attributes): GraphStageLogic
}

Each component provides its own GraphStageLogic by implementing createLogic according to its requirements. GraphStageLogic is defined as follows:

/**
 * Represents the processing logic behind a [[GraphStage]]. Roughly speaking, a subclass of [[GraphStageLogic]] is a
 * collection of the following parts:
 *  * A set of [[InHandler]] and [[OutHandler]] instances and their assignments to the [[Inlet]]s and [[Outlet]]s
 *    of the enclosing [[GraphStage]]
 *  * Possible mutable state, accessible from the [[InHandler]] and [[OutHandler]] callbacks, but not from anywhere
 *    else (as such access would not be thread-safe)
 *  * The lifecycle hooks [[preStart()]] and [[postStop()]]
 *  * Methods for performing stream processing actions, like pulling or pushing elements
 *
 * The stage logic is completed once all its input and output ports have been closed. This can be changed by
 * setting `setKeepGoing` to true.
 *
 * The `postStop` lifecycle hook on the logic itself is called once all ports are closed. This is the only tear down
 * callback that is guaranteed to happen, if the actor system or the materializer is terminated the handlers may never
 * see any callbacks to `onUpstreamFailure`, `onUpstreamFinish` or `onDownstreamFinish`. Therefore stage resource
 * cleanup should always be done in `postStop`.
 */
abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: Int) {...}

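GraphStageLogic mainly responds to port events through InHandler and OutHandler. Through these handlers it controls how elements are transformed and how they move across the ports: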

/**
 * Collection of callbacks for an input port of a [[GraphStage]]
 */
trait InHandler {
  /**
   * Called when the input port has a new element available. The actual element can be retrieved via the
   * [[GraphStageLogic.grab()]] method.
   */
  @throws(classOf[Exception])
  def onPush(): Unit

  /**
   * Called when the input port is finished. After this callback no other callbacks will be called for this port.
   */
  @throws(classOf[Exception])
  def onUpstreamFinish(): Unit = GraphInterpreter.currentInterpreter.activeStage.completeStage()

  /**
   * Called when the input port has failed. After this callback no other callbacks will be called for this port.
   */
  @throws(classOf[Exception])
  def onUpstreamFailure(ex: Throwable): Unit = GraphInterpreter.currentInterpreter.activeStage.failStage(ex)
}

/**
 * Collection of callbacks for an output port of a [[GraphStage]]
 */
trait OutHandler {
  /**
   * Called when the output port has received a pull, and therefore ready to emit an element, i.e. [[GraphStageLogic.push()]]
   * is now allowed to be called on this port.
   */
  @throws(classOf[Exception])
  def onPull(): Unit

  /**
   * Called when the output port will no longer accept any new elements. After this callback no other callbacks will
   * be called for this port.
   */
  @throws(classOf[Exception])
  def onDownstreamFinish(): Unit = {
    GraphInterpreter
      .currentInterpreter
      .activeStage
      .completeStage()
  }
}

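As we can see, we must implement InHandler.onPush() and OutHandler.onPull(), while the other callbacks have default implementations. akka-stream implements the Reactive Streams specification at every step of the stream. An input port's InHandler therefore responds to the upstream push signal onPush, and an output port's OutHandler responds to the downstream demand signal onPull. For its part, the stage requests data from an input port with pull(in) and sends data to an output port with push(out, elem).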
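The division of labor between onPush/onPull and pull/push is clearest in a one-to-one stage. Below is a minimal sketch of a map-like Flow stage, assuming Akka 2.5.x. The names MapStage and MapStageDemo are hypothetical, and the built-in Flow.map already provides this behavior. Each downstream pull is forwarded upstream, and each upstream push is grabbed once and pushed once. As a result, push is only ever called after downstream has asked for an element.

```scala
// Assumes Akka 2.5.x (akka-stream) on Scala 2.12.
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical one-to-one stage, equivalent in spirit to the built-in Flow.map.
class MapStage[A, B](f: A => B) extends GraphStage[FlowShape[A, B]] {
  val in = Inlet[A]("MapStage.in")
  val out = Outlet[B]("MapStage.out")
  val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      // Upstream pushed: the element was requested on behalf of downstream,
      // so it can be grabbed once and pushed right away.
      override def onPush(): Unit = push(out, f(grab(in)))
      // Downstream pulled: pass the demand on to upstream.
      override def onPull(): Unit = pull(in)
      setHandlers(in, out, this)
    }
}

object MapStageDemo {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("map-stage-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try Await.result(Source(1 to 3).via(new MapStage[Int, Int](_ * 2)).runWith(Sink.seq), 3.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run().mkString(", ")) // 2, 4, 6
}
```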

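Next, let's design a Source that cyclically emits a given sequence of strings. A Source has only one output port, so we only need to watch for demand signals from downstream on that port. In this case it is enough to provide an OutHandler that implements onPull: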

class AlphaSource(chars: Seq[String]) extends GraphStage[SourceShape[String]] {
  val outport = Outlet[String]("output")
  val shape = SourceShape(outport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var pos: Int = 0
      setHandler(outport, new OutHandler {
        override def onPull(): Unit = {
          push(outport, chars(pos))
          pos += 1
          if (pos == chars.length) pos = 0
        }
      })
    }
}

GraphStage is a subclass of Graph:

abstract class GraphStage[S <: Shape] extends GraphStageWithMaterializedValue[S, NotUsed] {...}
abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S, M] {...}

so we can treat AlphaSource as a Graph and turn it into a Source component with Source.fromGraph:

  val sourceGraph: Graph[SourceShape[String], NotUsed] = new AlphaSource(Seq("A", "B", "C", "D"))
  val alphaSource = Source.fromGraph(sourceGraph).delay(1.second, DelayOverflowStrategy.backpressure)
  alphaSource.runWith(Sink.foreach(println))
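AlphaSource never completes, so the runWith above keeps printing until the system is shut down. To observe its output deterministically, a bounded run can cap it with take and collect the elements with Sink.seq. This sketch assumes Akka 2.5.x, and AlphaSourceDemo is a hypothetical name. AlphaSource is repeated so that the block compiles on its own.

```scala
// Assumes Akka 2.5.x (akka-stream) on Scala 2.12.
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._

// AlphaSource as defined above, repeated for self-containment.
class AlphaSource(chars: Seq[String]) extends GraphStage[SourceShape[String]] {
  val outport = Outlet[String]("output")
  val shape = SourceShape(outport)
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var pos: Int = 0 // mutable state lives inside the logic
      setHandler(outport, new OutHandler {
        override def onPull(): Unit = {
          push(outport, chars(pos))
          pos += 1
          if (pos == chars.length) pos = 0 // wrap around: the source is infinite
        }
      })
    }
}

object AlphaSourceDemo {
  def firstSix(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("alpha-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      // take(6) completes the stream and cancels the infinite source after six elements
      val f = Source.fromGraph(new AlphaSource(Seq("A", "B", "C", "D"))).take(6).runWith(Sink.seq)
      Await.result(f, 3.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(firstSix().mkString(", ")) // A, B, C, D, A, B
}
```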

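Likewise, for a Sink we only need to watch for push signals from upstream and then read the data. Since nothing flows until someone signals demand, the Sink issues its first pull(inport) in preStart and another one after each element it receives: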

class UppercaseSink extends GraphStage[SinkShape[String]] {
  val inport = Inlet[String]("input")
  val shape = SinkShape(inport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler {
      override def preStart(): Unit = pull(inport)
      override def onPush(): Unit = {
        println(grab(inport).toUpperCase)
        pull(inport)
      }
      setHandler(inport, this)
    }
}
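UppercaseSink only prints, so its materialized value is NotUsed. A Sink that should hand back a result can instead extend GraphStageWithMaterializedValue, the parent of GraphStage, and complete a Promise. The following is a hypothetical variant named UppercaseCollector, assuming Akka 2.5.x. Following the GraphStageLogic documentation quoted above, its mutable state lives inside the logic. postStop is the only tear-down hook that is guaranteed to run, so it fails the promise if the stage stops before upstream completes.

```scala
// Assumes Akka 2.5.x (akka-stream) on Scala 2.12.
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical Sink that materializes a Future of all upper-cased elements it received.
class UppercaseCollector
    extends GraphStageWithMaterializedValue[SinkShape[String], Future[Vector[String]]] {
  val inport = Inlet[String]("input")
  val shape = SinkShape(inport)

  override def createLogicAndMaterializedValue(
      inheritedAttributes: Attributes): (GraphStageLogic, Future[Vector[String]]) = {
    val promise = Promise[Vector[String]]()
    val logic = new GraphStageLogic(shape) with InHandler {
      private var collected = Vector.empty[String] // state stays inside the logic

      override def preStart(): Unit = pull(inport) // signal initial demand
      override def onPush(): Unit = {
        collected :+= grab(inport).toUpperCase
        pull(inport)
      }
      override def onUpstreamFinish(): Unit = {
        promise.trySuccess(collected)
        completeStage()
      }
      override def onUpstreamFailure(ex: Throwable): Unit = {
        promise.tryFailure(ex)
        failStage(ex)
      }
      // Guaranteed tear-down hook; a no-op if the promise was already completed above.
      override def postStop(): Unit = {
        promise.tryFailure(new IllegalStateException("stage stopped before upstream completed"))
      }
      setHandler(inport, this)
    }
    (logic, promise.future)
  }
}

object UppercaseCollectorDemo {
  def run(): Vector[String] = {
    implicit val system: ActorSystem = ActorSystem("collector-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try Await.result(Source(List("a", "b", "c")).runWith(Sink.fromGraph(new UppercaseCollector)), 3.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run().mkString(", ")) // A, B, C
}
```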

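With AlphaSource and UppercaseSink we have had a first taste of controlling how stream elements flow. This mainly consists of reacting passively to state changes on the input and output ports and operating on the ports with push and pull. Below are the commonly used port events and operations: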

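State changes on an output port are captured by the callbacks of an OutHandler, which is registered with setHandler(out, outHandler). The operations on an output port are:

1. push(out, elem): pushes an element to the port. This is only allowed after downstream has requested data with a pull, and it may not be called again until the next pull arrives

2. complete(out): closes the port normally

3. fail(out, exception): closes the port with a failure

Output port events:

1. onPull(): downstream is ready to receive data, so push(out, elem) may now be used to send an element

2. onDownstreamFinish(): downstream has stopped reading, and no further onPull events will be received

The current state of an output port can be queried with:

1. isAvailable(out): true means push(out, elem) may be called

2. isClosed(out): true means the output port is closed, so it receives no more events and no data can be pushed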

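Likewise, state changes on an input port are captured by the callbacks of an InHandler registered with setHandler(in, inHandler). The operations on an input port are:

1. pull(in): requests an element from upstream. Once issued, pull(in) may not be called again until upstream has pushed the requested element

2. grab(in): takes the current element from the port. This is only possible after upstream has pushed an element, and only once per pushed element

3. cancel(in): closes the input port

Input port events:

1. onPush(): upstream has pushed an element to the input port, so grab(in) can read it and pull(in) can request the next one

2. onUpstreamFinish(): upstream has stopped sending. No further onPush events will arrive, and pull(in) must no longer be called

3. onUpstreamFailure(ex): upstream has terminated with a failure

Input port status queries:

1. isAvailable(in): true means grab(in) can read the current element

2. hasBeenPulled(in): true means pull(in) has been issued and its element has not arrived yet. pull(in) may not be called again in this state

3. isClosed(in): true means the port is closed, so pull(in) may no longer be used and no onPush events will arrive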
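To see these operations and status queries working together, here is a sketch of a stage that emits every element twice. It uses only push, pull, grab and isClosed, and assumes Akka 2.5.x. The names ManualDuplicator and ManualDuplicatorDemo are hypothetical. Because push is allowed only once per pull, the second copy has to be kept in explicit state until the next onPull. If upstream finishes while a copy is still owed, the stage simply keeps running, because its outlet is still open.

```scala
// Assumes Akka 2.5.x (akka-stream) on Scala 2.12.
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stage that emits every element twice using only push/pull/grab.
class ManualDuplicator[A] extends GraphStage[FlowShape[A, A]] {
  val in = Inlet[A]("ManualDuplicator.in")
  val out = Outlet[A]("ManualDuplicator.out")
  val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      // Second copy still owed to downstream (push is allowed only once per pull).
      private var pending: Option[A] = None

      override def onPush(): Unit = {
        val elem = grab(in)
        push(out, elem) // first copy answers the pull that triggered pull(in)
        pending = Some(elem)
      }

      override def onPull(): Unit = pending match {
        case Some(elem) =>
          push(out, elem) // second copy
          pending = None
          if (isClosed(in)) completeStage() // upstream finished while the copy was owed
        case None =>
          pull(in)
      }

      // While a copy is still owed, do nothing: the open outlet keeps the stage alive.
      override def onUpstreamFinish(): Unit =
        if (pending.isEmpty) completeStage()

      setHandlers(in, out, this)
    }
}

object ManualDuplicatorDemo {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("manual-dup-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try Await.result(Source(1 to 3).via(new ManualDuplicator[Int]).runWith(Sink.seq), 3.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run().mkString(", ")) // 1, 1, 2, 2, 3, 3
}
```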

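The descriptions of pull(in) and push(out, elem) show that the two operations are strictly interdependent and must alternate. pull(in) cannot be issued again until upstream has pushed in response to the previous pull, and upstream cannot push(out, elem) until downstream has pulled. This follows from the fact that akka-stream is a Reactive Streams implementation, in which upstream and downstream communicate through a combination of push and pull. It does, however, make some scenarios inconvenient, such as flow control where one incoming element should produce several outgoing elements or none at all. akka-stream therefore also offers a simpler API that lets users work with the ports more flexibly. It includes the following functions: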

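1. emit(out, elem): temporarily replaces the OutHandler, sends elem to the port as soon as there is demand, then reinstalls the original OutHandler

2. emitMultiple(out, Iterable(e1, e2, e3, ...)): temporarily replaces the OutHandler, sends a sequence of elements one per downstream pull, then reinstalls the original OutHandler

3. read(in)(andThen, onClose): temporarily replaces the InHandler, reads one element from the port and passes it to andThen, then reinstalls the original InHandler

4. readN(in, n)(andThen, onClose): temporarily replaces the InHandler, reads n elements from the port, then reinstalls the original InHandler

5. abortEmitting(out): cancels an unfinished emit on the output port

6. abortReading(in): cancels an unfinished read on the input port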

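This API still honors Reactive Streams back-pressure, as the source of emitMultiple shows. It pushes the first element right away only if isAvailable(out) is true. Any elements that cannot be pushed yet are handed to an EmittingIterator handler, which pushes them one per onPull signal: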

  /**
   * Emit a sequence of elements through the given outlet and continue with the given thunk
   * afterwards, suspending execution if necessary.
   * This action replaces the [[OutHandler]] for the given outlet if suspension
   * is needed and reinstalls the current handler upon receiving an `onPull()`
   * signal (before invoking the `andThen` function).
   */
  final protected def emitMultiple[T](out: Outlet[T], elems: Iterator[T], andThen: () ⇒ Unit): Unit =
    if (elems.hasNext) {
      if (isAvailable(out)) {
        push(out, elems.next())
        if (elems.hasNext)
          setOrAddEmitting(out, new EmittingIterator(out, elems, getNonEmittingHandler(out), andThen))
        else andThen()
      } else {
        setOrAddEmitting(out, new EmittingIterator(out, elems, getNonEmittingHandler(out), andThen))
      }
    } else andThen()
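With emitMultiple, a stage that emits every element twice needs no bookkeeping of its own, because the temporary emitting handler holds the second copy and waits for demand. This minimal sketch assumes Akka 2.5.x, and the names EmitDuplicator and EmitDuplicatorDemo are hypothetical. Upstream completion needs no special handling either. By default, onUpstreamFinish calls completeStage, and an emit still in progress is finished before the output port is completed.

```scala
// Assumes Akka 2.5.x (akka-stream) on Scala 2.12.
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stage that emits every element twice via emitMultiple.
class EmitDuplicator[A] extends GraphStage[FlowShape[A, A]] {
  val in = Inlet[A]("EmitDuplicator.in")
  val out = Outlet[A]("EmitDuplicator.out")
  val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      override def onPush(): Unit = {
        val elem = grab(in)
        // Pushes the first copy now if downstream is waiting; a copy that cannot be pushed
        // yet is held by a temporary handler until onPull, then this OutHandler is reinstalled.
        emitMultiple(out, List(elem, elem))
      }
      override def onPull(): Unit = pull(in)
      setHandlers(in, out, this)
    }
}

object EmitDuplicatorDemo {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("emit-dup-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try Await.result(Source(1 to 3).via(new EmitDuplicator[Int]).runWith(Sink.seq), 3.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run().mkString(", ")) // 1, 1, 2, 2, 3, 3
}
```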

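Next we define a custom Flow GraphStage that uses emitMultiple so that a user-defined function can control which elements pass through and what is sent downstream. A Flow has to watch both the upstream push state on its input port and the downstream demand on its output port: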

import scala.collection.immutable.Iterable // emitMultiple expects an immutable Iterable

trait Row
trait Move
case object Stand extends Move
case class Next(rows: Iterable[Row]) extends Move

class FlowValve(controller: Row => Move) extends GraphStage[FlowShape[Row, Row]] {
  val inport = Inlet[Row]("input")
  val outport = Outlet[Row]("output")
  val shape = FlowShape.of(inport, outport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      override def onPush(): Unit = {
        controller(grab(inport)) match {
          // an empty Next would emit nothing and never pull again, stalling the stream, so treat it like Stand
          case Next(rows) if rows.nonEmpty => emitMultiple(outport, rows)
          case _ => pull(inport) // Stand: skip this element and request the next one
        }
      }
      // forward downstream demand straight to upstream
      override def onPull(): Unit = pull(inport)
      setHandlers(inport, outport, this)
    }
}

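The FlowValve type exists specifically to apply a user-defined function, controller. For each element pushed by upstream, controller decides from its content whether to skip it (Stand) or to send one or more elements downstream (Next(...)). When downstream signals demand with a pull, FlowValve passes it straight on to upstream. Here is an example of a user-defined function: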

 case class Order(burger: String, qty: Int) extends Row
 case class Burger(msg: String) extends Row

 def orderDeliver: Row => Move = {
   case Order(name, qty) if qty > 0 =>
     val burgers: Iterable[Burger] = (1 to qty).map(a => Burger(s"$name $a of $qty")).toList
     Next(burgers)
   case _ => Stand // qty <= 0, or a Row that is not an Order
 }

 val flowGraph: Graph[FlowShape[Row, Row], NotUsed] = new FlowValve(orderDeliver)
 val deliverFlow: Flow[Row, Row, NotUsed] = Flow.fromGraph(flowGraph)
 val orders = List(Order("cheeze", 2), Order("beef", 3), Order("pepper", 1), Order("Rice", 0),
   Order("plain", 1), Order("beef", 2))
 Source(orders).via(deliverFlow).to(Sink.foreach(println)).run()

A trial run prints:

Burger(cheeze 1 of 2)
Burger(cheeze 2 of 2)
Burger(beef 1 of 3)
Burger(beef 2 of 3)
Burger(beef 3 of 3)
Burger(pepper 1 of 1)
Burger(plain 1 of 1)
Burger(beef 1 of 2)
Burger(beef 2 of 2)

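This is exactly the result we expected. For fan-out (one-to-many) and fan-in (many-to-one) components, akka-stream provides the shapes UniformFanOutShape and UniformFanInShape, which are used by built-in stages such as Broadcast and Merge. Combining the two can build many-to-many components, so the predefined GraphStages are usually sufficient.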
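Here is a sketch of combining fan-out and fan-in with GraphDSL. Broadcast has a UniformFanOutShape and Merge has a UniformFanInShape, and wiring them together yields an ordinary FlowShape. It assumes Akka 2.5.x, and FanOutFanInDemo and fanOutIn are hypothetical names. Merge interleaves its inputs in no guaranteed order, so the result is sorted before it is compared.

```scala
// Assumes Akka 2.5.x (akka-stream) on Scala 2.12.
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, FlowShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object FanOutFanInDemo {
  val fanOutIn: Flow[Int, Int, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    val bcast = b.add(Broadcast[Int](2)) // one input, two outputs
    val merge = b.add(Merge[Int](2))     // two inputs, one output
    bcast.out(0) ~> Flow[Int].map(_ * 10) ~> merge.in(0) // branch 1: scaled copy
    bcast.out(1) ~> merge.in(1)                          // branch 2: unchanged copy
    FlowShape(bcast.in, merge.out) // expose the open ports as a Flow
  })

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("fan-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try Await.result(Source(1 to 3).via(fanOutIn).runWith(Sink.seq), 3.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run().sorted.mkString(", ")) // 1, 2, 3, 10, 20, 30
}
```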

Here is the full source code for this demonstration:

import akka.NotUsed
import akka.actor._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.stream._
import scala.concurrent.duration._
import scala.collection.immutable.Iterable

class AlphaSource(chars: Seq[String]) extends GraphStage[SourceShape[String]] {
  val outport = Outlet[String]("output")
  val shape = SourceShape(outport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      var pos: Int = 0
      setHandler(outport, new OutHandler {
        override def onPull(): Unit = {
          push(outport, chars(pos))
          pos += 1
          if (pos == chars.length) pos = 0
        }
      })
    }
}

class UppercaseSink extends GraphStage[SinkShape[String]] {
  val inport = Inlet[String]("input")
  val shape = SinkShape(inport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler {
      override def preStart(): Unit = pull(inport)
      override def onPush(): Unit = {
        println(grab(inport).toUpperCase)
        pull(inport)
      }
      setHandler(inport, this)
    }
}

trait Row
trait Move
case object Stand extends Move
case class Next(rows: Iterable[Row]) extends Move

class FlowValve(controller: Row => Move) extends GraphStage[FlowShape[Row, Row]] {
  val inport = Inlet[Row]("input")
  val outport = Outlet[Row]("output")
  val shape = FlowShape.of(inport, outport)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      override def onPush(): Unit = {
        controller(grab(inport)) match {
          // an empty Next would emit nothing and never pull again, so treat it like Stand
          case Next(rows) if rows.nonEmpty => emitMultiple(outport, rows)
          case _ => pull(inport)
        }
      }
      override def onPull(): Unit = pull(inport)
      setHandlers(inport, outport, this)
    }
}

object GraphStages extends App {
  implicit val sys = ActorSystem("demoSys")
  implicit val ec = sys.dispatcher
  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(sys)
      .withInputBuffer(initialSize = 16, maxSize = 16)
  )

  val sourceGraph: Graph[SourceShape[String], NotUsed] = new AlphaSource(Seq("a", "b", "c", "d"))
  val alphaSource = Source.fromGraph(sourceGraph).delay(1.second, DelayOverflowStrategy.backpressure)
  // alphaSource.runWith(Sink.foreach(println))
  val sinkGraph: Graph[SinkShape[String], NotUsed] = new UppercaseSink
  val upperSink = Sink.fromGraph(sinkGraph)
  alphaSource.runWith(upperSink)

  case class Order(burger: String, qty: Int) extends Row
  case class Burger(msg: String) extends Row

  def orderDeliver: Row => Move = {
    case Order(name, qty) if qty > 0 =>
      val burgers: Iterable[Burger] = (1 to qty).map(a => Burger(s"$name $a of $qty")).toList
      Next(burgers)
    case _ => Stand // qty <= 0, or a Row that is not an Order
  }

  val flowGraph: Graph[FlowShape[Row, Row], NotUsed] = new FlowValve(orderDeliver)
  val deliverFlow: Flow[Row, Row, NotUsed] = Flow.fromGraph(flowGraph)
  val orders = List(Order("cheeze", 2), Order("beef", 3), Order("pepper", 1), Order("Rice", 0),
    Order("plain", 1), Order("beef", 2))
  Source(orders).via(deliverFlow).to(Sink.foreach(println)).run()
  // Source(1 to 10).runWith(Sink.foreach(println))

  scala.io.StdIn.readLine()
  sys.terminate()
}
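For comparison, FlowValve(orderDeliver) expands each order into burgers, and roughly the same expansion can be sketched with the built-in mapConcat operator, which maps each element to an immutable collection and emits its contents in order. Here an empty collection plays the role of Stand. This sketch assumes Akka 2.5.x. MapConcatComparison and deliver are hypothetical names, and Order/Burger are simplified here to plain case classes without the Row trait. A custom GraphStage like FlowValve still makes sense when the control logic needs more than a pure function of each element.

```scala
// Assumes Akka 2.5.x (akka-stream) on Scala 2.12.
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object MapConcatComparison {
  case class Order(burger: String, qty: Int)
  case class Burger(msg: String)

  // Each Order expands into qty Burgers; qty <= 0 yields an empty list (the Stand case).
  val deliver: Flow[Order, Burger, NotUsed] =
    Flow[Order].mapConcat(o => (1 to o.qty).map(i => Burger(s"${o.burger} $i of ${o.qty}")).toList)

  val orders = List(Order("cheeze", 2), Order("beef", 3), Order("pepper", 1), Order("Rice", 0),
    Order("plain", 1), Order("beef", 2))

  def run(): Seq[Burger] = {
    implicit val system: ActorSystem = ActorSystem("mapconcat-demo")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try Await.result(Source(orders).via(deliver).runWith(Sink.seq), 3.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```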
