RocketMQ生產者和消費者


一.導入依賴

   <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.1.0-incubating</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.1.1</version>
        </dependency>

二:生產者

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import redis.clients.jedis.Jedis;

public class Producer {
    public static void main(String[] args) throws MQClientException {
     
        //創建生產者並指定組
        DefaultMQProducer producer = new DefaultMQProducer("my-group");
        //指定服務地址
        producer.setNamesrvAddr("192.168.118.3:9876;192.168.118.4:9876");
        //創建生產者實例
        producer.setInstanceName("producer");
        //啟動生成者
        producer.start();
        try {
            for (int i = 0; i < 10; i++) {
                Thread.sleep(1000); // 每秒發送一次MQ

                Message msg = new Message("my-topic", // topic 主題名稱
                        "TagA", // tag 臨時值
                        ("message-"+i).getBytes()// body 內容
                );

                SendResult sendResult = producer.send(msg);
                //打印消息的完整信息
                System.out.println(sendResult.toString());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        //發送完所有信息停掉生產者
        producer.shutdown();
    }
}

三.消費者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-group");
        consumer.setNamesrvAddr("192.168.118.3:9876");
        consumer.setInstanceName("consumer");
        consumer.subscribe("my-topic", "TagA");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(msg.getMsgId()+"---"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

四:解決消息重復消費

在客戶端網絡延遲或者報錯的情況下導致消息無法成功簽收,其他的消費者能繼續監聽到這個消息,導致重復消費的情況

我們可以給沒一條消息一個獨一無二的標識,當作消息的keys,接受到消息之后,查看redis中有沒有這個key,如果有就代表重復消費了,反之正常執行業務邏輯,先將keys放到redis中,防止其他消費者進行重復消費

,在所有操作執行完之后將keys重redis中刪除

消費者代碼

package com.yjc.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import redis.clients.jedis.Jedis;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        Jedis jedis=new Jedis("192.168.118.3",6379);
        jedis.auth("admin");
        jedis.select(0);
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-group");
        consumer.setNamesrvAddr("192.168.118.3:9876;192.168.118.4");
        consumer.setInstanceName("consumer");
        consumer.subscribe("my-topic3", "TagA");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                   System.out.println("監聽到消息");
                    //查看此消息的key是否在緩存中
                    if (jedis.exists(msg.getKeys())) {
                        System.out.println("重復消費!");
                        //重復消費的情況簽收失敗
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    jedis.set(msg.getKeys(),new String(msg.getBody()));
                    System.out.println("放到緩存中");
                    //模擬出錯
                    int a=5/0;
                    System.out.println(msg.getMsgId()+"---"+new String(msg.getBody()));
                    //消費完從緩存中刪除
                    jedis.del(msg.getKeys());
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM