消息中間件的功能:
通過學習ActiveMq,kafka,rabbitMq這些消息中間件,我們大致能為消息中間件的功能做一下以下定義:可以先從基本的需求開始思考
- 最基本的是要能支持消息的發送和接收,需要涉及到網絡通信就一定會涉及到NIO
- 消息中心的消息存儲(持久化/非持久化)
- 消息的序列化和反序列化
- 是否跨語言
- 消息的確認機制,如何避免消息重發
高級功能:
- 消息的有序性
- 是否支持事務消息
- 消息收發的性能,對高並發大數據量的支持
- 是否支持集群
- 消息的可靠性存儲
- 是否支持多協議
MQ消息存儲選擇:
從主流的幾種MQ消息隊列采用的存儲方式來看,主要會有三種
- 分布式KV存儲,比如ActiveMQ中采用的levelDB、Redis, 這種存儲方式對於消息讀寫能力要求不高的情況下可以使用
- 文件系統存儲,常見的比如kafka、RocketMQ、RabbitMQ都是采用消息刷盤到所部署的機器上的文件系統來做持久化,這種方案適合對於有高吞吐量要求的消息中間件,因為消息刷盤是一種高效率,高可靠、高性能的持久化方式,除非磁盤出現故障,否則一般是不會出現無法持久化的問題
- 關系型數據庫,比如ActiveMQ可以采用mysql作為消息存儲,關系型數據庫在單表數據量達到千萬級的情況下IO性能會出現瓶頸,所以ActiveMQ並不適合於高吞吐量的消息隊列場景。
總的來說,對於存儲效率,文件系統要優於分布式KV存儲,分布式KV存儲要優於關系型數據庫.
RocketMQ的發展歷史:
RocketMq是一個由阿里巴巴開源的消息中間件, 2012年開源,2017年成為apache頂級項目。它的核心設計借鑒了Kafka,所以我們在了解RocketMQ的時候,會發現很多和kafka相同的特性。同時呢,Rocket在某些功能上和kafka又有較大的差異,接下來我們就去了解RocketMQ
- 支持集群模型、負載均衡、水平擴展能力
- 億級別消息堆積能力
- 采用零拷貝的原理,順序寫盤,隨機讀
- 底層通信框架采用Netty NIO
- NameServer代替Zookeeper,實現服務尋址和服務協調
- 消息失敗重試機制、消息可查詢
- 強調集群無單點,可擴展,任意一點高可用,水平可擴展
- 經過多次雙十一的考驗
RocketMQ的架構:

集群本身沒有什么特殊之處,和kafka的整體架構類似,其中zookeeper替換成了NameServer。在rocketmq的早版本(2.x)的時候,是沒有namesrv組件的,用的是zookeeper做分布式協調和服務發現,但是后期阿里數據根據實際業務需求進行改進和優化,自主研發了輕量級的namesrv,用於注冊Client服務與Broker的請求路由工作,namesrv上不做任何消息的位置存儲,頻繁操作zookeeper的位置存儲數據會影響整體集群性能.
RocketMQ由四部分組成:
- Name Server 可集群部署,節點之間無任何信息同步。提供輕量級的服務發現和路由
- Broker(消息中轉角色,負責存儲消息,轉發消息) 部署相對復雜,Broker 分為Master 與Slave,一個Master 可以對應多個Slave,但是一個Slave 只能對應一個Master,Master 與Slave 的對應關系通過指定相同的BrokerName,不同的BrokerId來定 義,BrokerId為0 表示Master,非0 表示Slave。Master 也可以部署多個。
- Producer,生產者,擁有相同 Producer Group 的 Producer 組成一個集群, 與Name Server 集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server 取Topic 路由信息,並向提供Topic服務的Master 建立長連接,且定時向Master 發送心跳。Producer 完全無狀態,可集群部署。
- Consumer,消費者,接收消息進行消費的實例,擁有相同 Consumer Group 的 Consumer 組成一個集群,與Name Server 集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server 取Topic 路由信息,並向提供Topic 服務的Master、Slave 建立長連接,且定時向Master、Slave 發送心跳。Consumer既可以從Master 訂閱消息,也可以從Slave 訂閱消息,訂閱規則由Broker 配置決定。
要使用rocketmq,至少需要啟動兩個進程,nameserver、broker,前者是各種topic注冊中心,后者是真正的broker。
單機環境RocketMQ的安裝(單master):
下載 rocketmq的安裝文件: http://rocketmq.apache.org
解壓 unzip rocketmq-all-4.4.0-bin-release.zip
啟動 nameserver:
進入rocketMQ解壓目錄下的bin文件夾,啟動namesrv服務:nohup sh mqnamesrv & tail -f ~/logs/rocketmqlogs/namesrv.log 查看啟動日志
停止 nameserver : sh bin/mqshutdown namesrv . 停止服務的時候需要注意,要先停止broker,其次停止nameserver。
默認情況下,nameserver監聽的是 9876 端口。查看日志內容出現如下信息即啟動成功:

啟動 broker:
nohup sh bin/mqbroker -n ${namesrvIp}:9876 -c /conf/broker.conf & 其中[-c可以指定broker.conf配置文件]。默認情況下會加載conf/broker.conf
停止broker :sh bin/mqshutdown broker
- nohup sh mqbroker -n localhost:9876 & 啟動broker,其中-n表示指定當前broker對應的命名服務地址: 默認情況下,broker監聽的是10911端口 。
- 輸入 tail -f ~/logs/rocketmqlogs/broker.log 查看日志
- 如果 tail -f ~/logs/rocketmqlogs/broker.log 提示找不到文件,則打開當前目錄下的 nohup.out日志文件查看,出現如下日志表示啟動失敗,提示內存無法分配

內存不足的問題:
這是因為bin 目錄下啟動 nameserv 與 broker 的 runbroker.sh 和 runserver.sh 文件中默認分配的內存太大,rocketmq比較耗內存,所以默認分配的內存比較大,而系統實際內存卻太小導致啟動失敗,通常像虛擬機上安裝的 CentOS 服務器內存可能是沒有高的,只能調小。實際中應該根據服務器內存情況,配置一個合適的值 ,我這里設置成1g。
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m" Xms:是指設定程序啟動時占用內存大小。一般來講,大點,程序會啟動的快一點,但是也可能會導致機器暫時間變慢。 Xmx:是指設定程序運行期間最大可占用的內存大小。如果程序運行需要占用更多的內存,超出了這個設置值,就會拋出OutOfMemory異常。 xmn:年輕代的heap大小,一般設置為Xmx的3、4分之一
修改后重新啟動,輸入 tail -f ~/logs/rocketmqlogs/broker.log 查看日志:

在這里我們會發現,這個broker所監聽的IP地址似乎不是我localhost,這個會導致后續我們會連接不上broker,我們需要修改配置文件 broker.conf 增加一行配置 brokerIP1=192.168.1.101,然后重新啟動 nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf &,再查看日志:

broker.conf 文件 基本配置:
- namesrvAddr :nameserver地址
- brokerClusterName = DefaultCluster:Cluster名稱,如果集群機器數比較多,可以分成多個cluster,每個cluster提供給不同的業務場景使用
- brokerName = broker-a:broker名稱,如果配置主從模式,master和slave需要配置相同的名稱來表明關系
- brokerId = 0:在主從模式中,一個master broker可以有多個slave,0表示master,大於0表示不同slave的id
- deleteWhen = 04:刪除文件時間點,默認是凌晨4點
- fileReservedTime = 48:文件保留時間,默認48小時
- brokerRole = ASYNC_MASTER: SYNC_MASTER/ASYNC_MASTER/SLAVE ; 同步表示slave和master消息同步完成后再返回信息給客戶端
- flushDiskType = ASYNC_FLUSH:刷盤方式
- autoCreateTopicEnable = true : topic不存在的情況下自動創建
- brokerIP1 ip ip設置外網ip,不需要連接外網的話,可以在參數前面加#注釋掉
- listenPort port port可自由設置,一般設置10911
- brokerPermission 0x4|0x2 broker讀寫權限
- defaultTopicQueueNums 8 默認topic讀寫隊列數
- clusterTopicEnable true 是否啟用集群topic
- brokerTopicEnable true 是否啟用brokertopic
- autoCreateSubscriptionGroup TRUE 是否允許Broker 自動創建訂閱組,建議線下開啟,線上關閉
- sendMessageThreadPoolNums 1 發送消息線程池數量
- storePathConsumeQueue $HOME/store/consumequeue 消費隊列存儲路徑
- storePathIndex $HOME/store/index 消息索引存儲路徑
- storeCheckpoint $HOME/store/checkpoint checkpoint 文件存儲路徑
- abortFile $HOME/store/abort abort 文件存儲路徑
消息發送和接收基本應用:
1.添加 pom 依賴:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.2</version> </dependency>
2.生產者 producer:
public class RocketMqProducer { public static void main(String[] args) throws MQClientException, InterruptedException { /* *生產者組,簡單來說就是多個發送同一類消息的生產者稱之為一個生產者組 *rocketmq支持事務消息,在發送事務消息時,如果事務消息異常(producer掛了),broker端會來回查 *事務的狀態,這個時候會根據group名稱來查找對應的producer來執行相應的回查邏輯。相當於實現了producer的高可用 */ DefaultMQProducer producer = new DefaultMQProducer("unique_producer_group_name");
producer.setDefaultTopicQueueNums(3);//設置默認的queue數量 //指定namesrv服務地址,獲取broker相關信息 producer.setNamesrvAddr("192.168.1.101:9876"); producer.start(); for (int i = 0; i < 10; i++) { try { //創建一個消息實例,指定指定topic、tag、消息內容 Message msg = new Message("testTopic", "testTag", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */); //發送消息並且獲取發送結果 SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } }
SendResult中,有一個sendStatus狀態,表示消息的發送狀態。一共有四種狀態:
- FLUSH_DISK_TIMEOUT : 表示沒有在規定時間內完成刷盤(需要Broker 的刷盤策Ill創立設置成SYNC_FLUSH 才會報這個錯誤) 。
- FLUSH_SLAVE_TIMEOUT :表示在主備方式下,並且Broker 被設置成SYNC_MASTER 方式,沒有在設定時間內完成主從同步。
- SLAVE_NOT_AVAILABLE : 這個狀態產生的場景和FLUSH_SLAVE_TIMEOUT 類似, 表示在主備方式下,並且Broker 被設置成SYNC_MASTER ,但是沒有找到被配置成Slave 的Broker 。
- SEND_OK :表示發送成功,發送成功的具體含義,比如消息是否已經被存儲到磁盤?消息是否被同步到了Slave 上?消息在Slave 上是否被寫入磁盤?需要結合所配置的刷盤策略、主從策略來定。這個狀態還可以簡單理解為,沒有發生上面列出的三個問題狀態就是SEND OK
3.消費者consumer:
public class RocketMqConsumer { public static void main(String[] args) throws MQClientException { //消費者的組名,這個和kafka是一樣,這里需要注意的是, DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("unique_consumer_group_name"); //指定NameServer地址,多個地址以 ; 隔開 consumer.setNamesrvAddr("192.168.1.101:9876"); //設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費 //如果非第一次啟動,那么按照上次消費的位置繼續消費 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //訂閱PushTopic下Tag為push的消息 consumer.subscribe("testTopic", "*"); //*表示不過濾,可以通過tag來過濾,比如:”tagA” /* * 注冊消息監聽回調這里有兩種監聽,MessageListenerConcurrently以及MessageListenerOrderly * 前者是普通監聽,后者是順序監聽。這塊在后續單獨說明 */ consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//返回消息消費狀態 } }); consumer.start(); System.out.printf("Consumer Started.%n"); }
Rocketmq中支持廣播消息,就意味着同一個group中的消費者可以消費同一個消息。
consumerGroup:位於同一個consumerGroup中的consumer實例和producerGroup中的各個produer實例承擔的角色類似;同一個group中可以配置多個consumer,可以提高消費端的並發消費能力以及容災,和kafka一樣,多個consumer會對消息做負載均衡,意味着同一個topic下的不同messageQueue會分發給同一個group中的不同consumer。同時,如果我們希望消息能夠達到廣播的目的,那么只需要把consumer加入到不同的group就行。
RocketMQ提供了兩種消息消費模型,一種是pull主動拉去,另一種是push,被動接收。但實際上RocketMQ都是pull模式,只是push在pull模式上做了一層封裝,也就是pull到消息以后觸發業務消費者注冊到這里的callback. RocketMQ是基於長輪訓來實現消息的pull。
nameServer的地址:name server地址,用於獲取broker、topic信息。
SpringBoot整合RocketMq:
1.pom.xml:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.2</version>
</dependency>
2.application.yml :
rocketmq: # 生產者配置 producer: isOnOff: on # 發送同一類消息的設置為同一個group,保證唯一 groupName: unique_producer_group_name # 服務地址 namesrvAddr: 192.168.1.101:9876 # 消息最大長度 默認1024*4(4M) maxMessageSize: 4096 # 發送消息超時時間,默認3000 sendMsgTimeout: 3000 # 發送消息失敗重試次數,默認2 retryTimesWhenSendFailed: 2 # 消費者配置 consumer: isOnOff: on # 官方建議:確保同一組中的每個消費者訂閱相同的主題。 groupName: unique_consumer_group_name # 服務地址 namesrvAddr: 192.168.1.101:9876 # 接收該 Topic 下所有 Tag topics: testTopic~*; consumeThreadMin: 20 consumeThreadMax: 64 # 設置一次消費消息的條數,默認為1條 consumeMessageBatchMaxSize: 1 # 配置 Group Topic Tag plat: plat-group: unique_group_name plat-topic: testTopic plat-tag: testTag
3. ProducerConfig 生產者配置:
@Configuration public class ProducerConfig { private static final Logger LOG = LoggerFactory.getLogger(ProducerConfig.class) ; @Value("${rocketmq.producer.groupName}") private String groupName; @Value("${rocketmq.producer.namesrvAddr}") private String namesrvAddr; @Value("${rocketmq.producer.maxMessageSize}") private Integer maxMessageSize ; @Value("${rocketmq.producer.sendMsgTimeout}") private Integer sendMsgTimeout; @Value("${rocketmq.producer.retryTimesWhenSendFailed}") private Integer retryTimesWhenSendFailed; @Bean public DefaultMQProducer defaultMQProducer() { DefaultMQProducer producer; producer = new DefaultMQProducer(this.groupName); producer.setNamesrvAddr(this.namesrvAddr); //如果需要同一個jvm中不同的producer往不同的mq集群發送消息,需要設置不同的instanceName if(this.maxMessageSize!=null){ producer.setMaxMessageSize(this.maxMessageSize); } if(this.sendMsgTimeout!=null){ producer.setSendMsgTimeout(this.sendMsgTimeout); } //如果發送消息失敗,設置重試次數,默認為2次 if(this.retryTimesWhenSendFailed!=null){ producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed); } try { producer.start(); } catch (MQClientException e) { e.printStackTrace(); } return producer; } }
4.ConsumerConfig 消費者配置:
@Configuration public class ConsumerConfig { private static final Logger LOG = LoggerFactory.getLogger(ConsumerConfig.class) ; @Value("${rocketmq.consumer.namesrvAddr}") private String namesrvAddr; @Value("${rocketmq.consumer.groupName}") private String groupName; @Value("${rocketmq.consumer.consumeThreadMin}") private int consumeThreadMin; @Value("${rocketmq.consumer.consumeThreadMax}") private int consumeThreadMax; @Value("${rocketmq.consumer.topics}") private String topics; @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}") private int consumeMessageBatchMaxSize; @Resource private RocketMsgListener msgListener; @Bean public DefaultMQPushConsumer defaultMQPushConsumer(){ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName); consumer.setNamesrvAddr(namesrvAddr); consumer.setConsumeThreadMin(consumeThreadMin); consumer.setConsumeThreadMax(consumeThreadMax); consumer.registerMessageListener(msgListener); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize); try { String[] topicTagsArr = topics.split(";"); for (String topicTags : topicTagsArr) { String[] topicTag = topicTags.split("~"); consumer.subscribe(topicTag[0],topicTag[1]); } consumer.start(); }catch (MQClientException e){ e.printStackTrace(); } return consumer; } }
5.RocketMsgListener 監聽器:
@Component public class RocketMsgListener implements MessageListenerConcurrently { private static final Logger LOG = LoggerFactory.getLogger(RocketMsgListener.class) ; @Resource private ParamConfigService paramConfigService ; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) { if (CollectionUtils.isEmpty(list)){ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } MessageExt messageExt = list.get(0); LOG.info("接受到的消息為:"+new String(messageExt.getBody())); int reConsume = messageExt.getReconsumeTimes(); // 消息已經重試了3次,如果不需要再次消費,則返回成功 if(reConsume ==3){ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } if(messageExt.getTopic().equals(paramConfigService.platTopic)){ String tags = messageExt.getTags() ; switch (tags){ case "testTag": LOG.info("匹配到testTag"+tags); break ; default: LOG.info("未匹配到Tag == >>"+tags); break; } } // 消息消費成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
6.參數配置類:
@Service public class ParamConfigService { @Value("${plat.plat-group}") public String platGroup ; @Value("${plat.plat-topic}") public String platTopic ; @Value("${plat.plat-tag}") public String accountTag ; //省略 get set }
7.測試類:
@RestController public class TestController { @Autowired private DefaultMQProducer defaultMQProducer; @Autowired private ParamConfigService paramConfigService; @RequestMapping(value = "/testStringQueue.json", method = {RequestMethod.GET}) public SendResult testStringQueue() { // 可以不使用Config中的Group defaultMQProducer.setProducerGroup(paramConfigService.platGroup); SendResult sendResult = null; String msgInfo = "rocketmq message 1"; try { Message sendMsg = new Message(paramConfigService.platTopic, paramConfigService.accountTag, msgInfo.getBytes()); sendResult = defaultMQProducer.send(sendMsg); } catch (Exception e) { e.printStackTrace(); } return sendResult; } }
啟動項目訪問接口就可以看到效果。
