這個異步調用方法接收一個 final 的回調對象(invokeCallback)。

public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { System.out.println("NettyRemotingAbstract.invokeAsyncImpl()****************"); final int opaque = request.getOpaque(); //超時信號量鎖 boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync); //這個ResonseFuture的封裝的思路。 final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once); this.responseTable.put(opaque, responseFuture); try { //----》執行io請求,添加channelfuture監聽器 channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { //這里只是發送成功!!!!!!!!!!!!!!!!!!!!!!!!! if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } else { responseFuture.setSendRequestOK(false); } //這里只是發送成功,返回設為nul。putResponse -> NULL responseFuture.putResponse(null); responseTable.remove(opaque); try { //發送成功后還沒有返回消息時的,調用回調方法。參見:MQClientAPIImpl.sendMessageAsync(..) 
System.out.println("**************NettyRemotingAbstract.invokeAsyncImpl().callback()"); responseFuture.executeInvokeCallback(); } catch (Throwable e) { plog.warn("excute callback in writeAndFlush addListener, and callback throw", e); } finally { responseFuture.release(); } plog.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel)); } }); } catch (Exception e) { responseFuture.release(); plog.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e); throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e); } } else { String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", // timeoutMillis, // this.semaphoreAsync.getQueueLength(), // this.semaphoreAsync.availablePermits()// ); plog.warn(info); throw new RemotingTooMuchRequestException(info); } }
我們往上面看看這個回調對象的回調方法:


