Spring Cloud Stream


  在實際的企業開發中,消息中間件是至關重要的組件之一。消息中間件主要解決應用解耦,異步消息,流量削鋒等問題,實現高性能,高可用,可伸縮和最終一致性架構。不同的中間件其實現方式,內部結構是不一樣的。如常見的RabbitMQ和Kafka,由於這兩個消息中間件的架構上的不同,像RabbitMQ有exchange,kafka有Topic,partitions分區,這些中間件的差異性導致我們實際項目開發造成了一定的困擾,我們如果用了兩個消息隊列的其中一種,后面的業務需求,我想往另外一種消息隊列進行遷移,這時候無疑就是一個災難性的,一大堆東西都要重新推倒重新做,因為它跟我們的系統耦合了,這時候 springcloud Stream 給我們提供了一種解耦合的方式。

  Spring Cloud Stream由一個中間件中立的核組成。應用通過Spring Cloud Stream插入的input(相當於消費者consumer,它是從隊列中接收消息的)和output(相當於生產者producer,它是從隊列中發送消息的。)通道與外界交流。通道通過指定中間件的Binder實現與外部代理連接。業務開發者不再關注具體消息中間件,只需關注Binder對應用程序提供的抽象概念來使用消息中間件實現業務即可。
    

 入門案例:通過rabbitMQ作為消息中間件

  消息生產者:

    (1)創建工程引入依賴

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
    </dependencies>

    (2)定義bingding

      發送消息時需要定義一個接口,不同的是接口方法的返回對象是 MessageChannel,下面是 SpringCloud Stream 內置的接口:

public interface Source {
    String OUTPUT = "output";
    @Output("output")
    MessageChannel output();
}

      接口聲明了一個 binding 命名為 “output”。這個binding 聲明了一個消息輸出流,也就是消息的生產者。

    (3)配置application.yml

server:
  port: 7001 #服務端口
spring:
  application:
    name: stream_producer #指定服務名
  rabbitmq:
    addresses: 127.0.0.1
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        output:
          destination: fan-default #指定了消息發送的目的地,對應 RabbitMQ,會發送到 exchange 是 fan-default 的所有消息隊列中。
          contentType: text/plain #用於指定消息的類型。
      binders:
        defaultRabbit:
          type: rabbit

    (4)消息發送工具類

// 負責向中間件發送數據
@Component
@EnableBinding(Source.class)
public class MessageSender {

    @Autowired
    private MessageChannel output; // 接口來源於Source.class

    public void send(Object obj) {
        //發送MQ消息
        output.send(MessageBuilder.withPayload(obj).build());
    }
}

    (5)啟動類

@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class);
    }
}

    (6)測試類

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ProducerTest {

    @Autowired
    private MessageSender messageSender;

    @Test
    public void testSend() {
        messageSender.send("hello word");
    }
}

    (7)運行啟動類,訪問RabbitMQ地址 http://127.0.0.1:15672/ 即可看到發送的消息

      

   消息消費者:

    (1)創建工程引入依賴

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    </dependencies>

    (2)定義bingding

      同發送消息一致,在Spring Cloud Stream中接收消息,需要定義一個接口,如下是內置的一個接口。

public interface Sink {
    String INPUT = "input";
    @Input("input")
    SubscribableChannel input();
}

      接口聲明了一個 binding 命名為 “input” 。注釋 @Input 對應的方法,需要返回 SubscribableChannel

    (3)配置application.yml

server:
  port: 7002 #服務端口
spring:
  application:
    name: stream_consumer #指定服務名
  rabbitmq:
    addresses: 127.0.0.1
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        input:
          destination: fan-default
      binders:
        defaultRabbit:
          type: rabbit

    (4)消息監聽類

@Component
@EnableBinding(Sink.class)
public class MessageListener {

    // 監聽 binding 為 Sink.INPUT 的消息
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("監聽收到:" + message.getPayload());
    }
}

    (5)啟動類

@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class);
    }
}

    (6) 運行啟動類后,再去運行消息生產者的測試類,即可在消息消費者控制台接收到消息

自定義消息通道:

public interface OrderProcessor {

    String INPUT_ORDER = "inputOrder";
    String OUTPUT_ORDER = "outputOrder";

    @Input(INPUT_ORDER)
    SubscribableChannel inputOrder();

    @Output(OUTPUT_ORDER)
    MessageChannel outputOrder();
}

  Spring Cloud Stream 內置了兩種接口,分別定義了 binding 為 “input” 的輸入流,和 “output” 的輸出流,而在我們實際使用中,往往是需要定義各種輸入輸出流

    一個接口中,可以定義無數個輸入輸出流,可以根據實際業務情況划分。上述的接口,定義了一個訂單輸入和訂單輸出兩個 binding。
    使用時,需要在 @EnableBinding 注解中,添加自定義的接口。
    使用 @StreamListener 做監聽的時候,需要指定 OrderProcessor.INPUT_ORDER

    消息發送工具類要指定注入的通道:

      

    配置文件也要修改:

      

 消息分組:

  通常在生產環境,我們的每個服務都不會以單節點的方式運行在生產環境,當同一個服務啟動多個實例的時候,這些實例都會綁定到同一個消息通道的目標主題(Topic)上。默認情況下,當生產者發出一條消息到綁定通道上,這條消息會產生多個副本被每個消費者實例接收和處理,但是有些業務場景之下,我們希望生產者產生的消息只被其中一個實例消費,這個時候我們需要為這些消費者設置消費組來實現這樣的功能。

  只需要在每個服務消費者端設置spring.cloud.stream.bindings.input.group 屬性即可

    

 消息分區:

  有一些場景需要滿足, 同一個特征的數據被同一個實例消費, 比如同一個id的傳感器監測數據必須被同一個實例統計計算分析, 否則可能無法獲取全部的數據。又比如部分異步任務,首次請求啟動task,二次請求取消task,此場景就必須保證兩次請求至同一實例。

  消息生產者配置:

    1. spring.cloud.stream.bindings.output.producer.partitionKeyExpression :通過該參數指定了分區鍵的表達式規則,我們可以根據實際的輸出消息規則來配置SpEL來生成合適的分區鍵;

    2. spring.cloud.stream.bindings.output.producer.partitionCount :該參數指定了消息分區的數量。
    

   消息消費者配置:

    1. spring.cloud.stream.bindings.input.consumer.partitioned :通過該參數開啟消費者分區功能;

    2. spring.cloud.stream.instanceCount :該參數指定了當前消費者的總實例數量;

    3. spring.cloud.stream.instanceIndex :該參數設置當前實例的索引號,從0開始,最大值為spring.cloud.stream.instanceCount 參數 - 1。

    

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM