一. ListenableFuture是用來增強Future的功能的。
我們知道Future表示一個異步計算任務,當任務完成時可以得到計算結果。如果我們希望一旦計算完成就拿到結果展示給用戶或者做另外的計算,就必須使用另一個線程不斷的查詢計算狀態。這樣做,不斷代碼復雜,而且效率低下。
ListenableFuture,顧名思義,就是可以監聽的Future。我們可以為ListenableFuture增加Listener監聽器,當任務完成時,直接執行某個線程,或者我們可以直接為ListenableFuture設置回調函數,當任務完成時,自動執行該回調方法。
Future模式的缺點
-
Future雖然可以實現獲取異步執行結果的需求,但是它沒有提供通知的機制,我們無法得知Future什么時候完成。
-
1.要么使用阻塞,在future.get()的地方等待future返回的結果,這時又變成同步操作。2.要么使用isDone()輪詢地判斷Future是否完成,這樣會耗費CPU的資源。
通過Future接口的get()方法可以獲取類Callable的返回值,但是此方法最大缺點是阻塞的(用戶態到核心態,開銷大),因此在並發環境下,效率比較低。Google公司提供的開源Guava庫提供了有效的處理異步Future的問題。
下面我們看看ListenableFuture的實際用法。
(1)怎樣得到ListenableFuture對象?
我們知道ExecutorService.submit(Callable) 會返回Future對象,那么ListeningExecutorService.submit(Callable)會返回ListenableFuture對象。
ListeningExecutorService可以通過MoreExecutors.listeningDecorator(ExecutorService)來得到。
(2)增加Listenable功能
方法一:直接添加監聽
ListenableFuture. addListener(Runnable listener, Executor executor)
可以為ListenableFuture增加一個監聽,當線程計算完成時,自動在executor中執行一個Runnable線程。
方法二:添加回調方法
Futures.addCallback(ListenableFuture, new FutureCallback<T>() {
public void onSuccess(T t) {
}
public void onFailure(Throwable thrown) {
}
});
Futures的靜態方法addCallback可以為ListenableFuture對象添加回調函數,回調里面可以定義計算成功時和失敗時分別的操作。onSuccess里面可以把計算結果做為參數傳入。onFailure里面可以把異常做為參數傳入。
那我們一般應該怎么選擇這兩種方式呢?建議直接用第二種,因為這種方式可以把計算結果做為參數傳入。其實,第二種的內部實現就是用的第一種方式,但是用起來會更加的簡潔。
https://blog.csdn.net/u010738033/article/details/79633252
以下參考: 官方文檔
處理並發是一個很困難的問題,但是我們可以通過使用功能強大的抽象來簡化這個工作。為了簡化這個問題,Guava 提供了 ListenableFuture,它繼承了 JDK 中的 Future
接口。
我們強烈建議:在你的代碼中,使用 ListenableFuture
來替代 Future
,因為
* 很多 Future
相關的方法需要它。
* 一開始就使用 ListenableFuture
會省事很多。
* 這樣工具方法提供者就不需要針對 Future
和 ListenableFuture
都提供方法。
接口
Future
代表了異步執行的結果:一個可能還沒有產生結果的執行過程。 Future
可以正在被執行,但是會保證返回一個結果。
ListenableFuture
可以使你注冊回調函數,使得在結果計算完成的時候可以回調你的函數。如果結果已經算好,那么將會立即回調。這個簡單的功能使得可以完成很多 Future
支持不了的操作。
ListenableFuture
添加的基本函數是 addListener(Runnable, Executor)
。通過這個函數,當 Future
中的結果執行完成時,傳入的 Runnable
會在傳入的 Executor
中執行。
添加回調函數
使用者偏向於使用 Futures.addCallback(ListenableFuture<V>, FutureCallback<V>, Executor)
, 或者當需要注冊輕量級的回調的時候,可以使用默認為 MoreExecutors.directExecutor()
的版本。
FutureCallback<V>
實現了兩個方法:
* onSuccess(V)
:當 future 執行成功時候的反應。
* onFailure(Throwable)
:當 future 執行失敗時候的反應。
創建
與 JDK 中 通過 ExecutorService.submit(Callable)
來初始化一個異步的任務相似,Guava 提供了一個 ListeningExecutorService
接口,這個接口可以返回一個 ListenableFuture
(ExecutorService
只是返回一個普通的 Future
)。如果需要將一個 ExecutorService
轉換為 ListeningExecutorService
,可以使用 MoreExecutors.listeningDecorator(ExecutorService)
。一個使用示例如下:
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() { public Explosion call() { return pushBigRedButton(); } }); Futures.addCallback(explosion, new FutureCallback<Explosion>() { // we want this handler to run immediately after we push the big red button! public void onSuccess(Explosion explosion) { walkAwayFrom(explosion); } public void onFailure(Throwable thrown) { battleArchNemesis(); // escaped the explosion! } });
如果你想從一個基於 FutureTask
的 API 轉換過來,Guava 提供了 ListenableFutureTask.create(Callable<V>)
和 ListenableFutureTask.create(Runnable, V)
。和 JDK 不一樣,ListenableFutureTask
並不意味着可以直接擴展。
如果你更喜歡可以設置 future 值的抽象,而不是實現一個方法來計算結果,那么可以考慮直接擴展 AbstractFuture<V>
或者 SettableFuture
。
如果你一定要將一個基於 Future
的 API 轉換為基於 ListenableFuture
的話,你不得不采用硬編碼的方式 JdkFutureAdapters.listenInPoolThread(Future)
來實現從 Future
到 ListenableFuture
的轉換。所以,盡可能地使用 ListenableFuture
。
應用
使用 ListenableFuture
一個最重要的原因就是:可以基於他實現負責的異步執行鏈。如下所示:
ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); AsyncFunction<RowKey, QueryResult> queryFunction = new AsyncFunction<RowKey, QueryResult>() { public ListenableFuture<QueryResult> apply(RowKey rowKey) { return dataService.read(rowKey); } }; ListenableFuture<QueryResult> queryFuture = Futures.transformAsync(rowKeyFuture, queryFunction, queryExecutor);
很多不能被 Future
支持的方法可以通過 ListenableFuture
被高效地支持。不同的操作可能被不同的執行器執行,而且一個 ListenableFuture
可以有多個響應操作。
當 ListenableFuture
有多個后續操作的時候,這樣的操作稱為:“扇出”。當它依賴多個輸入 future 同時完成時,稱作“扇入”。可以參考 Futures.allAsList
的實現。
方法 | 描述 | 參考 |
---|---|---|
transformAsync(ListenableFuture<A>, AsyncFunction<A, B>, Executor) |
返回新的 ListenableFuture ,它是給定 AsyncFunction 結合的結果 |
transformAsync(ListenableFuture<A>, AsyncFunction<A, B>) |
transform(ListenableFuture<A>, Function<A, B>, Executor) |
返回新的 ListenableFuture ,它是給定 Function 結合的結果 |
transform(ListenableFuture<A>, Function<A, B>) |
allAsList(Iterable<ListenableFuture<V>>) |
返回一個 ListenableFuture ,它的值是一個輸入 futures 的值的按序列表,任何一個 future 的失敗都會導致最后結果的失敗 |
allAsList(ListenableFuture<V>...) |
successfulAsList(Iterable<ListenableFuture<V>>) |
返回一個 ListenableFuture ,它的值是一個輸入 futures 的成功執行值的按序列表,對於取消或者失敗的任務,對應的值是 null |
successfulAsList(ListenableFuture<V>...) |
AsyncFunction<A, B>
提供了一個方法:ListenableFuture<B> apply(A input)
。可以被用來異步轉換一個值。
List<ListenableFuture<QueryResult>> queries;
// The queries go to all different data centers, but we want to wait until they're all done or failed. ListenableFuture<List<QueryResult>> successfulQueries = Futures.successfulAsList(queries); Futures.addCallback(successfulQueries, callbackOnSuccessfulQueries);
避免嵌套 Future
在使用通用接口返回 Future
的代碼中,很有可能會嵌套 Future
。例如:
executorService.submit(new Callable<ListenableFuture<Foo>() { @Override public ListenableFuture<Foo> call() { return otherExecutorService.submit(otherCallable); } });
上述代碼將會返回:ListenableFuture<ListenableFuture<Foo>>
。這樣的代碼是不正確的,因為外層 future 的取消操作不能傳遞到內層的 future。此外,一個常犯的錯誤是:使用 get()
或者 listener 來檢測其它 future 的失敗。為了避免這樣的情況,Guava 所有處理 future 的方法(以及一些來自 JDK 的代碼)具有安全解決嵌套的版本。
CheckedFuture
Guava 也提供 CheckedFuture<V, X extends Exception>
接口。
CheckedFuture
是這樣的一個 ListenableFuture
:具有多個可以拋出受保護異常的 get
方法。這使得創建一個執行邏輯可能拋出異常的 future 變得容易。使用 Futures.makeChecked(ListenableFuture<V>, Function<Exception, X>)
可以將 ListenableFuture
轉換為 CheckedFuture
。
ListenableFuture類
- jdk5之后有了Future這種異步執行的結構
ExecutorService executor = Executors.newCachedThreadPool();
Future<Integer> future = executor.submit(new Callable<Integer>(){ public Integer call() throws Exception{ return service.getCount(); } }); //Retrieve the value of computation Integer count = future.get();
- ListenableFuture對Future進行了擴展,允許注冊一個回調函數,task執行完后自動調用。
- 獲取ListableFuture對象。
正如我們獲取Future對象要通過ExecutorService.submit(Callable)來獲取一樣,我們可以這樣創建ListenableFuture對象:
executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(NUM_THREADS)); //包裝Executors創建的線程池 ListenableFuture<String> listenableFuture = executorService.submit(new Callable<String>()...); //獲取ListableFuture對象 listenableFuture.addListener(new Runnable() { @Override public void run() { methodToRunOnFutureTaskCompletion(); } }, executorService); //注冊回調函數
FutureCallback類
- FutureCallback定義了onSuccess和onFailure方法,onSuccess方法會接收一個Future對象,這樣我們就可以獲取Future的結果。
- 首先需要一個FutureCallback實現類。
/** * 定義一個FutureCallBack實現類 */ public class FutureCallbackImpl implements FutureCallback<String> { private StringBuilder builder = new StringBuilder(); @Override public void onSuccess(String result) { builder.append(result).append(" successfully"); } @Override public void onFailure(Throwable t) { builder.append(t.toString()); } public String getCallbackResult() { return builder.toString(); } }
使用實例:
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
ListenableFuture<String> futureTask = executorService.submit(new Callable<String>() { //創建ListenaleFuture對象 @Override public String call() throws Exception { return "Task completed"; } }); FutureCallbackImpl callback = new FutureCallbackImpl(); Futures.addCallback(futureTask, callback); //添加回調 callback.getCallbackResult(); //獲取結果
如果CallBack是一個耗時操作,你應該選擇另一個注冊CallBack:
Futures.addCallback(futureTask,callback,executorService); //提供另一個線程池來執行性回調
SettableFuture類:
SettableFuture:不需要實現一個方法來計算返回值,而只需要返回一個固定值來做為返回值,可以通過程序設置此Future的返回值或者異常信息
SettableFuture可以用來設置要返回得值:
SettableFuture<String> sf = SettableFuture.create(); //Set a value to return sf.set("Success"); //Or set a failure Exception sf.setException(someException);
通過上面的例子,我們看到,通過create()方法,我們可以創建一個默認的ettableFuture實例,
當我們需要為Future實例設置一個返 回值時,我們可以通過set方法,設置的值就是Future實例在執行成功后將要返回的值;
另外,當我們想要設置一個異常導致Future執行失敗,我們 可以通過調用setException方法,我們將給Future實例設置指定的異常返回。
當我們有一個方法返回Future實例時,SettableFuture會顯得更有價值,但是已經有了Future的返回值,我們也不需要再去執行異步任 務獲取返回值。
https://my.oschina.net/indestiny/blog/219368