Using the HttpAsyncClient connection pool


Code example

    public static void main(String[] args) throws Exception {
        ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor();
        PoolingNHttpClientConnectionManager cm = new PoolingNHttpClientConnectionManager(ioReactor);
        cm.setMaxTotal(100);
        CloseableHttpAsyncClient httpAsyncClient = HttpAsyncClients.custom().setConnectionManager(cm).build();
        httpAsyncClient.start();
        String[] urisToGet = {
                "http://www.chinaso.com/",
                "http://www.so.com/",
                "http://www.qq.com/",
        };

        final CountDownLatch latch = new CountDownLatch(urisToGet.length);
        for (final String uri: urisToGet) {
            final HttpGet httpget = new HttpGet(uri);
            httpAsyncClient.execute(httpget, new FutureCallback<HttpResponse>() {

                public void completed(final HttpResponse response) {
                    latch.countDown();
                    System.out.println(httpget.getRequestLine() + "->" + response.getStatusLine());
                }

                public void failed(final Exception ex) {
                    latch.countDown();
                    System.out.println(httpget.getRequestLine() + "->" + ex);
                }

                public void cancelled() {
                    latch.countDown();
                    System.out.println(httpget.getRequestLine() + " cancelled");
                }

            });
        }
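        latch.await();
        // Close the client so that the I/O reactor threads stop and the JVM can exit
        httpAsyncClient.close();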
    }

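Flow analysis

A brief summary of the flow:

HttpAsyncClient has an AbstractMultiworkerIOReactor and an AbstractIOReactor, which play roles similar to netty's bossGroup and workerGroup: AbstractMultiworkerIOReactor establishes the channel connections, and AbstractIOReactor handles reads and writes on the channels.

First, it helps to understand the structure of the connection pool.

Meaning of the fields in AbstractNIOConnPool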

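    // A reusable I/O reactor; it creates the SessionRequest and wakes up the selector to connect to the target site
    private final ConnectingIOReactor ioreactor;
    // Factory that creates the connection objects wrapped in pool entries
    private final NIOConnFactory<T, C> connFactory;
    // Validates and resolves the socket addresses of the target connection
    private final SocketAddressResolver<T> addressResolver;
    // A reusable callback object whose methods are invoked when a SessionRequest completes
    private final SessionRequestCallback sessionRequestCallback;
    // Per-route pools (one pool per target host)
    private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
    // Lease requests that have not obtained a connection yet
    private final LinkedList<LeaseRequest<T, C, E>> leasingRequests;
    // Connections that have been granted but are not yet established
    private final Set<SessionRequest> pending;
    // Established connections that are currently leased out
    private final Set<E> leased;
    // Connections currently available in the pool
    private final LinkedList<E> available;
    // Lease requests that have finished, either with a connection or with a failure (for example an exception);
    // they wait in this queue until their callbacks are invoked
    private final ConcurrentLinkedQueue<LeaseRequest<T, C, E>> completedRequests;
    // Maximum number of connections for each route
    private final Map<T, Integer> maxPerRoute;
    // Lock object
    private final Lock lock;
    // Whether the pool has been shut down
    private final AtomicBoolean isShutDown;

    // Default maximum number of connections per route
    private volatile int defaultMaxPerRoute;
    // Maximum number of connections for the whole pool
    private volatile int maxTotal;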
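To make the roles of pending, leased and available concrete, here is a minimal sketch of how a connection moves between them. It is a toy model, not the library code: the class name PoolBookkeepingSketch, its method names and the string connection ids are all assumptions made for illustration, and per-route pools, locking and limits are left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

/** Toy model of the pool's bookkeeping; connection ids are plain strings (hypothetical). */
final class PoolBookkeepingSketch {
    final Set<String> pending = new HashSet<>();          // connect started, not finished
    final Set<String> leased = new HashSet<>();           // handed out to a request
    final Deque<String> available = new ArrayDeque<>();   // idle, ready for reuse

    /** A new connection attempt has been started. */
    void connectStarted(String id) {
        pending.add(id);
    }

    /** The connect finished; in this sketch the connection goes straight to the waiting request. */
    void connectFinished(String id) {
        if (pending.remove(id)) {
            leased.add(id);
        }
    }

    /** The response is complete; a reusable connection becomes available again. */
    void release(String id, boolean reusable) {
        if (leased.remove(id) && reusable) {
            available.addFirst(id);
        }
    }

    /** Every connection that counts against the pool-wide limit. */
    int totalAllocated() {
        return pending.size() + leased.size() + available.size();
    }

    public static void main(String[] args) {
        PoolBookkeepingSketch pool = new PoolBookkeepingSketch();
        pool.connectStarted("conn-1");
        pool.connectFinished("conn-1");
        pool.release("conn-1", true);
        System.out.println("available=" + pool.available + " leased=" + pool.leased);
    }
}
```

A connection that is released as non-reusable simply disappears from all three collections, which is what frees a slot for the next request.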

 

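1. Issue the request

 a. Look up the pool for the request's route. If the pool has a usable connection, return the future bound to that pooled connection right away and move the connection into the leased set

 b. If the request cannot get a connection for some reason but no fatal error occurred, it is put into the leasing list; later operations take it out of this list and retry

 c. If an unrecoverable error (such as an exception) occurs while obtaining the connection, the request is marked as completed; after leaving the method, fireCallbacks runs the callbacks and cleans up, and the request ends as failed

 d. If the pool has no usable connection but a new one may be created, the request is added to the pending set and the selector's wakeup() method is called. After waking up, the selector uses AbstractMultiworkerIOReactor (the bossGroup) to perform the connect and register the channel with the selector; listening for the connectable event and registering the channel once it is connected are also done by this reactor

2. AbstractIOReactor listens for read and write events

3. Once the decoder detects that the response is complete, the connection is released back to the pool: it is removed from the leased set and added to available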
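The four outcomes a to d of step 1 can be condensed into one decision function. This is a simplified sketch under stated assumptions: the enum LeaseOutcome and the class LeaseDecisionSketch are hypothetical names that do not exist in the library, and the step in which the real pool closes idle connections to make room is ignored.

```java
/** Hypothetical outcome names; the library itself does not define this enum. */
enum LeaseOutcome { LEASED, CONNECTING, QUEUED, FAILED }

final class LeaseDecisionSketch {
    /**
     * @param deadlineExpired    the lease request already timed out
     * @param freeEntryFound     the route pool had a usable idle connection
     * @param allocatedForRoute  connections the route currently owns
     * @param maxPerRoute        per-route limit
     * @param pendingPlusLeased  pool-wide connections that are connecting or leased
     * @param maxTotal           pool-wide limit
     */
    static LeaseOutcome decide(boolean deadlineExpired, boolean freeEntryFound,
                               int allocatedForRoute, int maxPerRoute,
                               int pendingPlusLeased, int maxTotal) {
        if (deadlineExpired) {
            return LeaseOutcome.FAILED;        // case c: unrecoverable, callbacks fire
        }
        if (freeEntryFound) {
            return LeaseOutcome.LEASED;        // case a: reuse a pooled connection
        }
        boolean routeHasRoom = allocatedForRoute < maxPerRoute;
        boolean poolHasRoom = maxTotal - pendingPlusLeased > 0;
        if (routeHasRoom && poolHasRoom) {
            return LeaseOutcome.CONNECTING;    // case d: start a new connection
        }
        return LeaseOutcome.QUEUED;            // case b: wait in the leasing list
    }

    public static void main(String[] args) {
        System.out.println(decide(false, true, 1, 2, 10, 100));
        System.out.println(decide(false, false, 2, 2, 10, 100));
    }
}
```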

 

Connection phase

A call to

    Future<HttpResponse> execute(
            HttpUriRequest request,
            FutureCallback<org.apache.http.HttpResponse> callback)

starts the request; internally it calls execute(request, new BasicHttpContext(), callback)

That is, it calls

Future<HttpResponse> execute(
            final HttpUriRequest request,
            final HttpContext context,
            final FutureCallback<HttpResponse> callback)  

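context represents the context of a single request; it is essentially a structure for storing attributes. The default implementation, BasicHttpContext, is a thin wrapper around a ConcurrentHashMap.
Contexts can be nested, as the code below shows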

@ThreadSafe
public class BasicHttpContext implements HttpContext {

    private final HttpContext parentContext;
    private final Map<String, Object> map;

    public BasicHttpContext() {
        this(null);
    }

    public BasicHttpContext(final HttpContext parentContext) {
        super();
        this.map = new ConcurrentHashMap<String, Object>();
        this.parentContext = parentContext;
    }

    public Object getAttribute(final String id) {
        Args.notNull(id, "Id");
        Object obj = this.map.get(id);
        if (obj == null && this.parentContext != null) {
            obj = this.parentContext.getAttribute(id);
        }
        return obj;
    }

    public void setAttribute(final String id, final Object obj) {
        Args.notNull(id, "Id");
        if (obj != null) {
            this.map.put(id, obj);
        } else {
            this.map.remove(id);
        }
    }

    public Object removeAttribute(final String id) {
        Args.notNull(id, "Id");
        return this.map.remove(id);
    }

    /**
     * @since 4.2
     */
    public void clear() {
        this.map.clear();
    }

    @Override
    public String toString() {
        return this.map.toString();
    }

}
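The nesting boils down to "look locally first, then ask the parent". The sketch below reproduces just that lookup with a stand-in class so it runs without the library; NestedContextSketch and the attribute name "user" are assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Minimal stand-in for the parent-fallback lookup; not the library class. */
final class NestedContextSketch {
    private final NestedContextSketch parent;
    private final Map<String, Object> map = new ConcurrentHashMap<>();

    NestedContextSketch(NestedContextSketch parent) {
        this.parent = parent;
    }

    /** Local value wins; otherwise the lookup is delegated to the parent. */
    Object getAttribute(String id) {
        Object obj = map.get(id);
        if (obj == null && parent != null) {
            obj = parent.getAttribute(id);
        }
        return obj;
    }

    /** Setting null removes the local value only; the parent is never modified. */
    void setAttribute(String id, Object obj) {
        if (obj != null) {
            map.put(id, obj);
        } else {
            map.remove(id);
        }
    }

    public static void main(String[] args) {
        NestedContextSketch parent = new NestedContextSketch(null);
        NestedContextSketch child = new NestedContextSketch(parent);
        parent.setAttribute("user", "from-parent");
        System.out.println(child.getAttribute("user"));
        child.setAttribute("user", "from-child");
        System.out.println(child.getAttribute("user"));
    }
}
```

One consequence of this design is that a child can shadow a parent attribute but cannot hide it: removing the child's value makes the parent's value visible again.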

Continuing with the execution flow

    public Future<HttpResponse> execute(
            final HttpUriRequest request,
            final HttpContext context,
            final FutureCallback<HttpResponse> callback) {
        final HttpHost target;
        try {
            target = determineTarget(request); // extract the target host from the request
        } catch (final ClientProtocolException ex) {
            final BasicFuture<HttpResponse> future = new BasicFuture<HttpResponse>(callback);
            future.failed(ex);
            return future;
        }
        return execute(target, request, context, callback);
    }

 

which calls

    public Future<HttpResponse> execute(
            final HttpHost target, final HttpRequest request, final HttpContext context,
            final FutureCallback<HttpResponse> callback) {
        return execute(
                HttpAsyncMethods.create(target, request),
                HttpAsyncMethods.createConsumer(),
                context, callback);
    }
The method it delegates to is declared in the HttpAsyncClient interface:
    /**
     * Initiates asynchronous HTTP request execution using the given context.
     * <p/>
     * The request producer passed to this method will be used to generate
     * a request message and stream out its content without buffering it
     * in memory. The response consumer passed to this method will be used
     * to process a response message without buffering its content in memory.
     * <p/>
     * Please note it may be unsafe to interact with the context instance
     * while the request is still being executed.
     *
     * @param <T> the result type of request execution.
     * @param requestProducer request producer callback.
     * @param responseConsumer response consumer callaback.
     * @param context HTTP context
     * @param callback future callback.
     * @return future representing pending completion of the operation.
     */
    <T> Future<T> execute(
            HttpAsyncRequestProducer requestProducer,
            HttpAsyncResponseConsumer<T> responseConsumer,
            HttpContext context,
            FutureCallback<T> callback);

 

Here a requestProducer and a responseConsumer are created from the original request information. By default the call lands in InternalHttpAsyncClient, the HttpAsyncClient implementation, shown below

    public <T> Future<T> execute(
            final HttpAsyncRequestProducer requestProducer,
            final HttpAsyncResponseConsumer<T> responseConsumer,
            final HttpContext context,
            final FutureCallback<T> callback) {
        final Status status = getStatus();
        Asserts.check(status == Status.ACTIVE, "Request cannot be executed; " +
                "I/O reactor status: %s", status);
        final BasicFuture<T> future = new BasicFuture<T>(callback);
        final HttpClientContext localcontext = HttpClientContext.adapt(
            context != null ? context : new BasicHttpContext());
        setupContext(localcontext);

        @SuppressWarnings("resource")
        final DefaultClientExchangeHandlerImpl<T> handler = new DefaultClientExchangeHandlerImpl<T>(
            this.log,
            requestProducer,
            responseConsumer,
            localcontext,
            future,
            this.connmgr,
            this.exec);
        try {
            handler.start(); // the request starts here
        } catch (final Exception ex) {
            handler.failed(ex);
        }
        return future;
    }

 

The request is started by creating an ExchangeHandler; look at handler.start()

    public void start() throws HttpException, IOException {
        final HttpHost target = this.requestProducer.getTarget();
        final HttpRequest original = this.requestProducer.generateRequest();

        if (original instanceof HttpExecutionAware) {
            ((HttpExecutionAware) original).setCancellable(this);
        }
        this.exec.prepare(this.state, target, original); // preparation: sets up the various fields of state
        requestConnection(); // where the connection for the request is actually requested
    }

Continuing down

private void requestConnection() {
        if (this.log.isDebugEnabled()) {
            this.log.debug("[exchange: " + this.state.getId() + "] Request connection for " +
                this.state.getRoute());
        }

        discardConnection();

        this.state.setValidDuration(0);
        this.state.setNonReusable();
        this.state.setRouteEstablished(false);
        this.state.setRouteTracker(null);

        final HttpRoute route = this.state.getRoute();
        final Object userToken = this.localContext.getUserToken();
        final RequestConfig config = this.localContext.getRequestConfig();
        this.connmgr.requestConnection( // calls the connection manager's requestConnection method
                route,
                userToken,
                config.getConnectTimeout(),
                config.getConnectionRequestTimeout(),
                TimeUnit.MILLISECONDS,
                new FutureCallback<NHttpClientConnection>() {

                    public void completed(final NHttpClientConnection managedConn) {
                        connectionAllocated(managedConn);
                    }

                    public void failed(final Exception ex) {
                        connectionRequestFailed(ex);
                    }

                    public void cancelled() {
                        connectionRequestCancelled();
                    }

                });
    }

Next, look at this method of NHttpClientConnectionManager

/**
     * Returns a {@link Future} for a {@link NHttpClientConnection}.
     * <p/>
     * Please note that the consumer of that connection is responsible
     * for fully establishing the route the to the connection target
     * by calling {@link #startRoute(org.apache.http.nio.NHttpClientConnection,
     *   org.apache.http.conn.routing.HttpRoute,
     *   org.apache.http.protocol.HttpContext) startRoute} in order to start
     * the process of connection initialization, optionally calling
     * {@link #upgrade(org.apache.http.nio.NHttpClientConnection,
     *   org.apache.http.conn.routing.HttpRoute,
     *   org.apache.http.protocol.HttpContext) upgrade} method to upgrade
     * the connection after having executed <code>CONNECT</code> method to
     * all intermediate proxy hops and and finally calling
     * {@link #routeComplete(org.apache.http.nio.NHttpClientConnection,
     *   org.apache.http.conn.routing.HttpRoute,
     *   org.apache.http.protocol.HttpContext) routeComplete} to mark the route
     * as fully completed.
     *
     * @param route HTTP route of the requested connection.
     * @param state expected state of the connection or <code>null</code>
     *              if the connection is not expected to carry any state.
     * @param connectTimeout connect timeout.
     * @param connectionRequestTimeout  connection request timeout.
     * @param timeUnit time unit of the previous two timeout values.
     * @param callback future callback.
     */
    Future<NHttpClientConnection> requestConnection(
            HttpRoute route,
            Object state,
            long connectTimeout,
            long connectionRequestTimeout,
            TimeUnit timeUnit,
            FutureCallback<NHttpClientConnection> callback);

The call goes to the PoolingNHttpClientConnectionManager implementation

    public Future<NHttpClientConnection> requestConnection(
            final HttpRoute route,
            final Object state,
            final long connectTimeout,
            final long leaseTimeout,
            final TimeUnit tunit,
            final FutureCallback<NHttpClientConnection> callback) {
        Args.notNull(route, "HTTP route");
        if (this.log.isDebugEnabled()) {
            this.log.debug("Connection request: " + format(route, state) + formatStats(route));
        }
        final BasicFuture<NHttpClientConnection> future = new BasicFuture<NHttpClientConnection>(callback);
        final HttpHost host;
        if (route.getProxyHost() != null) {
            host = route.getProxyHost();
        } else {
            host = route.getTargetHost();
        }
        final SchemeIOSessionStrategy sf = this.iosessionFactoryRegistry.lookup(
                host.getSchemeName());
        if (sf == null) {
            future.failed(new UnsupportedSchemeException(host.getSchemeName() +
                    " protocol is not supported"));
            return future;
        }
        this.pool.lease(route, state,
                connectTimeout, leaseTimeout, tunit != null ? tunit : TimeUnit.MILLISECONDS,
                new InternalPoolEntryCallback(future)); // this is where the connection pool is actually used
        return future;
    }

Look at the lease method of AbstractNIOConnPool

public Future<E> lease(
            final T route, final Object state,
            final long connectTimeout, final long leaseTimeout, final TimeUnit tunit,
            final FutureCallback<E> callback) {
        Args.notNull(route, "Route");
        Args.notNull(tunit, "Time unit");
        Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
        final BasicFuture<E> future = new BasicFuture<E>(callback);
        this.lock.lock(); // synchronize
        try {
            final long timeout = connectTimeout > 0 ? tunit.toMillis(connectTimeout) : 0;
            final LeaseRequest<T, C, E> request = new LeaseRequest<T, C, E>(route, state, timeout, leaseTimeout, future);
            // 1) Try to obtain a connection
            final boolean completed = processPendingRequest(request);
            // 2) A request that cannot get a connection right away because the pool is full
            //    goes into the leasingRequests LinkedList; later operations take it out
            //    again and retry it
            if (!request.isDone() && !completed) {
                this.leasingRequests.add(request);
            }
            // 3) A request whose connection step is done goes into a ConcurrentLinkedQueue.
            //    "Done" refers to the connection step, not to receiving a response: either a
            //    connection was obtained from the pool or the request was marked failed by an
            //    exception. The queue only serves to hand these requests to fireCallbacks(),
            //    which runs their callbacks
            if (request.isDone()) {
                this.completedRequests.add(request);
            }
        } finally {
            this.lock.unlock();
        }
        fireCallbacks();
        return future;
    }
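Note that lease() only queues the finished request while it holds the lock and calls fireCallbacks() after unlocking, presumably so that user callbacks never run with the pool lock held. A minimal sketch of that pattern follows; DeferredCallbackSketch and its methods are hypothetical names, and the "result" is a plain string rather than a pool entry.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

/** Sketch of "decide under the lock, notify after unlocking"; all names are hypothetical. */
final class DeferredCallbackSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Queue<Runnable> completed = new ConcurrentLinkedQueue<>();

    void submit(String result, Consumer<String> callback) {
        lock.lock();
        try {
            // Shared state would be updated here; the callback is only queued.
            completed.add(() -> callback.accept(result));
        } finally {
            lock.unlock();
        }
        fireCallbacks();
    }

    /** Drains the queue and runs each callback without the lock held. */
    private void fireCallbacks() {
        Runnable next;
        while ((next = completed.poll()) != null) {
            next.run();
        }
    }

    boolean lockHeldByCurrentThread() {
        return lock.isHeldByCurrentThread();
    }

    public static void main(String[] args) {
        DeferredCallbackSketch pool = new DeferredCallbackSketch();
        pool.submit("conn-1", r -> System.out.println(
                r + " delivered, lock held: " + pool.lockHeldByCurrentThread()));
    }
}
```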

 

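Two classes matter here: the connection pool AbstractNIOConnPool and RouteSpecificPool, the per-route pool underneath it that actually stores the connections.

Now for the connection flow itself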

private boolean processPendingRequest(final LeaseRequest<T, C, E> request) {
        final T route = request.getRoute();
        final Object state = request.getState();
        final long deadline = request.getDeadline();

        final long now = System.currentTimeMillis();
        if (now > deadline) {
            request.failed(new TimeoutException());
            return false;
        }

        final RouteSpecificPool<T, C, E> pool = getPool(route);
        E entry;
        for (;;) { // lease a connection from the pool
            entry = pool.getFree(state); // getFree takes a connection with a matching state from available
            if (entry == null) { // no usable connection, leave the loop
                break;
            }
            // discard unusable connections
            if (entry.isClosed() || entry.isExpired(System.currentTimeMillis())) {
                entry.close();
                this.available.remove(entry);
                pool.free(entry, false);
            } else {
                break;
            }
        }
        if (entry != null) { // a connection was found, return
            this.available.remove(entry);
            this.leased.add(entry);
            request.completed(entry);
            onLease(entry);
            return true;
        }

        // New connection is needed
        final int maxPerRoute = getMax(route);
        // Shrink the pool prior to allocating a new connection:
        // excess is how far the route would exceed its limit with one more connection
        final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);

        // Shrink the route pool: close and remove the last used idle connections until the excess is gone
        if (excess > 0) {
            for (int i = 0; i < excess; i++) {
                // getLastUsed() returns the last connection in available. If every connection is
                // leased out, nothing can be closed, which amounts to a full pool: the check
                // if (pool.getAllocatedCount() < maxPerRoute) below is then false and the
                // request ends up in the leasingRequests list
                final E lastUsed = pool.getLastUsed();
                if (lastUsed == null) {
                    break;
                }
                lastUsed.close();
                this.available.remove(lastUsed);
                pool.remove(lastUsed);
            }
        }

        // Allocated connections < per-route limit: start creating a new one
        if (pool.getAllocatedCount() < maxPerRoute) {
            // Total in use = connections still connecting + connections leased out
            final int totalUsed = this.pending.size() + this.leased.size();
            final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
            if (freeCapacity == 0) {
                return false;
            }
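            // Note that even when the pool's available list is not empty, no usable connection may be found, because the state does not match
            final int totalAvailable = this.available.size();
            // When the total number of available connections exceeds the free slots minus one, the last entry
            // of the global available list (possibly from another route) is closed. The purpose is not obvious
            // at first; presumably it makes room so that idle plus in-use connections stay within maxTotal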
            if (totalAvailable > freeCapacity - 1) {
                if (!this.available.isEmpty()) {
                    final E lastUsed = this.available.removeLast();
                    lastUsed.close();
                    final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                    otherpool.remove(lastUsed);
                }
            }

            // Connection setup: resolve the addresses, then create a SessionRequest that tracks this
            // connection attempt and call the selector's wakeup() to trigger the actual connect
            final SocketAddress localAddress;
            final SocketAddress remoteAddress;
            try {
                remoteAddress = this.addressResolver.resolveRemoteAddress(route);
                localAddress = this.addressResolver.resolveLocalAddress(route);
            } catch (final IOException ex) {
                request.failed(ex);
                return false;
            }

            // Pay special attention to this connect method
            final SessionRequest sessionRequest = this.ioreactor.connect(
                    remoteAddress, localAddress, route, this.sessionRequestCallback);
            final int timout = request.getConnectTimeout() < Integer.MAX_VALUE ?
                    (int) request.getConnectTimeout() : Integer.MAX_VALUE;
            sessionRequest.setConnectTimeout(timout);
            // add to the pool-wide pending set
            this.pending.add(sessionRequest);
            // add to the pending set of the route pool
            pool.addPending(sessionRequest, request.getFuture());
            return true;
        } else {
            return false;
        }
    }

    // Drain the completed requests, check each result and set the state of its future
    private void fireCallbacks() {
        LeaseRequest<T, C, E> request;
        while ((request = this.completedRequests.poll()) != null) {
            final BasicFuture<E> future = request.getFuture();
            final Exception ex = request.getException();
            final E result = request.getResult();
            if (ex != null) {
                future.failed(ex);
            } else if (result != null) {
                future.completed(result);
            } else {
                future.cancel();
            }
        }
    }
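The capacity arithmetic in processPendingRequest is easier to follow with concrete numbers. The sketch below copies only the three expressions (excess, freeCapacity and the idle-eviction check) into a hypothetical helper class, PoolCapacitySketch; the method names and the sample numbers are assumptions chosen for illustration.

```java
/** The three capacity expressions from processPendingRequest, isolated for experimenting. */
final class PoolCapacitySketch {
    /** Idle connections of this route to close before allocating one more. */
    static int excess(int allocatedForRoute, int maxPerRoute) {
        return Math.max(0, allocatedForRoute + 1 - maxPerRoute);
    }

    /** Pool-wide slots not taken by pending or leased connections. */
    static int freeCapacity(int maxTotal, int pending, int leased) {
        return Math.max(maxTotal - (pending + leased), 0);
    }

    /** True when an idle connection has to be closed to make room for the new one. */
    static boolean mustEvictIdle(int totalAvailable, int freeCapacity) {
        return totalAvailable > freeCapacity - 1;
    }

    public static void main(String[] args) {
        // Route limit 2 with 2 connections allocated: one idle connection must go first.
        System.out.println("excess=" + excess(2, 2));
        // Pool limit 100 with 3 pending and 95 leased: 2 slots are left.
        int free = freeCapacity(100, 3, 95);
        System.out.println("freeCapacity=" + free);
        // Those 2 slots are both occupied by idle connections, so one is evicted.
        System.out.println("mustEvictIdle=" + mustEvictIdle(2, free));
    }
}
```

Since freeCapacity ignores idle connections, the eviction check is what keeps pending + leased + available from growing past maxTotal when a new connection is opened.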

Now look at the connect method of DefaultConnectingIOReactor

    public SessionRequest connect(
            final SocketAddress remoteAddress,
            final SocketAddress localAddress,
            final Object attachment,
            final SessionRequestCallback callback) {
        Asserts.check(this.status.compareTo(IOReactorStatus.ACTIVE) <= 0,
            "I/O reactor has been shut down");
        final SessionRequestImpl sessionRequest = new SessionRequestImpl(
                remoteAddress, localAddress, attachment, callback);
        sessionRequest.setConnectTimeout(this.config.getConnectTimeout());

        this.requestQueue.add(sessionRequest);
        this.selector.wakeup(); // see below for what happens after wakeup()

        return sessionRequest;
    }
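The "enqueue, then wakeup()" hand-off in connect() can be tried with nothing but java.nio. In this sketch a selector thread with no registered channels blocks in select() until another thread queues a request and wakes it; WakeupSketch, submitAndWake and the string request are hypothetical stand-ins for the reactor, its connect method and SessionRequestImpl.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

/** Sketch of handing work to a selector thread through a queue plus wakeup(). */
final class WakeupSketch {
    /** Returns the request the selector thread picked up after being woken. */
    static String submitAndWake(String request) throws IOException, InterruptedException {
        Queue<String> requestQueue = new ConcurrentLinkedQueue<>();
        AtomicReference<String> handled = new AtomicReference<>();
        try (Selector selector = Selector.open()) {
            Thread reactor = new Thread(() -> {
                try {
                    String next;
                    // No channels are registered, so select() returns only after wakeup().
                    while ((next = requestQueue.poll()) == null) {
                        selector.select();
                    }
                    handled.set(next);
                } catch (IOException ex) {
                    // Leave the result empty; the caller then sees null.
                }
            });
            reactor.setDaemon(true);
            reactor.start();
            requestQueue.add(request);  // enqueue first ...
            selector.wakeup();          // ... then wake the selector thread
            reactor.join(5000);
        }
        return handled.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("handled: " + submitAndWake("connect to example host"));
    }
}
```

The order matters: because the request is queued before wakeup() is called, and a wakeup issued while no select() is in progress makes the next select() return immediately, the selector thread cannot miss the request.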

AbstractMultiworkerIOReactor has an execute() method

 /**
     * Activates the main I/O reactor as well as all worker I/O reactors.
     * The I/O main reactor will start reacting to I/O events and triggering
     * notification methods. The worker I/O reactor in their turn will start
     * reacting to I/O events and dispatch I/O event notifications to the given
     * {@link IOEventDispatch} interface.
     * <p>
     * This method will enter the infinite I/O select loop on
     * the {@link Selector} instance associated with this I/O reactor and used
     * to manage creation of new I/O channels. Once a new I/O channel has been
     * created the processing of I/O events on that channel will be delegated
     * to one of the worker I/O reactors.
     * <p>
     * The method will remain blocked unto the I/O reactor is shut down or the
     * execution thread is interrupted.
     *
     * @see #processEvents(int)
     * @see #cancelRequests()
     *
     * @throws InterruptedIOException if the dispatch thread is interrupted.
     * @throws IOReactorException in case if a non-recoverable I/O error.
     */
    public void execute(
            final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {
        Args.notNull(eventDispatch, "Event dispatcher");
        synchronized (this.statusLock) {
            if (this.status.compareTo(IOReactorStatus.SHUTDOWN_REQUEST) >= 0) {
                this.status = IOReactorStatus.SHUT_DOWN;
                this.statusLock.notifyAll();
                return;
            }
            Asserts.check(this.status.compareTo(IOReactorStatus.INACTIVE) == 0,
                    "Illegal state %s", this.status);
            this.status = IOReactorStatus.ACTIVE;
            // Start I/O dispatchers
            for (int i = 0; i < this.dispatchers.length; i++) {
                final BaseIOReactor dispatcher = new BaseIOReactor(this.selectTimeout, this.interestOpsQueueing);
                dispatcher.setExceptionHandler(exceptionHandler);
                this.dispatchers[i] = dispatcher;
            }
            for (int i = 0; i < this.workerCount; i++) {
                final BaseIOReactor dispatcher = this.dispatchers[i];
                this.workers[i] = new Worker(dispatcher, eventDispatch);
                this.threads[i] = this.threadFactory.newThread(this.workers[i]);
            }
        }
        try {

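            // Start all worker threads. Connecting is done by AbstractMultiworkerIOReactor, while everything after a
            // successful connect is handled by the AbstractIOReactor worker threads; the former resembles a
            // bossGroup, the latter a workerGroup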
            for (int i = 0; i < this.workerCount; i++) {
                if (this.status != IOReactorStatus.ACTIVE) {
                    return;
                }
                this.threads[i].start();
            }

            // Listen for events in an infinite loop
            for (;;) {
                final int readyCount;
                try {
                    // Blocks until the timeout expires or wakeup() is called
                    readyCount = this.selector.select(this.selectTimeout);
                } catch (final InterruptedIOException ex) {
                    throw ex;
                } catch (final IOException ex) {
                    throw new IOReactorException("Unexpected selector failure", ex);
                }
                
                // While the reactor is ACTIVE, enter processEvents on every pass (even if readyCount is 0);
                // the actual connecting happens there
                if (this.status.compareTo(IOReactorStatus.ACTIVE) == 0) {
                    processEvents(readyCount);
                }

                // Verify I/O dispatchers
                for (int i = 0; i < this.workerCount; i++) {
                    final Worker worker = this.workers[i];
                    final Exception ex = worker.getException();
                    if (ex != null) {
                        throw new IOReactorException(
                                "I/O dispatch worker terminated abnormally", ex);
                    }
                }

                if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0) {
                    break;
                }
            }

        } catch (final ClosedSelectorException ex) {
            addExceptionEvent(ex);
        } catch (final IOReactorException ex) {
            if (ex.getCause() != null) {
                addExceptionEvent(ex.getCause());
            }
            throw ex;
        } finally {
            doShutdown();
            synchronized (this.statusLock) {
                this.status = IOReactorStatus.SHUT_DOWN;
                this.statusLock.notifyAll();
            }
        }
    }

On the first connection, what gets triggered is the processEvents method of DefaultConnectingIOReactor

    @Override
    protected void processEvents(final int readyCount) throws IOReactorException {
        processSessionRequests(); // this is where the connect actually happens

        if (readyCount > 0) {
            final Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
            for (final SelectionKey key : selectedKeys) {
                
                processEvent(key);

            }
            selectedKeys.clear();
        }

        final long currentTime = System.currentTimeMillis();
        if ((currentTime - this.lastTimeoutCheck) >= this.selectTimeout) {
            this.lastTimeoutCheck = currentTime;
            final Set<SelectionKey> keys = this.selector.keys();
            processTimeouts(keys);
        }
    }

private void processSessionRequests() throws IOReactorException {
        SessionRequestImpl request;
        // A single wakeup processes (starts connecting) every request in the queue
        while ((request = this.requestQueue.poll()) != null) {
            if (request.isCompleted()) {
                continue;
            }
            final SocketChannel socketChannel;
            try {
                socketChannel = SocketChannel.open();
            } catch (final IOException ex) {
                throw new IOReactorException("Failure opening socket", ex);
            }
            try {
                socketChannel.configureBlocking(false);
                validateAddress(request.getLocalAddress());
                validateAddress(request.getRemoteAddress());

                if (request.getLocalAddress() != null) {
                    final Socket sock = socketChannel.socket();
                    sock.setReuseAddress(this.config.isSoReuseAddress());
                    sock.bind(request.getLocalAddress());
                }
                prepareSocket(socketChannel.socket());
                final boolean connected = socketChannel.connect(request.getRemoteAddress());
                if (connected) { // connected immediately, move on to the next request
                    final ChannelEntry entry = new ChannelEntry(socketChannel, request);
                    addChannel(entry);
                    continue;
                }
            } catch (final IOException ex) {
                closeChannel(socketChannel);
                request.failed(ex);
                return;
            }

            // Not connected yet: register with the selector and wait for the connect event, which processEvent handles
            final SessionRequestHandle requestHandle = new SessionRequestHandle(request);
            try {
                final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT,
                        requestHandle);
                request.setKey(key);
            } catch (final IOException ex) {
                closeChannel(socketChannel);
                throw new IOReactorException("Failure registering channel " +
                        "with the selector", ex);
            }
        }
    }

    // This method registers the channel once the connect has completed
    private void processEvent(final SelectionKey key) {
        try {

            if (key.isConnectable()) {

                final SocketChannel channel = (SocketChannel) key.channel();
                // Get request handle
                final SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
                final SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();

                // Finish connection process
                try {
                    channel.finishConnect();
                } catch (final IOException ex) {
                    sessionRequest.failed(ex);
                }
                key.cancel();
                key.attach(null);
                if (!sessionRequest.isCompleted()) {
                    // Register the new channel; it is later picked up by a worker thread, which does the I/O reads and writes
                    addChannel(new ChannelEntry(channel, sessionRequest));
                } else {
                    try {
                        channel.close();
                    } catch (IOException ignore) {
                    }
                }
            }

        } catch (final CancelledKeyException ex) {
            final SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
            key.attach(null);
            if (requestHandle != null) {
                final SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();
                if (sessionRequest != null) {
                    sessionRequest.cancel();
                }
            }
        }
    }
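The same connect sequence (non-blocking connect(), OP_CONNECT registration, finishConnect()) can be reproduced with plain java.nio. The sketch below connects to a throwaway server socket on the loopback interface; the class NonBlockingConnectSketch, the 5 second deadline and the 500 ms select timeout are assumptions made for illustration, and error handling is reduced to propagating IOException.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/** Sketch of a selector-driven non-blocking connect against a local server socket. */
final class NonBlockingConnectSketch {
    /** Connects to a throwaway local server and reports whether the connect completed. */
    static boolean connectToLocalServer() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            // Port 0 lets the OS pick a free port; the listen backlog accepts the handshake.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            channel.configureBlocking(false);
            // connect() may complete immediately or return false while still in progress.
            boolean connected = channel.connect(server.getLocalAddress());
            if (!connected) {
                channel.register(selector, SelectionKey.OP_CONNECT);
                long deadline = System.currentTimeMillis() + 5000;
                while (!connected && System.currentTimeMillis() < deadline) {
                    selector.select(500);
                    for (SelectionKey key : selector.selectedKeys()) {
                        if (key.isConnectable()) {
                            connected = channel.finishConnect();
                            if (connected) {
                                key.cancel(); // connect interest is no longer needed
                            }
                        }
                    }
                    selector.selectedKeys().clear();
                }
            }
            return connected && channel.isConnected();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("connected: " + connectToLocalServer());
    }
}
```

In the reactor the connected channel is then handed to a worker via addChannel; the sketch simply closes it.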

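Next, see how the I/O reactor handles a connection once it is established, starting with the execute method that BaseIOReactor inherits from AbstractIOReactor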

 

 /**
     * Activates the I/O reactor. The I/O reactor will start reacting to
     * I/O events and triggering notification methods.
     * <p>
     * This method will enter the infinite I/O select loop on
     * the {@link Selector} instance associated with this I/O reactor.
     * <p>
     * The method will remain blocked unto the I/O reactor is shut down or the
     * execution thread is interrupted.
     *
     * @see #acceptable(SelectionKey)
     * @see #connectable(SelectionKey)
     * @see #readable(SelectionKey)
     * @see #writable(SelectionKey)
     * @see #timeoutCheck(SelectionKey, long)
     * @see #validate(Set)
     * @see #sessionCreated(SelectionKey, IOSession)
     * @see #sessionClosed(IOSession)
     *
     * @throws InterruptedIOException if the dispatch thread is interrupted.
     * @throws IOReactorException in case if a non-recoverable I/O error.
     */
    protected void execute() throws InterruptedIOException, IOReactorException {
        this.status = IOReactorStatus.ACTIVE;

        try {
            for (;;) {

                final int readyCount;
                try {
                    readyCount = this.selector.select(this.selectTimeout);
                } catch (final InterruptedIOException ex) {
                    throw ex;
                } catch (final IOException ex) {
                    throw new IOReactorException("Unexpected selector failure", ex);
                }

                if (this.status == IOReactorStatus.SHUT_DOWN) {
                    // Hard shut down. Exit select loop immediately
                    break;
                }

                if (this.status == IOReactorStatus.SHUTTING_DOWN) {
                    // Graceful shutdown in process
                    // Try to close things out nicely
                    closeSessions();
                    closeNewChannels();
                }

                // Process selected I/O events
                if (readyCount > 0) {
                    processEvents(this.selector.selectedKeys());
                }

                // Validate active channels
                validate(this.selector.keys());

                // Process closed sessions
                processClosedSessions();

                // If active process new channels
                if (this.status == IOReactorStatus.ACTIVE) {
                    processNewChannels();
                }

                // Exit select loop if graceful shutdown has been completed
                if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0
                        && this.sessions.isEmpty()) {
                    break;
                }

                if (this.interestOpsQueueing) {
                    // process all pending interestOps() operations
                    processPendingInterestOps();
                }

            }

        } catch (final ClosedSelectorException ignore) {
        } finally {
            hardShutdown();
            synchronized (this.statusMutex) {
                this.statusMutex.notifyAll();
            }
        }
    }

    private void processEvents(final Set<SelectionKey> selectedKeys) {
        for (final SelectionKey key : selectedKeys) {

            processEvent(key);

        }
        selectedKeys.clear();
    }

    /**
     * Processes new event on the given selection key.
     *
     * @param key the selection key that triggered an event.
     */
    protected void processEvent(final SelectionKey key) {
        final IOSessionImpl session = (IOSessionImpl) key.attachment();
        try {
            if (key.isAcceptable()) {
                acceptable(key);
            }
            if (key.isConnectable()) {
                connectable(key);
            }
            if (key.isReadable()) {
                session.resetLastRead();
                readable(key);
            }
            if (key.isWritable()) {
                session.resetLastWrite();
                writable(key);
            }
        } catch (final CancelledKeyException ex) {
            queueClosedSession(session);
            key.attach(null);
        }
    }

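This loop works much like the one in AbstractMultiworkerIOReactor, except that the two reactors care about different interest sets. A look at BaseIOReactor, the concrete subclass of AbstractIOReactor, makes this clear: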

    /**
     * This I/O reactor implementation does not react to the
     * {@link SelectionKey#OP_ACCEPT} event.
     * <p>
     * Super-classes can override this method to react to the event.
     */
    @Override
    protected void acceptable(final SelectionKey key) {
    }

    /**
     * This I/O reactor implementation does not react to the
     * {@link SelectionKey#OP_CONNECT} event.
     * <p>
     * Super-classes can override this method to react to the event.
     */
    @Override
    protected void connectable(final SelectionKey key) {
    }

    /**
     * Processes {@link SelectionKey#OP_READ} event on the given selection key.
     * This method dispatches the event notification to the
     * {@link IOEventDispatch#inputReady(IOSession)} method.
     */
    @Override
    protected void readable(final SelectionKey key) {
        final IOSession session = getSession(key);
        try {
            this.eventDispatch.inputReady(session);
            if (session.hasBufferedInput()) {
                this.bufferingSessions.add(session);
            }
        } catch (final CancelledKeyException ex) {
            queueClosedSession(session);
            key.attach(null);
        } catch (final RuntimeException ex) {
            handleRuntimeException(ex);
        }
    }

    /**
     * Processes {@link SelectionKey#OP_WRITE} event on the given selection key.
     * This method dispatches the event notification to the
     * {@link IOEventDispatch#outputReady(IOSession)} method.
     */
    @Override
    protected void writable(final SelectionKey key) {
        final IOSession session = getSession(key);
        try {
            this.eventDispatch.outputReady(session);
        } catch (final CancelledKeyException ex) {
            queueClosedSession(session);
            key.attach(null);
        } catch (final RuntimeException ex) {
            handleRuntimeException(ex);
        }
    }

In practice it only handles read and write events; the accept and connect hooks are empty.
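To make the interest-set point concrete, here is a minimal sketch using only the JDK's java.nio, not HttpCore itself. It registers a channel for OP_READ alone (as processNewChannels does for a new channel), runs one select pass, and records which kinds of events fire. The class name ReadOnlyInterestSketch and the use of a Pipe in place of a socket are assumptions made so the example runs without a network.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a single select pass over a channel registered for OP_READ only.
class ReadOnlyInterestSketch {

    /** Writes payload into a pipe, runs one select pass, and returns the events handled. */
    static List<String> runOnce(String payload) {
        List<String> handled = new ArrayList<>();
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                pipe.source().configureBlocking(false);
                // Only OP_READ is registered, so accept/connect can never be reported.
                pipe.source().register(selector, SelectionKey.OP_READ);
                pipe.sink().write(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)));

                if (selector.select(1000) > 0) {
                    for (SelectionKey key : selector.selectedKeys()) {
                        if (key.isAcceptable()) {
                            handled.add("accept");
                        }
                        if (key.isConnectable()) {
                            handled.add("connect");
                        }
                        if (key.isReadable()) {
                            // Reads at most 64 bytes; enough for a short demo payload.
                            ByteBuffer buf = ByteBuffer.allocate(64);
                            int n = ((Pipe.SourceChannel) key.channel()).read(buf);
                            handled.add("read:" + new String(
                                    buf.array(), 0, Math.max(n, 0), StandardCharsets.UTF_8));
                        }
                        if (key.isWritable()) {
                            handled.add("write");
                        }
                    }
                    // Mirrors processEvents: the selected-key set must be cleared by hand.
                    selector.selectedKeys().clear();
                }
            } finally {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(runOnce("ping"));
    }
}
```

In the real reactor, OP_WRITE interest is added later through the session's interest-ops calls when the protocol layer has output to send; the sketch leaves that part out.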

 

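Note in particular that requestCompleted in AbstractNIOConnPool does not mean the whole HTTP request has finished. It means the connection has been established. Let's look at where it is called from and what it triggers.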

The call originates in processNewChannels of AbstractIOReactor, which is invoked from execute:

    private void processNewChannels() throws IOReactorException {
        ChannelEntry entry;
        while ((entry = this.newChannels.poll()) != null) { // recall that addChannel put the channel on this queue once the connection succeeded; it is taken off here

            final SocketChannel channel;
            final SelectionKey key;
            try {
                channel = entry.getChannel();
                channel.configureBlocking(false);
                key = channel.register(this.selector, SelectionKey.OP_READ);
            } catch (final ClosedChannelException ex) {
                final SessionRequestImpl sessionRequest = entry.getSessionRequest();
                if (sessionRequest != null) {
                    sessionRequest.failed(ex);
                }
                return;

            } catch (final IOException ex) {
                throw new IOReactorException("Failure registering channel " +
                        "with the selector", ex);
            }

            final SessionClosedCallback sessionClosedCallback = new SessionClosedCallback() {

                public void sessionClosed(final IOSession session) {
                    queueClosedSession(session);
                }

            };

            InterestOpsCallback interestOpsCallback = null;
            if (this.interestOpsQueueing) {
                interestOpsCallback = new InterestOpsCallback() {

                    public void addInterestOps(final InterestOpEntry entry) {
                        queueInterestOps(entry);
                    }

                };
            }

            final IOSession session;
            try {
                session = new IOSessionImpl(key, interestOpsCallback, sessionClosedCallback);
                int timeout = 0;
                try {
                    timeout = channel.socket().getSoTimeout();
                } catch (final IOException ex) {
                    // Very unlikely to happen and is not fatal
                    // as the protocol layer is expected to overwrite
                    // this value anyways
                }

                session.setAttribute(IOSession.ATTACHMENT_KEY, entry.getAttachment());
                session.setSocketTimeout(timeout);
            } catch (final CancelledKeyException ex) {
                continue;
            }
            try {
                this.sessions.add(session);
                final SessionRequestImpl sessionRequest = entry.getSessionRequest();
                if (sessionRequest != null) {
                    // completed is called here, and the call eventually reaches the pool's requestCompleted method
                    sessionRequest.completed(session);
                }
                key.attach(session);
                sessionCreated(key, session);
            } catch (final CancelledKeyException ex) {
                queueClosedSession(session);
                key.attach(null);
            }
        }
    }
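The handoff in processNewChannels can be reduced to a queue that one thread fills and the reactor loop drains. The sketch below is a simplified model under stated assumptions, not HttpCore code: the class NewChannelHandoffSketch, its Entry type, and the string "sessions" are all hypothetical stand-ins for ChannelEntry, SessionRequestImpl, and IOSession.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical model of the connector-to-worker handoff; not the HttpCore classes.
class NewChannelHandoffSketch {

    /** Stand-in for ChannelEntry together with its optional SessionRequest callback. */
    static final class Entry {
        final String channelName;
        final Consumer<String> onCompleted; // may be null, like getSessionRequest()

        Entry(String channelName, Consumer<String> onCompleted) {
            this.channelName = channelName;
            this.onCompleted = onCompleted;
        }
    }

    // Filled by the connecting side, drained by the worker loop.
    private final Queue<Entry> newChannels = new ConcurrentLinkedQueue<>();
    private final List<String> sessions = new ArrayList<>();

    /** Called by the connecting side once a connection has been established. */
    void addChannel(Entry entry) {
        newChannels.add(entry);
    }

    /** Called from the worker loop: drains the queue and fires the completion callbacks. */
    int processNewChannels() {
        int processed = 0;
        Entry entry;
        while ((entry = newChannels.poll()) != null) {
            String session = "session:" + entry.channelName;
            sessions.add(session);
            if (entry.onCompleted != null) {
                // Corresponds to sessionRequest.completed(session).
                entry.onCompleted.accept(session);
            }
            processed++;
        }
        return processed;
    }

    List<String> sessions() {
        return sessions;
    }

    public static void main(String[] args) {
        NewChannelHandoffSketch reactor = new NewChannelHandoffSketch();
        reactor.addChannel(new Entry("a", s -> System.out.println("connected " + s)));
        reactor.addChannel(new Entry("b", null));
        System.out.println("processed " + reactor.processNewChannels());
    }
}
```

The point of the queue is that the connecting thread never touches the worker's selector directly; the worker registers the channel itself on its next loop iteration.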

Now look at what the pool's requestCompleted method in AbstractNIOConnPool does:

    protected void requestCompleted(final SessionRequest request) {
        if (this.isShutDown.get()) {
            return;
        }
        @SuppressWarnings("unchecked")
        final T route = (T) request.getAttachment();
        this.lock.lock();
        try {
            this.pending.remove(request); // remove this request from the pending set
            final RouteSpecificPool<T, C, E> pool = getPool(route);
            final IOSession session = request.getSession();
            try {
                final C conn = this.connFactory.create(route, session);
                final E entry = pool.createEntry(request, conn);
                this.leased.add(entry); // add the connection to the leased set
                pool.completed(request, entry); // call completed on the per-route pool
                onLease(entry); // onLease just sets the timeout
            } catch (final IOException ex) {
                pool.failed(request, ex);
            }
        } finally {
            this.lock.unlock();
        }
        fireCallbacks();
    }

The main work here is setting the connection's socket timeout (soTimeout) and moving the connection from the pending set to the leased set.
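The pending-to-leased transition can be modeled in a few lines. This is a toy sketch under assumptions, not the library's implementation: PendingToLeasedSketch and its methods are hypothetical names, connections are plain strings, and per-route pools, timeouts, and callbacks are left out.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical toy model of the pending -> leased bookkeeping.
class PendingToLeasedSketch {

    private final ReentrantLock lock = new ReentrantLock();
    private final Set<String> pending = new HashSet<>();
    private final Set<String> leased = new HashSet<>();

    /** A connect attempt has been started for this connection id. */
    void requestConnection(String id) {
        lock.lock();
        try {
            pending.add(id);
        } finally {
            lock.unlock();
        }
    }

    /** The connect succeeded: the id leaves pending and is handed out as leased. */
    boolean requestCompleted(String id) {
        lock.lock();
        try {
            if (!pending.remove(id)) {
                return false; // unknown or already completed
            }
            leased.add(id);
            return true;
        } finally {
            lock.unlock();
        }
    }

    int pendingCount() {
        lock.lock();
        try {
            return pending.size();
        } finally {
            lock.unlock();
        }
    }

    int leasedCount() {
        lock.lock();
        try {
            return leased.size();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        PendingToLeasedSketch pool = new PendingToLeasedSketch();
        pool.requestConnection("conn-1");
        pool.requestCompleted("conn-1");
        System.out.println(pool.pendingCount() + " pending, " + pool.leasedCount() + " leased");
    }
}
```

A freshly connected entry goes straight to leased rather than available because it was created on behalf of a caller that is already waiting for it.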

Finally, let's see how a connection is returned to the pool.

Tracing the callers of the release method in AbstractNIOConnPool leads back to BaseIOReactor: when it observes a readable event, it ends up calling the inputReady method of HttpAsyncRequestExecutor:

    public void inputReady(
            final NHttpClientConnection conn,
            final ContentDecoder decoder) throws IOException, HttpException {
        final State state = ensureNotNull(getState(conn));
        final HttpAsyncClientExchangeHandler handler = ensureNotNull(getHandler(conn));
        handler.consumeContent(decoder, conn);
        state.setResponseState(MessageState.BODY_STREAM);
        if (decoder.isCompleted()) { // the content has ended, so enter the completion flow
            processResponse(conn, state, handler);
        }
    }

Request completion phase

Once the response is detected as complete, execution enters the completion flow, then the releaseConnection flow, and finally reaches the pool's release method:

    public void release(final E entry, final boolean reusable) {
        if (entry == null) {
            return;
        }
        if (this.isShutDown.get()) {
            return;
        }
        this.lock.lock();
        try {
            if (this.leased.remove(entry)) { // remove from the leased set
                final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
                pool.free(entry, reusable); // return it to the per-route pool's available list
                if (reusable) {
                    this.available.addFirst(entry); // add to the pool-wide available list
                    onRelease(entry); // reset soTimeout
                } else {
                    entry.close();
                }
                processNextPendingRequest(); // process the next request waiting in the leasing queue
            }
        } finally {
            this.lock.unlock();
        }
        fireCallbacks();
    }
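The release path can be modeled the same way. The sketch below is a toy model under assumptions, not the library's code: ReleaseSketch and its methods are hypothetical, connections are plain strings, and it only shows the leased-to-available bookkeeping and the effect of addFirst on reuse order.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Hypothetical toy model of the leased -> available bookkeeping on release.
class ReleaseSketch {

    private final Set<String> leased = new HashSet<>();
    private final Deque<String> available = new ArrayDeque<>();

    /** Hands a brand-new connection to a caller. */
    synchronized void lease(String conn) {
        leased.add(conn);
    }

    /** Returns a connection; only reusable ones go back on the available list. */
    synchronized boolean release(String conn, boolean reusable) {
        if (!leased.remove(conn)) {
            return false; // not leased, nothing to do
        }
        if (reusable) {
            // addFirst means the most recently released connection is reused first.
            available.addFirst(conn);
        }
        return true;
    }

    /** Reuses an idle connection if there is one, otherwise returns null. */
    synchronized String leaseFromAvailable() {
        String conn = available.pollFirst();
        if (conn != null) {
            leased.add(conn);
        }
        return conn;
    }

    synchronized int availableCount() {
        return available.size();
    }

    public static void main(String[] args) {
        ReleaseSketch pool = new ReleaseSketch();
        pool.lease("a");
        pool.lease("b");
        pool.release("a", true);
        pool.release("b", true);
        System.out.println("next reused: " + pool.leaseFromAvailable());
    }
}
```

Putting released entries at the head of the list favors recently used connections, which are less likely to have been closed by the server while idle.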

