SpringCloud Stream RabbitMQ


依賴引入

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

配置mq連接屬性

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

仿照Processor

  • 常量類Constant
// 定中的常量
    public static final String SEND_MSG = "sendmsg";
    public static final String RECEIVED_MSG = "receivedmsg";
  • 仿接口org.springframework.cloud.stream.messaging.Processor
public interface StreamClient {

    @Input(Constant.SEND_MSG)
    SubscribableChannel input();

    @Output(Constant.SEND_MSG)
    MessageChannel output();

發送消息

  • 此處寫在了 controller,也可以寫在測試類
package com.cloud.order.controller;

import com.cloud.order.msg.StreamClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
@RestController
public class SendMsg {

    @Autowired
    private StreamClient streamClient; // 自己定義的StreamClient

    @RequestMapping("/sendmsg")
    public void sendMsg() {
        String msg = "hello " + new Date();
        streamClient.output().send(MessageBuilder.withPayload(msg).build());
    }
}

接收消息

@Component
@EnableBinding(StreamClient.class)//定義好的接口
@Slf4j
public class StreamReceiver {

    @StreamListener(Constant.SEND_MSG) // 監聽的消息隊列
    public  String  process(Object val){
        log.info("StreamReceiver msg"+ val);
        return "received msg "+new Date().getTime();
    }
}

消費后返回消息

@Component
@EnableBinding(StreamClient.class)
@Slf4j
public class StreamReceiver {

    @StreamListener(Constant.SEND_MSG)
    @SendTo(Constant.RECEIVED_MSG) //返回給的隊列,創建方式同上
    public  String  process(Object val){
        log.info("StreamReceiver msg"+ val);
        return "received msg "+new Date().getTime();
    }

    @StreamListener(Constant.RECEIVED_MSG)
    public  void  processReceiver(Object val){
        log.info("received msg -- --"+ val);
    }
}

其他

多實例,僅一個實例可接受到消息

  • 添加到配置文件
# 僅讓一個實例接收到消息,msg-是程序中定義的隊列名字,order-根據語義自定義即可
spring.cloud.stream.bindings.msg.group=order

發送對象類型消息,在MQ中查看未消費的消息

# 可在消息隊列中看到堆積的消息的(當消息為對象格式時)完整屬性,msg-是程序中定義的隊列名字
spring.cloud.stream.bindings.msg.content-type=application/json


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM