用java代碼手動控制kafkaconsumer偏移量


為應對消費結果需要存儲到關系數據庫中,避免數據庫down時consumer繼續消費的場景

http://kafka.apache.org

查了很多源碼都記錄下來,省的下次還要過濾源碼。

//如果將結果存儲在關系數據庫中,那么在數據庫中存儲偏移量也可以允許在單個事務中提交結果和偏移量.。因此,要么事務成功,偏移量將根據所消耗的內容進行更新,否則結果將不會被存儲,偏移量不會被更新.。
If the results of the consumption are being stored in a relational database, storing the offset in the database as well can allow committing both the results and offset in a single transaction. Thus either the transaction will succeed and the offset will be updated based on what was consumed or the result will not be stored and the offset won't be updated.

 

每個記錄都有自己的偏移量,所以要管理你自己的偏移,你只需要做以下:

1.Configure enable.auto.commit=false
2.Use the offset provided with each ConsumerRecord to save your position.
3.On restart restore the position of the consumer using seek(TopicPartition, long).
 
        

這里分享一個別人的源碼分析:http://blog.csdn.net/chunlongyu/article/details/52663090>

原子操作( Atomic operations): 不可中斷的一個或一系列操作,就像原子一樣,不能再被拆分了,已經是最小單位了,當然在這里沒有單位只有操作。 

 

public void consume() throws FileNotFoundException, IOException {
        Properties props = new Properties();
        props.put("enable.auto.commit", "false");
        KafkaConsumer<String, String> consumer = null;
        try {
            props.load(new FileInputStream(new File("./config/consumer.properties")));
            consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Arrays.asList(props.getProperty("topic")));
            boolean y = true;
            while (run) {

                ConsumerRecords<String, String> records = consumer.poll(100);
                log.info("records.count():" + records.count());
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);// 疑問,這個list里的數據順序是怎么確定的

                    long firstoffset = partitionRecords.get(0).offset();
                    try {
                        for (ConsumerRecord<String, String> record : partitionRecords) {
                            if (this.handler != null)
                                this.handler.handle(record.offset(), record.key(), record.value());
                            // TODO insert db

                        }
                    } catch (Exception e) {
                        log.info("insert db filuer");
                        consumer.seek(partition, firstoffset);
                        y = false;
                    }
                    if (y) {

                        long lastoffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastoffset + 1)));// singletonXxx():返回一個只包含指定對象的,不可變的集合對象。
                    }
                }

            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (consumer != null)
                consumer.close();
        }
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM