public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
this.defaultMessageStore = defaultMessageStore;
this.acceptSocketService =
new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
this.groupTransferService = new GroupTransferService();
this.haClient = new HAClient();
}
一個haservice下面有accpet和haclient分別對應客戶端和服務端,grouptranserservie用來控制消息是否獲取到,下面具體講。
拿haclient舉例子,在主線程做的事情
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
if (this.connectMaster()) {
if (this.isTimeToReportOffset()) {
boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
if (!result) {
this.closeMaster();
}
}
this.selector.select(1000);
boolean ok = this.processReadEvent();
} else {
this.waitForRunning(1000 * 5);
}
也就是在rocketmq里面,一個具體的任務就是單獨分配一個線程,從而發揮多線程優勢,在主線程上面休眠等待喚醒或者超時喚醒然后執行io動作。
一個典型的基於bytebuffer的寫操作,通過positon、limit來判斷是否數據寫完:
private boolean reportSlaveMaxOffset(final long maxOffset) {
this.reportOffset.position(0);
this.reportOffset.limit(8);
this.reportOffset.putLong(maxOffset);
this.reportOffset.position(0);
this.reportOffset.limit(8);
for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
try {
this.socketChannel.write(this.reportOffset);
} catch (IOException e) {
log.error(this.getServiceName()
+ "reportSlaveMaxOffset this.socketChannel.write exception", e);
return false;
}
}
return !this.reportOffset.hasRemaining();
}
haservice里面所有的io沒有走netty,全部使用原始select做異步io,然后直接使用nio的bytebuff做read和write操作
另外rocketmq里面的每個線程實現都有一個特別的標志位:
public abstract class ServiceThread implements Runnable {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private static final long JOIN_TIME = 90 * 1000;
protected final Thread thread;
protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
protected volatile boolean stopped = false;
這個hasnotified和countdownlatch是配合一起使用的。如果一個線程被countdown過、喚醒過,那么hasnotified就通過cas被設置成true,下一個循環進入wait的時候,不用等待超時也不用等待下一次喚醒,直接通過hasnotified這個標志位可以直接喚醒,相當於第一次喚醒我的時候 我當時沒有在阻塞,那么第一次喚醒我的時候 先設置一個標示hasnotified,下次進入阻塞的時候可以直接走喚醒流程,不用等待。
下面具體講下每個模塊
HAclient:
干了兩個事情:
1 備broker去nameserv注冊的時候,可以從nameserv拿到master-broker的ha-address,拿到這個地址以后,通過haclient去連接master-broker。定期給主機broker上報自己的currentReportedOffset,也就是備機broker自己當前的commit-log在什么地方了
2 在channel上面嘗試讀取數據,這個就是主機broker發過來的具體數據提交到自己的commit-log里面。
也就是對於一個備機broker而言,發布自己的ack-offset和接收主機broker的實際數據都在ha-client一個線程完成的:
3 ha-client用到了雙緩沖reallocateByteBuffer,因為主機broker發過來的數據有可能備機broker的bytebuffer已經存不下了,只能存一半,這時候需要把已經落盤的數據從bytebuffer清理掉,然后寫了一半的bytebuffer從后半部分移動到前半部分,那么需要有一個第三者tmp做swap,bytebufferbackup就是這個tmp,大小跟bytebufferread一樣,防止極端情況:
private void reallocateByteBuffer() {
int remain = READ_MAX_BUFFER_SIZE - this.dispatchPostion;
if (remain > 0) {
this.byteBufferRead.position(this.dispatchPostion);
this.byteBufferBackup.position(0);
this.byteBufferBackup.limit(READ_MAX_BUFFER_SIZE);
this.byteBufferBackup.put(this.byteBufferRead);
}
this.swapByteBuffer();
this.byteBufferRead.position(remain);
this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
this.dispatchPostion = 0;
}
private void swapByteBuffer() {
ByteBuffer tmp = this.byteBufferRead;
this.byteBufferRead = this.byteBufferBackup;
this.byteBufferBackup = tmp;
}
AcceptSocketService:
干一件事:綁定端口以后,作為accpet,在主循環的select里面,監聽accpet事件,如果有客戶端連接進來,那么生成一個haconnection。
所以看得出來,只有主機broker才有這個accpetsocketservice和haconnection。下面具體說下HaConnection
public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {
this.haService = haService;
this.socketChannel = socketChannel;
this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString();
this.socketChannel.configureBlocking(false);
this.socketChannel.socket().setSoLinger(false, -1);
this.socketChannel.socket().setTcpNoDelay(true);
this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
this.socketChannel.socket().setSendBufferSize(1024 * 64);
this.writeSocketService = new WriteSocketService(this.socketChannel);
this.readSocketService = new ReadSocketService(this.socketChannel);
this.haService.getConnectionCount().incrementAndGet();
}
socketchannel是服務端accept后拿到的、跟客戶端通信的channel。
SO_LINGER選項,使用默認的
最主要的是有兩個線程,一個是writesocketService一個是readSocketService
writesocketservice:當主機寫入commit-log以后offset肯定會長,但是備機傳過來的ack-offset沒有增長。通過這種方式主機知道此時需要把什么數據傳給備機。
這個線程沒有雙緩沖,也沒有swap-bytebuffer,全部數據通過網絡io寫出去即可,不涉及磁盤io。這個線程平時不需要工作,只有在有新數據的時候才需要工作,啥時候被喚醒的呢?
是在service.getWaitNotifyObject().wakeupAll()業務線程進行喚醒的
readSocketService: 專門接收備機發過來的ack-offset的,收到新的ack以后,通過HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset)喚醒GroupTransferService,后者專門處理消息是否真的已經被接收。
GroupTransferService:
雙緩沖邏輯,在主循環的waitforEnding結束后的onWaitEnd中,執行swapRequests,把requestsWrite和requestsRead互換,因為這個線程在處理的時候需要用synchronnized鎖整個requestsRead,別人無法put了,所以弄一個requestsWrite出來,其他線程可以在這個里面put,跟自己的線程鎖住的requestsRead不沖突。
這個模塊本質上就是一個thread,干兩個事情:
1 在waitfoRunning中等待
2 等待超時或者被喚醒的話,那么針對requestRead里面所有request,push2SlaveMaxOffset(這個就是備機的ack-offset)大於request的offset的話,那么說明備機當前已經有這個數據了,那么wakeupCustomer把在request上的CountdownLatch去掉,並且把GroupCommitRequest的flushOK=ture。
如果備機的ack-offset比GroupCommitRequest小的話,那么循環5次等待,阻塞在notifyTransferObject,嘗試等待5次看看備機的ack-offset 也就是push2SlaveMaxOffset能不能追上來,從而也讓這個GroupCommitRequest的flushOK=true。
private void doWaitTransfer() {
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
for (CommitLog.GroupCommitRequest req : this.requestsRead) {
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
for (int i = 0; !transferOK && i < 5; i++) {
this.notifyTransferObject.waitForRunning(1000);
transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
}
if (!transferOK) {
log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
}
req.wakeupCustomer(transferOK);
}
this.requestsRead.clear();
}
}
}
那么誰在request上的countdownlatch等待呢?flushok啥含義
如果是同步master的話,兩個地方:handleHA和handleDiskFlush
以前者舉例子,handleHA里面:
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
if (messageExt.isWaitStoreMsgOK()) {
// Determine whether to wait
if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
service.getWaitNotifyObject().wakeupAll();
boolean flushOK =
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
+ messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
}
// Slave problem
else {
// Tell the producer, slave not available
putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}
主機在sendmessage以后,執行service.putRequest
1 會在GroupTransferService的requestWrite里面放入新的request,讓GroupTransferService去檢查這個request是否已經被備機同步了
2 對GroupTransferService做waitpoint.countDown,讓GroupTransferService干活必須先要喚醒他。
public synchronized void putRequest(final CommitLog.GroupCommitRequest request) {
synchronized (this.requestsWrite) {
this.requestsWrite.add(request);
}
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
}
GroupTransferService每次被喚醒的時候,首先把requestWrite放入到requestRead里面,然后檢查request的offset和備機ackoffset是否ok。
3 service.getWaitNotifyObject().wakeupAll(); 這個是喚醒writeSockeService,新的數據來了,那么寫線程需要工作了。
4 在每個request上面做waitforFlush,也就是在request上面countdown等待,然后檢查flushok。
所以主機上面新數據來了以后,業務線程喚醒writeSocketService去發數據給備機broker(writeSocketService通過檢查commit-log的offset感知主機數據offset增長了),然后喚醒grouptransferservice去檢查每個request是否已經ok。
grouptransferservice的工作沒有放在writeSocketService,而是單獨一個線程來做,還是利用多核並發處理。
grouptransferservice即使被業務線程在putRequest中waitPoint.countDown();被喚醒還會被this.notifyTransferObject.waitForRunning(1000)阻塞,因為被業務線程喚醒也不能表示立馬可以更新request和ack-offset的關系,比較備機新的ack-offset還沒來,所以還需要readSocketService在拿到新的ackoffset以后,通過this.groupTransferService.notifyTransferSome();進一步喚醒groupTransferService,此時才能真正更新request是狀態flushok狀態
private void doWaitTransfer() { synchronized (this.requestsRead) { if (!this.requestsRead.isEmpty()) { for (CommitLog.GroupCommitRequest req : this.requestsRead) { boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); for (int i = 0; !transferOK && i < 5; i++) { this.notifyTransferObject.waitForRunning(1000); transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); } if (!transferOK) { log.warn("transfer messsage to slave timeout, " + req.getNextOffset()); } req.wakeupCustomer(transferOK); } this.requestsRead.clear(); } } }
