響應式編程的實踐


響應式編程在前端開發以及Android開發中有頗多運用,然而它的非阻塞異步編程模型以及對消息流的處理模式也在后端得到越來越多的應用。除了Netflix的OSS中大量使用了響應式編程之外,最近阿里也提出Dubbo 3.0版本將全面擁抱響應式編程。

我之前針對某些項目需求也給出了響應式編程的方案,較好地解決了並行編程與異步編程的問題。不過在深入了解響應式編程之后,我也給出了自己的一些實踐總結。

響應式編程並非銀彈

響應式編程並非銀彈。事實上在軟件領域,Brooks提出的“沒有銀彈”一說或許將永遠生效。當我們在選擇使用響應式編程時,一定要明確它的適用場景,主要包括:

  • 處理由用戶或其他系統發起的事件,如鼠標點擊、鍵盤按鍵或者物聯網設備等無時無刻都在發射信號的情況
  • 處理磁盤或網絡等高延遲的IO數據,且保證這些IO操作是異步的
  • 業務的處理流程是流式的,且需要高響應的非阻塞操作

除此之外,我們當然也可以利用一些響應式編程框架如Rx,簡化並發編程與數據流操作的實現。諸如RxJava就提供非常完整的工廠方法,可以將非響應式編程的Iterable、Array以及與響應式編程有一定相關性的Future、Callable轉換為Observable或Flowable。

理解Source的本質

Akka Stream將流數據源定義為Source,RxJava則定義為Observable或Flowable。這些響應式編程框架都為Source提供了豐富的operator。其中除了組合流的操作之外,最基本的操作即為:filter、map、flatMap、reduce等。

粗略看來,這些操作皆為函數式的編程接口,從FP的角度看,我們甚至可以將Source視為一個monad。而站在Java編程的角度看,我們則很容易將Source視為等同於集合的數據結構。更何況,響應式編程實則脫胎於Observer模式與Iterator模式,其中Iterator模式就是針對集合的操作,只不過Observable或Flowable是push模型,而Iterator模式則為pull模型罷了。

然而這就是本質的區別,即Source是一個不斷發射事件(data、error、complete)的源頭,具有時間序列的特點,而Iterable則是一個靜態的數據結構,在對它進行操作時,該數據結構中存儲的數據就已經存在了。

合理設計Source的粒度

在演示Observable或Flowable的API時,我們往往喜歡采用Fluent Interface的方式連續地調用它的operator,形成一個整體的流處理過程。這並非總是合理的。當一個Source被多個operator串聯起來的時候,會使得這個Source更加難以被重用

例如,在加載網頁時,默認發起對后端服務的調用並返回需要的用戶信息,若建模為流A,其轉換如下所示:

uri ----> user ----> | -->

同時,有一個鼠標點擊事件也會通過隨機生成URL發起對后端服務的調用並返回需要的用戶信息,倘若建模為流B,其轉換如下所示:

click ----> uri ----> user ----> | -->

顯然,這兩個流在從uri到user的流處理上出現了重復。如果我們創建的流A與流B並不包含uri到user的轉換,就可以通過merge等合並操作將A與B合並,然后再共同重用從uri到user的轉換。我們也無需擔心創建細粒度流的成本,因為這些流的創建是lazy的,流雖然創建了,對流的操作卻不會立即執行。

分離操作的邏輯

無論是哪個響應式框架,都為流(Source)提供了豐富的operator。這些operator多數都支持lambda表達式。在處理簡單的業務邏輯時,這樣的實現是沒有問題的;然而一旦邏輯變得非常復雜,lambda表達式的表達能力就不夠了。從編程實踐看,lambda表達式本身就應該保持微小的粒度。這時,就應該將這些邏輯單獨分離出來,放到單獨的類與方法中。

例如,我們根據device的配置信息去調用遠程服務獲取設備信息,然后提取信息獲得業務需要的指標,對指標進行轉換,最后將轉換的數據寫入到數據庫中。結合函數的轉換本質,我們可以將這些操作拆分為多個連續的操作:

deviceConfig --> deviceInfo --> List<extractedInfo> --> transformedInfo --> write

倘若這些轉換的邏輯非常復雜,就可以將這些邏輯分別封裝到DeviceFetcher、DeviceExtractor、DeviceTransformer與DeviceWriter這四個類中,於是代碼可以寫為:

Flowable.fromIterable(deviceConfigs)
    .parallel()
    .runOn(Schedulers.computation())
    .map(DeviceFetcher::fetch)
    .flatMap(DeviceExtractor::extract)
    .map(DeviceTransformer::transform)
    .sequential()
    .blockingSubscribe(info -> DeviceWriter.write(info); err -> log(err); () -> log("done."));

這一實踐提倡將流的操作與每個操作的業務分離開,既能夠保證流操作的簡單與純粹,又能保證操作業務的重用與可擴展。

API的設計

如果我們要設計符合響應式編程設計的API,則應該盡可能保證每個方法都是非阻塞的。要做到這一點,就應該保證每個方法返回的類型是Source或Publisher。例如針對要返回多個數據的流,可以返回Observable<T>或者Flowable<T>;如果確定只返回一個數據,則可以返回Single<T>;倘若不確定,則返回Maybe<T>。倘若該API方法僅僅是一個命令,無需返回結果,又需要保證方法是非阻塞的,則可以考慮返回Completable<T>

從某種意義上說,返回Future<T>CompletableFuture<T>或者CompletableStage<T>也可以認為是響應式的。這三個類型由於是JDK自身提供的,因此更純粹。唯一不便的是這些接口沒有提供類似Observable那樣豐富的operator,但好在Observable與Flowable都提供了fromFuture()方法對其進行轉換,因而這樣的設計也是可取的。

Akka Stream的流拓撲圖

Akka Stream對流處理的抽象被建模為圖。這一設計思想使得流的處理變得更加直觀,流的處理變成了“搭積木”游戲。可惜Java的DSL能力實在太弱,如果對比Scala與Java,你會發現GraphDSL對Graph的構造在表現上簡直是天壤之別。

例如這是官方文檔中Java版本對Graph的構造:

RunnableGraph.fromGraph(GraphDSL.create(builder -> {
    final Outlet<Integer> A = builder.add(Source.single(0)).out();
    final UniformFanOutShape<Integer, Integer> B = builder.add(Broadcast.create(2));
    final UniformFanInShape<Integer, Integer> C = builder.add(Merge.create(2));
    final FlowShape<Integer, Integer> D = builder.add(Flow.of(Integer.class).map(i -> i + 1));
    final UniformFanOutShape<Integer, Integer> E = builder.add(Balance.create(2));
    final UniformFanInShape<Integer, Integer> F = builder.add(Merge.create(2));
    final Inlet<Integer> G = builder.add(Sink.<Integer>foreach(System.out::println)).in();

    builder.from(F).toFanIn(C); //feedback loop
    builder.from(A).viaFanOut(B).viaFanIn(C).toFanIn(F);
    builder.from(B).via(D).viaFanOut(E).toFanIn(F);
    builder.from(E).toInlet(G);

    return ClosedShape.getInstance();
})).run(mat);

如下是官方文檔中Scala版本對同一個Graph的構造:

RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  val A: Outlet[Int]                  = builder.add(Source.single(0)).out
  val B: UniformFanOutShape[Int, Int] = builder.add(Broadcast[Int](2))
  val C: UniformFanInShape[Int, Int]  = builder.add(Merge[Int](2))
  val D: FlowShape[Int, Int]          = builder.add(Flow[Int].map(_ + 1))
  val E: UniformFanOutShape[Int, Int] = builder.add(Balance[Int](2))
  val F: UniformFanInShape[Int, Int]  = builder.add(Merge[Int](2))
  val G: Inlet[Any]
  
                C     <~      F
  A  ~>  B  ~>  C     ~>      F
         B  ~>  D  ~>  E  ~>  F
                       E  ~>  G
  ClosedShape
})

我們也看到,倘若在GraphDSL中我們能夠將構成Graph的“材料”對象事先創建好,而將build工作統一放在一起,可以在一定程度改進代碼的表現力。

我們可以將Akka Stream的Graph(完整的Graph,稱為ClosedShape,是可以運行的,又稱之為RunnableShape)看做是流處理的”模具“,至於那些由Inlet與Outlet端口組成的基礎Shape,則是設計這些模具的”基礎材料“。

模具是靜態的,基礎材料組合材料是可重用的單元,然后再組合可以重用的業務單元(以函數、類或者接口形式進行封裝),這個模具就具有了業務處理能力。如果這個拓撲圖過於復雜,我們還可以利用基礎Shape組合形成一個個更粗粒度Partial Shap。這些Partial Shape不是封閉的,可以理解為更粗粒度的Source、Sink和Flow,它使得模具的組裝變得更加簡單。**

材料、業務單元、模具之間的關系可以形象地用下圖來表示:

一旦流處理的模具打造完畢,打開數據流的”水龍頭“,讓數據源源不斷地流入Graph中,流處理就可以”自動“運行。只要Source沒有發出complete或error信號,它就將一直運行下去。Akka Stream之所以將Graph的運行器稱之為materializer,大約也是源於這樣的隱喻吧。

使用Akka Stream進行響應式流處理,我建議參考這樣的思維。

原文地址:https://iamzhangyi.github.io/2018/02/03/summary-of-reactive-programming/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM