Rocketmq源碼解讀之消息拉取


最近閱讀了Rocketmq關於pullmessage的實現方式,分享出來

 

眾所周知,Rocketmq在consumer端是拉取消息的方式,它會在客戶端維護一個PullRequestQueue,這個是一個阻塞隊列(LinkedBlockingQueue),內部的節點是PullRequest,每一個PullRequest代表了一個消費的分組單元

 

PullRequest會記錄一個topic對應的consumerGroup的拉取進度,包括MessageQueue和PorcessQueue,還有拉取的offset

(代碼片段一)
public
class PullRequest { private String consumerGroup; private MessageQueue messageQueue; private ProcessQueue processQueue; private long nextOffset; private boolean lockedFirst = false; }

其中MessageQueue記錄元信息:

(代碼片段二)
public
class MessageQueue implements Comparable<MessageQueue>, Serializable { private String topic; private String brokerName; private int queueId; }

PorcessQueue記錄一次拉取之后實際消息體和拉取相關操作記錄的快照

(代碼片段三)
public
class ProcessQueue { private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock(); private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>(); private final AtomicLong msgCount = new AtomicLong(); private final AtomicLong msgSize = new AtomicLong(); private final Lock lockConsume = new ReentrantLock(); private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>(); private final AtomicLong tryUnlockTimes = new AtomicLong(0); private volatile long queueOffsetMax = 0L; private volatile boolean dropped = false; private volatile long lastPullTimestamp = System.currentTimeMillis(); private volatile long lastConsumeTimestamp = System.currentTimeMillis(); private volatile boolean locked = false; private volatile long lastLockTimestamp = System.currentTimeMillis(); private volatile boolean consuming = false; private volatile long msgAccCnt = 0; }

 

 

PullMessageService負責輪詢PullRequestQueue,並進行消息元的拉取

(代碼片段四)
public
class PullMessageService extends ServiceThread { private final InternalLogger log = ClientLogger.getLog(); private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>(); private final MQClientInstance mQClientFactory; @Override public void run() { while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } } }

 

在發送的時候會維護一個PullCallback,這是拉取收到消息后的回調處理

(代碼片段五)
public
interface PullCallback { void onSuccess(final PullResult pullResult); void onException(final Throwable e); }

這里的實現邏輯就不貼了,本質上就是把消息丟給消費線程池來處理

 

pullMessage分為同步拉取和異步拉取兩種模式,先解讀異步拉取,然后再解讀同步拉取,再說明兩者的區別

其實從這里已經大概可以看出來,異步的方式,這個方法返回值是null,同步的方式必須要返回PullResult,后續說明區別

(代碼片段六)
public
PullResult pullMessage( final String addr, final PullMessageRequestHeader requestHeader, final long timeoutMillis, final CommunicationMode communicationMode, final PullCallback pullCallback ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader); switch (communicationMode) { case ONEWAY: assert false; return null; case ASYNC: this.pullMessageAsync(addr, request, timeoutMillis, pullCallback); return null; case SYNC: return this.pullMessageSync(addr, request, timeoutMillis); default: assert false; break; } return null; }

 

先介紹異步拉取

可以看到,把PullCallback傳進去,並封裝了InvokeCallback,

(代碼片段七)
private
void pullMessageAsync( final String addr, final RemotingCommand request, final long timeoutMillis, final PullCallback pullCallback ) throws RemotingException, InterruptedException { this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { @Override public void operationComplete(ResponseFuture responseFuture) { RemotingCommand response = responseFuture.getResponseCommand(); if (response != null) { try { PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response); assert pullResult != null; pullCallback.onSuccess(pullResult); } catch (Exception e) { pullCallback.onException(e); } } else { if (!responseFuture.isSendRequestOK()) { pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause())); } else if (responseFuture.isTimeout()) { pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request, responseFuture.getCause())); } else { pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause())); } } } }); }

 

接下來進入NettyRemotingAbstract這個類中,使用netty的Chanle發送

(代碼片段八)
public
void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { final int opaque = request.getOpaque(); boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync); final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once); this.responseTable.put(opaque, responseFuture); try { channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } else { responseFuture.setSendRequestOK(false); } responseFuture.putResponse(null); responseTable.remove(opaque); try { executeInvokeCallback(responseFuture); } catch (Throwable e) { log.warn("excute callback in writeAndFlush addListener, and callback throw", e); } finally { responseFuture.release(); } log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel)); } }); } catch (Exception e) { responseFuture.release(); log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e); throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e); } } else { if (timeoutMillis <= 0) { throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast"); } else { String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", timeoutMillis, this.semaphoreAsync.getQueueLength(), this.semaphoreAsync.availablePermits() ); log.warn(info); throw new RemotingTimeoutException(info); } } }

 

這里需要詳細解讀一下:

RemotingCommand是request和response的載體,先從request中取出opaque,這是一個自增的操作id,然后將傳進來的opaque和invokeCallback封裝成一個ResponseFuture,再put到一個叫

responseTable的map中,這個map是一個核心的map,維護着opaque和對應的ResponseFuture

    /**
     * This map caches all on-going requests.
     */
    protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
        new ConcurrentHashMap<Integer, ResponseFuture>(256);

從注釋中可以看出,它緩存着正在執行的request

 

再回到剛剛的(代碼片段八)中,channel.writeAndFlush(request).addListener(new ChannelFutureListener(){...}),netty在writeAndFlush發送完之后會回調我們ChannelFutureListener的operationComplete方法:如果發送成功則responseFuture.setSendRequestOK(true); 並且就return了;如果發送失敗,則從responseTable中移除,並且起一個異步線程執行responseFuture中的InvokeCallback,在(代碼片段七)中,可以看到當responseFuture.isSendRequestOK()是false的時候,執行了onException,這里就不多介紹了。

 

那么此時發送的邏輯就全部結束了,整個過程沒有任何的阻塞,當Broker收到拉取請求后,會按照queueOffset等信息封裝好返回consumer端,

會經過NettyRemotingServer上注冊的NettyServerHandler

(代碼片段九)
class
NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> { @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception { processTunnelId(ctx, msg); processMessageReceived(ctx, msg); } public void processTunnelId(ChannelHandlerContext ctx, RemotingCommand msg) { if (nettyServerConfig.isValidateTunnelIdFromVtoaEnable()) { if (null != msg && msg.getType() == RemotingCommandType.REQUEST_COMMAND) { Vtoa vtoa = tunnelTable.get(ctx.channel()); if (null == vtoa) { vtoa = VpcTunnelUtils.getInstance().getTunnelID(ctx); tunnelTable.put(ctx.channel(), vtoa); } msg.addExtField(VpcTunnelUtils.PROPERTY_VTOA_TUNNEL_ID, String.valueOf(vtoa.getVid())); } } } }

 

最終會調用到NettyRemotingAbstract的processResponseCommand,RemotingCommand中根據opaque從responseTable中獲取ResponseFuture,然后同樣也是執行callback,這樣,就實現了整個pullmessage的異步模式

(代碼片段十)
public
void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { final int opaque = cmd.getOpaque(); final ResponseFuture responseFuture = responseTable.get(opaque); if (responseFuture != null) { responseFuture.setResponseCommand(cmd); responseTable.remove(opaque); if (responseFuture.getInvokeCallback() != null) { //異步 executeInvokeCallback(responseFuture); //執行回調 } else { //同步 responseFuture.putResponse(cmd); //為了解除阻塞 responseFuture.release(); } } else { log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); log.warn(cmd.toString()); } }

 

我們再看下同步的方式是如何實現的

 

回顧下代碼片段六,同步的方式是需要返回PullResult的,換句話說,這種方式是需要在發送的線程中來處理返回結果的

我們從代碼片段六跟下去,跟到NettyRemotingAbstract的invokeSyncImpl

(代碼片段十一)
public
RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { final int opaque = request.getOpaque(); try { final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null); this.responseTable.put(opaque, responseFuture); final SocketAddress addr = channel.remoteAddress(); channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } else { responseFuture.setSendRequestOK(false); } responseTable.remove(opaque); responseFuture.setCause(f.cause()); responseFuture.putResponse(null); log.warn("send a request command to channel <" + addr + "> failed."); } }); RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); if (null == responseCommand) { if (responseFuture.isSendRequestOK()) { throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause()); } else { throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause()); } } return responseCommand; } finally { this.responseTable.remove(opaque); } }

 

和異步發送的代碼片段八對比一下,可以看到,同步方式也要放到responseTable中,這里就有個疑惑了,既然都同步了,還要放到responseTable中干什么呢,繼續往下看,

ChannelFutureListener都是一樣的,如果發送成功就返回了,然后到了最關鍵的一行:

RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);

這一行顧名思義就是阻塞了,但是也不能一直阻塞住(因為PullMessageService是單線程的,如果因為一個異常就阻塞那就跪了),所以是一個設置了超時時間的阻塞,看下是如何阻塞的

 

ResponseFuture中有這兩個方法,當putResponse的時候,把RemotingCommand賦值,並且countDownLatch.countDown,而在waitResponse的時候countDownLatch.await

(代碼片段十二)
public
RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException { this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS); return this.responseCommand; } public void putResponse(final RemotingCommand responseCommand) { this.responseCommand = responseCommand; this.countDownLatch.countDown(); }

 

這樣就清晰多了,剩下的疑問就是,在什么時候putResponse的,有兩個地方:

第一個地方,當拉取消息回來的時候,回顧下代碼片段十,有一句是(responseFuture.getInvokeCallback() != null),通過剛剛的流程已經知道,只有異步的時候invokeCallback才不為null,因此走到else,看到在這個時候responseFuture.putResponse(cmd)和responseFuture.release(),也就是說同步方式也是通過responseTable存儲的方式,來獲取結果,並且通過CountDownLatch來阻塞發送的線程,當收到消息之后再countDown,發送端最終返回PullResult來處理消息

第二個地方,回顧下代碼片段十一,在ChannelFutureListener中當發送失敗了以后,也會put一個null值:responseFuture.putResponse(null),這里只是為了將阻塞放開

 

至此,Rockmq關於pullmessage的同步和異步方式就已經說明白了,總結一下,同步和異步本質上都是“異步”的,因為netty就是一個異步的框架,Rockmq只是利用了CountDownLatch來阻塞住發送端線程來實現了“同步”的效果,

通過一個responseTable來緩存住發送出去的請求,等收到的時候從這個緩存里按對應關系取出來,再去做對應的consumer線程的消息處理

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM