不管是DefaultMQProducer還是DefaultMQPushConsumer,本質都是封裝類,發起請求的實際上是RemotingClient,它的start方法調用之后,啟動了一個netty的客戶端bootstrap,每次需要與nameService或者broker進行連接的時候,調用getAndCreateChannel方法,從一個map中創建或者獲取channel(創建的時候nameService和broker兩者的區別在於addr參數是不是為null),連接建立之后,發起請求調用的是invokeSync和invokeAsync,點進去看:同步invokeSync的實現是新建一個responseFuture,放到responseTable中(key是自增的requestId),然后調用channel.writeAndFlush(request),發起請求,最后調用responseFuture.waitResponse,等待響應。讓線程等待用的是countDownLatch,那么latch之后怎樣放行呢?數據的發出是writeAndFlush,進來就應該是在channel的read方法中,去查看bootstrap的構造過程,發現添加的handler中有NettyClientHandler,點進去一看重寫了channelRead0方法(在其父類的channelRead方法被調用),里面有processMessageReceived,點進去發現根據收到的RemotingCommand的type,是對方的主動請求還是對自己之前請求的應答,現在討論的情況下是響應,所以進入processResponseCommand,一看,果然是從上面說的responseTable中再根據id把responseFuture取出來,如果有回調方法,會執行回調方法,因為現在討論的是同步請求的情況,是不會傳入回調方法的(異步請求是有的),那么進入responseFuture.putResponse(cmd),點進去看,看見了countDownLatch.countDown(),至此同步方法邏輯通了。
異步方法的邏輯和上面一樣,區別就在於剛才的判斷,如果是異步請求,取出回調方法並執行。
像發送消息就是同步請求,而拉去消息就是異步。