RocketMQ之連接以及連接緩存


 發現rabbitmq有一個ConnectionFactory。發現rocketmq好像沒這個東西。按道理來說如果每次發送消息都新建一條連接肯定是不可能的。

ps:其實之所以是有上面的疑問是因為數據庫連接池那個地方來的,因為數據庫連接connection並沒有說是線程安全的,所以為了線程安全會為每個事物單獨分配一個連接。但是rocketmq用的是netty的長連接channel,Java 上關於SocketChannel 的注釋 說明是安全的。

Netty : writeAndFlush的線程安全及並發問題

可見,writeAndFlush如果在Netty線程池內執行,則是直接write;否則,將作為一個task插入到Netty線程池執行。

這個問題歸根到底是netty的問題。既然是線程安全的,那么整個系統就可以只用一個連接了。

 

所以我們從源碼角度來分析一下rocketmq是如何做得。

下面我們追蹤一條消息發送的過程,下面是發送消息時涉及io的模塊。

rocketmq就是在channelTables里面維護的連接。

 

    private Channel getAndCreateNameserverChannel() throws InterruptedException {
        String addr = this.namesrvAddrChoosed.get();
        if (addr != null) {
            ChannelWrapper cw = this.channelTables.get(addr);
            if (cw != null && cw.isOK()) {
                return cw.getChannel();
            }
        }

        final List<String> addrList = this.namesrvAddrList.get();
        if (this.lockNamesrvChannel.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) {
            try {
                addr = this.namesrvAddrChoosed.get();
                if (addr != null) {
                    ChannelWrapper cw = this.channelTables.get(addr);
                    if (cw != null && cw.isOK()) {
                        return cw.getChannel();
                    }
                }

                if (addrList != null && !addrList.isEmpty()) {
                    for (int i = 0; i < addrList.size(); i++) {
                        int index = this.namesrvIndex.incrementAndGet();
                        index = Math.abs(index);
                        index = index % addrList.size();
                        String newAddr = addrList.get(index);

                        this.namesrvAddrChoosed.set(newAddr);
                        Channel channelNew = this.createChannel(newAddr);
                        if (channelNew != null)
                            return channelNew;
                    }
                }
            } catch (Exception e) {
                log.error("getAndCreateNameserverChannel: create name server channel exception", e);
            } finally {
                this.lockNamesrvChannel.unlock();
            }
        } else {
            log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LockTimeoutMillis);
        }

        return null;
    }

 

    private Channel createChannel(final String addr) throws InterruptedException {
        ChannelWrapper cw = this.channelTables.get(addr);
        if (cw != null && cw.isOK()) {
            return cw.getChannel();
        }

        //lockChannelTables是ReentrantLock鎖。tryLock()  嘗試獲取鎖,不管成功失敗,都立即返回true、false
        if (this.lockChannelTables.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) {
            try {
                boolean createNewConnection = false;
                cw = this.channelTables.get(addr);
                if (cw != null) {

                    if (cw.isOK()) {
                        return cw.getChannel();
                    }

                    else if (!cw.getChannelFuture().isDone()) {
                        createNewConnection = false;
                    }

                    else {
                        this.channelTables.remove(addr);
                        createNewConnection = true;
                    }
                }

                else {
                    createNewConnection = true;
                }

                if (createNewConnection) {
                    ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr)); log.info("createChannel: begin to connect remote host[{}] asynchronously", addr); cw = new ChannelWrapper(channelFuture);
            //連接被保存到channelTables中。 this
.channelTables.put(addr, cw); } } catch (Exception e) { log.error("createChannel: create channel exception", e); } finally { this.lockChannelTables.unlock(); } } else { log.warn("createChannel: try to lock channel table, but timeout, {}ms", LockTimeoutMillis); } if (cw != null) { ChannelFuture channelFuture = cw.getChannelFuture(); if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) { if (cw.isOK()) { log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString()); return cw.getChannel(); } else { log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause()); } } else { log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(), channelFuture.toString()); } } return null; }

 

    /**
     * @see {@link #connect()}
     */
    private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        final ChannelPromise promise = channel.newPromise();
        if (regFuture.isDone()) {
            doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
        } else {
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
                }
            });
        }

        return promise;
    }
    private static void doConnect0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new OneTimeTask() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    if (localAddress == null) {
              //io.netty.channel.connect(remoteAddress,promise); channel.connect(remoteAddress, promise); }
else { channel.connect(remoteAddress, localAddress, promise); } promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }

連接這塊就點到netty為止,進一步了解可以參考netty這方面的資料。

下面分析rocketmq對channelTables的維護。

 1,關閉系統時清空:

2.關閉channel時

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM