一、Producer 介紹
1.1 消息發送的步驟
1) 設置 Producer 的 GroupName(Producer Group是一類 Producer 的集合);
2) 設置 InstanceName,當一個 JVM 需要啟動多個 Producer 時,需要指定不同的 InstanceName 來區分,不顯式設置時使用默認名稱 "DEFAULT";
3) 設置發送失敗重試次數,默認值是2次,可能會出現重復消息,因此需要消費端進行控制;
4) 設置 NameServer 地址;
5) 組裝數據並發送
1.2 生產者核心參數
* producerGroup:生產者組名
* createTopicKey:創建 Topic,生產環境一般不直接從代碼層面創建而是在控制台創建
* defaultTopicQueueNums:每個 Topic 下的隊列數量,默認數量是4
* sendMsgTimeout:消息發送超時時間,單位ms
* compressMsgBodyOverHowmuch:當消息大小超過指定字節就會開啟壓縮,默認字節為4096
* retryTimesWhenSendFailed:同步模式下,消息發送失敗重試次數,默認2次
* retryTimesWhenSendAsyncFailed:異步模式下,消息發送失敗重試次數,默認2次
* retryAnotherBrokerWhenNotStoreOK:當 broker 接收失敗時,是否切換另一個 broker ,默認為 false
* maxMessageSize:最大的消息容量限制,默認是4M
二、不同類型的生產者
生產者向消息隊列中寫入消息,根據不同的業務場景需要采用不同的寫入策略,如同步發送、異步發送、延遲發送和發送事務消息等。
2.1 同步發送
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException,
InterruptedException, MQBrokerException {
// 創建生產者對象
DefaultMQProducer producer = new DefaultMQProducer("producerGroupName");
// 設置實例化名稱
producer.setInstanceName("SyncProducer");
// 指定同步模式下,失敗重試次數
producer.setRetryTimesWhenSendFailed(5);
// 設置服務器地址
producer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
// 啟動實例
producer.start();
// 實例化消息對象
Message message = new Message("topicTest", "tagA", "同步消息發送".getBytes());
// 同步發送消息
SendResult sendResult = producer.send(message);
System.out.printf("%s%n", sendResult);
// 關閉生產者
producer.shutdown();
}
}
2.2 異步發送
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException,
InterruptedException, MQBrokerException {
// 創建生產者對象
DefaultMQProducer producer = new DefaultMQProducer("producerGroupName");
// 設置實例化名稱
producer.setInstanceName("AsyncProducer");
// 指定異步模式下,失敗重試次數
producer.setRetryTimesWhenSendAsyncFailed(5);
// 設置服務器地址
producer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
// 啟動實例
producer.start();
// 實例化消息對象
Message message = new Message("topicTest", "tagA", "異步消息發送".getBytes());
// 異步發送消息
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%s%n", sendResult);
// 關閉生產者
producer.shutdown();
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
}
});
}
}
2.3 延遲發送
RocketMQ 支持發送延遲消息,Broker 在收到這類消息后,會延遲一段時間再處理,使消息在規定的一段時間后生效。
延遲消息的使用方法是在創建 Message 對象時,調用 setDelayTimeLevel(int level) 方法設置延遲時間。目前不支持自定義時間,只能使用預定義的時間長度,如 setDelayTimeLevel(3) 表示延遲10s。
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException,
InterruptedException, MQBrokerException {
// 創建生產者對象
DefaultMQProducer producer = new DefaultMQProducer("producerGroupName");
// 設置實例化名稱
producer.setInstanceName("SyncProducer");
// 指定同步模式下,失敗重試次數
producer.setRetryTimesWhenSendFailed(5);
// 設置服務器地址
producer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
// 啟動實例
producer.start();
// 實例化消息對象
Message message = new Message("topicTest", "tagA", "延遲消息發送".getBytes());
// 設置延遲時間,時間長度為(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h)
message.setDelayTimeLevel(3);
// 發送消息
SendResult sendResult = producer.send(message);
System.out.printf("%s%n", sendResult);
// 關閉實例
producer.shutdown();
}
}
2.4 自定義發送規則
一個 Topic 會有多個 Message Queue , Producer 的默認配置會輪流向各個 Message Queue 發送消息。Consumer 在消費消息時,會根據負載均衡策略,消費被分配到的 Message Queue。如果要把消息發送到指定的 Message Queue,可以使用 Message-QueueSelector。
public class MyMessageQueueSelector implements MessageQueueSelector {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object key) {\
// 自定義選擇 Message Queue 規則
int id = Integer.parseInt(key.toString());
int idMainIndex = id / 100;
int size = mqs.size();
int index = idMainIndex % size;
return mqs.get(index);
}
}
在發送消息的時候,將 MessageQueueSelector 對象作為參數即可。
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException,
InterruptedException, MQBrokerException {
// 創建生產者對象
DefaultMQProducer producer = new DefaultMQProducer("producerGroupName");
// 設置實例化名稱
producer.setInstanceName("SyncProducer");
// 指定同步模式下,失敗重試次數
producer.setRetryTimesWhenSendFailed(5);
// 設置服務器地址
producer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
// 啟動實例
producer.start();
// 實例化消息對象
Message message = new Message("topicTest", "tagA", "自定義消息發送".getBytes());
// 發送消息
SendResult sendResult = producer.send(message,new MyMessageQueueSelector(),1000);
System.out.printf("%s%n", sendResult);
// 關閉實例
producer.shutdown();
}
}
2.5 事務消息
當某幾件事需要同時成功或失敗的時候,就需要使用到事務功能,如銀行轉賬:A 銀行的某賬戶要轉一萬元到 B 銀行的某賬戶:
1) 從 A 賬戶扣除一萬元
2) 對 B 賬戶增加一萬元
這兩個操作需要同時成功或同時失敗, RocketMQ 采用兩階段提交的方式實現事務消息,TransactionMqRroducer 處理流程如下:

1) 發送方向 RocketMQ 發送 "待確認" 消息;
2) RocketMQ 將收到的 "待確認" 消息持久化成功后,向發送方回復消息已經發送成功,此時第一階段消息發送完成;
3) 發送發開始執行本地事件邏輯
4) 發送方根據本地事件執行結果向 RocketMQ 發送二次確認(Commit 或 Rollback) 消息:
* 接收到 commit 消息,將把第一階段消息標記為可投遞,訂閱方將會收到該消息;
* 接收到 rollback 消息,將刪除第一階段消息,訂閱方不會接受到該消息;
5) 如果出現異常情況,步驟4 提交的二次確認最終未到達 RocketMQ ,服務器將經過固定時間段后將對 "待確認" 消息發起回查請求;
6) 發送方收到消息回傳請求后(如果第一階段發送的 Producer 不能工作時,將會回傳給同一個 ProducerGroup 的其他 Producer),通過對檢查對應消息的本地事件執行結果返回 Commit 或 Rollback 狀態;
7) RocketMQ 收到回查請求后,按照步驟4) 流程繼續處理
RocketMQ 通過以下幾個類來支持用戶實現事務消息:
-
TransactionMQProducer
和 DefaultMQProducer 用戶類似,通過它啟動事務消息,相比 DefaultMQProducer 需要多設置本地事務處理函數和回查狀態函數 -
TransactionListener
提供本地執行方法和回查方法,返回 LocalTransactionState 狀態標識: * LocalTransactionState.COMMIT_MESSAGE:提交 * LocalTransactionState.ROLLBACK_MESSAGE:回滾 * LocalTransactionState.UNKNOW:未知,需要回查
實現 TransactionListener 接口
public class MyTransactionListener implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private AtomicInteger checkTimes = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
// 執行本地事務
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String msgKey = msg.getKeys();
System.out.println("start execute local transaction " + msgKey);
LocalTransactionState state;
if (msgKey.contains("1")) {
// 第一條消息讓他通過
state = LocalTransactionState.COMMIT_MESSAGE;
} else if (msgKey.contains("2")) {
// 第二條消息模擬異常,明確回復回滾操作
state = LocalTransactionState.ROLLBACK_MESSAGE;
} else {
// 第三條消息無響應,讓它調用回查事務方法
state = LocalTransactionState.UNKNOW;
// 給剩下3條消息,放1,2,3三種狀態
localTrans.put(msgKey, transactionIndex.incrementAndGet());
}
System.out.println("executeLocalTransaction:" + msg.getKeys() + ",execute state:" + state + ",current time:" + System.currentTimeMillis());
return state;
}
// 檢查本地事務結果
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String msgKey = msg.getKeys();
System.out.println("start check local transaction " + msgKey);
Integer state = localTrans.get(msgKey);
switch (state) {
case 1:
System.out.println("check result unknown 回查次數" + checkTimes.incrementAndGet());
return LocalTransactionState.UNKNOW;
case 2:
System.out.println("check result commit message, 回查次數" + checkTimes.incrementAndGet());
return LocalTransactionState.COMMIT_MESSAGE;
case 3:
System.out.println("check result rollback message, 回查次數" + checkTimes.incrementAndGet());
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
}
實現事務消息生產者
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException,
InterruptedException, MQBrokerException {
// 創建事務生產者對象
TransactionMQProducer producer = new TransactionMQProducer("producerGroupName");
// 設置實例化名稱
producer.setInstanceName("SyncProducer");
// 指定同步模式下,失敗重試次數
producer.setRetryTimesWhenSendFailed(5);
// 設置事務監聽器
producer.setTransactionListener(new MyTransactionListener());
// 設置服務器地址
producer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
// 啟動實例
producer.start();
for (int i = 0; i < 5; i++) {
// 實例化消息對象
Message message = new Message("topicTest", "tagA","msg-" + i, ("事務消息發送" + ":" + i).getBytes());
// 發送消息
SendResult sendResult = producer.sendMessageInTransaction(message, i);
System.out.printf("%s%n", sendResult);
}
// 關閉實例
// producer.shutdown();
}
}
三、消息返回狀態
3.1 獲取消息發送返回狀態對象
// 獲取發送消息結果
SendResult sendResult = producer.send(message);
// 從結果中獲取狀態對象
SendStatus sendStatus = sendResult.getSendStatus();
3.2 狀態值
public enum SendStatus {
// 表示發送成功
SEND_OK,
// 表示沒有在指定時間內完成刷盤(需要 Broker 的刷盤策略被設置成SYNC_FLUSH)
FLUSH_DISK_TIMEOUT,
// 表示在主備模式下,並且 Broker 被設置成SYNC_MASTER,沒有在指定時間內完成主從同步
FLUSH_SLAVE_TIMEOUT,
// 表示在主備模式下,並且 Broker 被設置成SYNC_MASTER,沒有找到被配置成 Slave 的 Broker
SLAVE_NOT_AVAILABLE,
}
注:當返回狀態不是 SEND_OK 時,都需要有補償機制。
