Code example

    public static void main(String[] args) throws Exception {
        ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor();
        PoolingNHttpClientConnectionManager cm = new PoolingNHttpClientConnectionManager(ioReactor);
        cm.setMaxTotal(100);
        CloseableHttpAsyncClient httpAsyncClient = HttpAsyncClients.custom().setConnectionManager(cm).build();
        httpAsyncClient.start();
        String[] urisToGet = {
            "http://www.chinaso.com/",
            "http://www.so.com/",
            "http://www.qq.com/",
        };
        // One count per request; every callback path must count down exactly once
        final CountDownLatch latch = new CountDownLatch(urisToGet.length);
        for (final String uri : urisToGet) {
            final HttpGet httpget = new HttpGet(uri);
            httpAsyncClient.execute(httpget, new FutureCallback<HttpResponse>() {
                public void completed(final HttpResponse response) {
                    latch.countDown();
                    System.out.println(httpget.getRequestLine() + "->" + response.getStatusLine());
                }
                public void failed(final Exception ex) {
                    latch.countDown();
                    System.out.println(httpget.getRequestLine() + "->" + ex);
                }
                public void cancelled() {
                    latch.countDown();
                    System.out.println(httpget.getRequestLine() + " cancelled");
                }
            });
        }
        latch.await();
        // Shut the client down so its I/O reactor threads stop and the JVM can exit
        httpAsyncClient.close();
    }
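Flow analysis
In brief, the flow is as follows.
HttpAsyncClient has an AbstractMultiworkerIOReactor and AbstractIOReactor workers. The two play roles similar to Netty's bossGroup and workerGroup: AbstractMultiworkerIOReactor establishes channel connections, and AbstractIOReactor handles reads and writes on those channels.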
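First, it helps to understand how the connection pool is structured.
The fields of AbstractNIOConnPool and what they mean:

    // Reusable I/O reactor: creates SessionRequests and wakes the selector so that it connects to the target
    private final ConnectingIOReactor ioreactor;
    // Factory that creates the connection objects held by pool entries
    private final NIOConnFactory<T, C> connFactory;
    // Resolves (and validates) the socket addresses for a route
    private final SocketAddressResolver<T> addressResolver;
    // Reusable callback notified when a SessionRequest finishes (for example, completed)
    private final SessionRequestCallback sessionRequestCallback;
    // Per-route pools, keyed by route
    private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
    // Lease requests that have not obtained a connection yet
    private final LinkedList<LeaseRequest<T, C, E>> leasingRequests;
    // Session requests that were allowed to connect but have not finished connecting
    private final Set<SessionRequest> pending;
    // Connections that are established and currently leased out
    private final Set<E> leased;
    // Idle connections currently available in the pool
    private final LinkedList<E> available;
    // Lease requests whose connection phase has finished, successfully or not (for example,
    // failed with an exception); they wait here until fireCallbacks runs their callbacks
    private final ConcurrentLinkedQueue<LeaseRequest<T, C, E>> completedRequests;
    // Maximum number of connections for each route
    private final Map<T, Integer> maxPerRoute;
    // Lock guarding the pool state
    private final Lock lock;
    // Whether the pool has been shut down
    private final AtomicBoolean isShutDown;
    // Default maximum number of connections per route
    private volatile int defaultMaxPerRoute;
    // Maximum number of connections for the whole pool
    private volatile int maxTotal;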
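1. Issue the request
a. Look up the pool for the request's route. If it holds a usable idle connection, the future is completed with that connection right away and the connection moves into the leased set.
b. If the request cannot get a connection for now but no fatal error occurred, it is put on the leasing list. Later operations take it off that list and retry.
c. If an unrecoverable error occurs while obtaining the connection (an exception or an expired deadline, for example), the request is marked as completed. After the method returns, fireCallbacks runs the callback and the cleanup, and the request ends as failed.
d. If the pool has no idle connection but a new one may be created, a SessionRequest is added to the pending set and the selector's wakeup() method is called. Once woken, AbstractMultiworkerIOReactor (the "bossGroup") starts the connect and registers the channel with its selector. It also handles the subsequent connectable event and the registration of the channel once it is connected.
2. AbstractIOReactor listens for read and write events.
3. Once the decoder reports that the response is complete, the connection is released back to the pool: it is removed from the leased set and added to available.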
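The bookkeeping in steps 1 and 3 can be sketched with plain JDK collections. This is a toy model, not HttpCore code: the class ToyPool and its methods are hypothetical, connections are just strings, and connecting is treated as instantaneous, so the pending set is left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

/** Toy model of the pool's lists; every name here is illustrative, not HttpCore API. */
class ToyPool {
    private final Deque<String> available = new ArrayDeque<>(); // idle connections
    private final Set<String> leased = new HashSet<>();         // connections in use
    private final Deque<String> leasing = new ArrayDeque<>();   // requests waiting for a connection
    private final int maxTotal;
    private int created = 0;

    ToyPool(int maxTotal) {
        this.maxTotal = maxTotal;
    }

    /** Returns a connection id, or null if the request had to be queued (case b). */
    String lease(String requestId) {
        String conn = available.pollFirst();          // case a: reuse an idle connection
        if (conn == null && leased.size() < maxTotal) {
            conn = "conn-" + (++created);             // case d, with the connect step collapsed
        }
        if (conn == null) {
            leasing.addLast(requestId);               // case b: wait for a release
            return null;
        }
        leased.add(conn);
        return conn;
    }

    /** Returns the waiting request that received the released connection, or null. */
    String release(String conn) {
        if (!leased.remove(conn)) {
            return null;
        }
        available.addFirst(conn);                     // step 3: leased -> available
        String waiting = leasing.pollFirst();
        if (waiting != null) {
            leased.add(available.pollFirst());        // hand the connection to the next request
        }
        return waiting;
    }

    int availableCount() { return available.size(); }
    int leasedCount() { return leased.size(); }
    int leasingCount() { return leasing.size(); }
}
```

With maxTotal set to 1, a second lease is queued, and releasing the first connection hands it straight to the queued request.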
Connection phase
Calling

    Future<HttpResponse> execute(
            HttpUriRequest request,
            FutureCallback<org.apache.http.HttpResponse> callback)

starts the request. Internally it calls execute(request, new BasicHttpContext(), callback), that is:

    Future<HttpResponse> execute(
            final HttpUriRequest request,
            final HttpContext context,
            final FutureCallback<HttpResponse> callback)

The context carries the state of one request execution; it is essentially a store of attributes. The default implementation, BasicHttpContext, is a thin wrapper around a ConcurrentHashMap.
Contexts can be nested, as the code shows:

    @ThreadSafe
    public class BasicHttpContext implements HttpContext {

        private final HttpContext parentContext;
        private final Map<String, Object> map;

        public BasicHttpContext() {
            this(null);
        }

        public BasicHttpContext(final HttpContext parentContext) {
            super();
            this.map = new ConcurrentHashMap<String, Object>();
            this.parentContext = parentContext;
        }

        public Object getAttribute(final String id) {
            Args.notNull(id, "Id");
            Object obj = this.map.get(id);
            if (obj == null && this.parentContext != null) {
                obj = this.parentContext.getAttribute(id);
            }
            return obj;
        }

        public void setAttribute(final String id, final Object obj) {
            Args.notNull(id, "Id");
            if (obj != null) {
                this.map.put(id, obj);
            } else {
                this.map.remove(id);
            }
        }

        public Object removeAttribute(final String id) {
            Args.notNull(id, "Id");
            return this.map.remove(id);
        }

        /**
         * @since 4.2
         */
        public void clear() {
            this.map.clear();
        }

        @Override
        public String toString() {
            return this.map.toString();
        }

    }
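The parent fallback in getAttribute is easy to miss, so here is a minimal JDK-only sketch of the same lookup rule. NestedContext is a hypothetical stand-in, not the real BasicHttpContext; it only mirrors the get and set logic quoted above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in that mirrors the lookup rule of a nested context. */
class NestedContext {
    private final NestedContext parent;
    private final Map<String, Object> map = new ConcurrentHashMap<>();

    NestedContext(NestedContext parent) {
        this.parent = parent;
    }

    /** Local value first; fall back to the parent only when nothing is set locally. */
    Object get(String id) {
        Object value = map.get(id);
        if (value == null && parent != null) {
            value = parent.get(id);
        }
        return value;
    }

    /** Setting null removes the local value, which uncovers the parent's value again. */
    void set(String id, Object value) {
        if (value != null) {
            map.put(id, value);
        } else {
            map.remove(id);
        }
    }
}
```

A child therefore shadows its parent without modifying it: writing to the child never changes what the parent returns.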
Back to the execution flow:

    public Future<HttpResponse> execute(
            final HttpUriRequest request,
            final HttpContext context,
            final FutureCallback<HttpResponse> callback) {
        final HttpHost target;
        try {
            // extract the target host
            target = determineTarget(request);
        } catch (final ClientProtocolException ex) {
            final BasicFuture<HttpResponse> future = new BasicFuture<HttpResponse>(callback);
            future.failed(ex);
            return future;
        }
        return execute(target, request, context, callback);
    }
This in turn calls

    public Future<HttpResponse> execute(
            final HttpHost target, final HttpRequest request, final HttpContext context,
            final FutureCallback<HttpResponse> callback) {
        return execute(
                HttpAsyncMethods.create(target, request),
                HttpAsyncMethods.createConsumer(),
                context, callback);
    }

The method it delegates to is declared in the HttpAsyncClient interface:

    /**
     * Initiates asynchronous HTTP request execution using the given context.
     * <p/>
     * The request producer passed to this method will be used to generate
     * a request message and stream out its content without buffering it
     * in memory. The response consumer passed to this method will be used
     * to process a response message without buffering its content in memory.
     * <p/>
     * Please note it may be unsafe to interact with the context instance
     * while the request is still being executed.
     *
     * @param <T> the result type of request execution.
     * @param requestProducer request producer callback.
     * @param responseConsumer response consumer callaback.
     * @param context HTTP context
     * @param callback future callback.
     * @return future representing pending completion of the operation.
     */
    <T> Future<T> execute(
            HttpAsyncRequestProducer requestProducer,
            HttpAsyncResponseConsumer<T> responseConsumer,
            HttpContext context,
            FutureCallback<T> callback);
Here a requestProducer and a responseConsumer are built from the original request. By default the call lands in InternalHttpAsyncClient, an implementation of HttpAsyncClient:

    public <T> Future<T> execute(
            final HttpAsyncRequestProducer requestProducer,
            final HttpAsyncResponseConsumer<T> responseConsumer,
            final HttpContext context,
            final FutureCallback<T> callback) {
        final Status status = getStatus();
        Asserts.check(status == Status.ACTIVE, "Request cannot be executed; " +
                "I/O reactor status: %s", status);
        final BasicFuture<T> future = new BasicFuture<T>(callback);
        final HttpClientContext localcontext = HttpClientContext.adapt(
                context != null ? context : new BasicHttpContext());
        setupContext(localcontext);

        @SuppressWarnings("resource")
        final DefaultClientExchangeHandlerImpl<T> handler = new DefaultClientExchangeHandlerImpl<T>(
                this.log,
                requestProducer,
                responseConsumer,
                localcontext,
                future,
                this.connmgr,
                this.exec);
        try {
            // start the request
            handler.start();
        } catch (final Exception ex) {
            handler.failed(ex);
        }
        return future;
    }
An exchange handler is created to start the request. Here is handler.start():

    public void start() throws HttpException, IOException {
        final HttpHost target = this.requestProducer.getTarget();
        final HttpRequest original = this.requestProducer.generateRequest();

        if (original instanceof HttpExecutionAware) {
            ((HttpExecutionAware) original).setCancellable(this);
        }
        // preparation: populate the exchange state
        this.exec.prepare(this.state, target, original);
        // request a connection; this is where the request is really set in motion
        requestConnection();
    }
Continuing down:

    private void requestConnection() {
        if (this.log.isDebugEnabled()) {
            this.log.debug("[exchange: " + this.state.getId() + "] Request connection for " +
                    this.state.getRoute());
        }

        discardConnection();

        this.state.setValidDuration(0);
        this.state.setNonReusable();
        this.state.setRouteEstablished(false);
        this.state.setRouteTracker(null);

        final HttpRoute route = this.state.getRoute();
        final Object userToken = this.localContext.getUserToken();
        final RequestConfig config = this.localContext.getRequestConfig();
        // calls the connection manager's requestConnection method
        this.connmgr.requestConnection(
                route,
                userToken,
                config.getConnectTimeout(),
                config.getConnectionRequestTimeout(),
                TimeUnit.MILLISECONDS,
                new FutureCallback<NHttpClientConnection>() {

                    public void completed(final NHttpClientConnection managedConn) {
                        connectionAllocated(managedConn);
                    }

                    public void failed(final Exception ex) {
                        connectionRequestFailed(ex);
                    }

                    public void cancelled() {
                        connectionRequestCancelled();
                    }

                });
    }
Next, the declaration in NHttpClientConnectionManager:

    /**
     * Returns a {@link Future} for a {@link NHttpClientConnection}.
     * <p/>
     * Please note that the consumer of that connection is responsible
     * for fully establishing the route the to the connection target
     * by calling {@link #startRoute(org.apache.http.nio.NHttpClientConnection,
     * org.apache.http.conn.routing.HttpRoute,
     * org.apache.http.protocol.HttpContext) startRoute} in order to start
     * the process of connection initialization, optionally calling
     * {@link #upgrade(org.apache.http.nio.NHttpClientConnection,
     * org.apache.http.conn.routing.HttpRoute,
     * org.apache.http.protocol.HttpContext) upgrade} method to upgrade
     * the connection after having executed <code>CONNECT</code> method to
     * all intermediate proxy hops and and finally calling
     * {@link #routeComplete(org.apache.http.nio.NHttpClientConnection,
     * org.apache.http.conn.routing.HttpRoute,
     * org.apache.http.protocol.HttpContext) routeComplete} to mark the route
     * as fully completed.
     *
     * @param route HTTP route of the requested connection.
     * @param state expected state of the connection or <code>null</code>
     *              if the connection is not expected to carry any state.
     * @param connectTimeout connect timeout.
     * @param connectionRequestTimeout connection request timeout.
     * @param timeUnit time unit of the previous two timeout values.
     * @param callback future callback.
     */
    Future<NHttpClientConnection> requestConnection(
            HttpRoute route,
            Object state,
            long connectTimeout,
            long connectionRequestTimeout,
            TimeUnit timeUnit,
            FutureCallback<NHttpClientConnection> callback);
The call is served by the implementation in PoolingNHttpClientConnectionManager:

    public Future<NHttpClientConnection> requestConnection(
            final HttpRoute route,
            final Object state,
            final long connectTimeout,
            final long leaseTimeout,
            final TimeUnit tunit,
            final FutureCallback<NHttpClientConnection> callback) {
        Args.notNull(route, "HTTP route");
        if (this.log.isDebugEnabled()) {
            this.log.debug("Connection request: " + format(route, state) + formatStats(route));
        }
        final BasicFuture<NHttpClientConnection> future = new BasicFuture<NHttpClientConnection>(callback);
        final HttpHost host;
        if (route.getProxyHost() != null) {
            host = route.getProxyHost();
        } else {
            host = route.getTargetHost();
        }
        final SchemeIOSessionStrategy sf = this.iosessionFactoryRegistry.lookup(
                host.getSchemeName());
        if (sf == null) {
            future.failed(new UnsupportedSchemeException(host.getSchemeName() +
                    " protocol is not supported"));
            return future;
        }
        // this is where the connection pool is actually used
        this.pool.lease(route, state,
                connectTimeout, leaseTimeout, tunit != null ? tunit : TimeUnit.MILLISECONDS,
                new InternalPoolEntryCallback(future));
        return future;
    }
Now the lease method of AbstractNIOConnPool:

    public Future<E> lease(
            final T route, final Object state,
            final long connectTimeout, final long leaseTimeout, final TimeUnit tunit,
            final FutureCallback<E> callback) {
        Args.notNull(route, "Route");
        Args.notNull(tunit, "Time unit");
        Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
        final BasicFuture<E> future = new BasicFuture<E>(callback);
        this.lock.lock(); // synchronize access to the pool state
        try {
            final long timeout = connectTimeout > 0 ? tunit.toMillis(connectTimeout) : 0;
            final LeaseRequest<T, C, E> request = new LeaseRequest<T, C, E>(route, state, timeout, leaseTimeout, future);
            // 1) try to obtain a connection
            final boolean completed = processPendingRequest(request);
            if (!request.isDone() && !completed) {
                // 2) No connection right away because the pool is full: queue the request on the
                //    leasingRequests LinkedList; later operations take it out and retry
                this.leasingRequests.add(request);
            }
            if (request.isDone()) {
                // 3) The connection phase is done. This means the connect step, not the
                //    request/response exchange: either a connection was obtained from the pool
                //    or the request was failed by an exception. Such requests go on a
                //    ConcurrentLinkedQueue whose only purpose is to let fireCallbacks run
                //    their callbacks afterwards
                this.completedRequests.add(request);
            }
        } finally {
            this.lock.unlock();
        }
        fireCallbacks();
        return future;
    }
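Note the shape of lease: state is changed under the lock, finished requests are queued, and fireCallbacks runs only after unlock. The source does not say why; a plausible reading is that user callbacks should not run while the pool lock is held. The sketch below shows that pattern with JDK classes only. DeferredCallbacks and its members are hypothetical names, not part of HttpCore.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch: queue completions under a lock, run callbacks after releasing it. */
class DeferredCallbacks {
    final ReentrantLock lock = new ReentrantLock();
    private final Queue<Runnable> completed = new ConcurrentLinkedQueue<>();

    void complete(Runnable callback) {
        lock.lock();
        try {
            // Pool state would be updated here, while the lock is held.
            completed.add(callback);
        } finally {
            lock.unlock();
        }
        fireCallbacks(); // runs after unlock, so callbacks never see the lock held
    }

    private void fireCallbacks() {
        Runnable callback;
        // Drain everything that is queued, not just the most recent entry.
        while ((callback = completed.poll()) != null) {
            callback.run();
        }
    }
}
```

Because the queue is concurrent, any thread that reaches fireCallbacks may drain entries queued by another thread, which matches how the pool calls it from several methods.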
Two classes matter here: the pool AbstractNIOConnPool and, beneath it, RouteSpecificPool, which actually stores the connections of each route.
Now for the connection flow:

    private boolean processPendingRequest(final LeaseRequest<T, C, E> request) {
        final T route = request.getRoute();
        final Object state = request.getState();
        final long deadline = request.getDeadline();
        final long now = System.currentTimeMillis();
        if (now > deadline) {
            request.failed(new TimeoutException());
            return false;
        }
        final RouteSpecificPool<T, C, E> pool = getPool(route);
        E entry;
        for (;;) {
            // Lease from the route pool: getFree returns an available entry whose state matches
            entry = pool.getFree(state);
            if (entry == null) {
                // no usable connection: leave the loop
                break;
            }
            // discard connections that are closed or expired
            if (entry.isClosed() || entry.isExpired(System.currentTimeMillis())) {
                entry.close();
                this.available.remove(entry);
                pool.free(entry, false);
            } else {
                break;
            }
        }
        if (entry != null) {
            // found a connection: done
            this.available.remove(entry);
            this.leased.add(entry);
            request.completed(entry);
            onLease(entry);
            return true;
        }

        // New connection is needed
        final int maxPerRoute = getMax(route);
        // Shrink the pool prior to allocating a new connection.
        // excess is how far the route would exceed its limit with one more connection
        final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
        // Close and remove idle connections of this route until the excess is gone
        if (excess > 0) {
            for (int i = 0; i < excess; i++) {
                // getLastUsed returns the last entry of the route's available list. If every
                // connection is leased out it returns null, which means the route is full: the
                // check "pool.getAllocatedCount() < maxPerRoute" below is then false and the
                // request ends up in leasingRequests
                final E lastUsed = pool.getLastUsed();
                if (lastUsed == null) {
                    break;
                }
                lastUsed.close();
                this.available.remove(lastUsed);
                pool.remove(lastUsed);
            }
        }

        // allocated connections < per-route limit: create a new one
        if (pool.getAllocatedCount() < maxPerRoute) {
            // in use = connecting (pending) + leased
            final int totalUsed = this.pending.size() + this.leased.size();
            final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
            if (freeCapacity == 0) {
                return false;
            }
            // Note: available may be non-empty and getFree may still find nothing,
            // because the state does not match
            final int totalAvailable = this.available.size();
            // totalAvailable > freeCapacity - 1 means that in-use + idle + 1 would exceed
            // maxTotal. In that case one idle connection is closed to make room: the tail of
            // the global available list (the one released longest ago, since release() adds
            // at the head), whichever route it belongs to. It is not chosen at random
            if (totalAvailable > freeCapacity - 1) {
                if (!this.available.isEmpty()) {
                    final E lastUsed = this.available.removeLast();
                    lastUsed.close();
                    final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                    otherpool.remove(lastUsed);
                }
            }

            // Create the SessionRequest that tracks this connection attempt; connect() also
            // calls wakeup() on the selector, which triggers the actual connect
            final SocketAddress localAddress;
            final SocketAddress remoteAddress;
            try {
                remoteAddress = this.addressResolver.resolveRemoteAddress(route);
                localAddress = this.addressResolver.resolveLocalAddress(route);
            } catch (final IOException ex) {
                request.failed(ex);
                return false;
            }

            // pay close attention to this connect method
            final SessionRequest sessionRequest = this.ioreactor.connect(
                    remoteAddress, localAddress, route, this.sessionRequestCallback);
            final int timout = request.getConnectTimeout() < Integer.MAX_VALUE ?
                    (int) request.getConnectTimeout() : Integer.MAX_VALUE;
            sessionRequest.setConnectTimeout(timout);
            // add to the pool-wide pending set
            this.pending.add(sessionRequest);
            // add to the route pool's pending set
            pool.addPending(sessionRequest, request.getFuture());
            return true;
        } else {
            return false;
        }
    }

    // Drain every completed request and settle its future
    private void fireCallbacks() {
        LeaseRequest<T, C, E> request;
        while ((request = this.completedRequests.poll()) != null) {
            final BasicFuture<E> future = request.getFuture();
            final Exception ex = request.getException();
            final E result = request.getResult();
            if (ex != null) {
                future.failed(ex);
            } else if (result != null) {
                future.completed(result);
            } else {
                future.cancel();
            }
        }
    }
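The capacity checks in processPendingRequest are plain integer arithmetic, so they can be tried out in isolation. The sketch below copies the three expressions from the method into a hypothetical helper class, PoolCapacity; the method names and the sample numbers are made up for illustration.

```java
/** Hypothetical helper that isolates the capacity arithmetic of processPendingRequest. */
class PoolCapacity {

    /** How many idle connections the route must drop before it may allocate one more. */
    static int excess(int allocatedForRoute, int maxPerRoute) {
        return Math.max(0, allocatedForRoute + 1 - maxPerRoute);
    }

    /** Slots left under maxTotal once connecting (pending) and leased connections are counted. */
    static int freeCapacity(int maxTotal, int pending, int leased) {
        return Math.max(maxTotal - (pending + leased), 0);
    }

    /** True when in-use + idle + 1 would exceed maxTotal, so one idle connection must go. */
    static boolean mustEvictIdle(int totalAvailable, int freeCapacity) {
        return totalAvailable > freeCapacity - 1;
    }

    public static void main(String[] args) {
        int free = freeCapacity(100, 10, 85);        // 100 - 95 = 5 free slots
        System.out.println(free);
        System.out.println(mustEvictIdle(5, free));  // 95 in use + 5 idle + 1 new = 101 > 100
        System.out.println(mustEvictIdle(4, free));  // 95 in use + 4 idle + 1 new = 100, fits
    }
}
```

Read this way, the eviction step keeps the total number of connections (pending, leased and idle) at or below maxTotal when a new connection is about to be opened.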
Now the connect method of DefaultConnectingIOReactor:

    public SessionRequest connect(
            final SocketAddress remoteAddress,
            final SocketAddress localAddress,
            final Object attachment,
            final SessionRequestCallback callback) {
        Asserts.check(this.status.compareTo(IOReactorStatus.ACTIVE) <= 0,
                "I/O reactor has been shut down");
        final SessionRequestImpl sessionRequest = new SessionRequestImpl(
                remoteAddress, localAddress, attachment, callback);
        sessionRequest.setConnectTimeout(this.config.getConnectTimeout());

        this.requestQueue.add(sessionRequest);
        // next: what happens after wakeup()
        this.selector.wakeup();

        return sessionRequest;
    }
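The enqueue-then-wakeup pair in connect is a standard java.nio idiom: the caller's thread queues the work, then wakeup() forces the reactor thread out of select() so it can drain the queue. A minimal JDK-only sketch follows; WakeupDemo and the queued string are made up for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class WakeupDemo {
    /** Returns "readyCount:request" as observed by the selecting thread. */
    static String run() {
        final Queue<String> requestQueue = new ConcurrentLinkedQueue<>();
        try (Selector selector = Selector.open()) {
            Thread producer = new Thread(() -> {
                requestQueue.add("connect-request"); // enqueue first ...
                selector.wakeup();                   // ... then wake the select loop
            });
            producer.start();
            // No channel is registered, so select() returns only through wakeup() or the timeout.
            int readyCount = selector.select(10_000);
            producer.join();
            return readyCount + ":" + requestQueue.poll();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

select() reports zero ready keys here, which is why a reactor loop has to check its request queue on every pass and not only when keys are ready. The order matters as well: if wakeup() came before the add, the loop could wake up, find the queue empty, and go back to sleep.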
AbstractMultiworkerIOReactor has an execute() method:

    /**
     * Activates the main I/O reactor as well as all worker I/O reactors.
     * The I/O main reactor will start reacting to I/O events and triggering
     * notification methods. The worker I/O reactor in their turn will start
     * reacting to I/O events and dispatch I/O event notifications to the given
     * {@link IOEventDispatch} interface.
     * <p>
     * This method will enter the infinite I/O select loop on
     * the {@link Selector} instance associated with this I/O reactor and used
     * to manage creation of new I/O channels. Once a new I/O channel has been
     * created the processing of I/O events on that channel will be delegated
     * to one of the worker I/O reactors.
     * <p>
     * The method will remain blocked unto the I/O reactor is shut down or the
     * execution thread is interrupted.
     *
     * @see #processEvents(int)
     * @see #cancelRequests()
     *
     * @throws InterruptedIOException if the dispatch thread is interrupted.
     * @throws IOReactorException in case if a non-recoverable I/O error.
     */
    public void execute(
            final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {
        Args.notNull(eventDispatch, "Event dispatcher");
        synchronized (this.statusLock) {
            if (this.status.compareTo(IOReactorStatus.SHUTDOWN_REQUEST) >= 0) {
                this.status = IOReactorStatus.SHUT_DOWN;
                this.statusLock.notifyAll();
                return;
            }
            Asserts.check(this.status.compareTo(IOReactorStatus.INACTIVE) == 0,
                    "Illegal state %s", this.status);
            this.status = IOReactorStatus.ACTIVE;
            // Start I/O dispatchers
            for (int i = 0; i < this.dispatchers.length; i++) {
                final BaseIOReactor dispatcher = new BaseIOReactor(this.selectTimeout, this.interestOpsQueueing);
                dispatcher.setExceptionHandler(exceptionHandler);
                this.dispatchers[i] = dispatcher;
            }
            for (int i = 0; i < this.workerCount; i++) {
                final BaseIOReactor dispatcher = this.dispatchers[i];
                this.workers[i] = new Worker(dispatcher, eventDispatch);
                this.threads[i] = this.threadFactory.newThread(this.workers[i]);
            }
        }
        try {
            // Start all worker threads. Connecting is done by AbstractMultiworkerIOReactor,
            // while everything after a successful connect is handled by the AbstractIOReactor
            // worker threads. The former resembles a bossGroup, the latter a workerGroup
            for (int i = 0; i < this.workerCount; i++) {
                if (this.status != IOReactorStatus.ACTIVE) {
                    return;
                }
                this.threads[i].start();
            }

            // listen for events in an infinite loop
            for (;;) {
                final int readyCount;
                try {
                    // blocks until the timeout expires or wakeup() is called
                    readyCount = this.selector.select(this.selectTimeout);
                } catch (final InterruptedIOException ex) {
                    throw ex;
                } catch (final IOException ex) {
                    throw new IOReactorException("Unexpected selector failure", ex);
                }

                // while the reactor is active, run processEvents; the actual connect happens there
                if (this.status.compareTo(IOReactorStatus.ACTIVE) == 0) {
                    processEvents(readyCount);
                }

                // Verify I/O dispatchers
                for (int i = 0; i < this.workerCount; i++) {
                    final Worker worker = this.workers[i];
                    final Exception ex = worker.getException();
                    if (ex != null) {
                        throw new IOReactorException(
                                "I/O dispatch worker terminated abnormally", ex);
                    }
                }

                if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) {
                    break;
                }
            }

        } catch (final ClosedSelectorException ex) {
            addExceptionEvent(ex);
        } catch (final IOReactorException ex) {
            if (ex.getCause() != null) {
                addExceptionEvent(ex.getCause());
            }
            throw ex;
        } finally {
            doShutdown();
            synchronized (this.statusLock) {
                this.status = IOReactorStatus.SHUT_DOWN;
                this.statusLock.notifyAll();
            }
        }
    }
For a new connection, what runs is the processEvents method of DefaultConnectingIOReactor:

    @Override
    protected void processEvents(final int readyCount) throws IOReactorException {
        // this is where the connect is actually started
        processSessionRequests();

        if (readyCount > 0) {
            final Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
            for (final SelectionKey key : selectedKeys) {
                processEvent(key);
            }
            selectedKeys.clear();
        }

        final long currentTime = System.currentTimeMillis();
        if ((currentTime - this.lastTimeoutCheck) >= this.selectTimeout) {
            this.lastTimeoutCheck = currentTime;
            final Set<SelectionKey> keys = this.selector.keys();
            processTimeouts(keys);
        }
    }

    private void processSessionRequests() throws IOReactorException {
        SessionRequestImpl request;
        // one wakeup processes (starts connecting) every request in the queue
        while ((request = this.requestQueue.poll()) != null) {
            if (request.isCompleted()) {
                continue;
            }
            final SocketChannel socketChannel;
            try {
                socketChannel = SocketChannel.open();
            } catch (final IOException ex) {
                throw new IOReactorException("Failure opening socket", ex);
            }
            try {
                socketChannel.configureBlocking(false);
                validateAddress(request.getLocalAddress());
                validateAddress(request.getRemoteAddress());

                if (request.getLocalAddress() != null) {
                    final Socket sock = socketChannel.socket();
                    sock.setReuseAddress(this.config.isSoReuseAddress());
                    sock.bind(request.getLocalAddress());
                }
                prepareSocket(socketChannel.socket());
                final boolean connected = socketChannel.connect(request.getRemoteAddress());
                if (connected) {
                    // connected immediately: move on to the next request
                    final ChannelEntry entry = new ChannelEntry(socketChannel, request);
                    addChannel(entry);
                    continue;
                }
            } catch (final IOException ex) {
                closeChannel(socketChannel);
                request.failed(ex);
                return;
            }

            // not connected yet: register with the selector and wait for the
            // connect event, which processEvent handles
            final SessionRequestHandle requestHandle = new SessionRequestHandle(request);
            try {
                final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT,
                        requestHandle);
                request.setKey(key);
            } catch (final IOException ex) {
                closeChannel(socketChannel);
                throw new IOReactorException("Failure registering channel " +
                        "with the selector", ex);
            }
        }
    }

    // finishes the connect and hands the connected channel over
    private void processEvent(final SelectionKey key) {
        try {

            if (key.isConnectable()) {

                final SocketChannel channel = (SocketChannel) key.channel();
                // Get request handle
                final SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
                final SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();

                // Finish connection process
                try {
                    channel.finishConnect();
                } catch (final IOException ex) {
                    sessionRequest.failed(ex);
                }
                key.cancel();
                key.attach(null);
                if (!sessionRequest.isCompleted()) {
                    // register the new channel; the worker threads pick these channels up
                    // later and perform the I/O reads and writes
                    addChannel(new ChannelEntry(channel, sessionRequest));
                } else {
                    try {
                        channel.close();
                    } catch (IOException ignore) {
                    }
                }
            }

        } catch (final CancelledKeyException ex) {
            final SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
            key.attach(null);
            if (requestHandle != null) {
                final SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();
                if (sessionRequest != null) {
                    sessionRequest.cancel();
                }
            }
        }
    }
Next, how the I/O reactor handles a channel once it is connected. This is the execute method that BaseIOReactor inherits from AbstractIOReactor:

    /**
     * Activates the I/O reactor. The I/O reactor will start reacting to
     * I/O events and triggering notification methods.
     * <p>
     * This method will enter the infinite I/O select loop on
     * the {@link Selector} instance associated with this I/O reactor.
     * <p>
     * The method will remain blocked unto the I/O reactor is shut down or the
     * execution thread is interrupted.
     *
     * @see #acceptable(SelectionKey)
     * @see #connectable(SelectionKey)
     * @see #readable(SelectionKey)
     * @see #writable(SelectionKey)
     * @see #timeoutCheck(SelectionKey, long)
     * @see #validate(Set)
     * @see #sessionCreated(SelectionKey, IOSession)
     * @see #sessionClosed(IOSession)
     *
     * @throws InterruptedIOException if the dispatch thread is interrupted.
     * @throws IOReactorException in case if a non-recoverable I/O error.
     */
    protected void execute() throws InterruptedIOException, IOReactorException {
        this.status = IOReactorStatus.ACTIVE;

        try {
            for (;;) {

                final int readyCount;
                try {
                    readyCount = this.selector.select(this.selectTimeout);
                } catch (final InterruptedIOException ex) {
                    throw ex;
                } catch (final IOException ex) {
                    throw new IOReactorException("Unexpected selector failure", ex);
                }

                if (this.status == IOReactorStatus.SHUT_DOWN) {
                    // Hard shut down. Exit select loop immediately
                    break;
                }

                if (this.status == IOReactorStatus.SHUTTING_DOWN) {
                    // Graceful shutdown in process
                    // Try to close things out nicely
                    closeSessions();
                    closeNewChannels();
                }

                // Process selected I/O events
                if (readyCount > 0) {
                    processEvents(this.selector.selectedKeys());
                }

                // Validate active channels
                validate(this.selector.keys());

                // Process closed sessions
                processClosedSessions();

                // If active process new channels
                if (this.status == IOReactorStatus.ACTIVE) {
                    processNewChannels();
                }

                // Exit select loop if graceful shutdown has been completed
                if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0
                        && this.sessions.isEmpty()) {
                    break;
                }

                if (this.interestOpsQueueing) {
                    // process all pending interestOps() operations
                    processPendingInterestOps();
                }

            }

        } catch (final ClosedSelectorException ignore) {
        } finally {
            hardShutdown();
            synchronized (this.statusMutex) {
                this.statusMutex.notifyAll();
            }
        }
    }

    private void processEvents(final Set<SelectionKey> selectedKeys) {
        for (final SelectionKey key : selectedKeys) {
            processEvent(key);
        }
        selectedKeys.clear();
    }

    /**
     * Processes new event on the given selection key.
     *
     * @param key the selection key that triggered an event.
     */
    protected void processEvent(final SelectionKey key) {
        final IOSessionImpl session = (IOSessionImpl) key.attachment();
        try {
            if (key.isAcceptable()) {
                acceptable(key);
            }
            if (key.isConnectable()) {
                connectable(key);
            }
            if (key.isReadable()) {
                session.resetLastRead();
                readable(key);
            }
            if (key.isWritable()) {
                session.resetLastWrite();
                writable(key);
            }
        } catch (final CancelledKeyException ex) {
            queueClosedSession(session);
            key.attach(null);
        }
    }
This loop resembles the one in AbstractMultiworkerIOReactor, except that the two reactors are interested in different events. BaseIOReactor, the implementation class of AbstractIOReactor, makes that clear:

    /**
     * This I/O reactor implementation does not react to the
     * {@link SelectionKey#OP_ACCEPT} event.
     * <p>
     * Super-classes can override this method to react to the event.
     */
    @Override
    protected void acceptable(final SelectionKey key) {
    }

    /**
     * This I/O reactor implementation does not react to the
     * {@link SelectionKey#OP_CONNECT} event.
     * <p>
     * Super-classes can override this method to react to the event.
     */
    @Override
    protected void connectable(final SelectionKey key) {
    }

    /**
     * Processes {@link SelectionKey#OP_READ} event on the given selection key.
     * This method dispatches the event notification to the
     * {@link IOEventDispatch#inputReady(IOSession)} method.
     */
    @Override
    protected void readable(final SelectionKey key) {
        final IOSession session = getSession(key);
        try {
            this.eventDispatch.inputReady(session);
            if (session.hasBufferedInput()) {
                this.bufferingSessions.add(session);
            }
        } catch (final CancelledKeyException ex) {
            queueClosedSession(session);
            key.attach(null);
        } catch (final RuntimeException ex) {
            handleRuntimeException(ex);
        }
    }

    /**
     * Processes {@link SelectionKey#OP_WRITE} event on the given selection key.
     * This method dispatches the event notification to the
     * {@link IOEventDispatch#outputReady(IOSession)} method.
     */
    @Override
    protected void writable(final SelectionKey key) {
        final IOSession session = getSession(key);
        try {
            this.eventDispatch.outputReady(session);
        } catch (final CancelledKeyException ex) {
            queueClosedSession(session);
            key.attach(null);
        } catch (final RuntimeException ex) {
            handleRuntimeException(ex);
        }
    }

In practice it handles only read and write events.
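By contrast, OP_CONNECT is handled on the connecting reactor's selector, in the processSessionRequests and processEvent methods shown earlier. The sequence they follow (non-blocking connect, register for OP_CONNECT, finishConnect on the connectable event) can be tried with the JDK alone. The sketch below assumes the loopback interface is usable and connects to a throwaway local server socket; ConnectDemo is a made-up name and none of this is HttpCore code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class ConnectDemo {
    /** Returns true once a non-blocking connect to a local listener has completed. */
    static boolean connectToLoopback() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            // Port 0 lets the OS pick a free port; the listen backlog completes the handshake.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            channel.configureBlocking(false);
            boolean connected = channel.connect(server.getLocalAddress());
            if (!connected) {
                // Not connected yet: wait for the connectable event.
                channel.register(selector, SelectionKey.OP_CONNECT);
            }
            while (!connected) {
                if (selector.select(5_000) == 0) {
                    return false; // nothing became connectable in time
                }
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isConnectable()) {
                        connected = channel.finishConnect();
                        if (connected) {
                            key.cancel(); // connect interest is no longer needed
                        }
                    }
                }
                selector.selectedKeys().clear();
            }
            return channel.isConnected();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(connectToLoopback());
    }
}
```

In the reactor, the step after finishConnect is not to use the channel directly but to queue it with addChannel so that a worker reactor registers it for reads on its own selector.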
Note in particular that requestCompleted in AbstractNIOConnPool does not mean that the whole request has finished. It means that the connection has been established. Here is where it is called from and what it triggers.
It originates in processNewChannels of AbstractIOReactor, which is invoked from execute:

    private void processNewChannels() throws IOReactorException {
        ChannelEntry entry;
        // this is the queue that addChannel filled after the connect succeeded; drain it now
        while ((entry = this.newChannels.poll()) != null) {

            final SocketChannel channel;
            final SelectionKey key;
            try {
                channel = entry.getChannel();
                channel.configureBlocking(false);
                key = channel.register(this.selector, SelectionKey.OP_READ);
            } catch (final ClosedChannelException ex) {
                final SessionRequestImpl sessionRequest = entry.getSessionRequest();
                if (sessionRequest != null) {
                    sessionRequest.failed(ex);
                }
                return;

            } catch (final IOException ex) {
                throw new IOReactorException("Failure registering channel " +
                        "with the selector", ex);
            }

            final SessionClosedCallback sessionClosedCallback = new SessionClosedCallback() {

                public void sessionClosed(final IOSession session) {
                    queueClosedSession(session);
                }

            };

            InterestOpsCallback interestOpsCallback = null;
            if (this.interestOpsQueueing) {
                interestOpsCallback = new InterestOpsCallback() {

                    public void addInterestOps(final InterestOpEntry entry) {
                        queueInterestOps(entry);
                    }

                };
            }

            final IOSession session;
            try {
                session = new IOSessionImpl(key, interestOpsCallback, sessionClosedCallback);
                int timeout = 0;
                try {
                    timeout = channel.socket().getSoTimeout();
                } catch (final IOException ex) {
                    // Very unlikely to happen and is not fatal
                    // as the protocol layer is expected to overwrite
                    // this value anyways
                }

                session.setAttribute(IOSession.ATTACHMENT_KEY, entry.getAttachment());
                session.setSocketTimeout(timeout);
            } catch (final CancelledKeyException ex) {
                continue;
            }
            try {
                this.sessions.add(session);
                final SessionRequestImpl sessionRequest = entry.getSessionRequest();
                if (sessionRequest != null) {
                    // completed is called here, which eventually reaches the pool's requestCompleted
                    sessionRequest.completed(session);
                }
                key.attach(session);
                sessionCreated(key, session);
            } catch (final CancelledKeyException ex) {
                queueClosedSession(session);
                key.attach(null);
            }
        }
    }
Here is what requestCompleted in the pool AbstractNIOConnPool does:

    protected void requestCompleted(final SessionRequest request) {
        if (this.isShutDown.get()) {
            return;
        }
        @SuppressWarnings("unchecked")
        final T route = (T) request.getAttachment();
        this.lock.lock();
        try {
            // remove this session request from the pending set
            this.pending.remove(request);
            final RouteSpecificPool<T, C, E> pool = getPool(route);
            final IOSession session = request.getSession();
            try {
                final C conn = this.connFactory.create(route, session);
                final E entry = pool.createEntry(request, conn);
                // add the connection to the leased set
                this.leased.add(entry);
                // call completed on the per-route pool
                pool.completed(request, entry);
                // onLease just sets the timeout
                onLease(entry);
            } catch (final IOException ex) {
                pool.failed(request, ex);
            }
        } finally {
            this.lock.unlock();
        }
        fireCallbacks();
    }

Its main jobs are to set the connection's socket timeout and to move the connection from the pending set to the leased set.
Finally, how a connection is returned.
Tracing the callers of AbstractNIOConnPool's release method leads back to BaseIOReactor: when it sees a readable event, the inputReady method of HttpAsyncRequestExecutor is called:

    public void inputReady(
            final NHttpClientConnection conn,
            final ContentDecoder decoder) throws IOException, HttpException {
        final State state = ensureNotNull(getState(conn));
        final HttpAsyncClientExchangeHandler handler = ensureNotNull(getHandler(conn));
        handler.consumeContent(decoder, conn);
        state.setResponseState(MessageState.BODY_STREAM);
        if (decoder.isCompleted()) {
            // the content has ended: enter the completion flow
            processResponse(conn, state, handler);
        }
    }
Request completion phase
Once the response is detected to be complete, the completion flow runs, then the releaseConnection flow, and finally the pool's release:

    public void release(final E entry, final boolean reusable) {
        if (entry == null) {
            return;
        }
        if (this.isShutDown.get()) {
            return;
        }
        this.lock.lock();
        try {
            // remove from the leased set
            if (this.leased.remove(entry)) {
                final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
                // put it back into the route pool's available list
                pool.free(entry, reusable);
                if (reusable) {
                    // add to the pool-wide available list
                    this.available.addFirst(entry);
                    // reset the socket timeout
                    onRelease(entry);
                } else {
                    entry.close();
                }
                // serve the next request waiting in the leasing queue
                processNextPendingRequest();
            }
        } finally {
            this.lock.unlock();
        }
        fireCallbacks();
    }
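Because release adds to the head of available (addFirst) while the capacity check in processPendingRequest evicts from the tail (removeLast), the idle list is ordered by release time. A small JDK-only sketch of that ordering follows; IdleOrder and the connection names are made up, and reuse from the head is an assumption about how idle entries are looked up, not something the code above shows.

```java
import java.util.LinkedList;

class IdleOrder {
    /** Returns the evicted connection followed by the one at the head of the list. */
    static String demo() {
        LinkedList<String> available = new LinkedList<>();
        available.addFirst("A"); // released first
        available.addFirst("B");
        available.addFirst("C"); // released last, so it sits at the head

        String evicted = available.removeLast(); // tail: the connection idle the longest
        String head = available.getFirst();      // head: the most recently released one
        return evicted + head;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Evicting from the tail means the connection that has been idle the longest is the first to be closed when room is needed.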
The end.