Rocket MQ簡介


一、概念

1. 中間件:位於系統之間的服務

2. 消息中間件:消息隊列MQ,用於接收消息、存儲消息、轉發消息的中間件

3. Rocket MQ: 分布式的消息中間件,生產者、消費者、隊列都可以分布式

4. 基於Netty開發

 

二、RocketMQ使用

1. 在服務器上安裝Rocket MQ

2. 啟動rocket mq,即name server,啟動之后監聽端口,等待broker\producer\consumer連接

3. 啟動broker, 設置對應的name server,broker用於收取和存儲消息

4. 手動/自動創建Topic

5. 消費者代碼

public class Consumer {
 
    public static void main(String[] args) throws InterruptedException, MQClientException {
    
        //聲明並初始化一個consumer
        //需要一個consumer group名字作為構造方法的參數,這里為consumer1
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
 
        //同樣也要設置NameServer地址
        consumer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");
 
        //這里設置的是一個consumer的消費策略
        //CONSUME_FROM_LAST_OFFSET 默認策略,從該隊列最尾開始消費,即跳過歷史消息
        //CONSUME_FROM_FIRST_OFFSET 從隊列最開始開始消費,即歷史消息(還儲存在broker的)全部消費一遍
        //CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認是半個小時以前
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
 
        //設置consumer所訂閱的Topic和Tag,*代表全部的Tag
        consumer.subscribe("TopicTest", "*");
 
        //設置一個Listener,主要進行消息的邏輯處理
        consumer.registerMessageListener(new MessageListenerConcurrently() {
 
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
 
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                
                //返回消費狀態
                //CONSUME_SUCCESS 消費成功
                //RECONSUME_LATER 消費失敗,需要稍后重新消費
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
 
        //調用start()方法啟動consumer
        consumer.start();
 
        System.out.println("Consumer Started.");
    }
}

 6. 生產者代碼

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
 
        //聲明並初始化一個producer
        //需要一個producer group名字作為構造方法的參數,這里為producer1
        DefaultMQProducer producer = new DefaultMQProducer("producer1");
        
        //設置NameServer地址,此處應改為實際NameServer地址,多個地址之間用;分隔
        //NameServer的地址必須有,但是也可以通過環境變量的方式設置,不一定非得寫死在代碼里
        producer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");
        
        //調用start()方法啟動一個producer實例
        producer.start();
 
        //發送10條消息到Topic為TopicTest,tag為TagA,消息內容為“Hello RocketMQ”拼接上i的值
        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("TopicTest",// topic
                        "TagA",// tag
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body
                );
                
                //調用producer的send()方法發送消息
                //這里調用的是同步的方式,所以會有返回結果
                SendResult sendResult = producer.send(msg);
                
                //打印返回結果,可以看到消息發送的狀態以及一些相關信息
                System.out.println(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
 
        //發送完消息之后,調用shutdown()方法關閉producer
        producer.shutdown();
    }
}

 

 

三、架構和名詞

1. NameServer:名稱服務器,為Producer和Consumer提供路由信息,用於管理Broker節點信息,記錄Broker與Topic的對應關系

2. ConsumerGroup:消費同一類消息的多個 consumer 實例組成一個消費者組

3. Topic:消息的邏輯分類,物理實現上,一個Topic由多個隊列組成

4. Message:消息,指定topic,有消息內容

5. Tag:標簽,是對Topic的進一步細化,可以用來過濾

6. Broker:消息服務器,就是MQ,分為Master和Slave節點,每個Broker與所有NameServer集群中所有節點建立連接,定時注冊Topic到所有的NameServer

7. Producer與一個NameServer建立長連接,定期從NameServer獲取Topic信息,向Broker發送消息

8. Consumer與一個NameServer建立長連接,定期從NameServer獲取Topic信息,從Broker消費消息

 

 

四、特性

1. 發布/訂閱,點對點(P2P)

2. 消息優先級:Rocket MQ沒有特意支持消息優先級,但可以配置優先級不同的兩個隊列

3. 消息順序:Rocket MQ嚴格保證消息順序,先進先出

4. 消息過濾:生產端和消費端都可以過濾,各有優缺點

5. 消息持久化:Rocket MQ以文件形式持久化

6. 消息可靠性:避免消息丟失,需要生產者、消費者和MQ隊列都保證

7. 消息延遲:Rocket MQ使用長輪詢pull方式,保證實時

8. 消息堆積:因為需要削峰填谷,需要支持消息堆積,億級別的消息堆積能力

9. 消息重試:消費失敗后,重新再消費一次

10. 每個消息必須投遞一次

11. 不運行重復的消息,需要業務保證冪等

12. 隊列大小,定期刪除數據 

13. 定時消息

14. 事務機制

 

五、消費者的消費模式

1. 集群消費:一條消息只會被投遞到一個consumer group下面的一個實例

2. 廣播消費:一條消息會被投遞到一個consumer group下面的所有實例

 

六、消費者獲取消息的模式

1. 推送模式:能及時消費

2. 拉取模式:可以主動控制拉取時機

 

七、Rocket MQ和其他消息隊列比較(ActiveMQ, RabbitMQ, ZeroMQ, Kafka),為什么選擇Rocket MQ?

1. 嚴格的順序消息

2. 億級消息堆積能力

3. Pull/Push消費模式

4. 歷經多次天貓雙十一海量消息考驗

 

 

 

 

參考:

https://www.cnblogs.com/lizhangyong/p/8978855.html

https://www.cnblogs.com/buyige/p/9378102.html

https://blog.csdn.net/tototuzuoquan/article/details/78325192

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM