[Source Code] How RocketMQ Fetches a Specific Message


Overview

What is a message query?

A message query retrieves a message from the MQ using the msgId provided by the user.

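How does the query work when RocketMQ has multiple nodes?

Question: In RocketMQ's distributed architecture, data is spread across nodes; even messages of the same Topic are not necessarily stored on one broker. How does the client know which node to query?

Guess 1: Query the broker nodes one by one.

Guess 2: There is some kind of data center that knows where every message is stored; asking it yields the message's exact location, from which the message content can then be fetched.

Reality:

1. The message ID itself contains the address (IP and port) of the broker holding the message, plus the message's offset in the CommitLog.

2. The client parses the broker address out of the msgId string and sends the query straight to that broker.

Question: There are several CommitLog files, so surely an offset alone cannot tell which file the message is in?

Reality: Within a single broker the offset is globally unique; the CommitLog files do not each start counting from 0. All CommitLog files on a node share one offset space, and each file is named after its starting offset, i.e. the offset of the first message it holds. The offset together with the file names therefore identifies the CommitLog file.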
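To make the naming scheme concrete, here is a minimal Java sketch that finds the file owning a global offset using only file names. It assumes each file name is the file's starting offset zero-padded to 20 digits (the style of CommitLog names such as 00000000001073741824) and uses a TreeMap floor lookup; the class and method names are hypothetical, and this illustrates the idea rather than reproducing the broker's own lookup code.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: locate the CommitLog file for a global offset by file name.
// Assumption: a file's name is its starting offset, zero-padded to 20 digits.
class CommitLogLocator {
    // Format a starting offset the way CommitLog file names look.
    static String fileName(long startOffset) {
        return String.format("%020d", startOffset);
    }

    // Sorted map: file start offset -> file name.
    private final TreeMap<Long, String> files = new TreeMap<>();

    void addFile(long startOffset) {
        files.put(startOffset, fileName(startOffset));
    }

    // The owning file is the one with the greatest start offset <= offset.
    String locate(long offset) {
        Map.Entry<Long, String> e = files.floorEntry(offset);
        return e == null ? null : e.getValue();
    }

    // Position of the offset inside its owning file.
    long relativePos(long offset) {
        Map.Entry<Long, String> e = files.floorEntry(offset);
        return e == null ? -1 : offset - e.getKey();
    }

    public static void main(String[] args) {
        long size = 1024L * 1024 * 1024; // 1 GB, the default CommitLog file size
        CommitLogLocator loc = new CommitLogLocator();
        for (int i = 0; i < 3; i++) {
            loc.addFile(i * size);
        }
        long offset = size + 100;
        System.out.println(loc.locate(offset));      // 00000000001073741824
        System.out.println(loc.relativePos(offset)); // 100
    }
}
```

Because every offset belongs to exactly one file's range, a floor lookup on the start offsets is enough; no per-file offset bookkeeping is needed.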

Reading the source

0. Usage

MessageExt  msg = consumer.viewMessage(msgId);

1. Parsing the message ID

A quick look at this part is enough.

public class MessageId {
    private SocketAddress address;
    private long offset;

    public MessageId(SocketAddress address, long offset) {
        this.address = address;
        this.offset = offset;
    }

    //getters and setters
}

//from MQAdminImpl.java
public MessageExt viewMessage(
    String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {

    MessageId messageId = null;
    try {
        //parse the address and offset out of the msgId string
        //address = ip:port
        //offset is the message's offset in the CommitLog
        messageId = MessageDecoder.decodeMessageId(msgId);
    } catch (Exception e) {
        throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by id finished, but no message.");
    }
    return this.mQClientFactory.getMQClientAPIImpl().viewMessage(RemotingUtil.socketAddress2String(messageId.getAddress()),
        messageId.getOffset(), timeoutMillis);
}

//from MessageDecoder.java
public static MessageId decodeMessageId(final String msgId) throws UnknownHostException {
    SocketAddress address;
    long offset;
    //IPv4 vs. IPv6: a 32-character msgId carries a 4-byte IPv4 address (8 hex chars);
    //any other length is treated as a 16-byte IPv6 address (32 hex chars)
    int ipLength = msgId.length() == 32 ? 4 * 2 : 16 * 2;

    //IP bytes, followed by the port as a 4-byte int (8 hex chars)
    byte[] ip = UtilAll.string2bytes(msgId.substring(0, ipLength));
    byte[] port = UtilAll.string2bytes(msgId.substring(ipLength, ipLength + 8));
    ByteBuffer bb = ByteBuffer.wrap(port);
    int portInt = bb.getInt(0);
    address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt);

    //offset: the last 16 hex chars, an 8-byte long
    byte[] data = UtilAll.string2bytes(msgId.substring(ipLength + 8, ipLength + 8 + 16));
    bb = ByteBuffer.wrap(data);
    offset = bb.getLong(0);

    return new MessageId(address, offset);
}
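The layout decodeMessageId parses (IP bytes, then a 4-byte port, then an 8-byte offset, all hex-encoded) can be round-tripped with plain JDK classes. In this sketch, encode and the hex helpers are hypothetical stand-ins for RocketMQ's own utilities such as UtilAll; the uppercase hex output is an assumption, while the decoding side follows the substring arithmetic shown above.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;

// Hypothetical codec for the msgId layout: hex(IP) + hex(port as int) + hex(offset as long).
class MsgIdCodec {
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02X", b)); // negative bytes print as unsigned
        }
        return sb.toString();
    }

    static byte[] fromHex(String hex) {
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    static String encode(InetSocketAddress addr, long offset) {
        byte[] ip = addr.getAddress().getAddress(); // 4 bytes for IPv4, 16 for IPv6
        ByteBuffer bb = ByteBuffer.allocate(ip.length + 4 + 8); // big-endian by default
        bb.put(ip).putInt(addr.getPort()).putLong(offset);
        return toHex(bb.array());
    }

    // Same substring arithmetic as MessageDecoder.decodeMessageId.
    static InetSocketAddress decodeAddress(String msgId) throws UnknownHostException {
        int ipLength = msgId.length() == 32 ? 4 * 2 : 16 * 2;
        byte[] ip = fromHex(msgId.substring(0, ipLength));
        int port = ByteBuffer.wrap(fromHex(msgId.substring(ipLength, ipLength + 8))).getInt(0);
        return new InetSocketAddress(InetAddress.getByAddress(ip), port);
    }

    static long decodeOffset(String msgId) {
        int ipLength = msgId.length() == 32 ? 4 * 2 : 16 * 2;
        return ByteBuffer.wrap(fromHex(msgId.substring(ipLength + 8, ipLength + 8 + 16))).getLong(0);
    }

    public static void main(String[] args) throws Exception {
        InetSocketAddress broker = new InetSocketAddress("10.0.0.7", 10911); // example address
        String msgId = encode(broker, 1073741924L);
        System.out.println(msgId); // 0A00000700002A9F0000000040000064
        System.out.println(decodeAddress(msgId) + " offset=" + decodeOffset(msgId));
    }
}
```

An IPv4 id is 4 + 4 + 8 = 16 bytes, i.e. 32 hex characters, which is exactly why the decoder can use the string length to tell IPv4 from IPv6.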

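2. Long-connection client RPC implementation

Sending a request requires a connection first, and this method shows the connection-related steps. Note that on the first call the connection may not have been established yet, and establishing it takes some time. The code accounts for this: if the timeout has already been exceeded once the connection is up, the request is not sent at all. The goal is presumably to keep the calling thread blocked for as short a time as possible.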

//from NettyRemotingClient.java
@Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
    throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    long beginStartTime = System.currentTimeMillis();
    //return the existing channel for this address, or create one if there is none
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            //before hooks
            doBeforeRpcHooks(addr, request);
            //if the timeout has already elapsed by the time the channel is ready, throw right away and do not send
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTime) {
                throw new RemotingTimeoutException("invokeSync call timeout");
            }
            //synchronous call with the remaining time budget
            RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
            //after hooks
            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            return response;
        } catch (RemotingSendRequestException e) {
            log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        } catch (RemotingTimeoutException e) {
            if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                this.closeChannel(addr, channel);
                log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
            }
            log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}
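The time-budget check can be isolated in a small sketch. Here connect and send are hypothetical placeholders for getAndCreateChannel and invokeSyncImpl; the sketch only shows that connection setup time is charged against the caller's timeout and that the call is skipped once the budget is used up.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.LongFunction;

// Hypothetical sketch of invokeSync's time budget; connect/send are placeholders.
class TimeBudget {
    static <T> T callWithBudget(long timeoutMillis, Runnable connect, LongFunction<T> send)
            throws TimeoutException {
        long begin = System.currentTimeMillis();
        // May be slow on the first call, while the connection is being established.
        connect.run();
        long cost = System.currentTimeMillis() - begin;
        if (timeoutMillis < cost) {
            // Budget already used up: fail fast instead of sending a request nobody waits for.
            throw new TimeoutException("call timeout before sending");
        }
        // Hand only the remaining budget to the actual call.
        return send.apply(timeoutMillis - cost);
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws Exception {
        Long remaining = callWithBudget(1000, () -> { }, budget -> budget);
        System.out.println("remaining budget: " + remaining + " ms");
        try {
            callWithBudget(10, () -> sleepQuietly(50), budget -> budget);
        } catch (TimeoutException e) {
            System.out.println("skipped: " + e.getMessage());
        }
    }
}
```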

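Next, let's see what the synchronous call does. It creates a ResponseFuture and adds it to the pending-response table, sends the request, and then suspends the thread until it is woken up (waitResponse waits on a CountDownLatch internally).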

//from NettyRemotingAbstract.java
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    //request id
    final int opaque = request.getOpaque();

    try {
        //request stub
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        //register it in the pending-response table
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        //send the request; the listener updates the state once the write completes
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    //sent successfully: mark the request as "sent"
                    responseFuture.setSendRequestOK(true);
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);
                }

                //send failed: remove it from the table (it is useless now, so free the resources)
                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                //putResponse wakes up the waiting thread
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });

        //wait only up to timeoutMillis, never indefinitely:
        //on a normal response, this thread is woken up as soon as the response arrives and carries on;
        //on timeout, the thread wakes up when the time is up and carries on
        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null == responseCommand) {
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                    responseFuture.getCause());
            } else {
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }

        return responseCommand;
    } finally {
        //normal completion: release the future (the normal path)
        //timeout: release the future too; the request is void, so a late response finds
        //no matching entry in the table and is simply dropped
        this.responseTable.remove(opaque);
    }
}

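Now let's see what happens when a packet arrives. The JDK Future works roughly like this: a task is handed to another thread; when that thread finishes, it writes the result into the Future and, if any threads are waiting for the result, wakes them up. The same idea applies here, except that the executing thread lives on the server: once the server finishes, it sends the result back over the long connection, the client uses the request ID (opaque) in the packet to find the matching ResponseFuture in the pending-response table, and the rest is analogous.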

class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {

    //receives the RemotingCommand produced once the lower layers have decoded the frame
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        processMessageReceived(ctx, msg);
    }
}

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        //dispatch by command type
        switch (cmd.getType()) {
            case REQUEST_COMMAND:
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}

public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
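    //get the request id (opaque)
    final int opaque = cmd.getOpaque();
    //look up the matching request in the pending-response table
    final ResponseFuture responseFuture = responseTable.get(opaque);
    if (responseFuture != null) {
        //store the response in the ResponseFuture; the waiting thread reads the result from it
        responseFuture.setResponseCommand(cmd);
        //the request is complete, so release it
        responseTable.remove(opaque);

        //if there is a callback, run it (on the callback executor if one is configured, otherwise on the current thread)
        if (responseFuture.getInvokeCallback() != null) {
            executeInvokeCallback(responseFuture);
        } else {
            //otherwise wake up the waiting thread, which carries on from there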
            responseFuture.putResponse(cmd);
            responseFuture.release();
        }
    } else {
        log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        log.warn(cmd.toString());
    }
}
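Putting the request side and the response side together, the matching logic can be sketched with plain JDK concurrency classes. SimpleResponseFuture and PendingTable are hypothetical names, String stands in for RemotingCommand, and sending is reduced to a Runnable; what the sketch keeps is the pattern described above: register under opaque, wait on a CountDownLatch with a timeout, always remove the entry, and drop responses that find no entry.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for ResponseFuture.
class SimpleResponseFuture {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String response;

    // Called by the thread that receives the response; wakes the waiter.
    void putResponse(String response) {
        this.response = response;
        latch.countDown();
    }

    // Waits at most timeoutMillis; returns null if nothing arrived in time.
    String waitResponse(long timeoutMillis) throws InterruptedException {
        latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return response;
    }
}

// Hypothetical stand-in for the responseTable handling in NettyRemotingAbstract.
class PendingTable {
    final Map<Integer, SimpleResponseFuture> responseTable = new ConcurrentHashMap<>();

    // Caller side: register, send, wait, and always clean up.
    String invokeSync(int opaque, Runnable send, long timeoutMillis) throws InterruptedException {
        SimpleResponseFuture future = new SimpleResponseFuture();
        responseTable.put(opaque, future);
        try {
            send.run();
            return future.waitResponse(timeoutMillis);
        } finally {
            responseTable.remove(opaque);
        }
    }

    // I/O side: match the response to its request by opaque.
    boolean onResponse(int opaque, String response) {
        SimpleResponseFuture future = responseTable.remove(opaque);
        if (future == null) {
            return false; // no pending request: it already timed out, so the response is dropped
        }
        future.putResponse(response);
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        PendingTable table = new PendingTable();
        // A separate thread plays the role of the I/O thread delivering the response.
        String r = table.invokeSync(1, () -> new Thread(() -> table.onResponse(1, "pong")).start(), 1000);
        System.out.println(r);                           // pong
        System.out.println(table.onResponse(1, "late")); // false
    }
}
```

Registering the future before sending matters: if the response could arrive before the entry exists, it would be treated as unmatched and discarded.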

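To summarize, the client-side processing sequence is roughly as follows:

[Figure: client-side processing sequence]

And the structure is roughly as follows:

[Figure: client-side structure]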

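3. Server-side processing

It starts when a packet arrives. After the lower layers decode it, NettyServerHandler, the last handler in the inbound pipeline, receives the decoded RemotingCommand.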

class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        processMessageReceived(ctx, msg);
    }
}

The processing again happens in NettyRemotingAbstract.java. This time the command is a request, so processRequestCommand is called to handle it.

//from NettyRemotingAbstract.java
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        switch (cmd.getType()) {
            case REQUEST_COMMAND: //the server takes this branch
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}

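On receiving a request, the server first checks whether a processor is registered for that request type; if not, it tells the client that the request type is not supported. If there is one, it builds a task and submits it to the thread pool associated with the processor (the server's NIO threads do not run business logic). The task carries the Channel, so once the result is ready the worker thread can write the response straight back through it.

//from NettyRemotingAbstract.java
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    //look up the processor registered for this request code
    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    //if there is none, fall back to the default processor (which may not exist either)
    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
    final int opaque = cmd.getOpaque();

    if (pair != null) {
        //build the task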
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                    final RemotingResponseCallback callback = new RemotingResponseCallback() {
                        @Override
                        public void callback(RemotingCommand response) {
                            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                            if (!cmd.isOnewayRPC()) {
                                if (response != null) { //non-null: this class writes the response back to the requester
                                    response.setOpaque(opaque);
                                    response.markResponseType();
                                    try {
                                        ctx.writeAndFlush(response);
                                    } catch (Throwable e) {
                                        log.error("process request over, but response failed", e);
                                        log.error(cmd.toString());
                                        log.error(response.toString());
                                    }
                                } else { //null: the processor has already handled the response itself, so there is nothing to do here
                                }
                            }
                        }
                    };
                    if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
                        AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
                        processor.asyncProcessRequest(ctx, cmd, callback);
                    } else { 
                        NettyRequestProcessor processor = pair.getObject1();
                        RemotingCommand response = processor.processRequest(ctx, cmd);
                        doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                        callback.callback(response);
                    }
                } catch (Throwable e) {
                    log.error("process request exception", e);
                    log.error(cmd.toString());

                    if (!cmd.isOnewayRPC()) {
                        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                            RemotingHelper.exceptionSimpleDesc(e));
                        response.setOpaque(opaque);
                        ctx.writeAndFlush(response);
                    }
                }
            }
        };

        if (pair.getObject1().rejectRequest()) {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                "[REJECTREQUEST]system busy, start flow control for a while");
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            return;
        }

        try {
            //submit the task to the processor's thread pool (the NIO thread only submits tasks and does no business work)
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            pair.getObject2().submit(requestTask);
        } catch (RejectedExecutionException e) {
            if ((System.currentTimeMillis() % 10000) == 0) {
                log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                    + ", too many requests and system thread pool busy, RejectedExecutionException "
                    + pair.getObject2().toString()
                    + " request code: " + cmd.getCode());
            }

            if (!cmd.isOnewayRPC()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[OVERLOAD]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
            }
        }
    } else {
        String error = " request type " + cmd.getCode() + " not supported";
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
        log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
    }
}
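The dispatch pattern above (look up a processor and its pool by request code, fall back to a default, hand the work off the I/O thread, reply through a callback, and answer "busy" when the pool rejects the task) can be sketched without Netty. Every name below is a hypothetical stand-in: Strings replace RemotingCommand, and the reply consumer plays the role of ctx.writeAndFlush(response).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Consumer;

// Hypothetical sketch of processRequestCommand's dispatch logic.
class Dispatcher {
    interface Processor {
        // Returning null means the processor already replied on its own.
        String process(String body);
    }

    static final class Entry {
        final Processor processor;
        final ExecutorService executor;

        Entry(Processor processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Entry> processorTable = new HashMap<>();
    private Entry defaultEntry; // may stay null, like defaultRequestProcessor

    void register(int code, Processor processor, ExecutorService executor) {
        processorTable.put(code, new Entry(processor, executor));
    }

    void setDefault(Processor processor, ExecutorService executor) {
        defaultEntry = new Entry(processor, executor);
    }

    // Runs on the I/O thread: it only looks up the entry and hands off the work.
    void dispatch(int code, String body, Consumer<String> reply) {
        Entry matched = processorTable.get(code);
        Entry entry = matched == null ? defaultEntry : matched;
        if (entry == null) {
            reply.accept("REQUEST_CODE_NOT_SUPPORTED " + code);
            return;
        }
        Runnable task = () -> {
            try {
                String response = entry.processor.process(body);
                if (response != null) {
                    reply.accept(response); // the worker thread writes the result back
                }
            } catch (RuntimeException e) {
                reply.accept("SYSTEM_ERROR " + e.getMessage());
            }
        };
        try {
            entry.executor.submit(task);
        } catch (RejectedExecutionException e) {
            reply.accept("SYSTEM_BUSY"); // pool saturated or shut down
        }
    }
}
```

Giving each processor its own pool presumably keeps one slow request type from starving the others, and it keeps the NIO threads free to keep reading from sockets.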

Next, let's look at the processor for message queries. The class is QueryMessageProcessor and it supports two kinds of query. Here we query directly by msgId, so viewMessageById handles the request.

//from QueryMessageProcessor.java
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    switch (request.getCode()) {
        case RequestCode.QUERY_MESSAGE:
            return this.queryMessage(ctx, request);
        case RequestCode.VIEW_MESSAGE_BY_ID: //query a message by msgId
            return this.viewMessageById(ctx, request);
        default:
            break;
    }

    return null;
}

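Inside viewMessageById, the message is read at the offset supplied by the client. Once the content has been read, a RemotingCommand response is built and sent back to the client.

//from QueryMessageProcessor.java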
public RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final ViewMessageRequestHeader requestHeader =
        (ViewMessageRequestHeader) request.decodeCommandCustomHeader(ViewMessageRequestHeader.class);

    response.setOpaque(request.getOpaque());

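    //getMessageStore() returns the broker's message store; selectOneMessageByOffset locates the memory-mapped CommitLog file and reads the data at that offset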
    final SelectMappedBufferResult selectMappedBufferResult =
        this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset());
    if (selectMappedBufferResult != null) {
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);

        //write the response back to the client over the socket
        try {
            //the encoded response object is the header
            //the message content is the body
            FileRegion fileRegion =
                new OneMessageTransfer(response.encodeHeader(selectMappedBufferResult.getSize()),
                    selectMappedBufferResult);
            ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    selectMappedBufferResult.release();
                    if (!future.isSuccess()) {
                        log.error("Transfer one message from page cache failed, ", future.cause());
                    }
                }
            });
        } catch (Throwable e) {
            log.error("", e);
            selectMappedBufferResult.release();
        }

        return null; //the response has already been written to the requester directly; returning null means the outer layer has nothing left to send
    } else {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("can not find message by the offset, " + requestHeader.getOffset());
    }

    return response;
}

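Next, let's see how the message is read out of the CommitLog. RocketMQ loads CommitLog files into memory through memory mapping, so they can be accessed directly in memory. In each stored message entry, the first 4 bytes hold the entry's total length, so the code reads once to get the length and then reads the complete entry.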

//DefaultMessageStore.java
@Override
public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) {
    //reading only the first 4 bytes is enough to get the length
    SelectMappedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, 4);
    if (null != sbr) {
        try {
            // 1 TOTALSIZE
            int size = sbr.getByteBuffer().getInt();
            //with the length known, read the complete entry
            return this.commitLog.getMessage(commitLogOffset, size);
        } finally {
            sbr.release();
        }
    }

    return null;
}
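The two-step read (peek at the 4-byte length, then take exactly that many bytes) is easy to reproduce on a plain ByteBuffer standing in for a mapped CommitLog file. The entry format here is a simplified, hypothetical [TOTALSIZE][payload], where TOTALSIZE counts the whole entry including its own 4 bytes; real CommitLog entries carry many more fields after TOTALSIZE.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of length-prefixed reads, in the spirit of selectOneMessageByOffset.
class LengthPrefixedReader {
    // Lays out entries back to back as [TOTALSIZE (int)][payload bytes].
    static ByteBuffer build(String... payloads) {
        int total = 0;
        for (String p : payloads) {
            total += 4 + p.getBytes(StandardCharsets.UTF_8).length;
        }
        ByteBuffer buf = ByteBuffer.allocate(total);
        for (String p : payloads) {
            byte[] bytes = p.getBytes(StandardCharsets.UTF_8);
            buf.putInt(4 + bytes.length).put(bytes);
        }
        buf.flip();
        return buf;
    }

    static ByteBuffer readEntry(ByteBuffer file, int offset) {
        // Step 1: an absolute read of the first 4 bytes gives the entry size.
        int size = file.getInt(offset);
        // Step 2: return a view covering exactly [offset, offset + size).
        ByteBuffer view = file.duplicate(); // shares content, independent position/limit
        view.position(offset);
        view.limit(offset + size);
        return view.slice();
    }

    static String payload(ByteBuffer entry) {
        ByteBuffer copy = entry.duplicate();
        copy.position(4); // skip TOTALSIZE
        byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer file = build("abc", "de");
        ByteBuffer second = readEntry(file, 7); // the first entry occupies 4 + 3 = 7 bytes
        System.out.println(second.remaining() + " " + payload(second)); // 6 de
    }
}
```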

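In getMessage, the message offset and the fixed size of a single CommitLog file together determine two things: which CommitLog file holds the message, and the message's relative offset within that file.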

//CommitLog.java
public SelectMappedBufferResult getMessage(final long offset, final int size) {
    //size of one CommitLog file, 1 GB by default
    int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
    //MappedFileQueue holds the list of mapped files; the message offset and the file size identify the file
    MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
    if (mappedFile != null) {
        //relative offset (the message's offset within this file)
        int pos = (int) (offset % mappedFileSize);
        return mappedFile.selectMappedBuffer(pos, size);
    }
    return null;
}
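The arithmetic behind getMessage fits in two lines for fixed-size files. The relative position is offset % mappedFileSize, as shown above. For the file index, this sketch assumes (from my reading of MappedFileQueue, which is not shown here) that findMappedFileByOffset computes the index relative to the first file still on disk, since expired files are deleted from the front of the queue; treat that part as an assumption.

```java
// Hypothetical sketch of the offset arithmetic for fixed-size CommitLog files.
class OffsetMath {
    // Index into the list of files still on disk (assumption: counted from the first retained file).
    static int fileIndex(long offset, long firstFileFromOffset, int mappedFileSize) {
        return (int) (offset / mappedFileSize - firstFileFromOffset / mappedFileSize);
    }

    // Same computation as pos in getMessage.
    static int relativePos(long offset, int mappedFileSize) {
        return (int) (offset % mappedFileSize);
    }

    public static void main(String[] args) {
        int size = 1024 * 1024 * 1024; // 1 GB default file size
        long firstFile = 2L * size;    // suppose files 0 and 1 have already been deleted
        long offset = 3L * size + 4096;
        System.out.println(fileIndex(offset, firstFile, size)); // 1
        System.out.println(relativePos(offset, size));          // 4096
    }
}
```

The modulo works only because every file starts at a multiple of the file size, which is exactly what the shared offset space and offset-based file names guarantee.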

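With the file and relative offset determined, the data can be read directly. Note that the position of the MappedByteBuffer always stays at 0. How far the file has been written is tracked separately in the wrotePosition field, whose value is restored when the broker restarts. (Writes also work on slices, so they never change the buffer's position either.)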

//from MappedFile
public SelectMappedBufferResult selectMappedBuffer(int pos, int size) {
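    //maximum readable position in the file
    int readPosition = getReadPosition();
    if ((pos + size) <= readPosition) {
        if (this.hold()) { //take a reference so the file cannot be released while in use, then slice (data range: 0 ~ limit)
            ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
            //a slice has its own position, limit and capacity, so changing its position leaves the original position untouched
            byteBuffer.position(pos);
            //slice again (data range: pos ~ end of the buffer)
            ByteBuffer byteBufferNew = byteBuffer.slice();
            //set the limit (data range: pos ~ pos + size)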
            byteBufferNew.limit(size);
            return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
        } else {
            log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: "
                + this.fileFromOffset);
        }
    } else {
        log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size
            + ", fileFromOffset: " + this.fileFromOffset);
    }

    return null;
}
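The double-slice technique can be checked on an ordinary heap ByteBuffer standing in for the MappedByteBuffer. The select method below is a hypothetical, stripped-down copy of the slicing steps in selectMappedBuffer (no reference counting, no bounds check); it shows that the returned view starts at pos, has exactly size readable bytes, and leaves the original buffer's position at 0.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the slicing steps in selectMappedBuffer.
class SliceDemo {
    static ByteBuffer select(ByteBuffer mapped, int pos, int size) {
        ByteBuffer byteBuffer = mapped.slice();        // shares content, own position/limit
        byteBuffer.position(pos);                      // moves only the slice's position
        ByteBuffer byteBufferNew = byteBuffer.slice(); // index 0 now corresponds to pos
        byteBufferNew.limit(size);                     // readable range: pos ~ pos + size
        return byteBufferNew;
    }

    public static void main(String[] args) {
        ByteBuffer mapped = ByteBuffer.allocate(16);
        for (int i = 0; i < 16; i++) {
            mapped.put(i, (byte) i); // absolute puts keep the position at 0
        }
        ByteBuffer view = select(mapped, 5, 3);
        System.out.println(view.get(0) + " " + view.remaining() + " " + mapped.position()); // 5 3 0
    }
}
```

Since slices share the underlying memory, no bytes are copied; this is what lets the broker hand the region to Netty for a zero-copy write afterwards.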

//TODO: cover RocketMQ's lower-level packet framing

 

