每天學點SpringCloud(十三):SpringCloud-Stream整合RabbitMQ


我們知道,當微服務越來越來多的時候,僅僅是feign的http調用方式已經滿足不了我們的使用場景了。這個時候系統就需要接入消息中間件了。相比較於傳統的Spring項目、SpringBoot項目使用消息中間件的很多配置不同,SpringCloud Stream抽象了中間件產品的不同,在SpringCloud中你僅僅需要修改幾行配置文件就可以靈活的切換中間件產品而不需要修改任何代碼。

現在我們以SpringCloud Stream整合RabbitMQ為例來學習一下

創建生產者

1. 引入依賴

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2. 定義配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
spring:
cloud:
stream:
binders:
test:
type: rabbit
environment:
spring:
rabbitmq:
addresses: 10.0.20.132
port: 5672
username: root
password: root
virtual-host: /unicode-pay
bindings:
testOutPut:
destination: testRabbit
content-type: application/json
default-binder: test

現在來解釋一下這些配置的含義

  1. binders: 這是一組binder的集合,這里配置了一個名為test的binder,這個binder中是包含了一個rabbit的連接信息
  2. bindings:這是一組binding的集合,這里配置了一個名為testOutPut的binding,這個binding中配置了指向名test的binder下的一個交換機testRabbit。
  3. 擴展: 如果我們項目中不僅集成了rabbit還集成了kafka那么就可以新增一個類型為kafka的binder、如果項目中會使用多個交換機那么就使用多個binding,

3.創建通道

1
2
3
4
5
6
7
8
public interface MqMessageSource {

String TEST_OUT_PUT = "testOutPut";

@Output(TEST_OUT_PUT)
MessageChannel testOutPut();

}

這個通道的名字就是上方binding的名字

4. 發送消息

1
2
3
4
5
6
7
8
9
10
11
12
@EnableBinding(MqMessageSource.class)
public class MqMessageProducer {
@Autowired
@Output(MqMessageSource.TEST_OUT_PUT)
private MessageChannel channel;


public void sendMsg(String msg) {
channel.send(MessageBuilder.withPayload(msg).build());
System.err.println("消息發送成功:"+msg);
}
}

這里就是使用上方的通道來發送到指定的交換機了。需要注意的是withPayload方法你可以傳入任何類型的對象,但是需要實現序列化接口

5. 創建測試接口

EnableBinding注解綁定的類默認是被Spring管理的,我們可以在controller中注入它

1
2
3
4
5
6
7
8
@Autowired
private MqMessageProducer mqMessageProducer;

@GetMapping(value = "/testMq")
public String testMq(@RequestParam("msg")String msg){
mqMessageProducer.sendMsg(msg);
return "發送成功";
}

生產者的代碼到此已經完成了。

創建消費者

1. 引入依賴

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2. 定義配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
spring:
cloud:
stream:
binders:
test:
type: rabbit
environment:
spring:
rabbitmq:
addresses: 10.0.20.132
port: 5672
username: root
password: root
virtual-host: /unicode-pay
bindings:
testInPut:
destination: testRabbit
content-type: application/json
default-binder: test

這里與生產者唯一不同的地方就是testIntPut了,相信你已經明白了,它是binding的名字,也是通道與交換機綁定的關鍵

3.創建通道

1
2
3
4
5
6
7
8
public interface MqMessageSource {

String TEST_IN_PUT = "testInPut";

@Input(TEST_IN_PUT)
SubscribableChannel testInPut();

}

4. 接受消息

1
2
3
4
5
6
7
8
@EnableBinding(MqMessageSource.class)
public class MqMessageConsumer {
@StreamListener(MqMessageSource.TEST_IN_PUT)
public void messageInPut(Message<String> message) {
System.err.println(" 消息接收成功:" + message.getPayload());
}

}

這個時候啟動Eureka、消息生產者和消費者,然后調用生產者的接口應該就可以接受到來自mq的消息了。

GitHub地址:https://github.com/shiyujun/spring-cloud-demo。代碼所在模塊:cloud-demo-consumer,cloud-demo-provider-2

如果對您有所幫助,請記得幫忙點一個star哦

 

本文出自http://zhixiang.org.cn,轉載請保留


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM