Spring Cloud Stream之RocketMQ消息发送接收


Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems.

The framework provides a flexible programming model built on already established and familiar Spring idioms and best practices, including support for persistent pub/sub semantics, consumer groups, and stateful partitions.

SpringCloud 用于构建高度可扩展的基于事件驱动的微服务在消息系统中。该框架提供了一个灵活的编程模型,它建立在已经建立和熟悉的Spring习惯用法和最佳实践之上,包括对持久发布/子语义、消费者组和有状态分区的支持。 

Spring Cloud Stream消息订阅发送

//消息通道
MessageChannel messageChannel = new DirectChannel();

//订阅消息
// Message subscription
((SubscribableChannel) messageChannel).subscribe(new MessageHandler() {
    @Override
    public void handleMessage(Message<? > message) throws MessagingException {
        System.out.println("receive msg: " + message.getPayload());
    }
});

// Message sending 消息发送
messageChannel.send(MessageBuilder.withPayload("simple msg").build());
代码中的所有消息类型都是由spring-messaging模块提供的,它屏蔽了消息中间件的底层实现。如果想要选用不同消息中间件,只需要在配置文件里面配置消息中间件信息和更换Binder 依赖。 以RocketMQ 消息队列为例,Binder 只需要引入 RocketMQ Binder 的maven依赖 和 在 application.yml 中配置 spring.cloud.stream.rocketmq 信息。 

application.yml 配置

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        #具体配置的 RocketMQ 的消费者 Binding、生产者 Binding
      bindings:
        #接受管道信息
        input:
          destination: topic-product
          content-type: application/json
          group: distributed-group
        #输入管道信息
        output:
          destination: topic-order
          content-type: application/json
          group: distributed-group
            kafaka:

maven示例

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
    <version>2.1.0.RELEASE</version>
</dependency>
如果想要更换为 Kafka 只需要导入 spring-cloud-stream-binder-kafka 依赖和配置 kafka 的 binder信息。 

消息发送和消息生产

以下代码示例,说明创建消息生产者、消息消费者。Spring cloud Stream RocketMQ application.yml bindings配置文件如下:
server:
  port: 9200
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        #具体配置的 RocketMQ 的消费者 Binding、生产者 Binding
      bindings:
        Topic-product:
          destination: Topic-product
          content-type: application/json
          group: distributed-group2
        Topic-order:
          destination: Topic-order
          content-type: application/json
          group: distributed-group2
        Topic-stock:
          destination: Topic-stock
          content-type: application/json
          group: distributed-group2 

生产者消息发送

设置 Binder 自定义
/**
 * 自定义通道 Binder
 *
 */
public interface CustomChannelBinder {

    /**
     * 接收产品 Channel 消息
     *
     * @return
     */
    @Input("Topic-product")
    MessageChannel productInputChannel();

    /**
     * 发送消息到订单 Channel 消息
     *
     * @return
     */
    @Output("Topic-order")
    MessageChannel sendToOrderChannel();


    /**
     * 发送消息到库存 Channel 消息
     *
     * @return
     */
    @Output("Topic-stock")
    MessageChannel sendToStockChannel();
}
启用自定义的 Binding 服务
/**
 * 分布式服务-产品服务
 *
 */
@EnableBinding({CustomChannelBinder.class})
@SpringBootApplication
public class ProductApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProductApplication.class);
    }
}
发送到指定的@Output("Topic-stock")
import com.github.shaylau.rocketmq.distributed.product.model.Product;
import com.github.shaylau.rocketmq.distributed.product.mq.binder.CustomChannelBinder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

/**
 */
@Service
public class ProductProducer {


    @Autowired
    private CustomChannelBinder channelBinder;

    /**
     * 发送消息到订单
     * @param product
     */
    public void sentToStock(Product product) {
        Message<Product> message = MessageBuilder.withPayload(product).build();
        channelBinder.sendToStockChannel().send(message);
    }
}
消费者消息消费
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Service;

@Service
public class ProductConsumer {

    /**
     * 消费订单消息
     *
     * @param object 订单消息体
     */
    @StreamListener("Topic-order")
    public void consumerForOrder(Object object) {
        System.out.println("收到订单消息:" + object);
    }
}


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM