SpringCloud之Spring Cloud Stream:消息驅動


Spring Cloud Stream 是一個構建消息驅動微服務的框架,該框架在Spring Boot的基礎上整合了Spring Integrationg來連接消息代理中間件(RabbitMQ, Kafka等),提供了個性化的自動化配置實現,並引入了發布-訂閱、消費組、分區這三個核心概念。
應用程序通過input通道或者output通道來與Spring Cloud Stream中binder(綁定器)交互,通過配置來binding. 而Spring Cloud Stream的binder負責與中間件交互。

開發工具:IntelliJ IDEA 2019.2.3

一、服務器端

1、創建項目

IDEA中創建一個新的SpringBoot項目,名稱為“spring-server”,SpringBoot版本選擇2.1.10,在選擇Dependencies(依賴)的界面勾選Spring Cloud Discovery -> Eureka Server。
pom.xml完整內容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-server</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-server</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
View Code

2、修改配置application.yml

修改端口號為8761;取消將自己信息注冊到Eureka服務器,不從Eureka服務器抓取注冊信息。

server:
  port: 8761
eureka:
  client:
    register-with-eureka: false
    fetch-registry: false

3、修改啟動類代碼

增加注解@EnableEurekaServer

package com.example.springserver;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@SpringBootApplication
@EnableEurekaServer
public class SpringServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringServerApplication.class, args);
    }

}
View Code

二、消息生產者

1、創建項目
IDEA中創建一個新的SpringBoot項目,名稱為“spring-producer”,SpringBoot版本選擇2.1.10,在選擇Dependencies(依賴)的界面勾選Web -> Spring Web,Spring Cloud Discovery -> Eureka Discovery Client。
打開pom.xml,添加依賴spring-cloud-starter-stream-rabbit,會自動引入spring-cloud-stream和spring-cloud-stream-binder。
pom.xml完整內容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-producer</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
View Code

2、修改配置application.yml

pom.xml使用RabbitMQ,默認情況下,連接本地的5672端口。下面這段rabbitmq也可省略。

server:
  port: 8081
spring:
  application:
    name: spring-producer
eureka:
  instance:
    hostname: localhost
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/
rabbitmq:
  host: localhost
  post: 5672
  username: guest
  password: guest

3、編寫發送服務

方法sendOrder使用@Output("myInput")注解表示創建myInput的消息通道。調用該方法后,會向myInput通道投遞消息。
如果不使用參數myInput,則使用方法名作為通道名稱。

package com.example.springproducer;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.SubscribableChannel;

public interface SendService {
    @Output("myInput")
    SubscribableChannel sendOrder();
}

4、修改啟動類代碼

加入注解@EnableBinding以開啟Spring容器的綁定功能,以SendService.class為參數,Spring容器啟動時,會自動綁定SendService接口中定義的通道。

package com.example.springproducer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableEurekaClient
@EnableBinding(SendService.class)
public class SpringProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringProducerApplication.class, args);
    }

}

5、添加一個控制器類

調用SendService的發送方法,往服務器發送消息。

package com.example.springproducer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    @Autowired
    SendService sendService;

    @RequestMapping(value="/send",method= RequestMethod.GET)
    public String sendRequest(){
        //創建消息
        Message msg = MessageBuilder.withPayload("hello world".getBytes()).build();
        //發送消息
        sendService.sendOrder().send(msg);
        return "SUCCESS";
    }
}

三、消息消費者

1、創建項目

IDEA中創建一個新的SpringBoot項目,名稱為“spring-consumer”,SpringBoot版本選擇2.1.10,在選擇Dependencies(依賴)的界面勾選Web -> Spring Web,Spring Cloud Discovery -> Eureka Discovery Client。
打開pom.xml,添加依賴spring-cloud-starter-stream-rabbit

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-consumer</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
View Code

2、修改配置application.yml

server:
  port: 8080
spring:
  application:
    name: spring-consumer
eureka:
  instance:
    hostname: localhost
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/
rabbitmq:
  host: localhost
  post: 5672
  username: guest
  password: guest

3、縮寫接受消息的通道接口

package com.example.springconsumer;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface ReceiveService {
    @Input("myInput")
    SubscribableChannel myInput();
}

4、修改啟動類代碼

同樣綁定消息通道

package com.example.springconsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

@SpringBootApplication
@EnableBinding(ReceiveService.class)
public class SpringConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringConsumerApplication.class, args);
    }

    //訂閱myInput通道的消息
    @StreamListener("myInput")
    public void receive(byte[] msg){
        System.out.println("接收到的消息:" + new String(msg));
    }
}

5、測試

(1)檢查服務里面的RabbitMQ是否有啟動(默認啟動);

(2)啟動spring-server(8761端口);

(3)啟動spring-producer(8081端口);

(4)啟動spring-consumer(8080端口);

(5)瀏覽器訪問http://localhost:8081/send,spring-consumer項目的控制台輸出:

接收到的消息:hello world

說明消費者已經可以從消息代理中獲取到消息。

四、更換綁定器

上面使用了RabbitMQ作為消息代理,如果使用Kafka,可以更換Maven依賴實現。
在生產者和消費者的pom.xml中,將spring-cloud-starter-stream-rabbit修改為spring-cloud-starter-stream-kafka。

五、Sink、Source與Processor接口

為了簡化開發,Spring Cloud Stream內置了3個接口:Sink、Source與Processor。接口定義如下:

public interface Source {
    String OUTPUT = "output";

    @Output("output")
    MessageChannel output();
}
public interface Sink {
    String INPUT = "input";

    @Input("input")
    SubscribableChannel input();
}
public interface Processor extends Source, Sink {
}

通過上面內置接口,不必編寫服務接口,生產者綁定通道時加入Source.class,消費者綁定通道時加入Sink.class,因為Processor繼承於Sink與Source,也可以只使用Processor。

1、消息生產者代碼改寫如下:

(1)啟動類代碼

綁定通道時加入Processor.class

package com.example.springproducer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;

@SpringBootApplication
@EnableEurekaClient
@EnableBinding(Processor.class)
public class SpringProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringProducerApplication.class, args);
    }

}

(2)修改配置application.yml

綁定名為input的消息隊列,否則消息隊列默認為output。

server:
  port: 8081
spring:
  application:
    name: spring-producer
  cloud:
    stream:
      bindings:
        output:
          destination: input
eureka:
  instance:
    hostname: localhost
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/

(3)刪除服務接口SendService

2、消息消費者代碼改寫如下:

(1)啟動類代碼

綁定通道時加入Processor.class

package com.example.springconsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;

@SpringBootApplication
@EnableBinding(Processor.class)
public class SpringConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringConsumerApplication.class, args);
    }

    @StreamListener(Processor.INPUT)
    public void receive(byte[] msg){
        System.out.println("接收到的消息:" + new String(msg));
    }
}

(2)刪除服務接口ReceiveService 

重新啟動應用,瀏覽器訪問http://localhost:8081/send,spring-consumer項目的控制台輸出:

接收到的消息:hello world

、消費者組

消費者組不同時,會發送給所有的消費者實例。
消費者組相同時,對於發送過來的消息,僅由其中一個消費者實例處理。

1、修改消費者spring-consumer項目的配置application.yml,配置消費者組為groupA

server:
  port: 8080
spring:
  application:
    name: spring-consumer
  cloud:
    stream:
      bindings:
        input:
          group:
            groupA
eureka:
  instance:
    hostname: localhost
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/

2、仿照上面消費者spring-consumer項目,新建兩個消費者項目spring-second-consumer(端口8082)和spring-third-consumer(端口8083),配置application.yml的消費者組都為groupB。

重新啟動應用,瀏覽器多次訪問http://localhost:8081/send,可以看到原來消費者spring-consumer都能收到,新加的兩個消費者spring-second-consumer(端口8082)和spring-third-consumer(端口8083)會輪詢處理。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM