Rocketmq使用 生產者、push消費者和pull消費者


maven引用

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.0</version>
        </dependency>
        <!--一個好用的工具包,可以不引入-->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.3.0</version>
        </dependency>

 

說明

  不管是生產者還是消費者,都有很多參數可以配置,rocketmq命名比較好,基本可以從參數名上判斷具體作用,還有注釋可以看。

下面例子中只給出了常用的一些參數設置。更多參數可自行探索。

 

 簡單生產者實現

注意:

1、NamesrvAddr參數在多個節點時,用英文分號分隔,例: 192.168.9.58:9876;192.168.9.59:9876

import cn.hutool.core.util.RandomUtil;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("127.0.0.1:9876");
        //發送超時時間,默認3000 單位ms
        producer.setSendMsgTimeout(5000);
        producer.start();

        try {
            Message msg = new Message("TestTopic",// topic
                    "177",                       // tag 可以為空,用以簡單的篩選。
                    RandomUtil.randomString(8),  // key 可以為空,可用以查詢。
                    ("test" + RandomUtil.randomString(8)).getBytes());    // body ,我常將對象轉json再獲取byte[] 進行傳輸。
            SendResult send = producer.send(msg);
            if (send.getSendStatus().equals(SendStatus.SEND_OK)) {
                //發送成功處理
            }else {
                //發送失敗處理
            }
        } catch (Exception e) {
            //發送失敗處理
            e.printStackTrace();
        }
        //正式環境不要發完就就shutdown,要在應用退出時再shutdown。
        producer.shutdown();
    }
}

多線程加批量生產者模擬實現

注意:

1、批量發送時,topic必須為同一個,否則發送會報異常。

2、批量發送相較於單條發送速度提升很大。

import cn.hutool.core.util.RandomUtil;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("127.0.0.1:9876");
        //發送超時時間,默認3000 單位ms
        producer.setSendMsgTimeout(5000);
        producer.start();

        int threadCount = 20;
        int forCount = 100000;
        CountDownLatch latch = new CountDownLatch(threadCount);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                try {
                    List<Message> list = new ArrayList<>();
                    for (int j = 0; j < forCount; j++) {
                        try {
                            Message msg = new Message("TestTopic",// topic
                                    "177",                       // tag
                                    RandomUtil.randomString(8),                       // key
                                    ("test" + RandomUtil.randomString(8)).getBytes());    // body
                            list.add(msg);
                            if (list.size() >= 100) {
                                SendResult send = producer.send(list);
                                if (send.getSendStatus().equals(SendStatus.SEND_OK)) {
                                    //發送成功處理
                                    list.clear();
                                }else {
                                    //發送失敗處理
                                }
                            }
                        } catch (Exception e) {
                            //發送失敗處理
                            e.printStackTrace();
                        }
                    }
                    if (list.size() > 0) {
                        SendResult send = producer.send(list);
                        if (!send.getSendStatus().equals(SendStatus.SEND_OK)) {
                            System.out.println(send);
                        }
                        list.clear();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    latch.countDown();
                }
            }).start();
        }
        latch.await();
        long hs = System.currentTimeMillis() - start;
        System.out.println(hs);

        long speed = (threadCount * forCount) / (hs >= 0 ? 1 : hs / 1000);
        System.out.println("速度" + speed);
        //正式環境不要發完就就shutdown,要在應用退出時再shutdown。
        producer.shutdown();
    }
}

push消費者

import cn.hutool.core.lang.Console;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class PushConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumerGroupName");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        //一個GroupName第一次消費時的位置
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setConsumeThreadMin(20);
        consumer.setConsumeThreadMax(20);
        //要消費的topic,可使用tag進行簡單過濾
        consumer.subscribe("TestTopic", "*");
        //一次最大消費的條數
        consumer.setConsumeMessageBatchMaxSize(100);
        //消費模式,廣播或者集群,默認集群。
        consumer.setMessageModel(MessageModel.CLUSTERING);
        //在同一jvm中 需要啟動兩個同一GroupName的情況需要這個參數不一樣。
        consumer.setInstanceName("InstanceName");
        //配置消息監聽
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            try {
                //業務處理
                msgs.forEach(msg -> {
                    Console.log(msg);
                });
            } catch (Exception e) {
                System.err.println("接收異常" + e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

pull消費者

從4.6之后,提供了DefaultLitePullConsumer  大大簡化了pull的操作。以下為新實現,4.6之前的版本不支持。

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Console;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class PullConsumer {
    private static boolean runFlag = true;
    public static void main(String[] args) throws Exception {
        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("PullConsumerGroupName");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        //要消費的topic,可使用tag進行簡單過濾
        consumer.subscribe("TestTopic", "*");
        //一次最大消費的條數
        consumer.setPullBatchSize(100);
        //無消息時,最大阻塞時間。默認5000 單位ms
        consumer.setPollTimeoutMillis(5000);
        consumer.start();
        while (runFlag){
            try {
                //拉取消息,無消息時會阻塞 
                List<MessageExt> msgs = consumer.poll();
                if (CollUtil.isEmpty(msgs)){
                    continue;
                }
                //業務處理
                msgs.forEach(msg-> Console.log(new String(msg.getBody())));
                //同步消費位置。不執行該方法,應用重啟會存在重復消費。
                consumer.commitSync();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
        consumer.shutdown();
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM