springboot使用RocketMQ 發送接受消息


Apache RocketMQ(官網地址:http://rocketmq.apache.org)是由阿里巴巴集團開源的大型消息隊列,現在已經貢獻給了Apache開源基金會,同時是一個分布式消息傳遞和流媒體平台,具有低延遲、高性能、可靠性、萬億級容量和靈活的可擴展性。(Github官網地址:https://github.com/apache/rocketmq)

 

1. 加入RocketMQ依賴

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.4</version>
</dependency>

2. 配置RocketMQ服務信息

xc:
  rocketmq2:
    topic-string: topic_string
    topic-user: topic_user
    string-consumer-group: string_consumer_group
    user-consumer-group: user_consumer_group
rocketmq:
  name-server: 172.19.25.168:9876
  producer:
    group: producer_group_test

3. 編寫生產者和消費者

@Service
@RocketMQMessageListener(consumerGroup = "${xc.rocketmq2.string-consumer-group}", topic = "${xc.rocketmq2.topic-string}")
public class StringConsumer implements RocketMQListener<String> {

    private Logger log = LoggerFactory.getLogger(this.getClass());

    @Override
    public void onMessage(String message) {
        log.info("消費字符串消息{}", message);
    }
}
@Service
@RocketMQMessageListener(consumerGroup = "${xc.rocketmq2.user-consumer-group}", topic = "${xc.rocketmq2.topic-user}")
public class UserConsumer implements RocketMQListener<User> {

    private Logger log = LoggerFactory.getLogger(this.getClass());

    @Override
    public void onMessage(User user) {
        log.info("消費用戶消息{}", user.getUsername());
    }
}
@RestController
@RequestMapping("rocketmq")
public class RocketmqController {

    private Logger log = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private XcRocketMq2 xcRocketMq2;

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 同步發送
     * 頁面訪問http://localhost:8080/rocketmq/sync
     *
     * @throws Exception
     */
    @GetMapping("sync")
    public void sync() {
        SendResult sendResult = rocketMQTemplate.syncSend(xcRocketMq2.getTopicString(), "Hello world!");
        log.info("同步發送字符串{}, 發送結果{}", xcRocketMq2.getTopicString(), sendResult);

        User user = new User();
        user.setId("1");
        user.setUsername("wusq");
        sendResult = rocketMQTemplate.syncSend(xcRocketMq2.getTopicUser(), user);
        log.info("同步發送對象{}, 發送結果{}", xcRocketMq2.getTopicUser(), sendResult);
    }

    /**
     * 異步發送
     * 頁面訪問http://localhost:8080/rocketmq/async
     *
     * @throws Exception
     */
    @GetMapping("async")
    public void async() {

        rocketMQTemplate.asyncSend(xcRocketMq2.getTopicString(), "Hello world!", new SendCallback() {
            @Override
            public void onSuccess(SendResult var1) {
                log.info("異步發送成功{}", var1);
            }

            @Override
            public void onException(Throwable var1) {
                log.info("異步發送失敗{}", var1);
            }
        });
    }

    /**
     * 單向發送
     * 頁面訪問http://localhost:8080/rocketmq/oneway
     *
     * @throws Exception
     */
    @GetMapping("oneway")
    public void oneway() {
        rocketMQTemplate.sendOneWay(xcRocketMq2.getTopicString(), "Hello world!");
        log.info("單向發送");
    }
}

 

源碼:https://gitee.com/caoyeoo0/xc-springboot/tree/mq%2FRocketMqStart/

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM