發送普通消息(三種方式)
RocketMQ 發送普通消息有三種實現方式:可靠同步發送
、可靠異步發送
、單向(Oneway)發送
。
注意
:順序消息只支持可靠同步發送。
GitHub地址: https://github.com/yudiandemingzi/SpringBootBlog
一、概念
1、可靠同步發送
原理
:同步發送是指消息發送方發出數據后,會在收到接收方發回響應之后才發下一個數據包的通訊方式。
應用場景:此種方式應用場景非常廣泛,例如重要通知郵件、報名短信通知、營銷短信系統等。
2、可靠異步發送
原理
:異步發送是指發送方發出數據后,不等接收方發回響應,接着發送下個數據包的通訊方式。 消息隊列 RocketMQ 的異步發送,需要用戶實現異步發送回調接口(SendCallback)。
應用場景:異步發送一般用於鏈路耗時較長,對 RT 響應時間較為敏感的業務場景,例如批量發貨等操作。
3、單向(Oneway)發送
原理
:單向(Oneway)發送特點為發送方只負責發送消息,不等待服務器回應且沒有回調函數觸發,即只發送請求不等待應答。 此方式發送消息的過程耗時非常短,一般在微秒級別。
應用場景:適用於某些耗時非常短,但對可靠性要求並不高的場景,例如日志收集。
4、三種對比
下表概括了三者的特點和主要區別。
發送方式 | 發送 TPS | 發送結果反饋 | 可靠性 |
---|---|---|---|
同步發送 | 快 | 有 | 不丟失 |
異步發送 | 快 | 有 | 不丟失 |
單向發送 | 最快 | 無 | 可能丟失 |
二、代碼示例
1、三種方式代碼示例
@Slf4j
@RestController
public class Controller {
/**
* 生產者組
*/
private static String PRODUCE_RGROUP = "test_producer";
/**
* 創建生產者對象
*/
private static DefaultMQProducer producer = null;
static {
producer = new DefaultMQProducer(PRODUCE_RGROUP);
//不開啟vip通道 開通口端口會減2
producer.setVipChannelEnabled(false);
//綁定name server
producer.setNamesrvAddr("47.99.03.25:9876");
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
@GetMapping("/message")
public void message() throws Exception {
//1、同步
sync();
//2、異步
async();
//3、單項發送
oneWay();
}
/**
* 1、同步發送消息
*/
private void sync() throws Exception {
//創建消息
Message message = new Message("topic_family", (" 同步發送 ").getBytes());
//同步發送消息
SendResult sendResult = producer.send(message);
log.info("Product-同步發送-Product信息={}", sendResult);
}
/**
* 2、異步發送消息
*/
private void async() throws Exception {
//創建消息
Message message = new Message("topic_family", (" 異步發送 ").getBytes());
//異步發送消息
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("Product-異步發送-輸出信息={}", sendResult);
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
//補償機制,根據業務情況進行使用,看是否進行重試
}
});
}
/**
* 3、單項發送消息
*/
private void oneWay() throws Exception {
//創建消息
Message message = new Message("topic_family", (" 單項發送 ").getBytes());
//同步發送消息
producer.sendOneway(message);
}
}
2、測試結果
這里消費者代碼就不貼出來了。
通過這個很明顯可以看出三種方式都被 Consumer 消費了。只不過對於 Product 同步和異步發送是有返回信息的,單項發送是沒有返回信息的。
三、SendStatus狀態
當Product發送消息的時候,會返回SendResult對象,該對象又包含了一個SendStatus對象。
package org.apache.rocketmq.client.producer;
public enum SendStatus {
SEND_OK,
FLUSH_DISK_TIMEOUT,
FLUSH_SLAVE_TIMEOUT,
SLAVE_NOT_AVAILABLE,
}
下面對這幾種狀態進行說明
SEND_OK
代表發送成功!但並不保證它是可靠的。要確保不會丟失任何消息,還應啟用SYNC_MASTER或SYNC_FLUSH。
SLAVE_NOT_AVAILABLE
如果Broker的角色是SYNC_MASTER(同步復制)(默認為異步),但沒有配置Slave Broker,將獲得此狀態。
FLUSH_DISK_TIMEOUT
如果Broker設置為 SYNC_FLUSH(同步刷盤)(默認為ASYNC_FLUSH),並且Broker的syncFlushTimeout(默認為5秒)內完成刷新磁盤,將獲得此狀態。
FLUSH_SLAVE_TIMEOUT
如果Broker的角色是SYNC_MASTER(同步復制)(默認為ASYNC_MASTER),並且從屬Broker的syncFlushTimeout(默認為5秒)內完成與主服務器的同步,將獲得此狀態。
參考
只要自己變優秀了,其他的事情才會跟着好起來(上將3)