SpringCloud學習之Stream消息驅動【默認通道】(十)


在實際開發過程中,服務與服務之間通信經常會使用到消息中間件,而以往使用了中間件比如RabbitMQ,那么該中間件和系統的耦合性就會非常高,如果我們要替換為Kafka那么變動會比較大,這時我們可以使用SpringCloudStream來整合我們的消息中間件,來降低系統和中間件的耦合性。

一、消息中間的幾大應用場景

1、異步處理

比如用戶在電商網站下單,下單完成后會給用戶推送短信或郵件,發短信和郵件的過程就可以異步完成。因為下單付款是核心業務,發郵件和短信並不屬於核心功能,並且可能耗時較長,所以針對這種業務場景可以選擇先放到消息隊列中,有其他服務來異步處理。

2、應用解耦:

假設公司有幾個不同的系統,各系統在某些業務有聯動關系,比如 A 系統完成了某些操作,需要觸發 B 系統及 C 系統。如果 A 系統完成操作,主動調用 B 系統的接口或 C 系統的接口,可以完成功能,但是各個系統之間就產生了耦合。用消息中間件就可以完成解耦,當 A 系統完成操作將數據放進消息隊列,B 和 C 系統去訂閱消息就可以了。這樣各系統只要約定好消息的格式就好了。

3、流量削峰

比如秒殺活動,一下子進來好多請求,有的服務可能承受不住瞬時高並發而崩潰,所以針對這種瞬時高並發的場景,在中間加一層消息隊列,把請求先入隊列,然后再把隊列中的請求平滑的推送給服務,或者讓服務去隊列拉取。

4、日志處理

kafka 最開始就是專門為了處理日志產生的。

當碰到上面的幾種情況的時候,就要考慮用消息隊列了。如果你碰巧使用的是 RabbitMQ 或者 kafka ,而且同樣也是在使用 Spring Cloud ,那可以考慮下用 Spring Cloud Stream。

二、什么是SpringCloudStream

  官方定義 Spring Cloud Stream 是一個構建消息驅動微服務的框架。
  應用程序通過 inputs 或者 outputs 來與 Spring Cloud Stream 中binder 交互,通過我們配置來 binding ,而 Spring Cloud Stream 的 binder 負責與消息中間件交互。所以,我們只需要搞清楚如何與 Spring Cloud Stream 交互就可以方便使用消息驅動的方式。
  通過使用Spring Integration來連接消息代理中間件以實現消息事件驅動。Spring Cloud Stream 為一些供應商的消息中間件產品提供了個性化的自動化配置實現,引用了發布-訂閱、消費組、分區的三個核心概念。目前僅支持RabbitMQKafka

三、Stream 解決了什么問題?

  Stream解決了開發人員無感知的使用消息中間件的問題,因為Stream對消息中間件的進一步封裝,可以做到代碼層面對中間件的無感知,甚至於動態的切換中間件(rabbitmq切換為kafka),使得微服務開發的高度解耦,服務可以關注更多自己的業務流程

官網結構圖

組成 說明
Middleware 中間件,目前只支持RabbitMQ和Kafka
Binder Binder是應用與消息中間件之間的封裝,目前實行了Kafka和RabbitMQ的Binder,通過Binder可以很方便的連接中間件,可以動態的改變消息類型(對應於Kafka的topic,RabbitMQ的exchange),這些都可以通過配置文件來實現
@Input 注解標識輸入通道,通過該輸入通道接收到的消息進入應用程序
@Output 注解標識輸出通道,發布的消息將通過該通道離開應用程序
@StreamListener 監聽隊列,用於消費者的隊列的消息接收
@EnableBinding 指信道channel和exchange綁定在一起

 

以下實戰代碼是基於RabbitMQ的,不清楚如何安裝RabbitMQ請查看我的另一篇文章最簡單的RabbitMQ消息隊列搭建(windows環境下安裝),項目的三個模塊如下:

 

(一)創建消息生產者【service-sender-stream-8089】

pom.xml文件

pom.xml

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.0.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.xu</groupId> <artifactId>service-sender-stream-8089</artifactId> <version>0.0.1-SNAPSHOT</version> <name>service-sender-stream-8089</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> <spring-cloud.version>Hoxton.M3</spring-cloud.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.5.0</version> </dependency> <!-- swagger-ui --> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.5.0</version> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> <repositories> <repository> <id>spring-milestones</id> <name>Spring Milestones</name> <url>https://repo.spring.io/milestone</url> </repository> </repositories> </project> 

application.yml

server:
  port: 8089
spring:
  application:
    name: spring-cloud-stream-sender
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment: #配置rabbimq連接環境
            spring:
              rabbitmq:
                host: localhost
                username: guest
                password: guest
                virtual-host: /
      bindings:
        output:       #指定輸入通道對應的主題名
          destination: stream-demo       #exchange名稱,交換模式默認是topic
          content-type: text/plain       #消息發送的格式,接收端不用指定格式,但是發送端要

 

IMessageSender.java

package com.xu.serviceconsumer.interfaces;

public interface IMessageSender {

    void sendMessage(String message);
}

MessageSenderImpl.java

package com.xu.serviceconsumer.services;

import com.xu.serviceconsumer.interfaces.IMessageSender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;

/**
 * 這個注解給我們綁定消息通道的,Source是Stream給我們提供的,可以點進去看源碼,
 * 可以看到output和input,這和配置文件中的output,input對應的。
 */
@EnableBinding(Source.class)
public class MessageSenderImpl implements IMessageSender {

    private final static Logger logger = LoggerFactory.getLogger(MessageSenderImpl.class);

    //注入Source
    @Autowired
    private Source source;

    @Override
    public void sendMessage(String message) {
        boolean sendStatus = source.output().send(MessageBuilder.withPayload(message).build());
        logger.info("發送數據:{},sendStatus: {}",message,sendStatus);

    }
}

TestController.java

package com.xu.serviceconsumer.controller;

/**
 *
 */

import com.xu.serviceconsumer.interfaces.IMessageSender;
import io.swagger.annotations.Api;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author mazhen
 *
 */
@Api(description = "提交給MQ")
@RestController
public class TestController {

    private final static Logger logger = LoggerFactory.getLogger(TestController.class);

    @Autowired
    private IMessageSender iMessageSender;

    @GetMapping("send")
    public void send() {
        iMessageSender.sendMessage("Ronnie O'Sullivan");
    }

}

附上swagger部分代碼

SwaggerApp.java

package com.xu.serviceconsumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@Configuration
@EnableSwagger2
public class SwaggerApp {
    @Bean
    public Docket createRestApi() {

        return new Docket(DocumentationType.SWAGGER_2)
                .apiInfo(apiInfo())
                .select()
                //為當前包路徑
                .apis(RequestHandlerSelectors.basePackage("com.xu.serviceconsumer.controller"))
                .paths(PathSelectors.any())
                .build();
    }

    //構建 api文檔的詳細信息函數,注意這里的注解引用的是哪個
    private ApiInfo apiInfo() {
        return new ApiInfoBuilder()
                //頁面標題
                .title("Spring Boot 使用 Swagger2 構建RESTful API")
                //創建人
                .contact(new Contact("Bryan", "http://blog.bianxh.top/", ""))
                //版本號
                .version("1.0")
                //描述
                .description("API 描述")
                .build();
    }
}

項目的啟動類如下,沒有什么特殊的處理:

啟動項目后,輸入地址http://localhost:8089/swagger-ui.html打開swagger頁面,然后點擊try it out發送消息

在后台我們可以看到發送消息成功了

(二)消息消費者【service-consumer-stream-8090】

pom.xml文件如下

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.0.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.xu</groupId> <artifactId>service-consumer-stream-8090</artifactId> <version>0.0.1-SNAPSHOT</version> <name>service-consumer-stream-8090</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> <spring-cloud.version>Hoxton.M3</spring-cloud.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.48</version> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> <repositories> <repository> <id>spring-milestones</id> <name>Spring Milestones</name> <url>https://repo.spring.io/milestone</url> </repository> </repositories> </project> 

application.yml

server:
  port: 8090
spring:
  application:
    name: spring-cloud-stream-receiver-2
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment:                                      #配置rabbimq連接環境
            spring:
              rabbitmq:
                host: localhost
                username: guest
                password: guest
                virtual-host: /
      bindings:
        input:
          destination: stream-demo   #exchange名稱,交換模式默認是topic

定義一個消息接收接口

ReceviceMsg.java

package com.xu.serviceconsumer.interfaces;

public interface ReceviceMsg {

    void receive(String message);
}


ReceviceMsgImpl.java

package com.xu.serviceconsumer.services;

import com.xu.serviceconsumer.interfaces.ReceviceMsg;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@EnableBinding(value = {Sink.class})
public class ReceviceMsgImpl implements ReceviceMsg {

    private static Logger logger = LoggerFactory.getLogger(ReceviceMsgImpl.class);

    @StreamListener(Sink.INPUT)
    @Override
    public void receive(String message) {

        logger.info("8090客戶端接收消息:"+message);
    }
}

啟動類如下,也沒有任何特殊處理:

啟動項目,然后再次用上面的消息發送控制器TestController.java發送消息到消息隊列,然后可以看到消息消費端也收到了隊列的消息如下:

(三)消息消費者【service-consumer-stream-8091】

這是復制上面的消費者創建的另一個消息消費者,基本配置跟上面service-consumer-stream-8090基本一模一樣,只是application.yml中的部分配置略有不同(主要是端口不同),如下:

application.yml

server:
  port: 8091
spring:
  application:
    name: spring-cloud-stream-receiver-1
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment:                                      #配置rabbimq連接環境
            spring:
              rabbitmq:
                host: localhost
                username: guest
                password: guest
                virtual-host: /
      bindings:
        input:
          destination: stream-demo   #exchange名稱,交換模式默認是topic


啟動這個消息消費者之后,用上面的消息生產者TestController生產一個消息,然后可以看到這個消費者也接收到了消息隊列的消息了

 

這里我們就用默認的通道完成了消息的發送和接收,下一篇我們將說一下自定義通道實現消息的發送和接收,下次再見!

===============================================================================

如果您覺得此文有幫助,可以打賞點錢給我支付寶或掃描二維碼

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM