SpringCloud-Stream消息通信


SpringCloud微服務實戰系列教程                                                                                                                             

  Spring Cloud Stream 消息驅動組件幫助我們更快速,更⽅便,更友好的去構建消息驅動微服務的。當時定時任務和消息驅動的⼀個對⽐。(消息驅動:基於消息機制做⼀些事情)MQ:消息隊列/消息中間件/消息代理,產品有很多,ActiveMQ RabbitMQ RocketMQ Kafka

一、 Stream解決的痛點問題

  MQ消息中間件⼴泛應⽤在應⽤解耦合、異步消息處理、流量削峰等場景中。 不同的MQ消息中間件內部機制包括使⽤⽅式都會有所不同,⽐如RabbitMQ中有Exchange(交換機/交換器)這⼀概念,kafka有Topic、Partition分區這些概念,MQ消息中間件的差異性不利於我們上層的開發應⽤,當我們的系統希望從原有的RabbitMQ切換到Kafka時,我們會發現 切換⽐較困難,很多要操作可能重來(因為⽤程序和具體的某⼀款MQ消息中間件耦合在⼀起了)。
  Spring Cloud Stream進⾏了很好的上層抽象, 可以讓我們與具體消息中間件解耦合,屏蔽掉了底層具體MQ消息中間件的細節差異,就像Hibernate屏蔽掉了具體數據庫(Mysql/Oracle⼀樣)。如此⼀
來,我們學習、開發、維護MQ都會變得輕松。⽬前Spring Cloud Stream⽀持RabbitMQ和Kafka。

二、Stream重要概念

  Spring Cloud Stream 是⼀個構建消息驅動微服務的框架。應⽤程序通過inputs(相當於消息消費者consumer)或者outputs(相當於消息⽣產者producer)來與Spring Cloud Stream中的binder對象交互,⽽Binder對象是⽤來屏蔽底層MQ細節的,它負責與具體的消息中間件交互。
  Binder綁定器是Spring Cloud Stream 中⾮常核⼼的概念,就是通過它來屏蔽底層不同MQ消息中間件的細節差異,當需要更換為其他消息中間件時,我們需要做的就是更換對應的Binder綁定器⽽不需要修改任何應⽤邏輯(Binder綁定器的實現是框架內置的,Spring Cloud Stream⽬前⽀持Rabbit、Kafka兩種消息隊列)

三、Stream消息通信⽅式

  Stream中的消息通信⽅式遵循了發布—訂閱模式。在Spring Cloud Stream中的消息通信⽅式遵循了發布-訂閱模式,當⼀條消息被投遞到消息中間件之后,它會通過共享的 Topic 主題進⾏⼴播,消息消費者在訂閱的主題中收到它並觸發⾃身的業務邏輯處理。這⾥所提到的 Topic 主題是Spring Cloud Stream中的⼀個抽象概念,⽤來代表發布共享消息給消費者的地⽅。在不同的消息中間件中, Topic 可能對應着不同的概念,⽐如:在RabbitMQ中的它對應了Exchange、在Kakfa中則對應了Kafka中的Topic。

 四、基於RabbitMQ應用

  第一步:構建消息生產者

    1、引入依賴pom.ml

        <!--eureka client 客戶端依賴引⼊-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <!--spring cloud stream 依賴(rabbit)-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

    2、添加配置

    注意:標紅的位置應設置相同的binder,藍色位置必須為output,被默認框架定義好的

server:
  port: 9090
spring:
  application:
    name: stream-provider
  cloud:
    stream:
      binders: # 綁定MQ服務信息(此處我們是RabbitMQ)
        cityRabbitBinder: # 給Binder定義的名稱,⽤於后⾯的關聯
          type: rabbit # MQ類型,如果是Kafka的話,此處配置kafka
          environment: # MQ環境配置(⽤戶名、密碼等)
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 關聯整合通道和binder對象
        output: # output是我們定義的通道名稱,此處不能亂改
          destination: cityExchange # 要使⽤的Exchange名稱(消息隊列主題名稱)
          content-type: text/plain # application/json # 消息類型設置,⽐如json
          binder: cityRabbitBinder # 關聯MQ服務
eureka:
  client:
    serviceUrl: # eureka server的路徑
      defaultZone: http://localhost:8761/eureka/
    instance:
      prefer-ip-address: true #使⽤ip注冊

  3、消息發送通過source對象

package city.alber;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;

/**
 * @author niunafei
 * @function
 * @email niunafei0315@163.com
 * @date 2020/9/24  2:26 PM
 * Source.class⾥⾯就是對輸出通道的定義(這是Spring Cloud Stream內置的通道封裝)
 */
@EnableBinding(Source.class)
public class ProviderService {
    /**
     * 將MessageChannel的封裝對象Source注⼊到這⾥使⽤
     */
    @Autowired
    private Source source;

    public void sendMessage(String content) {
        // 向mq中發送消息(並不是直接操作mq,應該操作的是spring cloud stream)
        // 使⽤通道向外發出消息(指的是Source⾥⾯的output通道)
        source.output().send(MessageBuilder.withPayload(content).build());
    }
}

  4、啟動類添加@EnableDiscoveryClient 注解,啟動

 第二步:構建消息消費者

  下面的內容是和生產者不一致的地方

  1、配置,標藍地方為不同點

server:
  port: 9091
spring:
  application:
    name: stream-consumer
  cloud:
    stream:
      binders: # 綁定MQ服務信息(此處我們是RabbitMQ)
        cityRabbitBinder: # 給Binder定義的名稱,⽤於后⾯的關聯
          type: rabbit # MQ類型,如果是Kafka的話,此處配置kafka
          environment: # MQ環境配置(⽤戶名、密碼等)
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 關聯整合通道和binder對象
        input: # output是我們定義的通道名稱,此處不能亂改
          destination: cityExchange # 要使⽤的Exchange名稱(消息隊列主題名稱)
          content-type: text/plain # application/json # 消息類型設置,⽐如json,自動將對象轉為json
          binder: cityRabbitBinder # 關聯MQ服務
eureka:
  client:
    serviceUrl: # eureka server的路徑
      defaultZone: http://localhost:8761/eureka/
    instance:
      prefer-ip-address: true #使⽤ip注冊

    2、接收消費消息類

package city.albert;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;

/**
 * @author niunafei
 * @function
 * @email niunafei0315@163.com
 * @date 2020/9/24  2:43 PM
 */
@EnableBinding(Sink.class)
public class ConsumerMsg {
    @StreamListener(Sink.INPUT)
    public void recevieMessages(Message<String> message) {
        System.out.println("=========接收到的消息:" + message);
    }
}

五、定義輸出

  定一使用與上面配置中藍色字體的input/ouput類似,定義完成配置應該為 inputSysLog/outputSysLog

  1、定義通過接口,目的是給生產者消費者調用

package city.albert;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

/**
 * @author niunafei
 * @function
 * @email niunafei0315@163.com
 * @date 2020/9/24  3:11 PM
 */
public interface CustomStreamConfig {

    String INPUT_SYS_LOG = "inputSysLog";
    String OUTPUT_SYS_LOG = "outputSysLog";

    @Input(INPUT_SYS_LOG)
    SubscribableChannel inputSysLog();

    @Output(OUTPUT_SYS_LOG)
    MessageChannel outputSysLog();

    /**
     * 。。。。。可以跟上面一樣定義多個通道信息
     */
}
View Code

  2、生產者調用

package city.albert;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

/**
 * @author niunafei
 * @function
 * @email niunafei0315@163.com
 * @date 2020/9/24  3:14 PM
 *
 */
@EnableBinding(CustomStreamConfig.class)
public class CustomStreamProvider {

    @Autowired
    CustomStreamConfig customStreamConfig;

    public void sendMessage(String content) {
        // 向mq中發送消息(並不是直接操作mq,應該操作的是spring cloud stream)
        // 使⽤通道向外發出消息(指的是Source⾥⾯的output通道)
        customStreamConfig.outputSysLog().send(MessageBuilder.withPayload(content).build());
    }
}
View Code

  3、消費者調用

package city.albert;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;

/**
 * @author niunafei
 * @function
 * @email niunafei0315@163.com
 * @date 2020/9/24  3:14 PM
 *
 */
@EnableBinding(CustomStreamConfig.class)
public class CustomStreamConsumer {

    @StreamListener(CustomStreamConfig.INPUT_SYS_LOG)
    public void messages(Message<String> message) {
        System.out.println("=========接收到的消息:" + message);
    }
}
View Code

六、分組

  同組內一條消息,只能一個消費者獲取,添加配置即可

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM