Spring Cloud Stream(十三)


說明

對Spring Boot 和 Spring Integration的整合,通過Spring Cloud Stream能夠簡化消息中間件使用的復雜難度!讓業務人員更多的精力能夠花在業務層面

簡單例子

consumer

1.創建一個一個項目名為spring-cloud-stream-consumer

2.引入pom依賴

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

3.yml配置

spring:
  application:
    name: streamConsumer
  rabbitmq:   #更多mq配置看書331頁
    username: liqiang
    password: liqiang
    host: localhost
    port: 5672
server:
  port: 8081

5.創建消息監聽類

//實現對定義了多個@Input和@Output的接口實現對通道的綁定 Sink定義了@Input 我們自己處理時是自己定義接口
@EnableBinding(Sink.class)
public class SkinReceiverService {
    private static Logger logger = LoggerFactory.getLogger(SkinReceiverService.class);
    //對input的消息監聽處理
    @StreamListener(Sink.INPUT)
    public void receiver(Object message){
        logger.info(message.toString());
    }
}

6.啟動rabbitmq

7.啟動項目

8.通過mq管理頁面發送消息

 

 

 控制台打印 表示成功接收到消息

核心概念

綁定器

應用程序與消息中間件的抽象層。應用程序中間件的解耦。應用程序不需要考慮用的什么類型的消息中間件。當我們需要更換消息中間件 只需要替換綁定器

發布訂閱

spring cloud stream 完全遵循發布訂閱模式 當一條消息被發布到消息中間件后 將會以topic主題模式進行廣播,消費者對訂閱的topic主題進行相應的邏輯處理。topic是spring cloud stream的一個抽象概念,不同消息中間件topic概念可能不同 rabbitMq對應exchage

如rabbitMQ的topic

 

 發布訂閱模式能夠有效避免點對點的耦合 當一種消息要增加一種處理方式時只需要增加一個消息訂閱者

消費組

一般我們的消費組都會集群部署 但是我們再集群部署的情況下 會形成多個訂閱者 導致消息被消費多次, 消費組則是解決一個消息只能被一個實例消費者消費

消費分區

指定統一特征的消息被指定服務實例消費 spring cloud stream消費分區提供通用的抽象實現 使不支持分區的中間件也能支持消費分區

自定義輸入和輸出

定義輸入

Sink 是spring cloud stream 的默認實現 我們可以通過查看源碼

public interface Sink {
    String INPUT = "input";

    @Input("input")
    SubscribableChannel input();
}

通過@Input注冊參數為通道名字 同時需要返回SubscribableChannel

我們通過參考Sink定義一個輸入通道 比如處理訂單保存的通道

1.定義第一個通道

public interface OrderMQInputChannel {
    String saveOrderChannelName="saveOrder";//定義通道的名字
    @Input(saveOrderChannelName)//定義為輸入通道
    public SubscribableChannel saveOrder();
}

2.綁定通道並監聽

//通過綁定器 對OrderMQInputChannel通道進行綁定
@EnableBinding(OrderMQInputChannel.class)
public class OrderMQReceiverService {
    private static Logger logger = LoggerFactory.getLogger(SkinReceiverService.class);
    //對OrderMQInputChannel.saveOrderChannelName的消息監聽處理
    @StreamListener(OrderMQInputChannel.saveOrderChannelName)
    public void receiver(Object message){
        logger.info(message.toString());
    }
}

定義輸出

1.創建一個測試消費提供者的項目

 

2.引入pom依賴

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    </dependencies>

2.yml配置文件配置

spring:
  application:
    name: streamProvider
  rabbitmq:   #更多mq配置看書331頁
    username: liqiang
    password: liqiang
    host: localhost
    port: 5672
server:
  port: 8082

3.定義一個輸出通道

public interface OrderMQOutputChannel {
    String saveOrderChannelName="saveOrder";
    @Output(saveOrderChannelName)//定義輸出管道的名字
    MessageChannel saveOrder();
}

4.綁定通道

@EnableBinding(OrderMQOutputChannel.class) //綁定通道OrderMQOutputChannel
public class OrderChannelBindConfig {
}

5.添加測試contorller

@Controller
public class TestContorller {
    @Autowired
    OrderMQOutputChannel orderMQOutputChannel;
    @RequestMapping("/saveOrder")
    @ResponseBody
    public boolean saveOrder(){
        //發送一條保存訂單的命令
       return  orderMQOutputChannel.saveOrder().send(MessageBuilder.withPayload("fff").build());
    }
}

或者

@Controller
public class TestContorller {

    //直接注入對應通道是的實例
    @Autowired@Qualifier(OrderMQOutputChannel.saveOrderChannelName)
    MessageChannel messageChannel;

    @RequestMapping("/saveOrder")
    @ResponseBody
    public boolean saveOrder(){
        //發送一條保存訂單的命令
        return  messageChannel.send(MessageBuilder.withPayload("fff").build());
    }

}

 

 

6.訪問

http://127.0.0.1:8082/saveOrder

7.consumer打印 表示消息被消費

 spring intergration原生支持

spring cloud stream 是通過spring boot和spring intergreation的整合 所以也可以使用原生的用法實現相同的功能

provider

@EnableBinding(OrderMQOutputChannel.class) //綁定通道OrderMQOutputChannel
public class OriginalOrderMQOutPutChannelService {
    //定義2秒發送一次消息
    @Bean
    @InboundChannelAdapter(value = OrderMQOutputChannel.saveOrderChannelName, poller = @Poller(fixedDelay = "2000"))
    public MessageSource<Date> timerMessageSource() {
        return () -> new GenericMessage<>(new Date());

    }
}

consumer

//通過綁定器 對OrderMQInputChannel通道進行綁定
@EnableBinding(OrderMQInputChannel.class)
public class OriginalOrderMQReceiverService {
    private static Logger logger = LoggerFactory.getLogger(SkinReceiverService.class);

    //對OrderMQInputChannel.saveOrderChannelName的消息監聽處理
    @ServiceActivator(inputChannel = OrderMQInputChannel.saveOrderChannelName)
    public void receiver(Object message) {
        logger.info(message.toString());
    }

    //定義消息轉換器 轉換saveOrderChannelName 通道的的消息
    @Transformer(inputChannel = OrderMQInputChannel.saveOrderChannelName, outputChannel = OrderMQInputChannel.saveOrderChannelName)
    public Object transform(Date message) {
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(message);

    }
}

啟動之后conusmer2秒則會受到一條消息 更多用法查看spring intergration文檔

消息轉換 

通過上面我們可以看到原生通過@Transformer實現消息轉換 spring cloud stream 只需要定義消息通道的消息類型

spring.cloud.stream.bindings.[inputname].content-type=application/json
spring:
  application:
    name: streamConsumer
  rabbitmq:   #更多mq配置看書331頁
    username: liqiang
    password: liqiang
    host: localhost
    port: 5672
  cloud:
    stream:
      bindings:
        saveOrder:
          content-type: application/json
server:
  port: 8081

 

消費者

//通過綁定器 對OrderMQInputChannel通道進行綁定
@EnableBinding(OrderMQInputChannel.class)
public class OrderMQReceiverService {
    private static Logger logger = LoggerFactory.getLogger(SkinReceiverService.class);
    //對OrderMQInputChannel.saveOrderChannelName的消息監聽處理
    @StreamListener(OrderMQInputChannel.saveOrderChannelName)
    public void receiver(Order order){
       
        logger.info(order.getId()+"-"+order.getOrderCode());
    }
}

消息提供者

@Controller
public class TestContorller {

    //直接注入對應通道是的實例
    @Autowired@Qualifier(OrderMQOutputChannel.saveOrderChannelName)
    MessageChannel messageChannel;

    @RequestMapping("/saveOrder")
    @ResponseBody
    public boolean saveOrder(){
       com.liqiang.entity.Order order=new  com.liqiang.entity.Order();
       order.setId(1L);
       order.setOrderCode("201901020001");
        //發送一條保存訂單的命令
        return  messageChannel.send(MessageBuilder.withPayload(order).build());
    }

}

 

 

消息反饋

用於將消息交給別的應用處理 處理后再回傳  或者異步請求 接收處理結果

provider

public interface OrderMQOutputChannel {
    String saveOrderChannelName="saveOrder";
    String saveOrderCallbackChannelName="saveOrderCallback";//定義回調通道的名字
    @Output(saveOrderChannelName)//定義輸出管道的名字
    MessageChannel saveOrder();

    @Input(saveOrderCallbackChannelName)//定義為輸入通道
    public SubscribableChannel saveOrderCallback();
}
@EnableBinding(OrderMQOutputChannel.class) //綁定通道OrderMQOutputChannel
public class OrderChannelBindConfig {
    private static Logger logger = LoggerFactory.getLogger(OrderMQOutputChannel.class);
    //對OrderMQInputChannel.saveOrderChannelName的消息監聽處理
    @StreamListener(OrderMQOutputChannel.saveOrderCallbackChannelName)
    public void receiver(boolean boo){
        logger.info(String.valueOf(boo));
    }
}

consumer

public interface OrderMQInputChannel {
    String saveOrderChannelName="saveOrder";//定義通道的名字
    String saveOrderCallbackChannelName="saveOrderCallback";//定義回調通道的名字
    @Input(saveOrderChannelName)//定義為輸入通道
    public SubscribableChannel saveOrder();
}
//通過綁定器 對OrderMQInputChannel通道進行綁定
@EnableBinding(OrderMQInputChannel.class)
public class OrderMQReceiverService {
    private static Logger logger = LoggerFactory.getLogger(SkinReceiverService.class);
    //對OrderMQInputChannel.saveOrderChannelName的消息監聽處理
    @StreamListener(OrderMQInputChannel.saveOrderChannelName)
    @SendTo(OrderMQInputChannel.saveOrderCallbackChannelName)//反饋的通道名字
    public boolean receiver(Order order){

        logger.info(order.getId()+"-"+order.getOrderCode());
        return true;
    }
}

消息分組

多實例情況下 只需要指定spring.cloud.stream.bindings.[channelname].group=gorupname 當同一組實例對同一個主題的消息只能會有一個實例消費

1.測試 創建2個配置文件 分別為

application-peer1.yml

spring:
  application:
    name: streamConsumer
  rabbitmq:   #更多mq配置看書331頁
    username: liqiang
    password: liqiang
    host: localhost
    port: 5672
  cloud:
    stream:
      bindings:
        saveOrder:
          group: groupA
          content-type: application/json
server:
  port: 8081

application-peer2.yml

spring:
  application:
    name: streamConsumer
  rabbitmq:   #更多mq配置看書331頁
    username: liqiang
    password: liqiang
    host: localhost
    port: 5672
  cloud:
    stream:
      bindings:
        saveOrder:
          group: groupA
          content-type: application/json
server:
  port: 8083

通過啟動2個消費者

java -jar /Users/liqiang/Desktop/java開發環境/javadom/spring-cloud-parent/spring-cloud-stream-consumer/target/spring-cloud-stream-consumer-0.0.1-SNAPSHOT.jar --spring.profiles.active=peer1

java -jar /Users/liqiang/Desktop/java開發環境/javadom/spring-cloud-parent/spring-cloud-stream-consumer/target/spring-cloud-stream-consumer-0.0.1-SNAPSHOT.jar --spring.profiles.active=peer2

只要一個實例消費了 

消費分區

再某些場景 我需要指定某一類消息只能被哪些實例消費

消費者

application-peer1.yml

spring:
  application:
    name: streamConsumer
  rabbitmq:   #更多mq配置看書331頁
    username: liqiang
    password: liqiang
    host: localhost
    port: 5672
  cloud:
    stream:
      instanceCount: 2 #跟分區一起使用 有多少實例
      instanceIndex: 0 #分區當前實例編號 從0開始
      bindings:
        saveOrder:
          group: streamConsumer
          content-type: application/json
          consumer:
            partitioned: true #開啟消息分區的功能

server:
  port: 8081

application-peer2.yml

spring:
  application:
    name: streamConsumer
  rabbitmq:   #更多mq配置看書331頁
    username: liqiang
    password: liqiang
    host: localhost
    port: 5672
  cloud:
    stream:
      instanceCount: 2 #跟分區一起使用 有多少實例
      instanceIndex: 1 #當前實例編號 從0開始
      bindings:
        saveOrder:
          group: streamConsumer
          content-type: application/json
          consumer:
            partitioned: true #開啟消息分區的功能

server:
  port: 8083

生產者

spring:
  application:
    name: streamProvider
  rabbitmq:   #更多mq配置看書331頁
    username: liqiang
    password: liqiang
    host: localhost
    port: 5672
  cloud:
    stream:
      bindings:
        saveOrder:
          producer:
            partitionKeyExpression: '0' #表示只有實例索引為0的才能收到消息 支持SpEL表達式
            partitionCount: 2
server:
  port: 8082

當啟動2個消費者 和生產者 當前生產者 生產的消息只能被實例編號為0的消費

 

這里限制死了當前實例生產的消息被某個實例消費。如果我們需要指定 當前生產者生產的某一類服務被指定實例消費呢可以通過SpEL表達式設置

生產者yml

spring:
  application:
    name: streamProvider
  rabbitmq:   #更多mq配置看書331頁
    username: liqiang
    password: liqiang
    host: localhost
    port: 5672
  cloud:
    stream:
      bindings:
        saveOrder:
          producer:
            partitionKeyExpression: headers['partitionKey'] #SpEL表達式 通過讀取消息hearder的partitionKey屬性動態指定
            partitionCount: 2 #消息分區數量
server:
  port: 8082

消息生產通過header動態指定

package com.liqiang.springcloudstreamprovider;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.annotation.Order;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class TestContorller {

    //直接注入對應通道是的實例
    @Autowired@Qualifier(OrderMQOutputChannel.saveOrderChannelName)
    MessageChannel messageChannel;
    private static  int index=0;

    @RequestMapping("/saveOrder")
    @ResponseBody
    public boolean saveOrder(){
       com.liqiang.entity.Order order=new  com.liqiang.entity.Order();
       order.setId(1L);
       order.setOrderCode("201901020001");

        //發送一條保存訂單的命令
        return  messageChannel.send(MessageBuilder.withPayload(order).setHeader("partitionKey",(index++)%2==0?0:1).build());
    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM