kafka AdminClient 閑時關閉連接


AdminClient 類提供了創建、刪除 topic 的 api。

在項目中創建了一個 AdminClient 對象,每次創建 topic 時,調用

org.apache.kafka.clients.admin.AdminClient#createTopics

如果長時間不使用這個對象,客戶端與 broker 之間的連接會被關掉,相關的參數:

connections.max.idle.ms

這個最大空閑參數在 broker 和 客戶端都可以配置,即 broker 和客戶端都會關閉空閑太久的連接。

org.apache.kafka.common.network.Selector#maybeCloseOldestConnection

    private void maybeCloseOldestConnection(long currentTimeNanos) {
        if (idleExpiryManager == null)
            return;

        Map.Entry<String, Long> expiredConnection = idleExpiryManager.pollExpiredConnection(currentTimeNanos);
        if (expiredConnection != null) {
            String connectionId = expiredConnection.getKey();
            KafkaChannel channel = this.channels.get(connectionId);
            if (channel != null) {
                if (log.isTraceEnabled())
                    log.trace("About to close the idle connection from {} due to being idle for {} millis",
                            connectionId, (currentTimeNanos - expiredConnection.getValue()) / 1000 / 1000);
                channel.state(ChannelState.EXPIRED);
                close(channel, CloseMode.GRACEFUL);
            }
        }
    }

org.apache.kafka.common.network.Selector.IdleExpiryManager#pollExpiredConnection

lruConnections 是 LinkedHashMap 類型,可以按照插入和訪問順序進行排序,這里是按訪問順序進行排序,訪問過的順序放到雙向鏈表的結尾。

        public Map.Entry<String, Long> pollExpiredConnection(long currentTimeNanos) {
            if (currentTimeNanos <= nextIdleCloseCheckTime)
                return null;

            if (lruConnections.isEmpty()) {
                nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos;
                return null;
            }

            Map.Entry<String, Long> oldestConnectionEntry = lruConnections.entrySet().iterator().next();
            Long connectionLastActiveTime = oldestConnectionEntry.getValue();
            nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos;

            if (currentTimeNanos > nextIdleCloseCheckTime)
                return oldestConnectionEntry;
            else
                return null;
        }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM