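Async pain points
1. Callback hell; solution: Promise or Future
2. How to deliver the result of an async operation back to the calling thread (currentThread); solution: the Context design
3. How to run logic that depends on the results of several async operations; solution: CompositeFuture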
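To make the first pain point concrete, here is a minimal sketch that writes the same two-step flow once with nested callbacks and once as a flat chain. It uses the JDK's CompletableFuture rather than a Vert.x Future, and fetchUser / fetchOrders are hypothetical helpers invented for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class CallbackHellSketch {
    // Hypothetical callback-style API; it completes immediately on the caller's thread.
    static void fetchUser(int id, Consumer<String> callback) {
        callback.accept("user-" + id);
    }

    // Hypothetical second step that depends on the first result.
    static void fetchOrders(String user, Consumer<String> callback) {
        callback.accept(user + ":orders");
    }

    // Nested callbacks: every dependent step adds one more level of nesting.
    static void nested(int id, Consumer<String> done) {
        fetchUser(id, user ->
            fetchOrders(user, orders ->
                done.accept(orders.toUpperCase())));
    }

    // The same flow expressed as a flat chain of futures.
    static CompletableFuture<String> chained(int id) {
        return CompletableFuture.completedFuture("user-" + id)
            .thenCompose(user -> CompletableFuture.completedFuture(user + ":orders"))
            .thenApply(String::toUpperCase);
    }
}
```

With two steps the difference is small, but each additional dependent step deepens the nested version while the chained version only grows by one line.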
Problems with the JDK's Callable and Future
A Callable task can return a result, and that result is retrieved through a Future object, but calling Future.get() blocks the current thread:
    // Excerpt from java.util.concurrent.FutureTask (JDK 8)
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

    /**
     * Waits for completion, or aborts on interrupt or timeout.
     *
     * @param timed true if a timeout was specified
     * @param nanos time to wait
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        // Loop, checking the FutureTask state on every pass
        for (;;) {
            if (Thread.interrupted()) { // the waiting thread was interrupted
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) { // completed or cancelled
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // result is being set; give up the CPU (running -> ready)
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos); // block for at most the remaining time
            }
            else
                LockSupport.park(this); // block until unparked
        }
    }
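The blocking behaviour can be observed from the caller's side with a small sketch. BlockingGetSketch and its method names are made up for this illustration; only the java.util.concurrent calls are real JDK API.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BlockingGetSketch {
    // Returns true if a timed get() gives up before the task finishes.
    static boolean timedGetTimesOut(long taskMillis, long waitMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> slow = () -> { Thread.sleep(taskMillis); return "done"; };
            Future<String> future = pool.submit(slow);
            future.get(waitMillis, TimeUnit.MILLISECONDS); // caller is parked here
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow(); // interrupts the still-sleeping task
        }
    }

    // Plain get(): the caller cannot do anything else until the task completes.
    static String blockingGet(long taskMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> slow = () -> { Thread.sleep(taskMillis); return "done"; };
            return pool.submit(slow).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

On an event-loop thread this kind of wait is exactly what has to be avoided, which is the motivation for the callback-based design discussed next.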
Vert.x Future design
1. Future avoids blocking the thread by having callers register an event handler (EventHandler) that is called back when the result is ready.
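The idea of registering a handler instead of waiting can be sketched in a few lines. CallbackFutureSketch is a hypothetical, simplified class written for this illustration; it is not the Vert.x FutureImpl and leaves out failure handling entirely.

```java
import java.util.function.Consumer;

final class CallbackFutureSketch<T> {
    private T result;
    private boolean completed;
    private Consumer<T> handler;

    // Registers the callback; if the result is already there, run it right away.
    void setHandler(Consumer<T> h) {
        T value;
        synchronized (this) {
            handler = h;
            if (!completed) {
                return; // nothing to deliver yet, and nobody is blocked
            }
            value = result;
        }
        h.accept(value);
    }

    // Completes the future and notifies the registered callback, if any.
    void complete(T value) {
        Consumer<T> h;
        synchronized (this) {
            if (completed) {
                throw new IllegalStateException("already completed");
            }
            result = value;
            completed = true;
            h = handler;
        }
        if (h != null) {
            h.accept(value); // invoked outside the lock
        }
    }
}
```

The caller's thread returns immediately from setHandler; the callback runs on whichever thread later calls complete.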
[Figure: UML diagram]
Future instances are created by FutureFactory, which is loaded through the SPI mechanism:
    public class FutureFactoryImpl implements FutureFactory {

        private static final SucceededFuture EMPTY = new SucceededFuture<>(null);

        /**
         * Creates a FutureImpl instance.
         */
        public <T> Future<T> future() {
            return new FutureImpl<>();
        }

        /**
         * Returns a shared constant SucceededFuture instance.
         * Its result is null, so one instance can be reused to save memory.
         */
        public <T> Future<T> succeededFuture() {
            @SuppressWarnings("unchecked")
            Future<T> fut = EMPTY;
            return fut;
        }

        /**
         * Creates a SucceededFuture instance holding the given result.
         */
        public <T> Future<T> succeededFuture(T result) {
            return new SucceededFuture<>(result);
        }

        /**
         * Creates a FailedFuture instance from a Throwable.
         */
        public <T> Future<T> failedFuture(Throwable t) {
            return new FailedFuture<>(t);
        }

        /**
         * Creates a FailedFuture instance from a failure message.
         */
        public <T> Future<T> failureFuture(String failureMessage) {
            return new FailedFuture<>(failureMessage);
        }
    }
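The shared EMPTY constant works because an already-succeeded future with a null result is immutable, so one instance can stand in for every type parameter. A minimal sketch of that pattern, using a hypothetical Succeeded class rather than the Vert.x types:

```java
final class SharedEmptySketch {
    // Hypothetical immutable "already succeeded" holder.
    static final class Succeeded<T> {
        private final T result;

        Succeeded(T result) {
            this.result = result;
        }

        T result() {
            return result;
        }
    }

    // One shared instance for the "succeeded with null" case.
    private static final Succeeded<?> EMPTY = new Succeeded<>(null);

    // The unchecked cast is safe here: the only value it holds is null.
    @SuppressWarnings("unchecked")
    static <T> Succeeded<T> succeeded() {
        return (Succeeded<T>) EMPTY;
    }

    // A non-null result needs its own instance.
    static <T> Succeeded<T> succeeded(T result) {
        return new Succeeded<>(result);
    }
}
```

Every call to succeeded() returns the same object, so completing many void-like operations allocates nothing.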
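Combining the results of multiple async operations (from parallel to serial)
CompositeFuture handles the combination of multiple async results. It uses a counter to solve the wait problem: each completed future increments the counter, and the composite completes once the counter reaches the required value.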

    public class CompositeFutureImpl implements CompositeFuture, Handler<AsyncResult<CompositeFuture>> {

        private final Future[] results; // the futures being combined
        private int count;              // counter
        private boolean completed;      // whether the composite has completed
        private Throwable cause;        // failure cause
        private Handler<AsyncResult<CompositeFuture>> handler; // callback event handler

        // (constructor, setCompleted and other members are omitted from this excerpt)

        public static CompositeFuture all(Future<?>... results) {
            CompositeFutureImpl composite = new CompositeFutureImpl(results); // create the instance
            int len = results.length; // number of futures
            for (int i = 0; i < len; i++) {
                results[i].setHandler(ar -> {
                    Handler<AsyncResult<CompositeFuture>> handler = null;
                    if (ar.succeeded()) {
                        synchronized (composite) { // lock so count and completion state change together
                            composite.count++;
                            if (!composite.isComplete() && composite.count == len) { // every future succeeded
                                handler = composite.setCompleted(null);
                            }
                        }
                    } else {
                        synchronized (composite) { // same lock as the success path
                            if (!composite.isComplete()) { // any single failure fails the composite
                                handler = composite.setCompleted(ar.cause());
                            }
                        }
                    }
                    if (handler != null) { // run the callback outside the lock
                        handler.handle(composite);
                    }
                });
            }
            if (len == 0) { // edge case: nothing to wait for
                composite.setCompleted(null);
            }
            return composite;
        }

        public static CompositeFuture any(Future<?>... results) {
            CompositeFutureImpl composite = new CompositeFutureImpl(results);
            int len = results.length;
            for (int i = 0; i < len; i++) {
                results[i].setHandler(ar -> {
                    Handler<AsyncResult<CompositeFuture>> handler = null;
                    if (ar.succeeded()) {
                        synchronized (composite) {
                            if (!composite.isComplete()) { // any single success completes the composite
                                handler = composite.setCompleted(null);
                            }
                        }
                    } else {
                        synchronized (composite) {
                            composite.count++;
                            if (!composite.isComplete() && composite.count == len) { // every future failed
                                handler = composite.setCompleted(ar.cause());
                            }
                        }
                    }
                    if (handler != null) { // run the callback outside the lock
                        handler.handle(composite);
                    }
                });
            }
            if (results.length == 0) { // edge case: nothing to wait for
                composite.setCompleted(null);
            }
            return composite;
        }

        private static CompositeFuture join(Function<CompositeFuture, Throwable> pred, Future<?>... results) {
            CompositeFutureImpl composite = new CompositeFutureImpl(results);
            int len = results.length;
            for (int i = 0; i < len; i++) {
                results[i].setHandler(ar -> {
                    Handler<AsyncResult<CompositeFuture>> handler = null;
                    synchronized (composite) {
                        composite.count++;
                        if (!composite.isComplete() && composite.count == len) { // wait for all, success or failure
                            // Take decision here
                            Throwable failure = pred.apply(composite);
                            handler = composite.setCompleted(failure);
                        }
                    }
                    if (handler != null) {
                        handler.handle(composite);
                    }
                });
            }
            if (len == 0) { // edge case: nothing to wait for
                composite.setCompleted(null);
            }
            return composite;
        }

        /**
         * Returns the result of the future at the given index.
         */
        public <T> T resultAt(int index) {
            return this.<T>future(index).result();
        }
    }
    public interface CompositeFuture extends Future<CompositeFuture> {

        // (other members are omitted from this excerpt)

        /**
         * Returns the results of all wrapped futures as a list.
         */
        default <T> List<T> list() {
            int size = size();
            ArrayList<T> list = new ArrayList<>(size);
            for (int index = 0; index < size; index++) {
                list.add(resultAt(index));
            }
            return list;
        }
    }
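The counter technique used by all() can be tried in isolation with a self-contained sketch. MiniFuture and CountingAllSketch are hypothetical classes written for this illustration; they follow the same idea (count successes under a lock, fail fast on the first failure, invoke the callback outside the lock) but are not the Vert.x implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

final class CountingAllSketch {
    // Hypothetical one-shot future; the handler receives (result, failure).
    static final class MiniFuture<T> {
        private T result;
        private Throwable failure;
        private boolean done;
        private BiConsumer<T, Throwable> handler;

        void setHandler(BiConsumer<T, Throwable> h) {
            T r;
            Throwable f;
            synchronized (this) {
                handler = h;
                if (!done) return;
                r = result;
                f = failure;
            }
            h.accept(r, f); // already finished: notify immediately
        }

        void complete(T value) { finish(value, null); }

        void fail(Throwable cause) { finish(null, cause); }

        private void finish(T value, Throwable cause) {
            BiConsumer<T, Throwable> h;
            synchronized (this) {
                if (done) return; // later completions are ignored
                done = true;
                result = value;
                failure = cause;
                h = handler;
            }
            if (h != null) h.accept(value, cause);
        }
    }

    // Completes with all results in input order, or fails on the first failure.
    static <T> MiniFuture<List<T>> all(List<MiniFuture<T>> futures) {
        MiniFuture<List<T>> composite = new MiniFuture<>();
        int len = futures.size();
        if (len == 0) { // edge case: nothing to wait for
            composite.complete(new ArrayList<>());
            return composite;
        }
        List<T> slots = new ArrayList<>();
        for (int i = 0; i < len; i++) slots.add(null);
        int[] count = {0}; // success counter, guarded by the lock on slots
        for (int i = 0; i < len; i++) {
            final int index = i;
            futures.get(i).setHandler((value, cause) -> {
                if (cause != null) {
                    composite.fail(cause);
                    return;
                }
                List<T> results = null;
                synchronized (slots) {
                    slots.set(index, value);
                    if (++count[0] == len) results = new ArrayList<>(slots);
                }
                if (results != null) composite.complete(results); // outside the lock
            });
        }
        return composite;
    }
}
```

No thread waits at any point: the composite's handler fires from whichever completion pushes the counter to the number of futures.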
