AsyncHttpClient的連接池使用邏輯


AsyncHttpClient的連接池結構很簡單, NettyConnectionsPool內部重要的幾個變量如下

    // 連接池, 通過 host 區分不同的池
    private final ConcurrentHashMap<String, ConcurrentLinkedQueue<IdleChannel>> connectionsPool = new ConcurrentHashMap<String, ConcurrentLinkedQueue<IdleChannel>>();
    // 原生channel跟IdleChannel對象的映射, IdleChannel主要是包含一些請求信息, 請求url以及請求開始時間
    private final ConcurrentHashMap<Channel, IdleChannel> channel2IdleChannel = new ConcurrentHashMap<Channel, IdleChannel>();
    // 記錄了Channel的創建時間, 用於做Channel生命周期檢測, 如果生命周期是-1, 此Map無用
    private final ConcurrentHashMap<Channel, Long> channel2CreationDate = new ConcurrentHashMap<Channel, Long>();

 

主要邏輯都位於NettyAsyncHttpProvider下

1. 取出連接池連接(doConnection階段)
先從連接池取出連接, 取出連接后會將連接從connectionsPool的數量會減少

synchronized (idleConnectionForHost) {
  idleChannel = idleConnectionForHost.poll();
  if (idleChannel != null) {
    channel2IdleChannel.remove(idleChannel.channel);
  }
}

如果連接存在, 取出來以后直接就會返回future. 否則進入下列流程

 

2. 對池內連接的控制 (doConnect階段)
在doConnect的時候會判斷connectionsPool是否可cache, 如下

    public boolean canCacheConnection() {
        if (!isClosed.get() && maxTotalConnections != -1 && channel2IdleChannel.size() >= maxTotalConnections) {
            return false;
        } else {
            return true;
        }
    }

其中channel2IdleChannel在連接池poll的時候會remove channel, 也就是說判斷的連接數是在池內的channel數
加入返回false, 則會調用asyncHandler的onThrowable()方法, 並拋出 "Too many connections " 異常

// Do not throw an exception when we need an extra connection for a redirect.
    if (!reclaimCache && !connectionsPool.canCacheConnection()) {
        IOException ex = new IOException(String.format("Too many connections %s", config.getMaxTotalConnections()));
        try {
            asyncHandler.onThrowable(ex);
        } catch (Throwable t) {
            log.warn("!connectionsPool.canCacheConnection()", t);
        }
        throw ex;
    }

provider對這一步的判斷在 3) 的判斷之前

 

3. 對池外連接的控制 (doConnect階段)
池外連接使用
private Semaphore freeConnections = null;
進行控制, 他的值為 MaxTotalConnections, 這個值和連接池的是一樣的, 邏輯如下

    if (trackConnections) {
        if (!reclaimCache) {
            if (!freeConnections.tryAcquire()) {
                IOException ex = new IOException(String.format("Too many connections %s", config.getMaxTotalConnections()));
                try {
                    asyncHandler.onThrowable(ex);
                } catch (Throwable t) {
                    log.warn("!connectionsPool.canCacheConnection()", t);
                }
                throw ex;
            } else {
                acquiredConnection = true;
            }
        }
    }

默認調用的
public <T> ListenableFuture<T> execute(Request request, AsyncHandler<T> handler) throws IOException;
方法, reclaimCache 都為 false

 

4. 向連接池添加連接邏輯 (Protocol handle()階段)
在provider的HttpProtocol類里會調finishUpdate()方法, 這里會執行向連接池添加連接的操作, 調用offer方法

    private void finishUpdate(final NettyResponseFuture<?> future, final ChannelHandlerContext ctx, boolean lastValidChunk) throws IOException {
        if (lastValidChunk && future.getKeepAlive()) {
            drainChannel(ctx, future);
        } else {
            if (future.getKeepAlive() && ctx.getChannel().isReadable() && connectionsPool.offer(getPoolKey(future), ctx.getChannel())) {
                markAsDone(future, ctx);
                return;
            }
            finishChannel(ctx);
        }
        markAsDone(future, ctx);
    }

連接池的offer方法沒有對maxTotalConnections的判斷, 只對maxConnectionPerHost做判斷


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM