You should never do your asynchronous work alone.
— Jon Brisbin
完成Reactor 1后寫到
You should never do your asynchronous work alone.
— Stephane Maldini
完成Reactor 2后寫到
名稱解釋:back pressure:背壓。在交換機在阻止外來數據包發送到堵塞端口的時候可能會發生丟包。而背壓就是考驗交換機在這個時候避免丟包的能力。很多的交換機當發送或接收緩沖區溢出的時候通過將阻塞信號發送回源地址來實現背壓。交換機在全雙工時使用IEEE802.3x流控制達到同樣目的。
首先,我們使用groovy示例來展示core模塊的功能:
//Initialize context and get default dispatcher Environment.initialize() //RingBufferDispatcher with 8192 slots by default def dispatcher = Environment.sharedDispatcher() //Create a callback Consumer<Integer> c = { data -> println "some data arrived: $data" } //Create an error callback Consumer<Throwable> errorHandler = { it.printStackTrace } //Dispatch data asynchronously r.dispatch(1234, c, errorHandler) Environment.terminate()
下面,我們使用Stream reactive實現來看:
//standalone async processor def processor = RingBufferProcessor.<Integer>create() //send data, will be kept safe until a subscriber attaches to the processor processor.onNext(1234) processor.onNext(5678) //consume integer data processor.subscribe(new Subscriber<Integer>(){ void onSubscribe(Subscription s){ //unbounded subscriber s.request Long.MAX } void onNext(Integer data){ println data } void onError(Throwable err){ err.printStackTrace() } void onComplete(){ println 'done!' } } //Shutdown internal thread and call complete processor.onComplete()
Core模塊概覽
Reactor core模塊的子單元:
Common IO和功能類型,一些是直接從java8 功能接口回遷的。
Function、Supplier、consumer、Predicate、BiConsumer、BiFunction
Tuples
Resource、Pausable、Timer
Buffer,Codec和一組預定義的Codec。
Environment 上下文
Dispatcher 協議和一組預定義的Dispatcher。
預定義的Reactive Stream Processor
reactor-core可以用來逐漸替代另外的消息傳遞策略、調度時間任務或者以小的功能塊組織代碼。這種突破使開發者與其它Reactive基礎庫更好的合作,特別是對於沒有耐心的開發者,沒有了對RingBuffer的理解負擔。
注意:Reactor-core隱藏了LMAX disruptor,因此不會出現也不會和現有的Disruptor依賴沖突。
功能模塊
功能模塊重用是核心,通常情況下在你使用Reactor時就需要的功能。因此,功能編程酷在哪里?其中一個核心理念是將可執行代碼看做別的數據。另一點,類似於Closure或者匿名函數,此時業務邏輯由最初的調用者決定。它同樣避免了過量的If/SWITCH模塊,並且這種分離是概念更清晰:每個模塊完成一個功能且不需要共享任何東西。
組織功能模塊
每個功能組件都給出它的一般任務的明確意圖:
Consumer:簡單回調--一勞永逸的
BiCounsumer:兩個參數的簡單回調,通常用在序列比較,例如:前一個和下一個參數。
Function:轉換邏輯--請求/應答
BiFunction:兩個參數的轉換,通常用在累加器,比較前一個和下一個參數,返回一個新的值。
Supplier:工廠邏輯--輪詢
Predicate:測試路徑--過濾
注意:我們也將Publisher和Subscriber視作功能塊,敢於稱之為Reactive功能塊。盡管如此,它們作為基礎組件,廣泛應用到Reactor及其其它地方。Stream API接收reactor.fn參數,為你創建合適的Subscriber。
好消息是在功能模塊中包裝可執行指令可以向磚塊一樣進行復用。
Consumer<String> consumer = new Consumer<String>(){ @Override void accept(String value){ System.out.println(value); } }; //Now in Java 8 style for brievety Function<Integer, String> transformation = integer -> ""+integer; Supplier<Integer> supplier = () -> 123; BiConsumer<Consumer<String>, String> biConsumer = (callback, value) -> { for(int i = 0; i < 10; i++){ //lazy evaluate the final logic to run callback.accept(value); } }; //note how the execution flows from supplier to biconsumer biConsumer.accept( consumer, transformation.apply( supplier.get() ) );
最初聽起來,這可能不是一個引人注目的革命性變革。但是這種基本思維模式的改變,將揭示我們使異步代碼變的穩健和可組合性的使命是多么可貴。Dispatcher分發器將輸入數據和錯誤回調分發給consumer來處理。Reactor Stream模塊將更好的使用這些組件。
當使用Ioc容器如spring時,一個好的開發者將利用Java的配置屬性來返回一個無狀態的功能bean。然后可以優美的注入到stream Pipeline或者分發他們的執行代碼中的block中。
元組
你可以注意到這些接口,它們對輸入參數和比較少的固定數量的參數的泛型有很好的支持。你怎么傳遞超過1個或者超過2個的參數呢?答案是使用元組Tuple,Tuple類似於csv中一個單獨實例的一樣,可以在在功能性編程中保證它們的類型安全和支持多個數量的參數。
以前面的例子為例,我們嘗試提供兩個參數的BiConsumer而使用單個參數的Consumer
Consumer<Tuple2<Consumer<String>, String>> biConsumer = tuple -> { for(int i = 0; i < 10; i++){ //Correct typing, compiler happy tuple.getT1().accept(tuple.getT2()); } }; biConsumer.accept( Tuple.of( consumer, transformation.apply(supplier.get()) ) );
注意:Tuple需要分配更多的空間,因此在比較或者鍵值信號等一般使用場景中更多直接使用Bi***組件。
Environment和Dispatcher
功能性構建塊已經准備就緒,讓我們使用它們來進行異步編程。第一步是到Dispatcher分區。
在我們啟動任意Dispatcher前,需要保證可以有效的創建它們。通常,創建它們的代價比較高,原因是需要預分配一個內存分區來保持分配的信號,這就是前言中介紹的著名的運行時分配和啟動時預分配的不同對比。因此提出了一個名為"Environment"共享上下文概念,使用它來管理這些不同類型的Dispatcher,從而避免不必要的創建開銷。
Environment
reactor的使用者(或者可用的擴展庫如@Spring)創建或者停止Environment。它們自動從META_INF/reactor/reactor-environment.properties處讀取配置文件。
注意,屬性文件可以改變,通過在classpath下的META-INFO/reactor目錄下一個新的屬性配置可以改變屬性文件。
通過傳遞下面的環境變量reactor.profiles.active來在運行時段改變默認的配置文件。
java - jar reactor-app.jar -Dreactor.profiles.active=turbo
啟動和停止Environment
Environment env = Environment.initialize(); //Current registered environment is the same than the one initialized Assert.isTrue(Environment.get() == env); //Find a dispatcher named "shared" Dispatcher d = Environment.dispatcher("shared"); //get the Timer bound to this environment Timer timer = Environment.timer(); //Shutdown registered Dispatchers and Timers that might run non-daemon threads Environment.terminate(); //An option could be to register a shutdownHook to automatically invoke terminate.
注意:在一個給定的Jvm應用中,最好只維護一個Enviroment.在大多數情況下,使用Environment.initializeIfEmpty()就完全ok。
Dispacher分發器
從Reactor 1開始,Dispatcher就存在了。Dispatcher通常抽象消息傳遞的方法,和Java Executor有類似的通用約定。事實上Dispatcher繼承自Executor。
Dispatcher對有數據信號的傳送方式及消費者同步或異步執行的錯誤信息有一套比較嚴格的類型限制約定。這種方式在面對經典的Executors時解決了第一個問題--錯誤隔離。效果如下:
錯誤消費者的調用不需要終端當前分配的資源。如果沒有指定,它默認從當前存在的Environment中去尋找,並使用指定給它的errorJournalConsumer。
異步Dispatche提供的第二個獨特的特征是運行使用尾部遞歸策略來再次調度。尾部遞歸的應用場景是分發器發現Dispatcher的classLoader已經分配到正在運行的線程,這時,當當前消費者返回時將要執行的task放入到隊列中。
使用一個類似於 Groovy Spock test的異步的多線程分發器:
import reactor.core.dispatch.* //... given: def sameThread = new SynchronousDispatcher() def diffThread = new ThreadPoolExecutorDispatcher(1, 128) def currentThread = Thread.currentThread() Thread taskThread = null def consumer = { ev -> taskThread = Thread.currentThread() } def errorConsumer = { error -> error.printStackTrace() } when: "a task is submitted" sameThread.dispatch('test', consumer, errorConsumer) then: "the task thread should be the current thread" currentThread == taskThread when: "a task is submitted to the thread pool dispatcher" def latch = new CountDownLatch(1) diffThread.dispatch('test', { ev -> consumer(ev); latch.countDown() }, errorConsumer) latch.await(5, TimeUnit.SECONDS) // Wait for task to execute then: "the task thread should be different when the current thread" taskThread != currentThread
注意:
如Java Executor一樣,它們缺少了我們將加入到Reactor 2.x的一個特點:Reactive stream協議。這時在Reactor中僅有幾個未完成事項中的一個未完成事項--沒有將Reactive stream標准直接綁定到Reactor中。然后,你可以在Stream章節部分找到快速結合Reactor stream的方法。
表3 Dispatcher家族介紹
Dispatcher | From Environment | Description | Strengths | Weaknesses |
---|---|---|---|---|
RingBuffer |
sharedDispatcher() |
An LMAX DisruptorRingBuffer based Dispatcher. |
Small latency peaks tolerated Fastest Async Dispatcher, 10-15M+ dispatch/sec on commodity hardware Support ordering |
'Spin' Loop when getting the next slot on full capcity Single Threaded, no concurrent dispatch |
Mpsc |
sharedDispatcher() if Unsafe not available |
Alternative optimized message-passing structure. |
Latency peaks tolerated 5-10M+ dispatch/sec on commodity hardware Support ordering |
Unbounded and possibly using as much available heap memory as possible Single Threaded, no concurrent dispatch |
WorkQueue |
workDispatcher() |
An LMAX DisruptorRingBuffer based Dispatcher. |
Latency Peak tolerated for a limited time Fastest Multi-Threaded Dispatcher, 5-10M+ dispatch/sec on commodity hardware |
'Spin' Loop when getting the next slot on full capcity Concurrent dispatch Doesn’t support ordering |
Synchronous |
dispatcher("sync") or SynchronousDispatcher. INSTANCE |
Runs on the current thread. |
Upstream and Consumer executions are colocated Useful for Test support Support ordering if the reentrant dispatch is on the current thread |
No Tail Recursion support Blocking |
TailRecurse |
tailRecurse() or TailRecurse Dispatcher. INSTANCE |
Synchronous Reentrant Dispatcher that enqueue dispatches when currently dispatching. |
Upstream and Consumer executions are colocated Reduce execution stack, greatly expanded by functional call chains |
Unbounded Tail Recurse depth Blocking Support ordering (Thread Stealing) |
ThreadPoolExecutor |
newDispatcher(int, int, DispatcherType. THREAD_POOL_EXECUTOR) |
Use underlying ThreadPoolExecutor message-passing |
Multi-Threaded Blocking Consumers, permanent latency tolerated 1-5M+ dispatch/sec on commodity hardware |
Concurrent run on a given consumer executed twice or more Unbounded by default Doesn’t support ordering |
Traceable Delegating |
N/A |
Decorate an existing dispatcher with TRACE level logs. |
Dispatch tapping Runs slower than the delegated dispatcher alone |
Log overhead (runtime, disk) |
DispatcherSupplier
你可能已經注意到了,一些Dispatcher事單線程的,特別是RingBufferDispatcher和MpsDispatcher。更進一步,根據Reactive Stream規范,Subscriber/Processor的實現是不允許並發通知的。這一點尤其對Reactor Streams產生了影響,使用Stream.dispachOn(Dispatcher)和一個Dispatcher來給並發信號的顯示失敗留后門。
然后,有一個方法來避免這個缺點,使用Dispatcher池DispatcherSupplier。實際上,作為Supplier的工廠,Supplier.get()方法根據有趣的共享策略:輪詢、最少使用。。等間接提供一個Dispatcher。
Enviroment提供了一個靜態方法去創建、並注冊到當前活躍Environment的Dispatcher池:一組輪詢的返回Dispatcher。一旦就緒,Supplier提供對Dispatcher數目的控制。
不同於一般的Dispatcher,Environment提供了一站式的管理服務:
Environment.initialize(); //.... //Create an anonymous pool of 2 dispatchers with automatic default settings (same type than default dispatcher, default backlog size...) DispatcherSupplier supplier = Environment.newCachedDispatchers(2); Dispatcher d1 = supplier.get(); Dispatcher d2 = supplier.get(); Dispatcher d3 = supplier.get(); Dispatcher d4 = supplier.get(); Assert.isTrue( d1 == d3 && d2 == d4); supplier.shutdown(); //Create and register a new pool of 3 dispatchers DispatcherSupplier supplier1 = Environment.newCachedDispatchers(3, "myPool"); DispatcherSupplier supplier2 = Environment.cachedDispatchers("myPool"); Assert.isTrue( supplier1 == supplier2 ); supplier1.shutdown();
Timer定時器
Dispatcher盡可能快的計算接收的任務,然而,Timer定時器提供一次性或者周期性的調度API。Reactor Core模塊默認提供了一個HashWheelTimer定時器,它自動綁定到任意的新的Environment中。HashWheelTimer對處理大量的、並發的、內存調度任務有巨大的優勢,它是替換java TaskScheduler的一個強大的選項。
注意:它不是一個持久化的調度器,應用關閉時task將會丟失。下個正式版本Timer定時器將會有一些改變,例如使用redis增加持久化/共享,請關注。
創建一個簡單的定時器:
import reactor.fn.timer.Timer //... given: "a new timer" Environment.initializeIfEmpty() Timer timer = Environment.timer() def latch = new CountDownLatch(10) when: "a task is submitted" timer.schedule( { Long now -> latch.countDown() } as Consumer<Long>, period, TimeUnit.MILLISECONDS ) then: "the latch was counted down" latch.await(1, TimeUnit.SECONDS) timer.cancel() Environment.terminate()
核心Processor
核心Processor用來做比Dispatcher更集中的job:支持背壓計算異步task。
提供了org.reactivestreams.Processor接口的直接實現,因此可以很好的和別的Reactive Stream廠商一起工作。
記住:Processor即是Subscriber也是Publisher,因此你可以在想要的地方(source,processing,sink)將一個Processor插入到Reactive stream chain中。
注意:規范不推薦直接使用Processor.onNext(d)。
RingBuffer Processors
基於RingBuffer的Reactive Stream Processor的優點如下:
高吞吐量
重啟時不會丟掉沒有消費的數據,且從最近的沒有消費的數據開始執行
若沒有Subscriber監聽,數據不會丟失(不想Reactor-stream的Broadcaster會丟掉數據)
若在消息處理過程中取消Subscriber,信號將會安全的重新執行,實際上它能在RingBufferProcessor上很好的工作。
靈活的背壓,它允許任意時間內有限數量的背壓,Subscriber會消費掉並且請求更多的數據。
傳播的背壓,因為它是一個Processor,它可以通過訂閱方式傳遞消息。
多線程的出/入Processor。
事實上,RingBuffer*Process類似於典型的MicroMessageBroker!
它們的唯一缺點是它們在運行時創建它們會消耗大量的資源,原因是它們不像它們的兄弟RingBufferDispatcher可以很容易的共享,這種特性使它們更適應於高吞吐量的預定義數據管道。
RingBufferProcessor
Reactor的RingBufferProcessor組件本質上是Disruptor的RingBuffer,設計的目的是盡可能的和原生的效率一樣。使用場景是:你需要分發task到另外一個線程,且該線程具有低耗、高吞吐量還在你的工作流中管理背壓。
我使用RingBufferProcessor來計算遠程異步調用的各種輸出:AMQP, SSD存儲和內存存儲,Process完全處理掉易變的延遲,每秒百萬級別的消息的數據源從來沒有阻塞過。
— 友好的Reactor使用者
RingBufferProcessor的使用場景
圖7 在跟定時間T內,一個ringbufferprocessor,2個消費同一個sequence的Subscriber。
你可以使用靜態工具方法去創建一個ringbufferprocessor:
Processor<Integer, Integer> p = RingBufferProcessor.create("test", 32); //1 Stream<Integer> s = Streams.wrap(p); //2 s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i)); //3 s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i)); //4 s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i)); //5 input.subscribe(p); //5
1.創建一個Processor,讓它具有32個slot的內部RingBuffer。
2. 從Reactive Streams Processor創建一個Reactor。
3. 每個請求調用consume方法在自己的線程內創建一個Disruptor的EventProcessor。
4. 每個請求調用consume方法在自己的線程內創建一個Disruptor的EventProcessor。
5. 每個請求調用consume方法在自己的線程內創建一個Disruptor的EventProcessor。
6. 向一個Reactive Streams Publisher訂閱這個Processor。
傳遞到Processor的Subscribe.onNext(Buffer)方法的每個數據元素將廣播給所有的消費者。這個Processor沒有使用輪詢分發,因為它在RingBufferWorkProcess中,RingBufferWorkProcess下面將要討論。若傳遞1、2、3三個整數到Processor,可以看到控制台輸出結果如下:
Thread[test-2,5,main] data=1 Thread[test-1,5,main] data=1 Thread[test-3,5,main] data=1 Thread[test-1,5,main] data=2 Thread[test-2,5,main] data=2 Thread[test-1,5,main] data=3 Thread[test-3,5,main] data=2 Thread[test-2,5,main] data=3 Thread[test-3,5,main] data=3
每個線程接收到傳給Process的所有數據,每個線程順序獲得數據,因為內部使用RingBuffer管理
slot來發布數據。
RingBufferWorkProcessor
不像標准的RingBufferProcessor只廣播它的值給所有的消費者,RingBufferWorkProcessor基於消費者的多少來分發請求值。Processor接收信息,然后輪詢發送到不同的線程中(因為每個消費者有自己獨立的線程),然而使用內部RingBuffer來有效管理消息的發布。
我們構造了一個可擴展的、多種htp微服務器請求負載均衡的RingBufferWorkProcessor.說它看起來快過光速可能是我錯了,另外gc的壓力完全可控。
— 使用RingBufferWorkProcessor的Reactor友好者
使用RingBufferWorkProcessor非常簡單,你只要改變上面示例代碼的引用到靜態的create方法創建。使用RingBufferWorkProcessor如下,其它的代碼時一樣的。
Processor<Integer, Integer> p = RingBufferWorkProcessor.create("test", 32);
創建一個具有32個slot的內部RingBuffer的Processor。
現在,發布消息到Processor時,將不會廣播給每一個consumer,會根據消費者的數目分發給不同的消費者。運行示例,結果如下:
Thread[test-2,5,main] data=3 Thread[test-3,5,main] data=2 Thread[test-1,5,main] data=1
注意,RingBufferWorkProcessor會重復終端的信號、檢測正在停止工作的Subscriber的取消異常,最終會被別的Subscriber執行一次。我們保證適合事件至少發送一次。若你理解這個語義,你可能會立即說“等等,RingBufferWorkProcessor怎么作為一個消息代理工作啦?” 答案是肯定的。
Codecs和Buffer
字節碼操作對大量數據管道配置的應用是一個核心關注點。reactor-net廣泛使用字節碼操作來對接收的字節碼進行編組和分組或者通過IO發送。
reactor.io.buffer.Buffer是java byteBuffer處理的一個裝飾器,增加了一些列的操作。目的是通過使用ByteBuffer的limit和讀取/覆蓋預先分配的字節來減少字節的復制。追蹤ByteBuffer的位置是開發人員口頭的問題,Buffer簡化了這些,我們只需要關注這個簡單的工具就可以了。
下面是一個簡單的Buffer操作示例:
import reactor.io.buffer.Buffer //... given: "an empty Buffer and a full Buffer" def buff = new Buffer() def fullBuff = Buffer.wrap("Hello World!") when: "a Buffer is appended" buff.append(fullBuff) then: "the Buffer was added" buff.position() == 12 buff.flip().asString() == "Hello World!"
Buffer的一個有用的應用是Buffer.View,多個操作例如split都會返回Buffer.View。它提供了一個無需拷貝的方式去掃描和檢索ByteBuffer的字節碼。Buffer.View同樣也是一種Buffer。
使用一個分隔符和Buffer.view使塊數據讀取可以復用同樣的字節碼
byte delimiter = (byte) ';'; byte innerDelimiter = (byte) ','; Buffer buffer = Buffer.wrap("a;b-1,b-2;c;d"); List<Buffer.View> views = buffer.split(delimiter); int viewCount = views.size(); Assert.isTrue(viewCount == 4); for (Buffer.View view : views) { System.out.println(view.asString()); //prints "a" then "b-1,b-2", then "c" and finally "d" if(view.indexOf(innerDelimiter) != -1){ for(Buffer.View innerView : view.split(innerDelimiter)){ System.out.println(innerView.asString()); //prints "b-1" and "b-2" } } }
使用Buffer應用到普通的分組和編組對開發者來說可能顯得不夠高級,Reactor提供了一系列名稱為Codec的預定義的轉換器。一些Codec需要在classpath路徑下添加一些額外的依賴,如json操作的Jackson依賴。
codec以兩種方式工作:第一,繼承Function去直接編碼並返回編碼好的數據,通常以Buffer的形式返回。這非常棒,但僅限於與無狀態的Codec才能起效,另外一個可選的方法是使用Codec.encoder來返回編碼函數。
Codec.encoder()對比Codec.apply(Source) Codec.encoder() 返回一個唯一的編碼函數,這個編碼函數不能被不同線程共享。 Codec.apply(Source) 直接編碼(並保存分配的編碼器), 但Codec本身可以在線程間共享。
對大部分實現了Buffer的codec來說,Codec同樣也可以根據source類型去解碼數據。
解碼數據源,需要使用Codec.decoder()獲取解碼函數。和編碼不同的是,沒有為編碼目的而重寫的快捷方法。和編碼相同的是,解碼函數不能在線程間共享。
有兩種形式的Code.decoder()函數,Codec.decoder()是一個阻塞的解碼函數,它直接從傳遞源數據解碼返回解碼后的數據。Codec.decoder(Consumer)用作非阻塞的解碼,它返回null,一旦解碼只觸發的Consumer,它可以和其它異步工具結合使用。
使用一個預定義的codec示例如下:
import reactor.io.json.JsonCodec //... given: 'A JSON codec' def codec = new JsonCodec<Map<String, Object>, Object>(Map); def latch = new CountDownLatch(1) when: 'The decoder is passed some JSON' Map<String, Object> decoded; def callbackDecoder = codec.decoder{ decoded = it latch.countDown() } def blockingDecoder = codec.decoder() //yes this is real simple async strategy, but that's not the point here :) Thread.start{ callbackDecoder.apply(Buffer.wrap("{\"a\": \"alpha\"}")) } def decodedMap = blockingDecoder.apply(Buffer.wrap("{\"a\": \"beta\"}") then: 'The decoded maps have the expected entries' latch.await() decoded.size() == 1 decoded['a'] == 'alpha' decodedMap['a'] == 'beta'
可用的核心Codec
名稱 | 描述 | 需要的依賴 |
---|---|---|
ByteArrayCodec |
Wrap/unwrap byte arrays from/to Buffer. |
N/A |
DelimitedCodec |
Split/Aggregate Buffer and delegate to the passed Codec for unit marshalling. |
N/A |
FrameCodec |
Split/Aggregate Buffer into |
N/A |
JavaSerializationCodec |
Deserialize/Serialize Buffers using Java Serialization. |
N/A |
PassThroughCodec |
Leave the Buffers untouched. |
N/A |
StringCodec |
Convert String to/from Buffer |
N/A |
LengthFieldCodec |
Find the length and decode/encode the appropriate number of bytes into/from Buffer |
N/A |
KryoCodec |
Convert Buffer into Java objects using Kryo with Buffers |
|
JsonCodec,JacksonJsonCodec |
Convert Buffer into Java objects using Jackson with Buffers |
|
SnappyCodec |
A Compression Codec which applies a delegate Codec after unpacking/before packing Buffer |
|
GZipCodec |
A Compression Codec which applies a delegate Codec after unpacking/before packing Buffer |
N/A |
參考文獻:
1. http://baike.baidu.com/link?url=kXnm3flViIx-4E7PxZtYVgb3xY5tlwovUqog2u_TgCCiN7FSFkxt7ze-Qio5j1FXPmIz2DGV2_lbOBoLeyXdaa
2. http://projectreactor.io/docs/reference/