Spring Cloud Alibaba RocketMQ 消息驅動


比如你的訂單系統,平時每秒最多能處理一萬單請求,但促銷活動的時候可能會有五萬個請求,不限制會導致系統崩潰,限制,導致四萬個訂單失敗。可以用消息隊列來做請求緩沖,異步平緩的處理請求,實現流量削峰。

SpringClud 已經為我們提供了消息驅動框架 SpringCloud Stream。Stream定義了一個消息模型,對消息中間件進行一步封裝,可以做到代碼層面對中間件的無感知,使得微服務開發高度解耦。

1.消息系統通用模型

 

 

2.RocketMQ 架構

 

 

 

 

3.RocketMQ 環境搭建

RocketMQ 部署結構中主要包括

NameServer - Producer 和 Consumer 通過 NameServer 查找 Topic 所在的 Broker。
Broker - 負責消息的存儲、轉發。
部署完 NameServer、Broker 之后,RocketMQ 就可以正常工作了,但所有操作都是通過命令行,不太方便,所以我們還需要部署一個擴展項目 rocketmq-console,可以通過web界面來管理 RocketMQ。

下載地址:http://rocketmq.apache.org/dowloading/releases/

解壓編譯

unzip rocketmq-all-4.7.0-source-release.zip
cd rocketmq-all-4.7.0-source-release
mvn -Prelease-all -DskipTests clean install -U

創建配置文件: conf/broker.properties

 brokerIP1=【你的IP】

啟動 NameServer,nohup后台啟動,>> nameserver.log 2> 指定日志生成文件

> cd distribution/target/rocketmq-4.6.0/rocketmq-4.6.0
> nohup sh bin/mqnamesrv >> nameserver.log 2>&1 &

查看日志文件

tail -f nameserver.log

啟動 Broker

nohup sh bin/mqbroker -n IP:9876 -c conf/broker.properties >>broker.log 2>&1 &

查看日志文件,有可能會沒有足夠內存而報錯。

tail -f broker.log

解決內存不足的問題,修改 bin/runbroker.sh ,把內存參數改小一點。

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"

進行測試,新建兩個終端窗口。

//進行生產消息
export NAMESRV_ADDR=localhost:9876 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
//用於消費消息
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

一個 rocketmq 的擴展項目,其中的 rocketmq-console 是控制台,下載項目:

https://github.com/apache/rocketmq-externals

配置

cd rocketmq-console
vim src/main/resources/application.properties

vim src/main/resources/application.properties

設置 console 的端口

找到 rocketmq.config.namesrvAddr ,填上自己的地址端口

或者直接運行jar

java -jar rocketmq-console-ng-1.0.1.jar --server.port=8080 -rocketmq.config.namesrvAddr=192.168.31.113.9876

啟動成功 訪問頁面地址:http://192.168.31.113:8080

4.RocketMQ 生產者與消費者開發

開發步驟 - Producer(生產者)

添加 RocketMQ 依賴

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>

RocketMQ 配置

rocketmq:
 name-server: 192.168.31.113:9876
 producer:
   group: test-group

創建消息實體

public class User {
 Long id;
 String name;
 public User(){}
 public User(Long id, String name) {
 this.id = id; this.name = name;
 }
 // setter/getter
 @Override
 public String toString() {
 return "User{id=" + id +", name='" + name + "'}";
 }
}

使用 RocketMQTemplate 發送消息

    @RestController
    public class TestController {
        @Autowired
        RocketMQTemplate rocketMQTemplate;
        @GetMapping("/sendmsg")
        public String sendmsg(Long id, String name){
            rocketMQTemplate.convertAndSend("topic-test", new User(id, name));
            return "ok";
        }
    }

開發步驟- Consumer(消費者)

添加 RocketMQ 依賴

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>

使用 RocketMQListener 接收消息

    @Service
    @RocketMQMessageListener(consumerGroup = "group-consumer", topic = "topic-test")
    public class MyMQConsumer implements RocketMQListener<User> {
        @Override
        public void onMessage(User user) {
            // consume logic
            System.out.println(user);
        }
    }

 

 5.RocketMQ 實現分布式事務

 

 分布式事務的解決方案中,有一個可靠消息模式,就是使用消息隊列來實現的。這個方案的關鍵點:怎么保證本地事務與發送消息保持一直,本地成功 & 發送成功 || 本地失敗 & 發送失敗

 

 

 事務型的生產者

 

 

 代碼實現第一步,發送事務消息。

    @GetMapping("/tx/test")
    public String sendTxMsg() {
        rocketMQTemplate.sendMessageInTransaction("topic-tx",
                MessageBuilder.withPayload("hi").build(), null);
        return "ok";
    }

Producer 事務消息監聽器

package com.example.demo;

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
@RocketMQTransactionListener
public class TxMsgListener implements RocketMQLocalTransactionListener {

     //本地事務的方法
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        System.out.println("executeLocalTransaction ...");

        RocketMQLocalTransactionState state = RocketMQLocalTransactionState.ROLLBACK;

        try{
            Thread.sleep(60* 1000);
            state = RocketMQLocalTransactionState.COMMIT;
        }catch (Exception e){
            e.printStackTrace();
        }

        System.out.println("executeLocalTransaction return : " + state);

        return state;
    }

    //本地事務檢查執行結果的方法
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        System.out.println("checkLocalTransaction ...");
        return RocketMQLocalTransactionState.COMMIT;
    }
}

實驗場景
1. 本地事務正常,提交事務消息,Consumer 接收
2. 本地事務失敗,回滾事務消息,Consumer 未接收
3. 本地事務沒返回,mq 回查,Consumer 接收
上面測試第 3 個場景的時候,Consumer 會收到 2 次消息,可能導致重復增加積分。保證消息不被重復處理 ,就是“冪等”冪等是一個數學概念,可以理解為:同一個函數,參數相同的情況下,多次執行后的結果一致
解決方法:Consumer 端建立一個判重表,每次收到消息后先,先到判重表中看一下,看這條消息是否處理過。


6.SpringCloud Stream 開發模型

 

 

 

 

 

 SpringCloud Stream 生產與消費開發實踐

1. 創建一個 stream-producer,集成 SpringCloud Stream,綁定 RocketMQ,發送消息
2. 創建一個 stream-Consumer,集成 SpringCloud Stream,綁定 RocketMQ,接收消息

 stream-producer

 添加 stream-rocketmq 依賴:

<dependency>
 <groupId>com.alibaba.cloud</groupId>
 <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

添加 stream-rocketmq 依賴:

spring:
 cloud:
   stream:
     rocketmq:
       binder:
         name-server: 49.235.54.12:9876
bindings:
  output:
     destination: topic-test-stream
     group: stream-consumer-group

開啟 Binding:@EnableBinding(Source.class)

@SpringBootApplication
@EnableBinding(Source.class)
public class StreamproducerApplication {
 public static void main(String[] args) {
   SpringApplication.run(StreamproducerApplication.class, args);
 }
}

 發送消息

@Autowired
Source source;

@GetMapping("teststream")
public String testStream(){
 source.output().send(MessageBuilder.withPayload("msg").build());
 return "ok";
}

 

stream-consumer

添加 stream-rocketmq 依賴

<dependency>
  <groupId>com.alibaba.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

rocketmq binder、binding destination 屬性配置:

spring:
 cloud:
   stream:
     rocketmq:
       binder:
         name-server: 49.235.54.12:9876
bindings:
  input:
    destination: topic-test-stream
    group: stream-consumer-group

開啟 Binding:

@SpringBootApplication
@EnableBinding(Sink.class)
public class StreamconsumerApplication {
public static void main(String[] args) {
  SpringApplication.run(StreamconsumerApplication.class, args);
 }
}

接收消息

@Service
public class MyStreamConsumer {
  @StreamListener(Sink.INPUT)
  public void receive(String msg){
  // consume logic
  System.out.println("receive: " + msg);
 }
}

消息過濾
Consumer 可能希望處理具有某些特征的消息,這就需要對消息進行過濾。最簡單的方法就是收到消息后自己判斷一下 if ... else ...為了簡化開發,Stream 提供了消息過濾的方式,在 Listener 注解中加一個判斷條件即可:

@Service
public class MyStreamConsumer {
 @StreamListener(value = Sink.INPUT,
  condition = "headers['test-header']=='my test'")
 public void receive(String msg){
   System.out.println("receive: " + msg);
 }
}

消息監控

收發消息不正常時怎么辦?可以查看監控信息actuator 中有 binding 信息、健康檢查信息,為我們提供排錯依據

/actuator/bindings
/actuator/health
/actuator/channels

7.SpringCloud Stream 自定義接口

上節通過 Stream 發送消息的方式:配置文件中指定了“bindings.output”,使用注解開啟了 binding“@EnableBinding(Source.class)”就可以使用“Source”發送消息了。這種默認的自動化方式非常便利,但是,如果想再加一個“output”通道怎么辦?

 producer 添加 output 配置

server:
  port: 8081
spring:
  application:
    name: stream-producer
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 192.168.31.113:9876
      bindings:
        output:
          destination: topic-test-stream
        my-output:
          destination: topic-test-stream-myoutput

producer 創建 source 接口

package com.example.demo;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface MySource {
    @Output("my-output")
    MessageChannel output();
}

producer 啟用自定義 source

package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@EnableBinding({Source.class, MySource.class})
public class StreamproducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(StreamproducerApplication.class, args);
    }

}

producer 發送消息

package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestController {

    @Autowired
    Source source;

    @Autowired
    MySource mySource;

    @GetMapping("testmysource")
    public String testmysource(String msg){
        mySource.output().send(MessageBuilder.withPayload(msg).build());
        return "ok";
    }

    @GetMapping("teststream")
    public String teststream(String msg){
        source.output().send(MessageBuilder.withPayload(msg)
                .setHeader("test-header", "my test").build());
        return "ok";
    }

    @GetMapping("/hi")
    public String hi() {
        return "hi";
    }

    @GetMapping("/hello")
    public String hello(@RequestParam String name) {
        return "hello " + name + "!";
    }
}

 

自定義 sink

consumer 添加 iutput 配置

server:
  port: 8082
spring:
  application:
    name: stream-consumer
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 192.168.31.113:9876
      bindings:
        input:
          destination: topic-test-stream
          group: stream-consumer-group
        my-input:
          destination: topic-test-stream-myoutput
          group: my-group

consumer  創建 sink 接口

package com.example.demo;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MySink {
    String INPUT = "my-input";

    @Input(INPUT)
    SubscribableChannel input();
}

consumer  啟用自定義 sink

package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@EnableBinding({Sink.class,MySink.class})
public class StreamconsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(StreamconsumerApplication.class, args);
    }

}

consumer 接收消息

package com.example.demo;

import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Service;

@Service
public class MyStreamConsumer {
    @StreamListener(value= Sink.INPUT,
        condition = "headers['test-header']=='my test'")
    public void receive(String msg){
        System.out.println("receive: " + msg);
    }

    @StreamListener(value= MySink.INPUT)
    public void receive_myinput(String msg){
        System.out.println("receive: " + msg);
    }
}

 

8.SpringCloud Stream 消費異常處理

消費者在接收消息時,可能會發生異常,如果我們想處理這些異常,需要采取一些處理策略,可以分為:
1. 應用級處理 - 通用,與底層 MQ 無關
2. 系統級處理 - 根據不同的 MQ 特性進行處理,例如 RabbitMQ 可以放入死信隊列
3. 重試 RetryTemplate - 配置消費失敗后如何重試

本節我們學習最通用的“應用級處理”策略,此方式又分為:局部處理方式(某個消息組),全局處理方式。

配置文件

server:
  port: 8080
spring:
  application:
    name: consumer-exception
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 192.168.31.113:9876
      bindings:
        input:
          destination: stream-exception
          group: group-exception

開啟注解添加綁定

package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@EnableBinding(Sink.class)
public class ConsumerexceptionApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerexceptionApplication.class, args);
    }

}

創建消息監聽器

package com.example.demo;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class MyStreamConsumer {
    @StreamListener(Sink.INPUT)
    public void receive(String msg){
        log.info("msg: {}", msg);
        throw new IllegalArgumentException("param error");
    }

    @StreamListener("errorChannel")
    public void handleError(ErrorMessage errorMessage){
        log.error("全局異常. errorMsg: {}", errorMessage);
    }

//    @ServiceActivator(
//            inputChannel = "stream-exception.group-exception.errors"
//    )
//    public void handleError(ErrorMessage errorMessage){
//        log.error("局部異常. errorMsg: {}", errorMessage);
//    }


}

9.SpringCloud Stream 消費組

線上環境中,一個服務通常都會運行多個實例,以保證高可靠,對於消費服務,運行多個實例的時候,每個實例就都會去消費消息,造成重復消費,設置 Consumer Group(消費組)可以實現組內消費者均衡消費。本節我們就學習消費組的設置,體驗其效果。

consumer-group 配置文件

server:
  port: 8082
spring:
  application:
    name: consumer-group
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 192.168.31.113:9876
      bindings:
        input:
          destination: topic-consumer-group
          group: test-group

添加注解

package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@EnableBinding(Sink.class)
public class ConsumergroupApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumergroupApplication.class, args);
    }

}

消息監聽器

package com.example.demo;

import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Service;

@Service
public class MyConsumer {
    @StreamListener(Sink.INPUT)
    public void receive(String msg)
    {
        System.out.println("msg: " + msg);
    }
}

 

10.SpringCloud Stream 消息分區

消息被哪個實例消費是不一定的,但如果我們希望同一類的消息被同一個實例消費怎么辦?例如同一個用戶的訂單消息希望被同一個示例處理,這樣更便於統計。SpringCloud Stream 提供了消息分區的功能,可以滿足這個場景的需求,本節我們就學習如何使用。

創建1個 Producer 一直發送消息,設置消息如何分區
創建1個 Consumer 接收消息,設置按分區接收消息
啟動4個 Consumer 實例,指定分區標識,同一分區的消息應被相同的 Consumer 實例接收

 

 

 

producer 使用 rabbitmq

producer 配置文件

server:
  port: 8081
spring:
  application:
    name: partition-producer
  cloud:
    stream:
      default-binder: rabbit
      bindings:
        output:
          destination: topic-test-stream-partition
          producer:
            partition-key-expression: headers['partitionKey'] - 1
            partition-count: 4
  rabbitmq:
    addresses: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /

producer TestController 

package com.example.demo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.Random;

@RestController
public class TestController {

    @Autowired
    Source source;

    // 消息內容
    private final String[] data = new String[]{
            "f", "g", "h",
            "fo", "go", "ho",
            "foo", "goo", "hoo",
            "fooz", "gooz", "hooz"
    };

    @GetMapping("/produce")
    public String produce() {
        for (int i = 0; i < 100; i++) {

            try {
                // 隨機從 data 數組中獲取一個字符串,作為消息內容
                Random RANDOM = new Random(System.currentTimeMillis());
                String value = data[RANDOM.nextInt(data.length)];
                System.out.println("Sending: " + value);

                // 發送消息
                source.output().send(
                        MessageBuilder.withPayload(value)
                                // 設置頭信息 partitionKey,值為字符串的長度
                                .setHeader("partitionKey", value.length())
                                .build());


                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return "ok";
    }
}

consumer 配置文件

server:
  port: 9003
spring:
  application:
    name: partition-consumer
  cloud:
    stream:
      default-binder: rabbit
      bindings:
        input:
          destination: topic-test-stream-partition
          group: stream-test-partition
          consumer:
            partitioned: true
      instance-index: 3
      instance-count: 4
  rabbitmq:
    addresses: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM