Overview
In the previous installment, RocketMQ源碼詳解 | Producer篇 · 其一:Start,然后 Send 一條消息, we walked through the flow a Producer follows when sending a message. This time we take a closer look at how a message is put together and at the path it travels when it is sent.
Message
When using RocketMQ, the Message class is what must be built whenever a message is sent: body is where the payload lives, and the rest is the message flag and its properties.
public class Message {
private String topic;
private int flag;
private Map<String, String> properties;
private byte[] body;
private String transactionId;
}
Message flag
A flag used to distinguish an ordinary RPC from a oneway RPC.
Message properties
A message can carry quite a few properties; only a small excerpt is listed here.
| Property | Meaning |
|---|---|
| KEYS | The message key. The server builds an index on the key, so an application can locate a message, and find out who consumed it, by Topic and Key |
| TAGS | The message sub-type; consumers can selectively consume by tag |
| DELAY | Delay level of a delayed message (16 levels in total; in theory there can be 18) |
| RETRY_TOPIC | The Topic to retry (on the Broker the message is stored under the SCHEDULE_TOPIC_XXXX topic, which has 18 queues corresponding to the 18 retry delays) |
| REAL_TOPIC | The real Topic (RocketMQ frequently plays the "swap the destination Topic" trick, e.g. for transactional and delayed messages; this field records the original Topic) |
| PGROUP | Producer group |
| MAX_OFFSET / MIN_OFFSET | The maximum and minimum offsets returned in a pull |
| TRANSFER_FLAG | Transaction-related flag |
| MSG_TYPE | Message type, i.e. whether this is a reply message |
| BUYER_ID | Er... buyer ID? |
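These properties are not separate fields: helpers such as setKeys and setTags simply write entries into the properties map shown above. A minimal sketch (topic, tag and key values are made up):

```java
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.common.message.Message;

// KEYS and TAGS end up as entries in the properties map.
Message msg = new Message(
    "TopicTest",                                     // topic
    "TagA",                                          // stored as the TAGS property
    "OrderID-001",                                   // stored as the KEYS property
    "Hello RocketMQ".getBytes(StandardCharsets.UTF_8));
// user-defined properties land in the same map
msg.putUserProperty("color", "red");
```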
Of course, that is only what a message looks like inside the producer. In the eyes of the Broker and the consumer, it looks like this:
public class MessageExt extends Message {
private static final long serialVersionUID = 5720810158625748049L;
private String brokerName;
private int queueId;
// size this message occupies in the store
private int storeSize;
// offset in the ConsumeQueue
private long queueOffset;
private int sysFlag;
// time at which the message was created on the producer
private long bornTimestamp;
// address of the host that produced the message
private SocketAddress bornHost;
// time at which the message was stored to disk
private long storeTimestamp;
private SocketAddress storeHost;
private String msgId;
// offset in the CommitLog
private long commitLogOffset;
// CRC checksum of the body
private int bodyCRC;
// number of times consumption has been retried
private int reconsumeTimes;
private long preparedTransactionOffset;
}
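On the consuming side these Broker-filled fields can be read straight off the received MessageExt objects. A minimal sketch of a push-consumer callback (group/topic wiring omitted):

```java
import java.util.List;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

// Reads a few of the MessageExt fields listed above for each delivered message.
MessageListenerConcurrently listener = (List<MessageExt> msgs, ConsumeConcurrentlyContext ctx) -> {
    for (MessageExt msg : msgs) {
        System.out.printf("queueId=%d queueOffset=%d bornTimestamp=%d retries=%d%n",
            msg.getQueueId(), msg.getQueueOffset(), msg.getBornTimestamp(), msg.getReconsumeTimes());
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
};
```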
Wrapping the message
So, after the producer has built a message like this, does it send it out directly?
Let's keep following the code from where the previous article left off.
MQClientAPIImpl#sendMessage
long beginStartTime = System.currentTimeMillis();
RemotingCommand request = null;
String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
// is this a reply message?
boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
if (isReply) {
// when sendSmartMsg is enabled, use the compact V2 request header
if (sendSmartMsg) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
}
} else {
if (sendSmartMsg || msg instanceof MessageBatch) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}
}
request.setBody(msg.getBody());
/* -- pass -- */
Here we can see yet another layer of wrapping being added (only the body is carried over) before the command is sent.
The fields of RemotingCommand are as follows:
private int code;
private LanguageCode language = LanguageCode.JAVA;
private int version = 0;
private int opaque = requestId.getAndIncrement();
private int flag = 0;
private String remark;
private HashMap<String, String> extFields;
private transient CommandCustomHeader customHeader;
private transient byte[] body;
Among its methods we also find one named encode, which returns a ByteBuffer, so this is what is actually put on the wire.
public ByteBuffer encode() {
// 1> header length size
int length = 4;
// 2> header data length
byte[] headerData = this.headerEncode();
length += headerData.length;
// 3> body data length
if (this.body != null) {
length += body.length;
}
ByteBuffer result = ByteBuffer.allocate(4 + length);
// length
result.putInt(length);
// header length
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data
result.put(headerData);
// body data;
if (this.body != null) {
result.put(this.body);
}
result.flip();
return result;
}
The message structure
The concrete wire format is shown in the figure below:
fix bug: the b (bit) in the figure should be B (byte)
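The same layout can be read directly off encode() above: a 4-byte total length (covering everything after it), a 4-byte field whose first byte is the serialize type and whose remaining 3 bytes are the header length, then the header bytes, then the body. A small decoding sketch (assuming cmd is some RemotingCommand instance; the serialize-type codes follow SerializeType, where 0 is JSON and 1 is the ROCKETMQ binary format):

```java
import java.nio.ByteBuffer;

// Read back the frame produced by RemotingCommand#encode().
// Layout in bytes: [4: total length][4: serialize type + header length][header][body]
ByteBuffer frame = cmd.encode();
int totalLength = frame.getInt();                        // header-length field + header + body
int protocol = frame.getInt();
byte serializeType = (byte) ((protocol >> 24) & 0xFF);   // high byte: serialize type
int headerLength = protocol & 0xFFFFFF;                  // low 3 bytes: header length
byte[] header = new byte[headerLength];
frame.get(header);
byte[] body = new byte[totalLength - 4 - headerLength];
frame.get(body);
```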
Each of these fields means something different depending on whether the command is a Request or a Response.
code
In a Request, this is the operation code of the request:
public class RequestCode {
// send a message
public static final int SEND_MESSAGE = 10;
// pull messages
public static final int PULL_MESSAGE = 11;
// query messages (by topic, key, max count, begin offset, end offset)
public static final int QUERY_MESSAGE = 12;
// query a Broker offset (unused)
public static final int QUERY_BROKER_OFFSET = 13;
/*
* Query the consumer offset.
* The consumer keeps its offset in memory. In a master-slave setup the master Broker handles
* reads and writes by default; to avoid pile-up, once the backlog exceeds a configured threshold
* the slave takes over reads, which can cause consume-progress issues.
* Consistency of the progress between master and slave is therefore guaranteed by the slave
* reporting proactively and by giving priority to the offset held in the consumer's memory.
*/
// query the consumer's own offset
public static final int QUERY_CONSUMER_OFFSET = 14;
// commit the consumer's own offset
public static final int UPDATE_CONSUMER_OFFSET = 15;
// create or update a Topic
public static final int UPDATE_AND_CREATE_TOPIC = 17;
// get all Topic configurations
public static final int GET_ALL_TOPIC_CONFIG = 21;
/* unused */
public static final int GET_TOPIC_CONFIG_LIST = 22;
public static final int GET_TOPIC_NAME_LIST = 23;
// update the Broker configuration
public static final int UPDATE_BROKER_CONFIG = 25;
// get the Broker configuration
public static final int GET_BROKER_CONFIG = 26;
public static final int TRIGGER_DELETE_FILES = 27;
// get Broker runtime information
public static final int GET_BROKER_RUNTIME_INFO = 28;
// search an offset by timestamp
public static final int SEARCH_OFFSET_BY_TIMESTAMP = 29;
// get the maximum offset
public static final int GET_MAX_OFFSET = 30;
// get the minimum offset
public static final int GET_MIN_OFFSET = 31;
// get the store time of the earliest message
public static final int GET_EARLIEST_MSG_STORETIME = 32;
/* handled by the Broker */
// view a message by its ID
public static final int VIEW_MESSAGE_BY_ID = 33;
// heartbeat
public static final int HEART_BEAT = 34;
// unregister a client
public static final int UNREGISTER_CLIENT = 35;
// report a consumption failure (retry after a while) (Deprecated)
public static final int CONSUMER_SEND_MSG_BACK = 36;
// transaction result (commit or rollback)
public static final int END_TRANSACTION = 37;
// get the consumer list of a consumer group
public static final int GET_CONSUMER_LIST_BY_GROUP = 38;
// check transaction state; the Broker's check-back on a transaction whose state is unknown
public static final int CHECK_TRANSACTION_STATE = 39;
// notify consumers that the consumer IDs have changed
public static final int NOTIFY_CONSUMER_IDS_CHANGED = 40;
// lock a batch of Queues (used by rebalance)
public static final int LOCK_BATCH_MQ = 41;
// unlock Queues
public static final int UNLOCK_BATCH_MQ = 42;
// get all consumer offsets on this Broker
public static final int GET_ALL_CONSUMER_OFFSET = 43;
// get all offsets on the delay Topic
public static final int GET_ALL_DELAY_OFFSET = 45;
// check the client configuration
public static final int CHECK_CLIENT_CONFIG = 46;
// update or create an ACL
public static final int UPDATE_AND_CREATE_ACL_CONFIG = 50;
// delete an ACL configuration
public static final int DELETE_ACL_CONFIG = 51;
// get the ACL information of the Broker cluster
public static final int GET_BROKER_CLUSTER_ACL_INFO = 52;
// update the global white list
public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG = 53;
// get the ACL configuration of the Broker cluster
public static final int GET_BROKER_CLUSTER_ACL_CONFIG = 54;
/* NameServer related */
// put a key-value configuration
public static final int PUT_KV_CONFIG = 100;
// get a key-value configuration
public static final int GET_KV_CONFIG = 101;
// delete a key-value configuration
public static final int DELETE_KV_CONFIG = 102;
// register a Broker
public static final int REGISTER_BROKER = 103;
// unregister a Broker
public static final int UNREGISTER_BROKER = 104;
// get routing info for a given Topic
public static final int GET_ROUTEINFO_BY_TOPIC = 105;
// get the Broker cluster information
public static final int GET_BROKER_CLUSTER_INFO = 106;
// update or create a subscription group
public static final int UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200;
// get the configuration of all subscription groups
public static final int GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201;
// get Topic metrics
public static final int GET_TOPIC_STATS_INFO = 202;
// get the list of online consumers (rpc)
public static final int GET_CONSUMER_CONNECTION_LIST = 203;
// get the list of online producers
public static final int GET_PRODUCER_CONNECTION_LIST = 204;
public static final int WIPE_WRITE_PERM_OF_BROKER = 205;
// get all Topics from the NameServer
public static final int GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206;
// delete a subscription group
public static final int DELETE_SUBSCRIPTIONGROUP = 207;
// get consumer metrics
public static final int GET_CONSUME_STATS = 208;
public static final int SUSPEND_CONSUMER = 209;
public static final int RESUME_CONSUMER = 210;
public static final int RESET_CONSUMER_OFFSET_IN_CONSUMER = 211;
public static final int RESET_CONSUMER_OFFSET_IN_BROKER = 212;
public static final int ADJUST_CONSUMER_THREAD_POOL = 213;
public static final int WHO_CONSUME_THE_MESSAGE = 214;
// delete a Topic on the Broker
public static final int DELETE_TOPIC_IN_BROKER = 215;
// delete a Topic on the NameServer
public static final int DELETE_TOPIC_IN_NAMESRV = 216;
// get the key-value list of a namespace
public static final int GET_KVLIST_BY_NAMESPACE = 219;
// reset a consumer's consume progress
public static final int RESET_CONSUMER_CLIENT_OFFSET = 220;
// get consumer metrics from the consumer itself
public static final int GET_CONSUMER_STATUS_FROM_CLIENT = 221;
// ask the Broker to reset consume progress
public static final int INVOKE_BROKER_TO_RESET_OFFSET = 222;
// ask the Broker to update consumer metrics
public static final int INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223;
// query who consumes a given Topic
public static final int QUERY_TOPIC_CONSUME_BY_WHO = 300;
// get the Topics of a cluster
public static final int GET_TOPICS_BY_CLUSTER = 224;
// register a filter server
public static final int REGISTER_FILTER_SERVER = 301;
// register a message filter class
public static final int REGISTER_MESSAGE_FILTER_CLASS = 302;
// query the consume time span
public static final int QUERY_CONSUME_TIME_SPAN = 303;
// get system Topics from the NameServer
public static final int GET_SYSTEM_TOPIC_LIST_FROM_NS = 304;
// get system Topics from the Broker
public static final int GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305;
// clean expired consume queues
public static final int CLEAN_EXPIRED_CONSUMEQUEUE = 306;
// get a Consumer's runtime information
public static final int GET_CONSUMER_RUNNING_INFO = 307;
// query the correction offset
public static final int QUERY_CORRECTION_OFFSET = 308;
// consume a message directly
public static final int CONSUME_MESSAGE_DIRECTLY = 309;
// send a message (v2), with a more compact network packet
public static final int SEND_MESSAGE_V2 = 310;
// unit-related Topics
public static final int GET_UNIT_TOPIC_LIST = 311;
// get the list of Topics that have unit subscription groups
public static final int GET_HAS_UNIT_SUB_TOPIC_LIST = 312;
// get the list of non-unit Topics that have unit subscription groups
public static final int GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313;
// clone a group's consume progress
public static final int CLONE_GROUP_OFFSET = 314;
// view metrics on a Broker
public static final int VIEW_BROKER_STATS_DATA = 315;
// clean unused Topics
public static final int CLEAN_UNUSED_TOPIC = 316;
// get consumption metrics on a Broker
public static final int GET_BROKER_CONSUME_STATS = 317;
/* update the config of name server */
public static final int UPDATE_NAMESRV_CONFIG = 318;
/* get config from name server */
public static final int GET_NAMESRV_CONFIG = 319;
// send a batch of messages
public static final int SEND_BATCH_MESSAGE = 320;
// query a consume queue
public static final int QUERY_CONSUME_QUEUE = 321;
// query the data version
public static final int QUERY_DATA_VERSION = 322;
/* resume logic of checking half messages that have been put in TRANS_CHECK_MAXTIME_TOPIC before */
public static final int RESUME_CHECK_HALF_MESSAGE = 323;
// send a reply message
public static final int SEND_REPLY_MESSAGE = 324;
public static final int SEND_REPLY_MESSAGE_V2 = 325;
// push a reply message to the client
public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326;
}
In a Response, it is the response code:
public class ResponseCode extends RemotingSysResponseCode {
// flush to disk timed out
public static final int FLUSH_DISK_TIMEOUT = 10;
// the slave is not available
public static final int SLAVE_NOT_AVAILABLE = 11;
// flush to the slave timed out
public static final int FLUSH_SLAVE_TIMEOUT = 12;
// illegal message structure
public static final int MESSAGE_ILLEGAL = 13;
// service not available
public static final int SERVICE_NOT_AVAILABLE = 14;
// version not supported
public static final int VERSION_NOT_SUPPORTED = 15;
// no permission
public static final int NO_PERMISSION = 16;
// the Topic does not exist
public static final int TOPIC_NOT_EXIST = 17;
// the Topic already exists
public static final int TOPIC_EXIST_ALREADY = 18;
// the offset to pull from was not found
public static final int PULL_NOT_FOUND = 19;
// retry the pull immediately
public static final int PULL_RETRY_IMMEDIATELY = 20;
// the pull offset has moved (redirect)
public static final int PULL_OFFSET_MOVED = 21;
// the queried queue does not exist
public static final int QUERY_NOT_FOUND = 22;
// failed to parse the subscription url
public static final int SUBSCRIPTION_PARSE_FAILED = 23;
// the target subscription does not exist
public static final int SUBSCRIPTION_NOT_EXIST = 24;
// the subscription is not up to date
public static final int SUBSCRIPTION_NOT_LATEST = 25;
// the subscription group does not exist
public static final int SUBSCRIPTION_GROUP_NOT_EXIST = 26;
// the subscription's filter data does not exist (bad tag expression)
public static final int FILTER_DATA_NOT_EXIST = 27;
// the filter data on this Broker is not up to date
public static final int FILTER_DATA_NOT_LATEST = 28;
// the transaction should be committed
public static final int TRANSACTION_SHOULD_COMMIT = 200;
// the transaction should be rolled back
public static final int TRANSACTION_SHOULD_ROLLBACK = 201;
// the transaction state is unknown
public static final int TRANSACTION_STATE_UNKNOW = 202;
// wrong group for the transaction state
public static final int TRANSACTION_STATE_GROUP_WRONG = 203;
// the buyer ID does not exist
public static final int NO_BUYER_ID = 204;
public static final int NOT_IN_CURRENT_UNIT = 205;
// the consumer is not online (rpc)
public static final int CONSUMER_NOT_ONLINE = 206;
// consuming the message timed out
public static final int CONSUME_MSG_TIMEOUT = 207;
// the message does not exist
public static final int NO_MESSAGE = 208;
// failed to update or create the ACL configuration
public static final int UPDATE_AND_CREATE_ACL_CONFIG_FAILED = 209;
// failed to delete the ACL configuration
public static final int DELETE_ACL_CONFIG_FAILED = 210;
// failed to update the global white-list addresses
public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED = 211;
}
lang
This field records the implementation language of the sender; here it defaults to Java.
private LanguageCode language = LanguageCode.JAVA;
version
The program version of the sender.
opaque
This field distinguishes the individual requests multiplexed over the same connection, so that when a response arrives the matching callback can be invoked (RocketMQ reuses TCP connections when sending).
remark
In a Request, it carries custom text.
In a Response, it carries the reason for an error.
ext
Carries the custom command header, flattened into key-value pairs.
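The typed customHeader object is what gets flattened into extFields when the command is serialized. A hypothetical header as a sketch (the class and its fields are made up for illustration; only the CommandCustomHeader contract is real):

```java
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;

// Hypothetical request header: each field is reflected into extFields as a string
// when the RemotingCommand carrying it is encoded.
public class DemoRequestHeader implements CommandCustomHeader {
    private String topic;
    private Integer queueId;

    @Override
    public void checkFields() throws RemotingCommandException {
        // validate fields here if needed
    }

    public String getTopic() { return topic; }
    public void setTopic(String topic) { this.topic = topic; }
    public Integer getQueueId() { return queueId; }
    public void setQueueId(Integer queueId) { this.queueId = queueId; }
}
```

Such a header would be attached via RemotingCommand.createRequestCommand(code, header), just like the SendMessageRequestHeaderV2 seen earlier.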
Sending the message
Now that we know what a message looks like, we can get back to the sending code.
switch (communicationMode) {
case ONEWAY:
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
case ASYNC:
final AtomicInteger times = new AtomicInteger();
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeSync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
default:
assert false;
break;
}
return null;
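Which of the three branches runs is decided by the producer API the application called. A minimal sketch (group name and NameServer address are made-up values):

```java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class SendModes {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        Message msg = new Message("TopicTest", "TagA", "hello".getBytes());

        // SYNC: blocks until the Broker's response arrives
        SendResult result = producer.send(msg);

        // ASYNC: returns immediately, the result is delivered to the callback
        producer.send(msg, new SendCallback() {
            @Override public void onSuccess(SendResult sendResult) { /* ... */ }
            @Override public void onException(Throwable e) { /* ... */ }
        });

        // ONEWAY: fire and forget, no response is awaited
        producer.sendOneway(msg);

        producer.shutdown();
    }
}
```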
NettyRemotingClient#invokeOneway
Let's start with the simplest one: oneway.
public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
// get or create the Channel
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
doBeforeRpcHooks(addr, request);
// send over the established connection
this.invokeOnewayImpl(channel, request, timeoutMillis);
} catch (RemotingSendRequestException e) {
log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
The above boils down to two operations: get or establish the TCP connection, then send the data over it; and whenever an exception occurs, the connection is closed.
NettyRemotingClient#getAndCreateChannel
Let's look at the first operation.
private Channel getAndCreateChannel(final String addr) throws RemotingConnectException, InterruptedException {
// a null address means we actually want a connection to a NameServer
if (null == addr) {
return getAndCreateNameserverChannel();
}
// try the connection cache first
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
}
// no connection yet, or not ready: create a new one
return this.createChannel(addr);
}
As we can see, all connections are kept in a channel table, which is owned by NettyRemotingClient, i.e. a single instance shared across the whole JVM.
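For context, the cache and its entries look roughly like this inside NettyRemotingClient (a simplified sketch, not a verbatim copy of the source):

```java
// One ChannelWrapper per remote address, shared by the whole client instance.
private final ConcurrentMap<String /* addr */, ChannelWrapper> channelTables =
    new ConcurrentHashMap<String, ChannelWrapper>();

static class ChannelWrapper {
    private final ChannelFuture channelFuture;

    ChannelWrapper(ChannelFuture channelFuture) { this.channelFuture = channelFuture; }

    // "OK" means the connect attempt produced a channel and it is still active
    boolean isOK() {
        return channelFuture.channel() != null && channelFuture.channel().isActive();
    }

    Channel getChannel() { return channelFuture.channel(); }
    ChannelFuture getChannelFuture() { return channelFuture; }
}
```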
Digging into the creation method, we find that the Channel is ultimately created asynchronously by the client's Bootstrap.
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
}
// connections are established one at a time, guarded by this lock
if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
// double-check that the connection really hasn't been created in the meantime
boolean createNewConnection;
cw = this.channelTables.get(addr);
if (cw != null) {
if (cw.isOK()) {
// the connection is already established
return cw.getChannel();
} else if (!cw.getChannelFuture().isDone()) {
createNewConnection = false;
} else {
// a previous attempt finished but failed
this.channelTables.remove(addr);
createNewConnection = true;
}
} else {
createNewConnection = true;
}
if (createNewConnection) {
// the actual connection attempt
ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
cw = new ChannelWrapper(channelFuture);
this.channelTables.put(addr, cw);
}
} catch (Exception e) {
log.error("createChannel: create channel exception", e);
} finally {
this.lockChannelTables.unlock();
}
} else {
log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
}
if (cw != null) {
ChannelFuture channelFuture = cw.getChannelFuture();
// block until the connect attempt completes
if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
if (cw.isOK()) {
log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
return cw.getChannel();
} else {
log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());
}
} else {
log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
channelFuture.toString());
}
}
return null;
NettyRemotingAbstract#invokeOnewayImpl
然后接着看第二個操作:通過連接發送數據
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
// set the oneway bit in the command's flag
request.markOnewayRPC();
// acquire a permit so the system never has too many oneway requests in flight
boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
try {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
// release the permit once the write has actually completed
once.release();
if (!f.isSuccess()) {
log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
}
}
});
} catch (Exception e) {
once.release();
log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
} else {
// too many requests in flight
if (timeoutMillis <= 0) {
throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
} else {
// timed out waiting for a permit
String info = String.format(
"invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreOnewayValue: %d",
timeoutMillis,
this.semaphoreOneway.getQueueLength(),
this.semaphoreOneway.availablePermits()
);
log.warn(info);
throw new RemotingTimeoutException(info);
}
}
}
This part is fairly simple: after acquiring the oneway semaphore it sends via the Channel's writeAndFlush, and the permit is released once the write completes.
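SemaphoreReleaseOnlyOnce exists because the permit could otherwise be returned twice, once in the listener and once in the catch block. Conceptually it is just a CAS-guarded release, roughly like this (a sketch, not the original class):

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// The permit goes back at most once, no matter how many code paths call release().
public class ReleaseOnlyOnceSketch {
    private final AtomicBoolean released = new AtomicBoolean(false);
    private final Semaphore semaphore;

    public ReleaseOnlyOnceSketch(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    public void release() {
        if (semaphore != null && released.compareAndSet(false, true)) {
            semaphore.release();
        }
    }
}
```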
MQClientAPIImpl#sendMessageSync
Next, the synchronous send.
// send the request
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
// process the response
return this.processSendResponse(brokerName, msg, response,addr);
NettyRemotingClient#invokeSync does roughly the same as the oneway path: create or fetch the Channel, run the RPC hooks, and then call the implementation in the parent class.
So let's go straight to what the parent class does.
NettyRemotingAbstract#invokeSyncImpl
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
final int opaque = request.getOpaque();
try {
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
// the send failed: fill in the responseFuture and remove it from the responseTable
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
});
// wait for the response to arrive (backed by a CountDownLatch)
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}
Spotted the differences from the oneway send? The two biggest ones are:
- the semaphore throttling seen in the oneway path is gone
- a responseTable appears, managing all the ResponseFutures
Its definition looks like this:
protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
new ConcurrentHashMap<Integer, ResponseFuture>(256);
Recalling the opaque field introduced earlier: opaque is mapped to its ResponseFuture here, so when a response comes in, the matching ResponseFuture can be located by opaque and completed. The disappearance of the flow control also makes sense: a synchronous send blocks the whole calling thread anyway, so throttling again on the sending side would be unreasonable.
Finally, once the send completes, processSendResponse handles the response and the send result is returned.
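Stripped of RocketMQ specifics, the request/response correlation is just a map keyed by opaque plus something to wait on. A simplified sketch of the pattern, using a CompletableFuture where RocketMQ uses its own ResponseFuture and CountDownLatch:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

// Simplified request/response correlation over a single shared connection.
class CorrelationSketch {
    private final ConcurrentMap<Integer, CompletableFuture<byte[]>> pending = new ConcurrentHashMap<>();

    // sender side: register a future under the request's opaque, then wait on it
    byte[] sendSync(int opaque, byte[] request, long timeoutMillis) throws Exception {
        CompletableFuture<byte[]> future = new CompletableFuture<>();
        pending.put(opaque, future);
        try {
            writeToChannel(request);                  // fire the request on the shared connection
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            pending.remove(opaque);                   // always clean up, like the finally block in invokeSyncImpl
        }
    }

    // receiver side: when a response frame arrives, complete the matching future
    void onResponse(int opaque, byte[] response) {
        CompletableFuture<byte[]> future = pending.remove(opaque);
        if (future != null) {
            future.complete(response);
        }
    }

    private void writeToChannel(byte[] request) { /* network write omitted */ }
}
```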
NettyRemotingAbstract#invokeAsyncImpl
Finally, the asynchronous send.
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
long beginStartTime = System.currentTimeMillis();
final int opaque = request.getOpaque();
// semaphore-based flow control
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
// after any operation that may have blocked, check whether we have already timed out...
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
once.release();
throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
}
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
this.responseTable.put(opaque, responseFuture);
try {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
}
requestFail(opaque);
log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
}
});
} catch (Exception e) {
responseFuture.release();
log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
} else {
if (timeoutMillis <= 0) {
throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
} else {
String info =
String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
timeoutMillis,
this.semaphoreAsync.getQueueLength(),
this.semaphoreAsync.availablePermits()
);
log.warn(info);
throw new RemotingTimeoutException(info);
}
}
}
This differs little from the synchronous send; the main changes are that a semaphore is used for flow control again, and the ResponseFuture no longer blocks on a CountDownLatch.
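From the caller's perspective, the async path means registering an InvokeCallback instead of blocking. A rough usage sketch (the address and timeout are made-up values, and checked exceptions are omitted):

```java
// remotingClient is a NettyRemotingClient; the callback fires once the response
// arrives, the send fails, or the request is expired by scanResponseTable.
remotingClient.invokeAsync("127.0.0.1:10911", request, 3000, new InvokeCallback() {
    @Override
    public void operationComplete(ResponseFuture responseFuture) {
        RemotingCommand response = responseFuture.getResponseCommand();
        if (response != null) {
            // success: parse the response here
        } else if (!responseFuture.isSendRequestOK()) {
            // the write itself failed
        } else {
            // timed out waiting for the Broker
        }
    }
});
```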
Netty components
As we all know, once a message is written to the Channel it enters the tail of the pipeline and flows in the outbound direction. Next, let's see what happens along that outbound path.
NettyRemotingAbstract
Under RocketMQ's remoting module there is a netty package; that is where RPC calls are handled.
In NettyRemotingAbstract we have already seen three important methods: invokeOnewayImpl, invokeAsyncImpl and invokeSyncImpl.
Its subclasses are NettyRemotingClient and NettyRemotingServer; let's first look at the parts related to the Producer.
public abstract class NettyRemotingAbstract {
/* semaphore limiting in-flight oneway requests */
protected final Semaphore semaphoreOneway;
/* semaphore limiting in-flight async requests */
protected final Semaphore semaphoreAsync;
/* caches all in-flight requests (requests are concurrent, so each response must be matched back to its own request) */
protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
new ConcurrentHashMap<Integer, ResponseFuture>(256);
/* maps each request code to its processor and executor */
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
/* Executor to feed netty events to user defined {@link ChannelEventListener}. */
protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();
/* default request processor */
protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
/* SSL context used by {@link SslHandler}. */
protected volatile SslContext sslContext;
/* rpc hooks */
protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>();
/**
* Custom channel event listener.
*
* @return custom channel event listener if defined; null otherwise.
*/
public abstract ChannelEventListener getChannelEventListener();
}
The request-handling and response-handling parts:
/**
* Entry point for processing an incoming remoting command.
* @param ctx Channel handler context.
* @param msg incoming remoting command.
* @throws Exception if there were any error while processing the incoming command.
*/
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
// the command may be a request from the peer, or the response to a request we sent earlier
if (cmd != null) {
// the flag in the header tells us which
switch (cmd.getType()) {
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
/**
* Process incoming request command issued by remote peer.
*
* @param ctx channel handler context.
* @param cmd request command.
*/
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque();
if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingResponseCallback callback = new RemotingResponseCallback() {
@Override
public void callback(RemotingCommand response) {
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
// unless the request is oneway, a response has to be written back
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {
}
}
}
};
if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
processor.asyncProcessRequest(ctx, cmd, callback);
} else {
NettyRequestProcessor processor = pair.getObject1();
RemotingCommand response = processor.processRequest(ctx, cmd);
callback.callback(response);
}
} catch (Throwable e) {
log.error("process request exception", e);
log.error(cmd.toString());
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
}
};
// does this processor currently reject requests (flow control)?
if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
return;
}
try {
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
// submit the task to the processor's own executor
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
if ((System.currentTimeMillis() % 10000) == 0) {
log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+ ", too many requests and system thread pool busy, RejectedExecutionException "
+ pair.getObject2().toString()
+ " request code: " + cmd.getCode());
}
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[OVERLOAD]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
} else {
// no processor registered for this request code, and no default processor either
String error = " request type " + cmd.getCode() + " not supported";
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
ctx.writeAndFlush(response);
log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
}
}
/**
* Process response from remote peer to the previous issued requests.
*
* @param ctx channel handler context.
* @param cmd response command instance.
*/
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
final int opaque = cmd.getOpaque();
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null) {
responseFuture.setResponseCommand(cmd);
responseTable.remove(opaque);
// a callback was registered: execute it
if (responseFuture.getInvokeCallback() != null) {
executeInvokeCallback(responseFuture);
} else {
// otherwise this is a blocking call, so fill in the response and release the waiter
responseFuture.putResponse(cmd);
responseFuture.release();
}
} else {
log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(cmd.toString());
}
}
/**
* Execute the callback in the callback executor. If the executor is null, run it directly in the current thread.
*/
private void executeInvokeCallback(final ResponseFuture responseFuture) {
boolean runInThisThread = false;
ExecutorService executor = this.getCallbackExecutor();
if (executor != null) {
try {
executor.submit(new Runnable() {
@Override
public void run() {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("execute callback in executor exception, and callback throw", e);
} finally {
responseFuture.release();
}
}
});
} catch (Exception e) {
runInThisThread = true;
log.warn("execute callback in executor exception, maybe executor busy", e);
}
} else {
runInThisThread = true;
}
if (runInThisThread) {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("executeInvokeCallback Exception", e);
} finally {
responseFuture.release();
}
}
}
/**
* The thread pool used to run callbacks.
*/
public abstract ExecutorService getCallbackExecutor();
/**
* Called periodically to scan for and expire deprecated (timed-out) requests.
*/
public void scanResponseTable() {
final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
while (it.hasNext()) {
Entry<Integer, ResponseFuture> next = it.next();
ResponseFuture rep = next.getValue();
if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
rep.release();
it.remove();
rfList.add(rep);
log.warn("remove timeout request, " + rep);
}
}
for (ResponseFuture rf : rfList) {
try {
executeInvokeCallback(rf);
} catch (Throwable e) {
log.warn("scanResponseTable, operationComplete Exception", e);
}
}
}
/**
* This service dispatches netty events to the custom event listener implemented by subclasses.
*/
class NettyEventExecutor extends ServiceThread {
}
The code looks long, but the logic is actually quite simple:
- Handling a Request
  - check whether flow control is triggered; if so, return straight away
  - look up the processor registered for the request code and run it, taking the sync or async path as appropriate (a minimal processor sketch follows this list)
  - write back a response, unless the request is a oneway request
- Handling a Response
  - look up the matching in-flight request
  - complete it either through its callback (async) or by releasing the blocked waiter (sync)
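The processorTable consulted at the top of processRequestCommand is filled at startup through registerProcessor. A minimal, illustrative processor for a single request code might look like this (the class name and registration line are made up; the Broker registers its real SendMessageProcessor, PullMessageProcessor, etc. the same way):

```java
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

// Handles one request code and returns a response command.
public class DemoProcessor implements NettyRequestProcessor {
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        response.setCode(0 /* SUCCESS */);
        response.setRemark(null);
        return response;
    }

    @Override
    public boolean rejectRequest() {
        // hook used by the flow-control check shown above
        return false;
    }
}

// Registration sketch:
// remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, new DemoProcessor(), sendMessageExecutor);
```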
NettyRemotingClient
Now for its client-side subclass. In this class we mainly look at how the Bootstrap is built.
// build the client-side Bootstrap
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
// disable Nagle's algorithm
.option(ChannelOption.TCP_NODELAY, true)
// turn off TCP keepalive; connections are managed by RocketMQ itself
.option(ChannelOption.SO_KEEPALIVE, false)
// configured connect timeout
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
// socket send and receive buffer sizes
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// add the TLS layer if enabled
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}
pipeline.addLast(
// run these handlers on the dedicated defaultEventExecutorGroup
defaultEventExecutorGroup,
// register the encoder and decoder
new NettyEncoder(),
new NettyDecoder(),
// register idle detection; the next handler picks up idle events by overriding userEventTriggered
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
// connection management: timeout handling, keeping channelTables in line with live connections
new NettyConnectManageHandler(),
// the handler that actually processes received commands
new NettyClientHandler());
}
});
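The NettyEncoder registered above essentially just turns a RemotingCommand into the byte layout we saw in encode(). A simplified sketch of that idea (not the actual class):

```java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

// Outbound handler: serialize the command and hand the bytes to Netty.
public class SketchEncoder extends MessageToByteEncoder<RemotingCommand> {
    @Override
    protected void encode(ChannelHandlerContext ctx, RemotingCommand cmd, ByteBuf out) {
        out.writeBytes(cmd.encode());
    }
}
```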
We already know that encoding and decoding simply convert commands to and from a byte stream, and NettyClientHandler just delegates to the parent class's message handling, so let's focus on NettyConnectManageHandler.
class NettyConnectManageHandler extends ChannelDuplexHandler {
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
final String local = localAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(localAddress);
final String remote = remoteAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(remoteAddress);
log.info("NETTY CLIENT PIPELINE: CONNECT {} => {}", local, remote);
super.connect(ctx, remoteAddress, localAddress, promise);
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remote, ctx.channel()));
}
}
@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY CLIENT PIPELINE: DISCONNECT {}", remoteAddress);
closeChannel(ctx.channel());
super.disconnect(ctx, promise);
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
}
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);
// remove the channel from channelTables
closeChannel(ctx.channel());
super.close(ctx, promise);
// fail the requests still outstanding on this channel and invoke their callbacks
NettyRemotingClient.this.failFast(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
// handle a channel that has been idle (no traffic) for too long
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress);
closeChannel(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this
.putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
}
}
}
ctx.fireUserEventTriggered(evt);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.warn("NETTY CLIENT PIPELINE: exceptionCaught {}", remoteAddress);
log.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause);
closeChannel(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
}
}
}
NettyConnectManageHandler extends ChannelDuplexHandler, which lets it observe both inbound and outbound events on the Channel.
Its main duties are:
- when a connection is closed, remove it from channelTables and close it
- close connections that have been idle for too long
Finally, NettyClientHandler feeds every command it receives straight into NettyRemotingAbstract's processMessageReceived for handling.
