KafkaConsumer實現精確一次消費


轉自 https://blog.csdn.net/qq_18581221/article/details/89766073

簡介

在使用kafka時,大多數場景對於數據少量的不一致(重復或者丟失)並不關注,比如日志,因為不會影響最終的使用或者分析,但是在某些應用場景(比如業務數據),需要對任何一條消息都要做到精確一次的消費,才能保證系統的正確性,kafka並不提供准確一致的消費API,需要我們在實際使用時借用外部的一些手段來保證消費的精確性,下面我們介紹如何實現

kafka消費機制

這篇文章KafkaConsumer使用介紹、參數配置介紹了如何kafka具有兩種提交offset(消費偏移量)方式,我們在Kafka簡介以及安裝和使用可知每個分區具備一offset記錄消費位置,如果消費者一直處於正常的運行轉態,那么offset將沒有什么用處,因為正常消費時,consumer記錄了本次消費的offset和下一次將要進行poll數據的offset起始位置,但是如果消費者發生崩潰或者有新的消費者加入消費者組,就會觸發再均衡Rebalance,Rebalance之后,每個消費者將會分配到新的分區,而消費者對於新的分區應該從哪里進行起始消費,這時候提交的offset信息就起作用了,提交的offset信息包括消費者組所有分區的消費進度,這時候消費者可以根據消費進度繼續消費,提交offset提交自動提交是最不具確定性的,所以要使用手動提交來控制offset

消費時出現幾種異常情況

自動提交

  • 重復消費:當數據已經被處理,然后自動提交offset時消費者出現故障或者有新消費者加入組導致再均衡,這時候offset提交失敗,導致這批已經處理的數據的信息沒有記錄,后續會重復消費一次
  • 丟失數據:如果業務處理時間較長一點,這時候數據處理業務還未完成,offset信息已經提交了,但是在后續處理數據過程中程序發生了崩潰,導致這批數據未正常消費,這時候offset已經提交,消費者后續將不在消費這批數據,導致這批數據將會丟失

手動提交

  • 重復消費(最少一次消費語義實現):消費數據處理業務完成后進行offset提交,可以保證數據最少一次消費,因為在提交offset的過程中可能出現提交失敗的情況,導致數據重復消費
/**
 * 手動提交offset
 * 實現至少一次的消費語義 at least once
 * 當手動提交位移失敗,會重復消費數據
 */
@Test
public void testCommitOffset() {
    String topic = "first-topic";
    String group = "g1";

    Properties props = new Properties();
    props.put("bootstrap.servers", "node00:9092,node03:9092");   //required
    props.put("group.id", group);   //required
    props.put("enable.auto.commit", "false"); // 關閉自動提交
    props.put("auto.commit.interval.ms", "1000");
    props.put("auto.offset.reset", "latest");     //從最早的消息開始讀取
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  //required
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //required

    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(topic));       //訂閱topic
    final int minBatchSize = 10;
    // 緩存
    List<ConsumerRecord<String, String>> buffer = new ArrayList<>(minBatchSize);
    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            records.forEach(buffer::add);

            // 緩存滿了才對數據進行處理
            if (buffer.size() >= minBatchSize) {

                // 業務邏輯--插入數據庫
                // insertIntoDb(buffer);
                // 等數據插入數據庫之后,再異步提交位移

                // 通過異步的方式提交位移
                consumer.commitAsync(((offsets, exception) -> {
                    if (exception == null) {
                        offsets.forEach((topicPartition, metadata) -> {
                            System.out.println(topicPartition + " -> offset=" + metadata.offset());
                        });
                    } else {
                        exception.printStackTrace();
                        // 如果出錯了,同步提交位移
                        consumer.commitSync(offsets);
                    }
                }));

               
                // 如果提交位移失敗了,那么重啟consumer后會重復消費之前的數據,再次插入到數據庫中
                // 清空緩沖區
                buffer.clear();
            }
        }
    } finally {
        consumer.close();
    }
}

 

 

  • 丟失數據(最多一次消費語義實現):在消費數據業務處理前進行offset提交,可以保證最多一次消費,在后續數據業務處理程序出現故障,將導致數據丟失

代碼實現

/**
 * 實現最多一次語義
 * 在消費前提交位移,當后續業務出現異常時,可能丟失數據
 */
@Test
public void testAtMostOnce() {
    Properties props = new Properties();
    props.put("enable.auto.commit", "false");
    KafkaConsumer<String, String> kafkaConsumer = KafkaFactory.buildConsumer(props);
    kafkaConsumer.subscribe(Arrays.asList("first-topic"));
    try {

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(500);
            // 處理業務之前就提交位移
            kafkaConsumer.commitAsync();
            // 下面是業務邏輯
            records.forEach(record -> {
                System.out.println(record.value() + ", offset=" + record.offset());
            });
        }
    } catch (Exception e) {

    } finally {
        kafkaConsumer.close();
    }

}

精確一次消費實現

從kafka的消費機制,我們可以得到是否能夠精確的消費關鍵在消費進度信息的准確性,如果能夠保證消費進度的准確性,也就保證了消費數據的准確性

  • 數據有狀態:可以根據數據信息進行確認數據是否重復消費,這時候可以使用手動提交的最少一次消費語義實現,即使消費的數據有重復,可以通過狀態進行數據去重,以達到冪等的效果
  • 存儲數據容器具備冪等性:在數據存入的容器具備天然的冪等(比如ElasticSearch的put操作具備冪等性,相同的數據多次執行Put操作和一次執行Put操作的結果是一致的),這樣的場景也可以使用手動提交的最少一次消費語義實現,由存儲數據端來進行數據去重
  • 數據無狀態,並且存儲容器不具備冪等:這種場景需要自行控制offset的准確性,今天文章主要說明這種場景下的處理方式,這里數據不具備狀態,存儲使用關系型數據庫,比如MySQL

這里簡單說明一下實現思路
1) 利用consumer api的seek方法可以指定offset進行消費,在啟動消費者時查詢數據庫中記錄的offset信息,如果是第一次啟動,那么數據庫中將沒有offset信息,需要進行消費的元數據插入,然后從offset=0開始消費

2) 關系型數據庫具備事務的特性,當數據入庫時,同時也將offset信息更新,借用關系型數據庫事務的特性保證數據入庫和修改offset記錄這兩個操作是在同一個事務中進行

3) 使用ConsumerRebalanceListener來完成在分配分區時和Relalance時作出相應的處理邏輯

4) 要弄清楚的是,我們在消費的時候,關閉了自動提交,我們也沒有通過consumer.commitAsync()手動提交我們的位移信息,而是在每次啟動一個新的consumer的時候,觸發rebalance時,讀取數據庫中的位移信息,從該位移中開始讀取partition的信息(初始化的時候為0),在沒有出現異常的情況下,我們的consumer會不斷從producer讀取信息,這個位移是最新的那個消息位移,而且會同時把這個位移更新到數據庫中,但是,當出現了rebalance時,那么consumer就會從數據庫中讀取開始的位移。

表設計

create table kafka_info(
    topic_group_partition varchar(32) primary key, //主題+組名+分區號 這里冗余設計方便通過這個主鍵進行更新提升效率 
    topic_group varchar(30), //主題和組名
    partition_num tinyint,//分區號
    offsets bigint default 0 //offset信息
);

 

代碼

/**
 * @Description: 實現Kafka的精確一次消費
 * @author: HuangYn
 * @date: 2019/10/15 21:10
 */
public class ExactlyOnceConsume {

    private final KafkaConsumer<String, String> consumer;
    private Map<TopicPartition, Long> tpOffsetMap;
    private List<ConsumerRecord> list;
    private JDBCHelper jdbcHelper = JDBCHelper.getInstance();
    private String groupId;
    private String topic;

    public ExactlyOnceConsume(Properties props, String topic, String groupId) {
        this.consumer = KafkaFactory.buildConsumer(props);
        this.list = new ArrayList<>(100);
        this.tpOffsetMap = new HashMap<>();
        this.groupId = groupId;
        this.topic = topic;
        this.consumer.subscribe(Arrays.asList(this.topic), new HandleRebalance());
    }

    public void receiveMsg() {
        try {

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                if (!records.isEmpty()) {
                    // 處理每個partition的記錄
                    records.partitions().forEach(tp -> {
                        List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
                        // 記錄加到緩存中
                        tpRecords.forEach(record -> {
                            System.out.println("partition=" + record.partition() +
                                    ", offset= " + record.offset() +
                                    ", value=" + record.value());
                            list.add(record);
                        });
                        // 將partition對應的offset加到map中, 獲取partition中最后一個元素的offset,
                        // +1 就是下一次讀取的位移,就是本次需要提交的位移
                        tpOffsetMap.put(tp, tpRecords.get(tpRecords.size() - 1).offset() + 1);
                    });
                }
                // 緩存中有數據
                if (!list.isEmpty()) {
                    // 將數據插入數據庫,並且將位移信息也插入數據庫
                    // 因此,每次讀取到數據,都要更新本consumer在數據庫中的位移信息
                    boolean success = insertIntoDB(list, tpOffsetMap);
                    if (success) {
                        list.clear();
                        tpOffsetMap.clear();
                    }
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }

    private boolean insertIntoDB(List<ConsumerRecord> list,
                                 Map<TopicPartition, Long> tpOffsetMap) {

        // 這里應該是在同一個事務中進行的
        // 為了方便就省略了

        try {
            // TODO 將數據入庫,這里省略了

            // 將partition位移更新
            String sql = "UPDATE kafka_info SET offsets = ? WHERE topic_group_partition = ?";
            List<Object[]> params = new ArrayList<>(tpOffsetMap.size());
            tpOffsetMap.forEach((tp, offset) -> {
                Object[] param = new Object[]{offset, topic + "_" + groupId + "_" + tp.partition()};
                params.add(param);
            });
            jdbcHelper.batchExecute(sql, params);
            return true;
        } catch (Exception e) {
            // 回滾事務
        }
    }

    /**
     * rebalance觸發的處理器
     */
    private class HandleRebalance implements ConsumerRebalanceListener {

        // rebalance之前觸發
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            //發生Rebalance時,只需要將list中數據和記錄offset信息清空即可
            //這里為什么要清除數據,應為在Rebalance的時候有可能還有一批緩存數據在內存中沒有進行入庫,
            //並且offset信息也沒有更新,如果不清除,那么下一次還會重新poll一次這些數據,將會導致數據重復
            System.out.println("==== onPartitionsRevoked ===== ");
            list.clear();
            tpOffsetMap.clear();
        }

        // rebalance后調用,consumer抓取數據之前觸發
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            System.out.println("== onPartitionsAssigned ==");

            List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
            // 從數據庫讀取當前partition的信息
            Map<TopicPartition, Long> partitionOffsetMapFromDB = getPartitionOffsetMapFromDB(partitionInfos.size());

            // 在分配分區時指定消費位置
            for (TopicPartition partition : partitions) {
                // 指定consumer在每個partition上的消費開始位置
                // 如果在數據庫中有對應partition的信息則使用,否則將默認從offset=0開始消費
                if (partitionOffsetMapFromDB.get(partition) != null) {
                    consumer.seek(partition, partitionOffsetMapFromDB.get(partition));
                } else {
                    consumer.seek(partition, 0L);
                }
            }
        }
    }

    /**
     * 從數據庫讀取offset信息
     *
     * @param size
     * @return
     */
    private Map<TopicPartition, Long> getPartitionOffsetMapFromDB(int size) {
        Map<TopicPartition, Long> partitionOffsetMapFromDB = new HashMap<>();
        String sql = "SELECT partition_num, offsets FROM kafka_info WHERE topic_group = ?";
        jdbcHelper.executeQuery(sql, new Object[]{topic + "_" + groupId}, resultSet -> {

            int partition_num = -1;
            long offsets = -1;
            while (resultSet.next()) {
                partition_num = resultSet.getInt("partition_num");
                offsets = resultSet.getLong("offsets");
                System.out.println("partition_num=" + partition_num + ", offset=" + offsets);
                partitionOffsetMapFromDB.put(new TopicPartition(topic, partition_num), offsets);
            }

            System.out.println("partitionOffsetMapFromDB.size = " + partitionOffsetMapFromDB.size());

            //判斷數據庫是否存在所有的分區的信息,如果沒有,則需要進行初始化
            if (partitionOffsetMapFromDB.size() < size) {
                String insert = "INSERT INTO kafka_info (topic_group_partition,topic_group,partition_num) VALUES(?,?,?)";
                List<Object[]> params = new ArrayList<>();
                for (int p_num = 0; p_num < size; p_num++) {
                    Object[] param = new Object[]{
                            topic + "_" + groupId + "_" + p_num,
                            topic + "_" + groupId,
                            p_num
                    };
                    params.add(param);
                }
                jdbcHelper.batchExecute(insert, params);
            }

        });
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return partitionOffsetMapFromDB;
    }

}

 

 數據庫中記錄

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM