ListenableFuture的說明
並發編程是一個難題,但是一個強大而簡單的抽象可以顯著的簡化並發的編寫。出於這樣的考慮,Guava 定義了 ListenableFuture接口並繼承了JDK concurrent包下的Future 接口,ListenableFuture 允許你注冊回調方法(callbacks),在運算(多線程執行)完成的時候進行調用, 或者在運算(多線程執行)完成后立即執行。這樣簡單的改進,使得可以明顯的支持更多的操作,這樣的功能在JDK concurrent中的Future是不支持的。 在高並發並且需要大量Future對象的情況下,推薦盡量使用ListenableFuture來代替..
ListenableFuture 中的基礎方法是addListener(Runnable, Executor), 該方法會在多線程運算完的時候,在Executor中執行指定的Runnable。
ListenableFuture的創建和使用
對應JDK中的 ExecutorService.submit(Callable) 提交多線程異步運算的方式,Guava 提供了ListeningExecutorService 接口, 該接口返回 ListenableFuture, 而相應的ExecutorService 返回普通的 Future。將 ExecutorService 轉為 ListeningExecutorService,可以使用MoreExecutors.listeningDecorator(ExecutorService)進行裝飾。舉例說明:
ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
然后我們可以向這個ListeningExecutorService提交Callable任務
final ListenableFuture<String> future = pool.submit(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(1000*3); return "Task done !"; } });
然后我們添加Listener:
future.addListener(new Runnable() { @Override public void run() { try { final String contents = future.get(); System.out.println(contents); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }, MoreExecutors.sameThreadExecutor());
我們看看上面的代碼,確實不怎么優雅,我們需要處理拋出的異常,需要自己通過future.get()獲得前面計算的值。有沒有更加簡便的方法呢?當然有,Guava提供了一個簡便方法來替代上面的寫法:
Futures.addCallback(future, new FutureCallback<String>() { @Override public void onSuccess(String result) { System.out.println(result); } @Override public void onFailure(Throwable t) { t.printStackTrace(); } });
完成代碼如下:
package concurrency;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
/**
* Created by hupeng on 2014/9/24.
*/
public class ListenableFutureTest {
public static void main(String[] args) throws InterruptedException {
ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
final ListenableFuture<String> future = pool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(1000 * 2);
return "Task done !";
}
});
// future.addListener(new Runnable() {
// @Override
// public void run() {
// try {
// final String contents = future.get();
// System.out.println(contents);
// } catch (InterruptedException e) {
// e.printStackTrace();
// } catch (ExecutionException e) {
// e.printStackTrace();
// }
// }
// }, MoreExecutors.sameThreadExecutor());
Futures.addCallback(future, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
System.out.println(result);
}
@Override
public void onFailure(Throwable t) {
t.printStackTrace();
}
});
Thread.sleep(5 * 1000); //wait for task done
pool.shutdown();
}
}
