參考: google guava中文教程 https://wizardforcel.gitbooks.io/guava-tutorial/content/16.html
https://blog.csdn.net/u010900754/article/details/90742576
並發編程是一個難題,但是一個強大而簡單的抽象可以顯著的簡化並發的編寫。出於這樣的考慮,Guava 定義了 ListenableFuture接口並繼承了JDK concurrent包下的Future 接口,ListenableFuture 允許你注冊回調方法(callbacks),在運算(多線程執行)完成的時候進行調用, 或者在運算(多線程執行)完成后立即執行。這樣簡單的改進,使得可以明顯的支持更多的操作,這樣的功能在JDK concurrent中的Future是不支持的。 在高並發並且需要大量Future對象的情況下,推薦盡量使用ListenableFuture來代替
使用異步編程接口獲取返回值的方式有兩種:
1.同步方式,也就是調用方主動獲取,但是這時可能還沒有返回結果,可能需要輪詢;
2.回調方式,調用者在提交任務時,注冊一個回調函數,任務執行完以后,自動觸發回調函數通知調用者;這種實現方式需要在執行框架里植入一個擴展點,用於觸發回調。
Java原生api里的Future屬於第一種,Java8提供的CompletableFuture屬於第二種;在Java8出來之前,guava也提供了基於回調的編程接口,也就是本次要說的ListenableFuture(其實看guava代碼,里面有大量這玩意兒,不搞懂不行。。。)。
futureTask
簡單demo:
public class FutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<Integer> call = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("waiting~~~~~~");
Thread.sleep(3000);
return 10086;
}
};
FutureTask<Integer> futureTask = new FutureTask(call);
Thread thread = new Thread(futureTask);
thread.start();
//do something
System.out.println("other things");
Integer o = futureTask.get();
System.out.println(o);
}
}
Future
FutureTask實現Future接口,它定義了5個方法:
簡單說明一下接口定義
boolean cancel(boolean mayInterruptInRunning); //取消一個任務,並返回取消結果。參數表示是否中斷線程。
boolean isCancelled(); //判斷任務是否被取消
Boolean isDone(); //判斷當前任務是否執行完畢,包括正常執行完畢、執行異常或者任務取消。
V get() ;//獲取任務執行結果,任務結束之前會阻塞。
V get(long timeout, TimeUnit unit); //在指定時間內嘗試獲取執行結果。若超時則拋出超時異常
FutureTask
- volatile int state:表示對象狀態,volatile關鍵字保證了內存可見性。futureTask中定義了7種狀態,代表了7種不同的執行狀態
private static final int NEW = 0; //任務新建和執行中
private static final int COMPLETING = 1; //任務將要執行完畢
private static final int NORMAL = 2; //任務正常執行結束
private static final int EXCEPTIONAL = 3; //任務異常
private static final int CANCELLED = 4; //任務取消
private static final int INTERRUPTING = 5; //任務線程即將被中斷
private static final int INTERRUPTED = 6; //任務線程已中斷
Callable callable:被提交的任務
Object outcome:任務執行結果或者任務異常
volatile Thread runner:執行任務的線程
volatile WaitNode waiters:等待節點,關聯等待線程
long stateOffset:state字段的內存偏移量
long runnerOffset:runner字段的內存偏移量
long waitersOffset:waiters字段的內存偏移量
ListenableFuture
public class ListenableFutureTest {
public static void main(String[] args) {
}
public static void testListenFuture() throws InterruptedException {
ListenableFuture<String> submit = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1))
.submit(() -> {
Thread.sleep(2000L);
return "aync result";
});
Futures.addCallback(submit, new FutureCallback<String>() {
@Override
public void onSuccess(@Nullable String result) {
System.out.println("succeed, result: {}" + result);
}
@Override
public void onFailure(Throwable t) {
System.out.println("failed, t: {}" + t);
}
}, Executors.newSingleThreadExecutor());
Thread.sleep(100000);
}
}
MoreExecutors.listeningDecorator就是包裝了一下ThreadPoolExecutor,目的是為了使用
private static class ListeningDecorator extends AbstractListeningExecutorService {
private final ExecutorService delegate;
ListeningDecorator(ExecutorService delegate) {
this.delegate = (ExecutorService)Preconditions.checkNotNull(delegate);
}
public final void execute(Runnable command) {
this.delegate.execute(command);
}
}
ListenableFuture 中的基礎方法是addListener(Runnable, Executor), 該方法會在多線程運算完的時候,在Executor中執行指定的Runnable。
這里的delegate就是ThreadPoolExecutor,雖然還重寫了execute,不過還是直接調用ThreadPoolExecutor里面的execute ListeningExecutorService pool=MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));
這樣一個執行器就被new出來了,現在需要往里面放任務了
ListenableFuture future = pool.submit(task1);看看submit代碼
public <T> ListenableFuture<T> submit(Callable<T> task) {
return (ListenableFuture)super.submit(task);
}
然后調用父類的submit
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
這里會調用重寫的newTaskFor
protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return TrustedListenableFutureTask.create(callable);
}
class TrustedListenableFutureTask<V> extends TrustedFuture<V> implements RunnableFuture<V> {
private volatile InterruptibleTask<?> task;
static <V> TrustedListenableFutureTask<V> create(Callable<V> callable) {
return new TrustedListenableFutureTask(callable);
}
TrustedListenableFutureTask(Callable<V> callable) {
this.task = new TrustedListenableFutureTask.TrustedFutureInterruptibleTask(callable);
}
public void run() {
InterruptibleTask localTask = this.task;
if (localTask != null) {
localTask.run();
}
this.task = null;
}
}
創建了一個TrustedListenableFutureTask,里面有個task是TrustedFutureInterruptibleAsyncTask, 這里重寫了Runnable的run方法,調用的就是這個task得run方法(也就是我們真正的任務)
private final class TrustedFutureInterruptibleAsyncTask extends InterruptibleTask<ListenableFuture<V>> {
private final AsyncCallable<V> callable;
TrustedFutureInterruptibleAsyncTask(AsyncCallable<V> callable) {
this.callable = (AsyncCallable)Preconditions.checkNotNull(callable);
}
}