akka-streams - 從應用角度學習:basic stream parts


   實際上很早就寫了一系列關於akka-streams的博客。但那個時候純粹是為了了解akka而去學習的,主要是從了解akka-streams的原理為出發點。因為akka-streams是akka系列工具的基礎,如:akka-http, persistence-query等都是基於akka-streams的,其實沒有真正把akka-streams用起來。這段時間所遇到的一些需求也是通過集合來解決的。不過,現在所處的環境還是逼迫着去真正了解akka-streams的應用場景。現狀是這樣的:跨入大數據時代,已經有大量的現代IT系統從傳統關系數據庫轉到分布式數據庫(非關系數據庫)了。不難想象,這些應用的數據操作編程不說截然不同吧,肯定也會有巨大改變。特別是在傳統SQL編程中依賴數據關系的join已經不復存在了,groupby、disctict等操作方法也不是所有的分布式數據庫都能支持的。而這些操作在具體的數據呈現和數據處理中又是不可缺少的。當然,有很多需求可以通過集合來滿足,但涉及到大數據處理我想最好還是通過流處理來實現,因為流處理stream-processing的其中一項特點就是能夠在有限的內存空間里處理無限量的數據。所以流處理應該是分布式數據處理的理想方式了。這是這次寫akka-streams的初衷:希望能通過akka-streams來實現分布式數據處理編程。

先從基本流部件basic stream parts開始,即source,flow,sink。這幾個部件可以組合成一個所謂線性流linear-stream。一個流對數據的處理包括兩部分:1、對流中元素進行轉變,如:source:Source[Int,NotUsed] = Source(1 to 10).map(i => i.toString),把流里的所有Int轉變成String、2、對流內元素進行計算得出運算結果,如:sink: Sink[Int,Future[Int]] = Sink.fold(0)(_ + _)。當我們run這個sink后得出Future[Int],如:res: Future[Int] = src.runWith(sink)。這兩項對流元素的操作所產生的結果不同:元素轉換得到動態流動的一串元素、運算元素得到一個靜態值,這個運算值materialized-value只能在Sink里獲取。即使有這樣的表示方式:Source[Int,Future[Int]],這是個迷惑,這個運算值只能通過自定義的graph才能得到,也就是說基本組件是沒這個功能的。舉個具體的例子吧:val source: Source[Int, Promise[Option[Int]]] = Source.maybe[Int] 這個表達式貌似可以在Source方獲取運算值,再看看Source.maybe[Int]:

  def maybe[T]: Source[T, Promise[Option[T]]] = Source.fromGraph(MaybeSource.asInstanceOf[Graph[SourceShape[T], Promise[Option[T]]]])

可以看出這個Source.maybe是從graph構建的。

上面這個例子里用一個Source對接一個Sink已經組成了一個完整的流,那么Flow是用來干什么的呢?由於運算值是無法當作流元素傳遞的,Flow只能是用來對Source傳下來的元素進行轉換后再傳遞給Sink,也就是說Flow是由一個或多個處理環節構成的。用Flow來分步實現功能是流處理實現並行運算的基本方式,如:

Source(1 to 10).async.via(Flow[Int].map(i => i + 1)).async.runWith(sink)

用async把這個流分割成3個運算發送給3個actor去同時運算。乍看之下map好像是個Flow,它們的作用也似乎相同,也可以對接Source。如:Source(1 to 10).map(_ + 1)。但map和Flow還是有分別的,從類型款式來看Flow[In,Out,M]比起map[A,B]多出來了M,運算值。所以via(map(_.toString))無法匹配類型。那么對於定義帶有預先處理環節的Sink就必須用Flow來實現了:ex_sink = Flow[Int].map(_ + 1).to(sink)。

雖然運算值不能像流元素一樣流動,但akka-streams提供了機制讓用戶選擇是否返回某個節點的運算值M。系統默認只選擇最最左邊節點的M,如:

// A source that can be signalled explicitly from the outside
val source: Source[Int, Promise[Option[Int]]] = Source.maybe[Int] // A flow that internally throttles elements to 1/second, and returns a Cancellable // which can be used to shut down the stream
val flow: Flow[Int, Int, Cancellable] = throttler // A sink that returns the first element of a stream in the returned Future
val sink: Sink[Int, Future[Int]] = Sink.head[Int] val stream: RunnableGraph[(Cancellable, Future[Int])] = source.viaMat(flow)(Keep.right).toMat(sink)(Keep.both) val stream1: RunnableGraph[(Promise[Option[Int]], Cancellable, Future[Int])] = source.viaMat(flow)(Keep.both).toMat(sink)(Keep.both)

運算值M可以通過viaMat,toMat選擇,然后stream.run()獲取。akka-streams提供了簡便一點的運算方式runWith:指定runWith參數流組件的M為最終運算值。如:

// Using runWith will always give the materialized values of the stages added // by runWith() itself
val r4: Future[Int] = source.via(flow).runWith(sink) val r5: Promise[Option[Int]] = flow.to(sink).runWith(source) val r6: (Promise[Option[Int]], Future[Int]) = flow.runWith(source, sink)

值得注意的是:我們可以分別從Source,Sink,Flow開始針對Source runWith(Sink), Sink runWith(Source)及Flow runWith (Source,Sink)。用基礎流組件Source,Flow,Sink構成的流是直線型的。也就是說從Source流出的元素會一個不漏的經過Flow進入Sink,不能多也不能少。可能Source.filter會產生疑惑,不過看看filter函數定義就明白了:

def filter(p: Out => Boolean): Repr[Out] = via(Filter(p)) @InternalApi private[akka] final case class Filter[T](p: T => Boolean) extends SimpleLinearGraphStage[T] { override def initialAttributes: Attributes = DefaultAttributes.filter override def toString: String = "Filter"

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler with InHandler { def decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider private var buffer: OptionVal[T] = OptionVal.none override def preStart(): Unit = pull(in) override def onPush(): Unit =
        try { val elem = grab(in) if (p(elem)) if (isAvailable(out)) { push(out, elem) pull(in) } else buffer = OptionVal.Some(elem) else pull(in) } catch { case NonFatal(ex) => decider(ex) match { case Supervision.Stop => failStage(ex) case _                => pull(in) } } override def onPull(): Unit = buffer match { case OptionVal.Some(value) => push(out, value) buffer = OptionVal.none if (!isClosed(in)) pull(in) else completeStage() case _ => // already pulled
 } override def onUpstreamFinish(): Unit =
        if (buffer.isEmpty) super.onUpstreamFinish() // else onPull will complete
 setHandlers(in, out, this) } }

怎樣?夠復雜的了吧。很明顯,復雜點的流處理需要根據上游元素內容來維護內部狀態從而重新構建向下游發送元素的機制。如果想實現join,groupby,distict這些功能就必然對流動元素除轉換之外還需要進行增減操作。這項需求可能還必須留在后面的sream-graph章節中討論解決方案了。不過臨時解決方法可以通過運算值M來實現。因為M可以是一個集合,在構建這個M集合時是可以對集合元素進行增減的,下面這段代碼示范了一種cassandra數據表groupby的效果:

 def getVouchers(terminalid: String, susp: Boolean)(implicit classicSystem: ActorSystem) = { implicit val session = CassandraSessionRegistry(classicSystem).sessionFor("alpakka.cassandra") implicit val ec = classicSystem.dispatcher var stmt = "select * from pos_on_cloud.txn_log where terminal = ? and txndate = ?"
    if (susp) stmt = "select * from pos_on_cloud.txn_hold where terminal = ? and txndate = ?" val source = session.select(stmt,terminalid,LocalDate.now.format(DateTimeFormatter.ofPattern("yyyyMMdd"))) val sink = Sink.fold[List[TxnItem],TxnItem](List[TxnItem]()){(acc,txn) =>
      if (acc.isEmpty) txn.copy(price = 1) :: acc else
        if (acc.head.num == txn.num) { if (txn.salestype == SALESTYPE.itm && txn.txntype == TXNTYPE.sales) { val nacc = acc.head.copy( price = acc.head.price + 1, qty = acc.head.qty + txn.qty, amount = acc.head.amount + txn.amount, dscamt = acc.head.dscamt + txn.dscamt ) nacc :: acc.drop(1) } else acc } else txn :: acc } for { vchs <- source.map(TxnItem.fromCqlRow).toMat(sink)(Keep.right).run() _ <- session.close(ec) } yield vchs }

當然,基本流組件在流模式數據庫讀寫方面還是比較高效的,如:

    def futTxns(items: Seq[TxnItem]): Future[Seq[TxnItem]] = Source(items.toSeq) .via( CassandraFlow.create(CassandraWriteSettings.defaults, CQLScripts.insertTxns, statementBinder) ) .runWith(Sink.seq)

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM