rocketmq(三 java操作rocket API, rocketmq 冪等性)


  • JAVA操作rocketmq:

1.導入rocketmq所需要的依賴:

    <dependency>
            <groupId>com.alibaba.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>3.0.10</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.rocketmq</groupId>
            <artifactId>rocketmq-all</artifactId>
            <version>3.0.10</version>
            <type>pom</type>
        </dependency>

2.創建生產者

package com.example.producer;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("producer-group");
        producer.setNamesrvAddr("192.168.31.165:9876;192.168.31.144:9876");
        producer.setInstanceName("producer");
        producer.start();
        try {
            for (int i = 0; i < 10; i++) {
                // Thread.sleep(1000); // 每秒發送一次MQ
                Message msg = new Message("producer-topic", // topic 主題名稱
                        "msg", // pull 臨時值 在消費者消費的時候 可以根據msg類型進行消費
                        ("pushmsg-" + i).getBytes()// body 內容
                );
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult.toString());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.shutdown();
    }

}

3.創建消費者

package com.example.consumer;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");

        consumer.setNamesrvAddr("192.168.31.165:9876;192.168.31.144:9876");
        consumer.setInstanceName("consumer");
        consumer.subscribe("producer-topic", "msg");//此處是根據Message對象的參數來獲取

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("消息id:"+msg.getMsgId() + "---" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }

}

 

4.運行結果:

 生產者運行結果:

 

消費者運行結果:

 

 

  • rocetmq冪等性問題:

在Activemq中 jms規范支持兩種消息模型:點對點和發布訂閱,在rocketmq中 有兩種消費模式:廣播消費,和集群消費。

在消費的過程中,如果消費者出現異常或者超時,導致mq沒有及時的相應消費的狀態,則可能讓mq重試,重試機制就有可能導致出現冪等性,而rocketmq的冪等性 只會出現在集群消費(類似activemq中的點對點消息模型)

生產者:

 

package com.example.producer;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("producer-group");
        producer.setNamesrvAddr("192.168.31.169:9876;192.168.31.177:9876");
        producer.setInstanceName("producer");
        producer.start();
        try {
            for (int i = 0; i < 10; i++) {
                Message msg = new Message("topic", // topic 主題名稱
                        "msg", // pull 臨時值 在消費者消費的時候 可以根據msg類型進行消費
                        (i + "條消息").getBytes()// body 內容
                );
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult.toString());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.shutdown();
    }

}

 

消費者:

package com.example.consumer;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");

        consumer.setNamesrvAddr("192.168.31.169:9876;192.168.31.177:9876");
        consumer.setInstanceName("consumer");
        consumer.subscribe("topic1", "msg");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("消息id:" + msg.getMsgId() + "---" + new String(msg.getBody()));
                }
                // 超時的情況 或者程序異常
                int i = 2 / 0;
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }

}

 

消費結果:

消息id:C0A81FB100002A9F00000000000268EC---5條消息
消息id:C0A81FB100002A9F000000000002686E---4條消息
消息id:C0A81FA900002A9F0000000000037E6A---1條消息
消息id:C0A81FB100002A9F000000000002696A---6條消息
消息id:C0A81FB100002A9F00000000000269E8---7條消息
消息id:C0A81FA900002A9F0000000000038062---9條消息
消息id:C0A81FA900002A9F0000000000037EE8---2條消息
消息id:C0A81FA900002A9F0000000000037FE4---8條消息
消息id:C0A81FA900002A9F0000000000037F66---3條消息
消息id:C0A81FA900002A9F0000000000037DEC---0條消息
消息id:C0A81FA900002A9F0000000000038704---1條消息
消息id:C0A81FA900002A9F000000000003880C---9條消息
消息id:C0A81FA900002A9F0000000000038914---2條消息
消息id:C0A81FA900002A9F0000000000038A1C---0條消息
消息id:C0A81FA900002A9F0000000000038B24---3條消息
消息id:C0A81FA900002A9F0000000000038C2C---8條消息
消息id:C0A81FB100002A9F0000000000026E7E---4條消息
消息id:C0A81FB100002A9F0000000000026F86---7條消息
消息id:C0A81FB100002A9F0000000000027196---5條消息
消息id:C0A81FB100002A9F000000000002708E---6條消息

在Activimq中,可以通過消息id 來作為全局變量,檢測是不是重復消費。但是在rocketmq中消費重試的結果中,任意選出兩條相同的消息,可以看出 重試的時候消息id是不同的,此時在用消息id作為全局變量判斷是否重復消費顯然是不可能的。rocketmq中提供了一個消息的key,可以將業務id作為該key。例如:訂單號什么的。可以將消息設置的key 在第一次消費的時候存放到數據庫之中

 

冪等性消費者:

package com.example.consumer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

public class Consumer {
    public static Map<String, String> map = new HashMap<String, String>();// 模擬內存,實際情況可以將key放在redis之中

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
        consumer.setNamesrvAddr("192.168.31.169:9876;192.168.31.177:9876");
        consumer.setInstanceName("consumer");
        consumer.subscribe("topic1", "msg");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    if (!map.containsKey(msg.getKeys())) {
                        // 如果此時的業務邏輯是將收到的消息存放到數據庫
                        System.out.println("消息id:" + msg.getMsgId() + "---" + new String(msg.getBody()));
                        map.put(msg.getKeys(), new String(msg.getBody()));
                    } else {
                        System.out.println("重復消費");
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                }
                // 超時的情況 或者程序異常
                int i = 2 / 0;
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM