When using the Kafka producer API there are two main phases: creating the producer instance, and calling the producer instance to send data (synchronously or asynchronously). Creating the producer instance also starts a sender thread, which continuously polls, updating metadata and reading data from the accumulator to actually send it to a broker. The activity diagram below roughly describes the internal call flow of the producer API.
Creating the producer instance:
1: The client reads the producer config; a sample is shown below:
{
  security.protocol=SASL_PLAINTEXT,
  bootstrap.servers=server1.com:8881,server2:8882,
  value.serializer=xxx.serialization.avro.AvroWithSchemaSpecificSer,
  key.serializer=org.apache.kafka.common.serialization.LongSerializer,
  client.id=15164@hostname,
  acks=all
}
2: The producer instance is created by calling:
Producer<K,V> producer = new KafkaProducer<>(props, keySerClass, valueSerClass);
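Putting steps 1 and 2 together, a minimal construction sketch could look like the following; the broker addresses and client id are placeholders taken from the sample config above, and StringSerializer stands in for the Avro value serializer in the sample:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerFactory {
    // Build the config from step 1 and create the KafkaProducer from step 2.
    public static Producer<Long, String> create() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "server1.com:8881,server2:8882"); // placeholder brokers
        props.put(ProducerConfig.ACKS_CONFIG, "all");              // wait for all in-sync replicas to acknowledge
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "demo-client"); // placeholder client id
        // Serializers can also be passed directly to the constructor, as in step 2 above.
        return new KafkaProducer<>(props, new LongSerializer(), new StringSerializer());
    }
}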
3: The KafkaProducer instance is the main entry point of the producer and wraps all the components used later. While the producer instance is being created, the following components are created in turn.
Create the Metadata instance, passing refreshBackoffMs (the minimum interval between refreshes, retry.backoff.ms, default 100 ms) and metadataExpireMs (the maximum metadata retention time, metadata.max.age.ms, default 300000 ms).
Metadata holds information about topics, partitions and brokers:
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
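What this metadata contains can also be observed through the public API: Producer.partitionsFor returns the per-partition leader and replica information that Metadata caches. A small illustrative helper (the producer instance and topic name are assumed to exist):
import java.util.List;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.PartitionInfo;

// Print the partition/broker layout that the producer's metadata knows for a topic.
static void printTopicMetadata(Producer<Long, String> producer, String topic) {
    List<PartitionInfo> partitions = producer.partitionsFor(topic); // triggers a metadata fetch if needed
    for (PartitionInfo p : partitions) {
        System.out.printf("topic=%s partition=%d leader=%s replicas=%d in-sync=%d%n",
                p.topic(), p.partition(), p.leader(), p.replicas().length, p.inSyncReplicas().length);
    }
}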
4: Create the accumulator, RecordAccumulator
The accumulator is a bounded in-memory queue that holds records. When the client sends data, the record is appended to the tail of the queue; if the buffer memory is exhausted, the append call blocks. Internally, the batches member holds the data waiting to be sent by the sender (a simplified sketch of this structure follows the constructor call below):
private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
It is created by calling:
this.accumulator = new RecordAccumulator(
        config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),  // batch.size, default 16384 (16K), number of bytes sent per batch
        this.totalMemorySize,                             // buffer.memory, default 33554432 (32M), total buffer memory in bytes
        this.compressionType,                             // compression.type, default none; valid values: none, gzip, snappy, lz4
        config.getLong(ProducerConfig.LINGER_MS_CONFIG),
        retryBackoffMs,                                    // retry.backoff.ms, default 100 ms, minimum interval between metadata refreshes
        metrics,
        time);
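To make the shape of the batches map concrete, here is a deliberately simplified, hypothetical sketch of grouping records into per-partition deques. It is not Kafka's RecordAccumulator, which additionally batches records into RecordBatch objects, enforces batch.size and buffer.memory, and applies compression:
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.common.TopicPartition;

// Toy accumulator: one deque of pending serialized records per TopicPartition.
class ToyAccumulator {
    private final ConcurrentMap<TopicPartition, Deque<byte[]>> batches = new ConcurrentHashMap<>();

    // Append a serialized record to the tail of the partition's deque (what send() effectively does).
    void append(TopicPartition tp, byte[] serializedRecord) {
        Deque<byte[]> deque = batches.computeIfAbsent(tp, k -> new ArrayDeque<>());
        synchronized (deque) {
            deque.addLast(serializedRecord);
        }
    }

    // Drain everything currently queued for a partition (roughly what the sender thread does).
    Deque<byte[]> drain(TopicPartition tp) {
        Deque<byte[]> drained = batches.remove(tp);
        return drained != null ? drained : new ArrayDeque<>();
    }
}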
5: Create the channel builder. ChannelBuilder is an interface; depending on the security protocol there are concrete implementations SaslChannelBuilder, SslChannelBuilder and PlaintextChannelBuilder. A channel is the part of Java NIO that actually talks to the network; in Kafka, the KafkaChannel class wraps a SocketChannel. Login authentication is performed when the builder instance is created.
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
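Which concrete ChannelBuilder gets created is driven by the security.protocol setting. For the SASL_PLAINTEXT protocol in the sample config, the client typically also needs SASL settings along these lines (a sketch assuming the SASL/PLAIN mechanism and the inline sasl.jaas.config property supported by newer clients; older clients configure JAAS through a separate login config file, and the credentials here are placeholders):
// Extra properties for SASL_PLAINTEXT; mechanism and credentials are placeholders.
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        + "username=\"alice\" password=\"alice-secret\";");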
6: Create the NetworkClient instance. NetworkClient encapsulates access to the underlying network as well as metadata updates.
NetworkClient client = new NetworkClient(
        new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
        this.metadata,
        clientId,
        config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
        config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
        config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
        config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
        this.requestTimeoutMs,
        time);
In the NetworkClient constructor a Selector instance is passed in; this is Kafka's selector wrapping the Java NIO selector. The Selector constructor opens the Java NIO selector, and the ChannelBuilder created earlier is also handed to an internal member of NetworkClient:
this.nioSelector = java.nio.channels.Selector.open();
7: Create the Sender thread class and start the thread. The NetworkClient instance, the Metadata instance and the accumulator created earlier are all passed into the Sender. The Sender is the key class: all of the real work happens in this thread. Once the sender thread starts, it keeps polling, updating metadata and sending messages.
this.sender = new Sender(client,
        this.metadata,
        this.accumulator,
        config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
        config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
        (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
        config.getInt(ProducerConfig.RETRIES_CONFIG),
        this.metrics,
        new SystemTime(),
        clientId,
        this.requestTimeoutMs);
String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
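KafkaThread here is essentially a named daemon thread wrapping the Sender runnable, so the JVM can still exit even if the producer is never closed. Conceptually it amounts to the following plain-Java sketch (not the actual KafkaThread class; the runnable is a stand-in):
// Start a runnable on a named daemon thread, roughly what KafkaThread does for the Sender.
Runnable sender = () -> System.out.println("sender loop would run here"); // stand-in for the real Sender
Thread ioThread = new Thread(sender, "kafka-producer-network-thread | demo-client");
ioThread.setDaemon(true); // a daemon thread does not keep the JVM alive on its own
ioThread.start();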
Sending data:
1: The client sends data
The client builds a ProducerRecord and calls send to publish it. send is an asynchronous method: it returns a Future and comes back immediately, having only written the record into the in-memory RecordAccumulator. To send synchronously and confirm whether the record was delivered, call get on the returned Future, which blocks the calling thread.
ProducerRecord<K, V> producerRecord = new ProducerRecord<>("topic", data);
producer.send(producerRecord, new ProducerCallBack(requestId));
When calling send, a callback object can be passed in; the callback is invoked to handle the acknowledgement once the send completes. A short sketch of both the asynchronous and the synchronous style follows.
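The sketch below shows the two styles side by side; the topic name "demo-topic", the keys and values, and the surrounding helper method are illustrative, and the producer is assumed to be a Producer<Long, String> created as in the previous section.
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

static void sendBothStyles(Producer<Long, String> producer) throws Exception {
    // Asynchronous: send() returns at once; the callback fires when the broker acks or the send fails.
    producer.send(new ProducerRecord<>("demo-topic", 1L, "async payload"), new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null)
                exception.printStackTrace();                                 // delivery failed
            else
                System.out.println("acked at offset " + metadata.offset()); // delivery confirmed
        }
    });

    // Synchronous: block on the returned Future until the record is acknowledged.
    Future<RecordMetadata> future = producer.send(new ProducerRecord<>("demo-topic", 2L, "sync payload"));
    RecordMetadata metadata = future.get(); // throws if the send ultimately fails
    System.out.println("sync send landed in partition " + metadata.partition());
}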
2: The record is stored in the accumulator
Inside send, if interceptors are configured they are applied to the record first, and the processed record is then passed to doSend (a minimal interceptor sketch follows the snippet below):
ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
return doSend(interceptedRecord, callback); // asynchronously send a record to the topic
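Interceptors are part of the public producer API: a class implementing ProducerInterceptor can be registered through the interceptor.classes config and gets a chance to modify each record in onSend before it reaches doSend. A minimal sketch, with a purely illustrative prefix added to the value:
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Prefixes every value with a marker before the record is serialized and accumulated.
public class PrefixingInterceptor implements ProducerInterceptor<Long, String> {
    @Override
    public ProducerRecord<Long, String> onSend(ProducerRecord<Long, String> record) {
        return new ProducerRecord<>(record.topic(), record.partition(), record.key(),
                "intercepted:" + record.value());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // called when the broker acks the record or the send fails
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}
// Registered with: props.put("interceptor.classes", "com.example.PrefixingInterceptor");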
Before doSend appends the record to the accumulator, a few things have to happen. First, waitOnMetadata is called to ensure that metadata containing the partitions for the given topic is available. Inside waitOnMetadata, if no partitions are known it loops requesting a metadata update and waking the sender thread (sender.wakeup()); if the maximum block time is exceeded it fails with a timeout exception.
long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs); // maximum time KafkaProducer.send() may block, max.block.ms, default 60000 ms
while (metadata.fetch().partitionsForTopic(topic) == null) {
    log.trace("Requesting metadata update for topic {}.", topic);
    int version = metadata.requestUpdate();
    sender.wakeup();                                // wake the sender thread so it updates the metadata
    metadata.awaitUpdate(version, remainingWaitMs); // wait for the update; throws a timeout exception once max.block.ms is exceeded
    ......
}
If the metadata is updated successfully before max.block.ms elapses, the record's key and value are serialized, the record is appended to the accumulator, and the sender thread is woken up again:
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
At this point the data has only been written into an intermediate buffer; it has not actually been transmitted to the broker.
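Because send only buffers the record, a caller that needs to be sure everything buffered so far has actually reached the brokers can use the public flush/close API, for example:
// flush() blocks until every previously sent record has completed (acknowledged or failed).
producer.flush();
// close() flushes as well, then stops the sender thread and releases the sockets.
producer.close();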
The sender thread
1: Once the sender thread is started it executes Sender.run(now) in a loop. The main job of this method is to drain data out of the accumulator and turn it into a list of producer requests, List<ClientRequest>.
2: If the list of producer requests is not empty, NetworkClient.send is called for each request; internally it calls doSend(ClientRequest, long), which adds the request to the in-flight queue inFlightRequests:
{NetworkClient}
private void doSend(ClientRequest request, long now) {
    request.setSendTimeMs(now);
    this.inFlightRequests.add(request); // add the request to the in-flight queue
    selector.send(request.request());   // the data is stored in the KafkaChannel's buffer
}
3: Selector.send(Send) is then called. It looks up the destination KafkaChannel from the Send and places the data into that channel's pending-send buffer; at this point the data has still not actually been transmitted to the broker:
{Selector}
public void send(Send send) {
    KafkaChannel channel = channelOrFail(send.destination()); // look up the destination channel
    try {
        channel.setSend(send); // store the data in the destination channel's buffer
    } catch (CancelledKeyException e) {
        this.failedSends.add(send.destination());
        close(channel);
    }
}
4: Once all the data drained from the accumulator has been handed to the corresponding channels, NetworkClient.poll is called; this is where the actual socket reads and writes take place.
5-8: The metadata may first need updating. In maybeUpdate, if the metadata timeout (metadataTimeout) has reached 0, the metadata needs to be refreshed: the least-loaded node is chosen (leastLoadedNode), and if no connection to it has been established yet, a socket connection is created first. In step 7, Selector.connect is called, and in step 8, SocketChannel.open and SocketChannel.connect are called to establish the socket connection.
{NetworkClient}
public long maybeUpdate(long now) {
    // should we update our metadata?
    long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
    long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
    long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
    // if there is no node available to connect, back off refreshing metadata
    long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt), waitForMetadataFetch);
    if (metadataTimeout == 0) {
        // Beware that the behavior of this method and the computation of timeouts for poll() are
        // highly dependent on the behavior of leastLoadedNode.
        Node node = leastLoadedNode(now);
        maybeUpdate(now, node);
    }
    return metadataTimeout;
}

private void maybeUpdate(long now, Node node) {
    if (node == null) {
        log.debug("Give up sending metadata request since no node is available");
        // mark the timestamp for no node available to connect
        this.lastNoNodeAvailableMs = now;
        return;
    }
    String nodeConnectionId = node.idString();
    if (canSendRequest(nodeConnectionId)) { // the connection is already established, so the request can be sent
        this.metadataFetchInProgress = true;
        MetadataRequest metadataRequest;
        if (metadata.needMetadataForAllTopics())
            metadataRequest = MetadataRequest.allTopics();
        else
            metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
        ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
        log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
        doSend(clientRequest, now); // send the request: the data is stored in the channel's buffer
    } else if (connectionStates.canConnect(nodeConnectionId, now)) {
        // we don't have a connection to this node right now, make one
        log.debug("Initialize connection to node {} for sending metadata request", node.id());
        initiateConnect(node, now); // create the socket connection
        // If initiateConnect failed immediately, this node will be put into blackout and we
        // should allow immediately retrying in case there is another candidate node. If it
        // is still connecting, the worst case is that we end up setting a longer timeout
        // on the next round and then wait for the response.
    } else {
        // connected, but can't send more OR connecting
        // In either case, we just need to wait for a network event to let us know the selected
        // connection might be usable again.
        this.lastNoNodeAvailableMs = now;
    }
}
9: After the socket connection has been established it is registered with the nioSelector, and the channelBuilder created earlier is used to build the KafkaChannel. The KafkaChannel creates and wraps the transport layer (TransportLayer), which in turn wraps the SocketChannel; the channel is stored in a Map. The KafkaChannel also creates an authenticator:
{Selector}
SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT); // register with the Selector
KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);     // build the KafkaChannel
key.attach(channel);
this.channels.put(id, channel);
10: If the connection to the node is established and the channel is ready (the transport layer is ready and the authenticator is in the complete state), then in maybeUpdate above the metadata request can be sent via NetworkClient.doSend: the request is placed in the in-flight request queue (inFlightRequests) and in the corresponding KafkaChannel's buffer. If a request in inFlightRequests exceeds the request timeout (request.timeout.ms, default 30000, i.e. 30 seconds), the socket connection for that request is closed and a metadata refresh is scheduled.
11: Selector.poll is called to perform the actual IO. For each selected key the corresponding KafkaChannel is retrieved and channel.write is called to send the data to the broker.