在實際開發中,經常需要查看MQ中消息的內容,RocketMQ提供了多種消息查詢方式,給開發和運維帶來了極大的便利,一些其他消息中間件,如Kafka,並不具備消息查詢能力。
本文對RocketMQ提供到的查詢機制和背后原理進行深入的介紹。文章主要包括3個部分:
-
消息查詢介紹
:介紹消息查詢中使用到的Message Key 、Unique Key、Message Id 的區別 -
消息查詢工具
:分別介紹命令行工具、管理平台、客戶端API這三種工具的詳細用法,以及如何讓消費者重新消費特定的消息。 -
實現原理
:介紹Message Key & Unique Key與Message Id的實現機制上區別,Unique Key在精確一次消費(Exactly Once)語義下的作用,以及為什么Message Id查詢效率更高。
消息查詢介紹
RocketMQ提供了3種消息查詢方式:
-
按照Message Key 查詢
:消息的key是業務開發同學在發送消息之前自行指定的,通常會把具有業務含義,區分度高的字段作為消息的key,如用戶id,訂單id等。 -
按照Unique Key查詢
:除了業務開發同學明確的指定消息中的key,RocketMQ生產者客戶端在發送發送消息之前,會自動生成一個UNIQ_KEY,設置到消息的屬性中,從邏輯上唯一代表一條消息。 -
按照Message Id 查詢
:Message Id 是消息發送后,在Broker端生成的,其包含了Broker的地址,和在CommitLog中的偏移信息,並會將Message Id作為發送結果的一部分進行返回。Message Id中屬於精確匹配,從物理上唯一代表一條消息,查詢效率更高。
RocketMQ有意弱化Unique Key與Message Id的區別,有時都稱之為Message Id。在通過RocketMQ的命令行工具或管理平台進行查詢時,二者可以通用。在根據Unique Key進行查詢時,本身是有可能查詢到多條消息的,但是查詢工具會進行過濾,只會返回一條消息。種種情況導致很多RocketMQ的用戶,並未能很好對二者進行區分。
業務開發同學在使用RocketMQ時,應該養成良好的習慣,在發送/消費消息時,將這些信息記錄下來,通常是記錄到日志文件中,以便在出現問題時進行排查。
以生產者在發送消息為例,通常由以下3步組成:
//1 構建消息對象Message
Message msg = new Message();
msg.setTopic("TopicA");
msg.setKeys("Key1");
msg.setBody("message body".getBytes());
try{
//2 發送消息
SendResult result = producer.send(msg);
//3 打印發送結果
System.out.println(result);
}catch (Exception e){
e.printStackTrace();
}
第1步:構建消息
構建消息對象Message,在這里我們通過setKeys方法設置消息的key,如果有多個key可以使用空格" "進行分割
第2步:發送消息
發送消息,會返回一個SendResult對象表示消息發送結果。
第3步:打印發送結果
結果中包含Unique Key和Message Id,如下所示:
SendResult [
sendStatus=SEND_OK,
msgId=C0A801030D4B18B4AAC247DE4A0D0000,
offsetMsgId=C0A8010300002A9F000000000007BEE9,
messageQueue=MessageQueue [topic=TopicA, brokerName=broker-a, queueId=0],
queueOffset=0]
其中:
-
sendStatus
:表示消息發送結果的狀態 -
msgId
:注意這里的命名雖然是msgId,但實際上其是Unique Key -
offsetMsgId
:Broker返回的Message ID 。在后文中,未進行特殊說明的情況下,Message ID總是表示offsetMsgId。 -
messageQueue
:消息發送到了哪個的隊列,如上圖顯示發送到broker-a的第0個的隊列 -
queueOffset
:消息在隊列中的偏移量,每次發送到一個隊列時,offset+1
事實上,用戶主動設置的Key以及客戶端自動生成的Unique Key,最終都會設置到Message對象的properties屬性中,如下圖所示:
其中:
-
KEYS
:表示用戶通過setKeys方法設置的消息key, -
UNIQ_KEY
:表示消息發送之前由RocketMQ客戶端自動生成的Unique Key
。細心的讀者發現了其值與上述打印SendResult結果中的msgId字段的值是一樣的,這驗證了前面所說的msgId表示的實際上就是Unique Key的說法。
在了解如何主動設置Key,以及如何獲取RocketMQ自動生成的Unique Key和Message Id后,就可以利用一些工具來進行查詢。
消息查詢工具
RocketMQ提供了3種方式來根據Message Key、Unique Key、Message Id來查詢消息,包括:
-
命令行工具
:主要是運維同學使用 -
管理平台
:運維和開發同學都可以使用 -
客戶端API
:主要是開發同學使用
命令行工具
RocketMQ自帶的mqadmin命令行工具提供了一些子命令,用於查詢消息,如下:
$ sh bin/mqadmin
The most commonly used mqadmin commands are:
...
queryMsgById 按照Message Id查詢消息
queryMsgByKey 按照Key查詢消息
queryMsgByUniqueKey 按照UNIQ_KEY查詢消息
...
管理平台
RocketMQ提供的命令行工具,雖然功能強大,一般是運維同學使用較多。通過RocketMQ提供的管理平台進來行消息查詢,則對業務開發同學更加友好。在管理平台的消息一欄,有3個TAB,分別用於:根據Topic時間范圍查詢、Message Key查詢、Message Id查詢
客戶端API
除了通過命令行工具和管理平台,還可以通過客戶端API的方式來進行查詢,這其實是最本質的方式,命令行工具和管理平台的查詢功能都是基於此實現。
在org.apache.rocketmq.client.MQAdmin接口中,定義了以下幾個方法用於消息查詢:
//msgId參數:僅接收SendResult中的offsetMsgId,返回單條消息
MessageExt viewMessage(final String msgId)
//msgId參數:傳入SendResult中的offsetMsgId、msgId都可以,返回單條消息
MessageExt viewMessage(String topic,String msgId)
//在指定topic下,根據key進行查詢,並指定最大返回條數,以及開始和結束時間
QueryResult queryMessage(final String topic, final String key,
final int maxNum, final long begin,final long end)
實現原理
Unqiue Key & Message Key都需要利用RocketMQ的哈希索引機制來完成消息查詢,由於建立索引有一定的開銷,因此Broker端提供了相關配置項來控制是否開啟索引。
Message Id是在Broker端生成的,其包含了Broker地址和commit Log offset信息,可以精確匹配一條消息,查詢消息更好
。下面分別介紹 Unqiue Key & Message Id的生成和作用。
Unique Key生成
Unique Key是生產者發送消息之前,由RocketMQ 客戶端自動生成的,具體來說,RocketMQ發送消息之前,最終都要通過以下方法:
DefaultMQProducerImpl#sendKernelImpl
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) {//省略異常聲明
//...略
try {
//如果不是批量消息,則生成Unique Key
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
//...略
如上所示,如果不是批量消息,會通過MessageClientIDSetter的setUniqID方法為消息設置Unique key,該方法實現如下所示:
MessageClientIDSetter#setUniqID
public static void setUniqID(final Message msg) {
// Unique Key不為空的情況下,才進行設置
if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,) == null) {
msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,, createUniqID());
}
}
如果消息的Unique Key屬性為null,就通過createUniqID()方法為消息創建一個新的Unique Key,並設置到消息屬性中。之所以要判斷Unique Key是否為null與其作用有關。
Unique Key作用
了解Unique Key的作用對於我們理解消息重復的原因有很大的幫助。RocketMQ並不保證消息投遞過程中的Exactly Once語義,即消息只會被精確消費一次,需要消費者自己做冪等。而通常導致消息重復消費的原因,主要包括:
-
生產者發送時消息重復:RocketMQ對於無序消息發送失敗,默認會重試2次
-
消費者Rebalance時消息重復
導致生產者發送重復消息的原因可能是:一條消息已被成功發送到服務端並完成持久化,由於網絡超時此時出現了網絡閃斷或者客戶端宕機,導致服務端對客戶端應答失敗,此時生產者將再次嘗試發送消息。
在重試發送時,sendKernelImpl會被重復調用,意味着setUniqID方法會被重復調用,不過由於setUniqID方法實現中進行判空處理,因此重復設置Unique Key。在這種情況下,消費者后續會收到兩條內容相同並且 Unique Key 也相同的消息(offsetMsgId不同,因為對Broker來說存儲了多次)。
那么消費者如何判斷,消費重復是因為重復發送還是Rebalance導致的重復消費呢?
消費者實現MessageListener接口監聽到的消息類型是MessageExt,可以將其強制轉換為MessageClientExt,之后調用getMsgId方法獲取Unique Key,調用getOffsetMsgId獲得Message Id。如果多消息的Unique Key相同,但是offsetMsgId不同,則有可能是因為重復發送導致。
批量發送模式下的Unique Key
DefaultMQProducer提供了批量發送消息的接口:
public SendResult send(Collection<Message> msgs)
在內部,這批消息首先會被構建成一個MessageBatch對象。在前面sendKernelImpl方法中我們也看到了,對於MessageBatch對象,並不會設置Unique Key。這是因為在將批量消息轉換成MessageBatch時,已經設置過了。
可能有一部分同學會誤以為一個批量消息中每條消息Unique Key是相同的,其實不然,每條消息Unique Key都不同
。
可以參考DefaultMQProducer#batch
方法源碼:
private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
MessageBatch msgBatch;
try {
//1 將消息集合轉換為MessageBatch
msgBatch = MessageBatch.generateFromList(msgs);
//2 迭代每個消息,逐一設置Unique Key
for (Message message : msgBatch) {
Validators.checkMessage(message, this);
MessageClientIDSetter.setUniqID(message);
}
//3 設置批量消息的消息體
msgBatch.setBody(msgBatch.encode());
} catch (Exception e) {
throw new MQClientException("Failed to initiate the MessageBatch", e);
}
return msgBatch;
}
Message Id生成
SendResult中的offsetMsgId,即常規意義上我們所說的Message Id是在Broker端生成的,用於唯一標識一條消息,在根據Message Id查詢的情況下,最多只能查詢到一條消息。Message Id總共 16 字節,包含消息存儲主機地址,消息 Commit Log offset。如下圖所示:
RocketMQ內部通過一個MessageId對象進行表示:
public class MessageId {
private SocketAddress address; //broker地址
private long offset; //commit log offset
並提供了一個MessageDecoder對象來創建或者解碼MessageId。
public static String createMessageId(final ByteBuffer input,
final ByteBuffer addr, final long offset)
public static MessageId decodeMessageId(final String msgId)
Broker端在順序存儲消息時,首先會通過createMessageId方法創建msgId。源碼如下所示:
CommitLog.DefaultAppendMessageCallback#doAppend
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer,
final int maxBlank,final MessageExtBrokerInner msgInner) {
//1 PHY OFFSET:即Commit Log Offset 或者稱之為msgOffsetId
long wroteOffset = fileFromOffset + byteBuffer.position();
//2 hostHolder用於維護broker地址信息
this.resetByteBuffer(hostHolder, 8);
//3 創建msgOffsetId
String msgId = MessageDecoder.createMessageId(this.msgIdMemory,
msgInner.getStoreHostBytes(hostHolder), wroteOffset);
而客戶端在根據msgId向Broker查詢消息時,首先會將通過MessageDecoder的decodeMessageId方法,之后直接向這個broker進行查詢指定位置的消息。
參見:MQAdminImpl#viewMessage
public MessageExt viewMessage(String msgId) {//省略異常聲明
//1 根據msgId解碼成MessageId對象
MessageId messageId = null;
try {
messageId = MessageDecoder.decodeMessageId(msgId);
} catch (Exception e) {
throw new MQClientException(ResponseCode.NO_MESSAGE,
"query message by id finished, but no message.");
}
//2 根據MessageId中的Broker地址和commit log offset信息進行查詢
return this.mQClientFactory.getMQClientAPIImpl().viewMessage(
RemotingUtil.socketAddress2String(messageId.getAddress()),
messageId.getOffset(),
timeoutMillis);
}
由於根據Message Id進行查詢,實際上是直接從特定Broker的CommitLog中的指定位置進行查詢的,屬於精確匹配,並不像用戶設置的key,或者Unique Key那么樣,需要使用到哈希索引機制,因此效率很高。
總結
-
RocketMQ提供了3種消息查詢方式:Message Key & Unique Key & Message Id
-
RocketMQ提供了3種消息查詢工具:命令行、管理平台、客戶端API,且支持將查詢到讓特定/所有消費者組重新消費
-
RocketMQ有意對用戶屏蔽Unique Key & Message Id區別,很多地方二者可以通用
-
Message Key & Unique Key 需要使用到哈希索引機制,有額外的索引維護成本
-
Message Id由Broker和commit log offset組成,屬於精確匹配,查詢效率更好