spring-cloud-stream消息驅動的微服務


  Spring Cloud Stream 是 一 個用來為微服務應用構建消息驅動能力的框架。 它可以基於Spring Boot 來創建獨立的、 可用於生產的 Spring 應用程序。 它通過使用 Spring Integration來連接消息代理中間件以實現消息事件驅動。 Spring Cloud Stream 為 一 些供應商的消息中間件產品提供了個性化的自動化配置實現,並且引入了發布與訂閱、 消費組以及分區這三個核心概念。 簡單地說, Spring Cloud Stream 本質上就是整合了 Spring Boot 和 SpringIntegration, 實現了 一 套輕量級的消息驅動的微服務框架。 通過使用 Spring Cloud Stream,可以有效簡化開發人員對消息中間件的使用復雜度, 讓系統開發人員可以有更多的精力關注於核心業務邏輯的處理。 由於 Spring Cloud Stream 基於 Spring Boot 實現,所以它秉承了Spring Boot 的優點,自動化配置的功能可幫助我們快速上手使用。

快速入門:

1.創建 一 個基礎的 Spring Boot 工程,添加以下依賴:

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    </dependencies>

2.創建用於接收來自 RabbitMQ 消息的消費者 SinkReceiver

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; @EnableBinding(Sink.class) public class SinkReceiver { private static Logger logger = LoggerFactory.getLogger(SinkReceiver.class); @StreamListener(Sink.INPUT) public void receive(String payload) { logger.info("Received Message: " + payload); } }

3.創建主類:

@SpringBootApplication public class StreamApp { private final static Logger log = LoggerFactory.getLogger(StreamApp.class); public static void main(String[] args) { SpringApplication.run(StreamApp.class, args); log.info("服務啟動成功 "); } }

4.新增配置:

spring.rabbitmq.host = 192.168.1.101 spring.rabbitmq.username = guest spring.rabbitmq.password = guest spring.rabbitmq.port = 5672

  到這里, 快速入門示例的編碼任務已經完成了。 下面我們分別啟動 RabbitMQ 以及該Spring Boot 應用, 然后做下面的測試, 看看它們是如何運作的。先來看一 下Spring Boot 應用的啟動日志,我們可以發現如下信息

   我們看到連接了 Rabbit,聲明了一 個名為input.anonymous.eAIrmYmoRfGNwBYNynD3Uw 的隊列,並通過RabbitMessageChannelBinder將自己綁定為它的消費者。 這些信息我們也能在RabbitMQ的控制台中發現它們

  下面我們可以在 RabbitMQ的控制台中 進入 input.anonymous.eAIrmYmoRfGNwBYNynD3Uw 隊列 的管理頁面, 通過Publish message功能來發送 一 條消息到該 隊列中。

 

   此時, 我們可以在當前啟動的Spring Boot應用程序的控制台中看到下面的內容:

  可以 發現在應用控制台中輸出的內容就是SinkReceiver中的receive方法定義的, 而輸出的具體內容則來自消息隊列中獲取的對象。 

  接着, 我們再來看看這里用到的幾個Spring Cloud Stream的核心注解, 它們都被定義在SinkReceiver中。

  @EnableBinding, 該注解用來指定 一 個或多個定義了@Input或@Output 注解的接口, 以此實現對消息通道 (Channel) 的綁定。 在上面的例子中, 我們通過@EnableBinding(Sink.class)綁定了 Sink接口, 該接口是 Spring CloudStream中默認實現的對輸入消息通道綁定的定義, 它的源碼如下:

public interface Sink { String INPUT = "input"; @Input("input") SubscribableChannel input(); }

  它通過@Input注解綁定了 一 個名為input的通道。除了Sink之外,Spring CloudStream還默認實現了綁定 output 通道的 Source 接口, 還有結合了Sink和Source的Processor接口,實際使用時我們也可以自已通過@Input和@Output注解來定義綁定消息通道的接口。 當需要為@EnableBinding指定多個接口來綁定消息通道的時候, 可以這樣定義: @EnableBinding ( value = {Sink. class,Source .class})。

  @StreamListener: 它主要定義在方法上, 作用是將被修飾的方法注冊為消息中間件上數據流的事件監聽器, 注解中的屬性值對應了監聽的消息通道名。 在上面的例子中, 我們通過@StreamListener (Sink. INPUT)注解將receive方法注冊為input消息通道的監聽處理器,所以當我們在RabbitMQ的控制頁面中發布消息的時候, receive方法會做出對應的響應動作。

核心概念:

  下圖是官方文檔中Spring Cloud Stream應用模型的結構圖。從中我們可以看到,SpringCloud Stream構建的應用程序與消息中間件之間是通過綁定器 Binder相關聯的, 綁定器對於應用程序而言起到了隔離作用, 它使得不同消息中間件的實現細節對應用程序來說是透明的。 所以對於每 一 個Spring Cloud Stream的應用程序來說, 它不需要知曉消息中間件的通信細節, 它只需知道Binder 對應程序提供的抽象概念來使用消息中間件來實現業務邏輯即可, 而 這個抽象概念就是在上文中我們提到的消息通道: Channel。如下圖所示, 在應用程序和Binder之間定義 了兩條輸入通道和三條輸出通道來傳遞消息, 而綁定器則是作為這些通道和消息中間件之間的橋梁進行通信。

綁定器:

  Binder綁定器是Spring Cloud Stream中 一 個非常重要的概念。 在沒有綁定器這個概念的情況下, SpringBoot應用要直接與消息中間件進行信息交互的時候, 由於各消息中間件構建的初衷不同, 所以它們在實現細節上會有較大的差異, 這使得我們實現的消息交互邏輯就會非常笨重, 因為對具體的中間件實現細節有太重的依賴, 當中間件有較大的變動升級或是更換中間件的時候, 我們就需要付出非常大的代價來實施。

  通過定義綁定器作為中間層, 完美地實現了應用程序與消息中間件細節之間的隔離。 通過向應用程序暴露統 一 的Channel通道, 使得應用程序不需要再考慮各種不同的消息中間件的實現。 當需要升級消息中間件, 或是更換其他消息中間件產品時, 我們要做的就是更換它們對應的Binder綁定器而 不需要修改任何SpringBoot的應用邏輯。這 一 點在消息總線時, 從RabbitMQ切換到Kafka的過程中, 已經能夠讓我們體驗到這 一 好處。目前版本的Spring Cloud Stream為主流的消息中間件產品RabbitMQ和Kafka提供了默認的Binder 實現, 在上文的例子中, 我們就使用了RabbitMQ的Binder。 另外,Spring Cloud Stream還實現了 一 個專門用於測試的TestSupportBinder, 開發者可以直接使用它來對通道的接收內容進行可靠的測試斷言。 如果要使用除了RabbitMQ 和 Kafka以外的消息中間件的話, 我們也可以通過使用它所提供的擴展API來實現其他中間件的Binder。

  發布-訂閱模式:

  Spring Cloud Stream中的消息通信方式遵循了發布-訂閱模式, 當 一 條消息被投遞到消息中間件之后,它會通過共享的Topic主題進行廣播,消息消費者在訂閱的主題中收到它並觸發自身的業務邏輯處理。這里所 提到的Topic 主題是SpringCloud Stream中的 一 個抽象概念,用來代表發布共享消息給消費者的地方。 在不同的消息中間件中,Topic可能對應不同的概念, 比如, 在RabbitMQ中,它對應Exchange, 而在 Kakfa中則對應Kafka中的Topic。

  在上面的示例中,我們通過RabbitMQ的Channel發布消息給 我們編寫的應用程序消費, 而實際上SpringCloud Stream應用啟動的時候,在RabbitMQ的Exchange中也創建了 一 個名為 input的Exchange交換器, 由於Binder的隔離作用, 應用程序並無法感知它的存在,應用程序只知道自己指向Binder的輸入或是輸出通道。而在Exchanges選項卡中,我們還能找到名為input的交換器:

  單擊進入可以看到如下圖所示的詳情頁面。 Bindings欄中的內容就是應用程序綁定通道中的消息隊列, 我們可以通過Exchange頁面的Publish Message來發布消息, 此時可以發現啟動的應用程序都輸出了消息內容。

  相對於點對點隊列實現的消息通信來說,Spring Cloud Stream采用的發布-訂閱模式可以有效降低消息生產者與消費者之間的耦合。 當需要對同 一 類消息增加 一 種處理方式時,只需要增加 一 個應用程序並將輸入通道綁定到既有的Topic中就可以實現功能的擴展,而不需要改變原來已經實現的任何內容。

  消費組:

  雖然Spring Cloud Stream通過發布-訂閱模式將消息生產者與消費者做了很好的解耦,基於相同主題的消費者可以輕松地進行擴展, 但是這些擴展都是針對不同的應用實例而言的。 在現實的微服務架構中, 我們的每 一 個微服務應用為了實現高可用和負載均衡, 實際上都會部署多個實例。 在很多情況下, 消息生產者發送消息給某個具體微服務時, 只希望被消費 一 次,為了解決這個問題, 在Spring Cloud Stream中提供了消費組的概念。

  如果在同 一 個主題上的應用需要啟動多個實例的時候,我們可以通過 spring.cloud.stream.bindings.input.group屬性為應用指定 一 個組名,這樣這個應用的多個實例在接收到消息的時候, 只會有 一 個成員真正收到消息並進行處理。 默認情況下, 當沒有為應用指定消費組的時候, Spring Cloud Stream會為其分配 一 個獨立的匿名消費組。 所以, 如果同 一 主題下的所有應用都沒有被指定消費組的時候, 當有消息發布之后, 所有的應用都會對其進行消費, 因為它們各自都屬於 一 個獨立的組。 大部分情況下, 我們在創建Spring Cloud Stream應用的時候, 建議最好為其指定 一 個消費組,以防止對消息的重復處理, 除非該行為需要這樣做(比如刷新所有實例的配置等)。

  消息分區:

  通過引入消費組的概念, 我們已經能夠在多實例的清況下, 保障每個消息只被組內的一個實例消費。 通過上面對消費組參數設置后的實驗, 我們可以觀察到, 消費組無法控制消息具體被哪個實例消費。 也就是說, 對於同 一 條消息, 它多次到達之后可能是由不同的實例進行消費的。 但是對於一 些業務場景, 需要對 一 些具有相同特征的消息設置每次都被同 一 個消費實例處理, 比如,一 些用於監控服務, 為了統計某段時間內消息生產者發送的報告內容, 監控服務需要在自身聚合這些數據, 那么消息生產者可以為消息增加 一 個固有的特征ID來進行分區, 使得擁有這些ID的消息每次都能被發送到 一 個特定的實例上實現累計統計的效果, 否則這些數據就會分散到各個不同的節點導致監控結果不 一 致的情況。而分區概念的引入就是為了解決這樣的問題: 當生產者將消息數據發送給多個消費者實例時, 保證擁有共同特征的消息數據始終是由同 一 個消費者實例接收和處理。

  Spring Cloud Stream 為分區提供了通用的抽象實現, 用來在消息中間件的上層實現分區處理所以它對於消息中間件自身是否實現了消息分區並不關心, 這使得 Spring CloudStream 為不具備分區功能的消息中間件也增加了分區功能擴展。

使用詳解:

開啟綁定功能:

  在 Spring Cloud Stream 中,我們需要通過 @EnableBinding 注解來為應用啟動消息驅動的功能, 該注解我們在快速入門中已經有了基本的介紹, 下面來詳細看看它的定義:

@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @Configuration @Import({ //該配置會加載消息通道綁定必要的一些實例,比如SpEL表達式轉換器配置
BindingServiceConfiguration.class, //要是在Spring加載Bean 的時候被調用, 用來實現加載更多的Bean
BindingBeansRegistrar.class, //主要用來加載與消息中間件相關的配置信息, 比如, 它會從應用工程的META-INF/spring.binders中加載針對具體消息中間件相關的配置文件等。
BinderFactoryConfiguration.class} ) @EnableIntegration public @interface EnableBinding { Class<?>[] value() default {}; }

  從該注解的定義中我們可以看到,它自身包含了 @Configuration 注解,所以用它注解的類也會成為 Spring 的基本配置類。 另外該注解還通過 @Import 加載了 Spring Cloud Stream 運行需要的幾個基礎配置類。

綁定消息通道:

  在Spring Cloud Steam中, 我們可以在接口中通過@nput和@Output注解來定義消息通道, 而用於定義綁定消息通道的接口則可以被@EnableBinding注解的value參數來指定, 從而在應用啟動的時候實現對定義消息通道的綁定。上面的例子我們使用Sink接口綁定的消息通道。Sink接口是SpringCloud Steam提供的一 個默認實現, 除此之外還有Source和Processor, 可從它們的源t碼中學習它們的定義方式:

public interface Sink { String INPUT = "input"; @Input("input") SubscribableChannel input(); } public interface Source { String OUTPUT = "output"; @Output("output") MessageChannel output(); } public interface Processor extends Source, Sink { }

  我們可以看到,Sink和Source中分別通過@Input和@Output注解定義了 輸入通道和輸出通道,而Processor通過繼承Source和Sink的方式同時定義了 一 個輸入通道和一 個輸出通道。

  另外, @Input和@Output 注解都還有一 個value屬性,該屬性可以用來設置消息通道的名稱,這里Sink和Source中指定的消息通道名稱分別為 input和output。如果我們直接使用這兩個注解而沒有指定具體的 value值,將默認使用方法名作為消息通道的名稱 。需要 注意 一 點,當我們定義輸出通道的時候,需要返回MessageChannel接口對象, 該接口定義了向消息通道發送消息的方法; 而定義輸入通道時,需要返回SubscribableChannel接口對象,該接口 繼承自MessageChannel接口,它定義了維護消息通道訂閱者的方法。

注入綁定接口:

  我們在示例中已經為 Sink接口綁定的 input消息通道實現了具體的消息消費者,下面可以通過注入的方式實現一 個消息生成者,向 input消息通道發送數據。

1.創建一 個將 Input消息通道作為 輸出通道的接口,具體如下:

public interface SinkSender {

    @Output(Source.OUTPUT)
    MessageChannel output();

    @Input(Sink.INPUT)
    SubscribableChannel input();
}

2.對快速入門中定義的 SinkReceiver做 一 些修改:在@EnableBinding 注解中增加對SinkSender接口的指定,使Spring Cloud Stream能創建出對應的實例 。

@EnableBinding(value = {SinkSender.class})
public class SinkReceiver {

    private static Logger logger = LoggerFactory.getLogger(SinkReceiver.class);

    @StreamListener(Sink.INPUT)
    public void receive(String payload) {
        logger.info("Received Message: " + payload);
    }
}

3.添加配置

#交換機
spring.cloud.stream.bindings.input.destination = input-wuzz-data
spring.cloud.stream.bindings.output.destination=output-wuzz-data
# 若指定了group  ,需要重新自定義Sink  默認的 Sink  input通道會創建默認的group ,下面這樣子寫回拋出Bean重復定義異常,所以需要自定義Sink
#spring.cloud.stream.bindings.input.group=positionInput
#  交換機類型
#spring.cloud.stream.rabbit.bindings.input.consumer.exchange-type=direct
# routingkey
#spring.cloud.stream.rabbit.bindings.input.consumer.binding-routing-key=air.task.result

4.加個接口,注入 SinkSender 的實例, 並調用它的發送消息方法。

@RestController public class TestController { @Autowired private SinkSender sinkSender; @GetMapping(value = "/hello") public void test(){ sinkSender.output().send(MessageBuilder.withPayload("From SinkSender").build()); } }

5.測試,調用該接口,如果可以在控制台中找到如下輸出內容, 表明我們的試驗已經成功了, 消息被正確地發送到了 input 通道中, 並被相對應的消息消費者輸出。

 注入消息通道:

  由於 Spring Cloud Stream 會根據綁定接口中的 Input 和 @Output 注解來創建消息通道實例, 所以我們也可以通過直接注入的方式來使用消息通道對象。

@Autowired private MessageChannel output;
@GetMapping(value
= "/hello") public void test(){   output.send(MessageBuilder.withPayload("From MessageChannel").build()); }

  這種用法雖然很直接, 但是也容易犯錯, 很多時候我們在 一 個微服務應用中可能會創建多個不同名的 MessageChannel 實例, 這樣通過 @Autowired 注入時, 要注意參數命名需要與通道同名才能被正確注入, 或者也可以使用 @Qualifier 注解來特別指定具體實例的名稱, 該名稱需要與定義 MessageChannel 的 @Output 中的value 參數 一 致, 這樣才能被正確注入。 比如下面的例子, 在 一 個接口中定義了兩個輸出通道, 分別命名為 Output-1和Output-2, 當要使用 Output-1 的時候, 可以通過@Qualifier("Output-1") 來指定這個具體的實例來注入使用。

public interface SinkSender { @Output("Output-1") MessageChannel outputl(); @Output("Output-2") MessageChannel output2(); } @RestController public class TestController { @Autowired @Qualifier("Output-1") private MessageChannel output; ...... }

消息生產與消費:

  由於 Spring Cloud Stream 是基於 Spring Integration 構建起來的, 所以在使用 SpringCloud Stream 構建消息驅動服務的時候,完全可以使用 Spring Integration 的原生注解來實現各種業務需求。 同時, 為了簡化面向消息的編程模型, Spring Cloud Stream 還提供了@StreamListener 注解對輸入通道的處理做了進 一 步優化。 下面我們分別從這兩方面來學習一 下對消息的處理。

  spring Integration 原生支持:

  通過之前的內容, 我們已經能夠通過注入綁定接口和消息通道的方式實現向名為 input 的消息通道發送信息。接下來, 我們通過 Spring Integration 原生的 @ServiceActivator和@InboundChannelAdapter 注解來嘗試實現相同的功能, 具體實現如下:

  消費者及生產者需要屬於兩個不同的應用。消費者:

@EnableBinding(value = {Sink.class}) public class IntegrationSinkReceiver { private static Logger logger = LoggerFactory.getLogger(IntegrationSinkReceiver.class); @ServiceActivator(inputChannel=Sink.INPUT) public void receive(@Payload Date payload, @Headers Map headers) { logger.info(headers.get("contentType").toString()); logger.info("Received from {} channel Date: {}", Sink.INPUT, payload); } }

  生產者:

@EnableBinding(value = {IntegrationSinkSender.SinkOutput.class})
public class IntegrationSinkSender {
    private static Logger logger = LoggerFactory.getLogger(IntegrationSinkSender.class);

    @Bean
    @InboundChannelAdapter(value = SinkOutput.OUTPUT, poller = @Poller(fixedDelay = "2000"))
    public MessageSource<Date> timerMessageSource() {
        return () -> new GenericMessage<>(new Date());
    }

    public interface SinkOutput {
        String OUTPUT = Sink.INPUT;
@Output(SinkOutput.OUTPUT) MessageChannel output(); } }

  兩個應用配置一樣的信息:

spring.cloud.stream.bindings.input.destination = input-wuzz-data
spring.cloud.stream.bindings.output.destination = out-wuzz-data
  • IntegrationSinkReceiver 類屬於消息消費者實現, 與之前實現的類似, 只是做了一 些修改:使用原生的 @ServiceActivator 注解替換了 @StreamListener, 實現對Sink.INPUT 通道的監聽處理, 而該通道綁定了名為 input的主題。
  • IntegrationSinkSender 類屬於消息生產者實現, 它在內部定義了 SinkOutput 接口來將輸出通道綁定到名為 input 的主題中。 由於 SinkSender 和 SinkReceiver 共用一 個主題, 所以它們構成了 一 組生產者與消費者。 另外, 在SinkSender 中還創建了用於生產消息的 timerMessageSource 方法, 該方法會將當前時間作為消息返回。而 InboundChannelAdapter 注解定義了該方法是對 SinkOutput.OUTPUT通道的輸出綁定, 同時使用 poller 參數將該方法設置為輪詢執行, 這里我們定義為2000毫秒,所以它會以2秒的頻率向 SinkOutput.OUTPUT 通道輸出當前時間。

  執行上面定義的程序, 可以得到類似下面的輸出:

  另外, 還可以通過 @Transformer 注解對指定通道的消息進行轉換。 比如, 我們可以在上面的 IntegrationSinkSender 類中增加下面的內容:

@Transformer(inputChannel = Sink.INPUT, outputChannel = SinkOutput.OUTPUT) public Object transform(Date message) {   return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(message); }

  同時修改消息接收方法:

@ServiceActivator(inputChannel=Sink.INPUT) public void receive(String payload) {   logger.info("Received from {} channel Date: {}", Sink.INPUT, payload); }

  再次執行就會輸出以下信息:

  如果我們使用 @SteamListener 注解的話 ,可以在方法上直接定義對應類型的消息對象。比如上面的String類型的輸出。

  消息反饋:特別注意 input 跟 output,如果接收方接不到消息,請檢查這個是否寫反了。

  很多時候在處理完輸入消息之后, 需要反饋一 個消息給對方, 這時候可以通過@SendTo 注解來指定返回內容的輸出通道。

  接收方:

@EnableBinding(value = {Processor.class}) public class IntegrationSinkReceiverAnswer { private static Logger logger = LoggerFactory.getLogger(IntegrationSinkReceiverAnswer.class); @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public Object receive(String payload) { logger.info("Received from {} channel Date: {}", Processor.INPUT, payload); return "From Input Channel Return - " + payload; } @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.INPUT) public Object transform(Date message) { return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(message); } }

  或者這樣:除了可以使用 @SendTo 注解將方法返回結果輸出到消息通道中, 還可以使用原生注解 @ServiceActivator 的 outputChannel 屬性配置

@EnableScheduling @EnableBinding(value = {Processor.class}) public class IntegrationSinkReceiverAnswer { private static Logger logger = LoggerFactory.getLogger(IntegrationSinkReceiverAnswer.class); @ServiceActivator(inputChannel = Processor.INPUT,outputChannel = Processor.OUTPUT) public Object receive(String payload) { logger.info("Received from {} channel Date: {}", Processor.INPUT, payload); return "From Input Channel Return - " + payload; } @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.INPUT) public Object transform(Date message) { return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(message); } }

  接收方的通道配置:

spring.cloud.stream.bindings.input.destination=input-wuzz-data spring.cloud.stream.bindings.output.destination=output-wuzz-data

  發送方:

@EnableBinding(value = {Processor.class}) public class IntegrationSinkSenderAnswer { private static Logger logger = LoggerFactory.getLogger(IntegrationSinkSenderAnswer.class); @Bean @InboundChannelAdapter(value = Processor.OUTPUT, poller = @Poller(fixedDelay = "2000")) public MessageSource<Date> timerMessageSource() { return () -> new GenericMessage<>(new Date()); } @Transformer(inputChannel = Processor.OUTPUT, outputChannel = Processor.OUTPUT) public Object transform(Date message) { return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(message) + " Wuzz"; } // 監聽input通道
 @StreamListener(Processor.INPUT) public void receiveFromInput(String s) { logger.info("SendToSender Received: " + s); } }

  發送方通道配置:

spring.cloud.stream.bindings.input.destination = output-wuzz-data spring.cloud.stream.bindings.output.destination = input-wuzz-data

  啟動后的效果:

響應式編程::特別注意 input 跟 output,如果接收方接不到消息,請檢查這個是否寫反了。

  在 Spring Cloud Stream 中還支持使用基千於RxJava 的響應式編程來處理消息的輸入和輸出。 與 RxJava 的整合使用同樣很容易, 下面我們詳細看看如何使用 RxJava 實現上面消息。

  基於上述反饋中試驗的場景:

1.在消費者端添加依賴

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-rxjava</artifactId>
    <version>1.0.2.RELEASE</version>
</dependency>

2.改造接收消息:

@EnableRxJavaProcessor public class RxjavaSinkReceiverAnswer { private static Logger logger = LoggerFactory.getLogger(RxjavaSinkReceiverAnswer.class); @Bean public RxJavaProcessor<String, String> receive() { //data 就是接收方的消息
        return inputStream -> inputStream.map(data -> { logger.info("Received: " + data); return data; //返回發送端的消息
        }).map(data -> String.valueOf("From Input Channel Return - " + data)); } }

3.這里提醒一下大家,如果我們什么都不設置,由於傳輸的是  byte[] ,這里就直接報錯了。所以要設置contentType,但是按照書上說的在propertis里面配置我一直不起作用,所以我在發消息的時候設置了:

@EnableBinding(value = {Processor.class}) public class IntegrationSinkSenderAnswer { private static Logger logger = LoggerFactory.getLogger(IntegrationSinkSenderAnswer.class); @Bean @InboundChannelAdapter(value = Processor.OUTPUT, poller = @Poller(fixedDelay = "2000")) public MessageSource<String> timerMessageSource() { Map map =new HashMap(); map.put("contentType","text/plain;charset=UTF-8"); return () -> new GenericMessage<>(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + "RxJava",new MessageHeaders(map)); } // 監聽input通道
 @StreamListener(Processor.INPUT) public void receiveFromInput(String s) { logger.info("SendToSender Received: " + s); } }

4.按照我這個代碼,大家要時刻注意 input 跟output,本例子的配置如下:

#消費端 spring.cloud.stream.bindings.input.destination = input-wuzz-data spring.cloud.stream.bindings.output.destination = output-wuzz-data #發送端 spring.cloud.stream.bindings.input.destination = output-wuzz-data spring.cloud.stream.bindings.output.destination = input-wuzz-data

  除了實現上面的場景之外,通過利用 RxJava 的支持,我們還能輕易地實現消息的緩存聚合。 比如, 我們希望Appl在接收到5條消息之后才將處理結果返回給輸出通道, 那么只需通過下面的改進即可輕松實現這樣的場景:

@Bean public RxJavaProcessor<String, String> receive() { //data 就是接收方的消息
        return inputStream -> inputStream.map(data -> { logger.info("Received: " + data); return data; //返回發送端的消息
        }).buffer(5).map(data -> String.valueOf("From Input Channel Return - " + data)); }

  這樣子以后,消費端要累計消費5條消息才會相應,響應結果如下:

消息類型:

  Spring Cloud Stream為了讓開發者能夠在消息中聲明它的內容類型, 在輸出消息中定義了 一 個默認 的頭信息: content Type。對於那些不直接支持頭信息 的消息中間件,SpringClou d Stream提供了自己的實現機制,它會在消息發出前自動將消息 包裝進它自定義的消息封裝格式中, 並加入頭信息 。 而對於那些 自身就支待頭信息 的消息中間件, SpringCloudStream構建的服務可以接收並處理來自非SpringCloud Stream構建但 包含符合規范頭信息的應用程序發出的消息 。Spring Clou d Stream允許使用 spring.cloud.strearn.bindngs.<channelNarne>.content-type屬性以聲明式的配置方式為綁定的輸入和輸出通道設置消息 內容的類型。此外, 原生的消息類型轉換器依然可以輕松地用於我們的應用程序 。 目前, Spring CloudStream中自帶支持 了以下幾種常用的消息類型轉換。

  • JSON與POJO的互相轉換。
  • JSON與org.springfrarnework.tuple.Tuple的互相轉換。
  • Object與byte[]的互相轉換。 為 了實現遠程傳輸序列化 的原始字節, 應用程序需要發送byte類型的數據, 或是 通過實現Jav a的序列化接口來轉換為字節(Object對象必須可序列化)。
  • String與byte[]的互相轉換。
  • Object向純文本的 轉換: Object需要實現toString()方法。

  上面所指的 JSON類型可以 表現為一 個 byte類型的數組,也可以是 一 個包含有效JSON內容的字符串。 另外, Object對象可以由JSON、 byte數組或者字符串轉換而來,但是在 轉換為JSON的時候總是以字符串的形式返回。

MIME類型:配置方式可以參考以上響應式編程的Demo里的發送方。

  在SpringClou d Stream中定義的 content-type屬性采用了 Media Type, 即InternetMediaType (互聯網媒體類型), 也被稱為MIME類型, 常見的有 application/json、text/plain;charset=UTF-8, 相信接觸過HTTP的工程師們對這些類型都不會感到陌生。MIME類型對於標示如何轉換為 String或byte[]非常有用。 並且, 我們還可以使用MIME 類型格式來表示Jav a類型, 只需要使用帶有類型參數的一般類型 :application/x-java-object。 比如, 我 們可 以 使用 application/x-java­object;type = java.util.Map來表示傳輸的是一 個java.util.Map對象 , 或是使用application/x-java-object;type = com.wuzz.User 來表示傳輸的是 一 個com.wuzz.User對象;除此之外,更重要的是,它還提供了自定義的MIME類型,比如通過 application/x-spring-tuple來指定Spring的 Tuple類型。在Spring Cloud Stream中 默認提供了 一 些可以開箱即用的類型轉換器, 具體 如下表所示。

  消息類型的轉換行為只會在需要進行轉換時才被執行, 比如, 當服務模塊產生了一 個頭信息為application/json的XML字符串消息,Spring Cloud Stream是不會將該XML字符串轉換為JSON的, 這是因為該模塊的輸出內容已經是 一 個字符串類型了, 所以它並不會將其做進一 步的轉換。

  另外需要注意的是, Spring Cloud Stream雖然同時支待輸入通道和輸出通道的消息類型轉換, 但還是推薦開發者盡量在輸出通道中做消息轉換。 因為對千輸入通道的消費者來說, 當目標是 一 個POJO的時候, 使用@StreamListener注解是能夠支待自動對其進行轉換的。

  Spring Cloud Stream除了提供上面這些開箱即用的轉換器之外, 還支持開發者自定義的消息轉換器。 這使得我們可以使用任意格式(包括二進制)的數據進行發送和接收, 並且將這些數據與特定的contentType相關聯。 在應用啟用的時候, Spring Cloud Stream會將所有org.springframework. messaging.converter.MessageConverter接口實現的自定義轉換器以及默認實現的那些轉換器都加載到消息轉換工廠中, 以提供給消息處理時使用。

消費組與消息分區:

  消費組:

  通常每個服務都不會以單節點的方式運行在生產環境中, 當同一 個服務啟動多個實例的時候, 這些實例會綁定到同一 個消息通道的目標主題上。 默認情況下,當生產者發出一條消息到綁定通道上, 這條消息會產生多個副本被每個消費者實例接收和處理。 但是在有些業務場景之下, 我們希望生產者產生的消息只被其中一 個實例消費, 這個時候就需要為這些消費者設置消費組來實現這樣的功能。 實現的方式非常簡單, 只需在服務消費者端設置 spring.cloud.stream.bindings.input.group 屬性即可,比如可以像下面這樣實現。

spring.cloud.stream.bindings.input.destination=input-wuzz-data spring.cloud.stream.bindings.output.destination=output-wuzz-data spring.cloud.stream.bindings.input.group=Service-A

  服務端也配置上如下信息:

spring.cloud.stream.bindings.input.destination = input-wuzz-data spring.cloud.stream.bindings.output.destination = output-wuzz-data

  到這里, 對千消費分組的示例就完成了。分別運行上面實現的生產者與消費者, 其中消費者我們啟動多個實例。通過控制台, 可以發現, 每個生產者發出的消息會被啟動的消費者以輪詢的方式進行接收和輸出 。

  消息分區:

  通過消費組的設置, 雖然我們已經能夠在多實例環境下, 保證同 一 消息只被 一 個消費者 實例進行接收和處理, 但是, 對於一 些特殊場景,除了要保證單 一 實例消費之外, 還希望那些具備相同特征的消息都能夠被同 一 個實例進行消費。 這時候我們就需要對 消息進行分區處理 。

  接收方的配置:

spring.cloud.stream.bindings.input.destination=input-wuzz-data spring.cloud.stream.bindings.output.destination=output-wuzz-data spring.cloud.stream.bindings.input.group=Service-A #通過該參數開啟消費者分區功能。 spring.cloud.stream.bindings.input.consumer.partitioned= true #該參數指定了當前消費者的總實例數量。 spring.cloud.stream.instance-count = 2 #該參數設置當前實例的索引號, 從0開始, # 最大值為 spring.cloud.stream.instanceCount參數-1。 # 試驗的時候需要啟動多個實例, 可以通過運行參數來為不同實例設置不同的索引值。 spring.cloud.stream.instance-index= 0

  發送方的配置

spring.cloud.stream.bindings.input.destination = input-wuzz-data spring.cloud.stream.bindings.output.destination = output-wuzz-data #分區 #通過該參數指定了分區鍵的表達式規則,我們可以根據實際的輸出 #消息規則配置 SpEL 來生成合適的分區鍵。 spring.cloud.stream.bindings.input.producer.partitionKeyExpression = "partitionKey" #該參數指定了消息分區的數量。 spring.cloud.stream.bindings.input.producer.partitionCount = 2

  發送方的代碼里要在header加上partitionKey ,另外特別注意 input 跟 output,如果接收方接不到消息,請檢查這個是否寫反了。

@Bean @InboundChannelAdapter(value = SinkOutput.OUTPUT, poller = @Poller(fixedDelay = "2000")) public MessageSource<CustomerMessage> timerMessageSource() { CustomerMessage customerMessage = new CustomerMessage(); customerMessage.setId("111"); customerMessage.setBody("hello"); Message<CustomerMessage> partitionKey = MessageBuilder.withPayload(customerMessage).setHeader("partitionKey", 1).build(); return () -> partitionKey; }

RabbitMQ與Kafka綁定器:

  Spring Cloud Stream自身就提供了對RabbitMQ和Kafka的綁定器實現。 由於RabbitMQ 和 Kafka自身的實現結構有所不同 , 理解綁定器實現與消息中間件自有概念之間的對應關系,對於正確使用綁定器和消息中間件會有非常大的幫助。下面就來分別說說RabbitMQ與 Kafka的綁定器是如何使用消息中間件中不同概念來實現消息的生產與消費的。

  RabbitMQ綁定器:在 RabbitMQ中, 通過Exchange交換器來實現 Spring CloudStream的主題概念,所以消息通道的輸入輸出目標映射了 一 個具體的Exchange交換器。 而對於每個消費組, 則會為對t應的Exchange交換器綁定 一 個Queue隊列進行消息收發。依賴是:spring-cloud-stream-binder-rabbit

  Kafka綁定器: 由於Kafka自身就有Topic概念, 所以 Spring Cloud Stream的主題直接采用了Kafka的Topic 主題概念, 每個消費組的通道目標都會直接連接Kafka的主題進行消息收發。依賴是:spring-cloud-stream-binder-kafka

  我們這個Demo里面使用的是 RabiitMQ綁定器。至於Kafka的相關也是類似的。就不詳細說明了。

配置詳解:

  在Spring Cloud Stream中對綁定通道和綁定器提供了通用的屬性配置項, 一些綁定器還允許使用附加屬性來對消息中間件的 一 些獨有特性進行配置。 這些屬性的配置可以通過Spring Boot支持的任何配置方式來進行, 包括使用環境變量、 YAML或者properties 配置文件等。

  詳細的配置可以參考 spring cloud 微服務實戰一書。有疑問的歡迎隨時留言討論。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM