[Advanced Topics] Spring Cloud Stream in One Article


This article is summarized from the official documentation: http://cloud.spring.io/spring-cloud-static/spring-cloud-stream/2.1.0.RC3/single/spring-cloud-stream.html

1. A Brief History of Data Integration in Spring

2. A Minimal Example

3. New Features in 2.0

4. Introducing Spring Cloud Stream

A brief overview of Spring Cloud Stream:

  • A framework for building message-driven microservice applications;
  • Built on top of Spring Boot;
  • Uses Spring Integration to provide connectivity to message brokers;
  • Provides opinionated configuration of middleware from several vendors;
  • Introduces the concepts of persistent publish-subscribe semantics, consumer groups, and partitions.

Receiving messages:

Adding the @EnableBinding annotation connects the application to a message broker.

Adding @StreamListener to a method lets it receive events for stream processing.

The following example shows a sink application that receives external messages:

@SpringBootApplication
@EnableBinding(Sink.class)
public class VoteRecordingSinkApplication {

  // The service that processes incoming votes must be injected for the handler to compile
  @Autowired
  private VotingService votingService;

  public static void main(String[] args) {
    SpringApplication.run(VoteRecordingSinkApplication.class, args);
  }

  @StreamListener(Sink.INPUT)
  public void processVote(Vote vote) {
      votingService.recordVote(vote);
  }
}

Channel interfaces:

The @EnableBinding annotation takes one or more interfaces as parameters (in this case, the parameter is the single Sink interface).

An interface declares input and output channels. Spring Cloud Stream provides the Source, Sink, and Processor interfaces.

You can also define your own interfaces.

The following listing shows the definition of the Sink interface:

public interface Sink {
  String INPUT = "input";

  @Input(Sink.INPUT)
  SubscribableChannel input();
}

The @Input annotation identifies an input channel, through which received messages enter the application.

The @Output annotation identifies an output channel, through which published messages leave the application.

The @Input and @Output annotations can take a channel name as a parameter. If no name is provided, the name of the annotated method is used instead.

Spring Cloud Stream creates an implementation of the interface for you. You can use it in the application by autowiring it, as shown in the following example (taken from a test case):

@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = VoteRecordingSinkApplication.class)
@WebAppConfiguration
@DirtiesContext
public class StreamApplicationTests {

  @Autowired
  private Sink sink;

  @Test
  public void contextLoads() {
    assertNotNull(this.sink.input());
  }
}

5. Main Concepts

Spring Cloud Stream simplifies writing message-driven applications through a number of terms and abstractions.

5.1 Application Model

The core of Spring Cloud Stream is agnostic to the middleware implementation.

A Stream application interacts with the outside world through input and output channels.

Channels communicate with external middleware products through middleware-specific Binder implementations.

5.2 The Binder Abstraction

Spring Cloud Stream provides Binder implementations for Kafka and RabbitMQ, and also includes a TestSupportBinder for testing. You can also write your own Binder.

Spring Cloud Stream uses Spring Boot configuration together with the Binder abstraction, making it possible to flexibly configure how the application connects to a message broker. For example, you can decide at deployment time which type of broker, or which broker instance, to connect to.

These settings can be supplied through external configuration or through any mechanism supported by Spring Boot (such as application arguments, environment variables, YAML files, or properties files).

Spring Cloud Stream automatically selects a binder based on the dependencies found on the classpath. You can also include multiple binders and decide at runtime which one to use, or even use different binders for different channels.

5.3 Persistent Publish-Subscribe Support

Communication between applications follows a publish-subscribe model, where data is broadcast through shared topics. This can be seen in the following figure, which shows a typical deployment of a set of interacting Spring Cloud Stream applications.

(Figure: the Spring Cloud Stream publish-subscribe model)

Sensor data is reported to an HTTP endpoint and then sent to a common destination: raw-sensor-data.

From that destination, two microservices subscribe to the raw-sensor-data topic: one computes windowed averages, and the other ingests the raw data into HDFS (Hadoop Distributed File System).

The publish-subscribe communication model reduces the complexity of both the producer and the consumer, and allows new applications to be added to the topology without disrupting the existing flow. For example, downstream from the average-calculating application, you can add an application that calculates the highest temperature values for display and monitoring. You can then add another application that interprets the same stream of averages for fault detection. Doing all communication through shared topics rather than point-to-point queues reduces coupling between microservices.

While the concept of publish-subscribe messaging is not new, Spring Cloud Stream takes the extra step of making it an opinionated choice for its application model. By using native middleware support, Spring Cloud Stream also simplifies the use of the publish-subscribe model across different platforms.

5.4 Consumer Groups

To scale up processing capacity, we deploy multiple instances of an application. In that case, the instances compete for messages: each message only needs to be consumed by one of the instances.

Spring Cloud Stream models this behavior through the concept of a consumer group. (Spring Cloud Stream consumer groups are similar to, and inspired by, Kafka consumer groups.)

Each consumer binding can use the spring.cloud.stream.bindings.<channelName>.group property to specify a group name.

For the consumers shown in the figure below, this property would be set to spring.cloud.stream.bindings.<channelName>.group=hdfsWrite or spring.cloud.stream.bindings.<channelName>.group=average.

(Figure: Spring Cloud Stream consumer groups)

All groups that subscribe to a given destination receive a copy of the published data, but only one member of each group receives a given message from that destination.

By default, when no group is specified, Spring Cloud Stream places the application in an anonymous, independent, single-member consumer group that is maintained alongside all the other consumer groups.
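As a concrete sketch of the group property described above (the binding name `input` and the destination/group names here follow the sensor example and are illustrative), each of the two subscribing services would configure its own group in application.properties:

```properties
# HDFS import service
spring.cloud.stream.bindings.input.destination=raw-sensor-data
spring.cloud.stream.bindings.input.group=hdfsWrite

# Windowed-average service (its own application.properties)
# spring.cloud.stream.bindings.input.destination=raw-sensor-data
# spring.cloud.stream.bindings.input.group=average
```

Scaling either service to multiple instances then load-balances messages within its group, while each group still receives a copy of every message.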

5.5 Consumer Types

Two consumer types are supported:

  • Message-driven (sometimes referred to as asynchronous)
  • Polled (sometimes referred to as synchronous)

Before version 2.0, only asynchronous consumers were supported: a message is delivered as soon as it is available and a thread is available to process it.

If you want to control the rate at which messages are processed, you can use a synchronous (polled) consumer.

Durability:

Consistent with the opinionated application model of Spring Cloud Stream, consumer group subscriptions are durable.

Binder implementations ensure that group subscriptions are persistent: once at least one subscription for a group has been created, the group receives messages, even if the messages are sent while all the consumers in the group are stopped.

Note: anonymous subscriptions are non-durable by nature. For some binder implementations (such as RabbitMQ), it is also possible to have non-durable group subscriptions.

In general, it is preferable to always specify a consumer group when binding an application to a given destination. When scaling up a Spring Cloud Stream application, you must specify a consumer group for each of its input bindings. Doing so prevents the application's instances from receiving duplicate messages (unless that behavior is desired, which is unusual).

5.6 Partitioning Support

Spring Cloud Stream supports partitioning data between multiple instances of a given application. In a partitioned scenario, the physical communication medium (such as the broker topic) is viewed as being structured into multiple partitions.

One or more producer application instances send data to multiple consumer application instances, and the data identified by common characteristics is guaranteed to be processed by the same consumer instance.

Spring Cloud Stream provides a common abstraction for implementing partitioned processing in a uniform fashion. Partitioning can therefore be used whether the broker itself is naturally partitioned (such as Kafka) or not (such as RabbitMQ).

(Figure: Spring Cloud Stream partitioning)

Partitioning is a critical concept in stateful processing, where it is essential (for either performance or consistency reasons) to ensure that all related data is processed together.

For example, in the time-windowed average calculation example, it is important that all measurements from any given sensor are processed by the same application instance.

To set up a partitioned processing scenario, you must configure both the data-producing and the data-consuming ends.
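A minimal configuration sketch of both ends (the binding names `output`/`input` and the `payload.sensorId` key expression are illustrative; the exact properties are described in the producer and consumer property tables later in this document). The producer declares how to derive the partition key and how many partitions exist, while each consumer instance declares that its input is partitioned and which instance it is:

```properties
# Producer side: partition by the sensor id carried in the payload
spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload.sensorId
spring.cloud.stream.bindings.output.producer.partitionCount=2

# Consumer side (instance 0 of 2; the second instance would set instanceIndex=1)
spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=0
```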

6. Programming Model

Core concepts:

  • Destination Binders: components responsible for integrating with external messaging systems.
  • Destination Bindings: bridges, created by the binders, between the external messaging systems and the application's message producers and consumers.
  • Message: the canonical data structure used by producers and consumers to communicate with Destination Binders (and thus, through the external messaging systems, with other applications).

6.1 Destination Binders

Destination Binders are extension components of Spring Cloud Stream that provide the necessary configuration and implementation for integrating with external messaging systems. This integration is responsible for connectivity, delegation and routing of messages to and from producers and consumers, data type conversion, invocation of user code, and more.

Binders handle a lot of the important work, but some things still need human help, usually in the form of configuration; the rest of this document describes those options in detail.

6.2 Destination Bindings

As stated earlier, Destination Bindings are the bridge between the external messaging systems and the application's message producers and consumers.

Applying the @EnableBinding annotation to one of the application's configuration classes defines a binding. The @EnableBinding annotation is itself meta-annotated with @Configuration and triggers the configuration of the Spring Cloud Stream infrastructure.

The following example shows a fully configured and functioning Spring Cloud Stream application that receives the payload of a message from its input destination as a String type (see Chapter 9, Content Type Negotiation), logs it to the console, and sends it to the output destination after converting it to upper case.

@SpringBootApplication
@EnableBinding(Processor.class)
public class MyApplication {

    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public String handle(String value) {
        System.out.println("Received: " + value);
        return value.toUpperCase();
    }
}

As you can see, the @EnableBinding annotation can take one or more interface classes as parameters. The parameters are referred to as bindings, and they contain methods representing bindable components. These components are typically message channels (see Spring Messaging) for channel-based binders (such as Rabbit, Kafka, and others). However, other types of bindings can provide support for the native features of the corresponding technology. For example, the Kafka Streams binder (previously known as KStream) allows binding directly to Kafka Streams (see Kafka Streams for more details).

Spring Cloud Stream already provides binding interfaces for typical message exchange contracts, which include:

  • Sink: identifies the destination from which messages are consumed; it is the message consumer.
  • Source: identifies the destination to which messages are published; it is the message producer.
  • Processor: both a consumer and a producer.

public interface Sink {
  String INPUT = "input";

  @Input(Sink.INPUT)
  SubscribableChannel input();
}
public interface Source {
  String OUTPUT = "output";

  @Output(Source.OUTPUT)
  MessageChannel output();
}
public interface Processor extends Source, Sink {}

While the preceding examples satisfy the majority of cases, you can also define your own contracts by defining your own binding interfaces, using the @Input and @Output annotations to identify the actual bindable components.

public interface Barista {

    @Input
    SubscribableChannel orders();

    @Output
    MessageChannel hotDrinks();

    @Output
    MessageChannel coldDrinks();
}

Using the interface shown in the preceding example as a parameter to @EnableBinding triggers the creation of three bound channels, named orders, hotDrinks, and coldDrinks respectively. You can provide as many binding interfaces to the @EnableBinding annotation as you need, as shown in the following example:

@EnableBinding(value = { Orders.class, Payment.class })

In Spring Cloud Stream, the bindable message channel components are the Spring Messaging MessageChannel (for outbound) and its extension, SubscribableChannel (for inbound).

Pollable Destination Binding

While the bindings described previously support event-based message consumption, sometimes you need more control, such as over the rate of consumption.

Starting with version 2.0, you can now bind a pollable consumer.

The following example shows how to bind a pollable consumer:

public interface PolledBarista {

    @Input
    PollableMessageSource orders();
    . . .
}

In this case, an implementation of PollableMessageSource is bound to the orders channel. See Section 6.3.5, “Using Polled Consumers”, for more details.

Custom Channel Names

By using the @Input and @Output annotations, you can specify a custom name for a channel, as shown in the following example:

public interface Barista {
    @Input("inboundOrders")
    SubscribableChannel orders();
}
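A hedged note on how such a custom name maps to configuration (the destination name `orders` below is illustrative): binding properties are keyed by the channel name, inboundOrders, not by the method name, so a destination override for the example above would look like:

```properties
spring.cloud.stream.bindings.inboundOrders.destination=orders
```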

In this example, the created bound channel is named inboundOrders.

Normally, you need not access individual channels or bindings directly (other than configuring them via the @EnableBinding annotation). However, there may be times, such as testing or other corner cases, when you do.

Aside from generating channels for each binding and registering them as Spring beans, for each bound interface Spring Cloud Stream generates a bean that implements the interface. That means you can access the interfaces representing the bindings, or the individual channels, by auto-wiring them in your application, as shown in the following two examples:

Auto-wiring the binding interface

@Autowired
private Source source;

public void sayHello(String name) {
    source.output().send(MessageBuilder.withPayload(name).build());
}

Auto-wiring a channel

@Autowired
private MessageChannel output;

public void sayHello(String name) {
    output.send(MessageBuilder.withPayload(name).build());
}

您還可以使用標准Spring的@Qualifier注釋,用於自定義信道名稱或需要指定通道的多通道場景中的情況。

@Autowire
@Qualifier("myChannel")
private MessageChannel output;

6.3 Producing and Consuming Messages

There are two ways to write producers and consumers:

  • Spring Integration annotations
  • Spring Cloud Stream annotations

6.3.1 Using Spring Integration Annotations

Spring Cloud Stream is built on the concepts and patterns defined by Enterprise Integration Patterns, and its internal implementation relies on the Spring Integration framework.

Stream therefore supports the foundations, semantics, and configuration options already established by Spring Integration.

For example, you can attach the output channel of a Source to a MessageSource by using the @InboundChannelAdapter annotation:

@EnableBinding(Source.class)
public class TimerSource {

  @Bean
  @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1"))
  public MessageSource<String> timerMessageSource() {
    return () -> new GenericMessage<>("Hello Spring Cloud Stream");
  }
}

Similarly, you can use @Transformer or @ServiceActivator to provide an implementation of a handler for messages coming from a Processor binding:

 

@EnableBinding(Processor.class)
public class TransformProcessor {
  @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
  public Object transform(String message) {
    return message.toUpperCase();
  }
}

One thing to note:

When you consume from the same binding using the @StreamListener annotation from Spring Cloud Stream, a publish-subscribe model is used: every method annotated with @StreamListener receives its own copy of each message, and each one has its own consumer group.

By contrast, when you consume from the same binding using one of the Spring Integration annotations (such as @Aggregator, @Transformer, or @ServiceActivator), the consumption is competitive: only one consumer receives each message, and no individual consumer group is created for each consumer.

6.3.2 Using @StreamListener from Spring Cloud Stream

Complementary to its Spring Integration support, Spring Cloud Stream provides its own @StreamListener annotation, modeled after other Spring messaging annotations (@MessageMapping, @JmsListener, @RabbitListener, and others) and providing conveniences such as content-based routing.

@EnableBinding(Sink.class)
public class VoteHandler {

  @Autowired
  VotingService votingService;

  @StreamListener(Sink.INPUT)
  public void handle(Vote vote) {
    votingService.record(vote);
  }
}

As with other Spring messaging methods, method arguments can be annotated with @Payload, @Headers, and @Header. For methods that return data, you must use the @SendTo annotation to specify the output binding destination for the data returned by the method, as the following example shows:

@EnableBinding(Processor.class)
public class TransformProcessor {

  @Autowired
  VotingService votingService;

  @StreamListener(Processor.INPUT)
  @SendTo(Processor.OUTPUT)
  public VoteResult handle(Vote vote) {
    return votingService.record(vote);
  }
}

6.3.3 Using @StreamListener for Content-based Routing

Spring Cloud Stream supports dispatching messages to multiple handler methods annotated with @StreamListener based on conditions.

In order to be eligible for conditional dispatching, a method must satisfy the following conditions:

    • It cannot return a value.
    • It must be an individual message handling method (reactive API methods are not supported).

The condition is specified by a SpEL expression in the condition argument of the annotation and is evaluated for each message. All the handlers that match the condition are invoked in the same thread, and no assumption must be made about the order in which the invocations take place.

In the following example of a @StreamListener with dispatching conditions, all messages with a header type whose value is bogey are dispatched to the receiveBogey method, and all messages with a header type whose value is bacall are dispatched to the receiveBacall method.

@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class TestPojoWithAnnotatedArguments {

    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bogey'")
    public void receiveBogey(@Payload BogeyPojo bogeyPojo) {
       // handle the message
    }

    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bacall'")
    public void receiveBacall(@Payload BacallPojo bacallPojo) {
       // handle the message
    }
}

Content Type Negotiation in the Context of condition

It is important to understand some of the mechanics behind content-based routing using the condition argument of @StreamListener, especially in the context of the type of the message as a whole. It may also help if you familiarize yourself with Chapter 9, Content Type Negotiation, before you proceed.

Consider the following scenario:

@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class CatsAndDogs {

    @StreamListener(target = Sink.INPUT, condition = "payload.class.simpleName=='Dog'")
    public void bark(Dog dog) {
       // handle the message
    }

    @StreamListener(target = Sink.INPUT, condition = "payload.class.simpleName=='Cat'")
    public void purr(Cat cat) {
       // handle the message
    }
}

The preceding code is perfectly valid. It compiles and deploys without any issues, yet it never produces the result you expect.

That is because you are testing something that does not yet exist in the state you expect.

The reason is that the payload of the message has not yet been type-converted to the desired type. In other words, it has not yet gone through the type conversion process described in Chapter 9, Content Type Negotiation.

So, unless you use a SpEL expression that evaluates raw data (for example, the value of the first byte in a byte array), use message-header-based expressions (such as condition = "headers['type']=='dog'").

At the moment, dispatching through @StreamListener conditions is supported only for channel-based binders (it is not supported for reactive programming).

6.3.4 Spring Cloud Function support

Since Spring Cloud Stream v2.1, another alternative for defining stream handlers and sources is to use the built-in support for Spring Cloud Function, where they can be expressed as beans of type java.util.function.[Supplier/Function/Consumer].

To specify which functional bean to bind to the external destination(s) exposed by the bindings, you must provide the spring.cloud.stream.function.definition property.

Here is an example of a Processor application exposing its message handler as a java.util.function.Function:

@SpringBootApplication
@EnableBinding(Processor.class)
public class MyFunctionBootApp {

    public static void main(String[] args) {
        SpringApplication.run(MyFunctionBootApp.class, "--spring.cloud.stream.function.definition=toUpperCase");
    }

    @Bean
    public Function<String, String> toUpperCase() {
        return s -> s.toUpperCase();
    }
}

In the example above, we simply define a bean of type java.util.function.Function called toUpperCase and identify it as the bean to be used as the message handler, whose 'input' and 'output' must be bound to the external destinations exposed by the Processor binding.

Below are examples of simple functional applications supporting Source, Processor, and Sink.

Here is an example of a Source application defined as a java.util.function.Supplier:

@SpringBootApplication
@EnableBinding(Source.class)
public static class SourceFromSupplier {
    public static void main(String[] args) {
        SpringApplication.run(SourceFromSupplier.class, "--spring.cloud.stream.function.definition=date");
    }
    @Bean
    public Supplier<Date> date() {
        return () -> new Date(12345L);
    }
}

Here is an example of a Processor application defined as a java.util.function.Function:

@SpringBootApplication
@EnableBinding(Processor.class)
public static class ProcessorFromFunction {
    public static void main(String[] args) {
        SpringApplication.run(ProcessorFromFunction.class, "--spring.cloud.stream.function.definition=toUpperCase");
    }
    @Bean
    public Function<String, String> toUpperCase() {
        return s -> s.toUpperCase();
    }
}

Here is an example of a Sink application defined as a java.util.function.Consumer:

@EnableAutoConfiguration
@EnableBinding(Sink.class)
public static class SinkFromConsumer {
    public static void main(String[] args) {
        SpringApplication.run(SinkFromConsumer.class, "--spring.cloud.stream.function.definition=sink");
    }
    @Bean
    public Consumer<String> sink() {
        return System.out::println;
    }
}

 

Functional Composition

With this programming model, you can also benefit from functional composition, where you can dynamically compose complex handlers from a set of simple functions. As an example, let's add the following function bean to the application defined above:

@Bean
public Function<String, String> wrapInQuotes() {
    return s -> "\"" + s + "\"";
}

and modify the spring.cloud.stream.function.definition property to reflect your intention to compose a new function from both 'toUpperCase' and 'wrapInQuotes'. To do that, Spring Cloud Function lets you use the | (pipe) symbol. So, to finish our example, the property now looks like this:

--spring.cloud.stream.function.definition=toUpperCase|wrapInQuotes
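The pipe composes the functions left to right, which in plain Java corresponds to Function.andThen. A minimal, framework-free sketch of what the composed handler computes (the class name ComposeDemo is illustrative):

```java
import java.util.function.Function;

public class ComposeDemo {
    public static void main(String[] args) {
        Function<String, String> toUpperCase = s -> s.toUpperCase();
        Function<String, String> wrapInQuotes = s -> "\"" + s + "\"";

        // toUpperCase|wrapInQuotes: upper-case first, then wrap in quotes
        Function<String, String> composed = toUpperCase.andThen(wrapInQuotes);

        System.out.println(composed.apply("hello")); // prints "HELLO" (including the quotes)
    }
}
```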

6.3.5 Using Polled Consumers

6.4 Error Handling

6.5 Reactive Programming Support

7. Binders

Spring Cloud Stream provides a Binder abstraction for use in connecting to external middleware.

This section provides information about the main concepts behind the Binder SPI, its main components, and implementation-specific details.

7.1 Producers and Consumers

The following figure shows the general relationship between producers and consumers:

A producer is any component that sends messages to a channel. The channel can be bound to an external message broker with a Binder implementation for that broker. When invoking the bindProducer() method, the first parameter is the name of the destination within the broker, the second parameter is the local channel instance to which the producer sends messages, and the third parameter contains properties (such as a partition key expression) to be used within the adapter that is created for that channel.


A consumer is any component that receives messages from a channel. As with a producer, the consumer’s channel can be bound to an external message broker. When invoking the bindConsumer() method, the first parameter is the destination name, and a second parameter provides the name of a logical group of consumers. Each group that is represented by consumer bindings for a given destination receives a copy of each message that a producer sends to that destination (that is, it follows normal publish-subscribe semantics). If there are multiple consumer instances bound with the same group name, then messages are load-balanced across those consumer instances so that each message sent by a producer is consumed by only a single consumer instance within each group (that is, it follows normal queueing semantics).


7.2 Binder SPI

The Binder SPI consists of a number of interfaces, out-of-the-box utility classes, and discovery strategies that provide a pluggable mechanism for connecting to external middleware.

The key point of the SPI is the Binder interface, which is a strategy for connecting inputs and outputs to external middleware.

The following listing shows the definition of the Binder interface:

 

public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
    Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);

    Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
}

 

 

The interface is parameterized, offering a number of extension points:

  • Input and output bind targets. As of version 1.0, only MessageChannel is supported, but this is intended to be used as an extension point in the future.
  • Extended consumer and producer properties, allowing specific Binder implementations to add supplemental properties that can be supported in a type-safe manner.

A typical binder implementation consists of the following:

  • A class that implements the Binder interface;
  • A Spring @Configuration class that creates a bean of type Binder along with the middleware connection infrastructure.
  • META-INF/spring.binders file found on the classpath containing one or more binder definitions, as shown in the following example:

kafka:\
org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration

 

 

7.3 Binder Detection

Spring Cloud Stream relies on implementations of the Binder SPI to perform the task of connecting channels to message brokers. Each Binder implementation typically connects to one type of messaging system.

7.3.1 Classpath Detection

By default, Spring Cloud Stream relies on Spring Boot's auto-configuration to configure the binding process. If a single Binder implementation is found on the classpath, Spring Cloud Stream automatically uses it. For example, a Spring Cloud Stream project that aims to bind to RabbitMQ can add the following dependency:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

For the specific Maven coordinates of other binder dependencies, see the documentation of that binder implementation.

7.4 Multiple Binders on the Classpath

When multiple binders are present on the classpath, the application must indicate which binder is to be used for each channel binding. Each binder configuration contains a META-INF/spring.binders file, which is a simple properties file, as shown in the following example:

rabbit:\
org.springframework.cloud.stream.binder.rabbit.config.RabbitServiceAutoConfiguration

Similar files exist for the other provided binder implementations (such as Kafka), and custom binder implementations are expected to provide them as well. The key represents an identifying name for the binder implementation, whereas the value is a comma-separated list of configuration classes that each contain one and only one bean definition of type org.springframework.cloud.stream.binder.Binder.

Binder selection can either be performed globally, using the spring.cloud.stream.defaultBinder property (for example, spring.cloud.stream.defaultBinder=rabbit) or individually, by configuring the binder on each channel binding. For instance, a processor application (that has channels named input and output for read and write respectively) that reads from Kafka and writes to RabbitMQ can specify the following configuration:

spring.cloud.stream.bindings.input.binder=kafka
spring.cloud.stream.bindings.output.binder=rabbit

7.5 Connecting to Multiple Systems

7.6 Binding visualization and control
Bindings can be visualized, and individual channels paused and resumed, through actuator endpoints.

7.7 Binder Configuration Properties


The following properties are available when customizing binder configurations. These properties, exposed via org.springframework.cloud.stream.config.BinderProperties, must be prefixed with spring.cloud.stream.binders.<configurationName>.

type

The binder type. It typically references one of the binders found on the classpath — in particular, a key in a META-INF/spring.binders file.

By default, it has the same value as the configuration name.

inheritEnvironment

Whether the configuration inherits the environment of the application itself.

Default: true.

environment

Root for a set of properties that can be used to customize the environment of the binder. When this property is set, the context in which the binder is being created is not a child of the application context. This setting allows for complete separation between the binder components and the application components.

Default: empty.

defaultCandidate

Whether the binder configuration is a candidate for being considered a default binder or can be used only when explicitly referenced. This setting allows adding binder configurations without interfering with the default processing.

Default: true.
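A sketch of how these properties combine (the binder names `rabbit1`/`rabbit2` and the hostnames are illustrative): two separately configured RabbitMQ binder environments, one of which is excluded from default-binder selection and referenced explicitly by a binding:

```properties
spring.cloud.stream.binders.rabbit1.type=rabbit
spring.cloud.stream.binders.rabbit1.environment.spring.rabbitmq.host=host1

spring.cloud.stream.binders.rabbit2.type=rabbit
spring.cloud.stream.binders.rabbit2.defaultCandidate=false
spring.cloud.stream.binders.rabbit2.environment.spring.rabbitmq.host=host2

spring.cloud.stream.bindings.output.binder=rabbit2
```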

 

8. Configuration Options

Some binders have additional binding properties that support middleware-specific features.

Configuration options can be provided to Spring Cloud Stream applications through any mechanism supported by Spring Boot.

This includes application arguments, environment variables, and YAML or .properties files.

8.1 Binding Service Properties

 

These properties are exposed via org.springframework.cloud.stream.config.BindingServiceProperties

spring.cloud.stream.instanceCount

The number of deployed instances of an application. Must be set for partitioning on the producer side. Must be set on the consumer side when using RabbitMQ and with Kafka if autoRebalanceEnabled=false.

Default: 1.

spring.cloud.stream.instanceIndex

The instance index of the application: a number from 0 to instanceCount - 1. Used for partitioning with RabbitMQ and with Kafka if autoRebalanceEnabled=false. Automatically set in Cloud Foundry to match the application's instance index.

spring.cloud.stream.dynamicDestinations

A list of destinations that can be bound dynamically (for example, in a dynamic routing scenario). If set, only listed destinations can be bound.

Default: empty (letting any destination be bound).

spring.cloud.stream.defaultBinder

The default binder to use, if multiple binders are configured. See Multiple Binders on the Classpath.

Default: empty.

spring.cloud.stream.overrideCloudConnectors

This property is only applicable when the cloud profile is active and Spring Cloud Connectors are provided with the application. If the property is false (the default), the binder detects a suitable bound service (for example, a RabbitMQ service bound in Cloud Foundry for the RabbitMQ binder) and uses it for creating connections (usually through Spring Cloud Connectors). When set to true, this property instructs binders to completely ignore the bound services and rely on Spring Boot properties (for example, relying on the spring.rabbitmq.* properties provided in the environment for the RabbitMQ binder). The typical usage of this property is to be nested in a customized environment when connecting to multiple systems.

Default: false.

spring.cloud.stream.bindingRetryInterval

The interval (in seconds) between retrying binding creation when, for example, the binder does not support late binding and the broker (for example, Apache Kafka) is down. Set it to zero to treat such conditions as fatal, preventing the application from starting.

Default: 30

8.2 Binding Properties

Binding properties are supplied by using the format of spring.cloud.stream.bindings.<channelName>.<property>=<value>. The <channelName> represents the name of the channel being configured (for example, output for a Source).

To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of spring.cloud.stream.default.<property>=<value>.

When it comes to avoiding repetitions for extended binding properties, this format should be used - spring.cloud.stream.<binder-type>.default.<producer|consumer>.<property>=<value>.

In what follows, we indicate where we have omitted the spring.cloud.stream.bindings.<channelName>. prefix and focus just on the property name, with the understanding that the prefix is included at runtime.

8.2.1 Common Binding Properties

These properties are exposed via org.springframework.cloud.stream.config.BindingProperties

The following binding properties are available for both input and output bindings and must be prefixed with spring.cloud.stream.bindings.<channelName>. (for example, spring.cloud.stream.bindings.input.destination=ticktock).

Default values can be set by using the spring.cloud.stream.default prefix (for example`spring.cloud.stream.default.contentType=application/json`).

destination

The target destination of a channel on the bound middleware (for example, the RabbitMQ exchange or Kafka topic). If the channel is bound as a consumer, it could be bound to multiple destinations, and the destination names can be specified as comma-separated String values. If not set, the channel name is used instead. The default value of this property cannot be overridden.

group

The consumer group of the channel. Applies only to inbound bindings. See Consumer Groups.

Default: null (indicating an anonymous consumer).

contentType

The content type of the channel. See Chapter 9, Content Type Negotiation.

Default: application/json.

binder

The binder used by this binding. See Section 7.4, “Multiple Binders on the Classpath” for details.

Default: null (the default binder is used, if it exists).
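Putting the common binding properties together, a typical input binding (the channel name `input` and the values are illustrative) might look like:

```properties
spring.cloud.stream.bindings.input.destination=ticktock
spring.cloud.stream.bindings.input.group=myGroup
spring.cloud.stream.bindings.input.contentType=application/json
spring.cloud.stream.bindings.input.binder=rabbit
```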

8.2.2 Consumer Properties

These properties are exposed via org.springframework.cloud.stream.binder.ConsumerProperties

The following binding properties are available for input bindings only and must be prefixed with spring.cloud.stream.bindings.<channelName>.consumer. (for example, spring.cloud.stream.bindings.input.consumer.concurrency=3).

Default values can be set by using the spring.cloud.stream.default.consumer prefix (for example, spring.cloud.stream.default.consumer.headerMode=none).

concurrency

The concurrency of the inbound consumer.

Default: 1.

partitioned

Whether the consumer receives data from a partitioned producer.

Default: false.

headerMode

When set to none, disables header parsing on input. Effective only for messaging middleware that does not support message headers natively and requires header embedding. This option is useful when consuming data from non-Spring Cloud Stream applications when native headers are not supported. When set to headers, it uses the middleware’s native header mechanism. When set to embeddedHeaders, it embeds headers into the message payload.

Default: depends on the binder implementation.

maxAttempts

If processing fails, the number of attempts to process the message (including the first). Set to 1 to disable retry.

Default: 3.

backOffInitialInterval

The backoff initial interval on retry.

Default: 1000.

backOffMaxInterval

The maximum backoff interval.

Default: 10000.

backOffMultiplier

The backoff multiplier.

Default: 2.0.

defaultRetryable

Whether exceptions thrown by the listener that are not listed in the retryableExceptions are retryable.

Default: true.

instanceIndex

When set to a value greater than or equal to zero, it allows customizing the instance index of this consumer (if different from spring.cloud.stream.instanceIndex). When set to a negative value, it defaults to spring.cloud.stream.instanceIndex. See Section 11.2, “Instance Index and Instance Count” for more information.

Default: -1.

instanceCount

When set to a value greater than or equal to zero, it allows customizing the instance count of this consumer (if different from spring.cloud.stream.instanceCount). When set to a negative value, it defaults to spring.cloud.stream.instanceCount. See Section 11.2, “Instance Index and Instance Count” for more information.

Default: -1.

retryableExceptions

A map of Throwable class names in the key and a boolean in the value. Specify those exceptions (and subclasses) that will or won’t be retried. Also see defaultRetryable. Example: spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false.

Default: empty.

useNativeDecoding

When set to true, the inbound message is deserialized directly by the client library, which must be configured correspondingly (for example, setting an appropriate Kafka producer value deserializer). When this configuration is being used, the inbound message unmarshalling is not based on the contentType of the binding. When native decoding is used, it is the responsibility of the producer to use an appropriate encoder (for example, the Kafka producer value serializer) to serialize the outbound message. Also, when native encoding and decoding is used, the headerMode=embeddedHeaders property is ignored and headers are not embedded in the message. See the producer property useNativeEncoding.

Default: false.
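Combining the retry-related consumer properties above (the binding name `input` and the values are illustrative), a consumer that retries up to 5 times with exponential backoff, but never retries IllegalStateException, would be configured like this:

```properties
spring.cloud.stream.bindings.input.consumer.maxAttempts=5
spring.cloud.stream.bindings.input.consumer.backOffInitialInterval=2000
spring.cloud.stream.bindings.input.consumer.backOffMultiplier=2.0
spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false
```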

8.2.3 Producer Properties

These properties are exposed via org.springframework.cloud.stream.binder.ProducerProperties

The following binding properties are available for output bindings only and must be prefixed with spring.cloud.stream.bindings.<channelName>.producer. (for example, spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload.id).

Default values can be set by using the prefix spring.cloud.stream.default.producer (for example, spring.cloud.stream.default.producer.partitionKeyExpression=payload.id).

partitionKeyExpression

A SpEL expression that determines how to partition outbound data. If set, or if partitionKeyExtractorClass is set, outbound data on this channel is partitioned. partitionCount must be set to a value greater than 1 to be effective. Mutually exclusive with partitionKeyExtractorClass. See Section 5.6, “Partitioning Support””.

Default: null.

partitionKeyExtractorClass

PartitionKeyExtractorStrategy implementation. If set, or if partitionKeyExpression is set, outbound data on this channel is partitioned. partitionCount must be set to a value greater than 1 to be effective. Mutually exclusive with partitionKeyExpression. See Section 5.6, “Partitioning Support””.

Default: null.

partitionSelectorClass

PartitionSelectorStrategy implementation. Mutually exclusive with partitionSelectorExpression. If neither is set, the partition is selected as the hashCode(key) % partitionCount, where key is computed through either partitionKeyExpression or partitionKeyExtractorClass.

Default: null.

partitionSelectorExpression

A SpEL expression for customizing partition selection. Mutually exclusive with partitionSelectorClass. If neither is set, the partition is selected as the hashCode(key) % partitionCount, where key is computed through either partitionKeyExpression or partitionKeyExtractorClass.

Default: null.

partitionCount

The number of target partitions for the data, if partitioning is enabled. Must be set to a value greater than 1 if the producer is partitioned. On Kafka, it is interpreted as a hint. The larger of this and the partition count of the target topic is used instead.

Default: 1.

requiredGroups

A comma-separated list of groups to which the producer must ensure message delivery even if they start after it has been created (for example, by pre-creating durable queues in RabbitMQ).

headerMode

When set to none, it disables header embedding on output. It is effective only for messaging middleware that does not support message headers natively and requires header embedding. This option is useful when producing data for non-Spring Cloud Stream applications when native headers are not supported. When set to headers, it uses the middleware’s native header mechanism. When set to embeddedHeaders, it embeds headers into the message payload.

Default: Depends on the binder implementation.

useNativeEncoding

When set to true, the outbound message is serialized directly by the client library, which must be configured correspondingly (for example, setting an appropriate Kafka producer value serializer). When this configuration is being used, the outbound message marshalling is not based on the contentType of the binding. When native encoding is used, it is the responsibility of the consumer to use an appropriate decoder (for example, the Kafka consumer value de-serializer) to deserialize the inbound message. Also, when native encoding and decoding is used, the headerMode=embeddedHeaders property is ignored and headers are not embedded in the message. See the consumer property useNativeDecoding.

Default: false.

errorChannelEnabled

When set to true, if the binder supports asynchronous send results, send failures are sent to an error channel for the destination. See Section 6.4, “Error Handling” for more information.

Default: false.
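As a quick sketch (not part of the original reference text), the partitioning-related producer properties above might be combined in a properties file as follows; the binding name `output` and the key expression `payload.id` are assumptions for illustration:

```properties
# Hypothetical binding named 'output': partition outbound data by payload.id
spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload.id
# partitionCount must be greater than 1 for partitioning to take effect
spring.cloud.stream.bindings.output.producer.partitionCount=4
# Optionally route asynchronous send failures to an error channel
spring.cloud.stream.bindings.output.producer.errorChannelEnabled=true
```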

 

8.3 Using Dynamically Bound Destinations

 

9. Content Type Negotiation

10. Schema Evolution Support

11. Inter-Application Communication

12. Testing

13. Health Indicator

 

14. Metrics Emitter

15. Samples

 

 

The RabbitMQ Binder Implementation in Spring Cloud Stream

1. Usage

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

 

or

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2. RabbitMQ Binder Overview

The following figure shows how the RabbitMQ Binder implementation operates:

 

 

By default, the RabbitMQ Binder implementation maps each destination to a TopicExchange.

For each consumer group, a Queue is bound to that TopicExchange.

Each consumer instance has a corresponding RabbitMQ Consumer instance connected to its group's queue.

For partitioned producers and consumers, the queues are suffixed with the partition index and use the partition index as the routing key.

For anonymous consumers (those with no group property), an auto-delete queue (with a randomized unique name) is used.

 

By using the optional autoBindDlq option, you can configure the binder to create and configure dead-letter queues (DLQs), along with a dead-letter exchange (DLX) and routing infrastructure.

By default, the dead-letter queue is named destination.dlq.

If retry is enabled (maxAttempts > 1), failed messages are delivered to the DLQ after retries are exhausted.

If retry is disabled (maxAttempts = 1), you should set requeueRejected to false (the default) so that failed messages are routed to the DLQ instead of being re-queued.

In addition, republishToDlq causes the binder to publish a failed message to the DLQ (instead of rejecting it).

This feature lets additional information (such as the stack trace in the x-exception-stacktrace header) be added to the message headers.

See the frameMaxHeadroom property for information about truncating stack traces.

This option does not require retry to be enabled; you can republish a failed message after just one attempt. Starting with version 1.2, you can configure the delivery mode of republished messages. See the republishDeliveryMode property.
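A minimal sketch of the DLQ behavior described above, assuming a binding named `input` with consumer group `myGroup` (both names are illustrative):

```properties
# Consumer group so a durable queue (and its DLQ) can be declared
spring.cloud.stream.bindings.input.group=myGroup
# Retry up to 3 times before the message goes to the DLQ
spring.cloud.stream.bindings.input.consumer.maxAttempts=3
# Declare the DLQ/DLX and republish failures with the x-exception-stacktrace header
spring.cloud.stream.rabbit.bindings.input.consumer.autoBindDlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.republishToDlq=true
```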

 

If the stream listener throws an ImmediateAcknowledgeAmqpException, the DLQ is bypassed and the message is simply discarded. Starting with version 2.1, this happens regardless of the setting of republishToDlq; previously, it did so only when republishToDlq was false.

 

Important:

Setting requeueRejected to true (with republishToDlq=false) causes the message to be re-queued and redelivered continually, which is likely not what you want unless the reason for the failure is transient.

In general, you should enable retry within the binder by setting maxAttempts to a value greater than one, or by setting republishToDlq to true.

 

See Section 3.1, “RabbitMQ Binder Properties”, for more information about these properties.

The framework does not provide any standard mechanism to consume dead-letter messages (or to re-route them back to the primary queue). Some options are described in Section 6, “Dead-Letter Queue Processing”.

 

When multiple RabbitMQ binders are used in a Spring Cloud Stream application, it is important to disable RabbitAutoConfiguration to avoid the same configuration from RabbitAutoConfiguration being applied to the binders. You can exclude the class by using the @SpringBootApplication annotation (for example, @SpringBootApplication(exclude = RabbitAutoConfiguration.class)).

 

Starting with version 2.0, the RabbitMessageChannelBinder sets the RabbitTemplate.usePublisherConnection property to true to avoid a deadlock between non-transactional producers and consumers, which can occur if cached connections are blocked because of a memory alarm on the broker.

Currently, a multiplexed consumer (a single consumer listening to multiple queues) is supported only for message-driven consumers; polled consumers can retrieve messages from only a single queue.

 

3. Configuration Options

 

This section contains settings specific to the RabbitMQ binder and bound channels.

For general binding configuration options and properties, see the Spring Cloud Stream core documentation.

 

3.1 RabbitMQ Binder Properties

By default, the RabbitMQ binder uses Spring Boot's ConnectionFactory. Consequently, it supports all Spring Boot configuration options for RabbitMQ (see the Spring Boot documentation for reference). RabbitMQ configuration options use the spring.rabbitmq prefix.

In addition to the Spring Boot options, the RabbitMQ binder supports the following properties:

spring.cloud.stream.rabbit.binder.adminAddresses

A comma-separated list of RabbitMQ management plugin URLs. Only used when nodes contains more than one entry. Each entry in this list must have a corresponding entry in spring.rabbitmq.addresses.

Only needed if you use a RabbitMQ cluster and wish to consume from the node that hosts the queue. See Queue Affinity and the LocalizedQueueConnectionFactory for more information.

Default: empty.

spring.cloud.stream.rabbit.binder.nodes

A comma-separated list of RabbitMQ node names. When there is more than one entry, this is used to locate the server address where a queue is located. Each entry in this list must have a corresponding entry in spring.rabbitmq.addresses.

Only needed if you use a RabbitMQ cluster and wish to consume from the node that hosts the queue. See Queue Affinity and the LocalizedQueueConnectionFactory for more information.

Default: empty.

spring.cloud.stream.rabbit.binder.compressionLevel

The compression level for compressed bindings. See java.util.zip.Deflater.

Default: 1 (BEST_SPEED).

spring.cloud.stream.rabbit.binder.connection-name-prefix

A connection name prefix used to name the connection(s) created by this binder. The name is this prefix followed by #n, where n increments each time a new connection is opened.

Default: none (Spring AMQP default).
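Putting the binder-level properties together, a hedged example for a two-node cluster (host names and node names are placeholders, not defaults):

```properties
spring.rabbitmq.addresses=rabbit1:5672,rabbit2:5672
# One entry per address above, so the binder can locate the queue-hosting node
spring.cloud.stream.rabbit.binder.nodes=rabbit@rabbit1,rabbit@rabbit2
spring.cloud.stream.rabbit.binder.adminAddresses=http://rabbit1:15672,http://rabbit2:15672
spring.cloud.stream.rabbit.binder.connection-name-prefix=my-app
```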

3.2 RabbitMQ Consumer Properties

The following properties are available for Rabbit consumers only and must be prefixed with spring.cloud.stream.rabbit.bindings.<channelName>.consumer.

acknowledgeMode

The acknowledge mode.

Default: AUTO.

autoBindDlq

Whether to automatically declare the DLQ and bind it to the binder DLX.

Default: false.

bindingRoutingKey

The routing key with which to bind the queue to the exchange (if bindQueue is true). For partitioned destinations, -<instanceIndex> is appended.

Default: #.

bindQueue

Whether to declare the queue and bind it to the destination exchange. Set it to false if you have set up your own infrastructure and have previously created and bound the queue.

Default: true.

consumerTagPrefix

A prefix used to create the consumer tag(s); #n is appended, where n increments for each consumer created.

示例:${spring.application.name}-${spring.cloud.stream.bindings.input.group}-${spring.cloud.stream.instance-index}.

Default: none (the broker generates random consumer tags).

deadLetterQueueName

The name of the DLQ

Default: prefix+destination.dlq

deadLetterExchange

A DLX to assign to the queue. Relevant only if autoBindDlq is true.

Default: 'prefix+DLX'

deadLetterExchangeType

The type of the DLX to assign to the queue. Relevant only if autoBindDlq is true.

Default: 'direct'

deadLetterRoutingKey

A dead letter routing key to assign to the queue. Relevant only if autoBindDlq is true.

Default: destination

declareDlx

Whether to declare the dead letter exchange for the destination. Relevant only if autoBindDlq is true. Set to false if you have a pre-configured DLX.

Default: true.

declareExchange

Whether to declare the exchange for the destination.

Default: true.

delayedExchange

Whether to declare the exchange as a Delayed Message Exchange. Requires the delayed message exchange plugin on the broker. The x-delayed-type argument is set to the exchangeType.

Default: false.

dlqDeadLetterExchange

If the DLQ is declared, a DLX to assign to that queue.

Default: none

dlqDeadLetterRoutingKey

If the DLQ is declared, a dead letter routing key to assign to that queue.

Default: none

dlqExpires

How long (in milliseconds) before an unused dead letter queue is deleted.

Default: no expiration

dlqLazy

Declare the dead letter queue with the x-queue-mode=lazy argument. See “Lazy Queues”. Consider using a policy instead of this setting, because using a policy allows changing the setting without deleting the queue.

Default: false.

dlqMaxLength

Maximum number of messages in the dead letter queue.

Default: no limit

dlqMaxLengthBytes

Maximum number of total bytes in the dead letter queue from all messages.

Default: no limit

dlqMaxPriority

Maximum priority of messages in the dead letter queue (0-255).

Default: none

dlqOverflowBehavior

Action to take when dlqMaxLength or dlqMaxLengthBytes is exceeded; currently drop-head or reject-publish, but refer to the RabbitMQ documentation.

Default: none

dlqTtl

Default time (in milliseconds) to live to apply to the dead letter queue when declared.

Default: no limit

durableSubscription

Whether the subscription should be durable. Only effective if group is also set.

Default: true.

exchangeAutoDelete

If declareExchange is true, whether the exchange should be auto-deleted (that is, removed after the last queue is removed).

Default: true.

exchangeDurable

If declareExchange is true, whether the exchange should be durable (that is, it survives broker restart).

Default: true.

exchangeType

The exchange type: direct, fanout, or topic for non-partitioned destinations, and direct or topic for partitioned destinations.

Default: topic.

exclusive

Whether to create an exclusive consumer. Concurrency should be 1 when this is true. Often used when strict ordering is required but enabling a hot standby instance to take over after a failure. See recoveryInterval, which controls how often a standby instance attempts to consume.

Default: false.

expires

How long (in milliseconds) before an unused queue is deleted.

Default: no expiration

failedDeclarationRetryInterval

The interval (in milliseconds) between attempts to consume from a queue if it is missing.

Default: 5000

frameMaxHeadroom

The number of bytes to reserve for other headers when adding the stack trace to a DLQ message header. All headers must fit within the frame_max size configured on the broker. Stack traces can be large; if the size plus this property exceeds frame_max, the stack trace is truncated. A WARN log is written; consider increasing frame_max, or reducing the stack trace by catching the exception and throwing one with a smaller stack trace.

Default: 20000

headerPatterns

Patterns for headers to be mapped from inbound messages.

Default: ['*'] (all headers).

lazy

Declare the queue with the x-queue-mode=lazy argument. See “Lazy Queues”. Consider using a policy instead of this setting, because using a policy allows changing the setting without deleting the queue.

Default: false.

maxConcurrency

The maximum number of consumers.

Default: 1.

maxLength

The maximum number of messages in the queue.

Default: no limit

maxLengthBytes

The maximum number of total bytes in the queue from all messages.

Default: no limit

maxPriority

The maximum priority of messages in the queue (0-255).

Default: none

missingQueuesFatal

When the queue cannot be found, whether to treat the condition as fatal and stop the listener container. Defaults to false so that the container keeps trying to consume from the queue (for example, when using a cluster and the node hosting a non-HA queue is down).

Default: false

overflowBehavior

Action to take when maxLength or maxLengthBytes is exceeded; currently drop-head or reject-publish, but refer to the RabbitMQ documentation.

Default: none

prefetch

Prefetch count.

Default: 1.

prefix

A prefix to be added to the name of the destination and queues.

Default: "".

queueDeclarationRetries

The number of times to retry consuming from a queue if it is missing. Relevant only when missingQueuesFatal is true. Otherwise, the container keeps retrying indefinitely.

Default: 3

queueNameGroupOnly

When true, consume from a queue with a name equal to the group. Otherwise, the queue name is destination.group. This is useful, for example, when using Spring Cloud Stream to consume from an existing RabbitMQ queue.

Default: false.

recoveryInterval

The interval between connection recovery attempts, in milliseconds.

Default: 5000.

requeueRejected

Whether delivery failures should be re-queued when retry is disabled or when republishToDlq is false.

Default: false.

republishDeliveryMode

When republishToDlq is true, specifies the delivery mode of the republished message.

Default: DeliveryMode.PERSISTENT

republishToDlq

By default, messages that fail after retries are exhausted are rejected. If a dead-letter queue (DLQ) is configured, RabbitMQ routes the failed message (unchanged) to the DLQ. If set to true, the binder republishes failed messages to the DLQ with additional headers, including the exception message and stack trace from the cause of the final failure. Also see the frameMaxHeadroom property.

Default: false

transacted

Whether to use transacted channels.

Default: false.

ttl

Default time (in milliseconds) to live to apply to the queue when declared.

Default: no limit

txSize

The number of deliveries between acks.

Default: 1.
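To tie several of the consumer properties above together, a sketch for a hypothetical binding named `input` (the values are illustrative, not recommendations):

```properties
spring.cloud.stream.rabbit.bindings.input.consumer.acknowledgeMode=AUTO
spring.cloud.stream.rabbit.bindings.input.consumer.prefetch=10
spring.cloud.stream.rabbit.bindings.input.consumer.maxConcurrency=5
# Delete the queue after one unused minute; expire messages after 30 seconds
spring.cloud.stream.rabbit.bindings.input.consumer.expires=60000
spring.cloud.stream.rabbit.bindings.input.consumer.ttl=30000
```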

 

3.3 Advanced Listener Container Configuration

To set listener container properties that are not exposed as binder or binding properties, add a single bean of type ListenerContainerCustomizer to the application context. The binder and binding properties are set first, and then the customizer is called. The customizer (its configure() method) is provided with the queue name and the consumer group as arguments.

 

3.4 Rabbit Producer Properties

The following properties are available for Rabbit producers only and must be prefixed with spring.cloud.stream.rabbit.bindings.<channelName>.producer.

 

autoBindDlq

Whether to automatically declare the DLQ and bind it to the binder DLX.

Default: false.

batchingEnabled

Whether to enable message batching by producers. Messages are batched into one message according to the following properties (described in the next three entries in this list): batchSize, batchBufferLimit, and batchTimeout. See Batching for more information.

Default: false.

batchSize

The number of messages to buffer when batching is enabled.

Default: 100.

batchBufferLimit

The maximum buffer size when batching is enabled.

Default: 10000.

batchTimeout

The batch timeout when batching is enabled.

Default: 5000.

bindingRoutingKey

The routing key with which to bind the queue to the exchange (if bindQueue is true). Only applies to non-partitioned destinations. Only applies if requiredGroups are provided and then only to those groups.

Default: #.

bindQueue

Whether to declare the queue and bind it to the destination exchange. Set it to false if you have set up your own infrastructure and have previously created and bound the queue. Only applies if requiredGroups are provided and then only to those groups.

Default: true.

compress

Whether data should be compressed when sent.

Default: false.

confirmAckChannel

When errorChannelEnabled is true, a channel to which to send positive delivery acknowledgments (aka publisher confirms). If the channel does not exist, a DirectChannel is registered with this name. The connection factory must be configured to enable publisher confirms.

Default: nullChannel (acks are discarded).

deadLetterQueueName

The name of the DLQ. Applies only if requiredGroups are provided and then only to those groups.

Default: prefix+destination.dlq

deadLetterExchange

A DLX to assign to the queue. Relevant only when autoBindDlq is true. Applies only when requiredGroups are provided and then only to those groups.

Default: 'prefix+DLX'

deadLetterExchangeType

The type of the DLX to assign to the queue. Relevant only if autoBindDlq is true. Applies only when requiredGroups are provided and then only to those groups.

Default: 'direct'

deadLetterRoutingKey

A dead letter routing key to assign to the queue. Relevant only when autoBindDlq is true. Applies only when requiredGroups are provided and then only to those groups.

Default: destination

declareDlx

Whether to declare the dead letter exchange for the destination. Relevant only if autoBindDlq is true. Set to false if you have a pre-configured DLX. Applies only when requiredGroups are provided and then only to those groups.

Default: true.

declareExchange

Whether to declare the exchange for the destination.

Default: true.

delayExpression

A SpEL expression to evaluate the delay to apply to the message (x-delay header). It has no effect if the exchange is not a delayed message exchange.

Default: No x-delay header is set.

delayedExchange

Whether to declare the exchange as a Delayed Message Exchange. Requires the delayed message exchange plugin on the broker. The x-delayed-typeargument is set to the exchangeType.

Default: false.

deliveryMode

The delivery mode.

Default: PERSISTENT.

dlqDeadLetterExchange

When a DLQ is declared, a DLX to assign to that queue. Applies only if requiredGroups are provided and then only to those groups.

Default: none

dlqDeadLetterRoutingKey

When a DLQ is declared, a dead letter routing key to assign to that queue. Applies only when requiredGroups are provided and then only to those groups.

Default: none

dlqExpires

How long (in milliseconds) before an unused dead letter queue is deleted. Applies only when requiredGroups are provided and then only to those groups.

Default: no expiration

dlqLazy

Declare the dead letter queue with the x-queue-mode=lazy argument. See “Lazy Queues”. Consider using a policy instead of this setting, because using a policy allows changing the setting without deleting the queue. Applies only when requiredGroups are provided and then only to those groups.

Default: false.

dlqMaxLength

Maximum number of messages in the dead letter queue. Applies only if requiredGroups are provided and then only to those groups.

Default: no limit

dlqMaxLengthBytes

Maximum number of total bytes in the dead letter queue from all messages. Applies only when requiredGroups are provided and then only to those groups.

Default: no limit

dlqMaxPriority

Maximum priority of messages in the dead letter queue (0-255) Applies only when requiredGroups are provided and then only to those groups.

Default: none

dlqTtl

Default time (in milliseconds) to live to apply to the dead letter queue when declared. Applies only when requiredGroups are provided and then only to those groups.

Default: no limit

exchangeAutoDelete

If declareExchange is true, whether the exchange should be auto-delete (it is removed after the last queue is removed).

Default: true.

exchangeDurable

If declareExchange is true, whether the exchange should be durable (survives broker restart).

Default: true.

exchangeType

The exchange type: direct, fanout, or topic for non-partitioned destinations, and direct or topic for partitioned destinations.

Default: topic.

expires

How long (in milliseconds) before an unused queue is deleted. Applies only when requiredGroups are provided and then only to those groups.

Default: no expiration

headerPatterns

Patterns for headers to be mapped to outbound messages.

Default: ['*'] (all headers).

lazy

Declare the queue with the x-queue-mode=lazy argument. See “Lazy Queues”. Consider using a policy instead of this setting, because using a policy allows changing the setting without deleting the queue. Applies only when requiredGroups are provided and then only to those groups.

Default: false.

maxLength

Maximum number of messages in the queue. Applies only when requiredGroups are provided and then only to those groups.

Default: no limit

maxLengthBytes

Maximum number of total bytes in the queue from all messages. Only applies if requiredGroups are provided and then only to those groups.

Default: no limit

maxPriority

Maximum priority of messages in the queue (0-255). Only applies if requiredGroups are provided and then only to those groups.

Default: none

prefix

A prefix to be added to the name of the destination exchange.

Default: "".

queueNameGroupOnly

When true, consume from a queue with a name equal to the group. Otherwise the queue name is destination.group. This is useful, for example, when using Spring Cloud Stream to consume from an existing RabbitMQ queue. Applies only when requiredGroups are provided and then only to those groups.

Default: false.

routingKeyExpression

A SpEL expression to determine the routing key to use when publishing messages. For a fixed routing key, use a literal expression, such as routingKeyExpression='my.routingKey' in a properties file or routingKeyExpression: '''my.routingKey''' in a YAML file.

Default: destination or destination-<partition> for partitioned destinations.

transacted

Whether to use transacted channels.

Default: false.

ttl

Default time (in milliseconds) to live to apply to the queue when declared. Applies only when requiredGroups are provided and then only to those groups.

Default: no limit
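For the producer side, several of the properties above could be combined as follows; the binding name `output`, the group `audit`, and the routing key are assumptions for illustration:

```properties
# Pre-create a durable queue for group 'audit' even if consumers start later
spring.cloud.stream.bindings.output.producer.requiredGroups=audit
# A literal routing key in a properties file uses single quotes
spring.cloud.stream.rabbit.bindings.output.producer.routingKeyExpression='my.routingKey'
spring.cloud.stream.rabbit.bindings.output.producer.deliveryMode=PERSISTENT
spring.cloud.stream.rabbit.bindings.output.producer.autoBindDlq=true
```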

[Note]

In the case of RabbitMQ, content type headers can be set by external applications. Spring Cloud Stream supports them as part of an extended internal protocol used for any type of transport, including transports (such as Kafka prior to 0.11) that do not natively support headers.

 

 

4. Retry With the RabbitMQ Binder

5. Error Channels

6. Dead-Letter Queue Processing

7. Partitioning with the RabbitMQ Binder
