Series contents
Kafka principles and practice (3): spring-kafka producer source code
Kafka principles and practice (4): spring-kafka consumer source code
============== Main text ==============
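Because our project runs on Spring Cloud and inherits spring-boot-starter, the spring-kafka version supported by default is 1.1.7, so this article analyzes the spring-kafka-1.1.7 source. The official site is already at version 2.0, but the core methods we analyze are essentially unchanged (see the official site).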
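I. The KafkaProducer send model

As shown in the figure above, KafkaTemplate initiates the send request, which proceeds in the following steps:
Phase 1: putting data into the pool
1. KafkaProducer starts sending the message.
2. The producer interceptors intercept the message.
3. The serializers serialize the key and value.
4. The partitioner selects the partition for the message.
5. The record is appended to the RecordAccumulator.
Phase 2: sending data via NIO
6. When a batch reaches the batch-size threshold (batch.size, measured in bytes) or a new RecordBatch is created, the Sender thread is woken up immediately so that its run loop proceeds.
7. Inside the Sender, the RecordBatches to send are taken from the accumulator's Deques and converted into ClientRequests.
8. Still inside the Sender, NetworkClient turns each request into a RequestSend (an implementation of the Send interface) and calls Selector to stage it on a KafkaChannel (the channels are held by the Selector in Map<String, KafkaChannel> channels).
9. The message is sent via NIO: (1) Selector.select(); (2) the Send data held by the KafkaChannel (a ByteBuffer[]) is written to the channel's underlying GatheringByteChannel.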
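The two phases can be made concrete with a single-threaded toy model of the steps above. This is a minimal sketch under stated assumptions, not Kafka code. `ToyProducer` is a hypothetical class and the interceptor is a plain `UnaryOperator`. Serialization is UTF-8 `getBytes`. The partitioner is `Arrays.hashCode(key) mod numPartitions`, whereas Kafka's DefaultPartitioner hashes the serialized key with murmur2. Phase 2 is collapsed into a `drain()` that empties the per-partition queues, loosely like the Sender draining ready batches from the accumulator.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.UnaryOperator;

/** Toy model of the two phases: (1) pool data into per-partition queues, (2) drain and "send". */
class ToyProducer {
    private final UnaryOperator<String> interceptor;                          // step 2
    private final int numPartitions;
    private final Map<Integer, Deque<byte[]>> accumulator = new TreeMap<>(); // step 5

    ToyProducer(UnaryOperator<String> interceptor, int numPartitions) {
        this.interceptor = interceptor;
        this.numPartitions = numPartitions;
    }

    /** Phase 1: intercept, serialize, partition, append. Returns the chosen partition. */
    int send(String key, String value) {
        String intercepted = interceptor.apply(value);                       // step 2
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);              // step 3
        byte[] valueBytes = intercepted.getBytes(StandardCharsets.UTF_8);
        // step 4: hypothetical partitioner, NOT Kafka's murmur2-based DefaultPartitioner
        int partition = Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
        accumulator.computeIfAbsent(partition, p -> new ArrayDeque<>()).addLast(valueBytes); // step 5
        return partition;
    }

    /** Phase 2 (steps 6-9 collapsed): empty each partition queue into one "request". */
    Map<Integer, List<String>> drain() {
        Map<Integer, List<String>> requests = new TreeMap<>();
        for (Map.Entry<Integer, Deque<byte[]>> e : accumulator.entrySet()) {
            List<String> batch = new ArrayList<>();
            byte[] v;
            while ((v = e.getValue().pollFirst()) != null)
                batch.add(new String(v, StandardCharsets.UTF_8));
            if (!batch.isEmpty()) requests.put(e.getKey(), batch);
        }
        return requests;
    }

    public static void main(String[] args) {
        ToyProducer producer = new ToyProducer(String::toUpperCase, 3);
        int p = producer.send("user-1", "hello");
        producer.send("user-1", "world");
        System.out.println(p + " -> " + producer.drain()); // both values in partition p, in order
    }
}
```

Records with the same key land in the same queue in send order. The real per-partition `Deque<RecordBatch>` keeps that same per-partition ordering.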
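II. The KafkaTemplate template
spring-kafka provides a simple KafkaTemplate class. You just call its send methods, and the only requirement is that the container knows about this bean (see the XML bean configuration in Part 2, the practice chapter).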
```java
public class KafkaTemplate<K, V> implements KafkaOperations<K, V> {

    ...

    /**
     * Create an instance using the supplied producer factory and autoFlush false.
     * @param producerFactory the producer factory.
     */
    public KafkaTemplate(ProducerFactory<K, V> producerFactory) {
        this(producerFactory, false);
    }

    /**
     * Create an instance using the supplied producer factory and autoFlush setting.
     * Set autoFlush to true if you wish to synchronously interact with Kafka, calling
     * {@link java.util.concurrent.Future#get()} on the result.
     * @param producerFactory the producer factory.
     * @param autoFlush true to flush after each send.
     */
    public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush) {
        this.producerFactory = producerFactory;
        this.autoFlush = autoFlush;
    }

    ...

    /**
     * Send the producer record.
     * @param producerRecord the producer record.
     * @return a Future for the {@link RecordMetadata}.
     */
    protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
        final Producer<K, V> producer = getTheProducer();
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sending: " + producerRecord);
        }
        final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
        producer.send(producerRecord, new Callback() {

            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                try {
                    if (exception == null) {
                        future.set(new SendResult<>(producerRecord, metadata));
                        if (KafkaTemplate.this.producerListener != null
                                && KafkaTemplate.this.producerListener.isInterestedInSuccess()) {
                            KafkaTemplate.this.producerListener.onSuccess(producerRecord.topic(),
                                    producerRecord.partition(), producerRecord.key(), producerRecord.value(), metadata);
                        }
                    }
                    else {
                        future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
                        if (KafkaTemplate.this.producerListener != null) {
                            KafkaTemplate.this.producerListener.onError(producerRecord.topic(),
                                    producerRecord.partition(),
                                    producerRecord.key(),
                                    producerRecord.value(),
                                    exception);
                        }
                    }
                }
                finally {
                    producer.close();
                }
            }

        });
        if (this.autoFlush) {
            flush();
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Sent: " + producerRecord);
        }
        return future;
    }

    ...
}
```
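Key points of the KafkaTemplate source
1. The constructor takes a ProducerFactory and an autoFlush flag (whether to flush after each send, i.e. send the buffered records immediately).
2. The doSend method has only two core points:
1) producer.send(producerRecord, Callback), where producer is the KafkaProducer;
2) the Callback's onCompletion, which completes the future and calls the listener's onSuccess or onError.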
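The pattern in `doSend` turns a callback-style `send` into a future and also notifies a listener. It can be sketched with only the JDK. This is an illustration under assumptions. `CallbackApi`, `Listener` and `FutureAdapter` are hypothetical names. `CompletableFuture` stands in for Spring's `SettableListenableFuture`, and `BiConsumer<Long, Exception>` stands in for Kafka's `Callback(RecordMetadata, Exception)`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

class FutureAdapter {
    /** Same shape as KafkaTemplate.doSend: complete a future from inside the callback. */
    static CompletableFuture<Long> send(CallbackApi api, String record, Listener listener) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        api.send(record, (offset, exception) -> {
            if (exception == null) {
                future.complete(offset);                       // like future.set(new SendResult<>(...))
                if (listener != null) listener.onSuccess(record, offset);
            } else {
                future.completeExceptionally(exception);       // like future.setException(...)
                if (listener != null) listener.onError(record, exception);
            }
        });
        return future;
    }

    public static void main(String[] args) throws Exception {
        CallbackApi ok = (r, cb) -> cb.accept(42L, null);      // completes synchronously
        System.out.println(send(ok, "msg", null).get());       // 42
    }
}

/** Hypothetical callback-style API, standing in for Producer.send(record, Callback). */
interface CallbackApi {
    void send(String record, BiConsumer<Long, Exception> onCompletion);
}

/** Hypothetical listener, standing in for ProducerListener.onSuccess / onError. */
interface Listener {
    void onSuccess(String record, long offset);
    void onError(String record, Exception e);
}
```

Completing the future before notifying the listener mirrors the order in `doSend`. Callers blocked on `get()` are released even if a listener is slow.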
III. KafkaProducer
3.1 Constructing a KafkaProducer
```java
@SuppressWarnings({"unchecked", "deprecation"})
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    try {
        log.trace("Starting the Kafka producer");
        Map<String, Object> userProvidedConfigs = config.originals();
        this.producerConfig = config;
        this.time = new SystemTime();

        clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
        if (clientId.length() <= 0)
            clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
        Map<String, String> metricTags = new LinkedHashMap<String, String>();
        metricTags.put("client-id", clientId);
        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                .tags(metricTags);
        List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                MetricsReporter.class);
        reporters.add(new JmxReporter(JMX_PREFIX));
        this.metrics = new Metrics(metricConfig, reporters, time);
        this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
        long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
        if (keySerializer == null) {
            this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    Serializer.class);
            this.keySerializer.configure(config.originals(), true);
        } else {
            config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
            this.keySerializer = keySerializer;
        }
        if (valueSerializer == null) {
            this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    Serializer.class);
            this.valueSerializer.configure(config.originals(), false);
        } else {
            config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
            this.valueSerializer = valueSerializer;
        }

        // load interceptors and make sure they get clientId
        userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ProducerInterceptor.class);
        this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);

        ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
        this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners);
        this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
        this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
        this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
        /* check for user defined settings.
         * If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
         * This should be removed with release 0.9 when the deprecated configs are removed.
         */
        if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
            log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
                    "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
            boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
            if (blockOnBufferFull) {
                this.maxBlockTimeMs = Long.MAX_VALUE;
            } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
                log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
                        "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
                this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
            } else {
                this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
            }
        } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
            log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
                    "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
            this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
        } else {
            this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
        }

        /* check for user defined settings.
         * If the TIME_OUT config is set use that for request timeout.
         * This should be removed with release 0.9
         */
        if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) {
            log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " +
                    ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
            this.requestTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG);
        } else {
            this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
        }

        this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                this.totalMemorySize,
                this.compressionType,
                config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                retryBackoffMs,
                metrics,
                time);

        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
        this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
        NetworkClient client = new NetworkClient(
                new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
                this.metadata,
                clientId,
                config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                this.requestTimeoutMs, time);
        this.sender = new Sender(client,
                this.metadata,
                this.accumulator,
                config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                config.getInt(ProducerConfig.RETRIES_CONFIG),
                this.metrics,
                new SystemTime(),
                clientId,
                this.requestTimeoutMs);
        String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        this.ioThread.start();

        this.errors = this.metrics.sensor("errors");


        config.logUnused();
        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
        log.debug("Kafka producer started");
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed
        // this is to prevent resource leak. see KAFKA-2121
        close(0, TimeUnit.MILLISECONDS, true);
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka producer", t);
    }
}
```
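As shown above, KafkaProducer is built from several core components:
1) Metadata: maintains cluster information and topic information.
2) RecordAccumulator: buffers produced records and sends them in batches, reducing the number of I/O operations and improving performance.
3) Sender: combines Metadata, the RecordAccumulator and the NetworkClient network client.
4) KafkaThread (the I/O thread): a thread with a custom name that takes the Sender as its Runnable. Once the thread is started, it runs the Sender's run method, and off it goes!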
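The KafkaThread plus Sender arrangement is a named daemon thread running a `Runnable` whose `run()` loops until a `running` flag is cleared. After that, it finishes leftover work unless a forced close was requested, which is the structure of `Sender.run()`. Below is a minimal JDK-only sketch. `ToySender` and its task queue are hypothetical. A blocking `poll` with a timeout plays the role that `Selector.poll(timeout)` plays in the real loop.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for Sender: a Runnable with a main loop and a drain-on-close phase. */
class ToySender implements Runnable {
    private final LinkedBlockingQueue<Runnable> pending = new LinkedBlockingQueue<>();
    final AtomicInteger completed = new AtomicInteger();
    private volatile boolean running = true;
    private volatile boolean forceClose = false;

    void submit(Runnable task) { pending.add(task); }

    /** One iteration, loosely analogous to Sender.run(long now). */
    private void runOnce() {
        try {
            // Waiting with a timeout stands in for Selector.poll(timeout).
            Runnable task = pending.poll(10, TimeUnit.MILLISECONDS);
            if (task != null) {
                task.run();
                completed.incrementAndGet();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            initiateClose(true);                               // treat interruption as a forced close
        }
    }

    @Override
    public void run() {
        while (running) runOnce();                             // main loop, until close is called
        while (!forceClose && !pending.isEmpty()) runOnce();   // graceful close: finish leftovers
        if (forceClose) pending.clear();                       // like abortIncompleteBatches()
    }

    void initiateClose(boolean force) {
        forceClose = force;
        running = false;
    }

    public static void main(String[] args) throws InterruptedException {
        ToySender sender = new ToySender();
        // Named daemon thread, like new KafkaThread(ioThreadName, this.sender, true).
        Thread ioThread = new Thread(sender, "kafka-producer-network-thread | producer-1");
        ioThread.setDaemon(true);
        ioThread.start();
        for (int i = 0; i < 5; i++) sender.submit(() -> { });
        sender.initiateClose(false);
        ioThread.join();
        System.out.println(sender.completed.get()); // 5
    }
}
```

Making the thread a daemon means it cannot keep the JVM alive on its own. A graceful close therefore relies on the caller joining it, as `KafkaProducer.close` does with its I/O thread.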
```java
/**
 * The main run loop for the sender thread
 */
public void run() {
    log.debug("Starting Kafka producer I/O thread.");

    // main loop, runs until close is called
    while (running) {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

    // okay we stopped accepting requests but there may still be
    // requests in the accumulator or waiting for acknowledgment,
    // wait until these are completed.
    while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    if (forceClose) {
        // We need to fail all the incomplete batches and wake up the threads waiting on
        // the futures.
        this.accumulator.abortIncompleteBatches();
    }
    try {
        this.client.close();
    } catch (Exception e) {
        log.error("Failed to close network client", e);
    }

    log.debug("Shutdown of Kafka producer I/O thread has completed.");
}

/**
 * Run a single iteration of sending
 *
 * @param now
 *            The current POSIX time in milliseconds
 */
void run(long now) {
    Cluster cluster = metadata.fetch();
    // get the list of partitions in the cluster that are ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // if the leader of any partition is still unknown, force a metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        // ...
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic);
        this.metadata.requestUpdate();
    }

    // remove any nodes that NetworkClient is not yet ready to send to
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

    // create produce requests for the ready nodes
    Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                     result.readyNodes,
                                                                     this.maxRequestSize,
                                                                     now);
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<RecordBatch> batchList : batches.values()) {
            for (RecordBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }
    // abort batches that have timed out
    List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
    // update sensors
    for (RecordBatch expiredBatch : expiredBatches)
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

    sensors.updateProduceRequestMetrics(batches);
    List<ClientRequest> requests = createProduceRequests(batches, now);
    // if any node is ready, set the poll timeout to 0
    // ...
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (result.readyNodes.size() > 0) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        log.trace("Created {} produce requests: {}", requests.size(), requests);
        pollTimeout = 0;
    }
    for (ClientRequest request : requests)
        client.send(request, now);

    // 1. if some partitions are already ready, the poll timeout is 0;
    // 2. otherwise, if some partitions have accumulated data but are not ready yet,
    //    the timeout is the time remaining until their linger time expires;
    // 3. otherwise, the timeout is the time remaining until the metadata expires.
    this.client.poll(pollTimeout, now);
}
```
Each of the created requests is then passed to client.send(request, now), i.e. NetworkClient sends the ClientRequest:
```java
@Override
public void send(ClientRequest request, long now) {
    String nodeId = request.request().destination();
    if (!canSendRequest(nodeId))
        throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
    doSend(request, now);
}

private void doSend(ClientRequest request, long now) {
    request.setSendTimeMs(now);
    this.inFlightRequests.add(request);
    selector.send(request.request());
}
```
doSend in turn calls Selector.send:
```java
public void send(Send send) {
    KafkaChannel channel = channelOrFail(send.destination());
    try {
        channel.setSend(send);
    } catch (CancelledKeyException e) {
        this.failedSends.add(send.destination());
        close(channel);
    }
}
```
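As shown above, nothing has been written to the network yet. Selector.send looks up the existing KafkaChannel for the destination and stages the Send (its content and destination) on that channel.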
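The staging step can be modelled as "at most one pending send per connection id". The bytes only move later, when `poll` finds the channel writable. This is a rough sketch under assumptions, and `ToySelector` and `ToyChannel` are hypothetical classes. In the real code, `KafkaChannel.setSend` also rejects a new Send while a previous one is still in progress, and it registers interest in `OP_WRITE`. The toy version skips the NIO part and "writes" in one step.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ToySelector {
    private final Map<String, ToyChannel> channels = new HashMap<>();

    void connect(String id) { channels.put(id, new ToyChannel(id)); }

    ToyChannel channel(String id) { return channels.get(id); }

    /** Like Selector.send: look up the existing channel and stage the payload on it. */
    void send(String destination, ByteBuffer payload) {
        ToyChannel channel = channels.get(destination);
        if (channel == null)
            throw new IllegalStateException("no open connection to " + destination);
        channel.setSend(payload);
    }

    /** Like the write branch of pollSelectionKeys: flush staged sends, count completions. */
    int poll() {
        int completed = 0;
        for (ToyChannel c : channels.values())
            if (c.write()) completed++;
        return completed;
    }

    public static void main(String[] args) {
        ToySelector selector = new ToySelector();
        selector.connect("node-0");
        selector.send("node-0", ByteBuffer.wrap(new byte[] {1, 2, 3}));
        System.out.println(selector.poll()); // 1
    }
}

/** Hypothetical connection holding at most one in-progress send (like KafkaChannel). */
class ToyChannel {
    final String id;
    final List<byte[]> written = new ArrayList<>();
    private ByteBuffer send;                 // staged payload, written later by poll()

    ToyChannel(String id) { this.id = id; }

    void setSend(ByteBuffer payload) {
        if (send != null)
            throw new IllegalStateException("prior send still in progress on " + id);
        send = payload;                       // the real channel also adds OP_WRITE interest here
    }

    /** Writes the whole staged payload at once; a socket may need several attempts. */
    boolean write() {
        if (send == null) return false;
        byte[] bytes = new byte[send.remaining()];
        send.get(bytes);
        written.add(bytes);
        send = null;
        return true;
    }
}
```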
client.poll(pollTimeout, now) performs the actual I/O reads and writes:
```java
@Override
public List<ClientResponse> poll(long timeout, long now) {
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // after the I/O completes, build the various ClientResponses and add them to responses
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleTimedOutRequests(responses, updatedNow);

    // iterate over the responses and invoke their callbacks
    for (ClientResponse response : responses) {
        if (response.request().hasCallback()) {
            try {
                response.request().callback().onComplete(response);
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e);
            }
        }
    }

    return responses;
}
```
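The tail of `NetworkClient.poll` follows a common pattern. It collects all completed work first, then runs each callback inside its own try/catch, so one failing callback cannot stop the others. A JDK-only sketch follows. `CallbackDispatch`, `ToyResponse` and `completeAll` are hypothetical names, and errors are collected into a list here instead of being logged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class CallbackDispatch {
    /** Hypothetical response carrying an optional completion callback. */
    static final class ToyResponse {
        final String payload;
        final Consumer<String> callback;   // may be null, like a request without a callback
        ToyResponse(String payload, Consumer<String> callback) {
            this.payload = payload;
            this.callback = callback;
        }
    }

    /** Invoke every callback; an exception in one is recorded and does not stop the rest. */
    static List<String> completeAll(List<ToyResponse> responses) {
        List<String> errors = new ArrayList<>();
        for (ToyResponse r : responses) {
            if (r.callback == null) continue;              // like !response.request().hasCallback()
            try {
                r.callback.accept(r.payload);
            } catch (Exception e) {
                errors.add("Uncaught error in request completion: " + e.getMessage());
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        List<ToyResponse> responses = new ArrayList<>();
        responses.add(new ToyResponse("a", seen::add));
        responses.add(new ToyResponse("b", p -> { throw new IllegalStateException("bad " + p); }));
        responses.add(new ToyResponse("c", null));
        responses.add(new ToyResponse("d", seen::add));
        List<String> errors = completeAll(responses);
        System.out.println(seen + " " + errors); // [a, d] [Uncaught error in request completion: bad b]
    }
}
```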
What does the core method selector.poll ultimately do?
```java
public void poll(long timeout) throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("timeout should be >= 0");

    clear();

    if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
        timeout = 0;

    /* check ready keys */
    long startSelect = time.nanoseconds();
    int readyKeys = select(timeout);
    long endSelect = time.nanoseconds();
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    }

    addToCompletedReceives();

    long endIo = time.nanoseconds();
    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());

    // we use the time at the end of select to ensure that we don't close any connections that
    // have just been processed in pollSelectionKeys
    maybeCloseOldestConnection(endSelect);
}
```
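As shown above, the core logic has just two parts: waiting for ready channels, and reading and writing data.
1) select: waits for channels to become ready and returns the number of ready channels: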
```java
private int select(long ms) throws IOException {
    if (ms < 0L)
        throw new IllegalArgumentException("timeout should be >= 0");

    if (ms == 0L)
        return this.nioSelector.selectNow();
    else
        return this.nioSelector.select(ms);
}
```
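Here nioSelector is a java.nio.channels.Selector. As the code above shows, this ultimately just calls select on the JDK's own Java NIO Selector, which returns how many channels have become ready since the previous select() call.
Selector.select(ms) blocks for at most ms milliseconds, returning earlier if a channel becomes ready for an event it was registered for.
Selector.selectNow() does not block: it returns immediately with whatever channels are ready, and returns zero if no channel has become selectable.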
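You can check the difference between the two calls directly with the JDK's `java.nio.channels.Selector`. `SelectDemo` is a hypothetical class whose `select` helper copies the branching of Kafka's private `select(long ms)`: 0 means `selectNow()`, otherwise `select(ms)`. The sketch also shows `Selector.wakeup()`. As we understand it, `Sender.wakeup()` reaches this method through NetworkClient and Kafka's Selector. A pending wakeup makes the next blocking `select` return at once, which is how an append can cut short the I/O thread's wait.

```java
import java.io.IOException;
import java.nio.channels.Selector;

class SelectDemo {
    /** Same branching as the Kafka helper: 0 ms means a non-blocking selectNow(). */
    static int select(Selector nioSelector, long ms) throws IOException {
        if (ms < 0L)
            throw new IllegalArgumentException("timeout should be >= 0");
        if (ms == 0L)
            return nioSelector.selectNow();   // never blocks
        else
            return nioSelector.select(ms);    // blocks up to ms milliseconds
    }

    public static void main(String[] args) throws IOException {
        try (Selector selector = Selector.open()) {
            // Nothing is registered, so no channel can be ready: returns 0 at once.
            System.out.println(select(selector, 0));
            long start = System.nanoTime();
            selector.wakeup();                // pre-arms the next select, like Sender.wakeup()
            select(selector, 10_000);         // returns immediately instead of waiting 10 s
            System.out.println((System.nanoTime() - start) / 1_000_000 + " ms");
        }
    }
}
```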
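NIO Selector
1. The Java NIO model
This is a large topic and is not expanded here; a link is reserved for a later article.
2. Selector
For Selector, we simply reference a diagram here; a picture is worth a thousand words.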

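2) pollSelectionKeys: if the number of ready channels is greater than 0, it iterates over the selection keys. For each key, it reads from the corresponding channel or writes the staged data (ByteBuffers) to it: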
```java
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        KafkaChannel channel = channel(key);

        // register all per-connection metrics at once
        sensors.maybeRegisterConnectionMetrics(channel.id());
        if (idleExpiryManager != null)
            idleExpiryManager.update(channel.id(), currentTimeNanos);

        try {

            /* complete any connections that have finished their handshake (either normally or immediately) */
            if (isImmediatelyConnected || key.isConnectable()) {
                if (channel.finishConnect()) {
                    this.connected.add(channel.id());
                    this.sensors.connectionCreated.record();
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
                            socketChannel.socket().getReceiveBufferSize(),
                            socketChannel.socket().getSendBufferSize(),
                            socketChannel.socket().getSoTimeout(),
                            channel.id());
                } else
                    continue;
            }

            /* finish preparing the channel (transport handshake / authentication) */
            if (channel.isConnected() && !channel.ready())
                channel.prepare();

            /* read data from the channel */
            if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                NetworkReceive networkReceive;
                while ((networkReceive = channel.read()) != null)
                    addToStagedReceives(channel, networkReceive);
            }

            /* write data to the channel */
            if (channel.ready() && key.isWritable()) {
                Send send = channel.write();
                if (send != null) {
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }

            /* cancel any defunct sockets */
            if (!key.isValid()) {
                close(channel);
                this.disconnected.add(channel.id());
            }

        } catch (Exception e) {
            String desc = channel.socketDescription();
            if (e instanceof IOException)
                log.debug("Connection with {} disconnected", desc, e);
            else
                log.warn("Unexpected error from {}; closing connection", desc, e);
            close(channel);
            this.disconnected.add(channel.id());
        }
    }
}
```
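The write branch above (`channel.write()`) eventually hands an array of `ByteBuffer`s to a `GatheringByteChannel`, as described in step 9 of the send model. The JDK sketch below shows what a gathering write does. It uses a `Pipe`, whose sink implements `GatheringByteChannel`, instead of a socket. `GatheringWriteDemo`, `frame` and `writeFully` are hypothetical names. A 4-byte size header and the payload go out through `write(ByteBuffer[])`, looped because a single call may write only part of the data on a non-blocking socket.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

class GatheringWriteDemo {
    /** Write all buffers with gathering writes, looping because one call may be partial. */
    static long writeFully(GatheringByteChannel channel, ByteBuffer[] buffers) throws IOException {
        long remaining = 0;
        for (ByteBuffer b : buffers) remaining += b.remaining();
        long total = 0;
        while (remaining > 0) {
            long n = channel.write(buffers);   // drains buffers[0] first, then buffers[1], ...
            total += n;
            remaining -= n;
        }
        return total;
    }

    /** Build a size-prefixed frame: 4-byte length header followed by the payload. */
    static ByteBuffer[] frame(byte[] payload) {
        ByteBuffer header = ByteBuffer.allocate(4).putInt(payload.length);
        header.flip();                         // switch the header from writing to reading
        return new ByteBuffer[] {header, ByteBuffer.wrap(payload)};
    }

    public static void main(String[] args) throws IOException {
        Pipe pipe = Pipe.open();
        byte[] payload = "hello kafka".getBytes(StandardCharsets.UTF_8);
        long written = writeFully(pipe.sink(), frame(payload));

        ByteBuffer in = ByteBuffer.allocate((int) written);
        while (in.hasRemaining()) pipe.source().read(in);
        in.flip();
        byte[] body = new byte[in.getInt()];   // read the length header, then the body
        in.get(body);
        System.out.println(written + " bytes, body=" + new String(body, StandardCharsets.UTF_8));
        pipe.sink().close();
        pipe.source().close();
    }
}
```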
3.2 Sending data with KafkaProducer
KafkaProducer.send
```java
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

/**
 * Implementation of asynchronously sending a record to a topic
 */
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // first make sure the metadata for the topic is available
        ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        byte[] serializedKey;
        try { // serialize the key
            serializedKey = keySerializer.serialize(record.topic(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer");
        }
        byte[] serializedValue;
        try { // serialize the value
            serializedValue = valueSerializer.serialize(record.topic(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer");
        }

        int partition = partition(record, serializedKey, serializedValue, cluster);
        int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
        ensureValidRecordSize(serializedSize);
        // topic and partition
        tp = new TopicPartition(record.topic(), partition);
        long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
        log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        // producer callback will make sure to call both 'callback' and interceptor callback
        Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future; // return the Future
        // handling exceptions and record the errors;
        // for API exceptions return them in the future,
        // for other exceptions throw directly
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw new InterruptException(e);
    } catch (BufferExhaustedException e) {
        this.errors.record();
        this.metrics.sensor("buffer-exhausted-records").record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (KafkaException e) {
        this.errors.record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (Exception e) {
        // we notify interceptor about all exceptions, since onSend is called before anything else in this method
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}
```
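The core of the method:
1. The data to send (the TopicPartition plus the serialized key and value) is appended to the RecordAccumulator.
2. sender.wakeup(): when the append leaves a batch full or creates a new batch, the Sender is woken up so that it no longer blocks in its current select() call.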
```java
/**
 * Add a record to the accumulator and return a result containing the Future and flags
 * (whether the batch is full or a new batch was created).
 * ...
 * @param tp the topic partition
 * @param timestamp The timestamp of the record
 * @param key the serialized key
 * @param value the serialized value
 * @param callback the callback to invoke when the request completes
 * @param maxTimeToBlock the maximum time in milliseconds to block
 */
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    // appends-in-progress count +1 (used as a condition by abortIncompleteBatches)
    appendsInProgress.incrementAndGet();
    try {
        // get the deque for key=tp from ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches, creating it if absent
        Deque<RecordBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) { // block until the deque's lock is acquired, then try to append
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            if (appendResult != null) // 1. append succeeded, return directly
                return appendResult;
        }
        // ===== 2. append failed =====
        // 2.1 allocate a buffer, then try to append again
        int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) { // block until the deque's lock is acquired, then try to append
            // after acquiring the deque lock, check again whether the producer has been closed
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");

            RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
            if (appendResult != null) {
                // 2.2 append succeeded (another thread created a batch meanwhile), release the buffer
                free.deallocate(buffer);
                return appendResult;
            } // 2.3 append failed again: build a writable in-memory MemoryRecords on the new buffer
            MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
            RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

            dq.addLast(batch);
            incomplete.add(batch); // add to the incomplete batches (IncompleteRecordBatches)
            return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
        }
    } finally {
        // appends-in-progress count -1
        appendsInProgress.decrementAndGet();
    }
}
```
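In the append method above, adding a record to the accumulator calls tryAppend three times. The first two calls are the same RecordAccumulator.tryAppend overload, whose last parameter is the Deque. The third is RecordBatch.tryAppend, whose last parameter is the current time in milliseconds. Tracing the first two tryAppend calls: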
```java
/**
 * If `RecordBatch.tryAppend` fails (i.e. the record batch is full), close its memory records to release temporary
 * resources (like compression streams buffers).
 */
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) {
    RecordBatch last = deque.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
        if (future == null)
            last.records.close();
        else
            return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
    }
    return null;
}
```
As shown above, this still ends up calling tryAppend(timestamp, key, value, callback, time.milliseconds()) on the last RecordBatch in the deque. Tracing that call:
```java
/**
 * Append the record to the current record set and return the relative offset within that record set
 *
 * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
 */
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {
    if (!this.records.hasRoomFor(key, value)) {
        return null;
    } else {
        long checksum = this.records.append(offsetCounter++, timestamp, key, value);
        this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
        this.lastAppendTime = now;
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                               timestamp, checksum,
                                                               key == null ? -1 : key.length,
                                                               value == null ? -1 : value.length);
        if (callback != null)
            thunks.add(new Thunk(callback, future));
        this.recordCount++;
        return future;
    }
}
```
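As shown above, the append simply adds the current record to the RecordBatch's MemoryRecords (which wraps a ByteBuffer and related state) and returns a FutureRecordMetadata.
That future is finally wrapped in a RecordAppendResult and returned, which completes appending one record to the accumulator.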
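Without locking, memory pooling and compression, the append path reduces to four steps. Look at the last batch in the partition's deque and append there if there is room. Otherwise start a new batch. Then report whether the sender should be woken (`batchIsFull || newBatchCreated`). The sketch below is a simplified, single-threaded model under assumptions. Capacity counts only raw record bytes, whereas Kafka also counts per-record overhead and compression. `ToyAccumulator`, `ToyBatch` and `AppendResult` are hypothetical names. The `batchIsFull` flag copies the real expression `dq.size() > 1 || isFull()`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ToyAccumulator {
    /** Hypothetical batch: a byte budget plus the records appended so far. */
    static final class ToyBatch {
        final int capacity;
        final List<byte[]> records = new ArrayList<>();
        int used;
        ToyBatch(int capacity) { this.capacity = capacity; }

        /** Like RecordBatch.tryAppend: fails (Kafka returns null) when there is no room. */
        boolean tryAppend(byte[] record) {
            if (used + record.length > capacity) return false;
            records.add(record);
            used += record.length;
            return true;
        }
        boolean isFull() { return used >= capacity; }
    }

    /** Mirrors the two flags of RecordAppendResult. */
    static final class AppendResult {
        final boolean batchIsFull, newBatchCreated;
        AppendResult(boolean full, boolean created) { batchIsFull = full; newBatchCreated = created; }
    }

    private final int batchSize;
    final Map<String, Deque<ToyBatch>> batches = new HashMap<>();

    ToyAccumulator(int batchSize) { this.batchSize = batchSize; }

    AppendResult append(String topicPartition, byte[] record) {
        Deque<ToyBatch> dq = batches.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>());
        ToyBatch last = dq.peekLast();
        if (last != null && last.tryAppend(record))            // 1. fits in the last batch
            return new AppendResult(dq.size() > 1 || last.isFull(), false);
        // 2. new batch, sized like Math.max(batchSize, recordSize) in the real code
        ToyBatch batch = new ToyBatch(Math.max(batchSize, record.length));
        batch.tryAppend(record);
        dq.addLast(batch);
        return new AppendResult(dq.size() > 1 || batch.isFull(), true);
    }

    public static void main(String[] args) {
        ToyAccumulator acc = new ToyAccumulator(10);
        for (int i = 0; i < 4; i++) {
            AppendResult r = acc.append("orders-0", new byte[4]);
            boolean wakeSender = r.batchIsFull || r.newBatchCreated;  // as in KafkaProducer.doSend
            System.out.println("append " + i + ": wake sender = " + wakeSender);
        }
    }
}
```

With `batchSize` 10 and 4-byte records, the loop prints `true`, `false`, `true`, `true`. The first append creates a batch. The second fits. The third overflows into a second batch. From then on the deque holds more than one batch, so the first one is ready to drain.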
Back in KafkaTemplate's doSend: after KafkaProducer.send returns, if autoFlush is enabled, KafkaProducer.flush() is executed:
```java
@Override
public void flush() {
    log.trace("Flushing accumulated records in producer.");
    this.accumulator.beginFlush();
    this.sender.wakeup();
    try {
        this.accumulator.awaitFlushCompletion();
    } catch (InterruptedException e) {
        throw new InterruptException("Flush interrupted.", e);
    }
}
```
KafkaProducer.flush() ==> accumulator.awaitFlushCompletion() ==> RecordBatch.produceFuture.await()
```java
/**
 * Mark all partitions as ready to send and block until the send is complete
 */
public void awaitFlushCompletion() throws InterruptedException {
    try {
        for (RecordBatch batch : this.incomplete.all())
            batch.produceFuture.await();
    } finally {
        this.flushesInProgress.decrementAndGet();
    }
}
```
```java
private final CountDownLatch latch = new CountDownLatch(1);

/**
 * Await the completion of this request
 */
public void await() throws InterruptedException {
    latch.await();
}
```
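As shown above, awaitFlushCompletion iterates over the incomplete RecordBatches. For each one, it waits on the batch's ProduceRequestResult (the produce request result), which uses a CountDownLatch with a count of 1 to wait for completion.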
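ProduceRequestResult behaves like a one-shot latch. `await()` blocks until the completion path counts the latch down once, and `awaitFlushCompletion` waits on each incomplete batch in turn. This is a JDK-only sketch. `FlushDemo`, `ToyRequestResult`, `complete` and `awaitAll` are hypothetical names. A background thread stands in for the Sender completing batches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

class FlushDemo {
    /** Hypothetical one-shot result, modelled on ProduceRequestResult's CountDownLatch(1). */
    static final class ToyRequestResult {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile long baseOffset = -1;

        void complete(long offset) {          // called once, by the I/O thread
            baseOffset = offset;
            latch.countDown();
        }
        void await() throws InterruptedException { latch.await(); }
        boolean completed() { return latch.getCount() == 0L; }
        long baseOffset() { return baseOffset; }
    }

    /** Like awaitFlushCompletion: block on every incomplete batch in turn. */
    static void awaitAll(List<ToyRequestResult> incomplete) throws InterruptedException {
        for (ToyRequestResult r : incomplete)
            r.await();
    }

    public static void main(String[] args) throws InterruptedException {
        List<ToyRequestResult> incomplete = new ArrayList<>();
        for (int i = 0; i < 3; i++) incomplete.add(new ToyRequestResult());

        Thread sender = new Thread(() -> {    // stands in for the Sender thread
            long offset = 0;
            for (ToyRequestResult r : incomplete) r.complete(offset++);
        });
        sender.start();
        awaitAll(incomplete);                 // returns only after all three are complete
        System.out.println(incomplete.get(2).baseOffset()); // 2
        sender.join();
    }
}
```

Waiting on the batches one after another is enough here. `flush()` only needs every batch to be done, so the total wait is bounded by the slowest batch, whatever the order.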
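IV. Summary
In this chapter we followed the flow diagram, started from KafkaTemplate, and analyzed the main source code the Kafka producer uses to send messages. It essentially consists of two modules. The first stores data in the accumulator's buffer. The second is the Sender, which sends the messages over Java NIO (the Kafka client uses NIO directly, not Netty). As we have seen, the producer's send path is not complicated. The next chapter covers the consumer source code.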
