消息隊列的消費冪等性如何保證


什么是冪等?

任意多次執行所產生的影響均與一次執行的影響相同就可以稱為冪等

什么是消息冪等?

當出現消費者對某條消息重復消費的情況時,重復消費的結果與消費一次的結果是相同的,並且多次消費並未對業務系統產生任何負面影響

為什么我們要保證冪等性,不保證冪等性,會不會有問題?

這個問題其實沒法准確回答。回答這個問題的根源得從業務場景上進行分析。比如正常業務情況下,我們是不允許同個訂單重復支付,這種業務場景我們就需要確保冪等性。再比如日志記錄,這種業務場景,我們可能就不需要做冪等判斷。

因此是否要保證冪等性,得基於業務進行考量

消息隊列的消費冪等性如何保證?

沒法保證。前面說了要保證冪等性,得基於業務場景進行考量。消息隊列他本身就不是給你用來做業務冪等性用的。如果你要實現業務冪等性,靠消息隊列是沒法幫你完成的,你自己得根據自身業務場景,來實現冪等。

常用的業務冪等性保證方法

1、利用數據庫的唯一約束實現冪等

比如將訂單表中的訂單編號設置為唯一索引,創建訂單時,根據訂單編號就可以保證冪等

2、去重表

這個方案本質也是根據數據庫的唯一性約束來實現。其實現大體思路是:首先在去重表上建唯一索引,其次操作時把業務表和去重表放在同個本地事務中,如果出現重現重復消費,數據庫會拋唯一約束異常,操作就會回滾

3、利用redis的原子性

每次操作都直接set到redis里面,然后將redis數據定時同步到數據庫中

4、多版本(樂觀鎖)控制

此方案多用於更新的場景下。其實現的大體思路是:給業務數據增加一個版本號屬性,每次更新數據前,比較當前數據的版本號是否和消息中的版本一致,如果不一致則拒絕更新數據,更新數據的同時將版本號+1

5、狀態機機制

此方案多用於更新且業務場景存在多種狀態流轉的場景

6、token機制

生產者發送每條數據的時候,增加一個全局唯一的id,這個id通常是業務的唯一標識,比如訂單編號。在消費端消費時,則驗證該id是否被消費過,如果還沒消費過,則進行業務處理。處理結束后,在把該id存入redis,同時設置狀態為已消費。如果已經消費過了,則不進行處理。

演示

例子使用springboot2加kafka來演示一下使用token機制如何實現消費端冪等

1、application.yml

spring:
  redis:
    host: localhost
    port: 6379
    # 連接超時時間(毫秒)
    timeout: 10000
    jedis:
      pool:
        # 連接池中的最大空閑連接
        max-idle: 8
        # 連接池中的最小空閑連接
        min-idle: 10
        # 連接池最大連接數(使用負值表示沒有限制)
        max-active: 100
        # 連接池最大阻塞等待時間(使用負值表示沒有限制)
        max-wait: -1
    password:
  kafka:
    # 以逗號分隔的地址列表,用於建立與Kafka集群的初始連接(kafka 默認的端口號為9092)
    bootstrap-servers: localhost:9092
    producer:
      # 發生錯誤后,消息重發的次數。
      retries: 0
      #當有多個消息需要被發送到同一個分區時,生產者會把它們放在同一個批次里。該參數指定了一個批次可以使用的內存大小,按照字節數計算。
      batch-size: 16384
      # 設置生產者內存緩沖區的大小。
      buffer-memory: 33554432
      # 鍵的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: com.github.lybgeek.kafka.serialization.ObjectSerializer
      # acks=0 : 生產者在成功寫入消息之前不會等待任何來自服務器的響應。
      # acks=1 : 只要集群的首領節點收到消息,生產者就會收到一個來自服務器成功響應。
      # acks=all :只有當所有參與復制的節點全部收到消息時,生產者才會收到一個來自服務器的成功響應。
      acks: 1
    consumer:
      # 自動提交的時間間隔 在spring boot 2.X 版本中這里采用的是值的類型為Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S
      # 該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下該作何處理:
      # latest(默認值)在偏移量無效的情況下,消費者將從最新的記錄開始讀取數據(在消費者啟動之后生成的記錄)
      # earliest :在偏移量無效的情況下,消費者將從起始位置讀取分區的記錄
      auto-offset-reset: earliest
      # 是否自動提交偏移量,默認值是true,為了避免出現重復數據和數據丟失,可以把它設置為false,然后手動提交偏移量
      enable-auto-commit: false
      # 鍵的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: com.github.lybgeek.kafka.serialization.ObjectDeserializer
    listener:
      # 在偵聽器容器中運行的線程數。
      concurrency: 1
      #listner負責ack,每調用一次,就立即commit
      ack-mode: manual_immediate

2、實現kafka的自定義序列和反序列

:kakfa默認的序列化和反序列方式是StringSerializer和StringDeserializer。我們要改造成支持對象的序列化和反序列化

a、序列化

public class ObjectSerializer implements Serializer<Object> {


    @Override
    public byte[] serialize(String topic, Object object) {
        return BeanUtils.serialize(object);
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

b、反序列化

public class ObjectDeserializer implements Deserializer<Object> {

    @Override
    public Object deserialize(String topic, byte[] bytes) {
        return BeanUtils.deserialize(bytes);
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }
}

3、消息對象

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class MessageDTO<T> implements Serializable {

    private String messageId;


    private T data;
}

4、生產者

:本例子簡單模擬生產者多次生產同個消息,進而達到多次消費的效果

@Slf4j
@Component
public class KafkaProducer implements CommandLineRunner {


    @Autowired
    private KafkaTemplate kafkaTemplate;

    private int threadNum = 2;

    private ExecutorService executorService = Executors.newFixedThreadPool(threadNum);

    private CountDownLatch countDownLatch = new CountDownLatch(threadNum);


    @Override
    public void run(String... args) throws Exception {
          send();
    }


    private void send(){
        for(int i = 0; i < threadNum; i++){
            executorService.submit(()->{
                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                   log.error(e.getMessage(),e);
                }
                String messageId = "b14701b8-4b08-4bbd-8a4e-70f76a432e99";

                MessageDTO messageDTO = MessageDTO.builder().messageId(messageId).data("hello").build();
                kafkaTemplate.send(Constant.TOPIC,messageDTO);
            });

            countDownLatch.countDown();
        }

    }
}

5、消費者

@Component
@Slf4j
public class KafkaConsumer {

    @Autowired
    private RedisUtils redisUtils;

    @KafkaListener(id = "msgId",topics = {Constant.TOPIC})
    public void receive(ConsumerRecord<String, MessageDTO<String>> record,Acknowledgment ack){

        boolean isRepeateConsume = checkRepeateConsume(record.value().getMessageId());
        if(isRepeateConsume){
            log.error("重復消費。。。。");
            //手工確認
            ack.acknowledge();
            return;
        }


       doBiz(record,ack);
    }

    private boolean checkRepeateConsume(String messageId){
        Object consumeStatus = redisUtils.get(messageId);
        /**
         * 1、如果redis存在消息ID,且消費狀態為已消費,則說明是重復消費,此時消費端丟棄該消息
         */
        if(Objects.nonNull(consumeStatus) && "已消費".equals(consumeStatus.toString())){
           // log.error("重復消費。。。。");
            return true;
        }

        /**
         * 2、如果redis不存在消息id,或者狀態不是已消費,則從業務方面進行判重
         *
         *  業務判重的可以考慮如下方法:
         *  如果該業務是存在狀態流轉,則采用狀態機策略進行判重。
         *  如果該業務不是狀態流轉類型,則在新增時,根據業務設置一個唯一的屬性,比如根據訂單編號的唯一性;
         *  更新時,可以采用多版本策略,在需要更新的業務表上加上版本號
         */
        return checkRepeateByBiz(messageId);
    }



    /**
     * 模擬業務消費
     * @param messageDTO
     * @param ack
     */
    private void doBiz(ConsumerRecord<String, MessageDTO<String>> record,Acknowledgment ack){
        System.out.println("------模擬業務處理-----------");
        System.out.println("--------執行業務處理:"+record.value()+"------------");
        System.out.println("--------------1、業務處理完畢-----------");
        try {
            redisUtils.setEx(record.value().getMessageId(), "已消費",12, TimeUnit.HOURS);
            System.out.println("-------------2、業務處理完畢后,把全局ID存入redis,並設置值為已消費");
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("----------3、業務處理完畢后,消費端手工確認");
        //手工確認
        ack.acknowledge();

    }

}

6、效果

2020-08-09 16:25:32.701  INFO 9552 --- [    msgId-0-C-1] io.lettuce.core.KqueueProvider           : Starting without optional kqueue library
------模擬業務處理-----------
--------執行業務處理:MessageDTO(messageId=b14701b8-4b08-4bbd-8a4e-70f76a432e99, data=hello)------------
--------------1、業務處理完畢-----------
-------------2、業務處理完畢后,把全局ID存入redis,並設置值為已消費
----------3、業務處理完畢后,消費端手工確認
2020-08-09 16:25:36.021 ERROR 9552 --- [    msgId-0-C-1] c.g.l.kafka.consumer.KafkaConsumer       : 重復消費。。。。

總結

消息隊列沒法幫你做到消費端的冪等性,消費端的冪等性得基於業務場景進行實現。不過消息隊列必須得保證消息不能丟,至少保證被消費一次,不然消息都丟了,沒數據搞啥業務冪等。在實現消費端處理業務時,要確保消費端是采用手工確認應答機制,而不是自動應答機制。這樣能夠確保消費端一旦業務處理失敗,生產者還能再次發送同個消息給消費端

demo鏈接

https://github.com/lyb-geek/springboot-learning/tree/master/springboot-mq-idempotent-consume


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM