原文地址:http://projectreactor.io/docs/reference/
Reactor簡介
Reactor是一個基礎庫,用在構建實時數據流應用、要求有容錯和低延遲至毫秒、納秒、皮秒的服務。
— PrefaceTL;DR
什么是Reactor?
讓我們大致了解一下Reactor。在你使用喜歡的搜索敲入一些關鍵詞如Reactive、spring Reactive、Asynchronous java或者僅僅是"What the heck is Reactor?".簡而言之,Reactor是一個輕量級的JVM基礎庫,它可以幫助我們構建的服務和應用高效而異步的傳遞消息。
高效的含義是什么呢?
傳遞一個消息從A到B時GC產生的內存很小或者完全沒有。
當消費者處理消息的速度低於生產者產生消息的速度時產生了溢出時,必須盡快處理。
盡可能的提供無鎖的異步流。
據以往的經驗來看,我們知道異步編程是困難的,特別是當一個平台提供了很多選項如JVM。
Reactor瞄准絕大部分場景中真正的無阻塞,並且提供了一組比原生Jdk的java.util.concurrent庫更高效的API。Reactor也提供了一個可選性(不建議使用):
阻塞等待:如Future.get()。
Unsafe數據獲取:如ReentrantLock.lock()。
異常拋出:如try ..catch ...finally
同步阻塞:如 syschronized
Wrapper配置(GC壓力):例如 new Wrapper<T>(event)
讓我們先使用一個純正的Executor方法:
private ExecutorService threadPool = Executors.newFixedThreadPool(8); final List<T> batches = new ArrayList<T>(); Callable<T> t = new Callable<T>() { //1 public T run() { synchronized(batches) { //2 T result = callDatabase(msg); //3 batches.add(result); return result; } } }; Future<T> f = threadPool.submit(t); //4 T result = f.get() //5
1.分配回調方法---可能會導致gc壓力。
2.Synchronization將強制對每個線程停止檢查。
3. 存在消費者的消費能力低於生產者生產能力的隱患。
4. 使用線程池將task傳遞到目標線程--肯定通過FutureTask給gc造成壓力。
5. 阻塞直至callDatabase()響應。
從上述的簡單示例中,容易看出擴展性會受到嚴重的影響。
不斷分配的對象將導致gc停止工作,特別是耗時比較多的大任務時。當一個gc停止工作時將會從降低全局的性能。
隊列默認情況下長度是不受限制的。任務會堆積到數據庫中。
后台日志不是一個內存泄露的地方,但是副作用就比較煩人了:在gc暫停工作時需要掃描更多對象;損失數據重要bit的風險;等等。
經典鏈接Queue分配節點時產生的內存壓力。
使用阻塞方式應答請求時發生惡性循環。
阻塞方式應答導致生產者效率慢下來。實際上,因為需要提交更多任務時等待響應,流程變成了基本的同步方式。
同數據存儲的通信異常將以不友好的形式傳遞到生產者,通過線程邊界來分離工作,這使容錯的協商變的比較容易。
完全的、真正的非阻塞比較難以實現---特別是有比較時髦名稱的分布式系統中如微服務架構。然而,Reactor卻沒有妥協,它試圖利用可用的最佳模式來使開發者不必覺得像是在寫一個數學論文而僅僅是一個微服務(nanservice)。
沒有什么比較光更快的了(除了流言蜚語和病毒貓視頻),在某些方面,延遲是每個真實世界的系統必須關注的。為此:
反應器提供了一個框架,可以幫助你減輕惡心的延遲引起的副作用,在應用程序中使用最小的開銷:使用一些靈活的結構,通過在啟動時預先分配在運行時的分配數據結構來避免分配問題。
限制主消息傳送結構,因而不會導致任務無限的累積。
利用流行的模式例如Reactive和事件驅動架構來提供一個包含應答的非阻塞的、端對端流。
實現了最新的Reactive流標准,通過不發送多於當前容量的請求來使受限的結構更有效率。
使用這些概念到進程間通信,提供了理解控制流的非阻塞IO驅動。
對開發者暴露功能API,幫助開發者使用一個無副作用的方式組織代碼,也幫助你確定在什么場景下你是線程安全和具有容錯性的。
項目簡介:
該項目始於2012年,孕育時間較長。2013年出現Reactor1.x版本。該版本成功部署到不同的組織,不僅有開源組織如MeltDown、還有商業機構如Pivotal RTI。2014年我們實現了新的"Reactive流標准",並在2015年的4月開始了版本2.0的大規模重構目標。Reactive流標准拉近了分發機制的鴻溝:控制多少線程傳遞多少數據。
同時我們也決定重新調整我們的一些事件驅動和任務協調API的來應對日益流行、記錄的reactive擴展。
Reactor由Pivotal贊助支持,有兩個核心提交者。因為Pivotal同時也是spring框架的東家,我們的很多同事也是不同spring項目的核心貢獻者,所以我們也提供從Reactor到spring的集成同時也支持spring框架的一些重要功能如spring消息模塊的STOMP代理。也就是說,我們不會強迫僅僅想使用Reactor的人去適應spring。我們保留了一個大容量Reactive的內嵌工具。事實上,Reactor的目標之一是在你解決異步和功能性問題時保持公正的態度。
Reactor遵循Apache 2.0 licensed ,可以通過 GitHub獲取。
使用要求:
Reactor需要jdk7及以上版本。
但完整的功能組合表達式需要java8的lambdas支持。
作為后備,支持spring clojure和groovy的擴展。
Reactor需要jvm支持Unsafe方式獲取(如:android不支持)時才能表現最全的功能。
當Unsafe獲取不支持是所有基於RingBuffer的特定將不能工作。
Reactor打包成傳統的jar形式存在於maven中央庫中,可以使用你喜歡的工具來拉取這個依賴包。
架構總覽:

Reactor基本代碼划分為幾個子模塊,這樣你可以單獨使用某一模塊而拋棄不需要的模塊。
下述是一些使用Reactor模塊和其它混合的Reactive技術示例,完成異步目標:
-
Spring XD + Reactor-Net (Core/Stream) : 使用Reactor 作為Sink/Source IO 驅動.
-
Grails | Spring + Reactor-Stream (Core) : 使用Stream和 Promise作為后台處理程序。
-
Spring Data + Reactor-Bus (Core) : 生產數據庫事件(Save/Delete/…).
-
Spring集成Java DSL + Reactor Stream (Core) : Microbatch MessageChannel from Spring Integration.
-
RxJavaReactiveStreams + RxJava + Reactor-Core : Combine rich composition with efficient asynchronous IO Processor
-
RxJavaReactiveStreams + RxJava + Reactor-Net (Core/Stream) : Compose input data with RxJava and gate with Async IO drivers.

Reactive stream
Reactive stream是一個新的標准,不同的廠商和技術組織包括Netflix,oracle,Pivotal,TypeSafe支持這個標准。該標准有望被java9或者以后的版本的標准收錄進去。
該標准的目標是提供一種同步或者異步的具有控制流機制的數據序列。這個標准是輕量級的,第一個目標是JVM。它提供了4個java接口,一個Tck和一系列的示例。根據需要,4個java接口的實現非常直接,這個項目的內涵是Tck對操作的校驗。

圖三 Reactive stream協約
Reactive Streams接口 org.reactivestreams.Pubslisher: 數據源(從0到N個數據,n是任意的).它提供兩個可選的中斷事件:error和completion。 org.reactivestreams.Subscriber: 數據序列的消費者(從0到N個數據,n是任意的).它在初始化時接收一個Subscription,subscription獲取Subscriber將要處理多少數據。 與其它數據時序信號交互的其它回調有: next (新消息)和可選的completion/error.
org.reactivestreams.Subscription:在初始化時傳給Subscriber的一個小的追蹤器。.它控制着我們准備消費掉多少數據和什么時候停止消費(取消).
org.reactivestreams.Processor: 既是Subscriber和Publisher的組件標識!

圖四:Reactive Stream發布協約
通過傳遞Subscriber,一個請求數據從subscriber到publisher有兩種方式: 無限制的: 在訂閱時, 僅調用Subscription的request(Long.MAX_VALUE)方法. 有限制的: 在訂閱時, 保留subscription的引用,並且當subscriber准備處理數據時調用request(long)方法。 通常, 在訂閱時Subscribers將請求一組初始數據或者甚至1個數據。 然后,onNext認為執行成功(例如后面的Commit, Flush等等), 請求更多的數據。 建議使用線性組的請求。為避免請求重疊,例如每次下次請求時請求10個或者更多的數據。
表1 目前為止,Reactor直接使用的Reactive stream接口及實現
| Reactive Streams | Reactor模塊 | 實現 | 說明 |
|---|---|---|---|
| Processor |
reactor-core, reactor-stream |
reactor.core.processor.*, reactor.rx.* |
在core模塊,提供了RingBuffer處理器,在stream模塊,提供了一整組操作和Broadcaster。 |
| Publisher |
reactor-core, reactor-bus, reactor-stream, reactor-net |
reactor.core.processor.*, reactor.rx.stream.*, reactor.rx.action.*, reactor.io.net.* |
在core模塊,處理器繼承了Publisher.在bus模塊,發布一個不限制的路由事件,在stream模塊,stream擴展直接繼承Publisher. 在net模塊,Chanel繼承了Publisher來消費請求數據,同時也提供了具有flush和close的回調的providers. |
| Subscriber |
reactor-core, reactor-bus, reactor-stream, reactor-net |
reactor.core.processor.*, reactor.bus.EventBus.*, reactor.rx.action.*, reactor.io.net.impl.* |
在core模塊,處理器繼承了Subscriber. 在bus模塊,提供了無限制的Publisher/Subscriber能力.在stream模塊,Subscribers計算特定的回調行為.在Net模塊,subscriber的IO層實現處理寫、關閉和flush. |
| Subscription |
reactor-stream, reactor-net |
reactor.rx.subscription.*, reactor.io.net.impl.* |
在stream模塊, 提供了一個優化過的PushSubscriptions和 buffering-ready ReactiveSubscription. 在Net模塊, 使用自定義Subscription實現背壓的方式實現異步IO讀。 |
從reactor 2啟動時我們就一直遵循這個標准,並且隨着標准的改變而改變直到1.0.0正式版准備發布。現在可以通過maven中央庫及其流行的鏡像可以找到該標准,你將發現它作為過渡,依賴於reactor-core模塊。
Reactive擴展
Reactive擴展或者通常稱作Rx,是一種定義完備的功能api,這些api擴展了觀察者模式到一個史詩的程度。
Rx模式支持實現了使用少數設計的關鍵字來處理Reactive 數據序列:
使用回調鏈來抽象實時及延遲:當可以獲得到數據時調用。
抽象了一直使用的線程模式:同步或者異步僅僅是我們處理的Observable/Stream。
控制錯誤傳遞及停止:錯誤和完成信號及數據的有效負載信號傳遞到鏈中。
在多個預先定義的api中解決了多個擴展-聚合及其它組合問題。
Reactive擴展的標准Jvm實現是RxJava。它提供了一個功能豐富的Api。
Reactor 2 提供了一個特定模塊實現了Reactive擴展的一部分功能。建議需要使用Reactive stream全部功能的用戶使用RxJava。最后,當組合完整的RxJava系統時,用戶可以從Reactor提供的強大的異步和IO的中獲益。
表2:Rx和Reactor stream的不同點:
| rx | reactor-stream | 說明 |
|---|---|---|
| Observable |
reactor.rx.Stream |
Reactive Stream Publisher的實現 |
| Operator |
reactor.rx.action.Action |
Reactive Stream Processor的實現 |
| Observable with 1 data at most |
reactor.rx.Promise |
返回唯一結果的類型, Reactive Stream Processor實現並提供了可選的異步分發功能。 |
| Factory API (just, from, merge….) |
reactor.rx.Streams |
和core模塊的 data-focused 子類一樣, 返回 Stream |
| Functional API (map, filter, take….) |
reactor.rx.Stream |
和core模塊的data-focused 子類一樣, 返回Stream |
| Schedulers |
reactor.core.Dispatcher, org.reactivestreams.Processor |
Reactor Stream計算無限制的共享Dispatcher或者有限的Processor的操作。 |
| Observable.observeOn() |
Stream.dispatchOn() |
只是dispatcher參數的一個適配命名。 |
