Activity diagram of the Kafka producer


When using the Kafka producer API there are two main phases: creating the producer instance, and calling the instance to send data (synchronously or asynchronously). Creating the producer instance also starts a sender thread, which continuously polls to refresh metadata and to drain records from the accumulator and actually send them to a broker. The activity diagram below roughly describes the internal call flow of the producer API.

[activity diagram]

Creating the producer instance:

1: The client reads the producer config; a sample:

{
	security.protocol=SASL_PLAINTEXT,
	bootstrap.servers=server1.com:8881,server2:8882,
	value.serializer=xxx.serialization.avro.AvroWithSchemaSpecificSer,
	key.serializer=org.apache.kafka.common.serialization.LongSerializer,
	client.id=15164@hostname,
	acks=all
}

2: Create the producer instance with the following call:

Producer<K,V> producer = new KafkaProducer<>(props, keySerClass, valueSerClass);

3: KafkaProducer is the main entry point of the producer; it wraps the calls to every component that follows. While the producer instance is being constructed, the following components are created in turn.

Create the Metadata instance, passing refreshBackoffMs (the minimum refresh interval, retry.backoff.ms, default 100 ms) and metadataExpireMs (the maximum metadata retention time, metadata.max.age.ms, default 300000 ms).

Metadata holds information about topics, partitions, and brokers.

this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
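The two constructor parameters interact when deciding whether a refresh is due: if an update has been explicitly requested, only the backoff gates the next fetch; otherwise the metadata stays valid until it expires. A minimal sketch of that timing rule (the class and method names here are illustrative, modeled loosely on Metadata.timeToNextUpdate, not Kafka's actual code):

```java
public class MetadataTimer {
    /** Milliseconds until the next metadata refresh is due. */
    public static long timeToNextUpdate(long lastRefreshMs, long expireMs,
                                        long backoffMs, boolean updateRequested,
                                        long nowMs) {
        // If an update was requested, the metadata is treated as already expired;
        // otherwise it remains valid until metadata.max.age.ms elapses.
        long timeToExpire = updateRequested
                ? 0
                : Math.max(lastRefreshMs + expireMs - nowMs, 0);
        // retry.backoff.ms enforces a minimum gap between refreshes either way.
        long timeToAllowUpdate = Math.max(lastRefreshMs + backoffMs - nowMs, 0);
        return Math.max(timeToExpire, timeToAllowUpdate);
    }
}
```

So even a forced update waits out the remaining backoff, while an unforced one waits until expiry.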

4: Create the accumulator, RecordAccumulator

The accumulator is a bounded in-memory queue that buffers records. When the client sends data, the record is appended to the tail of the queue; if the memory is exhausted, the append call blocks. Inside the instance, the batches field holds the data waiting to be drained by the sender:

private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;

The constructor call:

this.accumulator = new RecordAccumulator(
                    config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), // batch.size, default 16384 (16K), bytes per record batch
                    this.totalMemorySize, // buffer.memory, default 33554432 (32M), total buffer bytes
                    this.compressionType, // compression.type, default none; valid values: none, gzip, snappy, lz4
                    config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                    retryBackoffMs, // retry.backoff.ms, default 100 ms, minimum metadata refresh interval
                    metrics,
                    time);
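The "bounded queue that blocks on append and frees memory on drain" behavior can be sketched with plain JDK types. This is a toy analogue under stated assumptions (a String stands in for TopicPartition, a byte[] for RecordBatch), not Kafka's actual RecordAccumulator:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Toy bounded accumulator: append blocks once totalMemory is exhausted. */
public class ToyAccumulator {
    private final Map<String, Deque<byte[]>> batches = new ConcurrentHashMap<>();
    private final long totalMemory; // analogue of buffer.memory
    private long used = 0;

    public ToyAccumulator(long totalMemory) { this.totalMemory = totalMemory; }

    /** Appends a record, waiting while the buffer is full. */
    public synchronized void append(String topicPartition, byte[] record) {
        while (used + record.length > totalMemory) {
            try {
                wait(); // block until drain() frees memory
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        used += record.length;
        batches.computeIfAbsent(topicPartition, k -> new ArrayDeque<>()).addLast(record);
    }

    /** Drains all queued records for a partition (sender side), freeing memory. */
    public synchronized int drain(String topicPartition) {
        Deque<byte[]> dq = batches.getOrDefault(topicPartition, new ArrayDeque<>());
        int n = dq.size();
        while (!dq.isEmpty()) used -= dq.pollFirst().length;
        notifyAll(); // wake any producers blocked in append()
        return n;
    }
}
```

The real accumulator adds per-partition batching, compression, and linger.ms timing on top of this basic blocking-buffer shape.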

5: Create the channel builder. ChannelBuilder is an interface with concrete implementations SaslChannelBuilder, SslChannelBuilder, and PlaintextChannelBuilder, depending on the security protocol. A channel is the part of Java NIO that actually performs I/O; in Kafka, the KafkaChannel class wraps a SocketChannel. Login authentication is performed when the builder instance is created.

ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());

6: Create the network client. NetworkClient wraps low-level network access and metadata updates.

            NetworkClient client = new NetworkClient(
                    new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                            this.metrics, time, "producer", channelBuilder),
                    this.metadata,
                    clientId,
                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                    config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                    config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                    config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                    this.requestTimeoutMs,
                    time);

 

The NetworkClient constructor receives a Selector instance, a wrapper around the Java NIO selector. The Selector constructor opens the underlying java.nio selector, and the ChannelBuilder created earlier is also handed to an internal member of NetworkClient:

this.nioSelector = java.nio.channels.Selector.open();

7: Create the Sender runnable and start its thread, passing in the NetworkClient, Metadata, and accumulator instances created above. Sender is the key class: all the real work happens on this thread. Once started, it loops continuously, updating metadata and sending messages.

            this.sender = new Sender(client,
                    this.metadata,
                    this.accumulator,
                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                    (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                    config.getInt(ProducerConfig.RETRIES_CONFIG),
                    this.metrics,
                    new SystemTime(),
                    clientId,
                    this.requestTimeoutMs);
            String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();

 

The send path:

1: The client sends data

The client builds a ProducerRecord and calls send. send is asynchronous: it returns a Future and comes back immediately, having only written the record into the in-memory RecordAccumulator. To send synchronously and confirm whether the message was delivered, call get on the returned future, which blocks the calling thread.

ProducerRecord<K, V> producerRecord = new ProducerRecord<>("topic", data);
producer.send(producerRecord, new ProducerCallBack(requestId));

A callback object can be passed to send; it is invoked to process the ack after the send completes.
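The async-send-with-callback pattern above can be illustrated with a pure-JDK sketch. ToySender and its "offset = record length" ack are made up for illustration; the point is only the shape of the contract: send returns a future immediately, the callback fires on completion, and blocking on the future makes the send synchronous:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/** Simulated send: returns immediately; the "ack" completes the future later. */
public class ToySender {
    public static CompletableFuture<Long> send(String record,
                                               BiConsumer<Long, Exception> callback) {
        // In the real client the future completes when the broker acks the batch;
        // here we complete it asynchronously to mimic that.
        CompletableFuture<Long> future =
                CompletableFuture.supplyAsync(() -> (long) record.length());
        // Invoke the callback once the result (or an error) is available.
        future.whenComplete((offset, err) ->
                callback.accept(offset, err == null ? null : new Exception(err)));
        return future;
    }
}
```

Calling `ToySender.send(record, cb)` is fire-and-forget; calling `.join()` (or `.get()`) on the result blocks until the simulated ack, mirroring the sync/async distinction in the text.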

2: The record is saved into the accumulator

Inside send, if interceptors are configured, they process the record first; the result is then passed to doSend:

ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
return doSend(interceptedRecord, callback); // asynchronously send a record to the topic

Before doSend appends the record to the accumulator, a few things must happen. First it calls waitOnMetadata to confirm that metadata containing the partitions for the given topic is available. If no partitions are known, waitOnMetadata loops, requesting a metadata update and waking the sender thread (sender.wakeup()) to perform it; if the block time is exceeded, it exits with a timeout exception.

long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs); // max time KafkaProducer.send() may block, max.block.ms, default 60000 ms
while (metadata.fetch().partitionsForTopic(topic) == null) {
            log.trace("Requesting metadata update for topic {}.", topic);
            int version = metadata.requestUpdate();
            sender.wakeup(); // wake the sender thread to update metadata
            metadata.awaitUpdate(version, remainingWaitMs); // wait for the update; throws a timeout exception once max.block.ms elapses
            ......
}

If the metadata is successfully updated before max.block.ms elapses, the record's key and value are serialized, the record is appended to the accumulator, and the sender thread is woken again:

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);

At this stage the data has only reached an intermediate buffer; nothing has actually been transmitted to the broker.
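The version-gated wait in the metadata loop above (requestUpdate returning a version, awaitUpdate blocking until the version advances or the deadline passes) can be sketched with a monitor. ToyMetadata is a simplified stand-in, not Kafka's Metadata class:

```java
/** Toy metadata holder: waiters block until the version advances or they time out. */
public class ToyMetadata {
    private int version = 0;

    /** Called on the send path: returns the version the caller must see advance. */
    public synchronized int requestUpdate() { return version; }

    /** Called by the sender thread once fresh metadata has been fetched. */
    public synchronized void update() { version++; notifyAll(); }

    /** Blocks until version > lastVersion; throws on timeout (max.block.ms analogue). */
    public synchronized void awaitUpdate(int lastVersion, long maxWaitMs) {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (version <= lastVersion) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0)
                throw new RuntimeException("timed out waiting for metadata update");
            try {
                wait(remaining);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }
}
```

The version counter is what lets the caller distinguish "an update happened since I asked" from a spurious wakeup.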

 

The sender thread

1: Once started, the sender thread repeatedly executes Sender.run(now). Its main job is to drain data out of the accumulator and turn it into producer requests, List<ClientRequest>.

 

2: If there are producer requests, NetworkClient.send is called; internally it calls doSend(ClientRequest, long), which adds the request to the in-flight queue, inFlightRequests:

{NetworkClient}

private void doSend(ClientRequest request, long now) {
    request.setSendTimeMs(now);
    this.inFlightRequests.add(request); // add the request to the in-flight queue
    selector.send(request.request());   // the data is staged in the KafkaChannel's memory
}

 

3: Selector.send(Send) then looks up the destination KafkaChannel from the Send and stages the data in that channel's outgoing buffer. At this point the data has still not actually been transmitted to the broker.

{Selector}

public void send(Send send) {
    KafkaChannel channel = channelOrFail(send.destination()); // look up the destination channel
    try {
        channel.setSend(send); // stage the data in the destination channel's memory
    } catch (CancelledKeyException e) {
        this.failedSends.add(send.destination());
        close(channel);
    }
}

 

4: Once all the data drained from the accumulator has been placed into the corresponding channels, NetworkClient.poll is called; this is where the actual socket reads and writes happen.

 

5-8: Metadata is updated first. In maybeUpdate, if the metadata timeout (metadataTimeout) has reached 0, an update is due: the least-loaded node is chosen (leastLoadedNode), and if no connection to it exists yet, a socket connection is created first. In step 7, Selector.connect is called; in step 8, SocketChannel.open and SocketChannel.connect establish the socket connection.

 

{NetworkClient}

        public long maybeUpdate(long now) {
            // should we update our metadata?
            long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
            long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
            long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
            // if there is no node available to connect, back off refreshing metadata
            long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
                    waitForMetadataFetch);

            if (metadataTimeout == 0) {
                // Beware that the behavior of this method and the computation of timeouts for poll() are
                // highly dependent on the behavior of leastLoadedNode.
                Node node = leastLoadedNode(now);
                maybeUpdate(now, node);
            }

            return metadataTimeout;
        }

private void maybeUpdate(long now, Node node) {
            if (node == null) {
                log.debug("Give up sending metadata request since no node is available");
                // mark the timestamp for no node available to connect
                this.lastNoNodeAvailableMs = now;
                return;
            }
            String nodeConnectionId = node.idString();

            if (canSendRequest(nodeConnectionId)) { // the connection is established, so data can be sent
                this.metadataFetchInProgress = true;
                MetadataRequest metadataRequest;
                if (metadata.needMetadataForAllTopics())
                    metadataRequest = MetadataRequest.allTopics();
                else
                    metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
                ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
                log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
                doSend(clientRequest, now); // send: stage the data in the channel's memory
            } else if (connectionStates.canConnect(nodeConnectionId, now)) {
                // we don't have a connection to this node right now, make one
                log.debug("Initialize connection to node {} for sending metadata request", node.id());
                initiateConnect(node, now); // create the socket connection
                // If initiateConnect failed immediately, this node will be put into blackout and we
                // should allow immediately retrying in case there is another candidate node. If it
                // is still connecting, the worst case is that we end up setting a longer timeout
                // on the next round and then wait for the response.
            } else { // connected, but can't send more OR connecting
                // In either case, we just need to wait for a network event to let us know the selected
                // connection might be usable again.
                this.lastNoNodeAvailableMs = now;
            }
        }

 

9: After the socket connection is established, it is registered with the nioSelector, and the channelBuilder created earlier builds the KafkaChannel. KafkaChannel creates and wraps a TransportLayer, which in turn wraps the SocketChannel; the channels are kept in a Map. KafkaChannel also creates an Authenticator.

{Selector}

     SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT); // register with the Selector
     KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize); // build the KafkaChannel
     key.attach(channel);
     this.channels.put(id, channel);

 

10: If the node's connection state is connected and the channel is ready (the transport layer is ready and the authenticator is complete), then maybeUpdate above can send the metadata request via NetworkClient.doSend: the request is placed in the in-flight queue (inFlightRequests) and in the corresponding KafkaChannel's memory. If a request in inFlightRequests exceeds the request timeout (request.timeout.ms, default 30000 ms, i.e. 30 s), the socket connection for that request is closed and a metadata refresh is scheduled.
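The expiry rule for in-flight requests described above boils down to checking the age of the oldest outstanding request against request.timeout.ms. A toy sketch (ToyInFlightRequests is illustrative; Kafka's InFlightRequests tracks full ClientRequest objects per node):

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy in-flight queue: oldest request first; an expired request forces a disconnect. */
public class ToyInFlightRequests {
    private static final long REQUEST_TIMEOUT_MS = 30_000; // request.timeout.ms default
    private final Deque<Long> sendTimes = new ArrayDeque<>(); // send timestamps, oldest first

    public void add(long sendTimeMs) { sendTimes.addLast(sendTimeMs); }

    /** True if the oldest outstanding request has exceeded request.timeout.ms. */
    public boolean hasExpiredRequest(long nowMs) {
        return !sendTimes.isEmpty()
                && nowMs - sendTimes.peekFirst() > REQUEST_TIMEOUT_MS;
    }
}
```

Only the head of the queue needs checking: requests are acknowledged in order per connection, so the oldest request is always the first to expire.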

 

11: Selector.poll performs the actual I/O on the data. For each selected key, the attached KafkaChannel is retrieved and channel.write sends the data to the broker.

 

