AsyncHttpClient的連接池結構很簡單, NettyConnectionsPool內部重要的幾個變量如下
// 連接池, 通過 host 區分不同的池 private final ConcurrentHashMap<String, ConcurrentLinkedQueue<IdleChannel>> connectionsPool = new ConcurrentHashMap<String, ConcurrentLinkedQueue<IdleChannel>>(); // 原生channel跟IdleChannel對象的映射, IdleChannel主要是包含一些請求信息, 請求url以及請求開始時間 private final ConcurrentHashMap<Channel, IdleChannel> channel2IdleChannel = new ConcurrentHashMap<Channel, IdleChannel>(); // 記錄了Channel的創建時間, 用於做Channel生命周期檢測, 如果生命周期是-1, 此Map無用 private final ConcurrentHashMap<Channel, Long> channel2CreationDate = new ConcurrentHashMap<Channel, Long>();
主要邏輯都位於NettyAsyncHttpProvider下
1. 取出連接池連接(doConnection階段)
先從連接池取出連接, 取出連接后會將連接從connectionsPool的數量會減少
synchronized (idleConnectionForHost) { idleChannel = idleConnectionForHost.poll(); if (idleChannel != null) { channel2IdleChannel.remove(idleChannel.channel); } }
如果連接存在, 取出來以后直接就會返回future. 否則進入下列流程
2. 對池內連接的控制 (doConnect階段)
在doConnect的時候會判斷connectionsPool是否可cache, 如下
public boolean canCacheConnection() { if (!isClosed.get() && maxTotalConnections != -1 && channel2IdleChannel.size() >= maxTotalConnections) { return false; } else { return true; } }
其中channel2IdleChannel在連接池poll的時候會remove channel, 也就是說判斷的連接數是在池內的channel數
加入返回false, 則會調用asyncHandler的onThrowable()方法, 並拋出 "Too many connections " 異常
// Do not throw an exception when we need an extra connection for a redirect. if (!reclaimCache && !connectionsPool.canCacheConnection()) { IOException ex = new IOException(String.format("Too many connections %s", config.getMaxTotalConnections())); try { asyncHandler.onThrowable(ex); } catch (Throwable t) { log.warn("!connectionsPool.canCacheConnection()", t); } throw ex; }
provider對這一步的判斷在 3) 的判斷之前
3. 對池外連接的控制 (doConnect階段)
池外連接使用
private Semaphore freeConnections = null;
進行控制, 他的值為 MaxTotalConnections, 這個值和連接池的是一樣的, 邏輯如下
if (trackConnections) { if (!reclaimCache) { if (!freeConnections.tryAcquire()) { IOException ex = new IOException(String.format("Too many connections %s", config.getMaxTotalConnections())); try { asyncHandler.onThrowable(ex); } catch (Throwable t) { log.warn("!connectionsPool.canCacheConnection()", t); } throw ex; } else { acquiredConnection = true; } } }
默認調用的
public <T> ListenableFuture<T> execute(Request request, AsyncHandler<T> handler) throws IOException;
方法, reclaimCache 都為 false
4. 向連接池添加連接邏輯 (Protocol handle()階段)
在provider的HttpProtocol類里會調finishUpdate()方法, 這里會執行向連接池添加連接的操作, 調用offer方法
private void finishUpdate(final NettyResponseFuture<?> future, final ChannelHandlerContext ctx, boolean lastValidChunk) throws IOException { if (lastValidChunk && future.getKeepAlive()) { drainChannel(ctx, future); } else { if (future.getKeepAlive() && ctx.getChannel().isReadable() && connectionsPool.offer(getPoolKey(future), ctx.getChannel())) { markAsDone(future, ctx); return; } finishChannel(ctx); } markAsDone(future, ctx); }
連接池的offer方法沒有對maxTotalConnections的判斷, 只對maxConnectionPerHost做判斷