消息隊列之RocketMq


一、RocketMQ 特點

RocketMQ 是阿里巴巴在2012年開源的分布式消息中間件,有點什么的就不多說了,就直接將怎么用。

 

 

 

RocketMq大概就是真么一種結構,具體的使用流程就是,消息生產者(Producer)將消息發布到消息中心,消費者(Consumer)啟動監聽,當監聽到消息時去消息中心拿消息。而NameServer的作用我的理解就類似一個注冊中心,將broker的信息提供給Produce和Consumer。

RocketMq天然支持集群,一個Producer集群擁有同一個groupId,同理,同一個Consumer集群也是有相同的GroupId,當消息的消費模式為集群模式時,同一個集群的消費者智能消費一次消息,即如果Consumer1消費了消息,這個集群中的其他消費者就不能再消費此消息,除非將消費模式設置為廣播模式。

廢話不多說,本地搭建一套RocketMq,實踐出真知。

二、本地部署RocketMq

1、RocketMq安裝包下載地址:http://rocketmq.apache.org/dowloading/releases/

下載后直接找到 mqnamesrv.cmd 和 mqbroker.cmd,推薦使用everything,非常好用的全局搜索軟件。直接百度搜就行,很小。不愛下載的可以從以下路徑找到:rocketMq->distribution->target->rocketMq->bin

2、啟動NameServer

nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log

啟動日志會打印在控制台,如下圖。

 

 

3、啟動broker

nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log

 同樣,日志如下圖

 

4、啟動Consumer

寫代碼之前首先引入依賴:

 <dependency>
     <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-client</artifactId>
     <version>4.2.0</version>
  </dependency>

 

 

如上圖所示,這個依賴中已經包括slf4j和netty,所以說如果只為了練習,這一個依賴就夠了。

接着創建一個類,啟動main方法。

public class Consumer {

    public static void main(String[] args) throws Exception {
        //創建一個消息消費者,並設置一個消息消費者組
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("niwei_consumer_group");
        //指定 NameServer 地址
        consumer.setNamesrvAddr("localhost:9876");
        //設置 Consumer 第一次啟動時從隊列頭部開始消費還是隊列尾部開始消費
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //訂閱指定 Topic 下的所有消息
        consumer.subscribe("topic_example_java", "*");
        //注冊消息監聽器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
                //默認 list 里只有一條消息,可以通過設置參數來批量接收消息
                if (list != null) {
                    for (MessageExt ext : list) {
                        try {
                            System.out.println(new Date() + new String(ext.getBody(), "UTF-8"));
                        } catch (UnsupportedEncodingException e) {
                            e.printStackTrace();
                        }
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 消費者對象在使用之前必須要調用 start 初始化
        consumer.start();
        System.out.println("消息消費者已啟動");
    }
}

5、啟動Producer

代碼如下,記得引入包,然后啟動main方法。

public class Producer {

    public static void main(String[] args) throws Exception {
        //創建一個消息生產者,並設置一個消息生產者組
        DefaultMQProducer producer = new DefaultMQProducer("niwei_producer_group");

        //指定 NameServer 地址
        producer.setNamesrvAddr("localhost:9876");

        //初始化 Producer,整個應用生命周期內只需要初始化一次
        producer.start();

        for (int i = 0; i < 100; i++) {
            //創建一條消息對象,指定其主題、標簽和消息內容
            Message msg = new Message(
                    "topic_example_java" /* 消息主題名 */,
                    "TagA" /* 消息標簽 */,
                    ("Hello Java demo RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息內容 */
            );

            //發送消息並返回結果
            SendResult sendResult = producer.send(msg);

            System.out.printf("%s%n", sendResult);
        }

        // 一旦生產者實例不再被使用則將其關閉,包括清理資源,關閉網絡連接等
        producer.shutdown();
    }
}

 

到此,部署並啟動完成,我們看到控制台消費者已經收到了消息。使用起來真的是非常的簡單,當然簡單的練習還是很容易,不過真正在項目里用起來還是會有很多坑,包括重復消費、消息丟失等等很多,還是要多用才能長經驗啊。

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM