kafka 0.8.1 新producer 源碼簡單分析


 

1 背景

最近由於項目需要,需要使用kafka的producer。但是對於c++,kafka官方並沒有很好的支持。

在kafka官網上可以找到0.8.x的客戶端。可以使用的客戶端有C版本客戶端,此客戶端雖然目前看來還較為活躍,但是代碼問題還是較多的,而且對於c++的支持並不是很好。

還有c++版本,雖然該客戶端是按照c++的思路設計,但是最近更新時間為2013年12月19日,已經很久沒有更新了。

從官方了解到,kafka作者對於現有的producer和consumer的設計是不太滿意的。他們打算在kafka 0.9版本里發布新的producer與consumer。

其中新的producer已經被包含到了kafka0.8.1的源碼里,官方描述如下。

3.4 New Producer Configs

We are working on a replacement for our existing producer. The code is available in trunk now and can be considered beta quality. Below is the configuration for the new producer

現在新producer還是屬於beta版。但是在kafka0.9版本里,新producer與consumer都會成為穩定版,並提供了更多的功能。舊版的producer是由scala實現,為java提供調用api。而新版的producer直接是用java實現的。

具體文檔在這https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite

2 producer基本類的介紹

源碼樹如下

image

其中,org.apache.kafka.clients.tools包下的ProducerPerformance.java里包含了producer的最基本用法。

該程序原本是有三個參數的,直接給三個參數硬編碼賦值后,代碼如下:

public static void main(String[] args) throws Exception {
        String url = "10.134.58.155:9092";
        int numRecords = 100;
        int recordSize = 100;
        Properties props = new Properties();
        props.setProperty(ProducerConfig.REQUIRED_ACKS_CONFIG, "1");
        props.setProperty(ProducerConfig.BROKER_LIST_CONFIG, url);
        props.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, Integer.toString(5 * 1000));
        props.setProperty(ProducerConfig.REQUEST_TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE));

        KafkaProducer producer = new KafkaProducer(props);
        Callback callback = new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null)
                    e.printStackTrace();
            }
        };
        byte[] payload = new byte[recordSize];
        Arrays.fill(payload, (byte) 1);
        ProducerRecord record = new ProducerRecord("test6", payload);
        long start = System.currentTimeMillis();
        long maxLatency = -1L;
        long totalLatency = 0;
        int reportingInterval = 1;
        for (int i = 0; i < numRecords; i++) {
            long sendStart = System.currentTimeMillis();
            producer.send(record, callback);
            long sendEllapsed = System.currentTimeMillis() - sendStart;
            maxLatency = Math.max(maxLatency, sendEllapsed);
            totalLatency += sendEllapsed;
            if (i % reportingInterval == 0) {
                System.out.printf("%d  max latency = %d ms, avg latency = %.5f\n",
                                  i,
                                  maxLatency,
                                  (totalLatency / (double) reportingInterval));
                totalLatency = 0L;
                maxLatency = -1L;
            }
        }
        long ellapsed = System.currentTimeMillis() - start;
        double msgsSec = 1000.0 * numRecords / (double) ellapsed;
        double mbSec = msgsSec * (recordSize + Records.LOG_OVERHEAD) / (1024.0 * 1024.0);
        System.out.printf("%d records sent in %d ms ms. %.2f records per second (%.2f mb/sec).", numRecords, ellapsed, msgsSec, mbSec);
        producer.close();
    }

可以看到,運行producer需要三個基本的類ProducerConfig,KafkaProducer,ProducerRecord,另外還有回調函數的類Callback。

ProducerConfig類包含了kafka的各種配置信息,並提供了默認的配置。

ProducerRecord類是向broker發送的消息載體,包括topic,partition,key和value屬性。

上面這兩個類都很簡單。

producer所有操作都包含在KafkaProducer類中。

這個類由Partitioner,Metadata,RecordAccumulator,Sender,Metrics這些類組成。

Partitioner是用來計算一個消息的分片的類。

Metadata顧名思義保存的是kafka集群的元數據,metadata的更新和topic有關。

RecordAccumulator類似於一個隊列,所有producer發出的消息都先送到隊列中,等待處理。

Sender類使用NIO方式實現了producer消息的發送與接收,sender是一個守護線程,監聽讀寫事件,並

Metrics類,kafka本來是被用於分布式的日志收集與監控的,Metrics類可以注冊一些關注的內容,供監控使用。

3源碼分析

我們以發送一條消息來分析producer的工作過程。

發送一條消息可以分為異步的兩個過程。

入隊過程

@Override
    public Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
        try {
            Cluster cluster = metadata.fetch(record.topic(), this.metadataFetchTimeoutMs);
            int partition = partitioner.partition(record, cluster);
            ensureValidSize(record.key(), record.value());
            TopicPartition tp = new TopicPartition(record.topic(), partition);
            FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), CompressionType.NONE, callback);
            this.sender.wakeup();
            return future;
        } catch (Exception e) {
            if (callback != null)
                callback.onCompletion(null, e);
            return new FutureFailure(e);
        }
    }

該send函數首先根據topic獲取集群的基本數據,如果topic不存在,該函數會阻塞,並更新metadata。

接下來獲取分區,並將數據寫入該TopicPartition下的隊列中。

public FutureRecordMetadata append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException {
        if (closed)
            throw new IllegalStateException("Cannot send after the producer is closed.");
        // check if we have an in-progress batch
        Deque<RecordBatch> dq = dequeFor(tp);
        synchronized (dq) {
            RecordBatch batch = dq.peekLast();
            if (batch != null) {
                FutureRecordMetadata future = batch.tryAppend(key, value, compression, callback);
                if (future != null)
                    return future;
            }
        }

        // we don't have an in-progress record batch try to allocate a new batch
        int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
        ByteBuffer buffer = free.allocate(size);
        synchronized (dq) {
            RecordBatch first = dq.peekLast();
            if (first != null) {
                FutureRecordMetadata future = first.tryAppend(key, value, compression, callback);
                if (future != null) {
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen
                    // often...
                    free.deallocate(buffer);
                    return future;
                }
            }
            RecordBatch batch = new RecordBatch(tp, new MemoryRecords(buffer), time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(key, value, compression, callback));
            dq.addLast(batch);
            return future;
        }
    }

這個函數上面有一大段關於send函數的用法,簡單來說,send函數可以實現簡單的阻塞式發送(利用Future.get()方法),以及利用回調函數,實現非阻塞發送。

因為這個是一個向套接字寫數據的過程,所以入隊之后,立刻調用wakeup函數,喚醒阻塞在讀數據的sender上,並發送數據。

出隊過程

該過程是由守護線程完成的,守護線程不斷循環在run函數上

public int run(long now) {
        Cluster cluster = metadata.fetch();
        // get the list of partitions with data ready to send
        List<TopicPartition> ready = this.accumulator.ready(now);

        // prune the list of ready topics to eliminate any that we aren't ready to send yet
        List<TopicPartition> sendable = processReadyPartitions(cluster, ready, now);

        // should we update our metadata?
        List<NetworkSend> sends = new ArrayList<NetworkSend>(sendable.size());
        InFlightRequest metadataReq = maybeMetadataRequest(cluster, now);
        if (metadataReq != null) {
            sends.add(metadataReq.request);
            this.inFlightRequests.add(metadataReq);
        }

        // create produce requests
        List<RecordBatch> batches = this.accumulator.drain(sendable, this.maxRequestSize);
        List<InFlightRequest> requests = collate(cluster, batches);
        for (int i = 0; i < requests.size(); i++) {
            InFlightRequest request = requests.get(i);
            this.inFlightRequests.add(request);
            sends.add(request.request);
        }

        // do the I/O
        try {
            this.selector.poll(5L, sends);
        } catch (IOException e) {
            e.printStackTrace();
        }

        // handle responses, connections, and disconnections
        handleSends(this.selector.completedSends());
        handleResponses(this.selector.completedReceives(), now);
        handleDisconnects(this.selector.disconnected());
        handleConnects(this.selector.connected());

        return ready.size();
    }

代碼注釋很清晰了。。

handleSends實現了入隊過程中的future以及回調。

后續的一些對網絡協議的封裝就不再贅述。下一篇,我會接着分析kafka producer的c客戶端librdkafka

 

第一次寫博客或許寫的不是很清楚,望大家可以多提提意見,謝謝。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM