JDK HttpClient客戶端的構建和啟動


HttpClient客戶端的構建和啟動

1. 簡述

上篇簡單測試過,Http Client 的性能相對高效。那么,這樣一個客戶端,又是怎樣構建的呢?短短的構建代碼,背后又發生了什么呢?

簡而言之,HttpClient的構建使用了建造者模式,在構建時同時產生了HttpClient的兩個實現類對應的對象:外觀代理對象HttpclientFacade和真正的實現對象HttpclientImpl,兩者互相引用彼此,一切對客戶端的行為都首先作用在外觀對象上,再交由實現對象來處理。在構建過程中,HttpclientImpl會創建一個作為NIO中的選擇器管理者的守護線程,負責對各類I/O事件進行輪詢,並分發相關事件進行處理。

理解本文,需要JAVA NIO相關知識。

本文所指的[HTTPClient] 都是指 JDK11 開始內置的HTTPClient及相關類。源代碼分析基於JDK17。

2. HttpClient客戶端構建:建造者模式、代理模式

先回顧下HttpClient客戶端的構建代碼:

HttpClient client = HttpClient.newBuilder()
        .executor(executor) //對建造者進行參數化
        .build();

可以看到,這里使用了典型的建造者模式(Builder Pattern)。使用建造者模式的好處是:Httpclient相對復雜的構建過程被隱藏了起來,使用者無需知道具體的構建細節。

在newBuilder()被調用后,產生一個HttpclientBuilderImpl的實例,之后可以鏈式地對該建造者參數化,最后通過build()方法,構建出需要的HttpClient實現類。

那么,build()執行時,發生了什么呢?我們走進build()方法的源碼:

	@Override
    public HttpClient build() {
        return HttpClientImpl.create(this);
    }

可以看到,建造者將自身作為HttpClientImpl的靜態方法create()的入參,將最后的構建過程交給了HttpClientImpl,我們跟隨進入。

/**
 * Client implementation. Contains all configuration information and also
 * the selector manager thread which allows async events to be registered
 * and delivered when they occur. See AsyncEvent.
 */
final class HttpClientImpl extends HttpClient implements Trackable {
    
    static final AtomicLong CLIENT_IDS = new AtomicLong();

    //此處列出大部分成員變量
    
    private final CookieHandler cookieHandler;
    private final Duration connectTimeout;
    private final Redirect followRedirects;
    private final ProxySelector userProxySelector;
    private final ProxySelector proxySelector;
    private final Authenticator authenticator;
    private final Version version;
    private final ConnectionPool connections;
    private final DelegatingExecutor delegatingExecutor;
    private final boolean isDefaultExecutor;
    // Security parameters
    private final SSLContext sslContext;
    private final SSLParameters sslParams;
    private final SelectorManager selmgr;
    private final FilterFactory filters;
    //HttpClient2客戶端
    private final Http2ClientImpl client2;
    private final long id;
    private final String dbgTag;
    
    private final SSLDirectBufferSupplier sslBufferSupplier
            = new SSLDirectBufferSupplier(this);
    //對HttpClient外觀實現類的弱引用
    private final WeakReference<HttpClientFacade> facadeRef;
	
    //對待處理的請求的計數
    private final AtomicLong pendingOperationCount = new AtomicLong();
    private final AtomicLong pendingWebSocketCount = new AtomicLong();
    private final AtomicLong pendingHttpRequestCount = new AtomicLong();
    private final AtomicLong pendingHttp2StreamCount = new AtomicLong();

    /** 過期時間 */
    private final TreeSet<TimeoutEvent> timeouts;
/**
     * This is a bit tricky:
     * 1. an HttpClientFacade has a final HttpClientImpl field.
     * 2. an HttpClientImpl has a final WeakReference<HttpClientFacade> field,
     *    where the referent is the facade created for that instance.
     * 3. We cannot just create the HttpClientFacade in the HttpClientImpl
     *    constructor, because it would be only weakly referenced and could
     *    be GC'ed before we can return it.
     * The solution is to use an instance of SingleFacadeFactory which will
     * allow the caller of new HttpClientImpl(...) to retrieve the facade
     * after the HttpClientImpl has been created.
     */
    //外觀工廠
    private static final class SingleFacadeFactory {
        HttpClientFacade facade;
        HttpClientFacade createFacade(HttpClientImpl impl) {
            assert facade == null;
            return (facade = new HttpClientFacade(impl));
        }
    }

  //我們要分析的方法
    static HttpClientFacade create(HttpClientBuilderImpl builder) {
        //這是個Factory?這是在做什么呢?
        //其實,這里是HttpClient的外觀代理實現類HttpClientFacade的構建工廠。
        SingleFacadeFactory facadeFactory = new SingleFacadeFactory();
        //構建方法的重點:實例化HttpClientImpl。稍后詳細分析
        HttpClientImpl impl = new HttpClientImpl(builder, facadeFactory);
        //啟動NIO選擇子管理者守護線程,接收並響應I/O事件
        impl.start();
        assert facadeFactory.facade != null;
        assert impl.facadeRef.get() == facadeFactory.facade;
        //返回外觀實現類
        return facadeFactory.facade;
    }
    
    //HttpClientImpl的初始化構造方法
    private HttpClientImpl(HttpClientBuilderImpl builder,
                           SingleFacadeFactory facadeFactory) {
    	//此處暫時省略,稍后分析
    }
    
}

可以看到,調用build()方法返回的是一個HttpClientFacade的外觀實現類。看了上面的幾行代碼,語言的力量已經顯得不足。HttpClientFacade是什么?它和HttpClientImpl的關系是什么?SingleFacadeFactory又是什么用途呢?因此,我們通過UML類圖來說明。

分析HttpClient幾個類的關系,我們可以看到,HttpClient抽象類有兩個實現類:

  • 外觀實現類HttpClientFacade(簡稱外觀類)
  • 真正的實現類HttpClientImpl(簡稱實現類)。

由此我們可以看到HttpClientFacade存在的一個目的:作為一個“中介”,提供一個簡單的封裝,它強引用了HttpClientImpl,一切作用在HttpClient上的調用都會被交由HttpClientImpl處理。盡管其名字中帶有Facade(名為外觀)字樣,但實際上承擔的是代理類的角色:這是一個典型的靜態代理模式的運用。

/**
 * An HttpClientFacade is a simple class that wraps an HttpClient implementation
 * and delegates everything to its implementation delegate.
 */
final class HttpClientFacade extends HttpClient implements Trackable {

    final HttpClientImpl impl;

    /**
     * Creates an HttpClientFacade.
     */
    HttpClientFacade(HttpClientImpl impl) {
        this.impl = impl;
    }

    @Override // for tests
    public Tracker getOperationsTracker() {
        return impl.getOperationsTracker();
    }

 //此處省略大批getter代碼。可以看到,HttpClientFacade類中的方法都是對實際的實現類HttpClientImpl的簡單調用

    @Override
    public Optional<Executor> executor() {
        return impl.executor();
    }

    @Override
    public <T> HttpResponse<T>
    send(HttpRequest req, HttpResponse.BodyHandler<T> responseBodyHandler)
        throws IOException, InterruptedException
    {
        try {
            return impl.send(req, responseBodyHandler);
        } finally {
            Reference.reachabilityFence(this);
        }
    }

    @Override
    public <T> CompletableFuture<HttpResponse<T>>
    sendAsync(HttpRequest req, HttpResponse.BodyHandler<T> responseBodyHandler) {
        try {
            return impl.sendAsync(req, responseBodyHandler);
        } finally {
            Reference.reachabilityFence(this);
        }
    }
//省略一些方法

    @Override
    public WebSocket.Builder newWebSocketBuilder() {
        try {
            return impl.newWebSocketBuilder();
        } finally {
            Reference.reachabilityFence(this);
        }
    }

}

那么,實現類為什么要弱引用外觀類呢?我們稍后再看。

我們關注為什么創建外觀對象需要一個外觀工廠(SingleFacadeFactory):正如其英文注釋寫到的這樣:外觀對象維持對實現類對象的強引用,而實現類對象則只維持了對外觀對象的弱引用;如果直接在實現類的構造函數(稍后分析)中創建外觀對象,可能外觀對象會被JVM垃圾收集器直接回收。因此,需要一個工廠對象,維持對外觀對象的引用,直到實現類被完全初始化完成,這樣可以保證始終能返回給調用者可用的外觀對象。

接下來,我們關注實現類HttpClientImpl的構造方法,看看初始化過程中究竟做了什么:

//HttpClientImpl的私有初始化構造方法
private HttpClientImpl(HttpClientBuilderImpl builder,
                           SingleFacadeFactory facadeFactory) {
    id = CLIENT_IDS.incrementAndGet();
    dbgTag = "HttpClientImpl(" + id +")";
    if (builder.sslContext == null) {
        try {
            //初始化默認ssl環境
            sslContext = SSLContext.getDefault();
        } catch (NoSuchAlgorithmException ex) {
            throw new UncheckedIOException(new IOException(ex));
        }
    } else {
        sslContext = builder.sslContext;
    }
    Executor ex = builder.executor;
    if (ex == null) {
        //若沒有自定義線程池,則使用不限大小的線程池
        ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id));
        isDefaultExecutor = true;
    } else {
        isDefaultExecutor = false;
    }
    delegatingExecutor = new DelegatingExecutor(this::isSelectorThread, ex);
    //外觀HttpClient類弱引用初始化
    facadeRef = new WeakReference<>(facadeFactory.createFacade(this));
    //初始化Http2專屬的client
    client2 = new Http2ClientImpl(this);
    //cookie處理器、連接超時事件和重定向策略初始化和默認值設置
    cookieHandler = builder.cookieHandler;
    connectTimeout = builder.connectTimeout;
    //默認不跟隨服務器的重定向請求
    followRedirects = builder.followRedirects == null ?
        Redirect.NEVER : builder.followRedirects;
    this.userProxySelector = builder.proxy;
    //若沒有設置代理,使用默認的代理選擇器
    this.proxySelector = Optional.ofNullable(userProxySelector)
        .orElseGet(HttpClientImpl::getDefaultProxySelector);
    if (debug.on())
        debug.log("proxySelector is %s (user-supplied=%s)",
                  this.proxySelector, userProxySelector != null);
    authenticator = builder.authenticator;
    //先初始化為Http2版本的客戶端,之后針對請求會有降級的操作
    if (builder.version == null) {
        version = HttpClient.Version.HTTP_2;
    } else {
        version = builder.version;
    }
    //設置默認的ssl參數
    if (builder.sslParams == null) {
        sslParams = getDefaultParams(sslContext);
    } else {
        sslParams = builder.sslParams;
    }
    //連接池的初始化
    connections = new ConnectionPool(id);
    connections.start();
    //超時時間treeSet的初始化
    timeouts = new TreeSet<>();
    try {
        /*
            重點!此處初始化了一個SelectorManager的守護線程。
            該線程負責向操作系統輪詢各類事件,並在事件發生時派發事件
            */
        selmgr = new SelectorManager(this);
    } catch (IOException e) {
        // unlikely
        throw new UncheckedIOException(e);
    }
    selmgr.setDaemon(true);
    //初始化請求頭過濾器,包括重定向、認證和cookie管理器(若有)
    filters = new FilterFactory();
    initFilters();
    assert facadeRef.get() != null;
}

//此方法在上面分析過的create()靜態方法中調用,
//啟動選擇子管理者守護線程
private void start() {
    selmgr.start();
}

//此處省略大量代碼

//過濾器初始化,此處可以看到只添加了實現類到過濾器鏈表中,這是處於懶加載的考慮
//這里有個添加順序的問題,后篇會稍作分析
private void initFilters() {
    addFilter(AuthenticationFilter.class);
    addFilter(RedirectFilter.class);
    if (this.cookieHandler != null) {
        addFilter(CookieFilter.class);
    }
}

可以看到,HttpClientImpl對象的構建過程相對復雜:

  • 進行了客戶端策略的設置和默認值填充
  • 初始化了線程池(ex)和連接池(connections)
  • 初始化了請求頭過濾器(懶加載)
  • 同時,初始化了一個SelectorManager的守護線程,並直接以新的線程方式啟動。

那么,這個SelectorManager是何方神聖,扮演了怎樣的作用?從名字中,你可能已經猜出,它是NIO編程中的選擇器的管理者。

3. 選擇器線程的運行

選擇器管理者的主要行為可以如下概括:向系統輪詢發生的各類I/O事件,並調用事件自身的方法進行分發處理

3.1 源碼分析

我們進入SelectorManager的源碼,重點關注主方法run():

//SelectorManager,HttpClientImpl的內部類,可直接作為線程啟動,負責向系統輪詢並派發事件	
// Main loop for this client's selector
    private final static class SelectorManager extends Thread {

        // 控制選擇器在沒有事件發生時的喚醒時間相關變量
        private static final int MIN_NODEADLINE = 1000; // ms
        private static final int MAX_NODEADLINE = 1000 * 1200; // ms
        private static final int DEF_NODEADLINE = 3000; // ms
        private static final long NODEADLINE; // default is DEF_NODEADLINE ms
        static {
            // ensure NODEADLINE is initialized with some valid value.
            long deadline =  Utils.getIntegerProperty(
                "jdk.internal.httpclient.selectorTimeout",
                DEF_NODEADLINE); // millis
            if (deadline <= 0) deadline = DEF_NODEADLINE;
            deadline = Math.max(deadline, MIN_NODEADLINE);
            NODEADLINE = Math.min(deadline, MAX_NODEADLINE);
        }

        //JAVA NIO中的選擇器,負責向操作系統輪詢事件
        private final Selector selector;
        private volatile boolean closed;
        //注冊和解掛相關的事件
        private final List<AsyncEvent> registrations;
        private final List<AsyncTriggerEvent> deregistrations;
        private final Logger debug;
        private final Logger debugtimeout;
        HttpClientImpl owner;
        ConnectionPool pool;

        //構造方法
        SelectorManager(HttpClientImpl ref) throws IOException {
            super(null, null,
                  "HttpClient-" + ref.id + "-SelectorManager",
                  0, false);
            owner = ref;
            debug = ref.debug;
            debugtimeout = ref.debugtimeout;
            pool = ref.connectionPool();
            registrations = new ArrayList<>();
            deregistrations = new ArrayList<>();
            selector = Selector.open();
        }

        void eventUpdated(AsyncEvent e) throws ClosedChannelException {
          	//添加事件到注冊事件列表,並喚醒選擇器,省略
        }
        
        // This returns immediately. So caller not allowed to send/receive
        // on connection.
        //注冊事件方法,將時間添加到注冊時間列表等待選擇器處理,並喚醒選擇器
        //客戶端會間接調用此方法添加事件
        synchronized void register(AsyncEvent e) {
            registrations.add(e);
            selector.wakeup();
        }

        synchronized void cancel(SocketChannel e) {
            //該方法沒有被調用?忽略
        }

        //喚醒選擇器
        void wakeupSelector() {
            selector.wakeup();
        }

        synchronized void shutdown() {
            //關閉選擇器,連接和線程池,暫時省略
        }

        /*
       	SelectorManager線程的主要運行方法
        */
        @Override
        public void run() {
            List<Pair<AsyncEvent,IOException>> errorList = new ArrayList<>();
            //初始化就緒事件和重置事件列表
            List<AsyncEvent> readyList = new ArrayList<>();
            List<Runnable> resetList = new ArrayList<>();
            try {
                if (Log.channel()) Log.logChannel(getName() + ": starting");
                //開啟無限循環
                while (!Thread.currentThread().isInterrupted()) {
                    synchronized (this) {
                        assert errorList.isEmpty();
                        assert readyList.isEmpty();
                        assert resetList.isEmpty();
                        //首先處理要注銷的事件,然后清空注銷事件列表
                        for (AsyncTriggerEvent event : deregistrations) {
                            event.handle();
                        }
                        deregistrations.clear();
                        //處理注冊事件列表中的事件
                        for (AsyncEvent event : registrations) {
                            //AsyncTriggerEvent無需注冊到通道上,直接加入待處理列表
                            if (event instanceof AsyncTriggerEvent) {
                                readyList.add(event);
                                continue;
                            }
                            //從事件中獲取事件維護的NIO channel通道
                            SelectableChannel chan = event.channel();
                            SelectionKey key = null;
                            try {
                                //獲取通道對應的選擇鍵(是連接建立時,通道綁定到選擇器上分配的)
                                key = chan.keyFor(selector);
                                SelectorAttachment sa;
                                if (key == null || !key.isValid()) {
                                    if (key != null) {
                                        // key is canceled.
                                        // invoke selectNow() to purge it
                                        // before registering the new event.
                                        selector.selectNow();
                                    }
                                    sa = new SelectorAttachment(chan, selector);
                                } else {
                                    //獲取綁定到選擇器上的附件(也是通道注冊到選擇器上時附帶的)
                                    //稍后將看到,該附件維護了通道、選擇器、通道上的待處理事件列表的關系,
                                    //扮演着異步事件中轉站的角色
                                    sa = (SelectorAttachment) key.attachment();
                                }
                                //添加事件到附件中的待處理列表(pending屬性),並將通道重新向選擇器注冊
                                // may throw IOE if channel closed: that's OK
                                sa.register(event);
                                if (!chan.isOpen()) {
                                    throw new IOException("Channel closed");
                                }
                            } catch (IOException e) {
                                Log.logTrace("{0}: {1}", getName(), e);
                                if (debug.on())
                                    debug.log("Got " + e.getClass().getName()
                                              + " while handling registration events");
                                chan.close();
                                //發生I/O錯誤的情況下,將事件加入錯誤列表,並取消選擇鍵
                                // let the event abort deal with it
                                errorList.add(new Pair<>(event, e));
                                if (key != null) {
                                    key.cancel();
                                    selector.selectNow();
                                }
                            }
                        }
                        registrations.clear();
                        selector.selectedKeys().clear();
                    }
                    // 處理 加入到列表的AsyncTriggerEvent
                    for (AsyncEvent event : readyList) {
                        assert event instanceof AsyncTriggerEvent;
                        event.handle();
                    }
                    readyList.clear();

                    for (Pair<AsyncEvent,IOException> error : errorList) {
                        // an IOException was raised and the channel closed.
                        handleEvent(error.first, error.second);
                    }
                    errorList.clear();

                    //當客戶端不再被引用時,結束選擇器的運行
                    // Check whether client is still alive, and if not,
                    // gracefully stop this thread
                    if (!owner.isReferenced()) {
                        Log.logTrace("{0}: {1}",
                                getName(),
                                "HttpClient no longer referenced. Exiting...");
                        return;
                    }

                    //下面是一些對選擇器select方法的阻塞時長的計算
                    long nextTimeout = owner.purgeTimeoutsAndReturnNextDeadline();
                    if (debugtimeout.on())
                        debugtimeout.log("next timeout: %d", nextTimeout);

                    long nextExpiry = pool.purgeExpiredConnectionsAndReturnNextDeadline();
                    if (debugtimeout.on())
                        debugtimeout.log("next expired: %d", nextExpiry);
                    assert nextTimeout >= 0;
                    assert nextExpiry >= 0;
                    if (nextTimeout <= 0) nextTimeout = NODEADLINE;
                    if (nextExpiry <= 0) nextExpiry = NODEADLINE;
                    else nextExpiry = Math.min(NODEADLINE, nextExpiry);
                    long millis = Math.min(nextExpiry, nextTimeout);
                    if (debugtimeout.on())
                        debugtimeout.log("Next deadline is %d",
                                         (millis == 0 ? NODEADLINE : millis));
                    /*selector的select方法:負責向操作系統輪詢阻塞的事件。若在millis毫秒后,
                    沒有任何通道有就緒事件發生,該方法會在millis毫秒后返回0;否則阻塞過程中,
                    當有至少1個通道有事件發生時返回,返回有事件發生的通道的數量。*/
                    int n = selector.select(millis == 0 ? NODEADLINE : millis);
                    if (n == 0) {
                        //如果沒有事件發生,看看外觀客戶端是否還被引用,否則退出方法
                        // Check whether client is still alive, and if not,
                        // gracefully stop this thread
                        if (!owner.isReferenced()) {
                            Log.logTrace("{0}: {1}",
                                    getName(),
                                    "HttpClient no longer referenced. Exiting...");
                            return;
                        }
                        //清楚連接池中的超時連接
                        owner.purgeTimeoutsAndReturnNextDeadline();
                        continue;
                    }

                    //返回有就緒的事件的通道的選擇鍵
                    Set<SelectionKey> keys = selector.selectedKeys();
                    assert errorList.isEmpty();
                    
                    /*
                    這一步是關鍵:
                    遍歷有事件I/O事件發生的選擇鍵,取出對應的“附件”,
                    匹配篩選附件上事件列表中 感興趣的事件類型和發生的I/O事件類型相符合的 事件,
                    異步處理這些事件,並將它們從選擇鍵中的待辦事件列表中刪除
                    */
                    for (SelectionKey key : keys) {
                        SelectorAttachment sa = (SelectorAttachment) key.attachment();
                        //處理選擇鍵失效的情況(鍵被取消,通道關閉或選擇器關閉)的情況
                        if (!key.isValid()) {
                            IOException ex = sa.chan.isOpen()
                                    ? new IOException("Invalid key")
                                    : new ClosedChannelException();
                            sa.pending.forEach(e -> errorList.add(new Pair<>(e,ex)));
                            sa.pending.clear();
                            continue;
                        }

                        int eventsOccurred;
                        try {
                            eventsOccurred = key.readyOps();
                        } catch (CancelledKeyException ex) {
                            //處理選擇鍵被取消的情況
                            IOException io = Utils.getIOException(ex);
                            sa.pending.forEach(e -> errorList.add(new Pair<>(e,io)));
                            sa.pending.clear();
                            continue;
                        }
                        //從附件的等待列表中去除待處理的事件(上一個大的for循環中加入的),加入待處理列表
                        sa.events(eventsOccurred).forEach(readyList::add);
                        //將被保存在該“附件”的等待事件列表內的、操作類型和給定的感興趣操作相符合的 
                        //待處理事件從該附件的等待列表中移除
                        resetList.add(() -> sa.resetInterestOps(eventsOccurred));
                    }

                    selector.selectNow(); // complete cancellation
                    selector.selectedKeys().clear();

                    // handle selected events  處理待處理事件
                    readyList.forEach((e) -> handleEvent(e, null));
                    readyList.clear();

                    // handle errors (closed channels etc...) 處理錯誤
                    errorList.forEach((p) -> handleEvent(p.first, p.second));
                    errorList.clear();

                    // reset interest ops for selected channels 
                    resetList.forEach(r -> r.run());
                    resetList.clear();

                }
            } catch (Throwable e) {
                if (!closed) {
                    // This terminates thread. So, better just print stack trace
                    String err = Utils.stackTrace(e);
                    Log.logError("{0}: {1}: {2}", getName(),
                            "HttpClientImpl shutting down due to fatal error", err);
                }
                if (debug.on()) debug.log("shutting down", e);
                if (Utils.ASSERTIONSENABLED && !debug.on()) {
                    e.printStackTrace(System.err); // always print the stack
                }
            } finally {
                if (Log.channel()) Log.logChannel(getName() + ": stopping");
                shutdown();
            }
        }

下面是SelectorManager類的源碼,同樣是HttpClientImpl的內部類。該類在通道注冊到選擇器上時作為“附件”附着到選擇鍵上,在事件發生時可以取出來。該類的作用是管理多個針對同一個選擇鍵的注冊行為。其相當於一個”中轉站“的作用。

/**
 * Tracks multiple user level registrations associated with one NIO
 * registration (SelectionKey). In this implementation, registrations
 * are one-off and when an event is posted the registration is cancelled
 * until explicitly registered again.
 *
 * <p> No external synchronization required as this class is only used
 * by the SelectorManager thread. One of these objects required per
 * connection.
 */
private static class SelectorAttachment {
    private final SelectableChannel chan;
    private final Selector selector;
    private final Set<AsyncEvent> pending;
    private final static Logger debug =
            Utils.getDebugLogger("SelectorAttachment"::toString, Utils.DEBUG);
    private int interestOps;

    SelectorAttachment(SelectableChannel chan, Selector selector) {
        this.pending = new HashSet<>();
        this.chan = chan;
        this.selector = selector;
    }

    void register(AsyncEvent e) throws ClosedChannelException {
        int newOps = e.interestOps();
        //判斷是否重新注冊
        boolean reRegister = (interestOps & newOps) != newOps;
        interestOps |= newOps;
        pending.add(e);
        if (debug.on())
            debug.log("Registering %s for %d (%s)", e, newOps, reRegister);
        if (reRegister) {
            // first time registration happens here also
            try {
                //將通道注冊到選擇器上,會更新感興趣的事件,並將自身“附着”到選擇器上
                chan.register(selector, interestOps, this);
            } catch (Throwable x) {
                abortPending(x);
            }
        } else if (!chan.isOpen()) {
            abortPending(new ClosedChannelException());
        }
    }

    /**篩選出操作類型與該通道感興趣的操作重合的待處理事件
     * Returns a Stream<AsyncEvents> containing only events that are
     * registered with the given {@code interestOps}.
     */
    Stream<AsyncEvent> events(int interestOps) {
        return pending.stream()
                .filter(ev -> (ev.interestOps() & interestOps) != 0);
    }

    /**
        將被保存在該“附件”的等待事件列表內的、操作類型和給定的感興趣操作相符合的 待處理事件
        從該附件的等待列表中移除
     * Removes any events with the given {@code interestOps}, and if no
     * events remaining, cancels the associated SelectionKey.
     */
    void resetInterestOps(int interestOps) {
        int newOps = 0;

        Iterator<AsyncEvent> itr = pending.iterator();
        while (itr.hasNext()) {
            AsyncEvent event = itr.next();
            int evops = event.interestOps();
            if (event.repeating()) {
                newOps |= evops;
                continue;
            }
            if ((evops & interestOps) != 0) {
                itr.remove();
            } else {
                newOps |= evops;
            }
        }

        this.interestOps = newOps;
        SelectionKey key = chan.keyFor(selector);
        if (newOps == 0 && key != null && pending.isEmpty()) {
            key.cancel();
        } else {
            try {
                if (key == null || !key.isValid()) {
                    throw new CancelledKeyException();
                }
                key.interestOps(newOps);
                // double check after
                if (!chan.isOpen()) {
                    abortPending(new ClosedChannelException());
                    return;
                }
                assert key.interestOps() == newOps;
            } catch (CancelledKeyException x) {
                // channel may have been closed
                if (debug.on()) debug.log("key cancelled for " + chan);
                abortPending(x);
            }
        }
    }

    void abortPending(Throwable x) {
        if (!pending.isEmpty()) {
            AsyncEvent[] evts = pending.toArray(new AsyncEvent[0]);
            pending.clear();
            IOException io = Utils.getIOException(x);
            for (AsyncEvent event : evts) {
                event.abort(io);
            }
        }
    }
}

3.2 基本流程

在線程初始化后,選擇器線程會固定間隔地執行如下流程:

  1. 遍歷初始時為空的等待事件列表。
  2. 阻塞地向操作系統輪詢各I/O通道的事件,每隔一段時間返回就緒事件對應的選擇鍵。
  3. 在沒有連接、請求等操作下,沒有其余操作

而當存在請求時,后續將看到,客戶端會將異步事件放入選擇器線程的等待事件列表中。此時,選擇器線程流程如下:

  1. 遍歷等待事件列表,若有相關事件,則將他們加入到對應的選擇鍵上附着的SelectorAttachment對象的等待事件隊列。
  2. 阻塞地向操作系統輪詢各I/O通道的事件,當有事件發生時,遍歷對應選擇鍵,篩選出對應的“附件”中與發生的I/O事件相符合的待辦事件列表
  3. 異步處理待辦事件列表,並清除對應附件上已處理的事件

需要注意的是,上述表述中“待辦事件”或”等待事件“等表述的是具體的AsyncEvent對象,而通道或選擇器上的”事件“則表示I/O事件,兩者有聯系也有區別。

3.3 外觀客戶端的意義

同時,我們注意到,代碼中,當選擇器輪詢I/O事件一無所獲時,會檢查外觀類(HttpClientFacade)是否還被實現類(HttpClientImpl)引用,如果沒有,便結束線程。

int n = selector.select(millis == 0 ? NODEADLINE : millis);
if (n == 0) {
    //如果沒有事件發生,看看外觀客戶端是否還被引用,否則退出方法
    // Check whether client is still alive, and if not,
    // gracefully stop this thread
    if (!owner.isReferenced()) {
        Log.logTrace("{0}: {1}",
                     getName(),
                     "HttpClient no longer referenced. Exiting...");
        return;
    }
    owner.purgeTimeoutsAndReturnNextDeadline();
    continue;
}

結合代碼中的注釋,我們可以看出外觀類HttpClientFacade的意義:它實質上並非只是對真正實現類HttpClientImpl的代理。HttpClientImpl需要知道調用者(應用程序)什么時候不再持有了對HttpClient(Facade)外觀對象的引用,以便及時中止守護線程。而弱引用,正好滿足了這樣的要求:當應用程序不再持有Facade對象時,外觀對象會在垃圾回收時被回收。HttpClientImpl通過檢查對外觀對象的弱引用是否為空,即可知道是否要停止處理I/O事件的守護線程。

4. 小結

HttpClient的初始化並非如我們想象的那般簡單。由於擁抱了NIO,在客戶端構建時,后台線程便已開始運行,處理未來的I/O事件。

理解客戶端的構建和啟動流程,有助於我們更深入地理解NIO,體會其哲學和魅力。

下篇,我們將看到,在HttpClient接受用戶請求的調用后,會面臨生成多個請求的問題,那么,它又是怎樣處理的呢?


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM