一、RocketMQ 特點
RocketMQ 是阿里巴巴在2012年開源的分布式消息中間件,有點什么的就不多說了,就直接將怎么用。

RocketMq大概就是真么一種結構,具體的使用流程就是,消息生產者(Producer)將消息發布到消息中心,消費者(Consumer)啟動監聽,當監聽到消息時去消息中心拿消息。而NameServer的作用我的理解就類似一個注冊中心,將broker的信息提供給Produce和Consumer。
RocketMq天然支持集群,一個Producer集群擁有同一個groupId,同理,同一個Consumer集群也是有相同的GroupId,當消息的消費模式為集群模式時,同一個集群的消費者智能消費一次消息,即如果Consumer1消費了消息,這個集群中的其他消費者就不能再消費此消息,除非將消費模式設置為廣播模式。
廢話不多說,本地搭建一套RocketMq,實踐出真知。
二、本地部署RocketMq
1、RocketMq安裝包下載地址:http://rocketmq.apache.org/dowloading/releases/
下載后直接找到 mqnamesrv.cmd 和 mqbroker.cmd,推薦使用everything,非常好用的全局搜索軟件。直接百度搜就行,很小。不愛下載的可以從以下路徑找到:rocketMq->distribution->target->rocketMq->bin
2、啟動NameServer
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
啟動日志會打印在控制台,如下圖。

3、啟動broker
nohup sh bin/mqbroker -n localhost:9876 & tail -f ~/logs/rocketmqlogs/broker.log
同樣,日志如下圖

4、啟動Consumer
寫代碼之前首先引入依賴:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.2.0</version>
</dependency>

如上圖所示,這個依賴中已經包括slf4j和netty,所以說如果只為了練習,這一個依賴就夠了。
接着創建一個類,啟動main方法。
public class Consumer {
public static void main(String[] args) throws Exception {
//創建一個消息消費者,並設置一個消息消費者組
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("niwei_consumer_group");
//指定 NameServer 地址
consumer.setNamesrvAddr("localhost:9876");
//設置 Consumer 第一次啟動時從隊列頭部開始消費還是隊列尾部開始消費
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//訂閱指定 Topic 下的所有消息
consumer.subscribe("topic_example_java", "*");
//注冊消息監聽器
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
//默認 list 里只有一條消息,可以通過設置參數來批量接收消息
if (list != null) {
for (MessageExt ext : list) {
try {
System.out.println(new Date() + new String(ext.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 消費者對象在使用之前必須要調用 start 初始化
consumer.start();
System.out.println("消息消費者已啟動");
}
}
5、啟動Producer
代碼如下,記得引入包,然后啟動main方法。
public class Producer {
public static void main(String[] args) throws Exception {
//創建一個消息生產者,並設置一個消息生產者組
DefaultMQProducer producer = new DefaultMQProducer("niwei_producer_group");
//指定 NameServer 地址
producer.setNamesrvAddr("localhost:9876");
//初始化 Producer,整個應用生命周期內只需要初始化一次
producer.start();
for (int i = 0; i < 100; i++) {
//創建一條消息對象,指定其主題、標簽和消息內容
Message msg = new Message(
"topic_example_java" /* 消息主題名 */,
"TagA" /* 消息標簽 */,
("Hello Java demo RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息內容 */
);
//發送消息並返回結果
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
// 一旦生產者實例不再被使用則將其關閉,包括清理資源,關閉網絡連接等
producer.shutdown();
}
}
到此,部署並啟動完成,我們看到控制台消費者已經收到了消息。使用起來真的是非常的簡單,當然簡單的練習還是很容易,不過真正在項目里用起來還是會有很多坑,包括重復消費、消息丟失等等很多,還是要多用才能長經驗啊。

