使用 spring stream 發送消息


為什么使用spring stream ?

 spring stream 是用來做消息隊列發送消息使用的。他隔離了各種消息隊列的區別,使用統一的編程模型來發送消息。

目前支持:

rabbitmq

kafka

rocketmq

啟動rocketmq 

rocketmq 支持windows

start mqnamesrv.cmd

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

修改pom.xml

<dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
        </dependency>

增加發送接收JAVA代碼

public interface InputOutput {

    String MAIL_OUTPUT = "mailOutput";
    String MAIL_INPUT = "mailInput";

    String OUTPUT = "output";
    String INPUT = "input";


    @Output(OUTPUT)
    MessageChannel output();
    @Input(INPUT)
    SubscribableChannel input();


    @Output(MAIL_OUTPUT)
    MessageChannel mailOutput();
    @Input(MAIL_INPUT)
    SubscribableChannel mailInput();


}

在應用上增加注解

@EnableBinding({InputOutput.class})

增加yml配置

spring:
    cloud:
        stream:
          rocketmq:
            binder:
              name-server: 127.0.0.1:9876
          bindings:
            output:
              destination: bpmmessage
              group: bpmmessage-group
    
            input:
              destination: bpmmessage
              group: bpmmessage-group-consumer
    
            mailOutput:
              destination: mail
              group: mail-group
    
            mailInput:
                destination: mail
                group: mail-group-consumer

編寫代碼收發消息:

MessageModel messageModel=new MessageModel();

        messageModel.setMsgType("mail");
        messageModel.setContent("helloworld");

        inputOutput.mailOutput().send( MessageBuilder.withPayload(
                "mail"
        ).build());

        inputOutput.output().send(
                MessageBuilder.withPayload(
                        messageModel
                ).build()
        );

這里發送的是兩類消息。

接收消息:

@Service
public class MessageListener {

    @StreamListener(InputOutput.INPUT)
    public void receive(MessageModel message) {
        System.err.println(message);
        System.err.println("ok");
    }


    @StreamListener(InputOutput.MAIL_INPUT)
    public void receive(String message) {
        System.err.println(message);
        System.err.println("ok");
    }
}

分別接收兩類消息

 

 

 
        

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM