關於Kafka producer管理TCP連接的討論


  在Kafka中,TCP連接的管理交由底層的Selector類(org.apache.kafka.common.network)來維護。Selector類定義了很多數據結構,其中最核心的當屬java.nio.channels.Selector實例,故所有的IO事件實際上是使用Java的Selector來完成的。本文我們探討一下producer與Kafka集群進行交互時TCP連接的管理與維護。

一、何時創建TCP連接

  Producer端在創建KafkaProducer實例時就會創建與broker的TCP連接——這個表述嚴格來說不是很准確,應當這么說:在創建KafkaProducer實例時會創建並啟動Sender線程實例。Sender線程開始運行時首先就會創建與broker的TCP連接,如下面這段日志所示:

[2018-12-09 09:35:45,620] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9093 (id: -2 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)
[2018-12-09 09:35:45,622] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9093 (id: -2 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)


[2018-12-09 09:35:45,814] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9092 (id: -1 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)
[2018-12-09 09:35:45,815] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)

[2018-12-09 09:35:45,828] DEBUG [Producer clientId=producer-1] Sending metadata request (type=MetadataRequest, topics=) to node localhost:9093 (id: -2 rack: null) (org.apache.kafka.clients.NetworkClient:1068)

  在我的樣例代碼中,bootstrap.servers指定了"localhost:9092, localhost:9093"。由上面的日志可以看到KafkaProducer實例創建后(此時尚未開始發送消息)producer會創建與這兩台broker的TCP連接。特別注意我標紅的broker id——這里的id都是負值,我會在后文詳細說說這里面的事情。另外,上述日志中最后一行表明producer選擇了向localhost:9093的broker發送METADATA請求去獲取集群的元數據信息——實際上producer選擇的是當前負載最少的broker。這里的負載指的是未處理完的網絡請求數。

  總的來說,TCP連接是在Sender線程運行過程中創建的,所以即使producer不發送任何消息(即顯式調用producer.send),底層的TCP連接也是會被創建出來的。

  在轉到下一個話題之前,我想聊聊針對這種設計的一些自己的理解:如社區文檔所說,KafkaProducer類是線程安全的。我雖然沒有詳盡地去驗證過是否真的thread-safe,但根據瀏覽源碼大致可以得出這樣的結論:producer主線程和Sender線程共享的可變數據結構大概就只有RecordAccumulator類,因此維護RecordAccumulator類的線程安全也就實現了KafkaProducer的線程安全,而RecordAccumulator類中主要的數據結構是ConcurrentMap<TopicPartition, Deque<ProducerBatch>>,而且凡是用到Deque的地方基本上都由Java monitor lock來保護,所以基本上可以認定RecordAccumulator的線程安全性。

  我這里真正想說的是,即使KafkaProducer類是線程安全的,我其實也不太贊同創建KafkaProducer實例時立即啟動Sender線程的做法。Brian Goetz大神著作《Java Concurrency in Practice》中明確給出了這樣做的風險:在對象構造器中啟動線程會造成this指針的逃逸——理論上Sender線程完全能夠看到一個未構造完整的KafkaProducer實例。當然在構造KafkaProducer實例時創建Sender線程實例本身沒有任何問題,但最好不要啟動它。

二、創建多少個TCP連接

我們還是結合日志來看。這次producer開始發送消息,日志如下:

[2018-12-09 10:06:46,761] DEBUG [Producer clientId=producer-1] 開始發送消息...
[2018-12-09 10:06:46,762] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9092 (id: 0 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)
[2018-12-09 10:06:46,762] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9092 (id: 0 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)


[2018-12-09 10:06:46,765] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9093 (id: 1 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)
[2018-12-09 10:06:46,766] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9093 (id: 1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)


[2018-12-09 10:06:46,770] DEBUG [Producer clientId=producer-1] Sending metadata request (type=MetadataRequest, topics=test) to node localhost:9092 (id: 0 rack: null) (org.apache.kafka.clients.NetworkClient:1068)

  日志告訴我們,producer又創建了與localhost:9092、localhost:9093的TCP連接。加上最開始創建的兩個TCP連接,目前producer總共創建了4個TCP連接,連向localhost:9092和localhost:9093各有兩個。再次注意標紅的broker id——此時id不再是負值了,或者說此時它們是真正的broker id了(即在server.properties中broker.id指定的值)。這個結論告訴了我們一個有意思的事實:當前版本下(2.1.0),Kafka producer會為bootstrap.servers中指定的每個broker都創建兩個TCP連接:第一個TCP連接用於首次獲取元數據信息;第二個TCP連接用於消息發送以及之后元數據信息的獲取。注意,第一個TCP連接中broker id是假的;第二個TCP連接中broker id才是真實的broker id。

  另外,注意上面日志的最后一行。當producer再次發送METADATA請求時它使用的是新創建的TCP連接,而非最開始的那個TCP連接。這點非常關鍵!這揭示了一個事實:最開始創建的TCP連接將不再被使用,或者說完全被廢棄掉了。

三、何時關閉TCP連接

  Producer端關閉TCP連接的方式有兩種:一種是用戶主動關閉;一種是Kafka自動關閉。我們先說第一種,這里的主動關閉實際上是廣義的主動關閉,甚至包括用戶調用kill -9主動“殺掉”producer應用。當然最推薦的方式還是調用producer.close方法來關閉。第二種則是Kafka幫你關閉,這與producer端參數connections.max.idle.ms的值有關。默認情況下該參數值是9分鍾,即如果在9分鍾內沒有任何請求“流過”該某個TCP連接,那么Kafka會主動幫你把該TCP連接關閉。用戶可以在producer端設置connections.max.idle.ms=-1禁掉這種機制。一旦被設置成-1,TCP連接將成為永久長連接。當然這只是軟件層面的“長連接”機制,由於Kafka創建的這些Socket連接都開啟了keepalive,因此keepalive探活機制還是會遵守的。

四、可能的問題?

  顯然,這種機制存在一個問題:假設你的producer指定了connections.max.idle.ms = -1(因為TCP連接的關閉與創建也是有開銷的,故很多時候我們確實想要禁掉自動關閉機制)而且bootstrap.servers指定了集群所有的broker連接信息。我們假設你的broker數量是N,那么producer啟動后它會創建2 * N個TCP連接,而其中的N個TCP連接在producer正常工作之后再也不會被使用且不會被關閉。實際上,producer只需要N個TCP連接即可與N個broker進行通訊。為了請求元數據而創建的N個TCP連接完全是浪費——我個人傾向於認為Kafka producer應該重用最開始創建的那N個連接,因此我覺得這是一個bug。

  造成重復創建TCP連接的根本原因在於broker id的記錄。就像之前說到的,最開始producer請求元數據信息時它肯定不知道broker的id信息,故它做了一個假的id(從-1開始,然后是-2, -3。。。。),同時它將這個id保存起來以判斷是否存在與這個broker的TCP連接。Broker端返回元數據信息后producer獲知了真正的broker id,於是它拿着這個broker id去判斷是否存在與該broker的TCP連接——自然是不存在,因此它重新創建了一個新的Socket連接。這里的問題就在於我們不能僅僅依靠broker id來判斷是否存在連接。實際上使用host:port對來判斷可能是更好的方法。也許社區可以考慮在后續修正這個問題。

五、總結

  簡單總結一下當前的結論,針對最新版本Kafka(2.1.0)而言,Java producer端管理TCP連接的方式是:

1. KafkaProducer實例創建時啟動Sender線程,從而創建與bootstrap.servers中所有broker的TCP連接

2. KafkaProducer實例拿到元數據信息之后還會再次創建與bootstrap.servers中所有broker的TCP連接

3. 步驟1中創建的TCP連接只用於首次獲取元數據信息(實際上也只是會用到其中的一個連接,其他的N - 1個甚至完全不會被用到)

4. 如果設置producer端connections.max.idle.ms參數大於0,則步驟1中創建的TCP連接會被自動關閉;如果設置該參數=-1,那么步驟1中創建的TCP連接將成為“僵屍”連接

5. 當前producer判斷是否存在與某broker的TCP連接依靠的是broker id,這是有問題的,依靠<host, port>對可能是更好的方式

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM