分布式事務——冪等設計(rocketmq案例)


  冪等指的就是執行多次和執行一次的效果相同,主要是為了防止數據重復消費。MQ中為了保證消息的可靠性,生產者發送消息失敗(例如網絡超時)會觸發 "重試機制",它不是生產者重試而是MQ自動觸發的重試機制, 而這種情況下消費者就會收到兩條消息,比如明明只需要扣一次款, 可是消費者卻執行了2次。為了解決冪等問題,每一個消息應該有一個全局的唯一的標識,當處理過這條消息后,就把這個標識保存到數據庫或者redis中,在處理消息前前判斷這個標識記錄為空就好了。像activemq中msgId就是唯一的,我們可以直接拿這個id來判斷,但是rocketmq重試機制不一樣,它重發會產生一個新的id,但是它提供了setKeys()這個api,我們可以給key設置一個唯一的流水編號來加以判斷。(重試機制是不存在並發問題的,它是間隔一段時間自動促發的)。

1. 導入依賴( 生產者和消費者的依賴都一樣)

    <parent> 
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.7.RELEASE</version>
        <relativePath/> 
    </parent>
    <!-- springcloud
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Camden.SR6</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
     -->
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <!-- webmvc -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- 集成lombok 框架(get/set) -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- RocketMq -->
        <dependency>
            <groupId>com.alibaba.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>3.2.6</version>
        </dependency>
        <!-- 熱加載 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- jackson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.30</version>
        </dependency>
    </dependencies>
  
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build> 
pom.xml

2. 生產者配置參數和配置文件

#該應用是否啟用生產者
#rocketmq.producer.isOnOff=on
#發送同一類消息的設置為同一個group,保證唯一,默認不需要設置,rocketmq會使用ip@pid(pid代表jvm名字)作為唯一標示
rocketmq.producer.groupName=mqtest
#mq的nameserver地址
rocketmq.producer.namesrvAddr=192.168.5.7:9876
#消息最大長度 默認1024*4(4M)
rocketmq.producer.maxMessageSize=4096
#發送消息超時時間,默認3000
rocketmq.producer.sendMsgTimeout=3000
#發送消息失敗重試次數,默認2
rocketmq.producer.retryTimesWhenSendFailed=3
rocketmq.properties
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
/**
 * 生產者配置
 */
@PropertySource("classpath:rocketmq.properties")
@Configuration
public class MQProducerConfiguration {
    public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfiguration.class);
    /**
     * 發送同一類消息的設置為同一個group,保證唯一,默認不需要設置,rocketmq會使用ip@pid(pid代表jvm名字)作為唯一標示
     */
    @Value("${rocketmq.producer.groupName}")
    private String groupName;
    /** 服務器地址  */
    @Value("${rocketmq.producer.namesrvAddr}")
    private String namesrvAddr;
    /**
     * 消息最大大小,默認4M
     */
    @Value("${rocketmq.producer.maxMessageSize}")
    private Integer maxMessageSize ;
    /**
     * 消息發送超時時間,默認3秒
     */
    @Value("${rocketmq.producer.sendMsgTimeout}")
    private Integer sendMsgTimeout;
    /**
     * 消息發送失敗重試次數,默認2次
     */
    @Value("${rocketmq.producer.retryTimesWhenSendFailed}")
    private Integer retryTimesWhenSendFailed;

    @Bean
    public DefaultMQProducer getRocketMQProducer() {
        DefaultMQProducer producer;
        producer = new DefaultMQProducer(this.groupName);
        producer.setNamesrvAddr(this.namesrvAddr);
        //如果需要同一個jvm中不同的producer往不同的mq集群發送消息,需要設置不同的instanceName
        //producer.setInstanceName(instanceName);
        producer.setMaxMessageSize(this.maxMessageSize);
        producer.setSendMsgTimeout(this.sendMsgTimeout);
        //如果發送消息失敗,設置重試次數,默認為2次
        producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
        try {
            producer.start();
            LOGGER.info(String.format("rocketmq producer start "));
        } catch (MQClientException e) {
            LOGGER.error(String.format("producer is error {}", e.getMessage(),e));
        }
        return producer;
    }
}
MQProducerConfiguration

3. 生產者發送消息

import java.util.UUID;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import lombok.extern.slf4j.Slf4j;
@RestController
@Slf4j
public class TestController {

    /**使用RocketMq的生產者*/
    @Autowired
    private DefaultMQProducer defaultMQProducer;
    
    @RequestMapping("/send")
    public void send(){
        String msg = "冪等";
        log.info("開始發送消息:"+msg);
        
        try {
            // arg0主題名稱    arg1分組    arg2內容
            Message sendMsg = new Message("DemoTopic","wulei",(msg).getBytes());
            // 注意: activemq的msgId是唯一的,但是rocketmq的不是,所以冪等不能用id來判斷,我們可以通過setKeys來解決,一般都是業務id,這里用隨機數代替。
            sendMsg.setKeys(UUID.randomUUID().toString());
            SendResult sendResult = defaultMQProducer.send(sendMsg);
            //默認3秒超時
            log.info("消息發送響應信息:"+sendResult.toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
TestController

4. 消費者配置參數和配置文件

##該應用是否啟用消費者
#rocketmq.consumer.isOnOff=on
#發送同一類消息的設置為同一個group,保證唯一,默認不需要設置,rocketmq會使用ip@pid(pid代表jvm名字)作為唯一標示
rocketmq.consumer.groupName=mqtest
#mq的nameserver地址
rocketmq.consumer.namesrvAddr=192.168.5.7:9876
#該消費者訂閱的主題和tags("*"號表示訂閱該主題下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*;
rocketmq.consumer.topics=DemoTopic~*;
rocketmq.consumer.consumeThreadMin=20
rocketmq.consumer.consumeThreadMax=64
#設置一次消費消息的條數,默認為1條
rocketmq.consumer.consumeMessageBatchMaxSize=1
rocketmq.properties
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.wulei.listener.MQConsumeMsgListenerProcessor;
/**
 * 消費者配置
 */
@PropertySource("classpath:rocketmq.properties")
@Configuration
public class MQConsumerConfiguration {
    public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfiguration.class);
    // 地址
    @Value("${rocketmq.consumer.namesrvAddr}")
    private String namesrvAddr;
    // 發送同一類消息的設置為同一個group,保證唯一,默認不需要設置,rocketmq會使用ip@pid(pid代表jvm名字)作為唯一標示
    @Value("${rocketmq.consumer.groupName}")
    private String groupName;
    // 該消費者訂閱的主題和tags("*"號表示訂閱該主題下所有的tags)
    @Value("${rocketmq.consumer.topics}")
    private String topics;
    
    @Value("${rocketmq.consumer.consumeThreadMin}")
    private int consumeThreadMin;
    @Value("${rocketmq.consumer.consumeThreadMax}")
    private int consumeThreadMax;
    @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
    private int consumeMessageBatchMaxSize;
    
    @Autowired
    private MQConsumeMsgListenerProcessor mqMessageListenerProcessor;
    
    @Bean
    public DefaultMQPushConsumer getRocketMQConsumer(){
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeThreadMin(consumeThreadMin);
        consumer.setConsumeThreadMax(consumeThreadMax);
        consumer.registerMessageListener(mqMessageListenerProcessor);
        /**
         * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費
         * 如果非第一次啟動,那么按照上次消費的位置繼續消費
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        /**
         * 設置消費模型,集群還是廣播,默認為集群
         */
        //consumer.setMessageModel(MessageModel.CLUSTERING);
        /**
         * 設置一次消費消息的條數,默認為1條
         */
        consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
        try {
            /**
             * 設置該消費者訂閱的主題和tag,如果是訂閱該主題下的所有tag,則tag使用*;如果需要指定訂閱該主題下的某些tag,則使用||分割,例如tag1||tag2||tag3
             */
            String[] topicTagsArr = topics.split(";");
            for (String topicTags : topicTagsArr) {
                String[] topicTag = topicTags.split("~");
                consumer.subscribe(topicTag[0],topicTag[1]);
            }
            consumer.start();
            LOGGER.info("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr);
        }catch (MQClientException e){
            LOGGER.error("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr,e);
        }
        return consumer;
    }
}
MQConsumerConfiguration

5. 消費者監聽消息

import java.util.HashMap;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.message.MessageExt;
@Component
public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently{
    private static final Logger logger = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class);
   
    // 假裝這是一個redis
    private HashMap<String, String> myredis = new HashMap<String, String>();
    
    /**
     *  默認msgs里只有一條消息,可以通過設置consumeMessageBatchMaxSize參數來批量接收消息<br/>
     *  不要拋異常,如果沒有return CONSUME_SUCCESS ,consumer會重新消費該消息,直到return CONSUME_SUCCESS
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//        if(CollectionUtils.isEmpty(msgs)){
//            logger.info("接受到的消息為空,不處理,直接返回成功");
//            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
//        }
//        

        //for(MessageExt messageExt : msgs) { }
        MessageExt messageExt = msgs.get(0);
        String keys = messageExt.getKeys();// 自定義的唯一key
        String msgId = null;                // 消息id(不是唯一的)
        String msgContext = null;            // 消息內容
        int reconsume = 0;                   // 重試次數
        
        
        if(messageExt.getTopic().equals("DemoTopic") && messageExt.getTags().equals("wulei")){
            if(myredis.get(keys)==null) { 
                //logger.info("接受到的消息為:"+messageExt.toString());
                  msgId = messageExt.getMsgId();
                  msgContext = new String(messageExt.getBody());
                  reconsume = messageExt.getReconsumeTimes();
                  try {
                      int i = 1/0;
                      System.out.println("消費成功: id:"+msgId+"  msg"+msgContext+"   次數"+reconsume);
                      myredis.put(messageExt.getKeys(), msgContext);
                      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    // 重試3次就不在重試了,直接返回消費成功狀態,並觸發人工補償機制。
                    if(reconsume==2) {
                        myredis.put(messageExt.getKeys(), msgContext);
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }else {
                        // 一般消費者這邊盡量不要拋異常,它失敗就會觸發重試機制。如果非要拋異常可以在try{}catch{}里面return ConsumeConcurrentlyStatus.RECONSUME_LATER(表示失敗讓他重試)
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
            }else {
                // 已經消費過就不要再重試了,直接返回成功。
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        }else {
            // 不存在不要再重試了,直接返回成功。
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }
}
MQConsumeMsgListenerProcessor


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM