RocketMQ 冪等性主要分為生產端和消費端冪等性
備注:這里只討論生產者 和消費者集群部署下的情況
生產者端冪等性保證:
1RocketMQ 為消息生產者提供了消息查詢的API,在消息發送之前,可以查詢該條消息是否發送過,注意但是該方法在2020年5月之后的版本,已經被廢掉了;
eg:
public class DefaultMQProducer extends ClientConfig implements MQProducer { @Deprecated @Override public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException, InterruptedException { return this.defaultMQProducerImpl.queryMessage(withNamespace(topic), key, maxNum, begin, end); } }
2:引入第三方存儲,例如Redis,在消息發送之后,將發送記錄存儲在Redis中,下次發送消息之前,在Redis中查詢消息是否發送過
消費端冪等性保證:(處理必須唯一) 無論這個業務請求被(consumer)執行多少次,我們的數據庫的結果都是唯一的,不可變的。
1.去重策略:去重表機制,業務拼接去重策略(比如唯一流水號)
2.建立一個消息表,拿到這個消息做數據庫的insert操作。給這個消息做一個唯一主鍵(primary key)或者唯一約束,那么就算出現重復消費的情況,就會導致主鍵沖突。
高並發下去重:采用Redis去重(key天然支持原子性並要求不可重復),但是由於不在一個事務,要求有適當的補償策略,但是對於很重要的業務,不應該支持補償
3..利用redis事務,主鍵(我們必須把全量的操作數據都存放在redis里,然后定時去和數據庫做數據同步)—-即消費處理后,該處理本來應該保存在數據庫的,先保存在redis,再通過一定業務方式從redis中取數據進行db持久化
4.利用redis和關系型數據庫一起做去重機制
5.拿到這個消息做redis的set的操作.redis就是天然冪等性
6.准備一個第三方介質,來做消費記錄。以redis為例,給消息分配一個全局id,只要消費過該消息,將 < id,message>以K-V形式寫入redis。那消費者開始消費前,先去redis中查詢有沒消費記錄即可。
eg: redis唯一key保證冪等性代碼如下:
String idempotentValue = RedisUtil.get(RedisConstant.IDEMPOTENT.concat(msgId), String.class); if (!StringUtils.isEmpty(idempotentValue)) { log.info(">>>>>>>>>>>>>>>>>該消息已經被消費:【{}】", msgBody); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }
//業務代碼.............
//冪等處理
RedisUtil.setEx(RedisConstant.IDEMPOTENT.concat(msgId), "1", 10, TimeUnit.DAYS);
參考:https://www.cnblogs.com/chx9832/p/12325871.html