今天我們來講解akka-streams,這應該算akka框架下實現的一個很高級的工具。之前在學習akka streams的時候,我是覺得雲里霧里的,感覺非常復雜,而且又難學,不過隨着對akka源碼的深入,才逐漸明白它到底是怎么一回事。下面介紹主要摘自akka官網,但會融入我的理解,以及部分源碼,以減少大家學習的難度。
首先近幾年流式計算很火,有各種各樣的框架,比如spark、storm、flink等,當然前提是我們得有這樣的需求。隨着數據量越來越大,我們很難一次性處理全部的數據,只能采用流水線或周期性的取一部分數據進行加工。簡單來說就是“分而治之”。
Actors是基於消息通信的異步機制,也可以用來處理流式數據。akka使actor變得穩定、可恢復,但我們還需要仔細的考慮數據過載的問題。比如某個actor處理消息過慢,導致后續消息積壓在mailbox中。Actor的消息也可能丟失,必要時就需要重傳。當以固定的模式處理流式數據的元素時,actor就顯得力不從心了,或者我們需要花很大的代價來確保正確性、准確性。
所以,akka團隊提供了一套Akka Streams API,主要目的是提供一套直觀的、安全的方法來規范流式處理過程,這樣我們就可以用有限的資源來高效的執行流式計算,當然再也不會有內存溢出的錯誤了。當然前提是有一套背壓的機制,背壓是“Reactive Streams”的核心概念,Akka是“Reactive Streams”的創建成員。這也就意味着我們可以在Akka Streams中無縫的與其他Reactive Streams實現進行交互。
Akka Streams與Reactive Streams完全解耦,前者關注在數據流轉換的格式化,后者是用來定義通用的機制來跨異步邊界移動數據而且不會丟失數據、緩存數據、耗盡資源。簡單來說,Akka Streams是面向開發者的,它內部使用Reactive Streams接口來傳遞數據。其實,簡單來說就是Akka Streams定義了一套開發者友好的API,並在內部把這些API轉換成了Reactive Streams接口,並在內部用actor實現了Reactive Streams接口。
那Reactive Streams接口都有什么呢?
- Publisher
- Subscriber
- Subscription
- Processor
Reactive Streams由四個組件構成,分別為消息發布者、訂閱者、訂閱(或者稱為令牌)、處理器。
public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); }
Publisher貌似很簡單,就只有subscribe接口,是訂閱者調用的,用來訂閱發布者的消息。發布者在訂閱者調用request之后把消息push給訂閱者。
public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); }
Subscriber也很簡單,就是四個接口,分別為異步觸發。當然了,是由Publisher觸發調用的。onSubscribe告訴訂閱者訂閱成功,並返回了一個Subscription,通過Subscription訂閱者可以告訴發布者發送指定數量的消息;onNext是發布者有消息時,調用訂閱者這個接口來達到發布消息的目的的;onError通知訂閱者,發布者出現了錯誤;onComplete通知訂閱者消息發送完畢。當然這些接口都是異步的。
public interface Subscription { public void request(long n); public void cancel(); }
Subscription只有兩個接口,請求n個消息,取消此次訂閱。
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }
Processor代表一個處理階段,同時繼承了Subscriber,Publisher。
其實Reactive Streams只是通過上面的四個組件和相關的函數,對反應式流進行了一個框架性的約定,並沒有具體的實現。簡單來說,它只提供通用的、合適的解決方案,大家都按照這個規約來實現就好了。Akka Streams就是這樣的一個實現,只不過又對其進行了封裝,使其更加易用。
我們來看看Akka Streams的核心概念。Akka Streams是一個庫,用有限的緩沖空間來處理、轉換一系列數據。翻譯成日常術語就是,它能夠表達成對一系列數據處理的連,每個加工節點都是獨立的(而且盡量並行的),同時只緩存有限數量的元素。當然了,有限的緩存這一點與Actor模型有很大不同,因為Akka Streams並不會去主動丟棄數據。
Akka Streams中的Stream就是一個active的移動、轉換數據的進程。Element是流中南的一個處理單元。所有的轉換操作把Elements從上游移動到下游。背壓是一種流量控制手段,一種數據消費者通知生產者當前可用性的方法,它可以減慢上游生產者產生數據的速度。在Akka中,背壓是非阻塞和異步的。Akka Streams中所有操作都是非阻塞的。Akka Streams的計算邏輯是用Graph來描述的,它定義了元素被處理的路徑,但不一定是一個DAG。Operator是編譯Graph的通用名稱,常見的有map、filter。
Akka Streams有幾個核心的概念,需要我們理解和掌握。
Source。這是一個只會產生數據的操作,它在下游可以接收的時候發送數據。
Sink。這是一個只有輸入的操作。對數據的請求和接受有可能會減慢上游數據的產生速度。
Flow。這是只有一個輸入和一個輸出的操作,它連接上下游,傳輸數據。
RunnableGraph。這是一個同時具有Source和Sink的流,也就意味着它可以運行。簡單來說就是,它可以被編譯成actor拓撲了,數據可以經過actor進行流轉並被處理。
val source = Source(1 to 10) val sink = Sink.fold[Int, Int](0)(_ + _) // connect the Source to the Sink, obtaining a RunnableGraph val runnable: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right) // materialize the flow and get the value of the FoldSink val sum: Future[Int] = runnable.run()
我們簡要分析一下這幾個核心概念的源碼。
/** * A `Source` is a set of stream processing steps that has one open output. It can comprise * any number of internal sources and transformations that are wired together, or it can be * an “atomic” source, e.g. from a collection or a file. Materialization turns a Source into * a Reactive Streams `Publisher` (at least conceptually). */ final class Source[+Out, +Mat]( override val traversalBuilder: LinearTraversalBuilder, override val shape: SourceShape[Out]) extends FlowOpsMat[Out, Mat] with Graph[SourceShape[Out], Mat]
官網注釋的最后一句話非常重要,它說Materialization把一個Source轉換成了Reactive Streams規范中的Publisher,至少是概念上的。
/** * A `Sink` is a set of stream processing steps that has one open input. * Can be used as a `Subscriber` */ final class Sink[-In, +Mat]( override val traversalBuilder: LinearTraversalBuilder, override val shape: SinkShape[In]) extends Graph[SinkShape[In], Mat]
Sink可以被用作一個Subscriber。
/** * A `Flow` is a set of stream processing steps that has one open input and one open output. */ final class Flow[-In, +Out, +Mat]( override val traversalBuilder: LinearTraversalBuilder, override val shape: FlowShape[In, Out]) extends FlowOpsMat[Out, Mat] with Graph[FlowShape[In, Out], Mat]
/** * Flow with attached input and output, can be executed. */ final case class RunnableGraph[+Mat](override val traversalBuilder: TraversalBuilder) extends Graph[ClosedShape, Mat] {
官網對Flow和RunnableGraph的注釋很簡單,這其實非常不利於我們深層次的研究AkkaStreams的實現原理。但我們可以不負責任的猜一下。AkkaStreams的API首先被翻譯成RecativeStreams相關的組件及其接口的調用,然后通過ActorSystem和actors實現這些核心組件,比如Publisher、Subscriber。當然了,考慮到這個編譯過程的復雜性,這部分的源碼估計要后面很久才能深入分析了。