冪等指的就是執行多次和執行一次的效果相同,主要是為了防止數據重復消費。MQ中為了保證消息的可靠性,生產者發送消息失敗(例如網絡超時)會觸發 "重試機制",它不是生產者重試而是MQ自動觸發的重試機制, 而這種情況下消費者就會收到兩條消息,比如明明只需要扣一次款, 可是消費者卻執行了2次。為了解決冪等問題,每一個消息應該有一個全局的唯一的標識,當處理過這條消息后,就把這個標識保存到數據庫或者redis中,在處理消息前前判斷這個標識記錄為空就好了。像activemq中msgId就是唯一的,我們可以直接拿這個id來判斷,但是rocketmq重試機制不一樣,它重發會產生一個新的id,但是它提供了setKeys()這個api,我們可以給key設置一個唯一的流水編號來加以判斷。(重試機制是不存在並發問題的,它是間隔一段時間自動促發的)。
1. 導入依賴( 生產者和消費者的依賴都一樣)

<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.7.RELEASE</version> <relativePath/> </parent> <!-- springcloud <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Camden.SR6</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> --> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <!-- webmvc --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- 集成lombok 框架(get/set) --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!-- RocketMq --> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.2.6</version> </dependency> <!-- 熱加載 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <optional>true</optional> </dependency> <!-- jackson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.30</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>
2. 生產者配置參數和配置文件

#該應用是否啟用生產者
#rocketmq.producer.isOnOff=on
#發送同一類消息的設置為同一個group,保證唯一,默認不需要設置,rocketmq會使用ip@pid(pid代表jvm名字)作為唯一標示
rocketmq.producer.groupName=mqtest
#mq的nameserver地址
rocketmq.producer.namesrvAddr=192.168.5.7:9876
#消息最大長度 默認1024*4(4M)
rocketmq.producer.maxMessageSize=4096
#發送消息超時時間,默認3000
rocketmq.producer.sendMsgTimeout=3000
#發送消息失敗重試次數,默認2
rocketmq.producer.retryTimesWhenSendFailed=3

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; /** * 生產者配置 */ @PropertySource("classpath:rocketmq.properties") @Configuration public class MQProducerConfiguration { public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfiguration.class); /** * 發送同一類消息的設置為同一個group,保證唯一,默認不需要設置,rocketmq會使用ip@pid(pid代表jvm名字)作為唯一標示 */ @Value("${rocketmq.producer.groupName}") private String groupName; /** 服務器地址 */ @Value("${rocketmq.producer.namesrvAddr}") private String namesrvAddr; /** * 消息最大大小,默認4M */ @Value("${rocketmq.producer.maxMessageSize}") private Integer maxMessageSize ; /** * 消息發送超時時間,默認3秒 */ @Value("${rocketmq.producer.sendMsgTimeout}") private Integer sendMsgTimeout; /** * 消息發送失敗重試次數,默認2次 */ @Value("${rocketmq.producer.retryTimesWhenSendFailed}") private Integer retryTimesWhenSendFailed; @Bean public DefaultMQProducer getRocketMQProducer() { DefaultMQProducer producer; producer = new DefaultMQProducer(this.groupName); producer.setNamesrvAddr(this.namesrvAddr); //如果需要同一個jvm中不同的producer往不同的mq集群發送消息,需要設置不同的instanceName //producer.setInstanceName(instanceName); producer.setMaxMessageSize(this.maxMessageSize); producer.setSendMsgTimeout(this.sendMsgTimeout); //如果發送消息失敗,設置重試次數,默認為2次 producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed); try { producer.start(); LOGGER.info(String.format("rocketmq producer start ")); } catch (MQClientException e) { LOGGER.error(String.format("producer is error {}", e.getMessage(),e)); } return producer; } }
3. 生產者發送消息

import java.util.UUID; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import lombok.extern.slf4j.Slf4j; @RestController @Slf4j public class TestController { /**使用RocketMq的生產者*/ @Autowired private DefaultMQProducer defaultMQProducer; @RequestMapping("/send") public void send(){ String msg = "冪等"; log.info("開始發送消息:"+msg); try { // arg0主題名稱 arg1分組 arg2內容 Message sendMsg = new Message("DemoTopic","wulei",(msg).getBytes()); // 注意: activemq的msgId是唯一的,但是rocketmq的不是,所以冪等不能用id來判斷,我們可以通過setKeys來解決,一般都是業務id,這里用隨機數代替。 sendMsg.setKeys(UUID.randomUUID().toString()); SendResult sendResult = defaultMQProducer.send(sendMsg); //默認3秒超時 log.info("消息發送響應信息:"+sendResult.toString()); } catch (Exception e) { e.printStackTrace(); } } }
4. 消費者配置參數和配置文件

##該應用是否啟用消費者 #rocketmq.consumer.isOnOff=on #發送同一類消息的設置為同一個group,保證唯一,默認不需要設置,rocketmq會使用ip@pid(pid代表jvm名字)作為唯一標示 rocketmq.consumer.groupName=mqtest #mq的nameserver地址 rocketmq.consumer.namesrvAddr=192.168.5.7:9876 #該消費者訂閱的主題和tags("*"號表示訂閱該主題下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*; rocketmq.consumer.topics=DemoTopic~*; rocketmq.consumer.consumeThreadMin=20 rocketmq.consumer.consumeThreadMax=64 #設置一次消費消息的條數,默認為1條 rocketmq.consumer.consumeMessageBatchMaxSize=1

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.wulei.listener.MQConsumeMsgListenerProcessor; /** * 消費者配置 */ @PropertySource("classpath:rocketmq.properties") @Configuration public class MQConsumerConfiguration { public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class); // 地址 @Value("${rocketmq.consumer.namesrvAddr}") private String namesrvAddr; // 發送同一類消息的設置為同一個group,保證唯一,默認不需要設置,rocketmq會使用ip@pid(pid代表jvm名字)作為唯一標示 @Value("${rocketmq.consumer.groupName}") private String groupName; // 該消費者訂閱的主題和tags("*"號表示訂閱該主題下所有的tags) @Value("${rocketmq.consumer.topics}") private String topics; @Value("${rocketmq.consumer.consumeThreadMin}") private int consumeThreadMin; @Value("${rocketmq.consumer.consumeThreadMax}") private int consumeThreadMax; @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}") private int consumeMessageBatchMaxSize; @Autowired private MQConsumeMsgListenerProcessor mqMessageListenerProcessor; @Bean public DefaultMQPushConsumer getRocketMQConsumer(){ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName); consumer.setNamesrvAddr(namesrvAddr); consumer.setConsumeThreadMin(consumeThreadMin); consumer.setConsumeThreadMax(consumeThreadMax); consumer.registerMessageListener(mqMessageListenerProcessor); /** * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費 * 如果非第一次啟動,那么按照上次消費的位置繼續消費 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); /** * 設置消費模型,集群還是廣播,默認為集群 */ //consumer.setMessageModel(MessageModel.CLUSTERING); /** * 設置一次消費消息的條數,默認為1條 */ consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize); try { /** * 設置該消費者訂閱的主題和tag,如果是訂閱該主題下的所有tag,則tag使用*;如果需要指定訂閱該主題下的某些tag,則使用||分割,例如tag1||tag2||tag3 */ String[] topicTagsArr = topics.split(";"); for (String topicTags : topicTagsArr) { String[] topicTag = topicTags.split("~"); consumer.subscribe(topicTag[0],topicTag[1]); } consumer.start(); LOGGER.info("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr); }catch (MQClientException e){ LOGGER.error("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr,e); } return consumer; } }
5. 消費者監聽消息

import java.util.HashMap; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.common.message.MessageExt; @Component public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently{ private static final Logger logger = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class); // 假裝這是一個redis private HashMap<String, String> myredis = new HashMap<String, String>(); /** * 默認msgs里只有一條消息,可以通過設置consumeMessageBatchMaxSize參數來批量接收消息<br/> * 不要拋異常,如果沒有return CONSUME_SUCCESS ,consumer會重新消費該消息,直到return CONSUME_SUCCESS */ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { // if(CollectionUtils.isEmpty(msgs)){ // logger.info("接受到的消息為空,不處理,直接返回成功"); // return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // } // //for(MessageExt messageExt : msgs) { } MessageExt messageExt = msgs.get(0); String keys = messageExt.getKeys();// 自定義的唯一key String msgId = null; // 消息id(不是唯一的) String msgContext = null; // 消息內容 int reconsume = 0; // 重試次數 if(messageExt.getTopic().equals("DemoTopic") && messageExt.getTags().equals("wulei")){ if(myredis.get(keys)==null) { //logger.info("接受到的消息為:"+messageExt.toString()); msgId = messageExt.getMsgId(); msgContext = new String(messageExt.getBody()); reconsume = messageExt.getReconsumeTimes(); try { int i = 1/0; System.out.println("消費成功: id:"+msgId+" msg"+msgContext+" 次數"+reconsume); myredis.put(messageExt.getKeys(), msgContext); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { // 重試3次就不在重試了,直接返回消費成功狀態,並觸發人工補償機制。 if(reconsume==2) { myredis.put(messageExt.getKeys(), msgContext); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }else { // 一般消費者這邊盡量不要拋異常,它失敗就會觸發重試機制。如果非要拋異常可以在try{}catch{}里面return ConsumeConcurrentlyStatus.RECONSUME_LATER(表示失敗讓他重試) return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }else { // 已經消費過就不要再重試了,直接返回成功。 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }else { // 不存在不要再重試了,直接返回成功。 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } }