[編織消息框架][netty源碼分析]9 Promise 實現類DefaultPromise職責與實現


netty Future是基於jdk Future擴展,以監聽完成任務觸發執行
Promise是對Future修改任務數據
DefaultPromise是重要的模板類,其它不同類型實現基本是一層簡單的包裝,如DefaultChannelPromise
主要是分析await是如何等侍結果的

 

public interface Future<V> extends java.util.concurrent.Future<V> {
   Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
}
public interface Promise<V> extends Future<V> {
    Promise<V> setSuccess(V result);
    boolean trySuccess(V result);
    Promise<V> setFailure(Throwable cause);
    boolean tryFailure(Throwable cause);
    boolean setUncancellable();
}
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

    @Override
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return await0(unit.toNanos(timeout), true);
    }
    private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
        //已完成任務直接忽略
        if (isDone()) {
            return true;
        }
        //沒有等侍時間返回處理記錄
        if (timeoutNanos <= 0) {
            return isDone();
        }
        //已中斷拋異常
        if (interruptable && Thread.interrupted()) {
            throw new InterruptedException(toString());
        }

        //checkDeadLock();
        //netty 認為是當前線程是死鎖狀態
        EventExecutor e = executor();
        if (e != null && e.inEventLoop()) {
            throw new BlockingOperationException(toString());
        }
        
        long startTime = System.nanoTime();
        long waitTime = timeoutNanos;
        boolean interrupted = false; 
        try {
            for (;;) {
                synchronized (this) {
                    if (isDone()) {
                        return true;
                    }
                    //最大檢查次數為 Short.MAX_VALUE
                    //很奇怪的邏輯,處理完后又自減
                    if (waiters == Short.MAX_VALUE) {
                        throw new IllegalStateException("too many waiters: " + this);
                    }
                    ++waiters;
                    try {
                        //阻塞的代碼只是一行參數1是milliseconds,參數2是輔助用的大於0時milliseconds+1,如果是0的話會無限制阻塞
                        wait(waitTime / 1000000, (int) (waitTime % 1000000));
                    } catch (InterruptedException e) {
                        if (interruptable) {
                            throw e;
                        } else {
                            interrupted = true;
                        }
                    } finally {
                        waiters--;
                    }
                }
                //這里是double check跟並發無影響的邏輯放在synchronized外面
                if (isDone()) {
                    return true;
                } else {
                    waitTime = timeoutNanos - (System.nanoTime() - startTime);
                    if (waitTime <= 0) {
                        return isDone();
                    }
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {
    private final Channel channel;
    public DefaultChannelPromise(Channel channel) {
        this.channel = channel;
    }
    public DefaultChannelPromise(Channel channel, EventExecutor executor) {
        super(executor);
        this.channel = channel;
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM