上一篇說明了RocketMQ怎么支持broker集群的,這里接着說RocketMQ實現高可用的手段之一——冗余。
RocketMQ部署的時候一個broker set會有一個mater和一個或者多個slave,salve起到的作用就是同步master存儲的的消息,並且會接收部分consumer讀取消息的請求,下面圍繞兩個問題來闡明怎么做的冗余:
- 怎么實現冗余
- 冗余之后的消息讀取
怎么實現冗余?
RocketMQ通過主從結構來實現消息冗余,master接收來自producer發送來的消息,然后同步消息到slave,根據master的role不同,同步的時機可分為兩種不同的情況:
- SYNC_MASTER:如果master是這種角色,每次master在將producer發送來的消息寫入內存(磁盤)的時候會同步等待master將消息傳輸到slave
- ASYNC_MASTER:這種角色下消息會異步復制到slave
這里注意的是master傳輸到slave只有CommitLog的物理文件。
master和slave之間傳輸CommitLog的主要流程如下:
這里主要涉及到兩個class(包括其內部類):HAService、HAConnection(這兩個類的源碼中文注釋可以在這里找到)。
broker在啟動的時候會調用DefaultMessageStore.start方法,這里面會調用HAService.start來啟動相關的服務:
- AcceptSocketService:啟動serverSocket並監聽來自HAClient的連接
- GroupTransferService:broker寫消息的時候如果需要同步等待消息同步到slave,會用到這個服務
- HAClient:如果是slave,才會啟動haClient。
master和slave之間的數據通信過程是:
- master啟動之后會監聽來自slave的連接,slave啟動之后會主動連接到master。
- 在連接建立之后,slave會向master上報自己的本地的CommitLog的offset
- master根據slave的offset來決定從那里開始向slave發送數據
slave發送給master的數據格式:
offset(8字節)
offset:slave本地CommitLog的maxOffset
master發送給slave的數據格式:
header(offset(8字節) + bodySize(4字節)) + body
offset:由於master發送給slave的CommitLog的單位是MappedFile的個數,這個offset是MappedFile的起始位置
bodySize:MappedFile的大小
body:MappedFile的內容
前面說過SYNC_MASTER和ASYNC_MASTER傳輸數據給slave的過程稍有不同,下面先看看ASYNC_MASTER怎么傳輸數據到slave的。
ASYNC_MASTER同步數據到slave
- salve連接到master,向master上報slave當前的offset
- master收到后確認給slave發送數據的開始位置
- master查詢開始位置對應的MappedFIle
- master將查找到的數據發送給slave
- slave收到數據后保存到自己的CommitLog
// org.apache.rocketmq.store.ha.HAService.HAClient#run
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// 只有配置了HAMaster地址的broker才會連接到master
if (this.connectMaster()) {
if (this.isTimeToReportOffset()) {
// 向master上報slave本地最大的CommitLog的offset
boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
if (!result) {
this.closeMaster();
}
}
this.selector.select(1000);
// 處理socket上的read事件,也就是處理master發來的數據
boolean ok = this.processReadEvent();
// 省略中間代碼...
}
// master收到slave上報的offset后用下面的方法處理
// org.apache.rocketmq.store.ha.HAConnection.ReadSocketService#processReadEvent
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
if (!this.byteBufferRead.hasRemaining()) {
this.byteBufferRead.flip();
this.processPostion = 0;
}
while (this.byteBufferRead.hasRemaining()) {
try {
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
readSizeZeroTimes = 0;
this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
long readOffset = this.byteBufferRead.getLong(pos - 8);
this.processPostion = pos;
// slave上報過來的offset說明offset之前的數據slave都已經收到
HAConnection.this.slaveAckOffset = readOffset;
if (HAConnection.this.slaveRequestOffset < 0) {
// 如果是剛剛和slave建立連接,需要知道slave需要從哪里開始接收commitLog
HAConnection.this.slaveRequestOffset = readOffset;
log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
}
// 如果收到來自slave的確認之后,喚醒等待同步到slave的線程(如果是SYNC_MASTER)
HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
// 省略中間代碼...
}
通過上面slave和master的通信,master已經知道第一次從哪里(slaveRequestOffset)開始給slave傳輸數據
@Override
public void run() {
HAConnection.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.selector.select(1000);
// 說明還沒收到來自slave的offset,等10ms重試
if (-1 == HAConnection.this.slaveRequestOffset) {
Thread.sleep(10);
continue;
}
// 如果是第一次發送數據需要計算出從哪里開始給slave發送數據
if (-1 == this.nextTransferFromWhere) {
if (0 == HAConnection.this.slaveRequestOffset) {
long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
masterOffset =
masterOffset
- (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getMapedFileSizeCommitLog());
if (masterOffset < 0) {
masterOffset = 0;
}
this.nextTransferFromWhere = masterOffset;
} else {
this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
}
log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
+ "], and slave request " + HAConnection.this.slaveRequestOffset);
}
// 如果上一次transfer完成了才進行下一次transfer
if (this.lastWriteOver) {
long interval =
HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
// 為什么要發送一個body的size是0的數據呢?怕時間太久連接斷開?
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getHaSendHeartbeatInterval()) {
// Build Header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(this.nextTransferFromWhere);
this.byteBufferHeader.putInt(0);
this.byteBufferHeader.flip();
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}
} else {
// 說明上一次的數據還沒有傳輸完成,這里繼續上一次的傳輸
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}
SelectMappedBufferResult selectResult =
HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
if (selectResult != null) {
// 發送找到的MappedFile數據
int size = selectResult.getSize();
if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
}
long thisOffset = this.nextTransferFromWhere;
// 計算下次需要給slave發送數據的起始位置
this.nextTransferFromWhere += size;
selectResult.getByteBuffer().limit(size);
this.selectMappedBufferResult = selectResult;
// Build Header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(thisOffset);
this.byteBufferHeader.putInt(size);
this.byteBufferHeader.flip();
// 向slave發送數據
this.lastWriteOver = this.transferData();
} else {
// 如果沒有需要給slave發送的數據,傳輸數據的線程等到100ms
// 或者等待broker接收到新發送來的消息的時候喚醒這個線程
HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
}
// 省略中間代碼
}
// 發送數據給slave,格式:header(offset(8字節) + body的size(4字節)) + body
private boolean transferData() throws Exception {
int writeSizeZeroTimes = 0;
// Write Header
// 前面已將需要發送的header數據放入byteBufferHeader
while (this.byteBufferHeader.hasRemaining()) {
int writeSize = this.socketChannel.write(this.byteBufferHeader);
if (writeSize > 0) {
writeSizeZeroTimes = 0;
this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
} else if (writeSize == 0) {
if (++writeSizeZeroTimes >= 3) {
break;
}
} else {
throw new Exception("ha master write header error < 0");
}
}
if (null == this.selectMappedBufferResult) {
return !this.byteBufferHeader.hasRemaining();
}
writeSizeZeroTimes = 0;
// Write Body
if (!this.byteBufferHeader.hasRemaining()) {
// selectMappedBufferResult里存放的是需要發送的MappedFile數據
while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
if (writeSize > 0) {
writeSizeZeroTimes = 0;
this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
} else if (writeSize == 0) {
if (++writeSizeZeroTimes >= 3) {
break;
}
} else {
throw new Exception("ha master write body error < 0");
}
}
}
boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();
if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
// 每次發送完成數據后清空selectMappedBufferResult,保證下一次發送前selectMappedBufferResult=null
this.selectMappedBufferResult.release();
this.selectMappedBufferResult = null;
}
return result;
}
上面master已經通過網絡將MappedFile數據發送給slave,接下來就是slave收到master的數據,然后保存到自己的CommitLog。HAClient啟動的時候向和master連接的socket上注冊了read事件的selector,收到read事件之后,一次執行以下方法:
org.apache.rocketmq.store.ha.HAService.HAClient#processReadEvent
org.apache.rocketmq.store.ha.HAService.HAClient#dispatchReadRequest
slave接收數據主要的邏輯在dispatchReadRequest
// 將從master讀取到的數據寫到CommitLog
// 消息的格式:header(offset(8字節) + bodySize(4字節)) + body
private boolean dispatchReadRequest() {
final int msgHeaderSize = 8 + 4; // phyoffset + size
int readSocketPos = this.byteBufferRead.position();
while (true) {
// 當前從master讀取到的數據的總大小 - 上一次處理(寫入CommitLog)到的位置
int diff = this.byteBufferRead.position() - this.dispatchPostion;
// 確定收到的數據是完整的
if (diff >= msgHeaderSize) {
long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion);
int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8);
long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
if (slavePhyOffset != 0) {
// 保證本次消息同步slave的commitLog起始位置和master這個mappedFile的起始位置相同
if (slavePhyOffset != masterPhyOffset) {
log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
+ slavePhyOffset + " MASTER: " + masterPhyOffset);
return false;
}
}
if (diff >= (msgHeaderSize + bodySize)) {
byte[] bodyData = new byte[bodySize];
this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize);
this.byteBufferRead.get(bodyData);
// 將收到的MappedFile寫入slave的commitLog
HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
this.byteBufferRead.position(readSocketPos);
// 記錄本(上次)次讀取消息的位置
this.dispatchPostion += msgHeaderSize + bodySize;
if (!reportSlaveMaxOffsetPlus()) {
return false;
}
continue;
}
}
// 到這里說明當前收到的信息不完整,需要使用byteBufferRead繼續接收,所以要保證byteBufferRead的空間是足夠接收的
if (!this.byteBufferRead.hasRemaining()) {
this.reallocateByteBuffer();
}
break;
}
return true;
}
slave的處理邏輯主要是:
- slave將接收到的數據都存在byteBufferRead
- 判斷收到的數據是否完整,如果byteBufferRead待處理的數據大於headerSize則認為可以開始處理
- 判斷收到的數據的offset是否和slave當前offset是否一致(也就是判斷是否是slave需要的下一個MappedFile),如果不一致說明系統錯誤
- 按照數據協議從byteBufferRead依次讀出offset、size、body
- 將body(MappedFile)寫入slave的CommitLog
- 更新byteBufferRead里面的處理進度(當前已處理的字節數)
- 如果上面判斷出收到的數據尚不足以處理,需要繼續接收數據之前先對byteBufferRead進行擴容
這里對byteBufferRead“擴容”的說法並不准確,因為並沒有擴大byteBufferRead的大小,具體的算法如下
// readBuffer不夠的時候"重新"申請buffer
// "重新":其實並沒有重新申請,只是將尚未讀取的部分放在准備好(backup)的buffer中,然后將backup賦值給readBuffer
// org.apache.rocketmq.store.ha.HAService.HAClient#reallocateByteBuffer
private void reallocateByteBuffer() {
// 計算byteBufferRead尚未處理部分的size
int remain = READ_MAX_BUFFER_SIZE - this.dispatchPostion;
// 如果byteBufferRead中還有未處理的字節的時候
if (remain > 0) {
this.byteBufferRead.position(this.dispatchPostion);
this.byteBufferBackup.position(0);
this.byteBufferBackup.limit(READ_MAX_BUFFER_SIZE);
this.byteBufferBackup.put(this.byteBufferRead);
}
this.swapByteBuffer();
// 設置下次寫入的位置為之前byteBufferRead尚未處理部分的后面
this.byteBufferRead.position(remain);
this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
// 從0開始處理byteBufferRead中的數據
this.dispatchPostion = 0;
}
SYNC_MASTER同步數據到slave
SYNC_MASTER和ASYNC_MASTER傳輸數據到salve的過程是一致的,只是時機上不一樣。SYNC_MASTER接收到producer發送來的消息時候,會同步等待消息也傳輸到salve。
- master將需要傳輸到slave的數據構造為GroupCommitRequest交給GroupTransferService
- 喚醒傳輸數據的線程(如果沒有更多數據需要傳輸的的時候HAClient.run會等待新的消息)
- 等待當前的傳輸請求完成
// org.apache.rocketmq.store.CommitLog#handleHA
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// 如果是SYNC_MASTER才會同步等待
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
if (messageExt.isWaitStoreMsgOK()) {
// Determine whether to wait
// 存在slave並且slave落后master不能太多
if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
// 喚醒可能等待新的消息數據的傳輸數據線程
service.getWaitNotifyObject().wakeupAll();
// 等待當前的消息被傳輸到slave,等待slave收到該消息的確認之后則flushOK=true
boolean flushOK =
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
+ messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
}
// Slave problem
else {
// Tell the producer, slave not available
putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}
}
在HAService啟動的時候會啟動GroupTransferService線程,GroupTransferService並沒有真正的傳輸數據,傳數據還是上面一節說的傳輸方式,GroupTransferService只是將push2SlaveMaxOffset和需要傳輸到slave的消息的offset比較,如果
push2SlaveMaxOffset > req.getNextOffset()
則說明slave已經收到該消息,這個時候就會通知request,消息已經傳輸完成。
push2SlaveMaxOffset這個字段表示當前slave收到消息的最大的offset,每次master收到slave的ACK之后會更新這個值。
冗余之后消息怎么讀取
broker實現冗余之后,就有多個消息副本了,那么consumer怎么知道究竟是從master讀取消息還是從slave讀取消息呢?
從前面我們知道consumer通過負載均衡算法計算出本次消息從哪一個MessageQueue消費,但是MessageQueue只決定了從哪一個broker set下的哪一個queue消費消息,並不能確定具體的broker,但是在發送pull請求的時候會確定具體的broker
// org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl
public PullResult pullKernelImpl(
final MessageQueue mq,
final String subExpression,
final String expressionType,
final long subVersion,
final long offset,
final int maxNums,
final int sysFlag,
final long commitOffset,
final long brokerSuspendMaxTimeMillis,
final long timeoutMillis,
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
}
// 省略部分代碼...
}
上面調用recalculatePullFromWhichNode來計算從哪一個broker來消費消息
// org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#recalculatePullFromWhichNode
public long recalculatePullFromWhichNode(final MessageQueue mq) {
if (this.isConnectBrokerByUser()) {
// org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#connectBrokerByUser配置為true的時候會從msater讀取
return this.defaultBrokerId;
}
// pullFromWhichNodeTable這個數據結構保存的是broker返回的建議從哪一個broker讀取消息的信息
AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
if (suggest != null) {
return suggest.get();
}
// 如果沒有建議的broker,則默認從master消費
return MixAll.MASTER_ID;
}
上面可以看出如果沒有建議的broker則從master消費,那么就要看看哪些情況下會有建議的broker?會有哪些建議的broker?
要解決這兩個問題需要回到broker處理producer發來的消息的時候:
// org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
throws RemotingCommandException {
// 省略部分代碼...
if (getMessageResult != null) {
// 省略部分代碼...
// 如果slave配置了可讀
if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
// consume too slow ,redirect to another machine
// 如果讀取消息的時候發現consumer消費的太慢就會建議從slave讀取消息
if (getMessageResult.isSuggestPullingFromSlave()) {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
}
// consume ok
else {
// 如果沒有消費過慢的問題則依然建議從本broker消費
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
}
} else {
// 如果slave不允許讀取消息則從master讀取
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
// 省略部分代碼...
case ResponseCode.PULL_OFFSET_MOVED:
if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
|| this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
// 省略部分代碼...
} else {
// 如果當前broker是slave並且消費消息的offset在broker中沒有找到的時候
// 建議consumer再次向這個broker發送請求重試,
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
// 省略部分代碼...
}
org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader#suggestWhichBrokerId字段就是最后返回給consumer的建議的brokerId,最后會被存入字段:org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullFromWhichNodeTable
結合上面的代碼broker會返回給consumer建議的brokerId的情況有以下幾種:
- 如果slave可讀並且當前broker消費過慢的時候:如果沒有配置org.apache.rocketmq.common.subscription.SubscriptionGroupConfig#whichBrokerWhenConsumeSlowly的時候,默認是返回brokerId時1 的broker
- 如果slave可讀並且當前broker消費正常的時候:返回當前的broker
- 如果slave不可讀的時候:返回master
- 如果當前broker是slave並且需要消費消息的offset不在合理范圍(broker沒有消息的時候或offset > maxOffset或offset < minOffset)的時候:返回當前slave
所以consumer一開始是從master消費消息的,如果出現消費消費過慢的情況,consumer就會從slave(如果slave配置為可讀)消費。
關於消息消費過慢的說明:
RocketMQ是根據以下兩個值進行判斷的:
a = 當前CommitLog的maxOffset - 需要消費消息的offset的結果
b = RocketMQ可用內存的大小
如果a > b則判斷為消息消費過慢,a > b表示需要消費的消息一定不在內存中了,還需要讀取文件,這樣會給還需要寫消息的broker帶來一定的性能壓力,所以這個時候master建議從slave讀取消息。