依賴引入
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
配置mq連接屬性
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
仿照Processor
// 定中的常量
public static final String SEND_MSG = "sendmsg";
public static final String RECEIVED_MSG = "receivedmsg";
- 仿接口org.springframework.cloud.stream.messaging.Processor
public interface StreamClient {
@Input(Constant.SEND_MSG)
SubscribableChannel input();
@Output(Constant.SEND_MSG)
MessageChannel output();
發送消息
- 此處寫在了 controller,也可以寫在測試類
package com.cloud.order.controller;
import com.cloud.order.msg.StreamClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@RestController
public class SendMsg {
@Autowired
private StreamClient streamClient; // 自己定義的StreamClient
@RequestMapping("/sendmsg")
public void sendMsg() {
String msg = "hello " + new Date();
streamClient.output().send(MessageBuilder.withPayload(msg).build());
}
}
接收消息
@Component
@EnableBinding(StreamClient.class)//定義好的接口
@Slf4j
public class StreamReceiver {
@StreamListener(Constant.SEND_MSG) // 監聽的消息隊列
public String process(Object val){
log.info("StreamReceiver msg"+ val);
return "received msg "+new Date().getTime();
}
}
消費后返回消息
@Component
@EnableBinding(StreamClient.class)
@Slf4j
public class StreamReceiver {
@StreamListener(Constant.SEND_MSG)
@SendTo(Constant.RECEIVED_MSG) //返回給的隊列,創建方式同上
public String process(Object val){
log.info("StreamReceiver msg"+ val);
return "received msg "+new Date().getTime();
}
@StreamListener(Constant.RECEIVED_MSG)
public void processReceiver(Object val){
log.info("received msg -- --"+ val);
}
}
其他
多實例,僅一個實例可接受到消息
# 僅讓一個實例接收到消息,msg-是程序中定義的隊列名字,order-根據語義自定義即可
spring.cloud.stream.bindings.msg.group=order
發送對象類型消息,在MQ中查看未消費的消息
# 可在消息隊列中看到堆積的消息的(當消息為對象格式時)完整屬性,msg-是程序中定義的隊列名字
spring.cloud.stream.bindings.msg.content-type=application/json