vertx的Future設計


異步痛點

1.回調地獄(CallBack hell) ; 解決方式 Promise 或 Future

2.執行異步后的結果如何回調currentThread ; 解決方式 Context 設計

3.如何處理依賴多異步的result進行邏輯 ; 解決方案 CompositeFuture

JDK的lib庫 Callable和Future 問題

Callable任務可以返回結果,返回的結果可以由Future對象取出,但是調用Future.get()會阻塞當前線程

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}


public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

 /**
 * 等待完成,或者中斷、超時
 *
 * @param timed 定義了超時則為true
 * @param nanos 等待時間
 */
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    //輪詢查看FutureTask的狀態
    for (;;) {
        if (Thread.interrupted()) {//線程中斷
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {//完成或取消狀態
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING)//正在set result,Thread “運行狀態”進入到“就緒狀態”
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

 

vertx的Future設計

1. Future通過事件注冊(EventHandler)回調方式,來解決Thread阻塞問題.

UML圖:

image    

Futrue instances 采用FutureFactory生產,基於 SPI 機制:

public class FutureFactoryImpl implements FutureFactory {

  private static final SucceededFuture EMPTY = new SucceededFuture<>(null);

  /**
    * 創建FutureImpl 實例
    */
  public <T> Future<T> future() {
    return new FutureImpl<>();
  }

  /**
    * 返回一個常量 SucceededFuture 實例
    * result本身為null, 可以減少內存開銷
    */
  public <T> Future<T> succeededFuture() {
    @SuppressWarnings("unchecked")
    Future<T> fut = EMPTY;
    return fut;
  }

  /**
    * 創建一個 SucceededFuture 實例
    */
  public <T> Future<T> succeededFuture(T result) {
    return new SucceededFuture<>(result);
  }

  /**
    * 創建一個 FailedFuture 實例
    */
  public <T> Future<T> failedFuture(Throwable t) {
    return new FailedFuture<>(t);
  }

  /**
    * 創建一個 FailedFuture 實例
    */
  public <T> Future<T> failureFuture(String failureMessage) {
    return new FailedFuture<>(failureMessage);
  }
}

 

多異步的result如何組合(並行變串行)

使用的CompositeFuture,來處理多異步結果組合問題:采用計數器(Counters)的方法來解決 wait 問題

image

public class CompositeFutureImpl implements CompositeFuture, Handler<AsyncResult<CompositeFuture>> {

  private final Future[] results;//定義數組
  
  private int count;//計數器
  
  private boolean completed;//是否完成
  
  private Throwable cause;//錯誤原因
  
  private Handler<AsyncResult<CompositeFuture>> handler;// 回調 eventHandler
  
  public static CompositeFuture all(Future<?>... results) {
    CompositeFutureImpl composite = new CompositeFutureImpl(results);//創建實例
    int len = results.length; //獲取 futures數組長度
    for (int i = 0; i < len; i++) {
      results[i].setHandler(ar -> {
        Handler<AsyncResult<CompositeFuture>> handler = null;
        if (ar.succeeded()) {
          synchronized (composite) { //添加內存屏障,防止並發問題
            composite.count++; 
            if (!composite.isComplete() && composite.count == len) {//所有future成功
              handler = composite.setCompleted(null);
            }
          }
        } else {
          synchronized (composite) {//添加內存屏障,防止並發問題
            if (!composite.isComplete()) {//任何一個失敗就失敗
              handler = composite.setCompleted(ar.cause());
            }
          }
        }
        if (handler != null) {//執行回調EventHandler
          handler.handle(composite);
        }
      });
    }
    if (len == 0) {//判斷臨界點
      composite.setCompleted(null);
    }
    return composite;
    }
}

public static CompositeFuture any(Future<?>... results) {
    CompositeFutureImpl composite = new CompositeFutureImpl(results);
    int len = results.length;
    for (int i = 0;i < len;i++) {
      results[i].setHandler(ar -> {
        Handler<AsyncResult<CompositeFuture>> handler = null;
        if (ar.succeeded()) {
          synchronized (composite) {
            if (!composite.isComplete()) {//任何一個成功
              handler = composite.setCompleted(null);
            }
          }
        } else {
          synchronized (composite) {
            composite.count++;
            if (!composite.isComplete() && composite.count == len) {//所有future失敗
              handler = composite.setCompleted(ar.cause());
            }
          }
        }
        if (handler != null) {//執行回調EventHandler
          handler.handle(composite);
        }
      });
    }
    if (results.length == 0) {//判斷臨界點
      composite.setCompleted(null);
    }
    return composite;
}

private  static CompositeFuture join(Function<CompositeFuture, Throwable> pred, Future<?>... results) {
    CompositeFutureImpl composite = new CompositeFutureImpl(results);
    int len = results.length;
    for (int i = 0; i < len; i++) {
      results[i].setHandler(ar -> {
        Handler<AsyncResult<CompositeFuture>> handler = null;
        synchronized (composite) {
          composite.count++;
          if (!composite.isComplete() && composite.count == len) {//處理所有不管失敗還是成功
            // Take decision here
            Throwable failure = pred.apply(composite);
            handler = composite.setCompleted(failure);
          }
        }
        if (handler != null) {
          handler.handle(composite);
        }
      });
    }
    if (len == 0) {//{//判斷臨界點
      composite.setCompleted(null);
    }
    return composite;
}

/**
  * 根據下標返回結果
  * /
public <T> T resultAt(int index) {
    return this.<T>future(index).result();
}
 

 

public interface CompositeFuture extends Future<CompositeFuture> {
    /**
      * 返回list Future.result()
      */
    default <T> List<T> list() {
        int size = size();
        ArrayList<T> list = new ArrayList<>(size);
        for (int index = 0;index < size;index++) {
          list.add(resultAt(index));
        }
        return list;
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM