通過前面的幾篇文章,對整個netty部分的架構已經運行原理都有了一定的了解,那么這篇文章來分析一個經常用到的類:ServerBootstrap,一般對於服務器端的編程它用到的都還算是比較的多。。看一看它的初始化,以及它的運行原理。。。
首先我們還是引入一段代碼,通過分析這段代碼來分析ServerBootstrap的運行。。。
- EventLoopGroup bossGroup = new NioEventLoopGroup(); //這個是用於serversocketchannel的eventloop
- EventLoopGroup workerGroup = new NioEventLoopGroup(); //這個是用於處理accept到的channel
- try {
- ServerBootstrap b = new ServerBootstrap(); //構建serverbootstrap對象
- b.group(bossGroup, workerGroup); //設置時間循環對象,前者用來處理accept事件,后者用於處理已經建立的連接的io
- b.channel(NioServerSocketChannel.class); //用它來建立新accept的連接,用於構造serversocketchannel的工廠類
- b.childHandler(new ChannelInitializer<SocketChannel>(){ //為accept channel的pipeline預添加的inboundhandler
- @Override //當新連接accept的時候,這個方法會調用
- protected void initChannel(SocketChannel ch) throws Exception {
- // TODO Auto-generated method stub
- ch.pipeline().addLast(new MyChannelHandler()); //為當前的channel的pipeline添加自定義的處理函數
- }
- });
- //bind方法會創建一個serverchannel,並且會將當前的channel注冊到eventloop上面,
- //會為其綁定本地端口,並對其進行初始化,為其的pipeline加一些默認的handler
- ChannelFuture f = b.bind(80).sync();
- f.channel().closeFuture().sync(); //相當於在這里阻塞,直到serverchannel關閉
- } finally {
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
這段代碼在前面的文章也有用到,基本上其意思也都在上面的注釋中說的比較清楚了,那么我們接下來具體的分析其中的方法調用,首先是ServerBootstrap的group方法:
- //這里parent用於執行server的accept時間事件,child才是用於執行獲取的channel連接的事件
- public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
- super.group(parentGroup);
- if (childGroup == null) {
- throw new NullPointerException("childGroup");
- }
- if (this.childGroup != null) {
- throw new IllegalStateException("childGroup set already");
- }
- this.childGroup = childGroup;
- return this;
- }
這個方法是用來設置eventloopgroup,首先調用了父類的group方法(abstractbootstrap),就不將父類的方法列出來了,其實意思都差不多,eventloopgroup屬性的值。。。
好了,接下來我們再來看一下channel方法:
- //構造serversocketchannel factory
- public B channel(Class<? extends C> channelClass) {
- if (channelClass == null) {
- throw new NullPointerException("channelClass");
- }
- return channelFactory(new BootstrapChannelFactory<C>(channelClass)); //構造工廠類
- }
- /**
- * {@link ChannelFactory} which is used to create {@link Channel} instances from
- * when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)}
- * is not working for you because of some more complex needs. If your {@link Channel} implementation
- * has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for
- * simplify your code.
- */
- @SuppressWarnings("unchecked")
- public B channelFactory(ChannelFactory<? extends C> channelFactory) {
- if (channelFactory == null) {
- throw new NullPointerException("channelFactory");
- }
- if (this.channelFactory != null) {
- throw new IllegalStateException("channelFactory set already");
- }
- this.channelFactory = channelFactory; //設置
- return (B) this;
- }
該方法主要是用於構造用於產生channel的工廠類,在我們這段代碼說白了就是用於實例化serversocketchannel的工廠類。。。
接下來我們再來看一下childHandler方法:
- //設置childHandler,這個是當有channel accept之后為其添加的handler
- public ServerBootstrap childHandler(ChannelHandler childHandler) {
- if (childHandler == null) {
- throw new NullPointerException("childHandler");
- }
- this.childHandler = childHandler;
- return this;
- }
這個很簡單吧,就是一個賦值,具體說他有什么用,前面的注釋有說明,不過以后的分析會說明它有什么用的。。。
接下來我們來看一下bind方法,這個比較重要吧:
- //最終將會創建serverchannel,然后會將其綁定到這個地址,然后對其進行初始化
- public ChannelFuture bind(int inetPort) {
- return bind(new InetSocketAddress(inetPort));
- }
好吧,接下來再來看bind方法:
- public ChannelFuture bind(SocketAddress localAddress) {
- validate();
- if (localAddress == null) {
- throw new NullPointerException("localAddress");
- }
- return doBind(localAddress);
- }
好吧,再來看看doBind方法:
- private ChannelFuture doBind(final SocketAddress localAddress) {
- final ChannelFuture regPromise = initAndRegister(); //在這里創建serverchanel,並對其進行初始化,並將其注冊到eventloop當中去
- final Channel channel = regPromise.channel();
- final ChannelPromise promise = channel.newPromise();
- if (regPromise.isDone()) {
- doBind0(regPromise, channel, localAddress, promise); //將當前的serverchannel綁定地址
- } else {
- regPromise.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- doBind0(future, channel, localAddress, promise);
- }
- });
- }
- return promise;
- }
這里調用了一個比較重要的方法:initAndRegister,我們來看看它的定義:
- //創建初始化以及注冊serverchanel
- final ChannelFuture initAndRegister() {
- //利用工廠類創建channel
- final Channel channel = channelFactory().newChannel();
- try {
- init(channel); //init函數留給了后面來實現,用於初始化channel,例如為其的pipeline加上handler
- } catch (Throwable t) {
- channel.unsafe().closeForcibly();
- return channel.newFailedFuture(t);
- }
- ChannelPromise regPromise = channel.newPromise();
- group().register(channel, regPromise); //將當前創建的serverchannel注冊到eventloop上面去
- if (regPromise.cause() != null) {
- if (channel.isRegistered()) {
- channel.close();
- } else {
- channel.unsafe().closeForcibly();
- }
- }
- // If we are here and the promise is not failed, it's one of the following cases:
- // 1) If we attempted registration from the event loop, the registration has been completed at this point.
- // i.e. It's safe to attempt bind() or connect() now beause the channel has been registered.
- // 2) If we attempted registration from the other thread, the registration request has been successfully
- // added to the event loop's task queue for later execution.
- // i.e. It's safe to attempt bind() or connect() now:
- // because bind() or connect() will be executed *after* the scheduled registration task is executed
- // because register(), bind(), and connect() are all bound to the same thread.
- return regPromise;
- }
代碼還是很簡單,而且也相對比較好理解,無非就是利用前面說到過的channel工廠類來創建一個serversocketchannel,然后調用init方法對這個剛剛生成的channel進行一些初始化的操作,然后在調用eventloopgroup的register方法,將當前這個channel的注冊到group上,那么以后這個channel的事件都在這個group上面執行,說白了也就是一些accept。、。。
好,我們先來看看這個init方法吧:
- @Override
- //初始化chanel,當用channel factory構造channel以后,會調用這個函數來初始化,說白了就是為當前的channel的pipeline加入一些handler
- void init(Channel channel) throws Exception {
- //先初始化一些配置
- final Map<ChannelOption<?>, Object> options = options();
- synchronized (options) {
- channel.config().setOptions(options);
- }
- //初始化一些屬性
- final Map<AttributeKey<?>, Object> attrs = attrs();
- synchronized (attrs) {
- for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
- @SuppressWarnings("unchecked")
- AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
- channel.attr(key).set(e.getValue());
- }
- }
- //獲取當前channel的pipeline
- ChannelPipeline p = channel.pipeline();
- if (handler() != null) {
- p.addLast(handler());
- }
- final EventLoopGroup currentChildGroup = childGroup;
- final ChannelHandler currentChildHandler = childHandler;
- final Entry<ChannelOption<?>, Object>[] currentChildOptions;
- final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
- synchronized (childOptions) {
- currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
- }
- synchronized (childAttrs) {
- currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
- }
- p.addLast(new ChannelInitializer<Channel>() {
- @Override
- public void initChannel(Channel ch) throws Exception {
- //這是一個inboundher,將其加入到serverchannel的pipeline上面去
- ch.pipeline().addLast(new ServerBootstrapAcceptor(
- currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
- }
- });
- }
代碼還是相對很簡單,首先初始化一些配置參數,然后初始化屬性,最后還要為當前的channel的pipeline添加一個handler,這個handler用來當channel注冊到eventloop上面之后對其進行一些初始化,我們還是來看看channelInitalizer的定義吧:
- public abstract class ChannelInitializer<C extends Channel> extends ChannelStateHandlerAdapter {
- private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);
- /**
- * This method will be called once the {@link Channel} was registered. After the method returns this instance
- * will be removed from the {@link ChannelPipeline} of the {@link Channel}.
- *
- * @param ch the {@link Channel} which was registered.
- * @throws Exception is thrown if an error occours. In that case the {@link Channel} will be closed.
- */
- protected abstract void initChannel(C ch) throws Exception;
- @SuppressWarnings("unchecked")
- @Override
- public final void channelRegistered(ChannelHandlerContext ctx)
- throws Exception {
- boolean removed = false;
- boolean success = false;
- try {
- //調用用戶定義的init函數對當前的channel進行初始化
- initChannel((C) ctx.channel());
- ctx.pipeline().remove(this);
- removed = true;
- ctx.fireChannelRegistered();
- success = true;
- } catch (Throwable t) {
- logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
- } finally {
- if (!removed) {
- ctx.pipeline().remove(this);
- }
- if (!success) {
- ctx.close();
- }
- }
- }
- @Override
- public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
- ctx.fireInboundBufferUpdated();
- }
- }
它有一個channelRegistered方法,這個方法是在當前pipeline所屬的channel注冊到eventloop上面之后會激活的方法,它則是調用了用戶自定義的函數來初始化channel,然后在將當前handler移除。。。也就是執行
- ch.pipeline().addLast(new ServerBootstrapAcceptor(
- currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
這里又為當前的serversocketchannel添加了另外一個handler,來看看該類型的定義吧:
- private static class ServerBootstrapAcceptor
- extends ChannelStateHandlerAdapter implements ChannelInboundMessageHandler<Channel> {
- private final EventLoopGroup childGroup;
- private final ChannelHandler childHandler;
- private final Entry<ChannelOption<?>, Object>[] childOptions;
- private final Entry<AttributeKey<?>, Object>[] childAttrs;
- @SuppressWarnings("unchecked")
- ServerBootstrapAcceptor(
- EventLoopGroup childGroup, ChannelHandler childHandler,
- Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
- this.childGroup = childGroup; //這個是用於管理accept的channel的eventloop
- this.childHandler = childHandler;
- this.childOptions = childOptions;
- this.childAttrs = childAttrs;
- }
- @Override
- public MessageBuf<Channel> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
- return Unpooled.messageBuffer();
- }
- @Override
- @SuppressWarnings("unchecked")
- //當有數據進來的時候,會調用這個方法來處理數據,這里進來的數據就是accept的channel
- public void inboundBufferUpdated(ChannelHandlerContext ctx) {
- MessageBuf<Channel> in = ctx.inboundMessageBuffer(); //獲取buf
- for (;;) {
- Channel child = in.poll();
- if (child == null) {
- break;
- }
- child.pipeline().addLast(childHandler); //為accept的channel的pipeline加入用戶定義的初始化handler
- for (Entry<ChannelOption<?>, Object> e: childOptions) {
- try {
- if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
- logger.warn("Unknown channel option: " + e);
- }
- } catch (Throwable t) {
- logger.warn("Failed to set a channel option: " + child, t);
- }
- }
- for (Entry<AttributeKey<?>, Object> e: childAttrs) {
- child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
- }
- try {
- childGroup.register(child); //將當前accept的channel注冊到eventloop
- } catch (Throwable t) {
- child.unsafe().closeForcibly();
- logger.warn("Failed to register an accepted channel: " + child, t);
- }
- }
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- final ChannelConfig config = ctx.channel().config();
- if (config.isAutoRead()) {
- // stop accept new connections for 1 second to allow the channel to recover
- // See https://github.com/netty/netty/issues/1328
- config.setAutoRead(false);
- ctx.channel().eventLoop().schedule(new Runnable() {
- @Override
- public void run() {
- config.setAutoRead(true);
- }
- }, 1, TimeUnit.SECONDS);
- }
- // still let the exceptionCaught event flow through the pipeline to give the user
- // a chance to do something with it
- ctx.fireExceptionCaught(cause);
- }
- }
主要是有一個比較重要的方法,inboundBufferUpdated,這個方法是在有數據進來的時候會調用的,用於處理進來的數據,也就是accept到的channel,這里就知道我們定義的chidHandler的用處了吧,netty會將這個handler直接加入到剛剛accept到的channel的pipeline上面去。。。最后還要講當前accept到的channel注冊到child eventloop上面去,這里也就完完全全的明白了最開始定義的兩個eventloopgroup的作用了。。。
好了,serversocketchannel的init以及register差不多了,然后會調用doBind0方法,將當前的serversocketchannel綁定到一個本地端口,
- //將chanel綁定到一個本地地址
- private static void doBind0(
- final ChannelFuture regFuture, final Channel channel,
- final SocketAddress localAddress, final ChannelPromise promise) {
- // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
- // the pipeline in its channelRegistered() implementation.
- channel.eventLoop().execute(new Runnable() {
- @Override
- //匿名內部類想要訪問外面的參數,那么外面的參數必須是要final的才行
- public void run() {
- if (regFuture.isSuccess()) {
- //調用channel的bind方法,將當前的channl綁定到一個本地地址,其實是調用的是pipeline的bind方法,但是最終又是調用的當前
- //channel的unsafe對象的bind方法
- channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
- } else {
- promise.setFailure(regFuture.cause());
- }
- }
- });
- }
其實這里調用bind方法最終還是調用serversocketchannel的unsafe對象的bind方法。。。。
到這里,整個serverbootstrap 就算初始化完成了,而且也可以開始運行了。。。
- b.childHandler(new ChannelInitializer<SocketChannel>(){ //為accept channel的pipeline預添加的inboundhandler
- @Override //當新連接accept的時候,這個方法會調用
- protected void initChannel(SocketChannel ch) throws Exception {
- // TODO Auto-generated method stub
- ch.pipeline().addLast(new MyChannelHandler()); //為當前的channel的pipeline添加自定義的處理函數
- }
- });
這段代碼的意思是對於剛剛accept到的channel,將會在它的pipeline上面添加handler,這個handler的用處主要是就是用戶自定義的initChannel方法,就是初始化這個channel,說白了就是為它的pipeline上面添加自己定義的handler。。。
這樣整個serverbootstrap是怎么運行的也就差不多了。。。
剛開始接觸到netty的時候覺得這里一頭霧水,通過這段時間對其代碼的閱讀,總算搞懂了其整個運行的原理,而且覺得其設計還是很漂亮的,雖然有的時候會覺得有那么一點點的繁瑣。。。。
整個運行過程總結為一下幾個步驟:
(1)創建用於兩個eventloopgroup對象,一個用於管理serversocketchannel,一個用於管理accept到的channel
(2)創建serverbootstrap對象,
(3)設置eventloopgroup
(4)創建用於構建用到的channel的工廠類
(5)設置childhandler,它的主要功能主要是用戶定義代碼來初始化accept到的channel
(6)創建serversocketchannel,並對它進行初始化,綁定端口,以及register,並為serversocketchannel的pipeline設置默認的handler
通過這幾個步驟,整個serverbootstrap也就算是運行起來了。。。