轉自 https://blog.csdn.net/qq_18581221/article/details/89766073
簡介
在使用kafka時,大多數場景對於數據少量的不一致(重復或者丟失)並不關注,比如日志,因為不會影響最終的使用或者分析,但是在某些應用場景(比如業務數據),需要對任何一條消息都要做到精確一次的消費,才能保證系統的正確性,kafka並不提供准確一致的消費API,需要我們在實際使用時借用外部的一些手段來保證消費的精確性,下面我們介紹如何實現
kafka消費機制
這篇文章KafkaConsumer使用介紹、參數配置介紹了如何kafka具有兩種提交offset(消費偏移量)方式,我們在Kafka簡介以及安裝和使用可知每個分區具備一offset記錄消費位置,如果消費者一直處於正常的運行轉態,那么offset將沒有什么用處,因為正常消費時,consumer記錄了本次消費的offset和下一次將要進行poll數據的offset起始位置,但是如果消費者發生崩潰或者有新的消費者加入消費者組,就會觸發再均衡Rebalance,Rebalance之后,每個消費者將會分配到新的分區,而消費者對於新的分區應該從哪里進行起始消費,這時候提交的offset信息就起作用了,提交的offset信息包括消費者組所有分區的消費進度,這時候消費者可以根據消費進度繼續消費,提交offset提交自動提交是最不具確定性的,所以要使用手動提交來控制offset
消費時出現幾種異常情況
自動提交
- 重復消費:當數據已經被處理,然后自動提交offset時消費者出現故障或者有新消費者加入組導致再均衡,這時候offset提交失敗,導致這批已經處理的數據的信息沒有記錄,后續會重復消費一次
- 丟失數據:如果業務處理時間較長一點,這時候數據處理業務還未完成,offset信息已經提交了,但是在后續處理數據過程中程序發生了崩潰,導致這批數據未正常消費,這時候offset已經提交,消費者后續將不在消費這批數據,導致這批數據將會丟失
手動提交
- 重復消費(最少一次消費語義實現):消費數據處理業務完成后進行offset提交,可以保證數據最少一次消費,因為在提交offset的過程中可能出現提交失敗的情況,導致數據重復消費
/** * 手動提交offset * 實現至少一次的消費語義 at least once * 當手動提交位移失敗,會重復消費數據 */ @Test public void testCommitOffset() { String topic = "first-topic"; String group = "g1"; Properties props = new Properties(); props.put("bootstrap.servers", "node00:9092,node03:9092"); //required props.put("group.id", group); //required props.put("enable.auto.commit", "false"); // 關閉自動提交 props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "latest"); //從最早的消息開始讀取 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //required props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //required Consumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topic)); //訂閱topic final int minBatchSize = 10; // 緩存 List<ConsumerRecord<String, String>> buffer = new ArrayList<>(minBatchSize); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(1000); records.forEach(buffer::add); // 緩存滿了才對數據進行處理 if (buffer.size() >= minBatchSize) { // 業務邏輯--插入數據庫 // insertIntoDb(buffer); // 等數據插入數據庫之后,再異步提交位移 // 通過異步的方式提交位移 consumer.commitAsync(((offsets, exception) -> { if (exception == null) { offsets.forEach((topicPartition, metadata) -> { System.out.println(topicPartition + " -> offset=" + metadata.offset()); }); } else { exception.printStackTrace(); // 如果出錯了,同步提交位移 consumer.commitSync(offsets); } })); // 如果提交位移失敗了,那么重啟consumer后會重復消費之前的數據,再次插入到數據庫中 // 清空緩沖區 buffer.clear(); } } } finally { consumer.close(); } }
- 丟失數據(最多一次消費語義實現):在消費數據業務處理前進行offset提交,可以保證最多一次消費,在后續數據業務處理程序出現故障,將導致數據丟失
代碼實現
/** * 實現最多一次語義 * 在消費前提交位移,當后續業務出現異常時,可能丟失數據 */ @Test public void testAtMostOnce() { Properties props = new Properties(); props.put("enable.auto.commit", "false"); KafkaConsumer<String, String> kafkaConsumer = KafkaFactory.buildConsumer(props); kafkaConsumer.subscribe(Arrays.asList("first-topic")); try { while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(500); // 處理業務之前就提交位移 kafkaConsumer.commitAsync(); // 下面是業務邏輯 records.forEach(record -> { System.out.println(record.value() + ", offset=" + record.offset()); }); } } catch (Exception e) { } finally { kafkaConsumer.close(); } }
精確一次消費實現
從kafka的消費機制,我們可以得到是否能夠精確的消費關鍵在消費進度信息的准確性,如果能夠保證消費進度的准確性,也就保證了消費數據的准確性
- 數據有狀態:可以根據數據信息進行確認數據是否重復消費,這時候可以使用手動提交的最少一次消費語義實現,即使消費的數據有重復,可以通過狀態進行數據去重,以達到冪等的效果
- 存儲數據容器具備冪等性:在數據存入的容器具備天然的冪等(比如ElasticSearch的put操作具備冪等性,相同的數據多次執行Put操作和一次執行Put操作的結果是一致的),這樣的場景也可以使用手動提交的最少一次消費語義實現,由存儲數據端來進行數據去重
- 數據無狀態,並且存儲容器不具備冪等:這種場景需要自行控制offset的准確性,今天文章主要說明這種場景下的處理方式,這里數據不具備狀態,存儲使用關系型數據庫,比如MySQL
這里簡單說明一下實現思路
1) 利用consumer api的seek方法可以指定offset進行消費,在啟動消費者時查詢數據庫中記錄的offset信息,如果是第一次啟動,那么數據庫中將沒有offset信息,需要進行消費的元數據插入,然后從offset=0開始消費
2) 關系型數據庫具備事務的特性,當數據入庫時,同時也將offset信息更新,借用關系型數據庫事務的特性保證數據入庫和修改offset記錄這兩個操作是在同一個事務中進行
3) 使用ConsumerRebalanceListener來完成在分配分區時和Relalance時作出相應的處理邏輯
4) 要弄清楚的是,我們在消費的時候,關閉了自動提交,我們也沒有通過consumer.commitAsync()手動提交我們的位移信息,而是在每次啟動一個新的consumer的時候,觸發rebalance時,讀取數據庫中的位移信息,從該位移中開始讀取partition的信息(初始化的時候為0),在沒有出現異常的情況下,我們的consumer會不斷從producer讀取信息,這個位移是最新的那個消息位移,而且會同時把這個位移更新到數據庫中,但是,當出現了rebalance時,那么consumer就會從數據庫中讀取開始的位移。
表設計
create table kafka_info( topic_group_partition varchar(32) primary key, //主題+組名+分區號 這里冗余設計方便通過這個主鍵進行更新提升效率 topic_group varchar(30), //主題和組名 partition_num tinyint,//分區號 offsets bigint default 0 //offset信息 );
代碼
/** * @Description: 實現Kafka的精確一次消費 * @author: HuangYn * @date: 2019/10/15 21:10 */ public class ExactlyOnceConsume { private final KafkaConsumer<String, String> consumer; private Map<TopicPartition, Long> tpOffsetMap; private List<ConsumerRecord> list; private JDBCHelper jdbcHelper = JDBCHelper.getInstance(); private String groupId; private String topic; public ExactlyOnceConsume(Properties props, String topic, String groupId) { this.consumer = KafkaFactory.buildConsumer(props); this.list = new ArrayList<>(100); this.tpOffsetMap = new HashMap<>(); this.groupId = groupId; this.topic = topic; this.consumer.subscribe(Arrays.asList(this.topic), new HandleRebalance()); } public void receiveMsg() { try { while (true) { ConsumerRecords<String, String> records = consumer.poll(1000); if (!records.isEmpty()) { // 處理每個partition的記錄 records.partitions().forEach(tp -> { List<ConsumerRecord<String, String>> tpRecords = records.records(tp); // 記錄加到緩存中 tpRecords.forEach(record -> { System.out.println("partition=" + record.partition() + ", offset= " + record.offset() + ", value=" + record.value()); list.add(record); }); // 將partition對應的offset加到map中, 獲取partition中最后一個元素的offset, // +1 就是下一次讀取的位移,就是本次需要提交的位移 tpOffsetMap.put(tp, tpRecords.get(tpRecords.size() - 1).offset() + 1); }); } // 緩存中有數據 if (!list.isEmpty()) { // 將數據插入數據庫,並且將位移信息也插入數據庫 // 因此,每次讀取到數據,都要更新本consumer在數據庫中的位移信息 boolean success = insertIntoDB(list, tpOffsetMap); if (success) { list.clear(); tpOffsetMap.clear(); } } } } catch (Exception e) { e.printStackTrace(); } finally { consumer.close(); } } private boolean insertIntoDB(List<ConsumerRecord> list, Map<TopicPartition, Long> tpOffsetMap) { // 這里應該是在同一個事務中進行的 // 為了方便就省略了 try { // TODO 將數據入庫,這里省略了 // 將partition位移更新 String sql = "UPDATE kafka_info SET offsets = ? WHERE topic_group_partition = ?"; List<Object[]> params = new ArrayList<>(tpOffsetMap.size()); tpOffsetMap.forEach((tp, offset) -> { Object[] param = new Object[]{offset, topic + "_" + groupId + "_" + tp.partition()}; params.add(param); }); jdbcHelper.batchExecute(sql, params); return true; } catch (Exception e) { // 回滾事務 } } /** * rebalance觸發的處理器 */ private class HandleRebalance implements ConsumerRebalanceListener { // rebalance之前觸發 @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { //發生Rebalance時,只需要將list中數據和記錄offset信息清空即可 //這里為什么要清除數據,應為在Rebalance的時候有可能還有一批緩存數據在內存中沒有進行入庫, //並且offset信息也沒有更新,如果不清除,那么下一次還會重新poll一次這些數據,將會導致數據重復 System.out.println("==== onPartitionsRevoked ===== "); list.clear(); tpOffsetMap.clear(); } // rebalance后調用,consumer抓取數據之前觸發 @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { System.out.println("== onPartitionsAssigned =="); List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); // 從數據庫讀取當前partition的信息 Map<TopicPartition, Long> partitionOffsetMapFromDB = getPartitionOffsetMapFromDB(partitionInfos.size()); // 在分配分區時指定消費位置 for (TopicPartition partition : partitions) { // 指定consumer在每個partition上的消費開始位置 // 如果在數據庫中有對應partition的信息則使用,否則將默認從offset=0開始消費 if (partitionOffsetMapFromDB.get(partition) != null) { consumer.seek(partition, partitionOffsetMapFromDB.get(partition)); } else { consumer.seek(partition, 0L); } } } } /** * 從數據庫讀取offset信息 * * @param size * @return */ private Map<TopicPartition, Long> getPartitionOffsetMapFromDB(int size) { Map<TopicPartition, Long> partitionOffsetMapFromDB = new HashMap<>(); String sql = "SELECT partition_num, offsets FROM kafka_info WHERE topic_group = ?"; jdbcHelper.executeQuery(sql, new Object[]{topic + "_" + groupId}, resultSet -> { int partition_num = -1; long offsets = -1; while (resultSet.next()) { partition_num = resultSet.getInt("partition_num"); offsets = resultSet.getLong("offsets"); System.out.println("partition_num=" + partition_num + ", offset=" + offsets); partitionOffsetMapFromDB.put(new TopicPartition(topic, partition_num), offsets); } System.out.println("partitionOffsetMapFromDB.size = " + partitionOffsetMapFromDB.size()); //判斷數據庫是否存在所有的分區的信息,如果沒有,則需要進行初始化 if (partitionOffsetMapFromDB.size() < size) { String insert = "INSERT INTO kafka_info (topic_group_partition,topic_group,partition_num) VALUES(?,?,?)"; List<Object[]> params = new ArrayList<>(); for (int p_num = 0; p_num < size; p_num++) { Object[] param = new Object[]{ topic + "_" + groupId + "_" + p_num, topic + "_" + groupId, p_num }; params.add(param); } jdbcHelper.batchExecute(insert, params); } }); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } return partitionOffsetMapFromDB; } }
數據庫中記錄