參考程序員DD大佬的文章,自己新建demo學習學習,由於需要消息回執,看到了@SendTo這個注解能夠實現,下面開始學習demo,新建兩個項目cloud-stream-consumer
消費端 和 cloud-stream-consumer
生產端
public interface StreamReceive {
@Input("MQRece")
SubscribableChannel mqReceive();
}
添加一個StreamReceive
接口,定義@input
通道
@Component
@Slf4j
public class ReceiveListener {
@StreamListener("MQRece")
public byte[] receive(byte[] bytes){
log.info("接受消息:"+new String(bytes));
return "ok".getBytes();
}
}
添加消息監聽,接受消息定義為byte[]
添加application.properties
配置文件信息
spring.cloud.stream.rocketmq.binder.namesrv-addr= 192.168.211.11:9876
spring.cloud.stream.bindings.MQRece.destination=message-topic
spring.cloud.stream.bindings.MQRece.group=rece-group
server.port=19999
為MQRece通道添加主題message-topic
,組名rece-group
到此Stream 客戶端消費就完成了,本節需要把@SendTo注解用起來,需要新建一個MessageChannel進行產生消息
public interface MsgBackPush {
@Output("back-push")
MessageChannel backPush();
}
然后在ReceiveListener
添加@SendTo
@Component
@Slf4j
public class ReceiveListener {
@StreamListener("MQRece")
@SendTo("back-push")
public byte[] receive(byte[] bytes){
log.info("接受消息:"+new String(bytes));
return "ok".getBytes();
}
}
新增通道配置application.properties
spring.cloud.stream.bindings.back-push.destination=back-topic
spring.cloud.stream.bindings.back-push.group=back-group
SpringBoot啟動類記得添加 @EnableBinding(value = {StreamReceive.class,MsgBackPush.class})
@SpringBootApplication
@EnableBinding(value = {StreamReceive.class,MsgBackPush.class})
public class CloudStreamConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(CloudStreamConsumerApplication.class, args);
}
}
到此,cloud-stream-consumer這個demo就完成了
接下來看看 cloud-stream-producer
public interface StreamPush {
@Output("MQPush")
MessageChannel mqPush();
}
定義一個通道名為MQPush
,進行消息生產
public interface ProducerReceive {
@Input("producer-receive")
SubscribableChannel producerReceive();
}
定義一個通道名為producer-receive
,進行回執消息的消費
@Component
@Slf4j
public class ProducerListener {
@StreamListener("producer-receive")
public void producerReceive(byte[] bytes){
log.info("come back message:"+new String(bytes));
}
}
具體回執消息處理邏輯,再來看看application.properties
spring.cloud.stream.rocketmq.binder.namesrv-addr= 192.168.214.191:9876
spring.cloud.stream.bindings.MQPush.destination=message-topic
spring.cloud.stream.bindings.MQPush.group=push-group
spring.cloud.stream.bindings.producer-receive.destination=back-topic
spring.cloud.stream.bindings.producer-receive.group=back-group
server.port=20000
為通道設置topic和group,新建一個Http接口測試一下成果
@SpringBootApplication
@EnableBinding(value = {StreamPush.class,ProducerReceive.class})
@RestController
public class CloudStreamProducerApplication {
public static void main(String[] args) {
SpringApplication.run(CloudStreamProducerApplication.class, args);
}
@Autowired
private StreamPush streamPush;
@GetMapping("/sendMessage")
public String sendMessage(){
streamPush.mqPush().send(MessageBuilder.withPayload("message body".getBytes()).build());
return "ok";
}
}
訪問http://localhost:20000/sendMessage
,結果圖如下
cloud-stream-consumer日志輸出
cloud-stream-producer日志輸出
學習@ServiceActivator這個注解,上面的項目cloud-stream-consumer
ReceiveListener類中添加
@Component
@Slf4j
public class ReceiveListener {
@StreamListener("MQRece")
@SendTo("back-push")
public byte[] receive(byte[] bytes){
log.info("接受消息:"+new String(bytes));
// 拋出異常
if(1==1){
throw new RuntimeException("Message consumer failed!");
}
return "ok".getBytes();
}
@Autowired
private MsgBackPush msgBackPush;
@ServiceActivator(inputChannel = "message-topic.rece-group.errors")
public void error(Message<?> message){
log.info("消費者消費消息失敗:"+message);
msgBackPush.backPush().send(MessageBuilder.withPayload("消息消費失敗".getBytes()).build());
}
}
通過使用@ServiceActivator(inputChannel = "test-topic.stream-exception-handler.errors")指定了某個通道的錯誤處理映射。其中,inputChannel的配置中對應關系如下:
- message-topic:消息通道對應的目標(destination,即:spring.cloud.stream.bindings.MQRece.destination的配置)
- rece-group:消息通道對應的消費組(group,即:spring.cloud.stream.bindings.MQRece.group的配置)
訪問http://localhost:20000/sendMessage
,結果圖如下
cloud-stream-consumer日志輸出
cloud-stream-producer日志輸出