比如你的訂單系統,平時每秒最多能處理一萬單請求,但促銷活動的時候可能會有五萬個請求,不限制會導致系統崩潰,限制,導致四萬個訂單失敗。可以用消息隊列來做請求緩沖,異步平緩的處理請求,實現流量削峰。
SpringClud 已經為我們提供了消息驅動框架 SpringCloud Stream。Stream定義了一個消息模型,對消息中間件進行一步封裝,可以做到代碼層面對中間件的無感知,使得微服務開發高度解耦。
1.消息系統通用模型
2.RocketMQ 架構
3.RocketMQ 環境搭建
RocketMQ 部署結構中主要包括
NameServer - Producer 和 Consumer 通過 NameServer 查找 Topic 所在的 Broker。
Broker - 負責消息的存儲、轉發。
部署完 NameServer、Broker 之后,RocketMQ 就可以正常工作了,但所有操作都是通過命令行,不太方便,所以我們還需要部署一個擴展項目 rocketmq-console,可以通過web界面來管理 RocketMQ。
下載地址:http://rocketmq.apache.org/dowloading/releases/
解壓編譯
unzip rocketmq-all-4.7.0-source-release.zip cd rocketmq-all-4.7.0-source-release mvn -Prelease-all -DskipTests clean install -U
創建配置文件: conf/broker.properties
brokerIP1=【你的IP】
啟動 NameServer,nohup后台啟動,>> nameserver.log 2> 指定日志生成文件
> cd distribution/target/rocketmq-4.6.0/rocketmq-4.6.0 > nohup sh bin/mqnamesrv >> nameserver.log 2>&1 &
查看日志文件
tail -f nameserver.log
啟動 Broker
nohup sh bin/mqbroker -n IP:9876 -c conf/broker.properties >>broker.log 2>&1 &
查看日志文件,有可能會沒有足夠內存而報錯。
tail -f broker.log
解決內存不足的問題,修改 bin/runbroker.sh ,把內存參數改小一點。
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"
進行測試,新建兩個終端窗口。
//進行生產消息
export NAMESRV_ADDR=localhost:9876 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
//用於消費消息 export NAMESRV_ADDR=localhost:9876 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
一個 rocketmq 的擴展項目,其中的 rocketmq-console 是控制台,下載項目:
https://github.com/apache/rocketmq-externals
配置
cd rocketmq-console
vim src/main/resources/application.properties
vim src/main/resources/application.properties
設置 console 的端口
找到 rocketmq.config.namesrvAddr ,填上自己的地址端口
或者直接運行jar
java -jar rocketmq-console-ng-1.0.1.jar --server.port=8080 -rocketmq.config.namesrvAddr=192.168.31.113.9876
啟動成功 訪問頁面地址:http://192.168.31.113:8080
4.RocketMQ 生產者與消費者開發
開發步驟 - Producer(生產者)
添加 RocketMQ 依賴
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version> </dependency>
RocketMQ 配置
rocketmq: name-server: 192.168.31.113:9876 producer: group: test-group
創建消息實體
public class User { Long id; String name; public User(){} public User(Long id, String name) { this.id = id; this.name = name; } // setter/getter @Override public String toString() { return "User{id=" + id +", name='" + name + "'}"; } }
使用 RocketMQTemplate 發送消息
@RestController public class TestController { @Autowired RocketMQTemplate rocketMQTemplate; @GetMapping("/sendmsg") public String sendmsg(Long id, String name){ rocketMQTemplate.convertAndSend("topic-test", new User(id, name)); return "ok"; } }
開發步驟- Consumer(消費者)
添加 RocketMQ 依賴
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version> </dependency>
使用 RocketMQListener 接收消息
@Service @RocketMQMessageListener(consumerGroup = "group-consumer", topic = "topic-test") public class MyMQConsumer implements RocketMQListener<User> { @Override public void onMessage(User user) { // consume logic System.out.println(user); } }
5.RocketMQ 實現分布式事務
分布式事務的解決方案中,有一個可靠消息模式,就是使用消息隊列來實現的。這個方案的關鍵點:怎么保證本地事務與發送消息保持一直,本地成功 & 發送成功 || 本地失敗 & 發送失敗
事務型的生產者
代碼實現第一步,發送事務消息。
@GetMapping("/tx/test") public String sendTxMsg() { rocketMQTemplate.sendMessageInTransaction("topic-tx", MessageBuilder.withPayload("hi").build(), null); return "ok"; }
Producer 事務消息監聽器
package com.example.demo; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; @Component @RocketMQTransactionListener public class TxMsgListener implements RocketMQLocalTransactionListener { //本地事務的方法 @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { System.out.println("executeLocalTransaction ..."); RocketMQLocalTransactionState state = RocketMQLocalTransactionState.ROLLBACK; try{ Thread.sleep(60* 1000); state = RocketMQLocalTransactionState.COMMIT; }catch (Exception e){ e.printStackTrace(); } System.out.println("executeLocalTransaction return : " + state); return state; } //本地事務檢查執行結果的方法 @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { System.out.println("checkLocalTransaction ..."); return RocketMQLocalTransactionState.COMMIT; } }
實驗場景
1. 本地事務正常,提交事務消息,Consumer 接收
2. 本地事務失敗,回滾事務消息,Consumer 未接收
3. 本地事務沒返回,mq 回查,Consumer 接收
上面測試第 3 個場景的時候,Consumer 會收到 2 次消息,可能導致重復增加積分。保證消息不被重復處理 ,就是“冪等”冪等是一個數學概念,可以理解為:同一個函數,參數相同的情況下,多次執行后的結果一致
解決方法:Consumer 端建立一個判重表,每次收到消息后先,先到判重表中看一下,看這條消息是否處理過。
6.SpringCloud Stream 開發模型
SpringCloud Stream 生產與消費開發實踐
1. 創建一個 stream-producer,集成 SpringCloud Stream,綁定 RocketMQ,發送消息
2. 創建一個 stream-Consumer,集成 SpringCloud Stream,綁定 RocketMQ,接收消息
stream-producer
添加 stream-rocketmq 依賴:
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> </dependency>
添加 stream-rocketmq 依賴:
spring: cloud: stream: rocketmq: binder: name-server: 49.235.54.12:9876 bindings: output: destination: topic-test-stream group: stream-consumer-group
開啟 Binding:@EnableBinding(Source.class)
@SpringBootApplication @EnableBinding(Source.class) public class StreamproducerApplication { public static void main(String[] args) { SpringApplication.run(StreamproducerApplication.class, args); } }
發送消息
@Autowired Source source; @GetMapping("teststream") public String testStream(){ source.output().send(MessageBuilder.withPayload("msg").build()); return "ok"; }
stream-consumer
添加 stream-rocketmq 依賴
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> </dependency>
rocketmq binder、binding destination 屬性配置:
spring: cloud: stream: rocketmq: binder: name-server: 49.235.54.12:9876 bindings: input: destination: topic-test-stream group: stream-consumer-group
開啟 Binding:
@SpringBootApplication @EnableBinding(Sink.class) public class StreamconsumerApplication { public static void main(String[] args) { SpringApplication.run(StreamconsumerApplication.class, args); } }
接收消息
@Service public class MyStreamConsumer { @StreamListener(Sink.INPUT) public void receive(String msg){ // consume logic System.out.println("receive: " + msg); } }
消息過濾
Consumer 可能希望處理具有某些特征的消息,這就需要對消息進行過濾。最簡單的方法就是收到消息后自己判斷一下 if ... else ...為了簡化開發,Stream 提供了消息過濾的方式,在 Listener 注解中加一個判斷條件即可:
@Service public class MyStreamConsumer { @StreamListener(value = Sink.INPUT, condition = "headers['test-header']=='my test'") public void receive(String msg){ System.out.println("receive: " + msg); } }
消息監控
收發消息不正常時怎么辦?可以查看監控信息actuator 中有 binding 信息、健康檢查信息,為我們提供排錯依據
/actuator/bindings
/actuator/health
/actuator/channels
7.SpringCloud Stream 自定義接口
上節通過 Stream 發送消息的方式:配置文件中指定了“bindings.output”,使用注解開啟了 binding“@EnableBinding(Source.class)”就可以使用“Source”發送消息了。這種默認的自動化方式非常便利,但是,如果想再加一個“output”通道怎么辦?
producer 添加 output 配置
server: port: 8081 spring: application: name: stream-producer cloud: stream: rocketmq: binder: name-server: 192.168.31.113:9876 bindings: output: destination: topic-test-stream my-output: destination: topic-test-stream-myoutput
producer 創建 source 接口
package com.example.demo; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; public interface MySource { @Output("my-output") MessageChannel output(); }
producer 啟用自定義 source
package com.example.demo; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @SpringBootApplication @EnableBinding({Source.class, MySource.class}) public class StreamproducerApplication { public static void main(String[] args) { SpringApplication.run(StreamproducerApplication.class, args); } }
producer 發送消息
package com.example.demo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController public class TestController { @Autowired Source source; @Autowired MySource mySource; @GetMapping("testmysource") public String testmysource(String msg){ mySource.output().send(MessageBuilder.withPayload(msg).build()); return "ok"; } @GetMapping("teststream") public String teststream(String msg){ source.output().send(MessageBuilder.withPayload(msg) .setHeader("test-header", "my test").build()); return "ok"; } @GetMapping("/hi") public String hi() { return "hi"; } @GetMapping("/hello") public String hello(@RequestParam String name) { return "hello " + name + "!"; } }
自定義 sink
consumer 添加 iutput 配置
server: port: 8082 spring: application: name: stream-consumer cloud: stream: rocketmq: binder: name-server: 192.168.31.113:9876 bindings: input: destination: topic-test-stream group: stream-consumer-group my-input: destination: topic-test-stream-myoutput group: my-group
consumer 創建 sink 接口
package com.example.demo; import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; public interface MySink { String INPUT = "my-input"; @Input(INPUT) SubscribableChannel input(); }
consumer 啟用自定義 sink
package com.example.demo; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @SpringBootApplication @EnableBinding({Sink.class,MySink.class}) public class StreamconsumerApplication { public static void main(String[] args) { SpringApplication.run(StreamconsumerApplication.class, args); } }
consumer 接收消息
package com.example.demo; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.stereotype.Service; @Service public class MyStreamConsumer { @StreamListener(value= Sink.INPUT, condition = "headers['test-header']=='my test'") public void receive(String msg){ System.out.println("receive: " + msg); } @StreamListener(value= MySink.INPUT) public void receive_myinput(String msg){ System.out.println("receive: " + msg); } }
8.SpringCloud Stream 消費異常處理
消費者在接收消息時,可能會發生異常,如果我們想處理這些異常,需要采取一些處理策略,可以分為:
1. 應用級處理 - 通用,與底層 MQ 無關
2. 系統級處理 - 根據不同的 MQ 特性進行處理,例如 RabbitMQ 可以放入死信隊列
3. 重試 RetryTemplate - 配置消費失敗后如何重試
本節我們學習最通用的“應用級處理”策略,此方式又分為:局部處理方式(某個消息組),全局處理方式。
配置文件
server: port: 8080 spring: application: name: consumer-exception cloud: stream: rocketmq: binder: name-server: 192.168.31.113:9876 bindings: input: destination: stream-exception group: group-exception
開啟注解添加綁定
package com.example.demo; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @SpringBootApplication @EnableBinding(Sink.class) public class ConsumerexceptionApplication { public static void main(String[] args) { SpringApplication.run(ConsumerexceptionApplication.class, args); } }
創建消息監聽器
package com.example.demo; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.messaging.support.ErrorMessage; import org.springframework.stereotype.Service; @Service @Slf4j public class MyStreamConsumer { @StreamListener(Sink.INPUT) public void receive(String msg){ log.info("msg: {}", msg); throw new IllegalArgumentException("param error"); } @StreamListener("errorChannel") public void handleError(ErrorMessage errorMessage){ log.error("全局異常. errorMsg: {}", errorMessage); } // @ServiceActivator( // inputChannel = "stream-exception.group-exception.errors" // ) // public void handleError(ErrorMessage errorMessage){ // log.error("局部異常. errorMsg: {}", errorMessage); // } }
9.SpringCloud Stream 消費組
線上環境中,一個服務通常都會運行多個實例,以保證高可靠,對於消費服務,運行多個實例的時候,每個實例就都會去消費消息,造成重復消費,設置 Consumer Group(消費組)可以實現組內消費者均衡消費。本節我們就學習消費組的設置,體驗其效果。
consumer-group 配置文件
server: port: 8082 spring: application: name: consumer-group cloud: stream: rocketmq: binder: name-server: 192.168.31.113:9876 bindings: input: destination: topic-consumer-group group: test-group
添加注解
package com.example.demo; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @SpringBootApplication @EnableBinding(Sink.class) public class ConsumergroupApplication { public static void main(String[] args) { SpringApplication.run(ConsumergroupApplication.class, args); } }
消息監聽器
package com.example.demo; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.stereotype.Service; @Service public class MyConsumer { @StreamListener(Sink.INPUT) public void receive(String msg) { System.out.println("msg: " + msg); } }
10.SpringCloud Stream 消息分區
消息被哪個實例消費是不一定的,但如果我們希望同一類的消息被同一個實例消費怎么辦?例如同一個用戶的訂單消息希望被同一個示例處理,這樣更便於統計。SpringCloud Stream 提供了消息分區的功能,可以滿足這個場景的需求,本節我們就學習如何使用。
創建1個 Producer 一直發送消息,設置消息如何分區
創建1個 Consumer 接收消息,設置按分區接收消息
啟動4個 Consumer 實例,指定分區標識,同一分區的消息應被相同的 Consumer 實例接收
producer 使用 rabbitmq
producer 配置文件
server: port: 8081 spring: application: name: partition-producer cloud: stream: default-binder: rabbit bindings: output: destination: topic-test-stream-partition producer: partition-key-expression: headers['partitionKey'] - 1 partition-count: 4 rabbitmq: addresses: localhost port: 5672 username: guest password: guest virtual-host: /
producer TestController
package com.example.demo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import java.util.Random; @RestController public class TestController { @Autowired Source source; // 消息內容 private final String[] data = new String[]{ "f", "g", "h", "fo", "go", "ho", "foo", "goo", "hoo", "fooz", "gooz", "hooz" }; @GetMapping("/produce") public String produce() { for (int i = 0; i < 100; i++) { try { // 隨機從 data 數組中獲取一個字符串,作為消息內容 Random RANDOM = new Random(System.currentTimeMillis()); String value = data[RANDOM.nextInt(data.length)]; System.out.println("Sending: " + value); // 發送消息 source.output().send( MessageBuilder.withPayload(value) // 設置頭信息 partitionKey,值為字符串的長度 .setHeader("partitionKey", value.length()) .build()); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } return "ok"; } }
consumer 配置文件
server: port: 9003 spring: application: name: partition-consumer cloud: stream: default-binder: rabbit bindings: input: destination: topic-test-stream-partition group: stream-test-partition consumer: partitioned: true instance-index: 3 instance-count: 4 rabbitmq: addresses: localhost port: 5672 username: guest password: guest virtual-host: /