🙂🙂🙂關注微信公眾號:【芋艿的后端小屋】有福利:
- RocketMQ / MyCAT / Sharding-JDBC 所有源碼分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文注釋源碼 GitHub 地址
- 您對於源碼的疑問每條留言都將得到認真回復。甚至不知道如何讀源碼也可以請教噢。
- 新的源碼解析文章實時收到通知。每周更新一篇左右。
- 認真的源碼交流微信群。
- 1、概述
- 2、ConsumeQueue 結構
- 3、ConsumeQueue 存儲
- 4、Broker 提供[拉取消息]接口
- 5、Broker 提供[更新消費進度]接口
- 6、Broker 提供[發回消息]接口
- 7、結尾
1、概述
本章主要解析 消費 邏輯涉及到的源碼。
因為篇幅較長,分成上下兩篇:
- 上篇:
Broker
相關源碼。 - 下篇:
Consumer
相關源碼。
本文即是上篇。
ok,先看第一張關於消費邏輯的圖:
再看消費邏輯精簡的順序圖(實際情況會略有差別):
2、ConsumeQueue 結構
ConsumeQueue
、MappedFileQueue
、MappedFile
的關系如下:
ConsumeQueue
:MappedFileQueue
:MappedFile
= 1 : 1 : N。
反應到系統文件如下:
Yunai-MacdeMacBook-Pro-2:consumequeue yunai$ pwd
/Users/yunai/store/consumequeue
Yunai-MacdeMacBook-Pro-2:consumequeue yunai$ cd TopicRead3/
Yunai-MacdeMacBook-Pro-2:TopicRead3 yunai$ ls -ls
total 0
0 drwxr-xr-x 3 yunai staff 102 4 27 21:52 0
0 drwxr-xr-x 3 yunai staff 102 4 27 21:55 1
0 drwxr-xr-x 3 yunai staff 102 4 27 21:55 2
0 drwxr-xr-x 3 yunai staff 102 4 27 21:55 3
Yunai-MacdeMacBook-Pro-2:TopicRead3 yunai$ cd 0/
Yunai-MacdeMacBook-Pro-2:0 yunai$ ls -ls
total 11720
11720 -rw-r--r-- 1 yunai staff 6000000 4 27 21:55 00000000000000000000
ConsumeQueue
、MappedFileQueue
、MappedFile
的定義如下:
MappedFile
:00000000000000000000等文件。MappedFileQueue
:MappedFile
所在的文件夾,對MappedFile
進行封裝成文件隊列,對上層提供可無限使用的文件容量。- 每個
MappedFile
統一文件大小。 - 文件命名方式:fileName[n] = fileName[n - 1] + mappedFileSize。在
ConsumeQueue
里默認為 6000000B。
- 每個
ConsumeQueue
:針對MappedFileQueue
的封裝使用。Store : ConsumeQueue = ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>>
。
ConsumeQueue
存儲在 MappedFile
的內容必須大小是 20B( ConsumeQueue.CQ_STORE_UNIT_SIZE
),有兩種內容類型:
MESSAGE_POSITION_INFO
:消息位置信息。BLANK
: 文件前置空白占位。當歷史Message
被刪除時,需要用BLANK
占位被刪除的消息。
MESSAGE_POSITION_INFO
在 ConsumeQueue
存儲結構:
第幾位 | 字段 | 說明 | 數據類型 | 字節數 |
---|---|---|---|---|
1 | offset | 消息 CommitLog 存儲位置 |
Long | 8 |
2 | size | 消息長度 | Int | 4 |
3 | tagsCode | 消息tagsCode | Long | 8 |
BLANK
在 ConsumeQueue
存儲結構:
第幾位 | 字段 | 說明 | 數據類型 | 字節數 |
---|---|---|---|---|
1 | 0 | Long | 8 | |
2 | Integer.MAX_VALUE | Int | 4 | |
3 | 0 | Long | 8 |
3、ConsumeQueue 存儲
主要有兩個組件:
ReputMessageService
:write ConsumeQueue。FlushConsumeQueueService
:flush ConsumeQueue。
ReputMessageService
1: class ReputMessageService extends ServiceThread {
2:
3: /**
4: * 開始重放消息的CommitLog物理位置
5: */
6: private volatile long reputFromOffset = 0;
7:
8: public long getReputFromOffset() {
9: return reputFromOffset;
10: }
11:
12: public void setReputFromOffset(long reputFromOffset) {
13: this.reputFromOffset = reputFromOffset;
14: }
15:
16: @Override
17: public void shutdown() {
18: for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {
19: try {
20: Thread.sleep(100);
21: } catch (InterruptedException ignored) {
22: }
23: }
24:
25: if (this.isCommitLogAvailable()) {
26: log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}",
27: DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset);
28: }
29:
30: super.shutdown();
31: }
32:
33: /**
34: * 剩余需要重放消息字節數
35: *
36: * @return 字節數
37: */
38: public long behind() {
39: return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset;
40: }
41:
42: /**
43: * 是否commitLog需要重放消息
44: *
45: * @return 是否
46: */
47: private boolean isCommitLogAvailable() {
48: return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
49: }
50:
51: private void doReput() {
52: for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
53:
54: // TODO 疑問:這個是啥
55: if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() //
56: && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
57: break;
58: }
59:
60: // 獲取從reputFromOffset開始的commitLog對應的MappeFile對應的MappedByteBuffer
61: SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
62: if (result != null) {
63: try {
64: this.reputFromOffset = result.getStartOffset();
65:
66: // 遍歷MappedByteBuffer
67: for (int readSize = 0; readSize < result.getSize() && doNext; ) {
68: // 生成重放消息重放調度請求
69: DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
70: int size = dispatchRequest.getMsgSize(); // 消息長度
71: // 根據請求的結果處理
72: if (dispatchRequest.isSuccess()) { // 讀取成功
73: if (size > 0) { // 讀取Message
74: DefaultMessageStore.this.doDispatch(dispatchRequest);
75: // 通知有新消息
76: if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
77: && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
78: DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
79: dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
80: dispatchRequest.getTagsCode());
81: }
82: // FIXED BUG By shijia
83: this.reputFromOffset += size;
84: readSize += size;
85: // 統計
86: if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
87: DefaultMessageStore.this.storeStatsService
88: .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
89: DefaultMessageStore.this.storeStatsService
90: .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
91: .addAndGet(dispatchRequest.getMsgSize());
92: }
93: } else if (size == 0) { // 讀取到MappedFile文件尾
94: this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
95: readSize = result.getSize();
96: }
97: } else if (!dispatchRequest.isSuccess()) { // 讀取失敗
98: if (size > 0) { // 讀取到Message卻不是Message
99: log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
100: this.reputFromOffset += size;
101: } else { // 讀取到Blank卻不是Blank
102: doNext = false;
103: if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
104: log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
105: this.reputFromOffset);
106:
107: this.reputFromOffset += result.getSize() - readSize;
108: }
109: }
110: }
111: }
112: } finally {
113: result.release();
114: }
115: } else {
116: doNext = false;
117: }
118: }
119: }
120:
121: @Override
122: public void run() {
123: DefaultMessageStore.log.info(this.getServiceName() + " service started");
124:
125: while (!this.isStopped()) {
126: try {
127: Thread.sleep(1);
128: this.doReput();
129: } catch (Exception e) {
130: DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
131: }
132: }
133:
134: DefaultMessageStore.log.info(this.getServiceName() + " service end");
135: }
136:
137: @Override
138: public String getServiceName() {
139: return ReputMessageService.class.getSimpleName();
140: }
141:
142: }
- 說明:重放消息線程服務。
- 該服務不斷生成 消息位置信息 到 消費隊列(ConsumeQueue)
- 該服務不斷生成 消息索引 到 索引文件(IndexFile)
- 第 61 行 :獲取
reputFromOffset
開始的CommitLog
對應的MappedFile
對應的MappedByteBuffer
。 - 第 67 行 :遍歷
MappedByteBuffer
。 - 第 69 行 :生成重放消息重放調度請求 (
DispatchRequest
) 。請求里主要包含一條消息 (Message
) 或者 文件尾 (BLANK
) 的基本信息。 - 第 72 至 96 行 :請求是有效請求,進行邏輯處理。
- 第 75 至 81 行 :當
Broker
是主節點 &&Broker
開啟的是長輪詢,通知消費隊列有新的消息。NotifyMessageArrivingListener
會 調用PullRequestHoldService#notifyMessageArriving(...)
方法,詳細解析見:PullRequestHoldService
- 第 75 至 81 行 :當
- 第 73 至 92 行 :請求對應的是
Message
,進行調度,生成ConsumeQueue
和IndexFile
對應的內容。詳細解析見: - 第 93 至 96 行 :請求對應的是
Blank
,即文件尾,跳轉指向下一個MappedFile
。 - 第 97 至 110 行 :請求是無效請求。出現該情況,基本是一個BUG。
- 第 61 行 :獲取
- 第 127 至 128 行 :每 1ms 循環執行重放邏輯。
- 第 18 至 30 行 :
shutdown
時,多次sleep(100)
直到CommitLog
回放到最新位置。恩,如果未回放完,會輸出警告日志。
DefaultMessageStore#doDispatch(...)
1: /**
2: * 執行調度請求
3: * 1. 非事務消息 或 事務提交消息 建立 消息位置信息 到 ConsumeQueue
4: * 2. 建立 索引信息 到 IndexFile
5: *
6: * @param req 調度請求
7: */
8: public void doDispatch(DispatchRequest req) {
9: // 非事務消息 或 事務提交消息 建立 消息位置信息 到 ConsumeQueue
10: final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());
11: switch (tranType) {
12: case MessageSysFlag.TRANSACTION_NOT_TYPE:
13: case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
14: DefaultMessageStore.this.putMessagePositionInfo(req.getTopic(), req.getQueueId(), req.getCommitLogOffset(), req.getMsgSize(),
15: req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset());
16: break;
17: case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
18: case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
19: break;
20: }
21: // 建立 索引信息 到 IndexFile
22: if (DefaultMessageStore.this.getMessageStoreConfig().isMessageIndexEnable()) {
23: DefaultMessageStore.this.indexService.buildIndex(req);
24: }
25: }
26:
27: /**
28: * 建立 消息位置信息 到 ConsumeQueue
29: *
30: * @param topic 主題
31: * @param queueId 隊列編號
32: * @param offset commitLog存儲位置
33: * @param size 消息長度
34: * @param tagsCode 消息tagsCode
35: * @param storeTimestamp 存儲時間
36: * @param logicOffset 隊列位置
37: */
38: public void putMessagePositionInfo(String topic, int queueId, long offset, int size, long tagsCode, long storeTimestamp,
39: long logicOffset) {
40: ConsumeQueue cq = this.findConsumeQueue(topic, queueId);
41: cq.putMessagePositionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset);
42: }
ConsumeQueue#putMessagePositionInfoWrapper(...)
1: /**
2: * 添加位置信息封裝
3: *
4: * @param offset commitLog存儲位置
5: * @param size 消息長度
6: * @param tagsCode 消息tagsCode
7: * @param storeTimestamp 消息存儲時間
8: * @param logicOffset 隊列位置
9: */
10: public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long storeTimestamp,
11: long logicOffset) {
12: final int maxRetries = 30;
13: boolean canWrite = this.defaultMessageStore.getRunningFlags().isWriteable();
14: // 多次循環寫,直到成功
15: for (int i = 0; i < maxRetries && canWrite; i++) {
16: // 調用添加位置信息
17: boolean result = this.putMessagePositionInfo(offset, size, tagsCode, logicOffset);
18: if (result) {
19: // 添加成功,使用消息存儲時間 作為 存儲check point。
20: this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(storeTimestamp);
21: return;
22: } else {
23: // XXX: warn and notify me
24: log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + offset
25: + " failed, retry " + i + " times");
26:
27: try {
28: Thread.sleep(1000);
29: } catch (InterruptedException e) {
30: log.warn("", e);
31: }
32: }
33: }
34:
35: // XXX: warn and notify me 設置異常不可寫入
36: log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
37: this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
38: }
39:
40: /**
41: * 添加位置信息,並返回添加是否成功
42: *
43: * @param offset commitLog存儲位置
44: * @param size 消息長度
45: * @param tagsCode 消息tagsCode
46: * @param cqOffset 隊列位置
47: * @return 是否成功
48: */
49: private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
50: final long cqOffset) {
51: // 如果已經重放過,直接返回成功
52: if (offset <= this.maxPhysicOffset) {
53: return true;
54: }
55: // 寫入位置信息到byteBuffer
56: this.byteBufferIndex.flip();
57: this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
58: this.byteBufferIndex.putLong(offset);
59: this.byteBufferIndex.putInt(size);
60: this.byteBufferIndex.putLong(tagsCode);
61: // 計算consumeQueue存儲位置,並獲得對應的MappedFile
62: final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
63: MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
64: if (mappedFile != null) {
65: // 當是ConsumeQueue第一個MappedFile && 隊列位置非第一個 && MappedFile未寫入內容,則填充前置空白占位
66: if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) { // TODO 疑問:為啥這個操作。目前能夠想象到的是,一些老的消息很久沒發送,突然發送,這個時候剛好滿足。
67: this.minLogicOffset = expectLogicOffset;
68: this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
69: this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
70: this.fillPreBlank(mappedFile, expectLogicOffset);
71: log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
72: + mappedFile.getWrotePosition());
73: }
74: // 校驗consumeQueue存儲位置是否合法。TODO 如果不合法,繼續寫入會不會有問題?
75: if (cqOffset != 0) {
76: long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
77: if (expectLogicOffset != currentLogicOffset) {
78: LOG_ERROR.warn(
79: "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
80: expectLogicOffset,
81: currentLogicOffset,
82: this.topic,
83: this.queueId,
84: expectLogicOffset - currentLogicOffset
85: );
86: }
87: }
88: // 設置commitLog重放消息到ConsumeQueue位置。
89: this.maxPhysicOffset = offset;
90: // 插入mappedFile
91: return mappedFile.appendMessage(this.byteBufferIndex.array());
92: }
93: return false;
94: }
95:
96: /**
97: * 填充前置空白占位
98: *
99: * @param mappedFile MappedFile
100: * @param untilWhere consumeQueue存儲位置
101: */
102: private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) {
103: // 寫入前置空白占位到byteBuffer
104: ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
105: byteBuffer.putLong(0L);
106: byteBuffer.putInt(Integer.MAX_VALUE);
107: byteBuffer.putLong(0L);
108: // 循環填空
109: int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize());
110: for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) {
111: mappedFile.appendMessage(byteBuffer.array());
112: }
113: }
#putMessagePositionInfoWrapper(...)
說明 :添加位置信息到ConsumeQueue
的封裝,實際需要調用#putMessagePositionInfo(...)
方法。- 第 13 行 :判斷
ConsumeQueue
是否允許寫入。當發生Bug時,不允許寫入。 - 第 17 行 :調用
#putMessagePositionInfo(...)
方法,添加位置信息。 - 第 18 至 21 行 :添加成功,使用消息存儲時間 作為 存儲檢查點。
StoreCheckpoint
的詳細解析見:Store初始化與關閉。 - 第 22 至 32 行 :添加失敗,目前基本可以認為是BUG。
- 第 35 至 37 行 :寫入失敗時,標記
ConsumeQueue
寫入異常,不允許繼續寫入。
- 第 13 行 :判斷
#putMessagePositionInfo(...)
說明 :添加位置信息到ConsumeQueue
,並返回添加是否成功。- 第 51 至 54 行 :如果
offset
(存儲位置) 小於等於maxPhysicOffset
(CommitLog
消息重放到ConsumeQueue
最大的CommitLog
存儲位置),表示已經重放過,此時,不再重復寫入,直接返回寫入成功。 - 第 55 至 60 行 :寫 位置信息到byteBuffer。
- 第 62 至 63 行 :計算
ConsumeQueue
存儲位置,並獲得對應的MappedFile。 - 第 65 至 73 行 :當
MappedFile
是ConsumeQueue
當前第一個文件 &&MappedFile
未寫入內容 && 重放消息隊列位置大於0,則需要進行MappedFile
填充前置BLANK
。- 這塊比較有疑問,什么場景下會需要。猜測產生的原因:一個
Topic
長期無消息產生,突然N天后進行發送,Topic
對應的歷史消息以及和消費隊列數據已經被清理,新生成的MappedFile
需要前置占位。
- 這塊比較有疑問,什么場景下會需要。猜測產生的原因:一個
- 第 74 至 87 行 :校驗
ConsumeQueue
存儲位置是否合法,不合法則輸出日志。- 這塊比較有疑問,如果計算出來的存儲位置不合法,不返回添加失敗,繼續進行添加位置信息,會不會有問題???
- 第 89 行 :設置
CommitLog
重放消息到ConsumeQueue
的最大位置。 - 第 91 行 :插入消息位置到
MappedFile
。
- 第 51 至 54 行 :如果
FlushConsumeQueueService
1: class FlushConsumeQueueService extends ServiceThread {
2: private static final int RETRY_TIMES_OVER = 3;
3: /**
4: * 最后flush時間戳
5: */
6: private long lastFlushTimestamp = 0;
7:
8: private void doFlush(int retryTimes) {
9: int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
10:
11: // retryTimes == RETRY_TIMES_OVER時,進行強制flush。主要用於shutdown時。
12: if (retryTimes == RETRY_TIMES_OVER) {
13: flushConsumeQueueLeastPages = 0;
14: }
15: // 當時間滿足flushConsumeQueueThoroughInterval時,即使寫入的數量不足flushConsumeQueueLeastPages,也進行flush
16: long logicsMsgTimestamp = 0;
17: int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
18: long currentTimeMillis = System.currentTimeMillis();
19: if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
20: this.lastFlushTimestamp = currentTimeMillis;
21: flushConsumeQueueLeastPages = 0;
22: logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
23: }
24: // flush消費隊列
25: ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
26: for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) {
27: for (ConsumeQueue cq : maps.values()) {
28: boolean result = false;
29: for (int i = 0; i < retryTimes && !result; i++) {
30: result = cq.flush(flushConsumeQueueLeastPages);
31: }
32: }
33: }
34: // flush 存儲 check point
35: if (0 == flushConsumeQueueLeastPages) {
36: if (logicsMsgTimestamp > 0) {
37: DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
38: }
39: DefaultMessageStore.this.getStoreCheckpoint().flush();
40: }
41: }
42:
43: public void run() {
44: DefaultMessageStore.log.info(this.getServiceName() + " service started");
45:
46: while (!this.isStopped()) {
47: try {
48: int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue();
49: this.waitForRunning(interval);
50: this.doFlush(1);
51: } catch (Exception e) {
52: DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
53: }
54: }
55:
56: this.doFlush(RETRY_TIMES_OVER);
57:
58: DefaultMessageStore.log.info(this.getServiceName() + " service end");
59: }
60:
61: @Override
62: public String getServiceName() {
63: return FlushConsumeQueueService.class.getSimpleName();
64: }
65:
66: @Override
67: public long getJointime() {
68: return 1000 * 60;
69: }
70: }
- 說明 :flush
ConsumeQueue
(消費隊列) 線程服務。 - 第 11 至 14 行 :當
retryTimes == RETRY_TIMES_OVER
時,進行強制flush。用於shutdown
時。 - 第 15 至 23 行 :每 flushConsumeQueueThoroughInterval 周期,執行一次 flush 。因為不是每次循環到都能滿足 flushConsumeQueueLeastPages 大小,因此,需要一定周期進行一次強制 flush 。當然,不能每次循環都去執行強制 flush,這樣性能較差。
- 第 24 至 33 行 :flush
ConsumeQueue
(消費隊列)。- flush 邏輯:MappedFile#落盤。
- 第 34 至 40 行 :flush
StoreCheckpoint
。StoreCheckpoint
的詳細解析見:Store初始化與關閉。 - 第 43 至 59 行 :每 1000ms 執行一次
flush
。如果 wakeup() 時,則會立即進行一次flush
。目前,暫時不存在 wakeup() 的調用。
4、Broker 提供[拉取消息]接口
PullMessageRequestHeader
1: public class PullMessageRequestHeader implements CommandCustomHeader {
2: /**
3: * 消費者分組
4: */
5: @CFNotNull
6: private String consumerGroup;
7: /**
8: * Topic
9: */
10: @CFNotNull
11: private String topic;
12: /**
13: * 隊列編號
14: */
15: @CFNotNull
16: private Integer queueId;
17: /**
18: * 隊列開始位置
19: */
20: @CFNotNull
21: private Long queueOffset;
22: /**
23: * 消息數量
24: */
25: @CFNotNull
26: private Integer maxMsgNums;
27: /**
28: * 系統標識
29: */
30: @CFNotNull
31: private Integer sysFlag;
32: /**
33: * 提交消費進度位置
34: */
35: @CFNotNull
36: private Long commitOffset;
37: /**
38: * 掛起超時時間
39: */
40: @CFNotNull
41: private Long suspendTimeoutMillis;
42: /**
43: * 訂閱表達式
44: */
45: @CFNullable
46: private String subscription;
47: /**
48: * 訂閱版本號
49: */
50: @CFNotNull
51: private Long subVersion;
52: }
- 說明:拉取消息請求Header
- topic + queueId + queueOffset + maxMsgNums
- sysFlag :系統標識。
- 第 0 位
FLAG_COMMIT_OFFSET
:標記請求提交消費進度位置,和commitOffset
配合。 - 第 1 位
FLAG_SUSPEND
:標記請求是否掛起請求,和suspendTimeoutMillis
配合。當拉取不到消息時,Broker
會掛起請求,直到有消息。最大掛起時間:suspendTimeoutMillis
毫秒。 - 第 2 位
FLAG_SUBSCRIPTION
:是否過濾訂閱表達式,和subscription
配置。
- 第 0 位
- subVersion :訂閱版本號。請求時,如果版本號不對,則無法拉取到消息,需要重新獲取訂閱信息,使用最新的訂閱版本號。
PullMessageProcessor#processRequest(...)
1: private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
2: throws RemotingCommandException {
3: RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
4: final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
5: final PullMessageRequestHeader requestHeader =
6: (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
7:
8: response.setOpaque(request.getOpaque());
9:
10: if (LOG.isDebugEnabled()) {
11: LOG.debug("receive PullMessage request command, {}", request);
12: }
13:
14: // 校驗 broker 是否可讀
15: if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
16: response.setCode(ResponseCode.NO_PERMISSION);
17: response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
18: return response;
19: }
20:
21: // 校驗 consumer分組配置 是否存在
22: SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
23: if (null == subscriptionGroupConfig) {
24: response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
25: response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
26: return response;
27: }
28: // 校驗 consumer分組配置 是否可消費
29: if (!subscriptionGroupConfig.isConsumeEnable()) {
30: response.setCode(ResponseCode.NO_PERMISSION);
31: response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
32: return response;
33: }
34:
35: final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag()); // 是否掛起請求,當沒有消息時
36: final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag()); // 是否提交消費進度
37: final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag()); // 是否過濾訂閱表達式(subscription)
38: final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0; // 掛起請求超時時長
39:
40: // 校驗 topic配置 存在
41: TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
42: if (null == topicConfig) {
43: LOG.error("The topic {} not exist, consumer: {} ", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
44: response.setCode(ResponseCode.TOPIC_NOT_EXIST);
45: response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
46: return response;
47: }
48: // 校驗 topic配置 權限可讀
49: if (!PermName.isReadable(topicConfig.getPerm())) {
50: response.setCode(ResponseCode.NO_PERMISSION);
51: response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
52: return response;
53: }
54: // 校驗 讀取隊列 在 topic配置 隊列范圍內
55: if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
56: String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
57: requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
58: LOG.warn(errorInfo);
59: response.setCode(ResponseCode.SYSTEM_ERROR);
60: response.setRemark(errorInfo);
61: return response;
62: }
63:
64: // 校驗 訂閱關系
65: SubscriptionData subscriptionData;
66: if (hasSubscriptionFlag) {
67: try {
68: subscriptionData = FilterAPI.buildSubscriptionData(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
69: requestHeader.getSubscription());
70: } catch (Exception e) {
71: LOG.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), //
72: requestHeader.getConsumerGroup());
73: response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
74: response.setRemark("parse the consumer's subscription failed");
75: return response;
76: }
77: } else {
78: // 校驗 消費分組信息 是否存在
79: ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
80: if (null == consumerGroupInfo) {
81: LOG.warn("The consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
82: response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
83: response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
84: return response;
85: }
86: // 校驗 消費分組信息 消息模型是否匹配
87: if (!subscriptionGroupConfig.isConsumeBroadcastEnable() //
88: && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
89: response.setCode(ResponseCode.NO_PERMISSION);
90: response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
91: return response;
92: }
93:
94: // 校驗 訂閱信息 是否存在
95: subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
96: if (null == subscriptionData) {
97: LOG.warn("The consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
98: response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
99: response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
100: return response;
101: }
102: // 校驗 訂閱信息版本 是否合法
103: if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
104: LOG.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
105: subscriptionData.getSubString());
106: response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
107: response.setRemark("the consumer's subscription not latest");
108: return response;
109: }
110: }
111:
112: // 獲取消息
113: final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
114: requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), subscriptionData);
115: if (getMessageResult != null) {
116: response.setRemark(getMessageResult.getStatus().name());
117: responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
118: responseHeader.setMinOffset(getMessageResult.getMinOffset());
119: responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
120:
121: // TODO 待讀
122: // 計算建議讀取brokerId
123: if (getMessageResult.isSuggestPullingFromSlave()) {
124: responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
125: } else {
126: responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
127: }
128:
129: switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
130: case ASYNC_MASTER:
131: case SYNC_MASTER:
132: break;
133: case SLAVE:
134: if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) { // 從節點不允許讀取,告訴consumer讀取主節點。
135: response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
136: responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
137: }
138: break;
139: }
140:
141: if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
142: // consume too slow ,redirect to another machine
143: if (getMessageResult.isSuggestPullingFromSlave()) {
144: responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
145: }
146: // consume ok
147: else {
148: responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
149: }
150: } else {
151: responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
152: }
153:
154: switch (getMessageResult.getStatus()) {
155: case FOUND:
156: response.setCode(ResponseCode.SUCCESS);
157: break;
158: case MESSAGE_WAS_REMOVING:
159: response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
160: break;
161: case NO_MATCHED_LOGIC_QUEUE:
162: case NO_MESSAGE_IN_QUEUE:
163: if (0 != requestHeader.getQueueOffset()) {
164: response.setCode(ResponseCode.PULL_OFFSET_MOVED);
165:
166: // XXX: warn and notify me
167: LOG.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", //
168: requestHeader.getQueueOffset(), //
169: getMessageResult.getNextBeginOffset(), //
170: requestHeader.getTopic(), //
171: requestHeader.getQueueId(), //
172: requestHeader.getConsumerGroup()//
173: );
174: } else {
175: response.setCode(ResponseCode.PULL_NOT_FOUND);
176: }
177: break;
178: case NO_MATCHED_MESSAGE:
179: response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
180: break;
181: case OFFSET_FOUND_NULL:
182: response.setCode(ResponseCode.PULL_NOT_FOUND);
183: break;
184: case OFFSET_OVERFLOW_BADLY:
185: response.setCode(ResponseCode.PULL_OFFSET_MOVED);
186: // XXX: warn and notify me
187: LOG.info("The request offset:{} over flow badly, broker max offset:{} , consumer: {}", requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
188: break;
189: case OFFSET_OVERFLOW_ONE:
190: response.setCode(ResponseCode.PULL_NOT_FOUND);
191: break;
192: case OFFSET_TOO_SMALL:
193: response.setCode(ResponseCode.PULL_OFFSET_MOVED);
194: LOG.info("The request offset is too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
195: requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
196: getMessageResult.getMinOffset(), channel.remoteAddress());
197: break;
198: default:
199: assert false;
200: break;
201: }
202:
203: // hook:before
204: if (this.hasConsumeMessageHook()) {
205: ConsumeMessageContext context = new ConsumeMessageContext();
206: context.setConsumerGroup(requestHeader.getConsumerGroup());
207: context.setTopic(requestHeader.getTopic());
208: context.setQueueId(requestHeader.getQueueId());
209:
210: String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
211:
212: switch (response.getCode()) {
213: case ResponseCode.SUCCESS:
214: int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
215: int incValue = getMessageResult.getMsgCount4Commercial() * commercialBaseCount;
216:
217: context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);
218: context.setCommercialRcvTimes(incValue);
219: context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());
220: context.setCommercialOwner(owner);
221:
222: break;
223: case ResponseCode.PULL_NOT_FOUND:
224: if (!brokerAllowSuspend) {
225:
226: context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
227: context.setCommercialRcvTimes(1);
228: context.setCommercialOwner(owner);
229:
230: }
231: break;
232: case ResponseCode.PULL_RETRY_IMMEDIATELY:
233: case ResponseCode.PULL_OFFSET_MOVED:
234: context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
235: context.setCommercialRcvTimes(1);
236: context.setCommercialOwner(owner);
237: break;
238: default:
239: assert false;
240: break;
241: }
242:
243: this.executeConsumeMessageHookBefore(context);
244: }
245:
246: switch (response.getCode()) {
247: case ResponseCode.SUCCESS:
248:
249: this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
250: getMessageResult.getMessageCount());
251: this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
252: getMessageResult.getBufferTotalSize());
253: this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
254: // 讀取消息
255: if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) { // 內存中
256: final long beginTimeMills = this.brokerController.getMessageStore().now();
257:
258: final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
259:
260: this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
261: requestHeader.getTopic(), requestHeader.getQueueId(),
262: (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
263: response.setBody(r);
264: } else { // zero-copy
265: try {
266: FileRegion fileRegion = new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
267: channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
268: @Override
269: public void operationComplete(ChannelFuture future) throws Exception {
270: getMessageResult.release();
271: if (!future.isSuccess()) {
272: LOG.error("Fail to transfer messages from page cache to {}", channel.remoteAddress(), future.cause());
273: }
274: }
275: });
276: } catch (Throwable e) {
277: LOG.error("Error occurred when transferring messages from page cache", e);
278: getMessageResult.release();
279: }
280:
281: response = null;
282: }
283: break;
284: case ResponseCode.PULL_NOT_FOUND:
285: // 消息未查詢到 && broker允許掛起請求 && 請求允許掛起
286: if (brokerAllowSuspend && hasSuspendFlag) {
287: long pollingTimeMills = suspendTimeoutMillisLong;
288: if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
289: pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
290: }
291:
292: String topic = requestHeader.getTopic();
293: long offset = requestHeader.getQueueOffset();
294: int queueId = requestHeader.getQueueId();
295: PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
296: this.brokerController.getMessageStore().now(), offset, subscriptionData);
297: this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
298: response = null;
299: break;
300: }
301:
302: case ResponseCode.PULL_RETRY_IMMEDIATELY:
303: break;
304: case ResponseCode.PULL_OFFSET_MOVED:
305: if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
306: || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) { // TODO 待博客補充
307: MessageQueue mq = new MessageQueue();
308: mq.setTopic(requestHeader.getTopic());
309: mq.setQueueId(requestHeader.getQueueId());
310: mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
311:
312: OffsetMovedEvent event = new OffsetMovedEvent();
313: event.setConsumerGroup(requestHeader.getConsumerGroup());
314: event.setMessageQueue(mq);
315: event.setOffsetRequest(requestHeader.getQueueOffset());
316: event.setOffsetNew(getMessageResult.getNextBeginOffset());
317: this.generateOffsetMovedEvent(event);
318: LOG.warn(
319: "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
320: requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
321: responseHeader.getSuggestWhichBrokerId());
322: } else {
323: responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
324: response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
325: LOG.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
326: requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
327: responseHeader.getSuggestWhichBrokerId());
328: }
329:
330: break;
331: default:
332: assert false;
333: }
334: } else {
335: response.setCode(ResponseCode.SYSTEM_ERROR);
336: response.setRemark("store getMessage return null");
337: }
338:
339: // 請求要求持久化進度 && broker非主,進行持久化進度。
340: boolean storeOffsetEnable = brokerAllowSuspend;
341: storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
342: storeOffsetEnable = storeOffsetEnable && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
343: if (storeOffsetEnable) {
344: this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
345: requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
346: }
347: return response;
348: }
- 說明:處理拉取消息請求,返回響應。
- 第 14 至 19 行 :校驗
Broker
是否可讀。 - 第 21 至 33 行 :校驗
SubscriptionGroupConfig
(訂閱分組配置) 是否存在 && 可以消費。 - 第 35 至 38 行 :處理
PullMessageRequestHeader.sysFlag
對應的標志位。 - 第 40 至 62 行 :校驗
TopicConfig
(主題配置) 是否存在 && 可讀 && 隊列編號正確。 - 第 64 至 110 行 :校驗
SubscriptionData
(訂閱信息) 是否正確。 - 第 113 行 :調用
MessageStore#getMessage(...)
獲取GetMessageResult
(消息)。詳細解析見:MessageStore#getMessage(...)。 - 第 122 至 152 行 :計算建議拉取消息
brokerId
。 - 第 154 至 201 行 :
- 第 204 至 244 行 :
Hook
邏輯,#executeConsumeMessageHookBefore(...)
。 - 第 247 至 283 行 :拉取消息成功,即拉取到消息。
- 第 255 至 263 行 :方式一 :調用
readGetMessageResult(...)
獲取消息內容到堆內內存,設置到 響應body
。 - 第 265 至 281 行 :方式二 :基於
zero-copy
實現,直接響應,無需堆內內存,性能更優。TODO :此處等對zero-copy有研究,再補充一些。
- 第 255 至 263 行 :方式一 :調用
- 第 284 至 300 行 :拉取不到消息,當滿足條件 (
Broker
允許掛起 && 請求要求掛起),執行掛起請求。詳細解析見:PullRequestHoldService。 - 第 304 至 328 行 :TODO :此處等對
tools
模塊研究后再補充。 - 第 339 至 346 :持久化消費進度,當滿足 (
Broker
非主 && 請求要求持久化進度)。詳細解析見:更新消費進度。
MessageStore#getMessage(...)
1: /**
2: * 獲取消息結果
3: *
4: * @param group 消費分組
5: * @param topic 主題
6: * @param queueId 隊列編號
7: * @param offset 隊列位置
8: * @param maxMsgNums 消息數量
9: * @param subscriptionData 訂閱信息
10: * @return 消息結果
11: */
12: public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums,
13: final SubscriptionData subscriptionData) {
14: // 是否關閉
15: if (this.shutdown) {
16: log.warn("message store has shutdown, so getMessage is forbidden");
17: return null;
18: }
19: // 是否可讀
20: if (!this.runningFlags.isReadable()) {
21: log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
22: return null;
23: }
24:
25: long beginTime = this.getSystemClock().now();
26:
27: GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
28: long nextBeginOffset = offset;
29: long minOffset = 0;
30: long maxOffset = 0;
31:
32: GetMessageResult getResult = new GetMessageResult();
33:
34: final long maxOffsetPy = this.commitLog.getMaxOffset();
35:
36: // 獲取消費隊列
37: ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
38: if (consumeQueue != null) {
39: minOffset = consumeQueue.getMinOffsetInQueue(); // 消費隊列 最小隊列編號
40: maxOffset = consumeQueue.getMaxOffsetInQueue(); // 消費隊列 最大隊列編號
41:
42: // 判斷 隊列位置(offset)
43: if (maxOffset == 0) { // 消費隊列無消息
44: status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
45: nextBeginOffset = nextOffsetCorrection(offset, 0);
46: } else if (offset < minOffset) { // 查詢offset 太小
47: status = GetMessageStatus.OFFSET_TOO_SMALL;
48: nextBeginOffset = nextOffsetCorrection(offset, minOffset);
49: } else if (offset == maxOffset) { // 查詢offset 超過 消費隊列 一個位置
50: status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
51: nextBeginOffset = nextOffsetCorrection(offset, offset);
52: } else if (offset > maxOffset) { // 查詢offset 超過 消費隊列 太多(大於一個位置)
53: status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
54: if (0 == minOffset) { // TODO blog 這里是??為啥0 == minOffset做了特殊判斷
55: nextBeginOffset = nextOffsetCorrection(offset, minOffset);
56: } else {
57: nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
58: }
59: } else {
60: // 獲得 映射Buffer結果(MappedFile)
61: SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
62: if (bufferConsumeQueue != null) {
63: try {
64: status = GetMessageStatus.NO_MATCHED_MESSAGE;
65:
66: long nextPhyFileStartOffset = Long.MIN_VALUE; // commitLog下一個文件(MappedFile)對應的開始offset。
67: long maxPhyOffsetPulling = 0; // 消息物理位置拉取到的最大offset
68:
69: int i = 0;
70: final int maxFilterMessageCount = 16000;
71: final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
72: // 循環獲取 消息位置信息
73: for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
74: long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); // 消息物理位置offset
75: int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); // 消息長度
76: long tagsCode = bufferConsumeQueue.getByteBuffer().getLong(); // 消息tagsCode
77: // 設置消息物理位置拉取到的最大offset
78: maxPhyOffsetPulling = offsetPy;
79: // 當 offsetPy 小於 nextPhyFileStartOffset 時,意味着對應的 Message 已經移除,所以直接continue,直到可讀取的Message。
80: if (nextPhyFileStartOffset != Long.MIN_VALUE) {
81: if (offsetPy < nextPhyFileStartOffset)
82: continue;
83: }
84: // 校驗 commitLog 是否需要硬盤,無法全部放在內存
85: boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
86: // 是否已經獲得足夠消息
87: if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
88: isInDisk)) {
89: break;
90: }
91: // 判斷消息是否符合條件
92: if (this.messageFilter.isMessageMatched(subscriptionData, tagsCode)) {
93: // 從commitLog獲取對應消息ByteBuffer
94: SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
95: if (selectResult != null) {
96: this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
97: getResult.addMessage(selectResult);
98: status = GetMessageStatus.FOUND;
99: nextPhyFileStartOffset = Long.MIN_VALUE;
100: } else {
101: // 從commitLog無法讀取到消息,說明該消息對應的文件(MappedFile)已經刪除,計算下一個MappedFile的起始位置
102: if (getResult.getBufferTotalSize() == 0) {
103: status = GetMessageStatus.MESSAGE_WAS_REMOVING;
104: }
105: nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
106: }
107: } else {
108: if (getResult.getBufferTotalSize() == 0) {
109: status = GetMessageStatus.NO_MATCHED_MESSAGE;
110: }
111:
112: if (log.isDebugEnabled()) {
113: log.debug("message type not matched, client: " + subscriptionData + " server: " + tagsCode);
114: }
115: }
116: }
117: // 統計剩余可拉取消息字節數
118: if (diskFallRecorded) {
119: long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
120: brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
121: }
122: // 計算下次拉取消息的消息隊列編號
123: nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
124: // 根據剩余可拉取消息字節數與內存判斷是否建議讀取從節點
125: long diff = maxOffsetPy - maxPhyOffsetPulling;
126: long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
127: * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
128: getResult.setSuggestPullingFromSlave(diff > memory);
129: } finally {
130: bufferConsumeQueue.release();
131: }
132: } else {
133: status = GetMessageStatus.OFFSET_FOUND_NULL;
134: nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
135: log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
136: + maxOffset + ", but access logic queue failed.");
137: }
138: }
139: } else {
140: status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
141: nextBeginOffset = nextOffsetCorrection(offset, 0);
142: }
143: // 統計
144: if (GetMessageStatus.FOUND == status) {
145: this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
146: } else {
147: this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
148: }
149: long eclipseTime = this.getSystemClock().now() - beginTime;
150: this.storeStatsService.setGetMessageEntireTimeMax(eclipseTime);
151: // 設置返回結果
152: getResult.setStatus(status);
153: getResult.setNextBeginOffset(nextBeginOffset);
154: getResult.setMaxOffset(maxOffset);
155: getResult.setMinOffset(minOffset);
156: return getResult;
157: }
158:
159: /**
160: * 根據 主題 + 隊列編號 獲取 消費隊列
161: *
162: * @param topic 主題
163: * @param queueId 隊列編號
164: * @return 消費隊列
165: */
166: public ConsumeQueue findConsumeQueue(String topic, int queueId) {
167: // 獲取 topic 對應的 所有消費隊列
168: ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
169: if (null == map) {
170: ConcurrentHashMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<>(128);
171: ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
172: if (oldMap != null) {
173: map = oldMap;
174: } else {
175: map = newMap;
176: }
177: }
178: // 獲取 queueId 對應的 消費隊列
179: ConsumeQueue logic = map.get(queueId);
180: if (null == logic) {
181: ConsumeQueue newLogic = new ConsumeQueue(//
182: topic, //
183: queueId, //
184: StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), //
185: this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), //
186: this);
187: ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
188: if (oldLogic != null) {
189: logic = oldLogic;
190: } else {
191: logic = newLogic;
192: }
193: }
194:
195: return logic;
196: }
197:
198: /**
199: * 下一個獲取隊列offset修正
200: * 修正條件:主節點 或者 從節點開啟校驗offset開關
201: *
202: * @param oldOffset 老隊列offset
203: * @param newOffset 新隊列offset
204: * @return 修正后的隊列offset
205: */
206: private long nextOffsetCorrection(long oldOffset, long newOffset) {
207: long nextOffset = oldOffset;
208: if (this.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE || this.getMessageStoreConfig().isOffsetCheckInSlave()) {
209: nextOffset = newOffset;
210: }
211: return nextOffset;
212: }
213:
214: /**
215: * 校驗 commitLog 是否需要硬盤,無法全部放在內存
216: *
217: * @param offsetPy commitLog 指定offset
218: * @param maxOffsetPy commitLog 最大offset
219: * @return 是否需要硬盤
220: */
221: private boolean checkInDiskByCommitOffset(long offsetPy, long maxOffsetPy) {
222: long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
223: return (maxOffsetPy - offsetPy) > memory;
224: }
225:
226: /**
227: * 判斷獲取消息是否已經滿
228: *
229: * @param sizePy 字節數
230: * @param maxMsgNums 最大消息數
231: * @param bufferTotal 目前已經計算字節數
232: * @param messageTotal 目前已經計算消息數
233: * @param isInDisk 是否在硬盤中
234: * @return 是否已滿
235: */
236: private boolean isTheBatchFull(int sizePy, int maxMsgNums, int bufferTotal, int messageTotal, boolean isInDisk) {
237: if (0 == bufferTotal || 0 == messageTotal) {
238: return false;
239: }
240: // 消息數量已經滿足請求數量(maxMsgNums)
241: if ((messageTotal + 1) >= maxMsgNums) {
242: return true;
243: }
244: // 根據消息存儲配置的最大傳輸字節數、最大傳輸消息數是否已滿
245: if (isInDisk) {
246: if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()) {
247: return true;
248: }
249:
250: if ((messageTotal + 1) > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk()) {
251: return true;
252: }
253: } else {
254: if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) {
255: return true;
256: }
257:
258: if ((messageTotal + 1) > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory()) {
259: return true;
260: }
261: }
262:
263: return false;
264: }
- 說明 :根據 消息分組(
group
) + 主題(Topic
) + 隊列編號(queueId
) + 隊列位置(offset
) + 訂閱信息(subscriptionData
) 獲取 指定條數(maxMsgNums
) 消息(Message
)。 - 第 14 至 18 行 :判斷
Store
是否處於關閉狀態,若關閉,則無法獲取消息。 - 第 19 至 23 行 :判斷當前運行狀態是否可讀,若不可讀,則無法獲取消息。
- 第 37 行 :根據 主題(
Topic
) + 隊列編號(queueId
) 獲取 消息隊列(ConsumeQueue
)。#findConsumeQueue(...)
:第 159 至 196 行。
- 第 43 至 58 行 :各種隊列位置(
offset
) 無法讀取消息,並針對對應的情況,計算下一次Client
隊列拉取位置。- 第 43 至 45 行 :消息隊列無消息。
- 第 46 至 48 行 :查詢的消息隊列位置(
offset
) 太小。 - 第 49 至 51 行 :查詢的消息隊列位置(
offset
) 恰好等於 消息隊列最大的隊列位置。該情況是正常現象,相當於查詢最新的消息。 - 第 52 至 58 行 :查詢的消息隊列位置(
offset
) 超過過多。 #nextOffsetCorrection(...)
:第 198 至 212 行。
- 第 61 行 :根據 消費隊列位置(
offset
) 獲取 對應的MappedFile
。 - 第 72 至 128 行 :循環獲取
消息位置信息
。- 第 74 至 76 行 :讀取每一個
消息位置信息
。 - 第 79 至 83 行 :當
offsetPy
小於nextPhyFileStartOffset
時,意味着對
應的Message
已經移除,所以直接continue,直到可讀取的Message
。 - 第 84 至 90 行 :判斷是否已經獲得足夠的消息。
#checkInDiskByCommitOffset(...)
:第 214 至 224 行。#isTheBatchFull(...)
:第 226 至 264 行。
- 第 74 至 76 行 :讀取每一個
- 第 92 行 :判斷消息是否符合條件。詳細解析見:DefaultMessageFilter#isMessageMatched(...)。
- 第 94 行 :從
CommitLog
獲取對應 消息的MappedByteBuffer
。 - 第 95 至 99 行 :獲取
消息MappedByteBuffer
成功。 - 第 100 至 106 行 :獲取
消息MappedByteBuffer
失敗。從CommitLog
無法讀取到消息,說明 該消息對應的文件(MappedFile
) 已經刪除,此時計算下一個MappedFile
的起始位置。該邏輯需要配合(第 79 至 83 行)一起理解。 - 第 117 至 120 行 :統計剩余可拉取消息字節數。
- 第 123 行 :計算下次拉取消息的消息隊列編號。
- 第 124 至 128 行 :根據剩余可拉取消息字節數與內存判斷是否建議讀取從節點。
- 第 130 行 :釋放
bufferConsumeQueue
對MappedFile
的指向。此處MappedFile
是ConsumeQueue
里的文件,不是CommitLog
下的文件。 - 第 133 至 136 行 :獲得消費隊列位置(
offset
) 獲取 對應的MappedFile
為空,計算ConsumeQueue
從offset
開始的下一個MappedFile
對應的位置。 - 第 143 至 150 行 :記錄統計信息:消耗時間、拉取到消息/未拉取到消息次數。
- 第 151 至 156 行 :設置返回結果並返回。
DefaultMessageFilter#isMessageMatched(...)
1: public class DefaultMessageFilter implements MessageFilter {
2:
3: @Override
4: public boolean isMessageMatched(SubscriptionData subscriptionData, Long tagsCode) {
5: // 消息tagsCode 空
6: if (tagsCode == null) {
7: return true;
8: }
9: // 訂閱數據 空
10: if (null == subscriptionData) {
11: return true;
12: }
13: // classFilter
14: if (subscriptionData.isClassFilterMode())
15: return true;
16: // 訂閱表達式 全匹配
17: if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
18: return true;
19: }
20: // 訂閱數據code數組 是否包含 消息tagsCode
21: return subscriptionData.getCodeSet().contains(tagsCode.intValue());
22: }
23:
24: }
- 說明 :消息過濾器默認實現。
PullRequestHoldService
1: public class PullRequestHoldService extends ServiceThread {
2:
3: private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
4:
5: private static final String TOPIC_QUEUEID_SEPARATOR = "@";
6:
7: private final BrokerController brokerController;
8:
9: private final SystemClock systemClock = new SystemClock();
10: /**
11: * 消息過濾器
12: */
13: private final MessageFilter messageFilter = new DefaultMessageFilter();
14: /**
15: * 拉取消息請求集合
16: */
17: private ConcurrentHashMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
18: new ConcurrentHashMap<>(1024);
19:
20: public PullRequestHoldService(final BrokerController brokerController) {
21: this.brokerController = brokerController;
22: }
23:
24: /**
25: * 添加拉取消息掛起請求
26: *
27: * @param topic 主題
28: * @param queueId 隊列編號
29: * @param pullRequest 拉取消息請求
30: */
31: public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
32: String key = this.buildKey(topic, queueId);
33: ManyPullRequest mpr = this.pullRequestTable.get(key);
34: if (null == mpr) {
35: mpr = new ManyPullRequest();
36: ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
37: if (prev != null) {
38: mpr = prev;
39: }
40: }
41:
42: mpr.addPullRequest(pullRequest);
43: }
44:
45: /**
46: * 根據 主題 + 隊列編號 創建唯一標識
47: *
48: * @param topic 主題
49: * @param queueId 隊列編號
50: * @return key
51: */
52: private String buildKey(final String topic, final int queueId) {
53: StringBuilder sb = new StringBuilder();
54: sb.append(topic);
55: sb.append(TOPIC_QUEUEID_SEPARATOR);
56: sb.append(queueId);
57: return sb.toString();
58: }
59:
60: @Override
61: public void run() {
62: log.info("{} service started", this.getServiceName());
63: while (!this.isStopped()) {
64: try {
65: // 根據 長輪訓 還是 短輪訓 設置不同的等待時間
66: if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
67: this.waitForRunning(5 * 1000);
68: } else {
69: this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
70: }
71: // 檢查掛起請求是否有需要通知的
72: long beginLockTimestamp = this.systemClock.now();
73: this.checkHoldRequest();
74: long costTime = this.systemClock.now() - beginLockTimestamp;
75: if (costTime > 5 * 1000) {
76: log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
77: }
78: } catch (Throwable e) {
79: log.warn(this.getServiceName() + " service has exception. ", e);
80: }
81: }
82:
83: log.info("{} service end", this.getServiceName());
84: }
85:
86: @Override
87: public String getServiceName() {
88: return PullRequestHoldService.class.getSimpleName();
89: }
90:
91: /**
92: * 遍歷掛起請求,檢查是否有需要通知的請求。
93: */
94: private void checkHoldRequest() {
95: for (String key : this.pullRequestTable.keySet()) {
96: String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
97: if (2 == kArray.length) {
98: String topic = kArray[0];
99: int queueId = Integer.parseInt(kArray[1]);
100: final long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
101: try {
102: this.notifyMessageArriving(topic, queueId, offset);
103: } catch (Throwable e) {
104: log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
105: }
106: }
107: }
108: }
109:
110: /**
111: * 檢查是否有需要通知的請求
112: *
113: * @param topic 主題
114: * @param queueId 隊列編號
115: * @param maxOffset 消費隊列最大offset
116: */
117: public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {
118: notifyMessageArriving(topic, queueId, maxOffset, null);
119: }
120:
121: /**
122: * 檢查是否有需要通知的請求
123: *
124: * @param topic 主題
125: * @param queueId 隊列編號
126: * @param maxOffset 消費隊列最大offset
127: * @param tagsCode 過濾tagsCode
128: */
129: public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode) {
130: String key = this.buildKey(topic, queueId);
131: ManyPullRequest mpr = this.pullRequestTable.get(key);
132: if (mpr != null) {
133: //
134: List<PullRequest> requestList = mpr.cloneListAndClear();
135: if (requestList != null) {
136: List<PullRequest> replayList = new ArrayList<>(); // 不符合喚醒的請求數組
137:
138: for (PullRequest request : requestList) {
139: // 如果 maxOffset 過小,則重新讀取一次。
140: long newestOffset = maxOffset;
141: if (newestOffset <= request.getPullFromThisOffset()) {
142: newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
143: }
144: // 有新的匹配消息,喚醒請求,即再次拉取消息。
145: if (newestOffset > request.getPullFromThisOffset()) {
146: if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tagsCode)) {
147: try {
148: this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
149: request.getRequestCommand());
150: } catch (Throwable e) {
151: log.error("execute request when wakeup failed.", e);
152: }
153: continue;
154: }
155: }
156: // 超過掛起時間,喚醒請求,即再次拉取消息。
157: if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
158: try {
159: this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
160: request.getRequestCommand());
161: } catch (Throwable e) {
162: log.error("execute request when wakeup failed.", e);
163: }
164: continue;
165: }
166: // 不符合再次拉取的請求,再次添加回去
167: replayList.add(request);
168: }
169: // 添加回去
170: if (!replayList.isEmpty()) {
171: mpr.addPullRequest(replayList);
172: }
173: }
174: }
175: }
176: }
PullRequestHoldService
說明 :拉取消息請求掛起維護線程服務。- 當拉取消息請求獲得不了消息時,則會將請求進行掛起,添加到該服務。
- 當有符合條件信息時 或 掛起超時時,重新執行獲取消息邏輯。
#suspendPullRequest(...)
說明 :添加拉取消息掛起請求到集合(pullRequestTable
)。#run(...)
說明 :定時檢查掛起請求是否有需要通知重新拉取消息並進行通知。- 第 65 至 70 行 :根據
長輪訓
or短輪訓
設置不同的等待時間。 - 第 71 至 77 行 :檢查掛起請求是否有需要通知的。
- 第 65 至 70 行 :根據
#checkHoldRequest(...)
說明 :遍歷掛起請求,檢查是否有需要通知的。#notifyMessageArriving(...)
說明 :檢查指定隊列是否有需要通知的請求。- 第 139 至 143 行 :如果
maxOffset
過小,重新獲取一次最新的。 - 第 144 至 155 行 :有新的匹配消息,喚醒請求,即再次拉取消息。
- 第 156 至 165 行 :超過掛起時間,喚醒請求,即再次拉取消息。
- 第 148 || 159 行 :喚醒請求,再次拉取消息。原先擔心拉取消息時間過長,導致影響整個掛起請求的遍歷,后面查看
#executeRequestWhenWakeup(...)
,實際是丟到線程池進行一步的消息拉取,不會有性能上的問題。詳細解析見:PullMessageProcessor#executeRequestWhenWakeup(...)。 - 第 166 至 172 行 :不符合喚醒的請求重新添加到集合(
pullRequestTable
)。
- 第 139 至 143 行 :如果
PullMessageProcessor#executeRequestWhenWakeup(...)
1: public void executeRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException {
2: Runnable run = new Runnable() {
3: @Override
4: public void run() {
5: try {
6: // 調用拉取請求。本次調用,設置不掛起請求。
7: final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);
8:
9: if (response != null) {
10: response.setOpaque(request.getOpaque());
11: response.markResponseType();
12: try {
13: channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
14: @Override
15: public void operationComplete(ChannelFuture future) throws Exception {
16: if (!future.isSuccess()) {
17: LOG.error("ProcessRequestWrapper response to {} failed", future.channel().remoteAddress(), future.cause());
18: LOG.error(request.toString());
19: LOG.error(response.toString());
20: }
21: }
22: });
23: } catch (Throwable e) {
24: LOG.error("ProcessRequestWrapper process request over, but response failed", e);
25: LOG.error(request.toString());
26: LOG.error(response.toString());
27: }
28: }
29: } catch (RemotingCommandException e1) {
30: LOG.error("ExecuteRequestWhenWakeup run", e1);
31: }
32: }
33: };
34: // 提交拉取請求到線程池
35: this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
36: }
- 說明 :執行請求喚醒,即再次拉取消息。該方法調用線程池,因此,不會阻塞。
- 第 7 行 :調用拉取消息請求。本次調用,設置即使請求不到消息,也不掛起請求。如果不設置,請求可能被無限掛起,被
Broker
無限循環。 - 第 35 行 :提交拉取消息請求到線程池。
5、Broker 提供[更新消費進度]接口
Yunai-MacdeMacBook-Pro-2:config yunai$ pwd
/Users/yunai/store/config
Yunai-MacdeMacBook-Pro-2:config yunai$ ls -ls
total 40
8 -rw-r--r-- 1 yunai staff 21 4 28 16:58 consumerOffset.json
8 -rw-r--r-- 1 yunai staff 21 4 28 16:58 consumerOffset.json.bak
8 -rw-r--r-- 1 yunai staff 21 4 28 16:58 delayOffset.json
8 -rw-r--r-- 1 yunai staff 21 4 28 16:58 delayOffset.json.bak
8 -rw-r--r-- 1 yunai staff 1401 4 27 21:51 topics.json
Yunai-MacdeMacBook-Pro-2:config yunai$ cat consumerOffset.json
{
"offsetTable":{
"%RETRY%please_rename_unique_group_name_4@please_rename_unique_group_name_4":{0:0
},
"TopicRead3@please_rename_unique_group_name_4":{1:5
}
}
}
consumerOffset.json
:消費進度存儲文件。consumerOffset.json.bak
:消費進度存儲文件備份。- 每次寫入
consumerOffset.json
,將原內容備份到consumerOffset.json.bak
。實現見:MixAll#string2File(...)。
BrokerController#initialize(...)
1:this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
2: @Override
3: public void run() {
4: try {
5: BrokerController.this.consumerOffsetManager.persist();
6: } catch (Throwable e) {
7: log.error("schedule persist consumerOffset error.", e);
8: }
9: }
10:}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
- 說明 :每 5s 執行一次持久化邏輯。
ConfigManager
1: public abstract class ConfigManager {
2: private static final Logger PLOG = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
3:
4: /**
5: * 編碼內容
6: * @return 編碼后的內容
7: */
8: public abstract String encode();
9:
10: /**
11: * 加載文件
12: *
13: * @return 加載是否成功
14: */
15: public boolean load() {
16: String fileName = null;
17: try {
18: fileName = this.configFilePath();
19: String jsonString = MixAll.file2String(fileName);
20: // 如果內容不存在,則加載備份文件
21: if (null == jsonString || jsonString.length() == 0) {
22: return this.loadBak();
23: } else {
24: this.decode(jsonString);
25: PLOG.info("load {} OK", fileName);
26: return true;
27: }
28: } catch (Exception e) {
29: PLOG.error("load " + fileName + " Failed, and try to load backup file", e);
30: return this.loadBak();
31: }
32: }
33:
34: /**
35: * 配置文件地址
36: *
37: * @return 配置文件地址
38: */
39: public abstract String configFilePath();
40:
41: /**
42: * 加載備份文件
43: *
44: * @return 是否成功
45: */
46: private boolean loadBak() {
47: String fileName = null;
48: try {
49: fileName = this.configFilePath();
50: String jsonString = MixAll.file2String(fileName + ".bak");
51: if (jsonString != null && jsonString.length() > 0) {
52: this.decode(jsonString);
53: PLOG.info("load " + fileName + " OK");
54: return true;
55: }
56: } catch (Exception e) {
57: PLOG.error("load " + fileName + " Failed", e);
58: return false;
59: }
60:
61: return true;
62: }
63:
64: /**
65: * 解碼內容
66: *
67: * @param jsonString 內容
68: */
69: public abstract void decode(final String jsonString);
70:
71: /**
72: * 持久化
73: */
74: public synchronized void persist() {
75: String jsonString = this.encode(true);
76: if (jsonString != null) {
77: String fileName = this.configFilePath();
78: try {
79: MixAll.string2File(jsonString, fileName);
80: } catch (IOException e) {
81: PLOG.error("persist file Exception, " + fileName, e);
82: }
83: }
84: }
85:
86: /**
87: * 編碼存儲內容
88: *
89: * @param prettyFormat 是否格式化
90: * @return 內容
91: */
92: public abstract String encode(final boolean prettyFormat);
93: }
MixAll#string2File(...)
1: /**
2: * 將內容寫到文件
3: * 安全寫
4: * 1. 寫到.tmp文件
5: * 2. 備份准備寫入文件到.bak文件
6: * 3. 刪除原文件,將.tmp修改成文件
7: *
8: * @param str 內容
9: * @param fileName 文件名
10: * @throws IOException 當IO發生異常時
11: */
12: public static void string2File(final String str, final String fileName) throws IOException {
13: // 寫到 tmp文件
14: String tmpFile = fileName + ".tmp";
15: string2FileNotSafe(str, tmpFile);
16: //
17: String bakFile = fileName + ".bak";
18: String prevContent = file2String(fileName);
19: if (prevContent != null) {
20: string2FileNotSafe(prevContent, bakFile);
21: }
22:
23: File file = new File(fileName);
24: file.delete();
25:
26: file = new File(tmpFile);
27: file.renameTo(new File(fileName));
28: }
29:
30: /**
31: * 將內容寫到文件
32: * 非安全寫
33: *
34: * @param str 內容
35: * @param fileName 文件內容
36: * @throws IOException 當IO發生異常時
37: */
38: public static void string2FileNotSafe(final String str, final String fileName) throws IOException {
39: File file = new File(fileName);
40: // 創建上級目錄
41: File fileParent = file.getParentFile();
42: if (fileParent != null) {
43: fileParent.mkdirs();
44: }
45: // 寫內容
46: FileWriter fileWriter = null;
47: try {
48: fileWriter = new FileWriter(file);
49: fileWriter.write(str);
50: } catch (IOException e) {
51: throw e;
52: } finally {
53: if (fileWriter != null) {
54: fileWriter.close();
55: }
56: }
57: }
ConsumerOffsetManager
1: public class ConsumerOffsetManager extends ConfigManager {
2: private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
3: private static final String TOPIC_GROUP_SEPARATOR = "@";
4:
5: /**
6: * 消費進度集合
7: */
8: private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable = new ConcurrentHashMap<>(512);
9:
10: private transient BrokerController brokerController;
11:
12: public ConsumerOffsetManager() {
13: }
14:
15: public ConsumerOffsetManager(BrokerController brokerController) {
16: this.brokerController = brokerController;
17: }
18:
19: /**
20: * 提交消費進度
21: *
22: * @param clientHost 提交client地址
23: * @param group 消費分組
24: * @param topic 主題
25: * @param queueId 隊列編號
26: * @param offset 進度(隊列位置)
27: */
28: public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, final long offset) {
29: // topic@group
30: String key = topic + TOPIC_GROUP_SEPARATOR + group;
31: this.commitOffset(clientHost, key, queueId, offset);
32: }
33:
34: /**
35: * 提交消費進度
36: *
37: * @param clientHost 提交client地址
38: * @param key 主題@消費分組
39: * @param queueId 隊列編號
40: * @param offset 進度(隊列位置)
41: */
42: private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
43: ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key);
44: if (null == map) {
45: map = new ConcurrentHashMap<>(32);
46: map.put(queueId, offset);
47: this.offsetTable.put(key, map);
48: } else {
49: Long storeOffset = map.put(queueId, offset);
50: if (storeOffset != null && offset < storeOffset) {
51: log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
52: }
53: }
54: }
55:
56: public String encode() {
57: return this.encode(false);
58: }
59:
60: @Override
61: public String configFilePath() {
62: return BrokerPathConfigHelper.getConsumerOffsetPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
63: }
64:
65: /**
66: * 解碼內容
67: * 格式:JSON
68: *
69: * @param jsonString 內容
70: */
71: @Override
72: public void decode(String jsonString) {
73: if (jsonString != null) {
74: ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);
75: if (obj != null) {
76: this.offsetTable = obj.offsetTable;
77: }
78: }
79: }
80:
81: /**
82: * 編碼內容
83: * 格式為JSON
84: *
85: * @param prettyFormat 是否格式化
86: * @return 編碼后的內容
87: */
88: public String encode(final boolean prettyFormat) {
89: return RemotingSerializable.toJson(this, prettyFormat);
90: }
91:
92: }
- 說明 :消費進度管理器。
6、Broker 提供[發回消息]接口
大部分邏輯和 Broker
提供[接收消息]接口 類似,可以先看下相關內容。
SendMessageProcessor#consumerSendMsgBack(...)
1: private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
2: throws RemotingCommandException {
3:
4: // 初始化響應
5: final RemotingCommand response = RemotingCommand.createResponseCommand(null);
6: final ConsumerSendMsgBackRequestHeader requestHeader =
7: (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
8:
9: // hook(獨有)
10: if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
11:
12: ConsumeMessageContext context = new ConsumeMessageContext();
13: context.setConsumerGroup(requestHeader.getGroup());
14: context.setTopic(requestHeader.getOriginTopic());
15: context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
16: context.setCommercialRcvTimes(1);
17: context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));
18:
19: this.executeConsumeMessageHookAfter(context);
20: }
21:
22: // 判斷消費分組是否存在(獨有)
23: SubscriptionGroupConfig subscriptionGroupConfig =
24: this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
25: if (null == subscriptionGroupConfig) {
26: response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
27: response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
28: + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
29: return response;
30: }
31:
32: // 檢查 broker 是否有寫入權限
33: if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
34: response.setCode(ResponseCode.NO_PERMISSION);
35: response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
36: return response;
37: }
38:
39: // 檢查 重試隊列數 是否大於0(獨有)
40: if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
41: response.setCode(ResponseCode.SUCCESS);
42: response.setRemark(null);
43: return response;
44: }
45:
46: // 計算retry Topic
47: String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
48:
49: // 計算隊列編號(獨有)
50: int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
51:
52: // 計算sysFlag(獨有)
53: int topicSysFlag = 0;
54: if (requestHeader.isUnitMode()) {
55: topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
56: }
57:
58: // 獲取topicConfig。如果獲取不到,則進行創建
59: TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(//
60: newTopic, //
61: subscriptionGroupConfig.getRetryQueueNums(), //
62: PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
63: if (null == topicConfig) { // 沒有配置
64: response.setCode(ResponseCode.SYSTEM_ERROR);
65: response.setRemark("topic[" + newTopic + "] not exist");
66: return response;
67: }
68: if (!PermName.isWriteable(topicConfig.getPerm())) { // 不允許寫入
69: response.setCode(ResponseCode.NO_PERMISSION);
70: response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
71: return response;
72: }
73:
74: // 查詢消息。若不存在,返回異常錯誤。(獨有)
75: MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
76: if (null == msgExt) {
77: response.setCode(ResponseCode.SYSTEM_ERROR);
78: response.setRemark("look message by offset failed, " + requestHeader.getOffset());
79: return response;
80: }
81:
82: // 設置retryTopic到拓展屬性(獨有)
83: final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
84: if (null == retryTopic) {
85: MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
86: }
87:
88: // 設置消息不等待存儲完成(獨有) TODO 疑問:如果設置成不等待存儲,broker設置成同步落盤,豈不是不能批量提交了?
89: msgExt.setWaitStoreMsgOK(false);
90:
91: // 處理 delayLevel(獨有)。
92: int delayLevel = requestHeader.getDelayLevel();
93: int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
94: if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
95: maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
96: }
97: if (msgExt.getReconsumeTimes() >= maxReconsumeTimes//
98: || delayLevel < 0) { // 如果超過最大消費次數,則topic修改成"%DLQ%" + 分組名,即加入 死信隊列(Dead Letter Queue)
99: newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
100: queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
101:
102: topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
103: DLQ_NUMS_PER_GROUP, //
104: PermName.PERM_WRITE, 0
105: );
106: if (null == topicConfig) {
107: response.setCode(ResponseCode.SYSTEM_ERROR);
108: response.setRemark("topic[" + newTopic + "] not exist");
109: return response;
110: }
111: } else {
112: if (0 == delayLevel) {
113: delayLevel = 3 + msgExt.getReconsumeTimes();
114: }
115: msgExt.setDelayTimeLevel(delayLevel);
116: }
117:
118: // 創建MessageExtBrokerInner
119: MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
120: msgInner.setTopic(newTopic);
121: msgInner.setBody(msgExt.getBody());
122: msgInner.setFlag(msgExt.getFlag());
123: MessageAccessor.setProperties(msgInner, msgExt.getProperties());
124: msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
125: msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
126: msgInner.setQueueId(queueIdInt);
127: msgInner.setSysFlag(msgExt.getSysFlag());
128: msgInner.setBornTimestamp(msgExt.getBornTimestamp());
129: msgInner.setBornHost(msgExt.getBornHost());
130: msgInner.setStoreHost(this.getStoreHost());
131: msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
132:
133: // 設置原始消息編號到拓展字段(獨有)
134: String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
135: MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
136:
137: // 添加消息
138: PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
139: if (putMessageResult != null) {
140: switch (putMessageResult.getPutMessageStatus()) {
141: case PUT_OK:
142: String backTopic = msgExt.getTopic();
143: String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
144: if (correctTopic != null) {
145: backTopic = correctTopic;
146: }
147:
148: this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
149:
150: response.setCode(ResponseCode.SUCCESS);
151: response.setRemark(null);
152:
153: return response;
154: default:
155: break;
156: }
157:
158: response.setCode(ResponseCode.SYSTEM_ERROR);
159: response.setRemark(putMessageResult.getPutMessageStatus().name());
160: return response;
161: }
162:
163: response.setCode(ResponseCode.SYSTEM_ERROR);
164: response.setRemark("putMessageResult is null");
165: return response;
166: }
- 說明 :當
Consumer
消費某條消息失敗時,會調用該接口發回消息。Broker
會存儲發回的消息。這樣,下次Consumer
拉取該消息,能夠從CommitLog
和ConsumeQueue
順序讀取。 - [x] 因為大多數邏輯和
Broker
接收普通消息 很相似,時候TODO
標記成獨有的邏輯。 - 第 4 至 7 行 :初始化響應。
- [x] 第 9 至 20 行 :Hook邏輯。
- [x] 第22 至 30 行 :判斷消費分組是否存在。
- 第 32 至 37 行 :檢查
Broker
是否有寫入權限。 - [x] 第 39 至 44 行 :檢查重試隊列數是否大於0。
- 第 47 行 :計算 retry topic。
- [x] 第 50 行 :隨機分配隊列編號,依賴
retryQueueNums
。 - [x] 第 52 至 56 行 :計算
sysFlag
。 - 第 58 至 72 行 :獲取
TopicConfig
。如果不存在,則創建。 - [x] 第 74 至 80 行 :查詢消息。若不存在,返回異常錯誤。
- [x] 第 82 至 86 行 :設置
retryTopic
到消息拓展屬性。 - [x] 第 89 行 :設置消息不等待存儲完成。
- 當
Broker
刷盤方式為同步,會導致同步落盤不能批量提交,這樣會不會存在問題?有知道的同學麻煩告知下。😈。
- 當
- [x] 第 91 至 116 行 :處理
delayLevel
。 - 第 118 至 131 行 :創建
MessageExtBrokerInner
。 - [x] 第 133 至 135 行 :設置原始消息編號到拓展屬性。
- 第 137 至 161 行 :添加消息。
7、結尾
感謝同學們對本文的閱讀、收藏、點贊。
😈如果解析存在問題或者表達誤解的,表示抱歉。如果方便的話,可以加下 QQ:7685413。讓我們來一場 1 :1 交流(搞基)。
再次表示十分感謝。