發現rabbitmq有一個ConnectionFactory
。發現rocketmq好像沒這個東西。按道理來說如果每次發送消息都新建一條連接肯定是不可能的。
ps:其實之所以是有上面的疑問是因為數據庫連接池那個地方來的,因為數據庫連接connection並沒有說是線程安全的,所以為了線程安全會為每個事物單獨分配一個連接。但是rocketmq用的是netty的長連接channel,Java 上關於SocketChannel 的注釋 說明是安全的。
《Netty : writeAndFlush的線程安全及並發問題》
可見,writeAndFlush如果在Netty線程池內執行,則是直接write;否則,將作為一個task插入到Netty線程池執行。
這個問題歸根到底是netty的問題。既然是線程安全的,那么整個系統就可以只用一個連接了。
所以我們從源碼角度來分析一下rocketmq是如何做得。
下面我們追蹤一條消息發送的過程,下面是發送消息時涉及io的模塊。
rocketmq就是在channelTables里面維護的連接。
private Channel getAndCreateNameserverChannel() throws InterruptedException { String addr = this.namesrvAddrChoosed.get(); if (addr != null) { ChannelWrapper cw = this.channelTables.get(addr); if (cw != null && cw.isOK()) { return cw.getChannel(); } } final List<String> addrList = this.namesrvAddrList.get(); if (this.lockNamesrvChannel.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) { try { addr = this.namesrvAddrChoosed.get(); if (addr != null) { ChannelWrapper cw = this.channelTables.get(addr); if (cw != null && cw.isOK()) { return cw.getChannel(); } } if (addrList != null && !addrList.isEmpty()) { for (int i = 0; i < addrList.size(); i++) { int index = this.namesrvIndex.incrementAndGet(); index = Math.abs(index); index = index % addrList.size(); String newAddr = addrList.get(index); this.namesrvAddrChoosed.set(newAddr); Channel channelNew = this.createChannel(newAddr); if (channelNew != null) return channelNew; } } } catch (Exception e) { log.error("getAndCreateNameserverChannel: create name server channel exception", e); } finally { this.lockNamesrvChannel.unlock(); } } else { log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LockTimeoutMillis); } return null; }
private Channel createChannel(final String addr) throws InterruptedException { ChannelWrapper cw = this.channelTables.get(addr); if (cw != null && cw.isOK()) { return cw.getChannel(); } //lockChannelTables是ReentrantLock鎖。tryLock() 嘗試獲取鎖,不管成功失敗,都立即返回true、false if (this.lockChannelTables.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) { try { boolean createNewConnection = false; cw = this.channelTables.get(addr); if (cw != null) { if (cw.isOK()) { return cw.getChannel(); } else if (!cw.getChannelFuture().isDone()) { createNewConnection = false; } else { this.channelTables.remove(addr); createNewConnection = true; } } else { createNewConnection = true; } if (createNewConnection) { ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr)); log.info("createChannel: begin to connect remote host[{}] asynchronously", addr); cw = new ChannelWrapper(channelFuture);
//連接被保存到channelTables中。 this.channelTables.put(addr, cw); } } catch (Exception e) { log.error("createChannel: create channel exception", e); } finally { this.lockChannelTables.unlock(); } } else { log.warn("createChannel: try to lock channel table, but timeout, {}ms", LockTimeoutMillis); } if (cw != null) { ChannelFuture channelFuture = cw.getChannelFuture(); if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) { if (cw.isOK()) { log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString()); return cw.getChannel(); } else { log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause()); } } else { log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(), channelFuture.toString()); } } return null; }
/** * @see {@link #connect()} */ private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } final ChannelPromise promise = channel.newPromise(); if (regFuture.isDone()) { doConnect0(regFuture, channel, remoteAddress, localAddress, promise); } else { regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { doConnect0(regFuture, channel, remoteAddress, localAddress, promise); } }); } return promise; }
private static void doConnect0( final ChannelFuture regFuture, final Channel channel, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new OneTimeTask() { @Override public void run() { if (regFuture.isSuccess()) { if (localAddress == null) {
//io.netty.channel.connect(remoteAddress,promise); channel.connect(remoteAddress, promise); } else { channel.connect(remoteAddress, localAddress, promise); } promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
連接這塊就點到netty為止,進一步了解可以參考netty這方面的資料。
下面分析rocketmq對channelTables的維護。
1,關閉系統時清空:
2.關閉channel時