Spring Cloud Stream


在實際的企業開發中,消息中間件是至關重要的組件之一。消息中間件主要解決應用解耦,異步消
息,流量削鋒等問題,實現高性能,高可用,可伸縮和最終一致性架構。不同的中間件其實現方式,內
部結構是不一樣的。如常見的RabbitMQKafka,由於這兩個消息中間件的架構上的不同,像
RabbitMQexchangekafkaTopicpartitions分區,這些中間件的差異性導致我們實際項目開發
給我們造成了一定的困擾,我們如果用了兩個消息隊列的其中一種,后面的業務需求,我想往另外一種
消息隊列進行遷移,這時候無疑就是一個災難性的,一大堆東西都要重新推倒重新做,因為它跟我們的
系統耦合了,這時候 springcloud Stream 給我們提供了一種解耦合的方式。

概述
Spring Cloud Stream由一個中間件中立的核組成。應用通過Spring Cloud Stream插入的input(相當於
消費者consumer,它是從隊列中接收消息的)output(相當於生產者producer,它是從隊列中發送消
息的。)通道與外界交流。通道通過指定中間件的Binder實現與外部代理連接。業務開發者不再關注具
體消息中間件,只需關注Binder對應用程序提供的抽象概念來使用消息中間件實現業務即可。

 

 

說明:最底層是消息服務,中間層是綁定層,綁定層和底層的消息服務進行綁定,頂層是消息生產者和
消息消費者,頂層可以向綁定層生產消息和和獲取消息消費

綁定器
Binder 綁定器是Spring Cloud Stream中一個非常重要的概念。在沒有綁定器這個概念的情況下,我們
的Spring Boot應用要直接與消息中間件進行信息交互的時候,由於各消息中間件構建的初衷不同,它
們的實現細節上會有較大的差異性,這使得我們實現的消息交互邏輯就會非常笨重,因為對具體的中間
件實現細節有太重的依賴,當中間件有較大的變動升級、或是更換中間件的時候,我們就需要付出非常
大的代價來實施。
通過定義綁定器作為中間層,實現了應用程序與消息中間件(Middleware)細節之間的隔離。通過向應用
程序暴露統一的Channel通過,使得應用程序不需要再考慮各種不同的消息中間件的實現。當需要升級
消息中間件,或者是更換其他消息中間件產品時,我們需要做的就是更換對應的Binder綁定器而不需要
修改任何應用邏輯 。甚至可以任意的改變中間件的類型而不需要修改一行代碼。
Spring Cloud Stream支持各種binder實現,下表包含GitHub項目的鏈接。

RabbitMQ
Apache Kafka
Amazon Kinesis
Google PubSub (partner maintained)
Solace PubSub+ (partner maintained)
Azure Event Hubs (partner maintained)

通過配置把應用和spring cloud stream 的 binder 綁定在一起,之后我們只需要修改 binder 的配置來
達到動態修改topic、exchange、type等一系列信息而不需要修改一行代碼。
發布/訂閱模型
在Spring Cloud Stream中的消息通信方式遵循了發布-訂閱模式,當一條消息被投遞到消息中間件之
后,它會通過共享的 Topic 主題進行廣播,消息消費者在訂閱的主題中收到它並觸發自身的業務邏輯處
理。這里所提到的 Topic 主題是Spring Cloud Stream中的一個抽象概念,用來代表發布共享消息給消
費者的地方。在不同的消息中間件中, Topic 可能對應着不同的概念,比如:在RabbitMQ中的它對應
了Exchange、而在Kakfa中則對應了Kafka中的Topic。

案例中通過rabbitMQ作為消息中間件,完成SpringCloud Stream的案例。需要自行安裝
消息生產者
(1)創建工程引入依賴

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
    </dependencies>

 

(2)定義bingding
發送消息時需要定義一個接口,不同的是接口方法的返回對象是 MessageChannel:

/**
 * 自定義的消息通道
 */
public interface MyProcessor {

    /**
     * 消息生產者的配置
     */
    String MYOUTPUT = "myoutput";

    @Output("myoutput")
    MessageChannel myoutput();

    /**
     * 消息消費者的配置
     */
    String MYINPUT = "myinput";

    @Input("myinput")
    SubscribableChannel myinput();
}
/**
 * 負責向中間件發送數據
 */
@Component
@EnableBinding(MyProcessor.class)
public class MessageSender {

    @Autowired
    @Qualifier(value="myoutput")
    private MessageChannel myoutput;

    //發送消息
    public void send(Object obj) {
        myoutput.send(MessageBuilder.withPayload(obj).build());
    }
}

 

yml清單

server:
  port: 7001 #服務端口
spring:
  application:
    name: stream_producer #指定服務名
  rabbitmq:
    addresses: 192.168.180.137
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        output:
          destination: topcheer-default  #指定消息發送的目的地,在rabbitmq中,發送到一個topcheer-default的exchange中
        myoutput:
          destination: topcheer-custom-output
#          producer:
#            partition-key-expression: payload  #分區關鍵字   對象中的id,對象
#            partition-count: 2  #分區大小
      binders:  #配置綁定器
        defaultRabbit:
          type: rabbit

消息消費者
(1)創建工程引入依賴

  <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
    </dependencies>

2)定義bingding
同發送消息一致,在Spring Cloud Stream中接受消息,需要定義一個接口。

/**
 * 自定義的消息通道
 */
public interface MyProcessor {

    /**
     * 消息生產者的配置
     */
    String MYOUTPUT = "myoutput";

    @Output("myoutput")
    MessageChannel myoutput();

    /**
     * 消息消費者的配置
     */
    String MYINPUT = "myinput";

    @Input("myinput")
    SubscribableChannel myinput();
}
@Component
@EnableBinding(MyProcessor.class)
public class MessageListener {

    //監聽binding中的消息
    @StreamListener(MyProcessor.MYINPUT)
    public void input(String message) {
        System.out.println("獲取到消息: "+message);
    }

}

 

yml清單

server:
  port: 7002 #服務端口
spring:
  application:
    name: rabbitmq-consumer #指定服務名
  rabbitmq:
    addresses: 192.168.180.137
    username: guest
    password: guest
  cloud:
    stream:
#      instanceCount: 2  #消費者總數
#      instanceIndex: 0  #當前消費者的索引
      bindings:
        input: #內置的獲取消息的通道 , 從topcheer-default中獲取消息
          destination: topcheer-default
        myinput:
          destination: topcheer-custom-output
#          group: group1
#          consumer:
#            partitioned: true  #開啟分區支持
      binders:
        defaultRabbit:
          type: rabbit

 

 

 

 測試類:

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = ProducerApplication.class)
public class Test {

    @Autowired(required = false)
    private MessageSender messageSender;

    @org.junit.Test
    public void toSend(){
        messageSender.send("測試");
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM