RocketMQ原生API收發消息
pom文件
新建 maven 項目或 module,添加 rocketmq-client
依賴。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.tedu</groupId>
<artifactId>demo1</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-store</artifactId>
<version>4.7.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
同步消息
同步消息發送要保證強一致性,發到master的消息向slave復制后,才會向生產者發送反饋信息。
這種可靠性同步地發送方式使用的比較廣泛,比如:重要的消息通知,短信通知。
生產者
package demo1;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.Scanner;
/*
發送同步消息
*/
public class Producer {
public static void main(String[] args) throws Exception {
/*
group 相同的生產者成為一個生產者組
標識發送同一類消息的Producer,通常發送邏輯一致。
發送普通消息的時候,僅標識使用,並無特別用處。
若發送事務消息,發送某條消息的producer-A宕機,
使得事務消息一直處於PREPARED狀態並超時,
則broker會回查同一個group的其他producer,
確認這條消息應該commit還是rollback。
但開源版本並不完全支持事務消息(閹割了事務回查的代碼)。?????
*/
DefaultMQProducer p = new DefaultMQProducer("producer-demo1");
/*
連接nameserver集群, 獲得注冊的broker信息
*/
p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
p.start();
/*
主題相當於是消息的分類, 一類消息使用一個主題
*/
String topic = "Topic1";
/*
tag 相當於是消息的二級分類, 在一個主題下, 可以通過 tag 再對消息進行分類
*/
String tag = "TagA";
while (true) {
System.out.print("輸入消息,用逗號分隔多條消息: ");
String[] a = new Scanner(System.in).nextLine().split(",");
for (String s : a) {
Message msg = new Message(topic, tag, s.getBytes()); //一級分類, 二級分類, 消息內容
SendResult r = p.send(msg);// 發送消息后會得到服務器反饋, 包含: smsgId, sendStatus, queue, queueOffset, offsetMsgId
System.out.println(r);
}
}
}
}
消費者
消費者的要點:
1. push 和 pull
消費者有兩種模式:push 和 pull。
push 模式由服務器主動向消費者發送消息;pull 模式由消費者主動向服務器請求消息。
在消費者處理能力有限時,為了減輕消費者的壓力,可以采用pull模式。多數情況下都采用 pull 模式。
2. NameServer
消費者需要向 NameServer 詢問 Topic 的路由信息。
3. Topic
從指定的Topic接收消息。Topic相當於是一級分類。
4. Tag
Topic 相當於是一級分類,Tag 相當於是2級分類。
- 多個 Tag 可以這樣寫:
TagA || TagB || TagC
- 不指定 Tag,或者說接收所有的 Tag,可以寫星號:
*
package demo1;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
/*
標識一類Consumer的集合名稱,
這類Consumer通常消費一類消息,且消費邏輯一致。
同一個Consumer Group下的各個實例將共同消費
topic的消息,起到負載均衡的作用。
消費進度以Consumer Group為粒度管理,不同
Consumer Group之間消費進度彼此不受影響,
即消息A被Consumer Group1消費過,也會再
給Consumer Group2消費。
注: RocketMQ要求同一個Consumer Group的
消費者必須要擁有相同的注冊信息,即必須要聽一樣
的topic(並且tag也一樣)。
*/
DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo1");
c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
c.subscribe("Topic1", "TagA");
c.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext ctx) {
for (MessageExt msg : list) {
System.out.println(new String(msg.getBody()) + " - " + msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
c.start();
System.out.println("開始消費數據");
}
}
異步消息
master 收到消息后立即向生產者進行反饋。之后再以異步方式向 slave 復制消息。
異步消息通常用在對響應時間敏感的業務場景,即發送端不能容忍長時間地等待Broker的響應。
生產者
package demo2;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.Scanner;
/*
異步發送消息
一條消息送出后, 不必暫停等待服務器針對這條消息的反饋, 而是可以立即發送后續消息.
使用監聽器, 以異步的方式接收服務器的反饋
*/
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
DefaultMQProducer p = new DefaultMQProducer("producer-demo2");
p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
p.start();
p.setRetryTimesWhenSendAsyncFailed(0);
String topic = "Topic2";
String tag = "TagA";
String key = "Key-demo2";
while (true) {
System.out.print("輸入消息,用逗號分隔多條消息: ");
String[] a = new Scanner(System.in).nextLine().split(",");
for (String s : a) {
Message msg = new Message(topic, tag, key, s.getBytes());
p.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("\n\n消息發送成功 : "+sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println("\n\n消息發送失敗");
}
});
System.out.println("--------------------消息已送出-----------------------");
}
}
}
}
消費者
package demo2;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/*
與 demo1.Consumer 完全相同
*/
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo2");
c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
c.subscribe("Topic2", "TagA");
c.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list) {
System.out.println(new String(msg.getBody()) + " - " + msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
c.start();
System.out.println("開始消費數據");
}
}
單向消息
這種方式主要用在不特別關心發送結果的場景,例如日志發送。
生產者
package demo3;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.Scanner;
/*
單向消息
消息發出后, 服務器不會返回結果
*/
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
DefaultMQProducer p = new DefaultMQProducer("producer-demo3");
p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
p.start();
String topic = "Topic3";
String tag = "TagA";
while (true) {
System.out.print("輸入消息,用逗號分隔多條消息: ");
String[] a = new Scanner(System.in).nextLine().split(",");
for (String s : a) {
Message msg = new Message(topic, tag, s.getBytes());
p.sendOneway(msg);
}
System.out.println("--------------------消息已送出-----------------------");
}
}
}
消費者
package demo3;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/*
與 demo1.Consumer 完全相同
*/
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo2");
c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
c.subscribe("Topic3", "TagA");
c.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list) {
System.out.println(new String(msg.getBody()) + " - " + msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
c.start();
System.out.println("開始消費數據");
}
}
順序消息
上圖演示了 Rocketmq 順序消息的基本原理:
- 同一組有序的消息序列,會被發送到同一個隊列,按照 FIFO 的方式進行處理
- 一個隊列只允許一個消費者線程接收消息,這樣就保證消息按順序被接收
下面以訂單為例:
一個訂單的順序流程是:創建、付款、推送、完成。訂單號相同的消息會被先后發送到同一個隊列中。消費時,從同一個隊列接收同一個訂單的消息。
生產者
package demo4;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.List;
import java.util.Scanner;
/*
以下消息, 相同id的消息按順序發送到同一個隊列,
消費時也從同一個隊列按順序消費
topic
======================= queue1
======================= queue2
111,消息1 111,消息2 111,消息3 ------->======================= queue3
======================= queue4
222,消息1 222,消息2 222,消息3 ------->======================= queue5
======================= queue6
333,消息1 333,消息2 333,消息3 ------->======================= queue7
======================= queue8
......
*/
public class Producer {
static String[] msgs = {
"15103111039,創建",
"15103111065,創建",
"15103111039,付款",
"15103117235,創建",
"15103111065,付款",
"15103117235,付款",
"15103111065,完成",
"15103111039,推送",
"15103117235,完成",
"15103111039,完成"
};
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer p = new DefaultMQProducer("producer-demo4");
p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
p.start();
String topic = "Topic4";
String tag = "TagA";
for (String s : msgs) {
System.out.println("按回車發送此消息: "+s);
new Scanner(System.in).nextLine();
Message msg = new Message(topic, tag, s.getBytes());
String[] a = s.split(",");
long orderId = Long.parseLong(a[0]);
/*
MessageQueueSelector用來選擇發送的隊列,
這里用訂單的id對隊列數量取余來計算隊列索引
send(msg, queueSelector, obj)
第三個參數會傳遞到queueSelector, 作為它的第三個參數
*/
SendResult r = p.send(msg, new MessageQueueSelector() {
/*
三個參數的含義:
queueList: 當前Topic中所有隊列的列表
message: 消息
o: send()方法傳入的orderId
*/
@Override
public MessageQueue select(List<MessageQueue> queueList, Message message, Object o) {
Long orderId = (Long) o;
//訂單id對隊列數量取余, 相同訂單id得到相同的隊列索引
long index = orderId % queueList.size();
System.out.println("消息已發送到: "+queueList.get((int) index));
return queueList.get((int) index);
}
}, orderId);
System.out.println(r+"\n\n");
}
}
}
消費者
package demo4;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo4");
c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
c.subscribe("Topic4", "*");
c.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
String t = Thread.currentThread().getName();
for (MessageExt msg : list) {
System.out.println(t+" - "+ msg.getQueueId() + " - " +new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
c.start();
System.out.println("開始消費數據");
}
}
延時消息
消息發送到 Rocketmq 服務器后, 延遲一定時間再向消費者進行投遞。
延時消息的使用場景:
比如電商里,提交了一個訂單就可以發送一個延時消息,1h后去檢查這個訂單的狀態,如果還是未付款就取消訂單釋放庫存。
生產者發送消息時,對消息進行延時設置:
msg.setDelayTimeLevel(3);
其中 3
代表級別而不是一個具體的時間值,級別和延時時長對應關系是在 MessageStoreConfig
類種進行定義的:
this.messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
對應關系表:
級別 | 延時時長 |
---|---|
1 | 1s |
2 | 5s |
3 | 10s |
4 | 30s |
5 | 1m |
6 | 2m |
7 | 3m |
8 | 4m |
9 | 5m |
10 | 6m |
11 | 7m |
12 | 8m |
13 | 9m |
14 | 10m |
15 | 20m |
16 | 30m |
17 | 1h |
18 | 2h |
生產者
package demo5;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.Scanner;
/*
延時消息
延時消息的使用場景
比如電商里,提交了一個訂單就可以發送一個延時消息,1h后去檢查這個訂單的狀態,如果還是未付款就取消訂單釋放庫存。
*/
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer p = new DefaultMQProducer("producer-demo5");
p.setNamesrvAddr("192.168.64.151:9876:192.168.64.152:9876");
p.start();
while (true) {
System.out.print("輸入消息,用逗號分隔多條消息: ");
String[] a = new Scanner(System.in).nextLine().split(",");
for (String s : a) {
Message msg = new Message("Topic5", s.getBytes());
/*
設置消息的延遲時間,這里不支持任意的時間,只支持18個固定的延遲時長,
分別用Leven 1到18 來表示:
org/apache/rocketmq/store/config/MessageStoreConfig.java
this.messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
*/
msg.setDelayTimeLevel(3);
p.send(msg);
}
}
}
}
消費者
package demo5;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo5");
c.setNamesrvAddr("192.168.64.151:9876:192.168.64.152:9876");
c.subscribe("Topic5", "*");
c.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext ctx) {
System.out.println("------------------------------");
for (MessageExt msg : list) {
long t = System.currentTimeMillis() - msg.getBornTimestamp();
System.out.println(new String(msg.getBody()) + " - 延遲: "+t);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
c.start();
System.out.println("開始消費數據");
}
}
批量消息
批量發送消息能顯著提高傳遞小消息的性能。限制是這些批量消息應該有相同的topic,相同的waitStoreMsgOK,而且不能是延時消息。此外,這一批消息的總大小不應超過4MB。
生產者
package demo6;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.ArrayList;
import java.util.Scanner;
/*
批量發送消息能顯著提高傳遞小消息的性能。限制是:
- 這些批量消息應該有相同的topic,
- 相同的waitStoreMsgOK,
- 而且不能是延時消息。
- 這一批消息的總大小不應超過4MB。
如果超出4M需要進行數據分割, 請參考官方代碼樣例https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md
*/
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer p = new DefaultMQProducer("producer-demo6");
p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
p.start();
String topic = "Topic6";
while (true) {
System.out.print("輸入消息,用逗號分隔多條消息: ");
String[] a = new Scanner(System.in).nextLine().split(",");
ArrayList<Message> messages = new ArrayList<>();
for (String s : a) {
messages.add(new Message(topic, s.getBytes()));
}
p.send(messages);
System.out.println("批量消息已發送");
}
}
}
消費者
package demo6;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo6");
c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
c.subscribe("Topic6", "*");
c.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg :
list) {
System.out.println("收到: "+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
c.start();
System.out.println("開始消費數據");
}
}
消息過濾
Tag 過濾
Tag 可以滿足大多數消息過濾的需求。使用 Tag 過濾非常簡單,例如:
consumer.subscribe("Topic1", "TagA || TagB || TagC");
對自定義屬性過濾
生產者可以在消息中添加自定義的屬性:
msg.putUserProperty("prop1", "1");
msg.putUserProperty("prop2", "2");
消費者接收數據時,可以根據屬性來過濾消息:
consumer.subscribe("Topic7", MessageSelector.bySql("prop1=1 or prop2=2"));
1
可以看到,自定義屬性的過濾語法是 Sql 語法,RocketMQ只定義了一些基本語法來支持這個特性,支持的 Sql 過濾語法如下:
- 數值比較,比如:>,>=,<,<=,BETWEEN,=;
- 字符比較,比如:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
- 邏輯符號 AND,OR,NOT;
生產者
package demo7;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.Random;
import java.util.Scanner;
/*
發送的消息中包含 tag 和 userProperty
消費者接收時,可以選擇用 tag 或 userProperty 進行過濾
*/
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer p = new DefaultMQProducer("producer-demo7");
p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
p.start();
String topic = "Topic7";
while (true) {
System.out.print("輸入消息,用逗號分隔多條消息: ");
String[] a = new Scanner(System.in).nextLine().split(",");
System.out.print("輸入Tag: ");
String tag = new Scanner(System.in).nextLine();
for (String s : a) {
Message msg = new Message(topic, tag, s.getBytes());
msg.putUserProperty("rnd", ""+new Random().nextInt(4));
p.send(msg);
}
}
}
}
消費者
package demo7;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.Scanner;
/*
如果使用sql過濾,需要在 broker.properties 中添加配置來啟用 sql 過濾:
enablePropertyFilter=true
*/
public class Consumer {
public static void main(String[] args) throws MQClientException {
System.out.print("使用Tag過濾還是使用Sql過濾(tag/sql): ");
String ts = new Scanner(System.in).nextLine();
DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo7");
c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
if (ts.equalsIgnoreCase("tag")) {
System.out.println("使用Tag過濾: TagA || TagB || TagC");
c.subscribe("Topic7", "TagA || TagB || TagC");
} else {
System.out.println("使用Sql過濾: rnd=1 or rnd > 2");
c.subscribe("Topic7", MessageSelector.bySql("rnd=1 or rnd > 2"));
}
c.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list) {
System.out.println(new String(msg.getBody()) + " - " + msg.getUserProperty("rnd"));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
c.start();
System.out.println("開始消費數據");
}
}
事務消息
RocketMQ 提供了可靠性消息,也叫事務消息。下面分析一下其原理。
事務消息的原理
下面來看 RocketMQ 的事務消息是如何來發送“可靠消息”的,只需要以下三步:
- 發送半消息(半消息不會發送給消費者)
- 執行本地事務
- 提交消息
完成事務消息發送后,消費者就可以以正常的方式來消費數據。
RocketMQ 的自動重發機制在絕大多數情況下,都可以保證消息被正確消費。
假如消息最終消費失敗了,還可以由人工處理進行托底。
上面分析的是正常情況下的執行流程。下面再來看兩種錯誤情況:
- 事務執行失敗時回滾消息
- 服務器無法得知消息狀態時,需要主動回查消息狀態
回滾:
消息回查:
生產者
package demo8;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.Scanner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
public class Producer {
public static void main(String[] args) throws MQClientException {
TransactionMQProducer p = new TransactionMQProducer("producer-demo8");
p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
p.setExecutorService(Executors.newFixedThreadPool(5));
p.setTransactionListener(new TransactionListener() {
ConcurrentHashMap<String, LocalTransactionState> localTx = new ConcurrentHashMap<>();
/*
在這里執行本地事務
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("執行本地事務");
if (Math.random()<0.333) {
System.out.println("本地事務執行成功, 按回車提交事務消息");
new Scanner(System.in).nextLine();
localTx.put(message.getTransactionId(), LocalTransactionState.COMMIT_MESSAGE);
return LocalTransactionState.COMMIT_MESSAGE;
} else if (Math.random()<0.666) {
System.out.println("本地事務執行失敗, 按回車回滾事務消息");
new Scanner(System.in).nextLine();
localTx.put(message.getTransactionId(), LocalTransactionState.ROLLBACK_MESSAGE);
return LocalTransactionState.ROLLBACK_MESSAGE;
} else {
System.out.println("本地事務執行情況未知, 按回車繼續");
new Scanner(System.in).nextLine();
localTx.put(message.getTransactionId(), LocalTransactionState.UNKNOW);
return LocalTransactionState.UNKNOW;
}
}
/*
回查方法
檢測頻率默認1分鍾,可通過在broker.conf文件中設置transactionCheckInterval的值來改變默認值,單位為毫秒。
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("服務器正在回查消息狀態");
LocalTransactionState s = localTx.get(messageExt.getTransactionId());
if (s == null || s == LocalTransactionState.UNKNOW) {
s = LocalTransactionState.ROLLBACK_MESSAGE;
}
return s;
}
});
p.start();
String topic = "Topic8";
while (true) {
System.out.print("輸入消息,用逗號分隔多條消息: ");
String[] a = new Scanner(System.in).nextLine().split(",");
for (String s : a) {
Message msg = new Message(topic, s.getBytes());
System.out.println("---------發送半消息-----------");
TransactionSendResult r = p.sendMessageInTransaction(msg, null);
System.out.println("事務消息發送結果: "+ r.getLocalTransactionState().name());
}
}
}
}
消費者
package demo8;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/*
如果返回 RECONSUME_LATER, 服務器會等待一會再重試發送消息
消息屬性默認設置 DELAY=6, 等待時間為 2 分鍾,
org/apache/rocketmq/store/config/MessageStoreConfig.java
this.messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
*/
public class Consumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo8");
c.setNamesrvAddr("192.168.64.151:9876:192.168.64.152:9876");
c.subscribe("Topic8", "*");
c.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext ctx) {
for (MessageExt msg : list) {
System.out.println(new String(msg.getBody()) + " - " + msg);
}
if (Math.random()<0.5) {
System.out.println("消息處理完成");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} else {
System.out.println("消息處理失敗, 要求服務器稍后重試發送消息");
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
c.start();
System.out.println("開始消費數據");
}
}