Spring Cloud Stream消息總線


Springcloud 里面對於MQ的整合一個是前一篇的消息總線一個是本文介紹的消息驅動

大體要學習這么幾個知識點: 

課題:SpringCloud消息驅動Stream
1.什么是SpringCloud消息驅動
2.消息驅動Stream實現原理
3.消息驅動Stream與傳統MQ區別
4.基於消息驅動整合Kafka
5.基於消息驅動整合Rabbitmq
6.基於消息驅動Stream消息分組

 

什么是消息驅動?

SpringCloud Stream消息驅動可以簡化開發人員對消息中間件的使用復雜度,讓系統開發人員更多盡力專注與核心業務邏輯的開發。SpringCloud Stream基於SpringBoot實現,自動配置化的功能可以幫助我們快速上手學習,類似與我們之前學習的orm框架,可以平滑的切換多種不同的數據庫。

目前SpringCloud Stream 目前只支持 RabbitMQ和kafka 

 

在topic模式上 區別很大的 這兩個MQ  SpringCloud去整合了  類似於Hibernate  通過對象得到sql語句  

開發人員不需要知道具體的MQ底層實現,只需要關心業務邏輯編碼就OK了

底層是如何實現的? Stream組件對rabbitMQ和kafka進行封裝成同一個API,開發人員只需要對接Stream即可 !

消息驅動原理

綁定器

通過定義綁定器作為中間層,實現了應用程序與消息中間件細節之間的隔離。通過向應用程序暴露統一的Channel通過,是的應用程序不需要再考慮各種不同的消息中間件的實現。當需要升級消息中間件,或者是更換其他消息中間件產品時,我們需要做的就是更換對應的Binder綁定器而不需要修改任何應用邏輯 。

 

在該模型圖上有如下幾個核心概念:

  • Source: 當需要發送消息時,我們就需要通過Source,Source將會把我們所要發送的消息(POJO對象)進行序列化(默認轉換成JSON格式字符串),然后將這些數據發送到Channel中;
  • Sink: 當我們需要監聽消息時就需要通過Sink來,Sink負責從消息通道中獲取消息,並將消息反序列化成消息對象(POJO對象),然后交給具體的消息監聽處理進行業務處理;
  • Channel: 消息通道是Stream的抽象之一。通常我們向消息中間件發送消息或者監聽消息時需要指定主題(Topic)/消息隊列名稱,但這樣一旦我們需要變更主題名稱的時候需要修改消息發送或者消息監聽的代碼,但是通過Channel抽象,我們的業務代碼只需要對Channel就可以了,具體這個Channel對應的是那個主題,就可以在配置文件中來指定,這樣當主題變更的時候我們就不用對代碼做任何修改,從而實現了與具體消息中間件的解耦;
  • Binder: Stream中另外一個抽象層。通過不同的Binder可以實現與不同消息中間件的整合,比如上面的示例我們所使用的就是針對Kafka的Binder,通過Binder提供統一的消息收發接口,從而使得我們可以根據實際需要部署不同的消息中間件,或者根據實際生產中所部署的消息中間件來調整我們的配置。

 

消息驅動有通道,綁定MQ。

 生產者消息傳遞到通道里面之后,通道是跟MQ做綁定,封裝的。消息一旦到MQ之后,發送給消費者通道,然后消費者進行消費 。綁定部分是底層幫助實現的。

封裝也只是實現了部分功能。MQ的功能不是百分百都實現了的

 

消息驅動環境搭建

     生產者:

    maven的pom:

 

 

 引入配置文件pom:

  其實就是對springboot整合rabbitmq再進行了一層封裝

  

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.toov5</groupId>
  <artifactId>SpringCloud-stream-producer</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  
      <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
    </parent>
    <dependencies>
        <!-- SpringBoot整合Web組件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>
    </dependencies>
  
</project>

 

 然后編碼:

   在rebbitmq中需要有交換機  隊列   底層都幫助實現了! 以通道名稱創建交換機  消費者啟動時候 隨機創建一個隊列名稱  

   創建通道:

  

package com.toov5.stream;
//創建發送通道

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.SubscribableChannel;

public interface  SendMsgInterface {
 
    //創建發送通道
    @Output("my_stream_channel")
    SubscribableChannel sendMsg();
    
}

controller:

package com.toov5.controller;

import java.util.UUID;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import com.toov5.stream.SendMsgInterface;

public class SendMsgController {
   //生產者流程
   // 1 生產者發送消息通道   獲取通道
    @Autowired
    private SendMsgInterface sendMsgInterface;
    
   //2. 生產者投遞消息  往通道發送消息
    public String sendMsg() {
            String msg = UUID.randomUUID().toString();
            System.out.println("生產者發送內容msg:" + msg);
            Message build = MessageBuilder.withPayload(msg.getBytes()).build();
            sendMsgInterface.sendMsg().send(build);
            return "success";

    }
   //3. 開啟綁定  然后業務邏輯中就可以從Springboot中拿對象了  啟動類去綁定
    
}

啟動類: 開啟通道綁定

package com.toov5;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

import com.toov5.stream.SendMsgInterface;

@SpringBootApplication
@EnableBinding(SendMsgInterface.class)   //開啟綁定通道(通道接口)  然后業務邏輯中就可以從Springboot中拿對象了
public class AppProducer {
   public static void main(String[] args) {
    SpringApplication.run(AppProducer.class, args);
}
}

yml:

server:
  port: 9000
spring:
  application:
    name: spingcloud-stream-producer
  rabbitmq:
  ####連接地址
    host: 192.168.91.6
   ####端口號  
    port: 5672
   ####賬號
    username: admin
   ####密碼 
    password: admin
   ### 地址  主機獨立的virtualhost
    virtual-host: /admin_toov5 

 

啟動生產者:

此時還沒有創建隊列 沒有進行綁定:

   小結:只需要發送信息到通道里面就OK了

 

 下面創建消費者: 對應生產者去寫就OK了

maven依賴:

 

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.toov5</groupId>
  <artifactId>SpringCloud-stream-consumer</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  
  <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
    </parent>
    <dependencies>
        <!-- SpringBoot整合Web組件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>
    </dependencies>
  
</project>

通道接口:

package com.toov5.stream;
//創建發送通道

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface  ReadMsgInterface {
 
    //創建發送通道
    @Input("my_stream_channel")
    SubscribableChannel readMsg();
    
}

消費者:監聽通道

package com.toov5.consumer;

import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {
    
    @StreamListener("my_stream_channel") //監聽生產者響應的通道
    public void readMsg(String msg) {
      System.out.println("消費者獲取到生產者投遞的消息:"+msg);    
    }
}

啟動類: 綁定通道

package com.toov5;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

import com.toov5.stream.ReadMsgInterface;

@SpringBootApplication
@EnableBinding(ReadMsgInterface.class)
public class AppConsumer {
    public static void main(String[] args) {
        SpringApplication.run(AppConsumer.class, args);
    }
}

yml:

server:
  port: 8000
spring:
  application:
    name: spingcloud-stream-consumer
  rabbitmq:
  ####連接地址
    host: 192.168.91.6
   ####端口號  
    port: 5672
   ####賬號
    username: admin
   ####密碼 
    password: admin
   ### 地址  主機獨立的virtualhost
    virtual-host: /admin_toov5 

  消費者是有隊列的  我們在代碼中沒有寫隊列的相關哦   底層會創建的哦  底層自動創建一個隊列 綁定交換機  隊列名字是隨機的

 消費者啟動后的多了個隊列

  

 

       也綁定上了

 

 下面訪問接口 發送個消息試試:

   

 

 

 如果換成kafka,修改下maven依賴,配置文件依賴名稱改成kafka就OK了~~kafka 的連接信息一配置就搞定

 

 

關於消息分組:

    生產者投遞消息 投遞消息到通道后 兩個消費者會都進行消費 重復消費了

    通過分組去解決,同一個組的消費者會進行輪訓消費,只有一個消費者進行消費

     

 

  

consumer關閉,控制台中的隊列會消失了。連接關閉會消失,因為木有固定隊列哦 隨機生成的

 

我們啟動兩個隊列 通過端口號去標識:

並且綁定了同一個交換機:

 

 

訪問接口后:

都能收到消息呀

 

分組配置:

 消費者yml加入配置:

cloud:
stream:
bindings:
my_stream_channel: ###指定 管道名稱!!!!
#指定該應用實例屬於 stream 消費組
group: stream

 此時的yml:

server:
  port: 8002
spring:
  application:
    name: spingcloud-stream-consumer
  rabbitmq:
  ####連接地址
    host: 192.168.91.6
   ####端口號  
    port: 5672
   ####賬號
    username: admin
   ####密碼 
    password: admin
   ### 地址  主機獨立的virtualhost
    virtual-host: /admin_toov5 
  cloud:
    stream:
      bindings:
        my_stream_channel: ###指定 管道名稱!!!!
          #指定該應用實例屬於 stream 消費組
          group: stream
 

啟動兩個consumer:

分組之后只有一個隊列!

綁定信息依然不變

  

 

訪問:

 

 

 

 只有一個能接收消息 且輪訓

 分組 的概念是由於這個框架搞的 不是原先的MQ固有的

 

 

Kafka的策略整合 暫時先不寫了 很簡單的  有空再補上吧~~~

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM