Kafka Principles and Practice (3): spring-kafka Producer Source Code


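Series contents

Kafka Principles and Practice (1): Principles, a 10-Minute Introduction

Kafka Principles and Practice (2): spring-kafka in Simple Practice

Kafka Principles and Practice (3): spring-kafka Producer Source Code

Kafka Principles and Practice (4): spring-kafka Consumer Source Code

Kafka Principles and Practice (5): spring-kafka Configuration in Detail

Kafka Principles and Practice (6): Summary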

 

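============== Main Text ==============

Our project runs on Spring Cloud and inherits from spring-boot-starter, whose default supported version is spring-kafka 1.1.7, so this article analyzes the spring-kafka-1.1.7 source code. The official release has already reached 2.0, but the core methods analyzed here are essentially unchanged; see the official site for details.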

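I. The KafkaProducer send model

As shown in the figure above, KafkaTemplate initiates the send request, which breaks down into the following steps:

Stage 1: putting data into the pool

1. KafkaProducer starts sending the message.

2. The producer interceptors intercept the message.

3. The serializers serialize the data.

4. The partitioner chooses the partition for the message.

5. The record is added to the record accumulator.

Stage 2: sending data over NIO

6. Once a batch is full, or a new RecordBatch has just been created, the Sender thread is woken up immediately to execute its run method.

7. Inside the Sender, the RecordBatch data to send is taken from the accumulator's Deque and converted into a ClientRequest.

8. Still inside the Sender, NetworkClient converts it into a RequestSend (the Send interface) and calls the Selector to stage it in a KafkaChannel (looked up in the channel map Map<String, KafkaChannel> channels that the Selector maintains).

9. The NIO send is performed: (1) Selector.select(); (2) the Send data (ByteBuffer[]) staged in the KafkaChannel is written to the channel's write side, a GatheringByteChannel.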
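The two stages above can be sketched with plain JDK classes. This is only a toy model, not Kafka code: MiniSendModel, its trim-based "interceptor", and the Arrays.hashCode partitioning are all assumptions made up for illustration (the real producer uses its own interceptor, serializer, and partitioner classes).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the two stages; none of these names are Kafka classes.
class MiniSendModel {
    // Stage 1 target: one deque of serialized values per partition.
    private final Map<Integer, Deque<byte[]>> accumulator = new HashMap<>();
    private final int numPartitions;

    MiniSendModel(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Stage 1: intercept -> serialize -> partition -> append. Returns the chosen partition.
    int send(String key, String value) {
        String intercepted = value.trim();                                 // step 2: "interceptor" (assumed: trims the value)
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);            // step 3: serialize key
        byte[] valueBytes = intercepted.getBytes(StandardCharsets.UTF_8);  // step 3: serialize value
        // step 4: pick a partition from the key bytes (stand-in hash, not Kafka's)
        int partition = (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
        // step 5: append to that partition's deque
        accumulator.computeIfAbsent(partition, p -> new ArrayDeque<>()).addLast(valueBytes);
        return partition;
    }

    // Stage 2 (greatly simplified): drain everything queued for one partition, oldest first.
    List<String> drain(int partition) {
        List<String> out = new ArrayList<>();
        Deque<byte[]> dq = accumulator.get(partition);
        while (dq != null && !dq.isEmpty()) {
            out.add(new String(dq.pollFirst(), StandardCharsets.UTF_8));
        }
        return out;
    }
}
```

Records with the same key land in the same deque and are drained in the order they were appended, which is the property the per-partition Deque in the real accumulator provides.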

 

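II. The KafkaTemplate template

spring-kafka provides a simple KafkaTemplate class whose send methods can be called directly. All that is required is to register it as a bean in the container (see the XML bean configuration in Part 2 of this series).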

public class KafkaTemplate<K, V> implements KafkaOperations<K, V> {

    ...

    /**
     * Create an instance using the supplied producer factory and autoFlush false.
     * @param producerFactory the producer factory.
     */
    public KafkaTemplate(ProducerFactory<K, V> producerFactory) {
        this(producerFactory, false);
    }

    /**
     * Create an instance using the supplied producer factory and autoFlush setting.
     * Set autoFlush to true if you wish to synchronously interact with Kafka, calling
     * {@link java.util.concurrent.Future#get()} on the result.
     * @param producerFactory the producer factory.
     * @param autoFlush true to flush after each send.
     */
    public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush) {
        this.producerFactory = producerFactory;
        this.autoFlush = autoFlush;
    }

    ...

    /**
     * Send the producer record.
     * @param producerRecord the producer record.
     * @return a Future for the {@link RecordMetadata}.
     */
    protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
        final Producer<K, V> producer = getTheProducer();
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sending: " + producerRecord);
        }
        final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
        producer.send(producerRecord, new Callback() {

            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                try {
                    if (exception == null) {
                        future.set(new SendResult<>(producerRecord, metadata));
                        if (KafkaTemplate.this.producerListener != null
                                && KafkaTemplate.this.producerListener.isInterestedInSuccess()) {
                            KafkaTemplate.this.producerListener.onSuccess(producerRecord.topic(),
                                    producerRecord.partition(), producerRecord.key(), producerRecord.value(), metadata);
                        }
                    }
                    else {
                        future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
                        if (KafkaTemplate.this.producerListener != null) {
                            KafkaTemplate.this.producerListener.onError(producerRecord.topic(),
                                    producerRecord.partition(),
                                    producerRecord.key(),
                                    producerRecord.value(),
                                    exception);
                        }
                    }
                }
                finally {
                    producer.close();
                }
            }

        });
        if (this.autoFlush) {
            flush();
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sent: " + producerRecord);
        }
        return future;
    }
}

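Key points of the KafkaTemplate source

1. Constructors: the arguments are a ProducerFactory and an autoFlush flag (when true, the records in the buffer are sent immediately after each send).

2. Sending a message with doSend. There are only two core points here:

1) producer.send(producerRecord, Callback), where producer is a KafkaProducer.

2) The Callback's onCompletion, which completes the future and dispatches to the listener's onSuccess or onError.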
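The callback-to-future bridging in doSend can be illustrated without any Kafka or Spring dependency. The sketch below is an assumption-laden stand-in: MiniCallback, MiniProducer, and MiniTemplate are hypothetical names, and the JDK's CompletableFuture plays the role that SettableListenableFuture plays in the real code.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical callback type standing in for Kafka's Callback interface.
interface MiniCallback {
    void onCompletion(String metadata, Exception exception);
}

// Hypothetical producer: invokes the callback with either metadata or an exception.
class MiniProducer {
    private final boolean fail;

    MiniProducer(boolean fail) {
        this.fail = fail;
    }

    void send(String record, MiniCallback callback) {
        if (fail) {
            callback.onCompletion(null, new IllegalStateException("broker unavailable"));
        } else {
            callback.onCompletion("offset=0", null);
        }
    }
}

// Hypothetical template: turns the callback style into a future, as doSend does.
class MiniTemplate {
    static CompletableFuture<String> doSend(MiniProducer producer, String record) {
        CompletableFuture<String> future = new CompletableFuture<>();
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                future.complete(record + "@" + metadata);   // success path: complete the future
            } else {
                future.completeExceptionally(exception);    // error path: fail the future
            }
        });
        return future;
    }
}
```

The caller gets a future back right away and can either block on it or attach further callbacks, which is why the template can offer both synchronous and asynchronous styles on top of one send method.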

III. KafkaProducer

3.1 How KafkaProducer is constructed

    @SuppressWarnings({"unchecked", "deprecation"})
    private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        try {
            log.trace("Starting the Kafka producer");
            Map<String, Object> userProvidedConfigs = config.originals();
            this.producerConfig = config;
            this.time = new SystemTime();

            clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
            if (clientId.length() <= 0)
                clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
            Map<String, String> metricTags = new LinkedHashMap<String, String>();
            metricTags.put("client-id", clientId);
            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                    .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                    .tags(metricTags);
            List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                    MetricsReporter.class);
            reporters.add(new JmxReporter(JMX_PREFIX));
            this.metrics = new Metrics(metricConfig, reporters, time);
            this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
            long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
            if (keySerializer == null) {
                this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                        Serializer.class);
                this.keySerializer.configure(config.originals(), true);
            } else {
                config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
                this.keySerializer = keySerializer;
            }
            if (valueSerializer == null) {
                this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                        Serializer.class);
                this.valueSerializer.configure(config.originals(), false);
            } else {
                config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
                this.valueSerializer = valueSerializer;
            }

            // load interceptors and make sure they get clientId
            userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
            List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                    ProducerInterceptor.class);
            this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);

            ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
            this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners);
            this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
            this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
            this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
            /* check for user defined settings.
             * If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
             * This should be removed with release 0.9 when the deprecated configs are removed.
             */
            if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
                log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
                        "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
                boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
                if (blockOnBufferFull) {
                    this.maxBlockTimeMs = Long.MAX_VALUE;
                } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
                    log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
                            "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
                    this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
                } else {
                    this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
                }
            } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
                log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
                        "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
                this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
            } else {
                this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
            }

            /* check for user defined settings.
             * If the TIME_OUT config is set use that for request timeout.
             * This should be removed with release 0.9
             */
            if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) {
                log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " +
                        ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
                this.requestTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG);
            } else {
                this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
            }

            this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                    this.totalMemorySize,
                    this.compressionType,
                    config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                    retryBackoffMs,
                    metrics,
                    time);

            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
            this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
            NetworkClient client = new NetworkClient(
                    new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
                    this.metadata,
                    clientId,
                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                    config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                    config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                    config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                    this.requestTimeoutMs, time);
            this.sender = new Sender(client,
                    this.metadata,
                    this.accumulator,
                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                    (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                    config.getInt(ProducerConfig.RETRIES_CONFIG),
                    this.metrics,
                    new SystemTime(),
                    clientId,
                    this.requestTimeoutMs);
            String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();

            this.errors = this.metrics.sensor("errors");


            config.logUnused();
            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
            log.debug("Kafka producer started");
        } catch (Throwable t) {
            // call close methods if internal objects are already constructed
            // this is to prevent resource leak. see KAFKA-2121
            close(0, TimeUnit.MILLISECONDS, true);
            // now propagate the exception
            throw new KafkaException("Failed to construct kafka producer", t);
        }
    }

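As the constructor above shows, KafkaProducer contains several core components:

1) Metadata: maintains cluster information and topic information.

2) RecordAccumulator: buffers produced records and sends them in batches, which reduces the number of I/O operations and improves performance.

3) Sender: combines the Metadata, the RecordAccumulator, and the NetworkClient.

4) KafkaThread, the I/O thread: a thread with a custom name. Sender implements Runnable, so once the thread is started it runs the Sender's run method.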

    /**
     * The main run loop for the sender thread
     */
    public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // main loop, runs until close is called
        while (running) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        // okay we stopped accepting requests but there may still be
        // requests in the accumulator or waiting for acknowledgment,
        // wait until these are completed.
        while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
        if (forceClose) {
            // We need to fail all the incomplete batches and wake up the threads waiting on
            // the futures.
            this.accumulator.abortIncompleteBatches();
        }
        try {
            this.client.close();
        } catch (Exception e) {
            log.error("Failed to close network client", e);
        }

        log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }

    /**
     * Run a single iteration of sending
     * 
     * @param now
     *            The current POSIX time in milliseconds
     */
    void run(long now) {
        Cluster cluster = metadata.fetch();
        // get the list of partitions in the cluster that have data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if the leader of any partition is still unknown, force a metadata update
        if (!result.unknownLeaderTopics.isEmpty()) {
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic);
            this.metadata.requestUpdate();
        }

        // remove any nodes that the NetworkClient is not yet ready to send to
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        // drain the batches for the ready nodes; produce requests are built from them
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                         result.readyNodes,
                                                                         this.maxRequestSize,
                                                                         now);
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            for (List<RecordBatch> batchList : batches.values()) {
                for (RecordBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }
        // handle batches that have timed out
        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
        // update sensors
        for (RecordBatch expiredBatch : expiredBatches)
            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

        sensors.updateProduceRequestMetrics(batches);
        List<ClientRequest> requests = createProduceRequests(batches, now);
        // if any node is ready, set the poll timeout to 0
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (result.readyNodes.size() > 0) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            log.trace("Created {} produce requests: {}", requests.size(), requests);
            pollTimeout = 0;
        }
        for (ClientRequest request : requests)
            client.send(request, now);

        // 1. if some partitions are already ready to send, the poll timeout is 0;
        // 2. otherwise, if some partition has accumulated data that is not ready yet,
        //    the poll timeout is the difference between now and its linger expiry time;
        // 3. otherwise, the poll timeout is the difference between now and the metadata expiry time
        this.client.poll(pollTimeout, now);
    }
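The shape of the outer run() loop (keep iterating while running, then keep iterating until nothing is left unsent) can be reproduced with a few JDK classes. The sketch below is a toy under stated assumptions: MiniSender, enqueue, and initiateClose are hypothetical names, a ConcurrentLinkedQueue stands in for the accumulator, and "sending" is just moving a string to a list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for Sender: a Runnable with a main loop and a drain-on-shutdown loop.
class MiniSender implements Runnable {
    private final ConcurrentLinkedQueue<String> unsent = new ConcurrentLinkedQueue<>();
    private final List<String> sent = Collections.synchronizedList(new ArrayList<>());
    private volatile boolean running = true;

    void enqueue(String record) {
        unsent.add(record);
    }

    // Stop accepting new iterations of the main loop; queued records are still drained.
    void initiateClose() {
        running = false;
    }

    @Override
    public void run() {
        // main loop, runs until close is requested
        while (running) {
            runOnce();
        }
        // close was requested, but records may still be queued: send what is left
        while (!unsent.isEmpty()) {
            runOnce();
        }
    }

    // One iteration: "send" at most one record, or yield if there is nothing to do.
    private void runOnce() {
        String record = unsent.poll();
        if (record != null) {
            sent.add(record);
        } else {
            Thread.yield();
        }
    }

    List<String> sent() {
        return new ArrayList<>(sent);
    }

    // Starts the sender on a named daemon thread, enqueues two records, closes, and waits.
    static List<String> demo() {
        MiniSender sender = new MiniSender();
        Thread io = new Thread(sender, "mini-producer-network-thread");
        io.setDaemon(true);
        io.start();
        sender.enqueue("a");
        sender.enqueue("b");
        sender.initiateClose();
        try {
            io.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sender.sent();
    }
}
```

Because both records are enqueued before close is requested, the second loop guarantees they are delivered even if the main loop never got to them, which mirrors why the real Sender keeps running while the accumulator has unsent data.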

Each of the created requests is then passed to client.send(request, now), that is, NetworkClient sends the ClientRequest:

    @Override
    public void send(ClientRequest request, long now) {
        String nodeId = request.request().destination();
        if (!canSendRequest(nodeId))
            throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
        doSend(request, now);
    }

    private void doSend(ClientRequest request, long now) {
        request.setSendTimeMs(now);
        this.inFlightRequests.add(request);
        selector.send(request.request());
    }

 

    public void send(Send send) {
        KafkaChannel channel = channelOrFail(send.destination());
        try {
            channel.setSend(send);
        } catch (CancelledKeyException e) {
            this.failedSends.add(send.destination());
            close(channel);
        }
    }

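As the code above shows, this step does no I/O yet: it looks up the KafkaChannel for the destination and stages the Send (the content to send) on it.

client.poll(pollTimeout, now) is where the actual I/O reads and writes happen.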

    @Override
    public List<ClientResponse> poll(long timeout, long now) {
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }

        // after the poll completes, build the various ClientResponses and add them to responses
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow);
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleTimedOutRequests(responses, updatedNow);

        // iterate over responses and invoke the callbacks
        for (ClientResponse response : responses) {
            if (response.request().hasCallback()) {
                try {
                    response.request().callback().onComplete(response);
                } catch (Exception e) {
                    log.error("Uncaught error in request completion:", e);
                }
            }
        }

        return responses;
    }

What does the core method selector.poll ultimately do?

    public void poll(long timeout) throws IOException {
        if (timeout < 0)
            throw new IllegalArgumentException("timeout should be >= 0");

        clear();

        if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
            timeout = 0;

        /* check ready keys */
        long startSelect = time.nanoseconds();
        int readyKeys = select(timeout);
        long endSelect = time.nanoseconds();
        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

        if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        }

        addToCompletedReceives();

        long endIo = time.nanoseconds();
        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());

        // we use the time at the end of select to ensure that we don't close any connections that
        // have just been processed in pollSelectionKeys
        maybeCloseOldestConnection(endSelect);
    }

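As the code above shows, there are only two pieces of core logic: wait for channels to become ready, then read and write data.

1) select: waits for channels to become ready and returns the number of ready channels.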

    private int select(long ms) throws IOException {
        if (ms < 0L)
            throw new IllegalArgumentException("timeout should be >= 0");

        if (ms == 0L)
            return this.nioSelector.selectNow();
        else
            return this.nioSelector.select(ms);
    }

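The field is a java.nio.channels.Selector nioSelector. As the code above shows, this is ultimately just the JDK's own Java NIO Selector executing a select, which reports how many channels have become ready since the last call to select().

Selector.select(ms) blocks for at most ms milliseconds, returning earlier once a channel is ready for an event you registered.

Selector.selectNow() does not block: it returns immediately with whatever channels are ready, and returns zero if no channel has become selectable.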
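The difference between the two calls is easy to try with the JDK alone. The sketch below reuses the same branching as the select(ms) helper; SelectDemo and readyWithNoChannels are hypothetical names, and the demo opens a Selector with no registered channels, so neither call can find anything ready.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

// Hypothetical demo class; only java.nio.channels.Selector is a real API here.
class SelectDemo {
    // Same branching as the helper above: 0 means "do not block", otherwise block up to ms.
    static int select(Selector nioSelector, long ms) throws IOException {
        if (ms < 0L)
            throw new IllegalArgumentException("timeout should be >= 0");
        if (ms == 0L)
            return nioSelector.selectNow();
        else
            return nioSelector.select(ms);
    }

    // Opens a selector with no channels registered and reports how many keys became ready.
    static int readyWithNoChannels(long ms) {
        try (Selector selector = Selector.open()) {
            return select(selector, ms);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With ms = 0 the call returns at once; with a positive ms it waits up to that long before also returning 0, since nothing is registered. This is why the Sender sets pollTimeout to 0 when it already knows some node has data ready.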

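NIO Selector

1. The Java NIO model

There is a lot to cover, so it is not expanded here; a link is left for further reading.

2. Selector

For Selector, a single diagram is cited here; a picture is worth a thousand words.

2) pollSelectionKeys: if the number of ready channels is greater than 0, finish pending connections, read from readable channels, and write the staged data (ByteBuffer) to the Channel identified by each key.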

    private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                                   boolean isImmediatelyConnected,
                                   long currentTimeNanos) {
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            KafkaChannel channel = channel(key);

            // register all per-connection metrics at once
            sensors.maybeRegisterConnectionMetrics(channel.id());
            if (idleExpiryManager != null)
                idleExpiryManager.update(channel.id(), currentTimeNanos);

            try {

                /* complete any connections that have finished their handshake (either normally or immediately) */
                if (isImmediatelyConnected || key.isConnectable()) {
                    if (channel.finishConnect()) {
                        this.connected.add(channel.id());
                        this.sensors.connectionCreated.record();
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
                                socketChannel.socket().getReceiveBufferSize(),
                                socketChannel.socket().getSendBufferSize(),
                                socketChannel.socket().getSoTimeout(),
                                channel.id());
                    } else
                        continue;
                }

                /* prepare the channel if it is connected but not yet ready */
                if (channel.isConnected() && !channel.ready())
                    channel.prepare();

                /* read data from the channel */
                if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                    NetworkReceive networkReceive;
                    while ((networkReceive = channel.read()) != null)
                        addToStagedReceives(channel, networkReceive);
                }

                /* write data to the channel */
                if (channel.ready() && key.isWritable()) {
                    Send send = channel.write();
                    if (send != null) {
                        this.completedSends.add(send);
                        this.sensors.recordBytesSent(channel.id(), send.size());
                    }
                }

                /* cancel any defunct sockets */
                if (!key.isValid()) {
                    close(channel);
                    this.disconnected.add(channel.id());
                }

            } catch (Exception e) {
                String desc = channel.socketDescription();
                if (e instanceof IOException)
                    log.debug("Connection with {} disconnected", desc, e);
                else
                    log.warn("Unexpected error from {}; closing connection", desc, e);
                close(channel);
                this.disconnected.add(channel.id());
            }
        }
    }

 

3.2 How KafkaProducer sends data

KafkaProducer.send

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }

    /**
     * Implementation of asynchronously sending a record to a topic.
     */
    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            // first make sure the metadata for the topic is available
            ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;
            byte[] serializedKey;
            try { // serialize the key
                serializedKey = keySerializer.serialize(record.topic(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer");
            }
            byte[] serializedValue;
            try { // serialize the value
                serializedValue = valueSerializer.serialize(record.topic(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer");
            }

            int partition = partition(record, serializedKey, serializedValue, cluster);
            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
            ensureValidRecordSize(serializedSize);
            // topic and partition
            tp = new TopicPartition(record.topic(), partition);
            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            // producer callback will make sure to call both 'callback' and interceptor callback
            Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future; // return the Future
            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly
        } catch (ApiException e) {
            log.debug("Exception occurred during message send:", e);
            if (callback != null)
                callback.onCompletion(null, e);
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            return new FutureFailure(e);
        } catch (InterruptedException e) {
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw new InterruptException(e);
        } catch (BufferExhaustedException e) {
            this.errors.record();
            this.metrics.sensor("buffer-exhausted-records").record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        } catch (KafkaException e) {
            this.errors.record();
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        } catch (Exception e) {
            // we notify interceptor about all exceptions, since onSend is called before anything else in this method
            if (this.interceptors != null)
                this.interceptors.onSendError(record, tp, e);
            throw e;
        }
    }

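The core of this method:

1. Add the data to send (the TopicPartition plus the serialized key and value) to the RecordAccumulator.

2. sender.wakeup(): when the batch is full or a new batch has been created, wake the Sender so that it stops blocking in its current select() call.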

    /**
     * Add a record to the accumulator. The returned result contains the Future and two flags
     * (whether the batch is full, and whether a new batch was created).
     *
     * @param tp the topic partition
     * @param timestamp The timestamp of the record
     * @param key the serialized key
     * @param value the serialized value
     * @param callback the callback invoked when the request completes
     * @param maxTimeToBlock the maximum time to block, in milliseconds
     */
    public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        // count +1: the number of appends to the accumulator currently in progress
        // (abortIncompleteBatches uses it as a condition)
        appendsInProgress.incrementAndGet();
        try {
            // look up the deque for key=tp in ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches; create one if absent
            Deque<RecordBatch> dq = getOrCreateDeque(tp);
            synchronized (dq) { // wait for the deque's lock, then try to append to the accumulator
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                if (appendResult != null) // 1. the append succeeded: return directly
                    return appendResult;
            }
            // ===== 2. the append failed =====
            // 2.1 allocate a buffer, then try to append again
            int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
            synchronized (dq) { // wait for the deque's lock, then try to append to the accumulator
                // after acquiring the deque lock, check again whether the producer is closed
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");

                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                if (appendResult != null) {
                    // 2.2 the append succeeded: release the buffer
                    free.deallocate(buffer);
                    return appendResult;
                }
                // 2.3 the append failed: build a writable MemoryRecords on the buffer
                MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

                dq.addLast(batch);
                incomplete.add(batch); // add to the incomplete batches (IncompleteRecordBatches)
                return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
            }
        } finally {
            // count -1: the number of appends to the accumulator currently in progress
            appendsInProgress.decrementAndGet();
        }
    }

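Looking at the append method above, adding a record to the accumulator calls tryAppend three times. The first two calls are the same overload, whose last parameter is the Deque; the third call's last parameter is a timestamp in milliseconds. Tracing the first two tryAppend calls: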

    /**
     * If `RecordBatch.tryAppend` fails (i.e. the record batch is full), close its memory records to release temporary
     * resources (like compression streams buffers).
     */
    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) {
        RecordBatch last = deque.peekLast();
        if (last != null) {
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
            if (future == null)
                last.records.close();
            else
                return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
        }
        return null;
    }

As the code above shows, this still ends up calling RecordBatch's tryAppend(timestamp, key, value, callback, time.milliseconds()). Tracing it:

    /**
     * Append the record to the current record set and return the relative offset within that record set
     * 
     * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
     */
    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
        if (!this.records.hasRoomFor(key, value)) {
            return null;
        } else {
            long checksum = this.records.append(offsetCounter++, timestamp, key, value);
            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
            this.lastAppendTime = now;
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                   timestamp, checksum,
                                                                   key == null ? -1 : key.length,
                                                                   value == null ? -1 : value.length);
            if (callback != null)
                thunks.add(new Thunk(callback, future));
            this.recordCount++;
            return future;
        }
    }

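As the code above shows, the append simply adds the current record to the RecordBatch's MemoryRecords (which wraps a ByteBuffer and related state) and returns a FutureRecordMetadata.

That future is finally wrapped in a RecordAppendResult and returned, which completes the addition of one record to the accumulator.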
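The append logic can be condensed into a toy model that keeps the two flags used by doSend to decide whether to wake the Sender. This is a simplified sketch, not the real algorithm: MiniBatch, MiniAppendResult, and MiniAccumulator are hypothetical names, capacity is counted in records instead of bytes, and the buffer allocation between the two synchronized blocks is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical batch: holds up to `capacity` records (the real one is bounded in bytes).
class MiniBatch {
    private final int capacity;
    private final List<String> records = new ArrayList<>();

    MiniBatch(int capacity) {
        this.capacity = capacity;
    }

    // Returns false when there is no room, like RecordBatch.tryAppend returning null.
    boolean tryAppend(String record) {
        if (records.size() >= capacity)
            return false;
        records.add(record);
        return true;
    }

    boolean isFull() {
        return records.size() >= capacity;
    }
}

// Hypothetical result carrying the same two flags as RecordAppendResult.
class MiniAppendResult {
    final boolean batchIsFull;
    final boolean newBatchCreated;

    MiniAppendResult(boolean batchIsFull, boolean newBatchCreated) {
        this.batchIsFull = batchIsFull;
        this.newBatchCreated = newBatchCreated;
    }

    // Same condition doSend uses before calling sender.wakeup().
    boolean shouldWakeSender() {
        return batchIsFull || newBatchCreated;
    }
}

// Hypothetical accumulator: one deque of batches per topic-partition key.
class MiniAccumulator {
    private final ConcurrentMap<String, Deque<MiniBatch>> batches = new ConcurrentHashMap<>();
    private final int batchCapacity;

    MiniAccumulator(int batchCapacity) {
        this.batchCapacity = batchCapacity;
    }

    MiniAppendResult append(String topicPartition, String record) {
        Deque<MiniBatch> dq = batches.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>());
        synchronized (dq) {
            // first try the last batch in the deque
            MiniBatch last = dq.peekLast();
            if (last != null && last.tryAppend(record)) {
                return new MiniAppendResult(dq.size() > 1 || last.isFull(), false);
            }
            // no batch yet, or the last one is full: start a new batch
            MiniBatch batch = new MiniBatch(batchCapacity);
            batch.tryAppend(record);
            dq.addLast(batch);
            return new MiniAppendResult(dq.size() > 1 || batch.isFull(), true);
        }
    }
}
```

With a capacity of 3, the first append creates a batch (wake the Sender), the second fits quietly, the third fills the batch (wake again), and the fourth starts a second batch.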

Back to the doSend method of KafkaTemplate: once the call to KafkaProducer.send returns, if autoFlush is set, KafkaProducer.flush() is executed.

    @Override
    public void flush() {
        log.trace("Flushing accumulated records in producer.");
        this.accumulator.beginFlush();
        this.sender.wakeup();
        try {
            this.accumulator.awaitFlushCompletion();
        } catch (InterruptedException e) {
            throw new InterruptException("Flush interrupted.", e);
        }
    }

The call chain is: KafkaProducer.flush() -> accumulator.awaitFlushCompletion() -> RecordBatch.produceFuture.await()

    /**
     * Mark all partitions as ready to send and block until the send is complete
     */
    public void awaitFlushCompletion() throws InterruptedException {
        try {
            for (RecordBatch batch : this.incomplete.all())
                batch.produceFuture.await();
        } finally {
            this.flushesInProgress.decrementAndGet();
        }
    }

    private final CountDownLatch latch = new CountDownLatch(1);

    /**
     * Await the completion of this request
     */
    public void await() throws InterruptedException {
        latch.await();
    }

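As the code above shows, awaitFlushCompletion iterates over the incomplete RecordBatches, and each batch's ProduceRequestResult (the result of the produce request) uses a CountDownLatch with a count of 1 to wait for completion.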
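The latch-based waiting can be tried in isolation. In the sketch below, MiniProduceResult and FlushDemo are hypothetical names, and the timeout parameter is an addition for the demo so that an unfinished result cannot block forever (the await() shown above has no timeout).

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for ProduceRequestResult: one latch, released once by done().
class MiniProduceResult {
    private final CountDownLatch latch = new CountDownLatch(1);

    // Called by the "sender" side when the batch has been acknowledged.
    void done() {
        latch.countDown();
    }

    // Waits for done(); returns false if the timeout elapses or the thread is interrupted.
    boolean await(long timeoutMs) {
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

// Hypothetical flush: wait on every incomplete result in turn.
class FlushDemo {
    static boolean flush(List<MiniProduceResult> incomplete, long timeoutMs) {
        for (MiniProduceResult result : incomplete) {
            if (!result.await(timeoutMs))
                return false;
        }
        return true;
    }
}
```

Waiting on the results one after another is enough, because the caller only needs to know that all of them have completed, not which one finished first.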

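IV. Summary

In this chapter, following the flow diagram and starting from KafkaTemplate, we analyzed the main source code a Kafka producer uses to send messages. It comes down to two modules: the first stores data in the accumulator's buffer, and the second is the Sender, which sends messages over Java NIO. The producer's send path turns out not to be complicated. The next chapter covers the consumer source code.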

