RocketMQ Source Code Analysis (Part 2): Message Storage


CommitLog Structure

The relationship between CommitLog, MappedFileQueue, and MappedFile is as follows:

CommitLog : MappedFileQueue : MappedFile = 1 : 1 : N.

On the file system, this looks like:
```
Yunai-MacdeMacBook-Pro-2:commitlog yunai$ pwd
/Users/yunai/store/commitlog
Yunai-MacdeMacBook-Pro-2:commitlog yunai$ ls -l
total 10485760
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:27 00000000000000000000
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:29 00000000001073741824
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:32 00000000002147483648
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:33 00000000003221225472
-rw-r--r-- 1 yunai staff 1073741824 4 21 16:32 00000000004294967296
```

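CommitLog, MappedFileQueue, and MappedFile are defined as follows:

MappedFile: the files 00000000000000000000, 00000000001073741824, 00000000002147483648, and so on.
MappedFileQueue: the directory that holds the MappedFile files. It wraps them as a file queue and offers the upper layer a file capacity that can grow without limit.
Every MappedFile has the same file size.
File naming: fileName[n] = fileName[n - 1] + mappedFileSize. In CommitLog, mappedFileSize defaults to 1 GB.
CommitLog: a wrapper built on top of MappedFileQueue.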
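
The naming rule can be illustrated with a short sketch. This is not RocketMQ's code: `MappedFileNames`, `fileNameFor`, and `nextFileName` are hypothetical names, and the 20-digit zero padding is inferred from the directory listing above.

```java
import java.util.Locale;

// Hypothetical helper; only the offset-based, 20-digit naming comes from the listing above.
class MappedFileNames {
    static final long MAPPED_FILE_SIZE = 1024L * 1024 * 1024; // 1 GB, the CommitLog default

    /** Formats a file's start offset as a 20-digit, zero-padded file name. */
    static String fileNameFor(long startOffset) {
        return String.format(Locale.ROOT, "%020d", startOffset);
    }

    /** Name of the file that follows the one starting at prevStartOffset. */
    static String nextFileName(long prevStartOffset) {
        return fileNameFor(prevStartOffset + MAPPED_FILE_SIZE);
    }
}
```

Because each name is the file's start offset, the file that contains a given physical offset can be located from the name alone.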
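CommitLog currently stores two kinds of content in a MappedFile:

MESSAGE: a message.
BLANK: blank padding, written when the file does not have enough room left for a message.
Layout of CommitLog content inside a MappedFile:

MESSAGE[1] MESSAGE[2] … MESSAGE[n - 1] MESSAGE[n] BLANK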
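Storage layout of a MESSAGE in CommitLog:

| No. | Field | Description | Data type | Bytes |
| --- | --- | --- | --- | --- |
| 1 | MsgLen | Total message length | Int | 4 |
| 2 | MagicCode | MESSAGE_MAGIC_CODE | Int | 4 |
| 3 | BodyCRC | CRC of the message body | Int | 4 |
| 4 | QueueId | Message queue ID | Int | 4 |
| 5 | Flag | flag | Int | 4 |
| 6 | QueueOffset | Position in the message queue | Long | 8 |
| 7 | PhysicalOffset | Physical offset: the sequential storage position in CommitLog | Long | 8 |
| 8 | SysFlag | MessageSysFlag | Int | 4 |
| 9 | BornTimestamp | Timestamp at which the message was created | Long | 8 |
| 10 | BornHost | Address + port of the host that created the message | Long | 8 |
| 11 | StoreTimestamp | Timestamp at which the message was stored | Long | 8 |
| 12 | StoreHost | Address + port of the host that stored the message | Long | 8 |
| 13 | ReconsumeTimes | Number of times the message has been re-consumed | Int | 4 |
| 14 | PreparedTransactionOffset | | Long | 8 |
| 15 | BodyLength + Body | Body length + body | Int + Bytes | 4 + bodyLength |
| 16 | TopicLength + Topic | Topic length + topic | Byte + Bytes | 1 + topicLength |
| 17 | PropertiesLength + Properties | Properties length + properties | Short + Bytes | 2 + propertiesLength |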
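Storage layout of a BLANK in CommitLog:

| No. | Field | Description | Data type | Bytes |
| --- | --- | --- | --- | --- |
| 1 | maxBlank | Length of the blank area | Int | 4 |
| 2 | MagicCode | BLANK_MAGIC_CODE | Int | 4 |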
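
Adding up the byte counts in the MESSAGE layout gives the total length of a stored message. The sketch below only sums the fields listed there. It is not the body of RocketMQ's `calMsgLength` (which this article does not show), and `MessageLength` and `messageLength` are hypothetical names.

```java
// Sketch derived from the MESSAGE field list above; not copied from RocketMQ.
class MessageLength {
    /** Bytes used by the fixed-width fields, including the three length prefixes. */
    static final int FIXED_BYTES =
          4  // 1  MsgLen
        + 4  // 2  MagicCode
        + 4  // 3  BodyCRC
        + 4  // 4  QueueId
        + 4  // 5  Flag
        + 8  // 6  QueueOffset
        + 8  // 7  PhysicalOffset
        + 4  // 8  SysFlag
        + 8  // 9  BornTimestamp
        + 8  // 10 BornHost
        + 8  // 11 StoreTimestamp
        + 8  // 12 StoreHost
        + 4  // 13 ReconsumeTimes
        + 8  // 14 PreparedTransactionOffset
        + 4  // 15 BodyLength prefix
        + 1  // 16 TopicLength prefix
        + 2; // 17 PropertiesLength prefix

    /** Total stored size: fixed fields plus the three variable-length parts. */
    static int messageLength(int bodyLength, int topicLength, int propertiesLength) {
        return FIXED_BYTES + bodyLength + topicLength + propertiesLength;
    }
}
```

The one-byte topic length prefix and two-byte properties length prefix also bound those fields, which is why doAppend rejects properties longer than Short.MAX_VALUE.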

CommitLog: Storing a Message

  1: public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
  2:     // Set the storage time
  3:     msg.setStoreTimestamp(System.currentTimeMillis());
  4:     // Set the message body BODY CRC (consider the most appropriate setting
  5:     // on the client)
  6:     msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
  7:     // Back to Results
  8:     AppendMessageResult result = null;
  9: 
 10:     StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
 11: 
 12:     String topic = msg.getTopic();
 13:     int queueId = msg.getQueueId();
 14: 
 15:     // Transaction-related. TODO: still to read (transactions)
 16:     final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
 17:     if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE//
 18:         || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
 19:         // Delay Delivery
 20:         if (msg.getDelayTimeLevel() > 0) {
 21:             if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
 22:                 msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
 23:             }
 24: 
 25:             topic = ScheduleMessageService.SCHEDULE_TOPIC;
 26:             queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
 27: 
 28:             // Backup real topic, queueId
 29:             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
 30:             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
 31:             msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
 32: 
 33:             msg.setTopic(topic);
 34:             msg.setQueueId(queueId);
 35:         }
 36:     }
 37: 
 38:     long eclipseTimeInLock = 0;
 39: 
 40:     // Get the mapped file to write to
 41:     MappedFile unlockMappedFile = null;
 42:     MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
 43: 
 44:     // Acquire the write lock
 45:     lockForPutMessage(); //spin...
 46:     try {
 47:         long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
 48:         this.beginTimeInLock = beginLockTimestamp;
 49: 
 50:         // Here settings are stored timestamp, in order to ensure an orderly
 51:         // global
 52:         msg.setStoreTimestamp(beginLockTimestamp);
 53: 
 54:     // Create a mapped file if none exists or the last one is full
 55:         if (null == mappedFile || mappedFile.isFull()) {
 56:             mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
 57:         }
 58:         if (null == mappedFile) {
 59:             log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
 60:             beginTimeInLock = 0;
 61:             return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
 62:         }
 63: 
 64:     // Store the message
 65:         result = mappedFile.appendMessage(msg, this.appendMessageCallback);
 66:         switch (result.getStatus()) {
 67:             case PUT_OK:
 68:                 break;
 69:             case END_OF_FILE: // End of file reached: get a new mapped file and append again
 70:                 unlockMappedFile = mappedFile;
 71:                 // Create a new file, re-write the message
 72:                 mappedFile = this.mappedFileQueue.getLastMappedFile(0);
 73:                 if (null == mappedFile) {
 74:                     // XXX: warn and notify me
 75:                     log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
 76:                     beginTimeInLock = 0;
 77:                     return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
 78:                 }
 79:                 result = mappedFile.appendMessage(msg, this.appendMessageCallback);
 80:                 break;
 81:             case MESSAGE_SIZE_EXCEEDED:
 82:             case PROPERTIES_SIZE_EXCEEDED:
 83:                 beginTimeInLock = 0;
 84:                 return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
 85:             case UNKNOWN_ERROR:
 86:                 beginTimeInLock = 0;
 87:                 return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
 88:             default:
 89:                 beginTimeInLock = 0;
 90:                 return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
 91:         }
 92: 
 93:         eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
 94:         beginTimeInLock = 0;
 95:     } finally {
 96:         // Release the write lock
 97:         releasePutMessageLock();
 98:     }
 99: 
100:     if (eclipseTimeInLock > 500) {
101:         log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
102:     }
103: 
104:     // 
105:     if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
106:         this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
107:     }
108: 
109:     PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
110: 
111:     // Statistics
112:     storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
113:     storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
114: 
115:     // Synchronous or asynchronous flush / commit
116:     GroupCommitRequest request = null;
117:     // Synchronization flush
118:     if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
119:         final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
120:         if (msg.isWaitStoreMsgOK()) {
121:             request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
122:             service.putRequest(request);
123:             boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
124:             if (!flushOK) {
125:                 log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags()
126:                     + " client address: " + msg.getBornHostString());
127:                 putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
128:             }
129:         } else {
130:             service.wakeup();
131:         }
132:     }
133:     // Asynchronous flush
134:     else {
135:         if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
136:             flushCommitLogService.wakeup(); // important: wake the CommitLog flush thread to flush
137:         } else {
138:             commitLogService.wakeup();
139:         }
140:     }
141: 
142:     // Synchronous double write: if this broker is a SYNC_MASTER, replicate to the slave // TODO: still to read (data replication)
143:     if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
144:         HAService service = this.defaultMessageStore.getHaService();
145:         if (msg.isWaitStoreMsgOK()) {
146:             // Determine whether to wait
147:             if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
148:                 if (null == request) {
149:                     request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
150:                 }
151:                 service.putRequest(request);
152: 
153:                 service.getWaitNotifyObject().wakeupAll();
154: 
155:                 boolean flushOK =
156:                     // TODO
157:                     request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
158:                 if (!flushOK) {
159:                     log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
160:                         + msg.getTags() + " client address: " + msg.getBornHostString());
161:                     putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
162:                 }
163:             }
164:             // Slave problem
165:             else {
166:                 // Tell the producer, slave not available
167:                 putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
168:             }
169:         }
170:     }
171: 
172:     return putMessageResult;
173: }

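Description: stores the message and returns the storage result.
Lines 2 to 6: set the store timestamp and the body CRC.
Lines 16 to 36: related to transactional messages (not yet studied here). For a non-transactional or committed message with a delay level, this block also swaps in the schedule topic and queue ID, and backs up the real topic and queue ID in the message properties.
Lines 45 & 97: acquire and release the lock.
Line 52: sets the store timestamp again. The store timestamp is currently set in several places.
Lines 55 to 62: get the MappedFile; if none exists or the last one is full, create one. For details, see MappedFileQueue#getLastMappedFile(…).
Line 65: appends the message to the MappedFile. For details, see MappedFile#appendMessage(…).
Lines 69 to 80: the MappedFile does not have enough room left, so a new one is created and the message is appended again.
Lines 116 to 140: flush to disk, that is, persist to the file. The append above has not actually reached the disk yet. What happens here depends on the flush strategy. For details, see FlushCommitLogService.
Lines 143 to 170: Broker master-slave synchronization. A later article covers it in detail 😈.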
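
The append-then-roll-over flow of lines 55 to 80 can be sketched with a toy in-memory model. Everything here is hypothetical (`RollingLog`, `Segment`, `put`): segments only count bytes, there is no locking, and nothing is written to disk. The sketch only shows how an END_OF_FILE result leads to a new file and a second append.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of "append; on END_OF_FILE create a new file and append again". Not RocketMQ code.
class RollingLog {
    enum Status { PUT_OK, END_OF_FILE }

    static final int BLANK_HEADER = 8; // 4-byte length + 4-byte magic code of a BLANK

    /** A fixed-size segment that tracks only how many bytes are used. */
    static final class Segment {
        final long fromOffset;
        final int size;
        int wrotePosition;

        Segment(long fromOffset, int size) {
            this.fromOffset = fromOffset;
            this.size = size;
        }

        Status append(int msgLen) {
            int remaining = size - wrotePosition;
            if (msgLen + BLANK_HEADER > remaining) {
                wrotePosition = size; // the tail is filled with BLANK padding
                return Status.END_OF_FILE;
            }
            wrotePosition += msgLen;
            return Status.PUT_OK;
        }

        boolean isFull() {
            return wrotePosition == size;
        }
    }

    final int segmentSize;
    final List<Segment> segments = new ArrayList<>();

    RollingLog(int segmentSize) {
        this.segmentSize = segmentSize;
    }

    /** Returns the last segment, creating a new one if none exists or the last is full. */
    Segment lastOrNew() {
        Segment last = segments.isEmpty() ? null : segments.get(segments.size() - 1);
        if (last == null || last.isFull()) {
            long from = (last == null) ? 0 : last.fromOffset + segmentSize;
            last = new Segment(from, segmentSize);
            segments.add(last);
        }
        return last;
    }

    /** Appends a message of msgLen bytes and returns the physical offset it was written at. */
    long put(int msgLen) {
        Segment seg = lastOrNew();
        long offset = seg.fromOffset + seg.wrotePosition;
        if (seg.append(msgLen) == Status.END_OF_FILE) {
            seg = lastOrNew(); // the previous segment is now full, so this creates a new one
            offset = seg.fromOffset + seg.wrotePosition;
            if (seg.append(msgLen) != Status.PUT_OK) {
                throw new IllegalArgumentException("message does not fit in an empty segment");
            }
        }
        return offset;
    }
}
```

With 100-byte segments, three 40-byte messages land at offsets 0, 40, and 100: the third does not fit in the 20 bytes left, so the first segment is padded and a second one is created.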

MappedFileQueue#getLastMappedFile(…)

1: public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
 2:     long createOffset = -1; // Start offset of the file to create; -1 means do not create
 3:     MappedFile mappedFileLast = getLastMappedFile();
 4: 
 5:     if (mappedFileLast == null) { // No mapped file exists yet
 6:         createOffset = startOffset - (startOffset % this.mappedFileSize);
 7:     }
 8: 
 9:     if (mappedFileLast != null && mappedFileLast.isFull()) { // The last file is full
10:         createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
11:     }
12: 
13:     if (createOffset != -1 && needCreate) { // Create the file
14:         String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
15:         String nextNextFilePath = this.storePath + File.separator
16:             + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
17:         MappedFile mappedFile = null;
18: 
19:         if (this.allocateMappedFileService != null) {
20:             mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
21:                 nextNextFilePath, this.mappedFileSize);
22:         } else {
23:             try {
24:                 mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
25:             } catch (IOException e) {
26:                 log.error("create mappedFile exception", e);
27:             }
28:         }
29: 
30:         if (mappedFile != null) {
31:             if (this.mappedFiles.isEmpty()) {
32:                 mappedFile.setFirstCreateInQueue(true);
33:             }
34:             this.mappedFiles.add(mappedFile);
35:         }
36: 
37:         return mappedFile;
38:     }
39: 
40:     return mappedFileLast;
41: }

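Description: gets the last MappedFile; if none exists or the last one is full, creates one.
Lines 5 to 11: compute createOffset, the start offset of the new file, for the cases where no file exists or the last file is full.
Line 14: computes the file name. From this we can see the naming rule for MappedFile files:

fileName[n] = fileName[n - 1] + mappedFileSize
fileName[0] = startOffset - (startOffset % this.mappedFileSize)

Currently the startOffset used by CommitLog is 0.
One question here is why (startOffset % this.mappedFileSize) is needed. For example:

| startOffset | mappedFileSize | createOffset |
| --- | :-- | :-- |
| 5 | 1 | 5 |
| 5 | 2 | 4 |
| 5 | 3 | 3 |
| 5 | 4 | 4 |
| 5 | > 5 | 0 |

If anyone knows, please leave a hint. 😈
Answer: fileName[0] = startOffset - (startOffset % this.mappedFileSize) yields the start offset of the file that contains startOffset, given that every file is this.mappedFileSize bytes long.

Lines 30 to 35: mark whether the MappedFile is the first one created in the queue. This flag is used by the MappedFile files that back ConsumeQueue; see ConsumeQueue#fillPreBlank.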
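
The alignment in line 6 can be checked in isolation. A minimal sketch, with hypothetical names (`CreateOffset`, `firstFileOffset`), that reproduces the values in the table above:

```java
// Isolates the arithmetic of line 6 of getLastMappedFile; class and method names are illustrative.
class CreateOffset {
    /** Start offset of the file containing startOffset when every file is mappedFileSize bytes. */
    static long firstFileOffset(long startOffset, long mappedFileSize) {
        return startOffset - (startOffset % mappedFileSize);
    }
}
```

Rounding down to a multiple of mappedFileSize keeps every file name on a file boundary, even when the first write starts in the middle of a file's offset range.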

MappedFile#appendMessage(…)

 1: public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
 2:     assert msg != null;
 3:     assert cb != null;
 4: 
 5:     int currentPos = this.wrotePosition.get();
 6: 
 7:     if (currentPos < this.fileSize) {
 8:         ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
 9:         byteBuffer.position(currentPos);
10:         AppendMessageResult result =
11:             cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg);
12:         this.wrotePosition.addAndGet(result.getWroteBytes());
13:         this.storeTimestamp = result.getStoreTimestamp();
14:         return result;
15:     }
16: 
17:     log.error("MappedFile.appendMessage return null, wrotePosition: " + currentPos + " fileSize: "
18:         + this.fileSize);
19:     return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
20: }

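Description: appends the message to the MappedFile and returns the result.
Line 8: gets the byte buffer to write into. For why a different byte buffer is used depending on writeBuffer != null, see FlushCommitLogService.
Lines 9 to 12: set the write position, perform the write, and update wrotePosition (the current write position, which is where the next write starts).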
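
The slice-then-position pattern of lines 8 to 12 can be tried on a plain heap buffer. This is a simplified sketch under stated assumptions: `SliceAppend` is a hypothetical class, a heap ByteBuffer stands in for the mapped buffer, and there is no locking (the real method runs while putMessage holds its lock).

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrates writing through a slice while tracking the write position separately.
class SliceAppend {
    final ByteBuffer backing;
    final AtomicInteger wrotePosition = new AtomicInteger(0);

    SliceAppend(int fileSize) {
        this.backing = ByteBuffer.allocate(fileSize);
    }

    /** Appends data at the current write position; returns bytes written, or -1 if it does not fit. */
    int append(byte[] data) {
        int currentPos = wrotePosition.get();
        if (currentPos + data.length > backing.capacity()) {
            return -1;
        }
        // slice() shares the content but has its own position and limit,
        // so the backing buffer's position is never disturbed.
        ByteBuffer view = backing.slice();
        view.position(currentPos);
        view.put(data);
        wrotePosition.addAndGet(data.length);
        return data.length;
    }
}
```

Keeping the write position in a separate counter means readers and the flush logic can look at it without touching the buffer's own position.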

DefaultAppendMessageCallback#doAppend(…)

class DefaultAppendMessageCallback implements AppendMessageCallback {
  2:     // File at the end of the minimum fixed length empty
  3:     private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
  4:     
  7:     private final ByteBuffer msgIdMemory;
  8:     
 13:     private final ByteBuffer msgStoreItemMemory;
 14:     
 18:     private final int maxMessageSize;
 19:     
 24:     private final StringBuilder keyBuilder = new StringBuilder();
 25:     
 29:     private final ByteBuffer hostHolder = ByteBuffer.allocate(8);
 30: 
 31:     DefaultAppendMessageCallback(final int size) {
 32:         this.msgIdMemory = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
 33:         this.msgStoreItemMemory = ByteBuffer.allocate(size + END_FILE_MIN_BLANK_LENGTH);
 34:         this.maxMessageSize = size;
 35:     }
 36: 
 37:     public ByteBuffer getMsgStoreItemMemory() {
 38:         return msgStoreItemMemory;
 39:     }
 40: 
 41:     public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner) {
 42:         // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
 43: 
 44:         // PHY OFFSET
 45:         long wroteOffset = fileFromOffset + byteBuffer.position();
 46: 
 47:         // Compute the msgId (offsetMsgId) in CommitLog
 48:         this.resetByteBuffer(hostHolder, 8);
 49:         String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);
 50: 
 51:         // Record ConsumeQueue information: get the queue offset
 52:         keyBuilder.setLength(0);
 53:         keyBuilder.append(msgInner.getTopic());
 54:         keyBuilder.append('-');
 55:         keyBuilder.append(msgInner.getQueueId());
 56:         String key = keyBuilder.toString();
 57:         Long queueOffset = CommitLog.this.topicQueueTable.get(key);
 58:         if (null == queueOffset) {
 59:             queueOffset = 0L;
 60:             CommitLog.this.topicQueueTable.put(key, queueOffset);
 61:         }
 62: 
 63:         // Transaction messages that require special handling // TODO question: purpose
 64:         final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
 65:         switch (tranType) {
 66:             // Prepared and Rollback message is not consumed, will not enter the
 67:             // consumer queue
 68:             case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
 69:             case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
 70:                 queueOffset = 0L;
 71:                 break;
 72:             case MessageSysFlag.TRANSACTION_NOT_TYPE:
 73:             case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
 74:             default:
 75:                 break;
 76:         }
 77: 
 78:         // Compute the message length
 79:         final byte[] propertiesData =
 80:             msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
 81:         final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
 82:         if (propertiesLength > Short.MAX_VALUE) {
 83:             log.warn("putMessage message properties length too long. length={}", propertiesData.length);
 84:             return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
 85:         }
 86:         final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
 87:         final int topicLength = topicData.length;
 88:         final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
 89:         final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);
 90:         // Exceeds the maximum message
 91:         if (msgLen > this.maxMessageSize) {
 92:             CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
 93:                 + ", maxMessageSize: " + this.maxMessageSize);
 94:             return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
 95:         }
 96: 
 97:         // Determines whether there is sufficient free space
 98:         if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
 99:             this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
100:             // 1 TOTAL_SIZE
101:             this.msgStoreItemMemory.putInt(maxBlank);
102:             // 2 MAGIC_CODE
103:             this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
104:             // 3 The remaining space may be any value
105:             //
106: 
107:             // Here the length of the specially set maxBlank
108:             final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
109:             byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
110:             return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
111:                 queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
112:         }
113: 
114:         // Initialization of storage space
115:         this.resetByteBuffer(msgStoreItemMemory, msgLen);
116:         // 1 TOTAL_SIZE
117:         this.msgStoreItemMemory.putInt(msgLen);
118:         // 2 MAGIC_CODE
119:         this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
120:         // 3 BODY_CRC
121:         this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
122:         // 4 QUEUE_ID
123:         this.msgStoreItemMemory.putInt(msgInner.getQueueId());
124:         // 5 FLAG
125:         this.msgStoreItemMemory.putInt(msgInner.getFlag());
126:         // 6 QUEUE_OFFSET
127:         this.msgStoreItemMemory.putLong(queueOffset);
128:         // 7 PHYSICAL_OFFSET
129:         this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
130:         // 8 SYS_FLAG
131:         this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
132:         // 9 BORN_TIMESTAMP
133:         this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
134:         // 10 BORN_HOST
135:         this.resetByteBuffer(hostHolder, 8);
136:         this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
137:         // 11 STORE_TIMESTAMP
138:         this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
139:         // 12 STORE_HOST_ADDRESS
140:         this.resetByteBuffer(hostHolder, 8);
141:         this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
142:         //this.msgStoreItemMemory.put(msgInner.getStoreHostBytes());
143:         // 13 RECONSUME_TIMES
144:         this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
145:         // 14 Prepared Transaction Offset
146:         this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
147:         // 15 BODY
148:         this.msgStoreItemMemory.putInt(bodyLength);
149:         if (bodyLength > 0)
150:             this.msgStoreItemMemory.put(msgInner.getBody());
151:         // 16 TOPIC
152:         this.msgStoreItemMemory.put((byte) topicLength);
153:         this.msgStoreItemMemory.put(topicData);
154:         // 17 PROPERTIES
155:         this.msgStoreItemMemory.putShort((short) propertiesLength);
156:         if (propertiesLength > 0)
157:             this.msgStoreItemMemory.put(propertiesData);
158: 
159:         final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
160:         // Write messages to the queue buffer
161:         byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
162: 
163:         AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
164:             msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
165: 
166:         switch (tranType) {
167:             case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
168:             case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
169:                 break;
170:             case MessageSysFlag.TRANSACTION_NOT_TYPE:
171:             case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
172:                 // The next update ConsumeQueue information: update the queue offset
173:                 CommitLog.this.topicQueueTable.put(key, ++queueOffset);
174:                 break;
175:             default:
176:                 break;
177:         }
178:         return result;
179:     }
180: 
181:     
187:     private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) {
188:         byteBuffer.flip();
189:         byteBuffer.limit(limit);
190:     }
191: }

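Description: appends the message to the byte buffer.
Line 45: computes the physical offset, that is, the sequential storage position in CommitLog.
Lines 47 to 49: compute the offsetMsgId in CommitLog. Be sure to distinguish it from msgId.

| ID | Generated | Computation | Length |
| --- | --- | --- | --- |
| offsetMsgId | By the Broker when storing | Hex(storeHostBytes, wroteOffset) | 32 |
| msgId | By the Client when sending the message | Hex(process ID, IP, ClassLoader, startTime, currentTime, auto-increment sequence) | 32 |

For msgId, see "RocketMQ Source Code Analysis: Message Basics".
Lines 51 to 61: get the queue position (offset).
Lines 78 to 95: compute the total message length.
Lines 98 to 112: when the remaining file space is insufficient, write a BLANK placeholder and return the result.
Lines 114 to 161: write the MESSAGE.
Line 173: update the queue position (offset).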
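
To make Hex(storeHostBytes, wroteOffset) concrete, here is a sketch of building such an ID. It assumes an IPv4 store host laid out as 4 address bytes plus a 4-byte port (8 bytes, matching the StoreHost field), followed by the 8-byte offset, hex-encoded in upper case. `OffsetMsgId`, `create`, and `offsetOf` are hypothetical names, not the `MessageDecoder` API.

```java
import java.nio.ByteBuffer;

// Sketch of a 16-byte (32 hex character) ID built from store host and physical offset.
class OffsetMsgId {
    private static final char[] HEX = "0123456789ABCDEF".toCharArray();

    /** Encodes ip (4 bytes) + port (4 bytes) + wroteOffset (8 bytes) as 32 hex characters. */
    static String create(byte[] ip4, int port, long wroteOffset) {
        if (ip4.length != 4) {
            throw new IllegalArgumentException("this sketch assumes an IPv4 address");
        }
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.put(ip4);
        buf.putInt(port);
        buf.putLong(wroteOffset);
        StringBuilder hex = new StringBuilder(32);
        for (byte b : buf.array()) {
            hex.append(HEX[(b >> 4) & 0x0F]).append(HEX[b & 0x0F]);
        }
        return hex.toString();
    }

    /** Recovers the physical offset from the last 16 hex characters of the ID. */
    static long offsetOf(String offsetMsgId) {
        return Long.parseUnsignedLong(offsetMsgId.substring(16), 16);
    }
}
```

Because the ID embeds the store host and the physical offset, a message can be located from the ID alone, without any index lookup.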

FlushCommitLogService

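| Thread service | Scenario | Message append performance |
| --- | --- | --- |
| CommitRealTimeService | Asynchronous flush with the in-memory byte buffer enabled | First |
| FlushRealTimeService | Asynchronous flush with the in-memory byte buffer disabled | Second |
| GroupCommitService | Synchronous flush | Third |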

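MappedFile: Persisting to Disk

| Approach | Step 1 | Step 2 | Step 3 |
| --- | --- | --- | --- |
| Approach 1 | Write to the in-memory byte buffer (writeBuffer) | Commit from the in-memory byte buffer (writeBuffer) to the file channel (fileChannel) | Flush the file channel (fileChannel) |
| Approach 2 | Write to the mapped-file byte buffer (mappedByteBuffer) | Flush the mapped-file byte buffer (mappedByteBuffer) | |

Flush-related code
For write performance, a flush is performed only once the unflushed data spans at least flushLeastPages pages of OS_PAGE_SIZE bytes each.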

 1: 
 7: public int flush(final int flushLeastPages) {
 8:     if (this.isAbleToFlush(flushLeastPages)) {
 9:         if (this.hold()) {
10:             int value = getReadPosition();
11: 
12:             try {
13:                 //We only append data to fileChannel or mappedByteBuffer, never both.
14:                 if (writeBuffer != null || this.fileChannel.position() != 0) {
15:                     this.fileChannel.force(false);
16:                 } else {
17:                     this.mappedByteBuffer.force();
18:                 }
19:             } catch (Throwable e) {
20:                 log.error("Error occurred when force data to disk.", e);
21:             }
22: 
23:             this.flushedPosition.set(value);
24:             this.release();
25:         } else {
26:             log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
27:             this.flushedPosition.set(getReadPosition());
28:         }
29:     }
30:     return this.getFlushedPosition();
31: }
32: 
33: 
42: private boolean isAbleToFlush(final int flushLeastPages) {
43:     int flush = this.flushedPosition.get();
44:     int write = getReadPosition();
45: 
46:     if (this.isFull()) {
47:         return true;
48:     }
49: 
50:     if (flushLeastPages > 0) {
51:         return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
52:     }
53: 
54:     return write > flush;
55: }
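
The threshold check can be exercised on its own. The sketch below restates isAbleToFlush as a pure function. `FlushThreshold` is a hypothetical class, and the 4 KB value of OS_PAGE_SIZE is an assumption, since the article does not show the constant.

```java
// Pure-function restatement of isAbleToFlush for experimentation; not RocketMQ code.
class FlushThreshold {
    static final int OS_PAGE_SIZE = 4 * 1024; // assumed page size of 4 KB

    static boolean isAbleToFlush(int flushedPosition, int writePosition, int fileSize, int flushLeastPages) {
        if (writePosition == fileSize) {
            return true; // a full file is always flushed
        }
        if (flushLeastPages > 0) {
            // Compares page indexes, not byte counts.
            return (writePosition / OS_PAGE_SIZE) - (flushedPosition / OS_PAGE_SIZE) >= flushLeastPages;
        }
        return writePosition > flushedPosition; // flushLeastPages == 0: flush any unflushed data
    }
}
```

Because the comparison uses page indexes, crossing a page boundary counts as one page even if only a single byte is unflushed, for example flushed = 4095 and written = 4096 with flushLeastPages = 1.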

FlushRealTimeService

 class FlushRealTimeService extends FlushCommitLogService {
 2:     
 5:     private long lastFlushTimestamp = 0;
 6:     
10:     private long printTimes = 0;
11: 
12:     public void run() {
13:         CommitLog.log.info(this.getServiceName() + " service started");
14: 
15:         while (!this.isStopped()) {
16:             boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
17:             int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
18:             int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
19:             int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
20: 
21:             // Print flush progress
22:             // Once flushPhysicQueueThoroughInterval has elapsed, flush even if fewer than flushPhysicQueueLeastPages pages were written
23:             boolean printFlushProgress = false;
24:             long currentTimeMillis = System.currentTimeMillis();
25:             if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
26:                 this.lastFlushTimestamp = currentTimeMillis;
27:                 flushPhysicQueueLeastPages = 0;
28:                 printFlushProgress = (printTimes++ % 10) == 0;
29:             }
30: 
31:             try {
32:                 // Wait before running
33:                 if (flushCommitLogTimed) {
34:                     Thread.sleep(interval);
35:                 } else {
36:                     this.waitForRunning(interval);
37:                 }
38: 
39:                 if (printFlushProgress) {
40:                     this.printFlushProgress();
41:                 }
42: 
43:                 // flush commitLog
44:                 long begin = System.currentTimeMillis();
45:                 CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
46:                 long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
47:                 if (storeTimestamp > 0) {
48:                     CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
49:                 }
50:                 long past = System.currentTimeMillis() - begin;
51:                 if (past > 500) {
52:                     log.info("Flush data to disk costs {} ms", past);
53:                 }
54:             } catch (Throwable e) {
55:                 CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
56:                 this.printFlushProgress();
57:             }
58:         }
59: 
60:         // Normal shutdown, to ensure that all the flush before exit
61:         boolean result = false;
62:         for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
63:             result = CommitLog.this.mappedFileQueue.flush(0);
64:             CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
65:         }
66: 
67:         this.printFlushProgress();
68: 
69:         CommitLog.log.info(this.getServiceName() + " service end");
70:     }
71: 
72:     @Override
73:     public String getServiceName() {
74:         return FlushRealTimeService.class.getSimpleName();
75:     }
76: 
77:     private void printFlushProgress() {
78:         // CommitLog.log.info("how much disk fall behind memory, "
79:         // + CommitLog.this.mappedFileQueue.howMuchFallBehind());
80:     }
81: 
82:     @Override
83:     @SuppressWarnings("SpellCheckingInspection")
84:     public long getJointime() {
85:         return 1000 * 60 * 5;
86:     }
87: }

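Description: the real-time flush thread service; it invokes the MappedFile#flush logic.
Lines 23 to 29: once every flushPhysicQueueThoroughInterval, a forced flush is performed. Not every loop iteration has accumulated flushCommitLogLeastPages pages of data, so a forced flush is needed periodically. Forcing a flush on every iteration is avoided because it would perform poorly.
Lines 33 to 37: depending on the flushCommitLogTimed parameter, each iteration either sleeps for a fixed interval or waits to be woken up. The default is the latter, which is why flushCommitLogService.wakeup() is called after each message append (line 136 of putMessage).
Line 45: calls MappedFileQueue#flush, which flushes the MappedFile.
Lines 61 to 65: when the Broker shuts down, force a flush so that no unflushed data is left behind.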
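
The interplay between the page threshold and the periodic forced flush (lines 23 to 29) can be isolated as a small policy object. This is an illustrative sketch. `ThoroughFlushPolicy` and `leastPagesFor` are hypothetical names, and the clock is passed in as a parameter so the behavior can be checked deterministically.

```java
// Decides, per loop iteration, which page threshold to pass to flush(...). Not RocketMQ code.
class ThoroughFlushPolicy {
    private final long thoroughIntervalMillis;
    private final int leastPages;
    private long lastThoroughFlush = 0;

    ThoroughFlushPolicy(long thoroughIntervalMillis, int leastPages) {
        this.thoroughIntervalMillis = thoroughIntervalMillis;
        this.leastPages = leastPages;
    }

    /** Returns 0 (force a flush) once per interval, otherwise the configured page threshold. */
    int leastPagesFor(long nowMillis) {
        if (nowMillis >= lastThoroughFlush + thoroughIntervalMillis) {
            lastThoroughFlush = nowMillis;
            return 0;
        }
        return leastPages;
    }
}
```

Passing 0 makes the page check degenerate to "any unflushed data", so a trickle of small messages still reaches the disk within one interval.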

Reposted from: http://www.iocoder.cn/RocketMQ/message-store/

