Spring cloud stream 3.1 rocketmq踩坑記錄


spring cloud stream 新綁定方式

新版spring cloud stream文檔

新版提倡用函數式進行發送和消費信息

定義返回類型為Supplier, Function or Consumer的bean提供消息發送和消費的bean

看看綁定名稱命名規則

  • input - + -in- +
  • output - + -out- +

在配置文件中指定spring.cloud.function.definition的名稱后會把這個bean綁定到對應的消費者和提供者上.

如下定義 會把bean綁定在消費者consumerEvent-in-0或者提供者consumerEvent-out-0上

多個bean可以用 ; 進行分割

spring:
  cloud:
    function:
      definition: consumerEvent

指定這個消費者的topic和group

spring:
  cloud:
    stream:
      bindings:
        consumerEvent-in-0:
          destination: DEMO
          group: demo-group

注冊消費者的bean

// 第一種方式(官方推薦)
@Bean
public Function<Flux<Message<String>>, Mono<Void>> consumerEvent() {
    return flux -> flux.map(message -> {
        System.out.println(message.getPayload());
        return message;
    }).then();
}

// 第二種方式
// 注意使用Flux 要調用 subscribe 不然這個方法不會被消費
@Bean
public Consumer<Flux<Message<String>>> consumerEvent() {
    return flux -> flux.map(message -> {
        System.out.println(message.getPayload());
        return message;
    }).subscribe();
}
// 或
@Bean
public Consumer<Message<String>> consumerEvent() {
    return message -> System.out.println(message.getPayload());
}

示例

提供者

@Configuration
public class EventSender {
    @Bean
    public Demo demo() {
        return new Demo();
    }

    static class Demo implements CommandLineRunner {
        @Autowired
        StreamBridge streamBridge;

        @Override
        public void run(String... args) throws Exception {
            final Message<T> message = MessageBuilder.withPayload("Body")
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build();

            // 第一個配置的是目的地
            // 如果在yaml中有配置會發送到yaml中目的地
            streamBridge.send("DEMO", message);
        }
    }
}

配置rocketmq和stream的配置

spring:
  application:
    name: demo
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
          group: demo
      bindings:
        consumerEvent-in-0:
          destination: DEMO
          content-type: application/json
          group: demo-group
      function:
        definition: consumerEvent

注冊一個消費者

@Configuration
public class EventReceptor {
    @Bean
    public Function<Flux<Message<String>>, Mono<Void>> consumerEvent() {
        return flux -> flux.map(message -> {
            System.out.println(message.getPayload());
            return message;
        }).then();
    }
}

依賴

spring cloud 2020 默認不使用bootstrap啟動 要加這個依賴spring-cloud-starter-bootstrap

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dependencies</artifactId>
    <version>2020.0.2</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-dependencies</artifactId>
    <version>2.2.5-RocketMQ-RC1</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bootstrap</artifactId>
    <version>3.0.2</version>
</dependency>

Tag過濾

在新版的時候過濾tag一直失效, 后面看源碼發現新版的sql和tag結合到subscription的屬性中

this.pushConsumer.subscribe(this.topic, RocketMQUtils.getMessageSelector(((RocketMQConsumerProperties)this.extendedConsumerProperties.getExtension()).getSubscription()));

public static MessageSelector getMessageSelector(String expression) {
    return StringUtils.hasText(expression) && expression.startsWith("sql:") ? MessageSelector.bySql(expression.replaceFirst("sql:", "")) : MessageSelector.byTag(expression);
}

如果消費者要過濾某個tag需要這么寫

// 新版 (現在的寫法)
rocketmq:
  bindings:
    createUserAccountEvent-in-0:
      consumer:
        subscription: DEMO-TAG

// 舊版 (以前的寫法)
rocketmq:
  bindings:
    createUserAccountEvent-in-0:
      consumer:
        tag: DEMO-TAG


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM