使用wireshark查看Kafka客戶端的網絡連接 - Producer


Kafka客戶端包括producer及consumer API,通過在wireshark中查看所捕獲的請求,能更好的理解從producer及consumer到broker的網絡連接過程。對於producer端,為了發送數據,需要建立client到broker節點的TCP長連接,此長連接可用於更新metadata,發送消息到broker,在超過配置的空閑時間后,為了節省資源,長連接將被關閉。

1:producer kerberos 認證連接

在創建producer實例時,調用KafkaProducer類中的函數createChannelBuilder,因為配置了kerberos認證,將啟動client到KDC的認證過程

private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer)
{
    ......
    ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
    ......
}
上面第一個參數里配置的authentication為SASL_PLAINTEXT,所以此方法將創建一個SaslChannelBuilder 通道構造器,
channelBuilder = new SaslChannelBuilder(mode, loginType, securityProtocol, clientSaslMechanism, saslHandshakeRequestEnable);
對應輸入參數為
mode = CLIENT
loginType = CLIENT
securityProtocol = SASL_PLAINTEXT
clientSaslMechanism = GSSAPI
saslHandshakeRequestEnable = true

創建好通道構造器后,就是設置配置信息,調用channelBuilder.configure(configs);在此方法中將創建loginManager實例,而在loginManager構造的時候,將取得KerberosLogin實例,並登陸,
login = hasKerberos ? new KerberosLogin() : new DefaultLogin();
login.configure(configs, loginContext);
login.login();

從Wireshark的捕獲可以看到請求與響應的過程,默認情況下,kerberos使用UDP協議,前面4條便是使用的UDP協議,但因為受限於請求包的長度限制,所以返回失敗,錯誤碼是KRB5KDC_ERR_PREAUTH_REQUIRED及KRB5KRB_ERR_RESPONSE_TOO_BIG,於是在第5條重新使用TCP發送AS-REQ請求到目標端口88,並收到AS-REP響應

 

下面是到KDC的Authentication Service的連接過程:

 

 請求AS成功后,緊接着就是到KDC的Ticket Granting Service以獲取票據的連接過程

 

 

具體可以參考文章:KRB5KDC_ERR_PREAUTH_REQUIRED

 

2:producer sender線程

在創建producer實例過程中,將先初始化一個metadata實例,這個metadata保存的是集群的配置信息,如broker的列表topic,partition與broker的映射關系

private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        try {
             ......
             //初始化metadata對象,設置屬性metadata.max.age.ms,這個值從producer 的配置文件獲取,表示更新meta的時間周期
             this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG)); 
             ......
             //設置初始的broker節點信息,是從配置的bootstrap.servers屬性獲取
             this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());

             ......
             String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
             this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
             this.ioThread.start();
            }
        } catch (Throwable t) {
            // call close methods if internal objects are already constructed
            // this is to prevent resource leak. see KAFKA-2121
            close(0, TimeUnit.MILLISECONDS, true);
            // now propagate the exception
            throw new KafkaException("Failed to construct kafka producer", t);
        }
    }

同時將啟動一個sender IO線程,在這個線程中將真正建立從client到broker的連接,從broker獲取metadata 信息及當發送的數據在緩存中達到閾值時,從accumulator中獲取消息並發送給broker。NetworkClient是kafka客戶端的網絡接口層,實現了接口KafkaClient,封裝了Java NIO對網絡的調用,函數initiateConnect進行初始化連接,所連接的broker 節點由函數leastLoadedNode確定

public class NetworkClient implements KafkaClient {
    /**
     * Initiate a connection to the given node
     */
    private void initiateConnect(Node node, long now) {
        String nodeConnectionId = node.idString();
        try {
            log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
            this.connectionStates.connecting(nodeConnectionId, now);
            selector.connect(nodeConnectionId,
                             new InetSocketAddress(node.host(), node.port()),
                             this.socketSendBuffer,
                             this.socketReceiveBuffer);
        } catch (IOException e) {
            /* attempt failed, we'll try again after the backoff */
            connectionStates.disconnected(nodeConnectionId, now);
            /* maybe the problem is our metadata, update it */
            metadataUpdater.requestUpdate();
            log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
        }
    }
}

在wireshark中可以看到建立連接的TCP 3次握手過程

 

3:metadata的獲取更新

建立好連接后,sender線程中調用KafkaClient 的poll來對socket進行實際的讀寫操作,在poll函數中首先調用metadataUpdater.maybeUpdate(now)來判斷是否需要更新metadata,

{Class NetworkClient}

//do actual reads and writes to sockets
public List<ClientResponse> poll(long timeout, long now) {
        long metadataTimeout = metadataUpdater.maybeUpdate(now); //判斷是否需要更新metadata
        try {
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }
        ......
}  

如果canSendRequest返回true,則調用dosend發送請求到某個broker node獲取metadata,其實dosend只是把獲取metadata的request放到隊列中,由selector.poll從隊列中獲取數據並實際發送請求到broker

{Class DefaultMetadataUpdater}

         public long maybeUpdate(long now) {
            ......
            if (metadataTimeout == 0) {
                // Beware that the behavior of this method and the computation of timeouts for poll() are
                // highly dependent on the behavior of leastLoadedNode.
                Node node = leastLoadedNode(now);
                maybeUpdate(now, node);
            }

            return metadataTimeout;
        }


        private void maybeUpdate(long now, Node node) {
            if (node == null) {
                log.debug("Give up sending metadata request since no node is available");
                // mark the timestamp for no node available to connect
                this.lastNoNodeAvailableMs = now;
                return;
            }
            String nodeConnectionId = node.idString();

            if (canSendRequest(nodeConnectionId)) {
                this.metadataFetchInProgress = true;
                MetadataRequest metadataRequest;
                if (metadata.needMetadataForAllTopics())
                    metadataRequest = MetadataRequest.allTopics();
                else
                    metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
                ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
                log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
                doSend(clientRequest, now); //發送請求到某個broker node,使用下面initiateConnect建立的與此node的長連接
            } else if (connectionStates.canConnect(nodeConnectionId, now)) {
                // we don't have a connection to this node right now, make one
                log.debug("Initialize connection to node {} for sending metadata request", node.id());
                initiateConnect(node, now);//建立到node的長連接
            } else { // connected, but can't send more OR connecting
                // In either case, we just need to wait for a network event to let us know the selected
                // connection might be usable again.
                this.lastNoNodeAvailableMs = now;
            }
        }

在wireshark中,可以看到從broker獲取metadata的Request / Response 過程,從broker node返回的是所有的broker 列表。

 

4:producer 發送數據

當用戶調用下面方法發送數據時

producer.send(producerRecord, new ProducerCallBack(requestId))

其實是將數據保存在accumulator中的,在doSend方法中會先確定是否有metadata信息,如果有metadata,則對數據做key及value的序列化,然后將數據append到accumulator中便返回

{Class KafkaProducer}

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) { TopicPartition tp = null; try { // first make sure the metadata for the topic is available long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs); long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs); byte[] serializedKey; ...... serializedKey = keySerializer.serialize(record.topic(), record.key()); //key 序列化 byte[] serializedValue; ...... serializedValue = valueSerializer.serialize(record.topic(), record.value()); //value 序列化 int partition = partition(record, serializedKey, serializedValue, metadata.fetch()); int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
......
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs); return result.future; ...... }

在sender線程中,將從accumulator中獲取數據,並發送到相應的broker node

 

從上面的網絡連接可以看到有2次發送請求的過程,Request() 及 Request(Exchange),在TCP的封包中,也可以看到有avro.schema的模式信息

總結:

1:如果配置kerberos認證,則需要到KDC (AS/TGS)進行TCP連接的請求

2:初始情況,根據bootstrap.servers配置的broker列表,建立到每個節點的TCP長連接

3:一個kafka producer實例對應一個sender線程,客戶端根據leastLoadedNode返回的節點,向此節點發送獲取metadata的更新請求,可以得到全部的brokers,也就是說在bootstrap.server中的節點只是全部節點的一個子集

4:創建producer后,如果立刻發送數據,數據保存在accumulator中,sender線程會讀取accumulator,並獲取metadata,使用已有連接(如果沒有連接則建立TCP連接)發送數據

5:sender線程調用NetworkClient.poll不斷的輪詢,按metadata.max.age.ms配置的時間周期性的更新metadata,在本文中配置的是"metadata.max.age.ms" -> "300000",故會每300秒更新一次metadata。

6:在創建到某個node的長連接后,如果時間到了上面metadata更新周期,又將創建一個新的長連接,更新metadata后,如果原來那個連接在"connections.max.idle.ms" -> "540000"所配置的默認時間沒有使用過,會斷開空閑的長連接,一旦斷開連接,立刻又請求更新metadata

下圖為抓取的從producer客戶端到broker的TCP連接的請求過程,僅供參考:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM