spring boot:用rocketmq發送延時消息用來取消訂單(spring boot 2.3.3)


一,為什么要用延時消息來取消訂單?

      1,為什么要取消訂單

      在電商的下單過程中,需要在生成訂單時扣減庫存,

      但有可能發生這種情況:用戶下了單,臨時改變主意不再支付,

      則訂單不能無限期的保留,因為還要把占用的庫存數量釋放出來,

      所以通常會在用戶下單后半小時(或其他時長)把未支付的訂單取消不再保留。

 

     2,取消訂單的方法:

        通常我們會在crond中創建一個定時運行的任務,每1分鍾執行一次,

        把下單時間超過半小時的取出來,檢查訂單狀態是否還是未支付,

        如果仍未支付,則修改訂單狀態為無效,同時把庫存數量加回

        這個做法的缺點是數據庫繁忙時會增加數據庫的壓力

 

     3,rocketmq的延時消息功能可以精准的在指定時間把消息發送到消費者,

        而無需掃描數據庫,

        在這里我們使用延時消息來實現取消訂單功能

 

說明:劉宏締的架構森林是一個專注架構的博客,地址:https://www.cnblogs.com/architectforest

         對應的源碼可以訪問這里獲取: https://github.com/liuhongdi/

說明:作者:劉宏締 郵箱: 371125307@qq.com

 

二,演示項目的相關信息

1,項目地址:

https://github.com/liuhongdi/mqdelay

 

2,項目功能說明

        演示了用rocketmq發送延時消息

 

3,項目結構:如圖:

 

三,配置文件說明

1,send/pom.xml

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>
        <!--fastjson begin-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>

 

2,receive/pom.xml

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>
        <!--fastjson begin-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>

說明:兩個模塊的pom.xml內容相同

 

3,receive/application.properties

server.port=8081

說明:兩個模塊同時運行時,需要把端口區分開,
         send不做修改,使用默認的8080端口
         receive這里指定使用8081端口

 

四,java代碼說明

1,send/OrderMsg.java

//發送的取消訂單信息
public class OrderMsg {
    //用戶id
    private int userId;
    public int getUserId() {
        return this.userId;
    }
    public void setUserId(int userId) {
        this.userId = userId;
    }

    //訂單sn
    private String orderSn;
    public String getOrderSn() {
        return this.orderSn;
    }
    public void setOrderSn(String orderSn) {
        this.orderSn = orderSn;
    }

}

說明:要取消的訂單的消息模型,

OrderMsg.java在兩個模塊中一致

 

2,send/RocketConstants.java

public class RocketConstants {//name server,有多個時用分號隔開
    public static final String NAME_SERVER = "127.0.0.1:9876";
    //topic的名字,應該從服務端先創建好,否則會報錯
    public static final String TOPIC = "laoliutest";
}

rocketmq需要用到的name server和topic名字

RocketConstants.java在兩個模塊中一致

 

3,send/Producer.java

//消息生產者類
@Component
public class Producer {
    private String producerGroup = "order_producer";
    private DefaultMQProducer producer;
    //構造
    public Producer(){
        //創建生產者
        producer = new DefaultMQProducer(producerGroup);
        //不開啟vip通道
        producer.setVipChannelEnabled(false);
        //設定 name server
        producer.setNamesrvAddr(RocketConstants.NAME_SERVER);
        //producer.m
        start();
    }

    //使producer啟動
    public void start(){
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
    //返回producer
    public DefaultMQProducer getProducer(){
        return this.producer;
    }

    //進行關閉的方法
    public void shutdown(){
        this.producer.shutdown();
    }
}

生產者類

 

4,send/HomeController.java

@RestController
@RequestMapping("/home")
public class HomeController {

    @Autowired
    private Producer producer;

    //初始化並發送消息
    @RequestMapping("/send")
    public String send() throws Exception {

        int userId = 1;

        //得到訂單編號:
        DateTimeFormatter df_year = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");
        LocalDateTime date = LocalDateTime.now();
        String datestr = date.format(df_year);

        //消息,指定用戶id和訂單編號
        OrderMsg msg = new OrderMsg();
        msg.setUserId(userId);
        msg.setOrderSn(userId+"_"+datestr);

        String msgJson = JSON.toJSONString(msg);
        //生成一個信息,標簽在這里手動指定
        Message message = new Message(RocketConstants.TOPIC, "carttag", msgJson.getBytes());
        //delaytime的值:
        //messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
        message.setDelayTimeLevel(5);
        //發送信息
        SendResult sendResult = producer.getProducer().send(message);
        System.out.println("時間:"+ TimeUtil.getTimeNow()+";生產者發送一條信息,內容:{"+msgJson+"},結果:{"+sendResult+"}");

        return "success";
    }
}

發送消息

注意延遲時間的值5對應1m,所以消費者應該會在1分鍾后才收到消息

 

5,receive/Consumer.java

@Component
public class Consumer {

    //消費者實體對象
    private DefaultMQPushConsumer consumer;

    //消費者組
    public static final String CONSUMER_GROUP = "order_consumer";

    //構造函數 用來實例化對象
    public Consumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.setNamesrvAddr(RocketConstants.NAME_SERVER);
        //指定消費模式
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        //指定訂閱主題
        //指定訂閱標簽,*代表所有標簽
        consumer.subscribe(RocketConstants.TOPIC, "*");
        //注冊一個消費消息的Listener
        //對消息的消費在這里實現
        //兩種回調 MessageListenerConcurrently 為普通監聽,MessageListenerOrderly 為順序監聽
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            //遍歷接收到的消息
            try {
                for (Message msg : msgs) {
                    //得到消息的body
                    String body = new String(msg.getBody(), "utf-8");
                    //用json轉成對象
                    OrderMsg msgOne = JSON.parseObject(body, OrderMsg.class);
                    //打印用戶id
                    System.out.println("消息:用戶id:"+msgOne.getUserId());
                    //打印訂單編號
                    System.out.println("消息:訂單sn:"+msgOne.getOrderSn());
                    //打印消息內容
                    System.out.println("時間:"+ TimeUtil.getTimeNow()+";消費者已接收到消息-topic={"+msg.getTopic()+"}, 消息內容={"+body+"}");
                }
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        consumer.start();
        System.out.println("消費者 啟動成功=======");
    }
}

 

6,其他非關鍵代碼可查看github

 

五,測試效果

1,分別啟動兩個模塊

 

2,訪問:

http://127.0.0.1:8080/home/send

返回:

success

查看send的控制台:

時間:2020-09-17 14:56:53.207;生產者發送一條信息,
內容:{{"orderSn":"1_20200917145653166","userId":1}},
結果:{SendResult [sendStatus=SEND_OK, msgId=C0A803D5231F42A57993559ADFC50000, offsetMsgId=7F00000100002A9F0000000000016E7B, 
messageQueue=MessageQueue [topic=laoliutest, brokerName=broker-a, queueId=0], queueOffset=13]}

注意發送的時間:2020-09-17 14:56:53.207

查看receive的控制台:

消息:用戶id:1
消息:訂單sn:1_20200917145653166
時間:2020-09-17 14:57:53.212;
消費者已接收到消息-topic={laoliutest}, 
消息內容={{"orderSn":"1_20200917145653166","userId":1}}

注意接收到的時間:2020-09-17 14:57:53.212

時長整好是60秒,和我們在代碼中的設置一致

 

六,查看spring boot版本:

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.3.3.RELEASE)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM