Spring Cloud:Stream基礎知識


背景

  消息中間件有多種,rabbitmq,rocketmq,activemq,kafka等。
  不同的消息中間件具體細節不一樣。那么有沒有一種新的技術誕生,讓我們不再關注具體MQ細節,我們只需要用一種適配綁定的方式,自動給我們在各種MQ內切換。
  一句話:屏蔽底層消息中間件的差異,降低切換成本,統一消息的編程模型。Spring Cloud Stream 因此誕生。
  官方定義 Spring Cloud Stream是一個構建消息驅動微服務的框架。
  應用程序使用inputs或者outputs來與springcloud stream中binder交互。
  通過我們配置來bingding(綁定),而stream的binder對象負責與消息中間件交互。
  所以,我們只需要搞清楚如何與springcloud stream交互就可以方便使用消息驅動方式。
  SpringCloud Stream為一些供應商的消息中間件產品提供了個性化的自動化配置實現,引用發布-訂閱、消費組、分區三個核心概念。
  目前僅支持RabbitMQ和Kafka。
Stream處理架構:

通過定義綁定器Binder作為中間層,實現了應用程序與消息中間件之間的隔離。
stream標准流程套路:

Binder:很方便的連接中間件,屏蔽差異。
channel:通道,是隊列Queue的一種抽象,在消息通訊系統中就是實現存儲和轉發的媒介,通過channel對隊列進行配置。
source和sink:簡單理解為消息的輸入輸出。
編碼API和注解:

Stream消息生產者

pom依賴

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

yml配置

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      bindings:
        output:
          destination: studyExchange
          content-type: application/json
          binder: defaultRabbit
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.10.132
                port: 5672
                username: guest
                password: guest

eureka:
  client:
    #是否將自己注冊到Eureka Server 默認為true
    register-with-eureka: true
    #是否從EurekaServer抓取已有的注冊信息,默認為true,單節點無所謂,集群必須設置true才能配合ribbon做負載均衡
    fetch-registry: true
    service-url:
      #設置eureka server交互的地址查詢服務和注冊服務都需要依賴這個地址
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2
    lease-expiration-duration-in-seconds: 5
    instance-id: send-8801.com
    prefer-ip-address: true

發送消息的接口:

public interface IMessageProvider {
    public String send();
}

發送消息的實現:

@EnableBinding(Source.class)//定義消息的推送管道
public class MessageProvider implements IMessageProvider {

    @Autowired
    private MessageChannel output;

    @Override
    public String send() {
        String serial = IdUtil.simpleUUID();
        System.out.println(serial+"============");
        output.send(MessageBuilder.withPayload(serial).build());
        return serial;
    }
}

controller

@RestController
public class SendController {

    @Autowired
    private IMessageProvider messageProvider;

    @GetMapping("sendMessage")
    public String sendMessage(){
        return messageProvider.send();
    }
}

調用接口,觀察效果

Stream消息消費者

配置yml

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      bindings:
        input:
          destination: studyExchange
          content-type: application/json
          binder: defaultRabbit
      binders:
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.10.132
                port: 5672
                username: guest
                password: guest

eureka:
  client:
    #是否將自己注冊到Eureka Server 默認為true
    register-with-eureka: true
    #是否從EurekaServer抓取已有的注冊信息,默認為true,單節點無所謂,集群必須設置true才能配合ribbon做負載均衡
    fetch-registry: true
    service-url:
      #設置eureka server交互的地址查詢服務和注冊服務都需要依賴這個地址
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2
    lease-expiration-duration-in-seconds: 5
    instance-id: receive-8802.com
    prefer-ip-address: true

接收消息:

@EnableBinding(Sink.class)
@RestController
public class ReceiveMessageListenerController {

    @Value("${server.port}")
    private String serverPort;

    @StreamListener(value = Sink.INPUT)
    public void input(Message<String> message){
        System.out.println("消息:"+message.getPayload()+"serverPort:"+serverPort);
    }

}

stream重復消費

下默認配置中,多個消費者存在,會存在重復消費問題
原因:默認分組group是不同的,組流水號不一樣,被認為是不同組,可以消費,所以要自定義配置分組。

yml配置:

  cloud:
    stream:
      bindings:
        input:
          destination: studyExchange
          content-type: application/json
          binder: defaultRabbit
          group: wen.jie

通過配置后,兩個消費者被分配到一組,就不存在重復消費的問題。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM