Akka Streams documentation translation: Quick Start Guide: Reactive Tweets


Quick Start Guide: Reactive Tweets

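(Translator's note: "reactive tweets" can be read roughly as "tweets processed reactively". Mentioning Twitter here is also a chance to test whether the GFW is still working properly.)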

A typical use case for stream processing is consuming a live stream of data from which we want to extract or aggregate some other data. In this example we'll consume a stream of tweets and extract information concerning Akka from them.

We will also consider the problem inherent to all non-blocking streaming solutions: "What if the subscriber is too slow to consume the live stream of data?" Traditionally the solution is to buffer the elements, but this can, and usually will, cause eventual buffer overflows and instability in such systems. Instead, Akka Streams depends on internal backpressure signals that let us control what should happen in such scenarios.

Here's the data model we'll be working with throughout the quickstart examples:

final case class Author(handle: String)
 
final case class Hashtag(name: String)
 
final case class Tweet(author: Author, timestamp: Long, body: String) {
  def hashtags: Set[Hashtag] =
    body.split(" ").collect { case t if t.startsWith("#") => Hashtag(t) }.toSet
}
 
val akka = Hashtag("#akka")
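To see what the hashtags method returns, here is a small check on two hand-written tweets. The authors and bodies are made up for illustration, and the wrapping object HashtagDemo is only there to keep the sketch self-contained. Because the body is split on single spaces only, trailing punctuation stays attached to the tag.

```scala
object HashtagDemo {
  final case class Author(handle: String)
  final case class Hashtag(name: String)
  final case class Tweet(author: Author, timestamp: Long, body: String) {
    def hashtags: Set[Hashtag] =
      body.split(" ").collect { case t if t.startsWith("#") => Hashtag(t) }.toSet
  }

  // Hypothetical sample data, not taken from a real feed.
  val clean = Tweet(Author("alice"), 0L, "#akka streams are #reactive")
  val messy = Tweet(Author("bob"), 1L, "I like #akka!")

  val cleanTags: Set[Hashtag] = clean.hashtags
  // The "!" is kept, so this tag is not equal to Hashtag("#akka").
  val messyTags: Set[Hashtag] = messy.hashtags
}
```

In a real application the tokenizer would probably need to strip punctuation before comparing tags.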

 

Transforming and consuming simple streams

First we prepare our environment by creating an ActorSystem and an ActorFlowMaterializer, which will be responsible for materializing and running the streams we are about to create:

import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
implicit val system = ActorSystem("reactive-tweets")
implicit val materializer = ActorFlowMaterializer()

The ActorFlowMaterializer can optionally take ActorFlowMaterializerSettings, which can be used to define materialization properties such as default buffer sizes (see also Buffers in Akka Streams), the dispatcher to be used by the pipeline, etc. These can be overridden with withAttributes on Flow, Source, Sink and Graph.

Let's assume we have a stream of tweets readily available. In Akka this is expressed as a Source[Out, M]:

val tweets: Source[Tweet, Unit]

Streams always start flowing from a Source[Out, M1], then can continue through Flow[In, Out, M2] elements or more advanced graph elements, to finally be consumed by a Sink[In, M3] (ignore the type parameters M1, M2 and M3 for now; they are not relevant to the types of the elements produced or consumed by these classes). Both Sources and Flows provide stream operations that can be used to transform the flowing data. A Sink, however, does not, since it is the "end of stream" and its behavior depends on the type of Sink used.

In our case let's say we want to find all Twitter handles of users who tweet about #akka. The operations should look familiar to anyone who has used the Scala Collections library; however, they operate on streams and not on collections of data:

val authors: Source[Author, Unit] =
  tweets
    .filter(_.hashtags.contains(akka))
    .map(_.author)
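For comparison, the same filter and map steps can be written against an ordinary Scala collection. The following is only a sketch on an in-memory List with made-up tweets; it does not use Akka and says nothing about how a Source executes these operations.

```scala
object AuthorsDemo {
  final case class Author(handle: String)
  final case class Hashtag(name: String)
  final case class Tweet(author: Author, timestamp: Long, body: String) {
    def hashtags: Set[Hashtag] =
      body.split(" ").collect { case t if t.startsWith("#") => Hashtag(t) }.toSet
  }
  val akka = Hashtag("#akka")

  // Hypothetical in-memory stand-in for the live tweet stream.
  val sample: List[Tweet] = List(
    Tweet(Author("alice"), 0L, "#akka rocks"),
    Tweet(Author("bob"), 1L, "lunch time"),
    Tweet(Author("carol"), 2L, "streams with #akka")
  )

  // Same shape as the stream version: keep #akka tweets, then project the author.
  val authors: List[Author] =
    sample
      .filter(_.hashtags.contains(akka))
      .map(_.author)
}
```

The difference is that the List version computes everything eagerly on data already in memory, while the stream version only describes the processing and runs once it is materialized.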

Finally, in order to materialize and run the stream computation we need to attach the Flow to a Sink that will get the flow running. The simplest way to do this is to call runWith(sink) on a Source. For convenience a number of common Sinks are predefined and collected as methods on the Sink companion object. For now let's simply print each author:

authors.runWith(Sink.foreach(println))

or by using the shorthand version (which is defined only for the most popular sinks such as FoldSink and ForeachSink):

authors.runForeach(println)

Materializing and running a stream always requires a FlowMaterializer to be in implicit scope (or passed in explicitly, like this: .run(materializer)).

 

Flattening sequences in streams

In the previous section we were working on 1:1 relationships of elements, which is the most common case, but sometimes we might want to map from one element to a number of elements and receive a "flattened" stream, similar to how flatMap works on Scala Collections. In order to get a flattened stream of hashtags from our stream of tweets we can use the mapConcat combinator:

val hashtags: Source[Hashtag, Unit] = tweets.mapConcat(_.hashtags.toList)

Note

The name flatMap was consciously avoided due to its proximity with for-comprehensions and monadic composition. It is problematic for two reasons: first, flattening by concatenation is often undesirable in bounded stream processing due to the risk of deadlock (with merge being the preferred strategy), and second, the monad laws would not hold for our implementation of flatMap (due to the liveness issues).

Please note that mapConcat requires the supplied function to return a strict collection (f: Out => immutable.Seq[T]), whereas flatMap would have to operate on streams all the way through.
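The flattening itself can be illustrated on a strict collection, where flatMap plays the role that mapConcat plays on a stream. This is a plain-collections sketch with invented tweets, not Akka code; it only shows the difference between mapping to nested lists and concatenating them.

```scala
object FlattenDemo {
  final case class Author(handle: String)
  final case class Hashtag(name: String)
  final case class Tweet(author: Author, timestamp: Long, body: String) {
    def hashtags: Set[Hashtag] =
      body.split(" ").collect { case t if t.startsWith("#") => Hashtag(t) }.toSet
  }

  // Hypothetical sample tweets.
  val sample: List[Tweet] = List(
    Tweet(Author("alice"), 0L, "#akka #scala"),
    Tweet(Author("bob"), 1L, "no tags"),
    Tweet(Author("carol"), 2L, "#streams")
  )

  // map keeps one (possibly empty) list per tweet ...
  val nested: List[List[Hashtag]] = sample.map(_.hashtags.toList)
  // ... while flatMap concatenates those lists into one flat sequence.
  val flat: List[Hashtag] = sample.flatMap(_.hashtags.toList)
}
```

A tweet without hashtags contributes nothing to the flat result, just as an empty list returned from the mapConcat function emits no elements downstream.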

 

Broadcasting a stream

Now let's say we want to persist all hashtags, as well as all author names, from this one live stream. For example we'd like to write all author handles into one file, and all hashtags into another file on disk. This means we have to split the source stream into two streams which will handle the writing to these different files.

Elements that can be used to form such "fan-out" (or "fan-in") structures are referred to as "junctions" in Akka Streams. The one we'll be using in this example is called Broadcast, and it simply emits elements from its input port to all of its output ports.

Akka Streams intentionally separates the linear stream structures (Flows) from the non-linear, branching ones (FlowGraphs) in order to offer the most convenient API for both of these cases. Graphs can express arbitrarily complex stream setups at the expense of not reading as familiarly as collection transformations. It is also possible to wrap complex computation graphs as Flows, Sinks or Sources, which will be explained in detail in Constructing Sources, Sinks and Flows from Partial Graphs. FlowGraphs are constructed like this:

val writeAuthors: Sink[Author, Unit] = ???
val writeHashtags: Sink[Hashtag, Unit] = ???
val g = FlowGraph.closed() { implicit b =>
  import FlowGraph.Implicits._
 
  val bcast = b.add(Broadcast[Tweet](2))
  tweets ~> bcast.in
  bcast.out(0) ~> Flow[Tweet].map(_.author) ~> writeAuthors 
  bcast.out(1) ~> Flow[Tweet].mapConcat(_.hashtags.toList) ~> writeHashtags
}
g.run()

Note

The ~> (read as "edge", "via" or "to") operator is only available if FlowGraph.Implicits._ are imported. Without this import you can still construct graphs using the builder.addEdge(from,[through,]to) method.

As you can see, inside the FlowGraph we use an implicit graph builder to mutably construct the graph using the ~> "edge operator" (also read as "connect", "via" or "to"). Once we have the FlowGraph in the value g it is immutable, thread-safe, and freely shareable. A graph can be run() directly, assuming all ports (sinks/sources) within the flow have been connected properly. It is possible to construct partial graphs where this is not required, but this will be covered in detail in Constructing and combining Partial Flow Graphs.

Like all Akka Streams elements, Broadcast will properly propagate back-pressure to its upstream element.
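As a rough mental model of what a broadcast junction does, the sketch below hands every input element to each of several outputs in turn. It is a synchronous stand-in written with plain Scala functions; the broadcast helper and the two output buffers are hypothetical names, and real Akka Streams junctions run asynchronously with demand signalling rather than direct calls.

```scala
object BroadcastDemo {
  import scala.collection.mutable.ListBuffer

  // Hand every element of `in` to each output, in order.
  // The input is only advanced after all outputs have accepted the element,
  // which loosely mirrors how the slowest consumer limits the upstream.
  def broadcast[A](in: Iterator[A])(outs: (A => Unit)*): Unit =
    in.foreach(a => outs.foreach(out => out(a)))

  // Hypothetical "sinks": two in-memory buffers.
  val handlesOut = ListBuffer.empty[String]
  val lengthsOut = ListBuffer.empty[Int]

  broadcast(Iterator("alice", "bob"))(
    h => handlesOut += h,
    h => lengthsOut += h.length
  )
}
```

Each output sees the full input, which is the property the graph relies on when one branch writes authors and the other writes hashtags.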

Back-pressure in action

One of the main advantages of Akka Streams is that they always propagate back-pressure information from stream Sinks (Subscribers) to their Sources (Publishers). It is not an optional feature, and it is enabled at all times. To learn more about the back-pressure protocol used by Akka Streams and all other Reactive Streams compatible implementations, read Back-pressure explained.

A typical problem that applications like this (when not using Akka Streams) often face is that they are unable to process the incoming data fast enough, either temporarily or by design, and will start buffering incoming data until there is no more space to buffer, resulting in either OutOfMemoryErrors or other severe degradations of service responsiveness. With Akka Streams, buffering can and must be handled explicitly. For example, if we are only interested in the "most recent tweets, with a buffer of 10 elements", this can be expressed using the buffer element:

tweets
  .buffer(10, OverflowStrategy.dropHead)
  .map(slowComputation)
  .runWith(Sink.ignore)

The buffer element takes an explicit and required OverflowStrategy, which defines how the buffer should react when it receives another element while it is full. Strategies provided include dropping the oldest element (dropHead), dropping the entire buffer, signalling errors, etc. Be sure to pick the strategy that fits your use case best.
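To make the dropHead behaviour concrete, here is a small simulation of a bounded buffer that evicts its oldest element when a new one arrives while it is full. DropHeadBuffer is a hypothetical class written for this illustration on top of scala.collection.mutable.Queue; it is not how Akka implements its buffer element.

```scala
object DropHeadDemo {
  import scala.collection.mutable

  // Bounded FIFO buffer that evicts the oldest element when full.
  final class DropHeadBuffer[A](capacity: Int) {
    require(capacity > 0, "capacity must be positive")
    private val queue = mutable.Queue.empty[A]

    def offer(a: A): Unit = {
      if (queue.size == capacity) queue.dequeue() // drop the oldest element
      queue.enqueue(a)
    }

    def contents: List[A] = queue.toList
  }

  // Offer 15 elements to a buffer of 10 while nothing is consuming.
  val buffer = new DropHeadBuffer[Int](10)
  (1 to 15).foreach(buffer.offer)

  // Only the 10 most recent elements remain: 6 to 15.
  val kept: List[Int] = buffer.contents
}
```

This matches the "most recent tweets" intent: a slow consumer never sees the elements that were pushed out, but memory use stays bounded.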

 

Materialized values


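(Translator's note: here is a short excerpt from the Akka Streams documentation describing materialization; without it, what follows is hard to understand.)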

Stream Materialization

When constructing flows and graphs in Akka Streams, think of them as preparing a blueprint, an execution plan. Stream materialization is the process of taking a stream description (the graph) and allocating all the resources it needs in order to run. In the case of Akka Streams this often means starting up Actors which power the processing, but it is not restricted to that: it could also mean opening files or socket connections, depending on what the stream needs.


 

So far we've only been processing data using Flows and consuming it into some kind of external Sink, be it by printing values or storing them in some external system (translator's note: that is, the result is not obtained inside the program that uses the stream). However, sometimes we may be interested in some value that can be obtained from the materialized processing pipeline (translator's note: a value we want to get out of the stream and assign to a variable). For example, we want to know how many tweets we have processed. This question is not so obvious to answer for an infinite stream of tweets (one way to answer it in a streaming setting would be to create a stream of counts described as "up until now, we've processed N tweets"), but in general it is possible to deal with finite streams and come up with a nice result such as a total count of elements.

First, let's write such an element counter using FoldSink and see what the types look like:

// Future.foreach needs an ExecutionContext; here we borrow the actor system's dispatcher.
import system.dispatcher

val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

val counter: RunnableFlow[Future[Int]] = tweets.map(t => 1).toMat(sumSink)(Keep.right)

val sum: Future[Int] = counter.run()

sum.foreach(c => println(s"Total tweets processed: $c"))

First, we prepare the FoldSink which will be used to sum all Int elements of the stream. Next we connect the tweets stream through a map step which converts each tweet into the number 1. Finally we connect the flow, using toMat, to the previously prepared Sink. Remember those mysterious type parameters on Source, Flow and Sink? They represent the type of values these processing parts return when materialized. When you chain these together, you can explicitly combine their materialized values: in our example we used the predefined Keep.right function, which tells the implementation to only care about the materialized type of the stage currently appended to the right. As you can see, the materialized type of sumSink is Future[Int], and because we used Keep.right, the resulting RunnableFlow also has a type parameter of Future[Int].
 
This step does not yet materialize the processing pipeline. It merely prepares the description of the Flow, which is now connected to a Sink and therefore can be run(), as indicated by its type: RunnableFlow[Future[Int]]. Next we call run(), which uses the implicit ActorFlowMaterializer to materialize and run the flow. The value returned by calling run() on a RunnableFlow[T] is of type T. In our case this type is Future[Int] which, when completed, will contain the total length of our tweets stream. If the stream fails, this future completes with a Failure.
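The counting logic and the asynchronous result can be tried out without Akka. The sketch below assumes a finite, in-memory list of made-up tweet bodies, uses foldLeft in place of the fold sink, and wraps the computation in a standard-library Future; countAsync and run are hypothetical helper names.

```scala
object CountDemo {
  import scala.concurrent.{Await, Future}
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration._

  // Hypothetical finite "stream" of tweet bodies.
  val bodies: List[String] = List("#akka one", "two", "#akka three")

  // Same shape as map(t => 1) followed by a fold starting at 0.
  // Nothing is computed until this method is called.
  def countAsync(elems: List[String]): Future[Int] =
    Future(elems.map(_ => 1).foldLeft(0)(_ + _))

  // Blocking is for demonstration only; real code would compose the Future.
  def run(): Int = Await.result(countAsync(bodies), 5.seconds)
}
```

As with the stream version, the caller first holds a description of the work (the method) and only receives a Future once it decides to run it.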
 
A RunnableFlow may be reused and materialized multiple times, because it is just the "blueprint" of the stream. This means that if we materialize a stream twice, for example one that consumes a live stream of tweets within a minute, the materialized values for those two materializations will be different, as illustrated by this example:

val sumSink = Sink.fold[Int, Int](0)(_ + _)
val counterRunnableFlow: RunnableFlow[Future[Int]] =
  tweetsInMinuteFromNow
    .filter(_.hashtags contains akka)
    .map(t => 1)
    .toMat(sumSink)(Keep.right)
 
// materialize the stream once in the morning
val morningTweetsCount: Future[Int] = counterRunnableFlow.run()
// and once in the evening, reusing the flow
val eveningTweetsCount: Future[Int] = counterRunnableFlow.run()
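The blueprint idea can be mimicked with a tiny class that does nothing until run() is called and reads its input afresh on every run. CountBlueprint and the mutable feed are hypothetical stand-ins for a RunnableFlow and a live source; the sketch is synchronous and only illustrates why two runs of the same description may yield different values.

```scala
object BlueprintDemo {
  // A "blueprint": it only describes the work; nothing happens until run().
  final class CountBlueprint(source: () => List[String]) {
    def run(): Int = source().count(_.contains("#akka"))
  }

  // Hypothetical feed whose contents change between runs.
  private var feed: List[String] = Nil
  val blueprint = new CountBlueprint(() => feed)

  def demo(): (Int, Int) = {
    feed = List("#akka a", "b")             // what arrives in the morning
    val morning = blueprint.run()
    feed = List("#akka c", "#akka d", "e")  // what arrives in the evening
    val evening = blueprint.run()
    (morning, evening)
  }
}
```

The same blueprint object is used for both runs; only the data observed during each run differs.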

Many elements in Akka Streams provide materialized values which can be used for obtaining either results of computation or for steering these elements, which will be discussed in detail in Stream Materialization. Summing up this section, we now know what happens behind the scenes when we run this one-liner, which is equivalent to the multi-line version above:

val sum: Future[Int] = tweets.map(t => 1).runWith(sumSink)

Note

runWith() is a convenience method that automatically ignores the materialized value of any other stages except those appended by the runWith() itself. In the above example it translates to using Keep.right as the combiner for materialized values.
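Conceptually, a combiner for materialized values is just a two-argument function that picks (or merges) the left and right values. The following sketch uses plain methods named keepLeft and keepRight as stand-ins; they are written for this illustration and are not the Akka definitions of Keep.left and Keep.right.

```scala
object KeepDemo {
  // Stand-ins for the combiners: choose one of the two materialized values.
  def keepLeft[L, R](left: L, right: R): L = left
  def keepRight[L, R](left: L, right: R): R = right

  // Hypothetical materialized values: a label from the upstream part
  // and a count from the sink.
  val upstreamMat: String = "source-handle"
  val sinkMat: Int = 42

  // What runWith(sink) effectively keeps: the sink's value.
  val kept: Int = keepRight(upstreamMat, sinkMat)
  // The alternative choice keeps the upstream value instead.
  val other: String = keepLeft(upstreamMat, sinkMat)
}
```

Choosing the right-hand value is what makes the one-liner return the sink's Future[Int] rather than the source's materialized value.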
 

