SpringCloudStream實例


1.解決的痛點

由於市面上有很多的消息中間件(activeMQ,rabbitMQ,RocketMQ,Kafka),當某一天公司切換某一種新的消息中間件的時候,使得我們工作量會變大,增加學習量。

那么有沒有一種新技術,能讓我們不在關注具體的MQ細節,我們只需要用一種適配綁定的方式,自動的給我們在MQ內進行切換。這個時候就是Springcloud Stream要大顯身手的時候。

2.概述

一句話:屏蔽底層消息中間件的差異,降低切換成本,同一消息的編程模型。

官方定義:springcloud Stream 是一個構建消息驅動的微服務框架。

應用程序通過Inputs或者outputs來與springcloud Stream中binder對象交互。

通過我們配置binding(綁定),而springcloud Stream 的binder對象負責與消息中間件交互。

所以,我們只需要搞清楚如何與springcloud stream交互就可以方便的使用消息驅動方式。

通過spring integration 來連接消息代理中間件,以實現消息消息事件驅動。

springcloud stream為一些供應商的消息中間件產品提供了個性化自動配置實現,引用了發布-訂閱、消費組、分區三個核心概念,目前僅支持RabbitMQ、 Kafka

3.官網

https://spring.io/projects/spring-cloud-stream

4.stream憑什么可以統一差異?

通過定義綁定器作為中間層,完美的實現了應用程序與消息中間件細節之間的隔離。

通過向應用程序暴露統一的Channel通道,使得應用程序不需要再考慮各種不同的消息中間件實現。

通過定義綁定器Binder作為中間層,實現了應用程序與消息中間件細節之間的隔離。

5.Binder的兩個重要概念

input對應消費者

output對應生產者

6.Stream消息通信方式是什么?

它遵循了發布-訂閱模式,通過Topic主題進行廣播推送。

7.Stream遵循的標准流程

1.binder 很方便的連接中間件,屏蔽差異。

2.Channel 通道,是隊列QUEUE的一種抽象,在消息通訊系統中就是實現存儲和轉發的媒介,通過channel隊列進行配置。

3.source和sink 簡單的可以理解為參照對象是springcloud stream自身,從stream發布消息就是輸出,接受消息就是輸入。

8.API和常用的注解

組成 說明
Middleware 中間件,目前只支持RabbitMQ和Kafka
Binder binder是應用於消息中間件之間的封裝,目前實行了Kafka和RabbitMQ的Binder,通過Binder可以很方便的連接中間件,可以動態的改變消息類型(對應於Kafka的topic,RabbitMQ的exchange),這些都可以通過配置文件來實現。
@Input 注解標識輸入通道,通過該輸入通道接收到的消息進入應用程序
@Output 注解標識輸出通道,發布的消息將通過該通道離開應用程序
@StreamListener 監聽隊列,用於消費者的隊列的消息接收。
@EnableBinding 指信道channel和exchange綁定在一起。

9.代碼

第一步:創建eureka服務模塊cloud-eureka-server7001、cloud-eureka-server7002

1.POM文件:

 
    <dependencies>
        <!--eureka-server-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!--引入熱部署-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies> 

2.創建application.yml

server:
  port: 7001

# eureka配置
eureka:
  instance:
    hostname: eureka7001.com #eureka服務端的實例名稱
  client:
    # false表示不想注冊中心注冊自己
    register-with-eureka: false
    # false表示自己端就是注冊中心,我的職責就是維護服務實例,並不需要檢索服務
    fetch-registry: false
    service-url:
      # 設置Eureka Server交互的地址查詢服務和注冊服務都需要依賴這個地址
      #defaultZone: http://eureka7002.com:7002/eureka/
      # 單機就是7001自己守望自己
      defaultZone: http://eureka7001.com:7001/eureka/
  server:
    enable-self-preservation: false # 禁用自我保護模式
    eviction-interval-timer-in-ms: 2000

3.創建主入口函數

package com.seegot.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

/**
 * @program: cloud2020
 * @description:
 * @author: PP Zhang
 * @create: 2020-06-09 22:15
 */
@SpringBootApplication
@EnableEurekaServer // 表示自己就是注冊中心
public class EurekaMain7001 {
    public static void main(String[] args) {
        SpringApplication.run(EurekaMain7001.class,args);
    }
}

7002和上面配置基本一致,只是端口號不一致,直接clone就行。

第二步:新建消息生產者模塊cloud-stream-rabbitmq-provider8801

1.POM

<dependencies>
        <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <version>3.0.6.RELEASE</version>
        </dependency>
        <!--springcloud stream-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <!-- 客戶端 config -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-config</artifactId>
        </dependency>

        <!--注入eureka client 依賴-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency> 
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!--引入熱部署-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

2.創建application.yml

 1 server:
 2   port: 8801
 3 spring:
 4   application:
 5     name: cloud-stream-provider
 6   cloud:
 7     stream:
 8       binders: # 在此處配置需要綁定的rabbitmq的服務消息
 9         defaultRabbit: # 表示定義的名稱,用於binding整合
10           type: rabbit # 消息組件類型
11           environment: # 設置rabbitmq的相關環境配置
12             spring:
13               rabbitmq:
14                 host: localhost
15                 port: 5672
16                 username: guest
17                 password: guest
18       bindings: # 服務的整合處理
19         output: # 這個名字是一個通道的名稱
20           destination: studyExchange # 標識要使用的Exchange名稱定義
21           content-type: application/json #設置消息類型,本次為json,本文則設置為“text/plain”
22           binder: defaultRabbit # 設置要綁定的消息服務的具體設置
23 eureka:
24   client:
25     service-url:
26       defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka # 集群版
27   instance:
28     lease-renewal-interval-in-seconds: 2 # 設置心跳時間間隔,默認是30秒
29     lease-expiration-duration-in-seconds: 5 # 如果超過了5秒間隔,默認是90秒
30     instance-id: send-8801.com # 在信息列表時顯示主機名稱
31     prefer-ip-address: true #訪問路徑變為IP地址

3.創建主入口函數

package com.seegot.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

/**
 * @program: cloud2020
 * @description:
 * @author: PP Zhang
 * @create: 2020-06-30 10:45
 */
@SpringBootApplication
@EnableEurekaClient
public class StreamMQMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class,args);
    }
}

4.創建服務類 IMessageProvider

package com.seegot.springcloud.service;


/**
 * @program: cloud2020
 * @description:
 * @author: PP Zhang
 * @create: 2020-06-30 10:47
 */

public interface IMessageProvider {
    public String send();
}

創建服務實現類

package com.seegot.springcloud.service.impl;

import com.seegot.springcloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * @program: cloud2020
 * @description:
 * @author: PP Zhang
 * @create: 2020-06-30 10:48
 */
@EnableBinding(Source.class) // 定義消息的推送管道。類似output
public class MessageProviderImpl implements IMessageProvider {
    @Resource
    private MessageChannel output; // 屬於消息發送管道。

    // 綁定消息
    @Override
    public String send() {
        String uuid = UUID.randomUUID().toString();// 發送的內容
        output.send(MessageBuilder.withPayload(uuid).build());
        System.out.println("@@@@@uuid: "+ uuid);
        return null;
    }
}

5.創建業務類

 1 package com.seegot.springcloud.controller;
 2 
 3 import com.seegot.springcloud.service.IMessageProvider;
 4 import lombok.extern.slf4j.Slf4j;
 5 import org.springframework.web.bind.annotation.GetMapping;
 6 import org.springframework.web.bind.annotation.RestController;
 7 
 8 import javax.annotation.Resource;
 9 
10 /**
11  * @program: cloud2020
12  * @description:
13  * @author: PP Zhang
14  * @create: 2020-06-30 10:56
15  */
16 @RestController
17 @Slf4j
18 public class SendMessageController {
19     @Resource
20     private IMessageProvider messageProvider;
21     @GetMapping(value = "/sendMessage")
22     public String sendMessage(){
23         return  messageProvider.send();
24     }
25 }

6.測試。http://localhost:8801/sendMessage

2020-06-30 13:22:54.601  INFO 2980 --- [trap-executor-0] c.n.d.s.r.aws.ConfigClusterResolver      : Resolving eureka endpoints via configuration
@@@@@uuid: baef8eab-67c7-40ce-b6ad-f290ad5ffeba
@@@@@uuid: c457a864-45a4-4af1-99f8-ed0cf430429c
@@@@@uuid: 803cfd67-ab44-4d63-98c0-b08880142316
@@@@@uuid: 818e3d27-6853-4878-9d24-6c73f7349480
@@@@@uuid: 45a28a16-d074-43ce-afd9-74c74d79259f
@@@@@uuid: bcaad91e-fa08-4f5f-98d0-2232843f052b
@@@@@uuid: 89b6b301-ee96-41e8-a99a-5f2c0e579cfa
@@@@@uuid: 55d8c5aa-2093-44f5-ac8c-58392ec45249
@@@@@uuid: 8d79cfe9-e0fa-4880-aad4-bc1342919ec7
@@@@@uuid: 73202336-05b0-4682-bcfc-22627a3141c3
@@@@@uuid: 40206a09-5e06-4f02-95bb-68f7c78e141e
@@@@@uuid: c768c1d1-961c-4a3c-8250-cba4052b046f

 

第三步:新建消費者模塊cloud-stream-rabbitmq-consumer8802、cloud-stream-rabbitmq-consumer8803

1.POM

 <dependencies>
        <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <version>3.0.6.RELEASE</version>
        </dependency>
        <!--springcloud stream-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <!-- 客戶端 config -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-config</artifactId>
        </dependency>

        <!--注入eureka client 依賴-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency> 
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!--引入熱部署-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

2.新建application.yml

在分布式環境下,一條消息只希望被消費一次,避免重復消費,此時需要注意對group的使用,要將消費者統一在同一個組group內,也就是group名稱一致。另外,group還可以實現消息的持久化。可以通過停用消息消費者,然后通過消息生產者發送多條消息,然后重新啟動消息消費者,會看到消息消費者接收到了相應的消息,這就是持久化;反之如果刪除group,那么重新停用消息消費者,然后在通過消息生產者發送消息,再次啟動消息消費者,此時就接收不到相應的消息。

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此處配置需要綁定的rabbitmq的服務消息
        defaultRabbit: # 表示定義的名稱,用於binding整合
          type: rabbit # 消息組件類型
          environment: # 設置rabbitmq的相關環境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服務的整合處理
        input: # 這個名字是一個通道的名稱
          destination: studyExchange # 標識要使用的Exchange名稱定義
          content-type: application/json #設置消息類型,本次為json,本文則設置為“text/plain”
          binder: defaultRabbit # 設置要綁定的消息服務的具體設置
          group: seegotB #分組並且還可以實現消息持久化。
eureka:
  client:
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka # 集群版
  instance:
    lease-renewal-interval-in-seconds: 2 # 設置心跳時間間隔,默認是30秒
    lease-expiration-duration-in-seconds: 5 # 如果超過了5秒間隔,默認是90秒
    instance-id: receive-8802.com # 在信息列表時顯示主機名稱
    prefer-ip-address: true #訪問路徑變為IP地址

3.新建主入口函數

package com.seegot.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

/**
 * @program: cloud2020
 * @description:
 * @author: PP Zhang
 * @create: 2020-06-30 11:19
 */
@SpringBootApplication
@EnableEurekaClient
public class StreamMQMain8802 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8802.class,args);
    }
}

4.新建業務類

package com.seegot.springcloud.controller;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.RestController;

/**
 * @program: cloud2020
 * @description:
 * @author: PP Zhang
 * @create: 2020-06-30 11:21
 */
@RestController
@EnableBinding(Sink.class)// 定義消息的接受管道。類似input
public class ReceiveMessageListenerController {
    @Value("${server.port}")
    private String serverPort;
    @StreamListener(Sink.INPUT) // 監聽
    public void input(Message<String> message){
        System.out.println("消費者1號,---->接收的消息:"+message.getPayload()+"\t port:"+serverPort);
    }
}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM