SpringcloudStream簡單使用


官網:https://spring.io/projects/spring-cloud-stream

1.簡介

1.什么是Springcloud Stream

  Springcloud Stream 是一個構建消息驅動微服務的框架。說白了就是操作MQ的,可以屏蔽底層的MQ類型。

  應用程序通過inputs 或者 outputs 與SpringcloudStream中binder對象交互。所以我們通過配置來綁定(binding),而與SpringcloudStream 的binder對象負責與消息中間件交互。所以我們只需要了解如何與Stream的binder交互就可以了,屏蔽了與底層MQ交互。

  Springcloud Stream為一些MQ提供了個性化的自動化配置實現,引用了發布-訂閱、消費組、分區的三個核心概念。

2.為什么引入Stream

  屏蔽底層消息中間件的差異,降低切換成本,統一消息的編程模型。類似於Hibernate的作用一樣, 我們更多的注重業務開發。Hibernate屏蔽了數據庫的差異,可以很好的實現數據庫的切換。Stream屏蔽了底層MQ的區別,可以很好的實現切換。目前主流的有ActiveMQ、RocketMQ、RabbitMQ、Kafka,Stream支持的有RabbitMQ和Kafka。

3.設計思想

1. 標准的MQ

(1)生產者/消費者通過消息媒介傳遞消息內容

(2)消息必須走特定的通道Channel

2.引入Stream

  通過定義綁定器Binder作為中間層,實現了應用程序與消息中間件細節的隔離。

組成 說明
Middleware 中間件,目前只支持RabbitMQ和Kafka
Binder Binder是應用與消息中間件之間的封裝,目前實行了Kafka和RabbitMQ的Binder,通過Binder可以很方便的連接中間件,可以動態的改變消息類型(對應於Kafka的topic,RabbitMQ的exchange),這些都可以通過配置文件來實現
@Input 注解標識輸入通道,通過該輸入通道接收到的消息進入應用程序
@Output 注解標識輸出通道,發布的消息將通過該通道離開應用程序
@StreamListener 監聽隊列,用於消費者的隊列的消息接收
@EnableBinding 指信道channel和exchange綁定在一起

3.Stream 的消息通信模式遵循了發布-訂閱模式,也就是Topic模式。在RabbitMQ中是Exchange交換機,在Kafka是Topic。

4. 術語

(1)Binder 綁定器,通過Binder可以很方便的連接中間件,屏蔽差異

(2)Channel: 通道,是Queue的一種抽象,主要實現存儲和轉發的媒介,通過Channel對隊列進行配置

(3)Source和Sink 簡單的理解為參照對象是Spring Cloud Stream自身,從Stream發布消息就是輸出,接收消息就是輸入。

5.過程可以理解為下圖

2.使用

1.RabbitMQ環境安裝

參考:  docker安裝rabbitMQ

2.建立生產者項目

1.新建模塊  cloud-stream-rabbitmq-provider8801

 2.修改pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>cloud</artifactId>
        <groupId>cn.qz.cloud</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>cloud-stream-rabbitmq-provider8801</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <!--基礎配置-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>

3.新建application.yml

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
      stream:
        binders: # 在此處配置要綁定的rabbitmq的服務信息;
          defaultRabbit: # 表示定義的名稱,用於於binding整合
            type: rabbit # 消息組件類型
            environment: # 設置rabbitmq的相關的環境配置
              spring:
                rabbitmq:
                  host: 192.168.99.100
                  port: 5672
                  username: guest
                  password: guest
        bindings: # 服務的整合處理
          output: # 這個名字是一個通道的名稱
            destination: studyExchange # 表示要使用的Exchange名稱定義
            content-type: application/json # 設置消息類型,文本則設置“text/plain”
            binder: defaultRabbit # 設置要綁定的消息服務的具體設置

eureka:
  client: # 客戶端進行Eureka注冊的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 設置心跳的時間間隔(默認是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(默認是90秒)
    instance-id: send-8801.com  # 在信息列表時顯示主機名稱
    prefer-ip-address: true     # 訪問的路徑變為IP地址

4.主啟動類:

package cn.qz.cloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @Author: qlq
 * @Description
 * @Date: 21:39 2020/11/9
 */
@SpringBootApplication
public class StreamMQMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class, args);
    }
}

5.業務類:

(1)Service

接口:

package cn.qz.cloud.service;

/**
 * @Author: qlq
 * @Description
 * @Date: 21:41 2020/11/9
 */
public interface IMessageProvider {

    String send();
}

實現類:

package cn.qz.cloud.service.impl;

import cn.qz.cloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * @Author: qlq
 * @Description
 * @Date: 21:41 2020/11/9
 */
@EnableBinding(Source.class) //定義消息的推送管道
public class MessageProviderImpl implements IMessageProvider {

    @Resource
    private MessageChannel output; // 消息發送管道

    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        return serial;
    }
}

(2)Controller

package cn.qz.cloud.controller;

import cn.qz.cloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * @Author: qlq
 * @Description
 * @Date: 21:46 2020/11/9
 */
@RestController
public class MessageController {

    @Resource
    private IMessageProvider messageProvider;

    @GetMapping(value = "/sendMessage")
    public String sendMessage() {
        return messageProvider.send();
    }

}

6.測試:

(1)啟動后可以到RabbitMQ查看有一個交換機

 (2)測試發消息

$ curl http://localhost:8801/sendMessage
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    36  100    36    0     0    765      0 --:--:-- --:--:-- --:--:--  2250f68f21d0-7fc7-4eaf-8f43-6e672617d0df

  發出消息后到MQ看不到消息,因為Exchange本身沒有存儲消息的功能。此時還沒有隊列。但是可以通過MQ的Message rates波峰查看:

3.建立消費者一

1.新建模塊 cloud-stream-rabbitmq-consumer8802

2.修改pom

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!--基礎配置-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

3.新建application.yml

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
      stream:
        binders: # 在此處配置要綁定的rabbitmq的服務信息;
          defaultRabbit: # 表示定義的名稱,用於於binding整合
            type: rabbit # 消息組件類型
            environment: # 設置rabbitmq的相關的環境配置
              spring:
                rabbitmq:
                  host: 192.16899.100
                  port: 5672
                  username: guest
                  password: guest
        bindings: # 服務的整合處理
          input: # 這個名字是一個通道的名稱
            destination: studyExchange # 表示要使用的Exchange名稱定義
            content-type: application/json # 設置消息類型,如果是文本則設置“text/plain”
            binder: defaultRabbit # 設置要綁定的消息服務的具體設置

eureka:
  client: # 客戶端進行Eureka注冊的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 設置心跳的時間間隔(默認是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(默認是90秒)
    instance-id: receive-8802.com  # 在信息列表時顯示主機名稱
    prefer-ip-address: true     # 訪問的路徑變為IP地址

4.主啟動類:

package cn.qz.cloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @Author: qlq
 * @Description
 * @Date: 22:01 2020/11/9
 */
@SpringBootApplication
public class StreamMQMain8802 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8802.class, args);
    }
}

5.業務類:

package cn.qz.cloud.listener;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

/**
 * @Author: qlq
 * @Description
 * @Date: 22:03 2020/11/9
 */
@Component
@EnableBinding(Sink.class) //定義消息的接收管道
public class MQMessageListener {

    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println(message.getPayload() + "\t  port: " + serverPort);
    }
}

4.建立消費者二

cloud-stream-rabbitmq-consumer8803  模塊,和上面項目一模一樣。只是端口不同,模擬集群部署一個項目。

5.測試

1.啟動兩個消費者 8802、8803

2.啟動一個生產者 8801

3.RabbitMQ查看

(1)交換機

 (2)隊列:會生成兩個隨機隊列

查看其中一個隊列:(查看RoutingKey 為#, 也就是匹配任何隊列,類似於fanout廣播類型。也就是接受該交換機的任何消息。並且該隊列自動刪除[auto-delete=true],沒有消費者隊列會自動刪除)

 4.生產者發送消息:

$ curl http://localhost:8801/sendMessage
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    36  100    36    0     0   1161      0 --:--:-- --:--:-- --:--:-- 3600095847545-72ca-41a2-9c24-6940455151c8

5.查看兩個消費者:

 

 6.這種模式實際有個重復消費的情況。也就是某個應用多實例部署,每個實例都會收到消息。從MQ的本質Queue不可重復消費,但是多個實例監聽的是不同的queue,所以出現重復消費。解決辦法就是讓多個實例監聽同一個queue,解決辦法,兩個消費者端都加上group屬性監聽相同的隊列。

server:
  port: 8803

spring:
  application:
    name: cloud-stream-consumer
  cloud:
      stream:
        binders: # 在此處配置要綁定的rabbitmq的服務信息;
          defaultRabbit: # 表示定義的名稱,用於於binding整合
            type: rabbit # 消息組件類型
            environment: # 設置rabbitmq的相關的環境配置
              spring:
                rabbitmq:
                  host: 192.168.99.100
                  port: 5672
                  username: guest
                  password: guest
        bindings: # 服務的整合處理
          input: # 這個名字是一個通道的名稱
            destination: studyExchange # 表示要使用的Exchange名稱定義
            content-type: application/json # 設置消息類型,如果是文本則設置“text/plain”
            binder: defaultRabbit # 設置要綁定的消息服務的具體設置
        group: testGroup

eureka:
  client: # 客戶端進行Eureka注冊的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 設置心跳的時間間隔(默認是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(默認是90秒)
    instance-id: receive-8803.com  # 在信息列表時顯示主機名稱
    prefer-ip-address: true     # 訪問的路徑變為IP地址

啟動后查看RabbitMQ:

(1)兩個消費者只有一個queue,實際上上面的group指定了一個queue

 (2)可以看到有兩個消費者,並且由一個重要的特性就是該隊列不會自動刪除,也就是沒有消費者,隊列仍然會保存消息(間接的保證了消息的持久化)。這個很好理解,設置了group屬性的隊列,不會自動刪除,也就是即使Consumer斷開連接,隊列仍然存在,Exchange本身不具備存儲的能力,只負責轉發,所以在隊列存在的情況下隊列可以保存消息;當消費者上線后會自動消費隊列中的消息。

(3)測試:這種相同group的消費同一個queue的時候是輪詢的方式,每個實例一條輪着消費。 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM