Java操作RocketMQ


導入依賴

<dependency>
      <groupId>com.alibaba.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>3.0.10</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba.rocketmq</groupId>
      <artifactId>rocketmq-all</artifactId>
      <version>3.0.10</version>
      <type>pom</type>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.1.1</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-core</artifactId>
      <version>1.1.1</version>
    </dependency>

提供者

/**
 * 生產者
 */
public class Provider {
    public static void main(String[] args) throws MQClientException {
        //創建一個生產者
        DefaultMQProducer producer=new DefaultMQProducer("rmq-group");
        //設置NameServer地址
        producer.setNamesrvAddr("192.168.33.135:9876;192.168.33.136:9876");
        //設置生產者實例名稱
        producer.setInstanceName("producer");
        //啟動生產者
        producer.start();

        try {
            //發送消息
            for (int i=1;i<=10;i++){
                //模擬網絡延遲,每秒發送一次MQ
                Thread.sleep(1000);
                //創建消息,topic主題名稱  tags臨時值代表小分類, body代表消息體
                Message message=new Message("itmayiedu-topic","TagA",("itmayiedu-"+i).getBytes());
                //發送消息
                SendResult sendResult=producer.send(message);
                System.out.println("來了來了:"+sendResult.toString());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.shutdown();
    }
}

消費者

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        //創建消費者
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("rmq-group");
        //設置NameServer地址
        consumer.setNamesrvAddr("192.168.33.135:9876;192.168.33.136:9876");
        //設置實例名稱
        consumer.setInstanceName("consumer");
        //訂閱topic
        consumer.subscribe("itmayiedu-topic","TagA");

        //監聽消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //獲取消息
                for (MessageExt messageExt:list){
                    //RocketMQ由於是集群環境,所有產生的消息ID可能會重復
                    System.out.println(messageExt.getMsgId()+"---"+new String(messageExt.getBody()));
                }
                //接受消息狀態 1.消費成功    2.消費失敗   隊列還有
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //啟動消費者
        consumer.start();
        System.out.println("consumer Started!");
    }
}

控制台效果

提供者

 

 消費者

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM