kafka producer batch 發送消息


1. 使用 KafkaProducer 發送消息,是按 batch 發送的,producer 首先把消息放入 ProducerBatch 中:
org.apache.kafka.clients.producer.internals.ProducerBatch

2. KafkaProduer 類中有一個 Thread 屬性,負責 IO,發送和接收數據:
            this.sender = new Sender(logContext,
                    client,
                    this.metadata,
                    this.accumulator,
                    maxInflightRequests == 1,
                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                    acks,
                    retries,
                    metricsRegistry.senderMetrics,
                    Time.SYSTEM,
                    this.requestTimeoutMs,
                    config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                    this.transactionManager,
                    apiVersions);
            String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();

Sender 類實現了 Runnable 接口,封裝了具體的邏輯,發送消息和接收響應都在這個類中。

// 發送消息
long pollTimeout = sendProducerData(now);
// 接收響應
client.poll(pollTimeout, now);

 

3. 執行回調

org.apache.kafka.clients.producer.internals.Sender#completeBatch()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM