本篇是《關於Kafka producer管理TCP連接的討論》的續篇,主要討論Kafka java consumer是如何管理TCP連接。實際上,這兩篇大部分的內容是相同的,即consumer也是把TCP連接的管理交由底層的Selector類(org.apache.kafka.common.network)來維護。我們依然以“何時創建/創建多少/何時關閉/潛在問題/總結”的順序來討論。和上一篇一樣,本文將無差別地混用名詞TCP和Socket。
一、何時創建TCP連接
首先明確的是,在構建KafkaConsumer實例時是不會創建任何TCP連接的;另外在調用諸如subscribe或assign的時候也不會創建任何TCP連接。那么TCP連接是在什么時候創建的呢?嚴格來說有幾個可能的時間點。從粗粒度層面來說,我們可以安全地認為Socket連接是在調用consumer.poll()創建的;從細粒度層面來說,TCP連接創建的時機有3個:1. 請求METADATA時;2. 進行組協調時;3. 發送數據時。
二、創建多少個TCP連接
對於每台broker而言,kafka consumer實例通常會創建3個TCP連接,第一個TCP連接是consumer請求集群元數據時創建的,之后consumer會使用這個Socket繼續請求元數據以及尋找group對應的coordinator,如下列日志所示:
[2019-01-01 17:38:22,301] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)
...
[2019-01-01 17:38:22,360] TRACE [Consumer clientId=consumer-1, groupId=test] Sending METADATA {topics=[bar,foo],allow_auto_topic_creation=true} with correlation id 2 to node -1 (org.apache.kafka.clients.NetworkClient:492)
...
[2019-01-01 17:38:22,360] TRACE [Consumer clientId=consumer-1, groupId=test] Sending FIND_COORDINATOR {coordinator_key=test,coordinator_type=0} with correlation id 0 to node -1 (org.apache.kafka.clients.NetworkClient:492)
至於這里為什么是node -1是因為首次請求元數據時尚不確定broker.id,所以只能先用-1替代。
第二個TCP連接供consumer執行組協調操作使用,這里的組協調操作包括:JOIN_GROUP(加入組)、SYNC_GROUP(等待組分配方案)、HEARTBEAT(心跳請求)、OFFSET_FETCH(獲取位移)、OFFSET_COMMIT(提交位移)以及其他請求(比如LEAVE_GROUP,但本例中沒有演示組成員退出的情形,故日志中沒有出現這個請求類型),如下列日志所示:
[2019-01-01 17:38:22,379] TRACE [Consumer clientId=consumer-1, groupId=test] Sending JOIN_GROUP {group_id=test,session_timeout=10000,rebalance_timeout=300000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=20 cap=20]}]} with correlation id 3 to node 2147483647 (org.apache.kafka.clients.NetworkClient:492)
[2019-01-01 17:38:22,382] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive from node 2147483647 for JOIN_GROUP with correlation id 3, received {throttle_time_ms=0,error_code=0,generation_id=9,group_protocol=range,leader_id=consumer-1-b595483a-9a12-4de3-9c26-a04110ed1bfa,member_id=consumer-1-b595483a-9a12-4de3-9c26-a04110ed1bfa,members=[{member_id=consumer-1-b595483a-9a12-4de3-9c26-a04110ed1bfa,member_metadata=java.nio.HeapByteBuffer[pos=0 lim=20 cap=20]}]} (org.apache.kafka.clients.NetworkClient:810)
[2019-01-01 17:38:22,386] TRACE [Consumer clientId=consumer-1, groupId=test] Sending SYNC_GROUP {group_id=test,generation_id=9,member_id=consumer-1-b595483a-9a12-4de3-9c26-a04110ed1bfa,group_assignment=[{member_id=consumer-1-b595483a-9a12-4de3-9c26-a04110ed1bfa,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=36 cap=36]}]} with correlation id 5 to node 2147483647 (org.apache.kafka.clients.NetworkClient:492)
[2019-01-01 17:38:22,388] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive from node 2147483647 for SYNC_GROUP with correlation id 5, received {throttle_time_ms=0,error_code=0,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=36 cap=36]} (org.apache.kafka.clients.NetworkClient:810)
[2019-01-01 17:38:22,396] TRACE [Consumer clientId=consumer-1, groupId=test] Sending OFFSET_FETCH {group_id=test,topics=[{topic=bar,partitions=[{partition=0}]},{topic=foo,partitions=[{partition=0}]}]} with correlation id 6 to node 2147483647 (org.apache.kafka.clients.NetworkClient:492)
[2019-01-03 17:38:22,397] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive from node 2147483647 for OFFSET_FETCH with correlation id 6, received {throttle_time_ms=0,responses=[{topic=bar,partition_responses=[{partition=0,offset=0,leader_epoch=-1,metadata=,error_code=0}]},{topic=foo,partition_responses=[{partition=0,offset=0,leader_epoch=-1,metadata=,error_code=0}]}],error_code=0} (org.apache.kafka.clients.NetworkClient:810)
...
[2019-01-01 17:38:23,401] TRACE [Consumer clientId=consumer-1, groupId=test] Sending OFFSET_COMMIT {group_id=test,generation_id=9,member_id=consumer-1-b595483a-9a12-4de3-9c26-a04110ed1bfa,topics=[{topic=bar,partitions=[{partition=0,offset=0,leader_epoch=-1,metadata=}]},{topic=foo,partitions=[{partition=0,offset=0,leader_epoch=-1,metadata=}]}]} with correlation id 10 to node 2147483647 (org.apache.kafka.clients.NetworkClient:492)
[2019-01-01 17:38:23,403] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive from node 2147483647 for OFFSET_COMMIT with correlation id 10, received {throttle_time_ms=0,responses=[{topic=bar,partition_responses=[{partition=0,error_code=0}]},{topic=foo,partition_responses=[{partition=0,error_code=0}]}]} (org.apache.kafka.clients.NetworkClient:810)
上面標紅的節點ID看上去有些奇怪,實際上它是由Integer.MAX_VALUE - coordinator的broker.id計算得來的,因為我的測試環境中只有一台broker且該id是0,所以這個Socket連接的節點ID就是Integer.MAX_VALUE,即2147483647。針對這個node ID的計算方式,Kafka代碼是故意為之的,目的就是要讓組協調請求和真正的數據獲取請求使用不同的Socket連接。
第三個Socket連接就非常好理解了,就是用於發送FETCH請求的。當consumer代碼使用第一個Socket連接獲取到集群元數據之后,每個broker的真實ID已經緩存在consumer本地的內存中,因此此時代碼會使用真實的ID創建第三個Socket連接並用於消息獲取,如下列日志所示:
[2019-01-01 17:38:23,424] TRACE [Consumer clientId=consumer-1, groupId=test] Sending FETCH {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=104064890,session_epoch=2,topics=[],forgotten_topics_data=[]} with correlation id 11 to node 0 (org.apache.kafka.clients.NetworkClient:492)
[2019-01-01 17:38:23,927] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive from node 0 for FETCH with correlation id 11, received {throttle_time_ms=0,error_code=0,session_id=104064890,responses=[]} (org.apache.kafka.clients.NetworkClient:810)
[2019-01-01 17:38:23,928] TRACE [Consumer clientId=consumer-1, groupId=test] Sending FETCH {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=104064890,session_epoch=3,topics=[],forgotten_topics_data=[]} with correlation id 12 to node 0 (org.apache.kafka.clients.NetworkClient:492)
...
上面標紅的節點0是真實的broker.id,可見consumer是使用這個Socket進行消息獲取操作的。值得一提的是,當這個Socket連接成功建立之后,第一個Socket連接就會被廢棄掉,之后所有的元數據請求都通過第三個Socket發送。
三、何時關閉TCP連接
和Producer原理相同,consumer關閉Socket也分為主動關閉和Kafka自動關閉。主動關閉依然是由用戶發起,顯式調用consumer.close()以及類似方法亦或是kill -9;而Kafka自動關閉同樣由connections.max.idle.ms參數值控制。和producer有些不同的是,如果用戶寫consumer程序時使用了循環的方式來poll消息,那么上面提到的所有請求都會不斷地發送到broker,故這些Socket連接上總是能保證有請求在發送,因此實現了“長連接”的效果。
四、可能的問題?
Consumer端和producer端的問題是一樣的,即第一個Socket連接僅僅是為了首次(最多也就是幾次)獲取元數據之用,后面就會被廢棄掉。根本的原因在於它使用了“假”的broker id去注冊,當 后面consumer獲取了真實的broker id之后它無法區分哪個broker id對應這個假ID,所以只能重新創建另外的Socket連接。
五、總結
最后總結一下當前的結論,針對最新版本Kafka(2.1.0)而言,Java consumer端管理TCP連接的方式是:
1. KafkaConsumer實例創建時不會創建任何Socket連接,實例創建之后首次請求元數據時會創建第一個Socket連接
2. KafkaConsumer實例拿到元數據信息之后隨機尋找其中一個broker去發現對應的coordinator,然后向coordinator所在broker創建第二個Socket連接。之后所有的組協調請求處理都經由該Socket
3. 步驟1中創建的TCP連接只用於首次獲取元數據信息,后面會被廢棄掉
4. 如果設置consumer端connections.max.idle.ms參數大於0,則步驟1中創建的TCP連接會被自動關閉;如果設置該參數=-1,那么步驟1中創建的TCP連接將成為“僵屍”連接
5. 當前consumer判斷是否存在與某broker的TCP連接依靠的是broker id,這是有問題的,依靠<host, port>對可能是更好的方式
