使用SpringCloudStream整合RabbitMQ


如果項目中我們用的是RabbitMQ進行消息傳輸,隨着后面的業務需求,我們需要向Kafka遷移,如果單純去修改代碼,那是很繁瑣的。

那么怎么解決這種情況呢,既能使用RabbitMQ又可以快速切換KafKa?這時就用到了SpringCloudStream:

其可以屏蔽底層消息中間件的差異,降低切換成本,統一消息的編程模型。不過目前只支持RabbitMQ 和 Kafka。

通過定義綁定器 Binder 作為中間層,實現了應用程序與消息中間件細節之間的隔離。向應用程序暴露統一的 Channel 通道,使得應用程序不需要再考慮各種消息中間件的實現

inputs是消費者,outputs是生產者 

Stream中的消息通信方式遵循了發布-訂閱模式,用 Topic 主題進行廣播(在RabbitMQ就是Exchange,在Kafka中就是Topic)

其主要流程如下圖

Binder:很方便的連接中間件,屏蔽差異。

Channel:通道,是隊列Queue的一種抽象,在消息通訊系統中就是實現存儲和轉發的媒介,通過channel對隊列進行配置。

Source和Sink:簡單理解為消息的生產者和消費者。

SpringBoot整合SpringCLoudStream:

1、添加依賴

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- commons-lang3 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.4</version>
        </dependency>
        <!-- fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.54</version>
        </dependency>
        <!-- Swagger2 -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.7.0</version>
        </dependency>

        <!-- Spring Cloud Stream -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <version>2.1.3.RELEASE</version>
            <exclusions>
                <exclusion>
                    <artifactId>objenesis</artifactId>
                    <groupId>org.objenesis</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            <version>2.1.3.RELEASE</version>
        </dependency>

2、配置文件application.yml

server:
  port: 8088
spring:
  application:
    name: stream_demo
  cloud:
    stream:
      binders: #需要綁定的rabbitmq的服務信息
        defaultRabbit:  #定義的名稱,用於binding整合
          type: rabbit  #消息組件類型
          environment:  #配置rabbitmq連接環境
            spring:
              rabbitmq:
                host: **.**.**.**    #rabbitmq 服務器的地址
                port: 5672           #rabbitmq 服務器端口
                username: ****       #rabbitmq 用戶名
                password: ****       #rabbitmq 密碼
                virtual-host: /       #虛擬路徑
      bindings:        #服務的整合處理
        #inputs 對應消費者,outputs 對應生產者
        #Stream中的消息通信方式遵循了發布-訂閱模式
        #在Stream中,處於同一個組的多個消費者是競爭關系,就可以保證消息只被一個服務消費一次,而不同組是可以重復消費的。現在默認分組就是不同的,組流水號不一樣。
        #消費者宕機:如果未配置group,則消費者上線后無法消費之前的消息(消息丟失);如果配置了group,則消費上線后可以消費之前的消息(消息持久化)
        testOutput:    #生產者消息輸出通道 ---> 消息輸出通道 = 生產者相關的定義:Exchange & Queue
          destination: exchange-test          #exchange名稱,交換模式默認是topic;把SpringCloud Stream的消息輸出通道綁定到RabbitMQ的exchange-test交換器。
          content-type: application/json      #設置消息的類型,本次為json
          default-binder: defaultRabbit       #設置要綁定的消息服務的具體設置,默認綁定RabbitMQ
          group: testGroup                    #分組=Queue名稱,如果不設置會使用默認的組流水號
        testInput:     #消費者消息輸入通道 ---> 消息輸入通道 = 消費者相關的定義:Exchange & Queue
          destination: exchange-test          #exchange名稱,交換模式默認是topic;把SpringCloud Stream的消息輸入通道綁定到RabbitMQ的exchange-test交換器。
          content-type: application/json
          default-binder: defaultRabbit
          group: testGroup

3、Channel信道

package com.stream.api;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;

/**
 * @ClassName: TestChannelProcessor
 * @Description: 定義Channel信道
 * @Author: qiaojiacheng
 * @Date: 2021/3/10 3:20 下午
 */
@Component
public interface TestChannelProcessor {
    /**
     * 生產者消息輸出通道(需要與配置文件中的保持一致)
     */
    String TEST_OUTPUT = "testOutput";

    /**
     * 消息生產
     *
     * @return
     */
    @Output(TEST_OUTPUT)
    MessageChannel testOutput();

    /**
     * 消費者消息輸入通道(需要與配置文件中的保持一致)
     */
    String TEST_INPUT = "testInput";

    /**
     * 消息消費
     *
     * @return
     */
    @Input(TEST_INPUT)
    SubscribableChannel testInput();
}

4、生產者

package com.stream.provider;

import com.alibaba.fastjson.JSON;
import com.stream.api.TestChannelProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;

import java.util.HashMap;
import java.util.Map;

/**
 * @ClassName: TestMessageProducer
 * @Description: 生產者生產消息
 * @Author: qiaojiacheng
 * @Date: 2021/3/10 3:21 下午
 */
@EnableBinding(value = {TestChannelProcessor.class})
public class TestMessageProducer {

    @Autowired
    private BinderAwareChannelResolver channelResolver;

    /**
     * 生產消息
     *
     * @param msg
     */
    public void testSendMessage(String msg) {
        Map<String, Object> headers = new HashMap<>();
        Map<String, Object> payload = new HashMap<>();
        payload.put("msg", msg);
        System.err.println("生產者發送消息:" + JSON.toJSONString(payload));
        channelResolver.resolveDestination(TestChannelProcessor.TEST_OUTPUT).send(
                MessageBuilder.createMessage(payload, new MessageHeaders(headers))
        );
    }
}

5、發送消息的Controller

package com.stream.controller;

import com.stream.provider.TestMessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * @ClassName: TestController
 * @Description: 測試發送消息
 * @Author: qiaojiacheng
 * @Date: 2021/3/10 3:23 下午
 */
@RestController
public class TestController {

    @Autowired
    private TestMessageProducer testMessageProducer;

    /**
     * 發送保存訂單消息
     *
     * @param message
     */
    @GetMapping(value = "sendTestMessage")
    public void sendTestMessage(@RequestParam("message") String message) {
        //發送消息
        testMessageProducer.testSendMessage(message);
    }
}

6、消費者

package com.stream.provider;

import com.stream.api.TestChannelProcessor;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;

/**
 * @ClassName: TestMessageConsumer
 * @Description: 消費者消費消息
 * @Author: qiaojiacheng
 * @Date: 2021/3/10 4:09 下午
 */
@EnableBinding(TestChannelProcessor.class)
public class TestMessageConsumer {

    @StreamListener(TestChannelProcessor.TEST_INPUT)
    public void testConsumeMessage(Message<String> message) {
        System.err.println("消費者消費消息:" + message.getPayload());
    }
}

7、swagger配置,方便測試

package com.stream.swagger;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.async.DeferredResult;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

/**
 * @author qiaojiacheng
 */
@Configuration
@EnableSwagger2
public class SwaggerConfig {

    @Bean
    public Docket createRestApi() {

        return new Docket(DocumentationType.SWAGGER_2)
                .genericModelSubstitutes(DeferredResult.class)
                .select()
                .paths(PathSelectors.any())
                .build().apiInfo(apiInfo());

    }

    private ApiInfo apiInfo() {

        return new ApiInfoBuilder().title("Stream server")
                .description("測試SpringCloudStream")
                .termsOfServiceUrl("https://spring.io/projects/spring-cloud-stream")
                .version("1.0").build();
    }
}

8、啟動類

package com.stream;

import com.stream.api.TestChannelProcessor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableBinding(value= {TestChannelProcessor.class})
public class StreamDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(StreamDemoApplication.class, args);
    }

}

訪問swagger進行測試

 

 控制台輸出結果

 

 查看rabbitmq控制台

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM