RocketMQ可靠消息最終一致性解決方案 - 用戶消費賺積分業務


RocketMQ可靠消息最終一致性解決方案 - 用戶消費賺積分業務

 

================================

©Copyright 蕃薯耀 2021-05-14

https://www.cnblogs.com/fanshuyao/

 

什么是可靠消息最終一致性方案?

可靠消息最終一致性方案是指當事務發起方執行完成本地事務后發出一條消息到消息中間件,事務參與方(消息消費者)一定能夠接收到消息並處理事務成功,此方案強調的是只要消息發給事務參與方,則最終事務要達到一致。

具體流程圖如下所示:

 

 

 

消息事務一致性問題:

 

本地消息成功,消息超時(但發送成功),本地事務回滾,消息成功。
造成本地事務,與消息參與方的事務不一致。

 

 

一、數據庫設計(Mysql)

#RocketMQ可靠消息最終一致性解決方案 - 用戶消費賺積分業務


#用戶表
#drop table `user`;
CREATE TABLE `user`(
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_name VARCHAR(100) NOT NULL,
    money DOUBLE NOT NULL DEFAULT 100,#用戶金額
    score_sum DOUBLE NOT NULL DEFAULT 0,#總積分
    create_time DATETIME NOT NULL DEFAULT NOW(),
    remark VARCHAR(100)
);
INSERT INTO `user`(user_name) VALUES ('小明');


#消費記錄表
#DROP TABLE `money`;
CREATE TABLE `money`(
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    money_code BIGINT NOT NULL UNIQUE,#唯一標識,流水號
    user_id BIGINT NOT NULL,#啟用id
    money DOUBLE NOT NULL DEFAULT 0,#消費的金額
    msg_id VARCHAR(50),#回寫消息的id
    create_time DATETIME NOT NULL DEFAULT NOW(),
    remark VARCHAR(100)
);




#積分記錄表
#DROP TABLE `score`;
CREATE TABLE `score`(
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id BIGINT NOT NULL,
    money_code BIGINT NOT NULL UNIQUE,
    score_add DOUBLE,
    score_type VARCHAR(30) NOT NULL DEFAULT 'jifen',
    create_time DATETIME NOT NULL DEFAULT NOW(),
    remark VARCHAR(100)
);

SELECT * FROM `mq`.`user`;

SELECT * FROM `mq`.`score`;

#delete from `mq`.`score`;

SELECT * FROM `mq`.`money`;

#DELETE FROM `mq`.`money`;

 

 

 

 

二、RocketMQ消息生產者端

 

1、pom.xml引入依賴

<properties>    
    <rocketmq-spring-boot.version>2.2.0</rocketmq-spring-boot.version>
</properties>

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
     <version>${rocketmq-spring-boot.version}</version>
</dependency>

 

2、application.properties配置

#增加RocketMq依賴后,增加配置
rocketmq.name-server=localhost:9876
rocketmq.producer.group=rocketMQ2Producer
rocketmq.producer.sendMessageTimeout=30000

#自定義變量
my.mq.tag.score=tag-score

 

3、Controller端

import javax.servlet.http.HttpServletRequest;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.lqy.mq.bean.Result;
import com.lqy.mq.biz.producer.entity.Money;
import com.lqy.mq.biz.producer.service.UserService;
import com.lqy.utils.JsonUtil;
import com.lqy.utils.UidUtil;

/**
 * <p>
 *  前端控制器
 * </p>
 *
 * @author root
 * @since 2021-05-11
 */
@RestController
@RequestMapping("/user")
public class UserController {
    
    private static Logger log = Logger.getLogger(UserController.class);
    
    @Autowired
    private UserService userService;
    
    @RequestMapping("/consumeMoney")
    public Result consumeMoney(HttpServletRequest request, Money m) {
        
        log.info("m = " + JsonUtil.toJson(m));
        m.setMoneyCode(UidUtil.getUid());
        log.info("m2 = " + JsonUtil.toJson(m));
        
        userService.sendMsg(m);
        
        return Result.ok();
    }
}

 

4、Service端

import org.apache.log4j.Logger;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.lqy.mq.biz.producer.dao.UserDao;
import com.lqy.mq.biz.producer.entity.Money;
import com.lqy.mq.biz.producer.entity.User;
import com.lqy.mq.biz.producer.service.MoneyService;
import com.lqy.mq.biz.producer.service.UserService;
import com.lqy.utils.JsonUtil;

/**
 * <p>
 *  服務實現類
 * </p>
 *
 * @author root
 * @since 2021-05-11
 */
@Service
public class UserServiceImpl extends ServiceImpl<UserDao, User> implements UserService {

    private static Logger log = Logger.getLogger(UserServiceImpl.class);
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    @Autowired
    private MoneyService moneyService;
    
    @Value("${my.mq.tag.score}")
    private String TAG_SCORE;
    
    
    @Transactional
    @Override
    public void consume(Money m) {
        User user = this.baseMapper.selectById(m.getUserId());
        
        user.setMoney(user.getMoney() - m.getMoney());
        
        this.baseMapper.updateById(user);
        
        moneyService.save(m);
        log.info("m = " +JsonUtil.toJson(m));
        
        if(m.getMoney() == 99) {
            throw new RuntimeException("消費生產者異常");
        }
        
    }
    
    
    @Transactional
    @Override
    public void sendMsg(Money m) {
        
        //先進行消費
        this.consume(m);
        
        
        try {
            
            //后發送消息
            //如果因為消息發生超時,導致業務的事務回滾,后面可以在事務消息中確認或者回查時,根據有沒有業務信息進行消息的提交或者回滾
            
            //消息對象生成
            Message<String> message = MessageBuilder.withPayload(JsonUtil.toJson(m))
                    .setHeader(RocketMQHeaders.TRANSACTION_ID, m.getMoneyCode())
                    .setHeader(RocketMQHeaders.TOPIC, TAG_SCORE)
                    .build();
            
            //事務發消息
            TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(TAG_SCORE, message, null);
            
            if(transactionSendResult == null || transactionSendResult.getSendStatus() != SendStatus.SEND_OK) {
                throw new RuntimeException("MQ消息發送失敗");
            }
            
            log.info("transactionSendResult = " +JsonUtil.toJson(transactionSendResult));
            log.info("msgId = " +transactionSendResult.getMsgId());
            
            //回寫msgId
            m.setMsgId(transactionSendResult.getMsgId());
            moneyService.updateById(m);
            
        }catch (Exception e) {
            throw new RuntimeException("MQ消息發送失敗");
        }
        
        
        //故意造錯:這里會導致RocketMq消息發送成功,但業務數據已經回滾。
        //一般情況,消息發送后,不應該再出現業務的代碼
        if(m.getMoney() == 7) {
            throw new RuntimeException("消息生產者異常,金額不對");
        }
    }
    
    
    
    
}

 

 

5、RocketMQLocalTransactionListener監聽器,RocketMQ發送消息成功確認和RocketMQ事務消息回查

import org.apache.log4j.Logger;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

import com.lqy.mq.biz.producer.entity.Money;
import com.lqy.mq.biz.producer.service.MoneyService;
import com.lqy.utils.JsonUtil;


@Component
@RocketMQTransactionListener
public class MyRocketMqListener implements RocketMQLocalTransactionListener {

    private static Logger log = Logger.getLogger(MyRocketMqListener.class);
    
    @Value("${my.mq.tag.score}")
    private String TAG_SCORE;
    
    @Autowired
    private MoneyService moneyService;
    
    
    private RocketMQLocalTransactionState dealMsg(Message msg, Object arg) {
        String json = new String((byte[])msg.getPayload());
        log.info("json = " + json);
        
        MessageHeaders messageHeaders = msg.getHeaders();
        if(messageHeaders != null) {
            String tag = messageHeaders.get(RocketMQHeaders.TOPIC, String.class);
            log.info("tag = " + tag);
            log.info("TAG_SCORE = " + TAG_SCORE);
            
            if(TAG_SCORE.equals(tag)) {
                Money m = JsonUtil.toBean(json, Money.class);
                Long moneyCode = m.getMoneyCode();
                
                log.info("moneyCode = " + moneyCode);
                
                //需要進行冪等校驗(因為RocketMq會不斷回查,即多次回查)
                if(moneyService.exist(moneyCode)) {
                    return RocketMQLocalTransactionState.COMMIT;
                    
                }else {
                    return RocketMQLocalTransactionState.ROLLBACK;
                }
            }
        }
        
        return RocketMQLocalTransactionState.UNKNOWN;
    }
    
    
    //rocketmq保存信息成功后回調
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        log.info("=====executeLocalTransaction=====");
        return dealMsg(msg, arg);
    }

    
    //rocketmq事務回查信息狀態
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        log.info("+++++checkLocalTransaction+++++");
        return dealMsg(msg, null);
    }

}

 

 

三、RocketMQ 消息消費者端

 

1、消費者pom.xml引入依賴

<properties>
    <rocketmq-spring-boot.version>2.2.0</rocketmq-spring-boot.version>
</properties>
    
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${rocketmq-spring-boot.version}</version>
</dependency>

 

2、消費者application.properties配置

#增加RocketMq依賴后,增加配置
rocketmq.name-server=localhost:9876
rocketmq.consumer.group=rocketMQ2Consumer

#自定義變量
my.mq.tag.score=tag-score

 

3、消費者端RocketMQ監聽器:RocketMQListener

import org.apache.log4j.Logger;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.lqy.mq.biz.consumer.entity.Money;
import com.lqy.mq.biz.consumer.service.ScoreService;
import com.lqy.utils.JsonUtil;

@Component
@RocketMQMessageListener(consumerGroup = "rocketMQ2ConsumerListener", topic = "${my.mq.tag.score}")
//如果生產者有TAG(格式為:topic:mytag),則需要使用下面的
//@RocketMQMessageListener(consumerGroup = "rocketMQ2ConsumerListener", topic = "${my.mq.tag.score}", selectorType = SelectorType.TAG, selectorExpression = "")
public class MyConsumerListenner implements RocketMQListener<String> {

    private static Logger log = Logger.getLogger(MyConsumerListenner.class);
    
    @Autowired
    private ScoreService scoreService;
    

    @Override
    public void onMessage(String message) {
        log.info("message = " + message);
        
        Money money = JsonUtil.toBean(message, Money.class);
        
        scoreService.saveScore(money);
    }

}

 

 

4、消費者Service端

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.lqy.mq.biz.consumer.dao.ScoreDao;
import com.lqy.mq.biz.consumer.entity.Money;
import com.lqy.mq.biz.consumer.entity.Score;
import com.lqy.mq.biz.consumer.service.MoneyService;
import com.lqy.mq.biz.consumer.service.ScoreService;
import com.lqy.mq.biz.consumer.service.UserService;
import com.lqy.utils.JsonUtil;

/**
 * <p>
 *  服務實現類
 * </p>
 *
 * @author root
 * @since 2021-05-11
 */
@Service
public class ScoreServiceImpl extends ServiceImpl<ScoreDao, Score> implements ScoreService {
    
    private static Logger log = Logger.getLogger(ScoreServiceImpl.class);
    
    @Autowired
    private MoneyService moneyService;
    
    @Autowired
    private UserService userService;

    @Transactional
    @Override
    public Score saveScore(Money money) {
        
        //沒有錢的記錄,跳過(可能是因為業務回滾,但消息已經發生成功(消息超時等原因))
        //注意:但這個判斷會存在問題,當消息比較快,而在消息生產端生產的數據庫插入數據比較慢時,會導致消費者端未找到數據而走了這步。
        //所以這步是要拋異常的,因為這里沒有成功消費,RocketMq會重新攝像頭攝像頭推送
        if(!moneyService.exist(money.getMoneyCode())) {
            
            //當產生了RocketMq消息,但業務回滾時,此處會一直報錯
            //這里可以將沒有錢的記錄,保存在另一個表,方便業務進行核查。如果是消息生產者系統導致的問題,可以根據實際業務補償處理。
            //每次根據唯一鍵查詢,記錄失敗的次數,在N次失敗以后,可以通過RocketMQ后台處理該消息,避免一直報錯。
            log.error("沒有錢的記錄-----");
            log.error("money = " + JsonUtil.toJson(money));
            
            throw new RuntimeException("沒有錢的記錄");
        }
        
        //冪等校驗(回調會多次)
        //積分已經存在,跳過,避免重新消費
        if(this.exist(money.getMoneyCode())) {
            log.warn("積分已經存在+++++");
            log.warn("money = " + JsonUtil.toJson(money));
            return null;
        }
        
        Score score = new Score();
        score.setUserId(money.getUserId());
        score.setScoreAdd(money.getMoney());
        score.setMoneyCode(money.getMoneyCode());
        
        this.baseMapper.insert(score);
        
        this.userService.updateScore(money);
        
        if(money.getMoney() == 8) {
            throw new RuntimeException("消費者出現異常,金額不對:money.getMoney()=" + money.getMoney());
        }
        return score;
    }
    
    
    @Transactional(readOnly = true)
    @Override
    public Score getScoreByCode(long moneyCode) {
        QueryWrapper<Score> queryWrapper  = new QueryWrapper<Score>();
        queryWrapper.eq("money_code", moneyCode);
        return this.baseMapper.selectOne(queryWrapper);
    }
    
    
    @Transactional(readOnly = true)
    @Override
    public boolean exist(long moneyCode) {
        Score score = this.getScoreByCode(moneyCode);
        if(score == null) {
            return false;
        }else {
            return true;
        }
    }
    
    
}

 

 

 

(時間寶貴,分享不易,捐贈回饋,^_^)

 

 

================================

©Copyright 蕃薯耀 2021-05-14

https://www.cnblogs.com/fanshuyao/

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM