帶連接池的netty客戶端核心功能實現剖解


帶連接池的netty客戶端核心功能實現剖析

帶連接池的netty的客戶端核心功能實現剖析

 

本文為原創,轉載請注明出處 

源碼地址:

 https://github.com/zhangxianwu/light-netty-client

 

1、連接池

    由於TCP連接的建立和關閉分別會經歷三次握手和四次揮手,而三次握手和四次揮手都是系統開銷很大的操作。如果每次一個新的請求發起時,都為其新建一個連接,在請求處理完畢后,再將這個新的連接關閉,這樣處理的代價是高昂的,尤其是在請求本身的處理邏輯比較簡單時,那么新建和關閉連接的開銷在整個請求處理中占的比例就會越大。

    因此,需要采用連接池,將連接緩存起來,以便后續的復用。

 

    一個TCP連接可用一個四元組標示(源IP、源端口、目的IP、目的端口),當必須為一個新的請求建立連接后,服務端處理完該請求並通過該連接發送響應,客戶端接收到響應后,將該連接還回到池中。

 

1.1 連接的存儲

    池采用ConcurrentHashMap<String, LinkedBlockingQueue<Channel>>存儲連接:

    Key:

    目的IP + 目的端口(對於一個固定的客戶端來說,它所處的源IP和源端口是不變的);

    value:

    如果只為每個目的IP和目的端口緩存一個連接,那么在高並發的場景或者請求本身處理比較耗時的情況下,請求獲取連接的延時會比較嚴重,因此在當前請求從池里獲取連接超時后,需要根據實際的需要新建連接,並將新建的連接存儲下來,所以采用LinkedBlockingQueue<Channel>實現多個連接的存儲。

    然而每個連接的存儲會有一定的內存開銷,所以在高並發的場景下,不能無限制的創建和存儲連接,需要做最大數量的限制。如果能夠從連接創建的地方做到最大數量限制,那么最終緩存的連接數量也就實現了最大數量的限制(最大連接數的控制在后面分析)。

 

1.2 連接的入池和出池

    a)連接何時入池:

    往pipeline添加一個handler(取名為NettyChannelPoolHandler),並繼承SimpleChannelInboundHandler,實現channelRead0方法,當服務端返回響應,並被客戶端decode后,會發出channelRead的inbound事件,該事件的處理會經過channelRead0方法,在該方法中,如果發現decode后的msg是HttpContent類型,且響應頭中不包含“Connection: close”,則在通知調用方響應結果已接受后,將該連接返回到池中。

    說明:如果服務端tomcat設置了maxKeepAliveRequests參數,則一個keep-alive連接處理的請求數達到這個配置后,在該連接處理的最后一個請求的響應頭中,就會設置“Connection: close”,表示連接會被關閉。所以這個連接不需要放回到池中。

    b)連接何時出池:

    1、新請求從池中獲取連接

    2、連接被關閉

 

1.3 最大連接數的控制

    采用ConcurrentHashMap<String, Semaphore>記錄當前每個目的IP和目的端口組合能夠新建的連接數量。

    a)信號量的初始化:

    客戶端初始化時可以指定每個目的IP和目的端口組合的最大允許存活的連接數量。如果未指定,則設置為默認值,譬如200,表示對於某個目的IP和目的端口組合,可以同時允許最大存活200個連接。

    b)信號量的減少:

    每次新建連接前,基於信號量做tryAquire操作,如果tryAquire成功,則再執行新建連接操作。

    c)信號量的恢復:

    新建連接失敗或連接被關閉,則需要基於信號量的tryRelease操作進行恢復。

    在netty中,connect操作會返回channelFuture,為其添加listener:如果future的isSuccess返回false,則說明新建連接失敗,需要恢復信號量;如果isSuccess返回true,則說明新建連接成功,此時為新建channel的closeFuture添加listener,執行信號量恢復操作。

    在新建連接成功后,會執行發送請求操作,即調用channel的writeAndFlush操作,該操作也會返回一個future,需要為該future添加CLOSE_ON_FAILURE(netty提供)這個listener。

 

1.4 空閑連接處理:

    在業務低峰期,池中的連接大部分處於空閑狀態,是一種浪費,因此需要對空閑連接進行清理。

    Netty提供了IdleStateHandler,通過指定允許的最大空閑時間,當某個連接空閑時間超過這個值后,會發出userEventTriggered的Inbound事件,在NettyChannelPoolHandler中捕獲該事件,如果發現事件的類型是IdleStateEvent,則調用channel.close()方法關閉連接,這樣,在之前為該連接添加的listener就會收到close事件,然后將連接出池,並恢復控制最大連接數的信號量。

 

1.5 連接的獲取策略:

    基於以下先后順序獲取連接:

    策略1:首先從池中獲取連接(調用LinkedBlockingQueue.pool(),不等待,獲取不到立即返回null),如果獲取不到連接,則進入第二種策略

    策略2:創建新連接,如果信號量已用完或者創建連接失敗,則進入第三種策略

    策略3:再次從池中獲取連接(調用LinkedBlockingQueue.(long timeout, TimeUnit unit),等待timeoute時間后,如果獲取不到連接,則返回null)。如果此時還是獲取不到連接,則拋出獲取連接失敗的異常。

    當並發程度很大或者服務端處理請求比較耗時,如果信號量的初始值設置的比較小,則會導致部分請求獲取連接有一定的延時,甚至會獲取連接超時。此時,可以采用以下措施之一:

    a)增大信號量的初始值

    b)在策略2中,由調用方指定是否在信號量已用完的情況下,強制創建新連接。注意對於強制創建的連接,不需要執行信號量的acquire和release操作,也不需要進行入池和出池操作。那么如何區分一個連接是正常創建的還是強制創建的呢?基於netty,可以通過channel的attr(AttributeKey<T> key)方法進行標示。

 

2、如何通知調用方響應結果已收到

   在netty中,一切都是異步的,那么調用方通過客戶端發起請求后,如何得知請求已處理完畢,響應結果已返回?

   在每次獲取連接前,首先新建一個自定義的NettyResponseFuture,在獲取連接后,將該future通過channel的attr(AttributeKey<T> key)方法添加到channel中,然后在發送請求后,將該NettyResponseFuture立即返回給調用方。NettyResponseFuture中包含以下屬性和方法:

private final CountDownLatch              latch       = new CountDownLatch(1);
    private volatile boolean                  isDone      = false;
    private volatile boolean                  isCancel    = false;
    private final AtomicBoolean               isProcessed = new AtomicBoolean(false);
    private volatile NettyHttpResponseBuilder responseBuilder;
    private volatile Channel                  channel;
    public boolean cancel(Throwable cause) {
        if (isProcessed.getAndSet(true)) {
            return false;
        }
        responseBuilder = new NettyHttpResponseBuilder();
        responseBuilder.setSuccess(false);
        responseBuilder.setCause(cause);
        isCancel = true;
        latch.countDown();
        return true;
    }

    public NettyHttpResponse get() throws InterruptedException, ExecutionException {
        latch.await();
        return responseBuilder.build();
    }

    public NettyHttpResponse get(long timeout, TimeUnit unit) throws TimeoutException,
                                                             InterruptedException {
        if (!latch.await(timeout, unit)) {
            throw new TimeoutException();
        }
        return responseBuilder.build();
    }

    public boolean done() {
        if (isProcessed.getAndSet(true)) {
            return false;
        }
        isDone = true;
        latch.countDown();
        return true;
    }

    通過get方法獲取返回結果,在響應未返回時,latch.await()會一直阻塞。而latch.countdown只有在cancel和done方法中被調用。那什么時候會調用cancel或done方法呢呢?三個時機:

    1) connect失敗,則connect返回的future中注冊的listener會調用cancel方法;

    2) channel被關閉,則channel對應的的closefuture中注冊的Listner會調用cancel方法;

    3) 服務端正常返回響應時,NettyChannelPoolHandler的channelRead0方法會調用done方法

    說明:

    a)由於isProcessed可能會被netty的io線程和外部線程並發修改,因此采用atomicBoolean的cas操作進行修改

    b)由於isDone和isCancel只會被一個線程修改,因此不需要采用AtomicBoolean類型。但這兩個屬性會被其他線程訪問,因此需要定義為volatile,保證線程間的可見性


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM