Java操作RocketMQ


第一步:導入依賴

 <dependency>
      <groupId>com.alibaba.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>3.0.10</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba.rocketmq</groupId>
      <artifactId>rocketmq-all</artifactId>
      <version>3.0.10</version>
      <type>pom</type>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.1.1</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-core</artifactId>
      <version>1.1.1</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.10</version>
      <scope>test</scope>
 </dependency>

  

第二步:創建生產者

package com.wish;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

public class Producer {

    public static void main(String[] args) throws MQClientException {
        //創建一個消息的生產者
        // producerGroup:一般發送同樣消息的Producer,歸為同一個Group,應用必須設置,並保證命名唯一
        DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
        //設置名稱srv地址
        producer.setNamesrvAddr("192.168.152.55:9876;192.168.152.66:9876");
        //實例名稱
        producer.setInstanceName("producer");
        //啟動
        producer.start();
        try {
            for (int i = 0; i < 10; i++) {
                Thread.sleep(1000); // 每秒發送一次MQ
                Message msg = new Message("itmayiedu-topic", // topic 主題名稱
                        "TagA", // tag 臨時值
                        ("itmayiedu-"+i).getBytes()// body 內容
                );
                //send()發送
                SendResult sendResult = producer.send(msg);
                //SendResult:發送消息結果
                System.out.println(sendResult.toString());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        //關掉
        producer.shutdown();
    }

}

  

第三步:創建消費者

package com.wish;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        //創建一個消費者
        //consumerGroup:做同樣事情的Consumer歸為同一個Group,應用必須設置,並保證命名唯一
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
        //設置名稱srv地址
        consumer.setNamesrvAddr("192.168.152.55:9876;192.168.152.66:9876");
        //實例名稱
        consumer.setInstanceName("consumer");
        //實現訂閱
        consumer.subscribe("itmayiedu-topic", "TagA");
        //注冊消息監聽器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(msg.getMsgId()+"---"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //啟動消費者
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

  

第四步:分別啟動消費者和生產者,查看瀏覽器

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM