Spring Cloud Stream之RocketMQ消息發送接收


Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems.

The framework provides a flexible programming model built on already established and familiar Spring idioms and best practices, including support for persistent pub/sub semantics, consumer groups, and stateful partitions.

SpringCloud 用於構建高度可擴展的基於事件驅動的微服務在消息系統中。該框架提供了一個靈活的編程模型,它建立在已經建立和熟悉的Spring習慣用法和最佳實踐之上,包括對持久發布/子語義、消費者組和有狀態分區的支持。 

Spring Cloud Stream消息訂閱發送

//消息通道
MessageChannel messageChannel = new DirectChannel();

//訂閱消息
// Message subscription
((SubscribableChannel) messageChannel).subscribe(new MessageHandler() {
    @Override
    public void handleMessage(Message<? > message) throws MessagingException {
        System.out.println("receive msg: " + message.getPayload());
    }
});

// Message sending 消息發送
messageChannel.send(MessageBuilder.withPayload("simple msg").build());
代碼中的所有消息類型都是由spring-messaging模塊提供的,它屏蔽了消息中間件的底層實現。如果想要選用不同消息中間件,只需要在配置文件里面配置消息中間件信息和更換Binder 依賴。 以RocketMQ 消息隊列為例,Binder 只需要引入 RocketMQ Binder 的maven依賴 和 在 application.yml 中配置 spring.cloud.stream.rocketmq 信息。 

application.yml 配置

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        #具體配置的 RocketMQ 的消費者 Binding、生產者 Binding
      bindings:
        #接受管道信息
        input:
          destination: topic-product
          content-type: application/json
          group: distributed-group
        #輸入管道信息
        output:
          destination: topic-order
          content-type: application/json
          group: distributed-group
            kafaka:

maven示例

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
    <version>2.1.0.RELEASE</version>
</dependency>
如果想要更換為 Kafka 只需要導入 spring-cloud-stream-binder-kafka 依賴和配置 kafka 的 binder信息。 

消息發送和消息生產

以下代碼示例,說明創建消息生產者、消息消費者。Spring cloud Stream RocketMQ application.yml bindings配置文件如下:
server:
  port: 9200
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        #具體配置的 RocketMQ 的消費者 Binding、生產者 Binding
      bindings:
        Topic-product:
          destination: Topic-product
          content-type: application/json
          group: distributed-group2
        Topic-order:
          destination: Topic-order
          content-type: application/json
          group: distributed-group2
        Topic-stock:
          destination: Topic-stock
          content-type: application/json
          group: distributed-group2 

生產者消息發送

設置 Binder 自定義
/**
 * 自定義通道 Binder
 *
 */
public interface CustomChannelBinder {

    /**
     * 接收產品 Channel 消息
     *
     * @return
     */
    @Input("Topic-product")
    MessageChannel productInputChannel();

    /**
     * 發送消息到訂單 Channel 消息
     *
     * @return
     */
    @Output("Topic-order")
    MessageChannel sendToOrderChannel();


    /**
     * 發送消息到庫存 Channel 消息
     *
     * @return
     */
    @Output("Topic-stock")
    MessageChannel sendToStockChannel();
}
啟用自定義的 Binding 服務
/**
 * 分布式服務-產品服務
 *
 */
@EnableBinding({CustomChannelBinder.class})
@SpringBootApplication
public class ProductApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProductApplication.class);
    }
}
發送到指定的@Output("Topic-stock")
import com.github.shaylau.rocketmq.distributed.product.model.Product;
import com.github.shaylau.rocketmq.distributed.product.mq.binder.CustomChannelBinder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

/**
 */
@Service
public class ProductProducer {


    @Autowired
    private CustomChannelBinder channelBinder;

    /**
     * 發送消息到訂單
     * @param product
     */
    public void sentToStock(Product product) {
        Message<Product> message = MessageBuilder.withPayload(product).build();
        channelBinder.sendToStockChannel().send(message);
    }
}
消費者消息消費
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Service;

@Service
public class ProductConsumer {

    /**
     * 消費訂單消息
     *
     * @param object 訂單消息體
     */
    @StreamListener("Topic-order")
    public void consumerForOrder(Object object) {
        System.out.println("收到訂單消息:" + object);
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM