Tips
做一個終身學習的人。
在本章中,主要介紹以下內容:
- 什么是流(stream)
- 響應式流(Reactive Streams)的倡議是什么,以及規范和Java API
- 響應式流在JDK 中的API以及如何使用它們
- 如何使用JDK 9中的響應式流的Java API來創建發布者,訂閱者和處理者
一. 什么是流
流是由生產者生產並由一個或多個消費者消費的元素(item)的序列。 這種生產者——消費者模型也被稱為source/sink模型或發布者——訂閱者(publisher-subscriber )模型。 在本章中,將其稱為發布者訂閱者模型。
有幾種流處理機制,其中pull模型和push模型是最常見的。 在push模型中,發布者將元素推送給訂閱者。 在pull模式中,訂閱者將元素推送給發布者。 發布者和訂閱者都以同樣的速率工作,這是一個理想的情況,這些模式非常有效。 我們會考慮一些情況,如果他們不按同樣的速率工作,這種情況下涉及的問題以及對應的解決辦法。
當發布者比訂閱者快的時候,后者必須有一個無邊界緩沖區來保存快速傳入的元素,或者它必須丟棄它無法處理的元素。 另一個解決方案是使用一種稱為背壓(backpressure )的策略,其中訂閱者告訴發布者減慢速率並保持元素,直到訂閱者准備好處理更多的元素。 使用背壓可確保更快的發布者不會壓制較慢的訂閱者。 使用背壓可能要求發布者擁有無限制的緩沖區,如果它要一直生成和保存元素。 發布者可以實現有界緩沖區來保存有限數量的元素,如果緩沖區已滿,可以選擇放棄它們。 可以使用另一策略,其中發布者將發布元素重新發送到訂閱者,這些元素發布時訂閱者不能接受。
訂閱者在請求發布者的元素並且元素不可用時,該做什么? 在同步請求中訂閱者戶必須等待,無限期地,直到有元素可用。 如果發布者同步地向訂閱者發送元素,並且訂閱者同步處理它們,則發布者必須阻塞直到數據處理完成。 解決方案是在兩端進行異步處理,訂閱者可以在從發布者請求元素之后繼續處理其他任務。 當更多的元素准備就緒時,發布者將它們異步發送給訂閱者。
二. 什么是響應式流(Reactive Streams)
響應式流從2013年開始,作為提供非阻塞背壓的異步流處理標准的倡議。 它旨在解決處理元素流的問題——如何將元素流從發布者傳遞到訂閱者,而不需要發布者阻塞,或訂閱者有無限制的緩沖區或丟棄。
響應式流模型非常簡單——訂閱者向發布者發送多個元素的異步請求。 發布者向訂閱者異步發送多個或稍少的元素。
Tips
響應式流在pull模型和push模型流處理機制之間動態切換。 當訂閱者較慢時,它使用pull模型,當訂閱者更快時使用push模型。
在2015年,出版了用於處理響應式流的規范和Java API。 有關響應式流的更多信息,請訪問http://www.reactive-streams.org/。 Java API 的響應式流只包含四個接口:
Publisher<T>
Subscriber<T>
Subscription
Processor<T,R>
發布者(publisher)是潛在無限數量的有序元素的生產者。 它根據收到的要求向當前訂閱者發布(或發送)元素。
訂閱者(subscriber)從發布者那里訂閱並接收元素。 發布者向訂閱者發送訂閱令牌(subscription token)。 使用訂閱令牌,訂閱者從發布者哪里請求多個元素。 當元素准備就緒時,發布者向訂閱者發送多個或更少的元素。 訂閱者可以請求更多的元素。 發布者可能有多個來自訂閱者的元素待處理請求。
訂閱(subscription)表示訂閱者訂閱的一個發布者的令牌。 當訂閱請求成功時,發布者將其傳遞給訂閱者。 訂閱者使用訂閱令牌與發布者進行交互,例如請求更多的元素或取消訂閱。
下圖顯示了發布者和訂閱者之間的典型交互順序。 訂閱令牌未顯示在圖表中。 該圖沒有顯示錯誤和取消事件。
處理者(processor)充當訂閱者和發布者的處理階段。 Processor
接口繼承了Publisher
和Subscriber
接口。 它用於轉換發布者——訂閱者管道中的元素。 Processor<T,R>
訂閱類型T的數據元素,接收並轉換為類型R的數據,並發布變換后的數據。 下圖顯示了處理者在發布者——訂閱和管道中作為轉換器的作用。 可以擁有多個處理者。
下面顯示了響應式流倡導所提供的Java API。所有方法的返回類型為void
。 這是因為這些方法表示異步請求或異步事件通知。
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
public interface Subscription {
public void request(long n);
public void cancel();
}
public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
用於響應式流的Java API似乎很容易理解。 但是,實現起來並不簡單。 發布者和訂閱者之間的所有交互的異步性質以及處理背壓使得實現變得復雜。 作為應用程序開發人員,會發現實現這些接口很復雜。 類庫應該提供實現來支持廣泛的用例。 JDK 9提供了Publisher
接口的簡單實現,可以將其用於簡單的用例或擴展以滿足自己的需求。 RxJava是響應式流的Java實現之一。
三. JDK 9 中響應式流的API
JDK 9在java.util.concurrent包中提供了一個與響應式流兼容的API,它在java.base模塊中。 API由兩個類組成:
Flow
SubmissionPublisher<T>
Flow
類是final的。 它封裝了響應式流Java API和靜態方法。 由響應式流Java API指定的四個接口作為嵌套靜態接口包含在Flow
類中:
Flow.Processor<T,R>
Flow.Publisher<T>
Flow.Subscriber<T>
Flow.Subscription
這四個接口包含與上面代碼所示的相同的方法。 Flow
類包含defaultBufferSize()
靜態方法,它返回發布者和訂閱者使用的緩沖區的默認大小。 目前,它返回256。
SubmissionPublisher<T>
類是Flow.Publisher<T>
接口的實現類。 該類實現了AutoCloseable
接口,因此可以使用try-with-resources塊來管理其實例。 JDK 9不提供Flow.Subscriber<T>
接口的實現類; 需要自己實現。 但是,SubmissionPublisher<T>
類包含可用於處理此發布者發布的所有元素的consume(Consumer<? super T> consumer)
方法。
1. 發布者——訂閱者交互
在開始使用JDK API之前,了解使用響應式流的典型發布者——訂閱者會話中發生的事件順序很重要。 包括在每個事件中使用的方法。 發布者可以擁有零個或多個訂閱者。 這里只使用一個訂閱者。
- 創建發布者和訂閱者,它們分別是
Flow.Publisher
和Flow.Subscriber
接口的實例。 - 訂閱者通過調用發布者的
subscribe()
方法來嘗試訂閱發布者。 如果訂閱成功,發布者用Flow.Subscription
異步調用訂閱者的onSubscribe()
方法。 如果嘗試訂閱失敗,則使用調用訂閱者的onError()
方法,並拋出IllegalStateException
異常,並且發布者——訂閱者交互結束。 - 訂閱者通過調用
Subscription
的request(N)
方法向發布者發送多個元素的請求。 訂閱者可以向發布者發送更多元素的多個請求,而不必等待其先前請求是否完成。 - 訂閱者在所有先前的請求中調用訂閱者的
onNext(T item)
方法,直到訂閱者戶請求的元素數量上限——在每次調用中向訂閱者發送一個元素。 如果發布者沒有更多的元素要發送給訂閱者,則發布者調用訂閱者的onComplete()
方法來發信號通知流,從而結束發布者——訂閱者交互。 如果訂閱者請求Long.MAX_VALUE
元素,則它實際上是無限制的請求,並且流實際上是推送流。 - 如果發布者隨時遇到錯誤,它會調用訂閱者的
onError()
方法。 - 訂閱者可以通過調用其
Flow.Subscription
的cancel()
方法來取消訂閱。 一旦訂閱被取消,發布者——訂閱者交互結束。 然而,如果在請求取消之前存在未決請求,訂閱者可以在取消訂閱之后接收元素。
總結上述結束條件的步驟,一旦在訂閱者上調用了onComplete()
或onError()
方法,訂閱者就不再收到發布者的通知。
在發布者的subscribe()
方法被調用之后,如果訂閱者不取消其訂閱,則保證以下訂閱方法調用序列:
onSubscribe onNext* (onError | onComplete)?
這里,符號*
和?
在正則表達式中被用作關鍵字,一個*
表示零個或多個出現, ?
意為零或一次。
在訂閱者上的第一個方法調用是onSubscribe()
方法,它是成功訂閱發布者的通知。訂閱者的onNext()
方法可以被調用零次或多次,每次調用指示元素發布。onComplete()
和onError()
方法可以被調用為零或一次來指示終止狀態; 只要訂閱者不取消其訂閱,就會調用這些方法。
2. 創建發布者
創建發布者取決於Flow.Publisher<T>
接口的實現類。該類包含以下構造函數:
SubmissionPublisher()
SubmissionPublisher(Executor executor, int maxBufferCapacity)
SubmissionPublisher(Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>,? super Throwable> handler)
SubmissionPublisher
使用提供的Executor
向其訂閱者提供元素。 如果使用多個線程來生成要發布的元素並且可以估計訂閱者數量,則可以使用具有固定線程池的newFixedThreadPool(int nThread)
,這可以使用Executors
類的newFixedThreadPool(int nThread)
靜態方法獲取。 否則,使用默認的Executor
,它使用ForkJoinPool
類的commonPool()
方法獲取。
SubmissionPublisher
類為每個訂閱者使用一個獨立的緩沖區。 緩沖區大小由構造函數中的maxBufferCapacity
參數指定。 默認緩沖區大小是Flow
類的defaultBufferSize()
靜態方法返回的值,該值為256。如果發布的元素數超過了訂戶的緩沖區大小,則額外的元素將被刪除。 可以使用SubmissionPublisher
類的getMaxBufferCapacity()
方法獲取每個訂閱者的當前緩沖區大小。
當訂閱者的方法拋出異常時,其訂閱被取消。 當訂閱者的onNext()
方法拋出異常時,在其訂閱被取消之前調用構造函數中指定的處理程序。 默認情況下,處理程序為null。
以下代碼片段會創建一個SubmissionPublishe
r,它發布所有屬性設置為默認值的Long
類型的元素:
// Create a publisher that can publish Long values
SubmissionPublisher<Long> pub = new SubmissionPublisher<>();
SubmissionPublisher
類實現了AutoCloseable
接口。 調用其close()
方法調用其當前訂閱者上的onComplete()
方法。 調用close()
方法后嘗試發布元素會拋出IllegalStateException
異常。
3. 發布元素
SubmissionPublisher<T>
類包含以下發布元素的方法:
int offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
int offer(T item, BiPredicate<Flow.Subscriber <? super T>,? super T> onDrop)
int submit(T item)
submit()
方法阻塞,直到當前訂閱者的資源可用於發布元素。 考慮每個訂閱者的緩沖區容量為10的情況。 訂閱者訂閱了發布者並且不請求任何元素。 發布者發布了10個元素並全部緩沖所有元素。 嘗試使用submit()
方法發布另一個元素將阻塞,因為訂閱者的緩沖區已滿。
offer()
方法是非阻塞的。 該方法的第一個版本允許指定超時,之后刪除該項。 可以指定一個刪除處理器,它是一個BiPredicate
。 在刪除訂閱者的元素之前調用刪除處理器的test()
方法。 如果test()
方法返回true,則再次重試該項。 如果test()
方法返回false,則在不重試的情況下刪除該項。 從offer()
方法返回的負整數表示向訂閱者發送元素失敗的嘗試次數;正整數表示在所有當前訂閱者中提交但尚未消費的最大元素數量的估計。
應該使用哪種方法發布一個元素:submit()
或offer()
? 這取決於你的要求。 如果每個已發布的元素必須發給所有訂閱者,則submit()
方法是最好選擇。 如果要等待發布一段特定時間的元素進行重試,則可以使用offer()
方法。
4. 舉個例子
來看一個使用SubmissionPublisher
作為發布者的例子。 SubmissionPublisher
可以使用其submit(T item)
方法發布元素。 以下代碼片段生成並發布五個整數(1,2,3,4和5),假設pub
是對SubmissionPublisher
對象的引用:
// Generate and publish 10 integers
LongStream.range(1L, 6L)
.forEach(pub::submit);
需要訂閱者才能使用發布者發布的元素。 SubmissionPublisher
類包含一個consume(Consumer<? super T> consumer)
方法,它允許添加一個希望處理所有已發布元素的訂閱者,並且對任何其他通知(如錯誤和完成通知)不感興趣。 該方法返回一個CompletedFuture<Void>
,當發布者調用訂閱者的onComplete()
方法時,表示完成。 以下代碼片段將一個Consumer
作為訂閱者添加到發布者中:
// Add a subscriber that prints the published items
CompletableFuture<Void> subTask = pub.consume(System.out::println);
本章中的代碼是com.jdojo.stream的模塊的一部分,其聲明如下所示。
// module-info.java
module com.jdojo.stream {
exports com.jdojo.stream;
}
下面包含了NumberPrinter
類的代碼,它顯示了如何使用SubmissionPublisher
類來發布整數。 示例代碼的詳細說明遵循NumberPrinter
類的輸出。
// NumberPrinter.java
package com.jdojo.stream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.SubmissionPublisher;
import java.util.stream.LongStream;
public class NumberPrinter {
public static void main(String[] args) {
CompletableFuture<Void> subTask = null;
// The publisher is closed when the try block exits
try (SubmissionPublisher<Long> pub = new SubmissionPublisher<>()) {
// Print the buffer size used for each subscriber
System.out.println("Subscriber Buffer Size: " + pub.getMaxBufferCapacity());
// Add a subscriber to the publisher. The subscriber prints the published elements
subTask = pub.consume(System.out::println);
// Generate and publish five integers
LongStream.range(1L, 6L)
.forEach(pub::submit);
}
if (subTask != null) {
try {
// Wait until the subscriber is complete
subTask.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
}
輸出結果為:
Subscriber Buffer Size: 256
1
2
3
4
5
main()
方法聲明一個subTask
的變量來保存訂閱者任務的引用。 subTask.get()
方法將阻塞,直到訂閱者完成。
CompletableFuture<Void> subTask = null;
發布類型為Long
的元素發布者是在資源塊中創建的。 發布者是SubmissionPublisher<Long>
類的實例。 當try塊退出時,發布者將自動關閉。
try (SubmissionPublisher<Long> pub = new SubmissionPublisher<>()) {
//...
}
該程序打印將訂閱發布者的每個訂閱者的緩沖區大小。
// Print the buffer size used for each subscriber
System.out.println("Subscriber Buffer Size: " + pub.getMaxBufferCapacity());
訂閱者將使用consume()
方法添加到發布者。 請注意,該方法允許指定一個Consumer
,它在內部轉換為Subscribe
r。 每個發布的元素會通知訂閱者。 訂閱者只需打印它接收的元素。
// Add a subscriber to the publisher. The subscriber prints the published elements
subTask = pub.consume(System.out::println);
現在是發布整數的時候了。 該程序生成五個整數,1到5,並使用發布者的submit()
方法發布它們。
// Generate and publish five integers
LongStream.range(1L, 6L)
.forEach(pub::submit);
已發布的整數以異步方式發送給訂閱者。 當try塊退出時,發布者關閉。 要保持程序運行,直到訂閱者完成處理所有已發布的元素,必須調用subTask.get()
。 如果不調用此方法,則可能不會在輸出中看到五個整數。
4. 創建訂閱者
要有訂閱者,需要創建一個實現Flow.Subscriber<T>
接口的類。 實現接口方法的方式取決於具體的需求。 在本節中,將創建一個SimpleSubscriber
類,該類實現Flow.Subscriber<Long>
接口。 下面包含此類的代碼。
// SimpleSubscriber.java
package com.jdojo.stream;
import java.util.concurrent.Flow;
public class SimpleSubscriber implements Flow.Subscriber<Long> {
private Flow.Subscription subscription;
// Subscriber name
private String name = "Unknown";
// Maximum number of items to be processed by this subscriber
private final long maxCount;
// keep track of number of items processed
private long counter;
public SimpleSubscriber(String name, long maxCount) {
this.name = name;
this.maxCount = maxCount <= 0 ? 1 : maxCount;
}
public String getName() {
return name;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.printf("%s subscribed with max count %d.%n", name, maxCount);
// Request all items in one go
subscription.request(maxCount);
}
@Override
public void onNext(Long item) {
counter++;
System.out.printf("%s received %d.%n", name, item);
if (counter >= maxCount) {
System.out.printf("Cancelling %s. Processed item count: %d.%n", name, counter);
// Cancel the subscription
subscription.cancel();
}
}
@Override
public void onError(Throwable t) {
System.out.printf("An error occurred in %s: %s.%n", name, t.getMessage());
}
@Override
public void onComplete() {
System.out.printf("%s is complete.%n", name);
}
}
SimpleSubscriber
類的實例表示一個訂閱者,它有一個名稱和要處理的最大數量的items (maxCount)
方法。 需要將其名稱和maxCount
傳遞給其構造函數。 如果maxCount
小於1,則在構造函數中設置為1。
在onSubscribe()
方法中,它保存發布者在名為subscription
的實例變量中傳遞的Flow.Subscription
。 它打印有關Flow.Subscription
的消息,並請求一次可以處理的所有元素。 該訂閱者有效地使用push模型,因為在該請求之后,不再向發布者發送更多的元素的請求。 發布着將推送maxCount
或更少的元素數量給該訂閱者。
在onNext()
方法中,它將counter
實例變量遞增1。counter
實例變量跟蹤訂閱者接收到的元素數量。 該方法打印詳細說明接收到的元素消息。 如果它已經收到可以處理的最后一個元素,它將取消訂閱。 取消訂閱后,發布者不再收到任何元素。
在onError()
和onComplete()
方法中,它打印一個有關其狀態的消息。
以下代碼段創建一個SimpleSubscriber
,其名稱為S1
,可以處理最多10個元素。
SimpleSubscriber sub1 = new SimpleSubscriber("S1", 10);
現在看一下具體使用SimpleSubscriber
的例子。 下包含一個完整的程序。 它定期發布元素。 發布一個元素后,它等待1到3秒鍾。 等待的持續時間是隨機的。 以下詳細說明本程序的輸出。 該程序使用異步處理可能導致不同輸出結果。
// PeriodicPublisher.java
package com.jdojo.stream;
import java.util.Random;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
public class PeriodicPublisher {
final static int MAX_SLEEP_DURATION = 3;
// Used to generate sleep time
final static Random sleepTimeGenerator = new Random();
public static void main(String[] args) {
SubmissionPublisher<Long> pub = new SubmissionPublisher<>();
// Create three subscribers
SimpleSubscriber sub1 = new SimpleSubscriber("S1", 2);
SimpleSubscriber sub2 = new SimpleSubscriber("S2", 5);
SimpleSubscriber sub3 = new SimpleSubscriber("S3", 6);
SimpleSubscriber sub4 = new SimpleSubscriber("S4", 10);
// Subscriber to the publisher
pub.subscribe(sub1);
pub.subscribe(sub2);
pub.subscribe(sub3);
// Subscribe the 4th subscriber after 2 seconds
subscribe(pub, sub4, 2);
// Start publishing items
Thread pubThread = publish(pub, 5);
try {
// Wait until the publisher is finished
pubThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static Thread publish(SubmissionPublisher<Long> pub, long count) {
Thread t = new Thread(() -> {
for (long i = 1; i <= count; i++) {
pub.submit(i);
sleep(i);
}
// Close the publisher
pub.close();
});
// Start the thread
t.start();
return t;
}
private static void sleep(Long item) {
// Wait for 1 to 3 seconds
int sleepTime = sleepTimeGenerator.nextInt(MAX_SLEEP_DURATION) + 1;
try {
System.out.printf("Published %d. Sleeping for %d sec.%n", item, sleepTime);
TimeUnit.SECONDS.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void subscribe(SubmissionPublisher<Long> pub, Subscriber<Long> sub,
long delaySeconds) {
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(delaySeconds);
pub.subscribe(sub);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
輸出結果為:
S2 subscribed with max count 5.
Published 1. Sleeping for 1 sec.
S3 subscribed with max count 6.
S1 subscribed with max count 2.
S1 received 1.
S3 received 1.
S2 received 1.
Published 2. Sleeping for 1 sec.
S1 received 2.
S2 received 2.
S3 received 2.
Cancelling S1. Processed item count: 2.
S4 subscribed with max count 10.
Published 3. Sleeping for 1 sec.
S4 received 3.
S3 received 3.
S2 received 3.
Published 4. Sleeping for 2 sec.
S4 received 4.
S3 received 4.
S2 received 4.
Published 5. Sleeping for 2 sec.
S2 received 5.
Cancelling S2. Processed item count: 5.
S4 received 5.
S3 received 5.
S3 is complete.
S4 is complete.
PeriodicPublisher
類使用兩個靜態變量。 MAX_SLEEP_DURATION
靜態變量保存發布這等待發布下一個元素最大秒數。 它設置為3。sleepTimeGenerator
靜態變量Random
對象的引用,該對象在sleep()
方法中用於生成下一個等待的隨機持續時間。
PeriodicPublisher
類的main()
方法執行以下操作:
- 它創建作為
SubmissionPublisher<Long>
類的實例的發布者。 - 它創建了四個為
S1
,S2
,S3
和S4
的訂閱者。每個訂閱者能夠處理不同數量的元素。 - 三個訂閱者立即訂閱。
S4
的訂閱者在兩秒鍾的最短延遲之后以單獨的線程訂閱。PeriodicPublisher
類的subscribe()
方法負責處理此延遲訂閱。注意到在兩個元素(1和2)已經發布之后S4
訂閱的輸出中,它將不會收到這兩個元素。- 它調用
publish()
方法,它啟動一個新的線程來發布五個元素,它啟動線程並返回線程引用。 main()
方法調用發布元素線程的join()
方法,所以在所有元素發布之前程序不會終止。publish()
方法負責發布五個元素。最后關閉發布者。它調用sleep()
方法,使當前線程休眠一個和MAX_SLEEP_DURATION
秒之間的隨機選擇的持續時間。- 在輸出中注意到,一些訂閱者取消了訂閱,因為他們從發布商那里收到指定數量的元素。
請注意,該程序保證所有元素將在終止之前發布,但不保證所有訂閱者都將收到這些元素。 在輸出中,會看到訂閱者收到所有已發布的元素。 這是因為發布者在發布最后一個元素后等待至少一秒鍾,這給了訂閱者足夠的時間,在這個小程序中接收和處理最后一個元素。
該程序沒有表現出背壓(backpressure) ,因為所有訂閱者都通過一次性請求元素來使用push模型。 可以將SimpleSubscriber
類修改為分配任務,以查看背壓的效果:
- 在
onSubscribe()
方法中使用subscription.request(1)
方法請求一個元素。 - 在
onNext()
方法中,延遲后請求更多的元素。 延遲應使訂閱者的工作速度較慢,發布者發布元素的速度較慢。 - 需要發布超過256個元素,這是每個發布者向訂閱者使用的默認緩沖區,或者使用
SubmissionPublisher
類的另一個構造函數使用較小的緩沖區大小。 這將迫使發布者發布比訂閱者可以處理的更多的元素。 - 訂閱者使用刪除處理程序( drop handler)訂閱,以便可以看到發布者何時發現背壓。
- 使用
SubmissionPublisher
類的offer()
方法發布元素,因此當訂閱者無法處理更多元素時,發布者不會無限期地等待。
5. 使用處理者
處理者(Processor)同時是訂閱者也是發布者。 要使用處理者,需要一個實現Flow.Processor<T,R>
接口的類,其中T是訂閱元素類型,R是已發布的元素類型。 在本節中,創建了一個基於Predicate<T>
過濾元素的簡單處理者。 處理者訂閱發布六個整數——1,2,3,4,5和6的發布者。訂閱者訂閱處理者。 處理者從其發布者接收元素,如果它們通過了Predicate<T>
指定的標准,則重新發布相同的元素。 下面包含其實例作為處理者的FilterProcessor<T>
類的代碼。
// FilterProcessor.java
package com.jdojo.stream;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Predicate;
public class FilterProcessor<T> extends SubmissionPublisher<T> implements Processor<T,T>{
private Predicate<? super T> filter;
public FilterProcessor(Predicate<? super T> filter) {
this.filter = filter;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
// Request an unbounded number of items
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(T item) {
// If the item passes the filter publish it. Otherwise, no action is needed.
System.out.println("Filter received: " + item);
if (filter.test(item)) {
this.submit(item);
}
}
@Override
public void onError(Throwable t) {
// Pass the onError message to all subscribers asynchronously
this.getExecutor().execute(() -> this.getSubscribers()
.forEach(s -> s.onError(t)));
}
@Override
public void onComplete() {
System.out.println("Filter is complete.");
// Close this publisher, so all its subscribers will receive a onComplete message
this.close();
}
}
FilterProcessor<T>
類繼承自SubmissionPublisher<T>
類,並實現了Flow.Processor<T,T>
接口。 處理者必須是發布者以及訂閱者。 從SubmissionPublisher<T>
類繼承了這個類,所以不必編寫代碼來使其成為發布者。 該類實現了Processor<T,T>
接口的所有方法,因此它將接收和發布相同類型的元素。
構造函數接受Predicate<? super T>
參數並將其保存在實例變量filter
中,將在onNext()
方法中使用filter
元素。
onNext()
方法應用filter
。 如果filter
返回true,則會將該元素重新發布到其訂閱者。 該類從其超類SubmissionPublisher
繼承了用於重新發布元素的submit()
方法。
onError()
方法異步地將錯誤重新發布給其訂閱者。 它使用SubmissionPublisher
類的getExecutor()
和getSubscribers()
方法,該方法返回Executor
和當前訂閱者的列表。 Executor
用於異步地向當前訂閱者發布消息。
onComplete()
方法關閉處理者的發布者部分,它將向所有訂閱者發送一個onComplete
消息。
讓我們看看這個處理者具體的例子。 下面包含ProcessorTest
類的代碼。 可能會得到一個不同的輸出,因為這個程序涉及到幾個異步步驟。 該程序的詳細說明遵循程序的輸出。
// ProcessorTest.java
package com.jdojo.stream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.stream.LongStream;
public class ProcessorTest {
public static void main(String[] args) {
CompletableFuture<Void> subTask = null;
// The publisher is closed when the try block exits
try (SubmissionPublisher<Long> pub = new SubmissionPublisher<>()) {
// Create a Subscriber
SimpleSubscriber sub = new SimpleSubscriber("S1", 10);
// Create a processor
FilterProcessor<Long> filter = new FilterProcessor<>(n -> n % 2 == 0);
// Subscribe the filter to the publisher and a subscriber to the filter
pub.subscribe(filter);
filter.subscribe(sub);
// Generate and publish 6 integers
LongStream.range(1L, 7L)
.forEach(pub::submit);
}
try {
// Sleep for two seconds to let subscribers finish handling all items
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
輸出的結果為:
S1 subscribed with max count 10.
Filter received: 1
Filter received: 2
Filter received: 3
S1 received 2.
Filter received: 4
S1 received 4.
Filter received: 5
Filter received: 6
Filter is complete.
S1 received 6.
S1 is complete.
ProcessorTest
類的main()
方法創建一個發布者,它將發布六個整數——1,2,3,4,5和6。該方法做了很多事情:
- 它創建一個使用try-with-resources塊的發布者,所以當try塊退出時它將自動關閉。
- 它創建一個
SimpleSubscriber
類的實例的訂閱者。訂閱者名為S1
,最多可處理10個元素。 - 它創建一個處理者,它是
FilterProcessor<Long>
類的實例。傳遞一個Predicate<Long>
,讓處理者重新發布整數並丟棄奇數。 - 處理者被訂閱發布者,並且簡單訂閱者被訂閱到處理者。這完成了發布者到訂閱者的管道——發布者到處理者到訂閱者。
- 在第一個try塊的末尾,代碼生成從1到6的整數,並使用發布者發布它們。
- 在
main()
方法結束時,程序等待兩秒鍾,以確保處理者和訂閱者有機會處理其事件。如果刪除此邏輯,程序可能無法打印任何內容。必須包含這個邏輯,因為所有事件都是異步處理的。當第一個try塊退出時,發布者將完成向處理者發送所有通知。然而,處理者和訂閱者需要一些時間來接收和處理這些通知。
四. 總結
流是生產者生產並由一個或多個消費者消費的元素序列。 這種生產者——消費者模型也被稱為source/sink模型或發行者——訂閱者模型。
有幾種流處理機制,pull模型和push模型是最常見的。 在push模型中,發布者將數據流推送到訂閱者。 在pull模型中,定於這從發布者拉出數據。 當兩端不以相同的速率工作的時,這些模型有問題。 解決方案是提供適應發布者和訂閱者速率的流。 使用稱為背壓的策略,其中訂閱者通知發布者它可以處理多少個元素,並且發布者僅向訂閱者發送那些需要處理的元素。
響應式流從2013年開始,作為提供非阻塞背壓的異步流處理標准的舉措。 它旨在解決處理元素流的問題 ——如何將元素流從發布者傳遞到訂閱者,而不需要發布者阻塞,或者訂閱者有無限制的緩沖區或丟棄。 響應式流模型在pull模型和push模型流處理機制之間動態切換。 當訂閱者處理較慢時,它使用pull模型,當訂閱者處理更快時使用push模型。
在2015年,出版了一個用於處理響應式流的規范和Java API。 Java API 中的響應式流由四個接口組成:Publisher<T>
,Subscriber<T>
,Subscription
和Processor<T,R>
。
發布者根據收到的要求向訂閱者發布元素。 用戶訂閱發布者接收元素。 發布者向訂閱者發送訂閱令牌。 使用訂閱令牌,訂閱者從發布者請求多個數據元素。 當數據元素准備就緒時,發布者向訂閱者發送多個個或稍少的數據元素。 訂閱者可以請求更多的數據元素。
JDK 9在java.util.concurrent包中提供了與響應式流兼容的API,它在java.base模塊中。 API由兩個類組成:Flow
和SubmissionPublisher<T>
。
Flow
類封裝了響應式流Java API。 由響應式流Java API指定的四個接口作為嵌套靜態接口包含在Flow
類中:Flow.Processor<T,R>
,Flow.Publisher<T>
,Flow.Subscriber<T>
和Flow.Subscription
。