一,引入依賴
1
2
3
4
5
6
7
8
9
10
11
|
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>
4.6
.
0
</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
|
二, 使用demo:
1,simple(簡單消息生產)
/**
* 以單向模式發送消息
* 這種方式主要用在不特別關心發送結果的場景,例如日志發送
*/
public
static
void
OnewayProducer()
throws
Exception {
// 實例化消息生產者Producer
DefaultMQProducer producer =
new
DefaultMQProducer(
"simple_producerGroup_test2"
);
// 設置NameServer的地址
producer.setNamesrvAddr(
"10.131.236.179:19300;10.131.236.180:19301"
);
// 啟動Producer實例
producer.start();
for
(
int
i =
0
; i <
10
; i++) {
// 創建消息,並指定Topic,Tag和消息體
Message msg =
new
Message(
"simple_topic_test2"
/* Topic */
,
"TagA"
/* Tag */
,
(
"Hello RocketMQ "
+ i).getBytes(RemotingHelper.DEFAULT_CHARSET)
/* Message body */
);
// 發送單向消息,沒有任何返回結果
producer.sendOneway(msg);
}
// 如果不再發送消息,關閉Producer實例。
producer.shutdown();
}
/**
* 同步發送消息
* 在重要的通知消息,SMS通知,SMS營銷系統等廣泛的場景中使用可靠的同步傳輸。
*/
public
static
void
SyncProducer()
throws
Exception {
// 實例化消息生產者Producer
DefaultMQProducer producer =
new
DefaultMQProducer(
"simple_producerGroup_test2"
);
// 設置NameServer的地址
producer.setNamesrvAddr(
"10.131.236.179:19300;10.131.236.180:19301"
);
// 啟動Producer實例
producer.start();
for
(
int
i =
0
; i <
100
; i++) {
// 創建消息,並指定Topic,Tag和消息體
Message msg =
new
Message(
"simple_topic_test2"
/* Topic */
,
"TagA"
/* Tag */
,
(
"Hello RocketMQ "
+ i).getBytes(RemotingHelper.DEFAULT_CHARSET)
/* Message body */
);
// 發送消息到一個Broker
SendResult sendResult = producer.send(msg);
// 通過sendResult返回消息是否成功送達
System.out.printf(
"%s%n"
, sendResult);
}
// 如果不再發送消息,關閉Producer實例。
producer.shutdown();
}
/**
* 異步發送消息
* 異步傳輸通常用於對時間敏感的業務場景中。
*/
public
static
void
AsyncProducer ()
throws
Exception {
// 實例化消息生產者Producer
DefaultMQProducer producer =
new
DefaultMQProducer(
"simple_producerGroup_test3"
);
// 設置NameServer的地址
producer.setNamesrvAddr(
"10.131.236.179:19300;10.131.236.180:19301"
);
// 啟動Producer實例
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(
0
);
int
messageCount =
100
;
// 根據消息數量實例化倒計時計算器
final
CountDownLatch2 countDownLatch =
new
CountDownLatch2(messageCount);
for
(
int
i =
0
; i < messageCount; i++) {
final
int
index = i;
// 創建消息,並指定Topic,Tag和消息體
Message msg =
new
Message(
"simple_topic_test3"
,
"TagA"
,
"OrderID188"
,
"Hello world"
.getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接收異步返回結果的回調
producer.send(msg,
new
SendCallback() {
@Override
public
void
onSuccess(SendResult sendResult) {
System.out.printf(
"%-10d OK %s %n"
, index,
sendResult.getMsgId());
}
@Override
public
void
onException(Throwable e) {
System.out.printf(
"%-10d Exception %s %n"
, index, e);
e.printStackTrace();
}
});
}
// 等待5s
countDownLatch.await(
5
, TimeUnit.SECONDS);
// 如果不再發送消息,關閉Producer實例。
producer.shutdown();
}
|
2,filter(過濾消息)
過濾消息有兩種方式:Tag 和 SQL 表達式
SQL 基本語法:
RocketMQ 只定義了一些基本語法來支持這個特性,支持擴展。
- 數值比較,比如 >、>=、<、<=、BETWEEN、=;
- 字符比較,比如 =、<>、IN;
- IS NULL 或者 IS NOT NULL;
- 邏輯符號 AND、OR、NOT。
常量支持的額類型為:
- 數值,比如 123、3.1415;
- 字符,比如 ‘abc’,必須用單引號包裹起來;
- NULL,特殊的常量;
- 布爾值,TRUE 或 FALSE。
只有使用 push 模式的消費者才能使用 SQL92 標准的 SQL 語句
注:消費者sql的使用需要rocketmq開啟broker配置項的enablePropertyFilter=true,
默認沒開啟,如需開啟使用前提前找對應開通人處理
生產者代碼:
// 實例化消息生產者Producer
DefaultMQProducer producer =
new
DefaultMQProducer(
"filter_producerGroup_test1"
);
// 設置NameServer的地址
producer.setNamesrvAddr(
"10.131.236.179:19300;10.131.236.180:19301"
);
// 啟動Producer實例
producer.start();
for
(
int
i=
0
;i<
10
;i++) {
Message msg =
new
Message(
"filter_topic_test1"
,
"tagA"
,
(
"Hello RocketMQ "
+ i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 設置一些屬性
msg.putUserProperty(
"a"
, String.valueOf(i));
SendResult sendResult = producer.send(msg);
// System.out.println(sendResult.getMessageQueue());
System.out.println(String.format(
"SendResult status:%s,brokerName:%s, queueId:%d, queueOffset:%s"
,
sendResult.getSendStatus(),
sendResult.getMessageQueue().getBrokerName(),sendResult.getMessageQueue().getQueueId(),
sendResult.getQueueOffset()));
}
// 如果不再發送消息,關閉Producer實例。
producer.shutdown();
|
消費者代碼:
// 實例化消費者
DefaultMQPushConsumer consumer =
new
DefaultMQPushConsumer(
"filter_consumerGroup_test1"
);
// 設置NameServer的地址
consumer.setNamesrvAddr(
"10.131.236.179:19300;10.131.236.180:19301"
);
// 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息
// 需要配置broker的enablePropertyFilter=true
consumer.subscribe(
"filter_topic_test1"
, MessageSelector.bySql(
"a between 0 and 3"
));
// 注冊回調實現類來處理從broker拉取回來的消息
consumer.registerMessageListener(
new
MessageListenerConcurrently() {
@Override
public
ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf(
"%s Receive New Messages: %s %n"
, Thread.currentThread().getName(), msgs);
// 標記該消息已經被成功消費
return
ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 啟動消費者實例
consumer.start();
System.out.printf(
"Consumer Started.%n"
);
|
3,batch(批量發送消息)
批量發送消息能顯著提高傳遞小消息的性能,限制是這些批量消息應該有相同的 Topic,相同的 waitStoreMsgOK,而且不能是延時消息,此外,這一批消息的總大小不應超過 4MB。
// 實例化消息生產者Producer
DefaultMQProducer producer =
new
DefaultMQProducer(
"batch_producerGroup_test2"
);
// 設置NameServer的地址
producer.setNamesrvAddr(
"10.131.236.179:19300;10.131.236.180:19301"
);
// 啟動Producer實例
producer.start();
String topic =
"batch_topic_test2"
;
//批量處理1--每次批處理不超過4M
List<Message> messages =
new
ArrayList<>();
messages.add(
new
Message(topic,
"TagA"
,
"OrderID001"
,
"Hello world 0"
.getBytes()));
messages.add(
new
Message(topic,
"TagA"
,
"OrderID002"
,
"Hello world 1"
.getBytes()));
messages.add(
new
Message(topic,
"TagA"
,
"OrderID003"
,
"Hello world 2"
.getBytes()));
//批量處理2
//把大的消息分裂成若干個小的消息
List<Message> messages =
new
ArrayList<>();
for
(
int
i=
0
;i<
1000
;i++){
//如果每條消息是1kb字節數據 ,限制4M相當於400條數據處理一次,1000條需要處理三次
String msg=
"--10kb字節的數據--"
;
messages.add(
new
Message(topic,
"TagA"
, msg, msg.getBytes()));
}
ListSplitter splitter =
new
ListSplitter(messages);
int
j=
0
;
while
(splitter.hasNext()) {
try
{
List<Message> listItem = splitter.next();
producer.send(listItem);
System.out.println(
"---------------------------"
+
"批處理第"
+j+
"次"
+
"---------------------------"
);
j++;
}
catch
(Exception e) {
e.printStackTrace();
//處理error
}
}
try
{
producer.send(messages);
}
catch
(Exception e) {
e.printStackTrace();
//處理error
}
producer.shutdown();
|
消息分割工具類:
public
class
ListSplitter
implements
Iterator<List<Message>> {
private
final
int
SIZE_LIMIT =
4
*
1024
*
1024
;
//4M
private
final
List<Message> messages;
private
int
currIndex;
public
ListSplitter(List<Message> messages) {
this
.messages = messages;
}
@Override
public
boolean
hasNext() {
return
currIndex < messages.size();
}
@Override
public
List<Message> next() {
int
startIndex = getStartIndex();
int
nextIndex = startIndex;
int
totalSize =
0
;
for
(; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int
tmpSize = calcMessageSize(message);
if
(tmpSize + totalSize > SIZE_LIMIT) {
break
;
}
else
{
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(startIndex, nextIndex);
currIndex = nextIndex;
return
subList;
}
@Override
public
void
remove() {
}
private
int
getStartIndex() {
Message currMessage = messages.get(currIndex);
int
tmpSize = calcMessageSize(currMessage);
if
(tmpSize > SIZE_LIMIT){
//發送的數據過大,SIZE_LIMIT需要設置合理,不能小於發送的消息里的單條數據
return
currIndex;
}
while
(tmpSize > SIZE_LIMIT) {
currIndex +=
1
;
Message message = messages.get(currIndex);
tmpSize = calcMessageSize(message);
}
return
currIndex;
}
private
int
calcMessageSize(Message message) {
int
tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for
(Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize +
20
;
// 增加⽇日志的開銷20字節
return
tmpSize;
}
}
|
4,order(有序消息)
訂單號相同的消息會被先后發送到同一個broker的同一個隊列中,保證順序性
生產代碼:
// 訂單流程
String[] msgs =
new
String[] {
"創建消息"
,
"付款消息"
,
"推送消息"
,
"完成消息"
};
for
(
int
i =
0
; i < msgs.length; i++) {
// 創建消息, 指定Topic、Tag、key和消息體
Message msg =
new
Message(
"OrderTopic"
,
"Order"
,
"i"
+ i, msgs[i].getBytes(
"UTF-8"
));
// 發送消息到一個Broker, 參數二: 消息隊列的選擇器, 參數三: 選擇的業務標識
SendResult sendResult = producer.send(msg,
new
MessageQueueSelector() {
@Override
public
MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
long
orderId = (
long
) arg;
long
index = orderId % mqs.size();
// 取模
return
mqs.get((
int
) index);
}
}, orderId);
// 動態傳入訂單id
}
|
消費代碼:
// 訂閱Topic、Tag
consumer.subscribe(
"OrderTopic"
,
"*"
);
// 設置回調函數, 處理消息
consumer.registerMessageListener(
new
MessageListenerOrderly() {
@Override
public
ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for
(MessageExt msg : msgs) {
// 消息
String message =
new
String(msg.getBody());
}
return
ConsumeOrderlyStatus.SUCCESS;
}
});
|
5,scheduled(延時消息)
應用場景:
比如電商場景,提交了一個訂單就可以發送一個延時消息,1h 后去檢查這個訂單的狀態,如果還是未付款就取消訂單釋放庫存。
默認時延配置:
messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";對應1~18,18個等級,1是1s,2是5s...,可通過配置修改時延的時長,可在申請開通時找對應開通人開通
生產者代碼:
// 實例化一個生產者來產生延時消息
DefaultMQProducer producer =
new
DefaultMQProducer(
"scheduled_producerGroup_test1"
);
producer.setNamesrvAddr(
"10.131.236.179:19300;10.131.236.180:19301"
);
// 啟動生產者
producer.start();
int
totalMessagesToSend =
100
;
for
(
int
i =
0
; i < totalMessagesToSend; i++) {
Message message =
new
Message(
"scheduled_topic_test1"
, (
"Hello scheduled message "
+ i).getBytes());
// 設置延時等級3,這個消息將在10s之后發送(現在只支持固定的幾個時間,詳看delayTimeLevel)
message.setDelayTimeLevel(
3
);
// 發送消息
producer.send(message);
}
// 關閉生產者
producer.shutdown();
|
6,transaction(事務消息)
- 事務消息不支持延時消息和批量消息。
- 為了避免單個消息被檢查太多次而導致半隊列消息累積,我們默認將單個消息的檢查次數限制為 15 次,但是用戶可以通過 Broker 配置文件的
transactionCheckMax
參數來修改此限制。如果已經檢查某條消息超過 N 次的話( N =transactionCheckMax
) 則 Broker 將丟棄此消息,並在默認情況下同時打印錯誤日志。用戶可以通過重寫AbstractTransactionalMessageCheckListener
類來修改這個行為。 - 事務消息將在 Broker 配置文件中的參數 transactionTimeout 這樣的特定時間長度之后被檢查。當發送事務消息時,用戶還可以通過設置用戶屬性 CHECK_IMMUNITY_TIME_IN_SECONDS 來改變這個限制,該參數優先於
transactionTimeout
參數。 - 事務性消息可能不止一次被檢查或消費。
- 提交給用戶的目標主題消息可能會失敗,目前這依日志的記錄而定。它的高可用性通過 RocketMQ 本身的高可用性機制來保證,如果希望確保事務消息不丟失、並且事務完整性得到保證,建議使用同步的雙重寫入機制。
- 事務消息的生產者 ID 不能與其他類型消息的生產者 ID 共享。與其他類型的消息不同,事務消息允許反向查詢、MQ服務器能通過它們的生產者 ID 查詢到消費者。
生產者代碼:
// 使用 TransactionMQProducer類創建生產者,並指定唯一的 ProducerGroup,設置自定義線程池來處理這些檢查請求。
// 執行本地事務后、需要根據執行結果對消息隊列進行回復。
// 回傳的事務狀態:
// TransactionStatus.CommitTransaction: 提交事務,它允許消費者消費此消息。
// TransactionStatus.RollbackTransaction: 回滾事務,它代表該消息將被刪除,不允許被消費。
// TransactionStatus.Unknown: 中間狀態,它代表需要檢查消息隊列來確定狀態。
TransactionListener transactionListener =
new
TransactionListenerImpl();
TransactionMQProducer producer =
new
TransactionMQProducer(
"transaction_producerGroup_test1"
);
ExecutorService executorService =
new
ThreadPoolExecutor(
2
,
5
,
100
, TimeUnit.SECONDS,
new
ArrayBlockingQueue<Runnable>(
2000
),
new
ThreadFactory() {
@Override
public
Thread newThread(Runnable r) {
Thread thread =
new
Thread(r);
thread.setName(
"client-transaction-msg-check-thread"
);
return
thread;
}
});
producer.setExecutorService(executorService);
//
producer.setTransactionListener(transactionListener);
producer.setNamesrvAddr(
"10.131.236.179:19300;10.131.236.180:19301"
);
producer.start();
String[] tags =
new
String[] {
"TagA"
,
"TagB"
,
"TagC"
,
"TagD"
,
"TagE"
};
for
(
int
i =
0
; i <
10
; i++) {
try
{
Message msg =
new
Message(
"transaction_topic_test1"
, tags[i % tags.length],
"KEY"
+ i,
(
"Hello RocketMQ "
+ i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg,
null
);
System.out.printf(
"%s%n"
, sendResult);
Thread.sleep(
10
);
}
catch
(MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for
(
int
i =
0
; i <
1000
; i++) {
Thread.sleep(
1000
);
}
producer.shutdown();
|
事務消息監聽處理邏輯 :
public
class
TransactionListenerImpl
implements
TransactionListener {
// 當發送半消息成功時,我們使用 executeLocalTransaction 方法來執行本地事務。
// 返回三個事務狀態之一。
// checkLocalTransaction 方法用於檢查本地事務狀態,並回應消息隊列的檢查請求。
// 也是返回三個事務狀態之一。
private
AtomicInteger transactionIndex =
new
AtomicInteger(
0
);
private
ConcurrentHashMap<String, Integer> localTrans =
new
ConcurrentHashMap<>();
@Override
public
LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.printf(
"#### executeLocalTransaction is executed, msgTransactionId=%s %n"
,
msg.getTransactionId());
int
value = transactionIndex.getAndIncrement();
int
status = value %
3
;
localTrans.put(msg.getTransactionId(), status);
return
LocalTransactionState.UNKNOW;
}
@Override
public
LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if
(
null
!= status) {
switch
(status) {
case
0
:
System.out.printf(
" # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n"
, msg.getMsgId());
return
LocalTransactionState.UNKNOW;
case
1
:
System.out.printf(
" # ROLLBACK # Simulating %s related local transaction exec failed! %n"
, msg.getMsgId());
return
LocalTransactionState.COMMIT_MESSAGE;
case
2
:
System.out.printf(
" # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n"
);
return
LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return
LocalTransactionState.COMMIT_MESSAGE;
}
}
|
7,開啟消費端的廣播模式(默認集群模式)
使用廣播模式有兩個條件:
1, 創建消費者組的時候開啟consumerBroadcastEnable=true
2, 代碼里加上
consumer.setMessageModel(MessageModel.BROADCASTING);