一、概念
1. 中間件:位於系統之間的服務
2. 消息中間件:消息隊列MQ,用於接收消息、存儲消息、轉發消息的中間件
3. Rocket MQ: 分布式的消息中間件,生產者、消費者、隊列都可以分布式
4. 基於Netty開發
二、RocketMQ使用
1. 在服務器上安裝Rocket MQ
2. 啟動rocket mq,即name server,啟動之后監聽端口,等待broker\producer\consumer連接
3. 啟動broker, 設置對應的name server,broker用於收取和存儲消息
4. 手動/自動創建Topic
5. 消費者代碼
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { //聲明並初始化一個consumer //需要一個consumer group名字作為構造方法的參數,這里為consumer1 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1"); //同樣也要設置NameServer地址 consumer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876"); //這里設置的是一個consumer的消費策略 //CONSUME_FROM_LAST_OFFSET 默認策略,從該隊列最尾開始消費,即跳過歷史消息 //CONSUME_FROM_FIRST_OFFSET 從隊列最開始開始消費,即歷史消息(還儲存在broker的)全部消費一遍 //CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認是半個小時以前 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //設置consumer所訂閱的Topic和Tag,*代表全部的Tag consumer.subscribe("TopicTest", "*"); //設置一個Listener,主要進行消息的邏輯處理 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); //返回消費狀態 //CONSUME_SUCCESS 消費成功 //RECONSUME_LATER 消費失敗,需要稍后重新消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //調用start()方法啟動consumer consumer.start(); System.out.println("Consumer Started."); } }
6. 生產者代碼
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { //聲明並初始化一個producer //需要一個producer group名字作為構造方法的參數,這里為producer1 DefaultMQProducer producer = new DefaultMQProducer("producer1"); //設置NameServer地址,此處應改為實際NameServer地址,多個地址之間用;分隔 //NameServer的地址必須有,但是也可以通過環境變量的方式設置,不一定非得寫死在代碼里 producer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876"); //調用start()方法啟動一個producer實例 producer.start(); //發送10條消息到Topic為TopicTest,tag為TagA,消息內容為“Hello RocketMQ”拼接上i的值 for (int i = 0; i < 10; i++) { try { Message msg = new Message("TopicTest",// topic "TagA",// tag ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body ); //調用producer的send()方法發送消息 //這里調用的是同步的方式,所以會有返回結果 SendResult sendResult = producer.send(msg); //打印返回結果,可以看到消息發送的狀態以及一些相關信息 System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } //發送完消息之后,調用shutdown()方法關閉producer producer.shutdown(); } }
三、架構和名詞
1. NameServer:名稱服務器,為Producer和Consumer提供路由信息,用於管理Broker節點信息,記錄Broker與Topic的對應關系
2. ConsumerGroup:消費同一類消息的多個 consumer 實例組成一個消費者組
3. Topic:消息的邏輯分類,物理實現上,一個Topic由多個隊列組成
4. Message:消息,指定topic,有消息內容
5. Tag:標簽,是對Topic的進一步細化,可以用來過濾
6. Broker:消息服務器,就是MQ,分為Master和Slave節點,每個Broker與所有NameServer集群中所有節點建立連接,定時注冊Topic到所有的NameServer
7. Producer與一個NameServer建立長連接,定期從NameServer獲取Topic信息,向Broker發送消息
8. Consumer與一個NameServer建立長連接,定期從NameServer獲取Topic信息,從Broker消費消息
四、特性
1. 發布/訂閱,點對點(P2P)
2. 消息優先級:Rocket MQ沒有特意支持消息優先級,但可以配置優先級不同的兩個隊列
3. 消息順序:Rocket MQ嚴格保證消息順序,先進先出
4. 消息過濾:生產端和消費端都可以過濾,各有優缺點
5. 消息持久化:Rocket MQ以文件形式持久化
6. 消息可靠性:避免消息丟失,需要生產者、消費者和MQ隊列都保證
7. 消息延遲:Rocket MQ使用長輪詢pull方式,保證實時
8. 消息堆積:因為需要削峰填谷,需要支持消息堆積,億級別的消息堆積能力
9. 消息重試:消費失敗后,重新再消費一次
10. 每個消息必須投遞一次
11. 不運行重復的消息,需要業務保證冪等
12. 隊列大小,定期刪除數據
13. 定時消息
14. 事務機制
五、消費者的消費模式
1. 集群消費:一條消息只會被投遞到一個consumer group下面的一個實例
2. 廣播消費:一條消息會被投遞到一個consumer group下面的所有實例
六、消費者獲取消息的模式
1. 推送模式:能及時消費
2. 拉取模式:可以主動控制拉取時機
七、Rocket MQ和其他消息隊列比較(ActiveMQ, RabbitMQ, ZeroMQ, Kafka),為什么選擇Rocket MQ?
1. 嚴格的順序消息
2. 億級消息堆積能力
3. Pull/Push消費模式
4. 歷經多次天貓雙十一海量消息考驗
參考:
https://www.cnblogs.com/lizhangyong/p/8978855.html
https://www.cnblogs.com/buyige/p/9378102.html
https://blog.csdn.net/tototuzuoquan/article/details/78325192