一、前言
前面已經學習了Netty的EventLoop以及線程模型,接着學習Netty的Bootstrapping。
二、Bootstrapping
在學習了Netty中的很多組件后,如何將這些組件有效的組合至應用程序中,這需要使用應用引導程序,引導應用程序是將其配置為運行的過程,Netty以一種絕對應用程序的方式處理引導。
2.1 Bootstrap類
Bootstrap類的繼承結構圖如下圖所示。
一個服務器使用一個父通道來接受來自客戶端的連接並創建子通道來與它們進行通信,而一個客戶端很可能只需要一個非父通道來進行所有的網絡交互,而前面討論的組件會參與引導過程,有些在客戶端和服務端都會被使用。兩個應用程序類型共同的引導步驟由AbstractBootstrap處理,而特定於客戶端或服務器的引導步驟分別由Bootstrap或ServerBootstrap處理。
為何Bootstrap為Cloneable?因為有時需要創建具有類似或相同設置的多個通道,為了支持此模式,不需要為每個通道創建和配置新的引導實例,因此將AbstractBootstrap標記為Cloneable,在已配置的引導程序上調用clone()方法將返回可立即使用的另一個引導實例,這只會創建引導程序的EventLoopGroup的淺層副本,所以其被所有克隆的通道共享。
2.2 引導客戶端和無連接協議
Bootstrap在客戶端或無連接協議的應用程序中使用。
1. 引導客戶端
Bootstrap類負責為客戶端和使用無連接協議的應用程序創建通道,如下圖所示。
以下代碼引導使用NIO TCP傳輸的客戶端。
EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new SimpleChannelInboundHandler<ByteBuf>() { @Override protected void channeRead0( ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { System.out.println("Received data"); } } ); ChannelFuture future = bootstrap.connect( new InetSocketAddress("www.manning.com", 80)); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (channelFuture.isSuccess()) { System.out.println("Connection established"); } else { System.err.println("Connection attempt failed"); channelFuture.cause().printStackTrace(); } } } );
2. Channel和EventLoopGroup兼容性
io.netty.channel包中的類結構如下。
可以看到對於NIO和OIO傳輸,都有相關的EventLoopGroup和Channel實現,不能混合具有不同前綴的組件,例如NioEventLoopGroup和OioSocketChannel。如下代碼展示不兼容的使用用法。
EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(OioSocketChannel.class) .handler(new SimpleChannelInboundHandler<ByteBuf>() { @Override protected void channelRead0( ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { System.out.println("Received data"); } } ); ChannelFuture future = bootstrap.connect( new InetSocketAddress("www.manning.com", 80)); future.syncUninterruptibly();
其中NIOEventLooop和OioSocketChannel不兼容,將會拋出IllegalStateException異常。
在調用bind或者connect方法之前,需要調用group、channel或channelFactory、handler方法,否則會拋出IllegalStateException異常。
2.3 引導服務器
ServerChannel的實現負責創建接受連接的子通道,ServerBootstrap通過bind()方法創建一個ServerChannel,ServerChannel可管理多個子通道,具體如下圖所示。
下面代碼展示了如何引導服務器。
NioEventLoopGroup group = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(group) .channel(NioServerSocketChannel.class) .childHandler(new SimpleChannelInboundHandler<ByteBuf>() { @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception { System.out.println("Received data"); } } ); ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080)); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (channelFuture.isSuccess()) { System.out.println("Server bound"); } else { System.err.println("Bound attempt failed"); channelFuture.cause().printStackTrace(); } } } );
2.4 由通道引導客戶端
假設服務器正在處理客戶端請求,並要求服務器作為第三個系統的客戶端,如代理服務器。此時需要從ServerChannel引導客戶端通道。一個較好的方法是通過Bootstrap類的group方法傳遞Channel對應的EventLoop,因為所有分配給EventLoop的通道都使用相同的線程,這避免了額外的線程創建和相關的上下文切換。具體如下圖所示。
通過group方法共享EventLoop的代碼如下。
ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler( new SimpleChannelInboundHandler<ByteBuf>() { ChannelFuture connectFuture; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class).handler( new SimpleChannelInboundHandler<ByteBuf>() { @Override protected void channelRead0( ChannelHandlerContext ctx, ByteBuf in) throws Exception { System.out.println("Received data"); } } ); bootstrap.group(ctx.channel().eventLoop()); connectFuture = bootstrap.connect( new InetSocketAddress("www.manning.com", 80)); } @Override protected void channelRead0( ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { if (connectFuture.isDone()) { // do something with the data } } } ); ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080)); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (channelFuture.isSuccess()) { System.out.println("Server bound"); } else { System.err.println("Bind attempt failed"); channelFuture.cause().printStackTrace(); } } } );
2.5 在引導過程中添加多個ChannelHandler
在上面示例中,在引導過程中通過調用handler或者childHandler方法添加單個ChannelHandler,並且我們知道在ChannelPipeline中可以有多個ChannelHandler鏈,但是在引導過程中只添加了一個ChannelHandler。Netty提供了ChannelInboundHandlerAdapter,其提供了initChannel方法,該方法可以將多個ChannelHandler添加至ChannelPipeline中,你只需要將ChannelInitializer的實現提供給引導程序,而一旦Channel注冊了EventLoop,那么initChannel方法將被調用,當方法返回后,ChannelInitializer將自身從ChannelPipeline中移除,如下代碼展示了具體的操作。
ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializerImpl()); ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080)); future.sync();
final class ChannelInitializerImpl extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpClientCodec()); pipeline.addLast(new HttpObjectAggregator(Integer.MAX_VALUE)); } }
2.6 使用Netty的ChannelOptions和attributes
當創建通道時手動配置非常麻煩,可以使用option方法將ChannelOptions提供給引導程序,你提供的值將自動應用於在引導中創建的所有通道。ChannelOptions包括連接詳細信息,如保持活動、超時屬性和緩沖區設置等。Netty還可使用AttributeKey抽象類來配置屬性值,如下代碼展示了具體使用。
final AttributeKey<Integer> id = new AttributeKey<Integer>("ID"); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler( new SimpleChannelInboundHandler<ByteBuf>() { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { Integer idValue = ctx.channel().attr(id).get(); // do something with the idValue @Override protected void channelRead0( ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { System.out.println("Received data"); } } ); bootstrap.option(ChannelOption.SO_KEEPALIVE,true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000); bootstrap.attr(id, 123456); ChannelFuture future = bootstrap.connect( new InetSocketAddress("www.manning.com", 80)); future.syncUninterruptibly();
2.7 引導DatagramChannels
前面的示例使用的SocketChannel,是基於TCP協議,但是Bootstrap也可以用於無連接的協議,如UDP協議,唯一的區別在於不調用connect方法,只使用bind方法,具體如下代碼所示。
Bootstrap bootstrap = new Bootstrap(); bootstrap.group(new OioEventLoopGroup()).channel( OioDatagramChannel.class).handler( new SimpleChannelInboundHandler<DatagramPacket>(){ @Override public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception { // Do something with the packet } } ); ChannelFuture future = bootstrap.bind(new InetSocketAddress(0)); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (channelFuture.isSuccess()) { System.out.println("Channel bound"); } else { System.err.println("Bind attempt failed"); channelFuture.cause().printStackTrace(); } } })
2.8 關閉
引導使得應用運行,但是之后需要優雅的關閉引導。需要關閉EventLoopGroup,可調用EventLoopGroup.shutdownGracefully() 方法,其是異步的,會返回ChannelFuture,在觀泉關閉后會收到通知,下面是使用示例。
EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class); ... Future<?> future = group.shutdownGracefully(); // block until the group has shutdown future.syncUninterruptibly();
也可以在調用shutdownGracefully方法之前顯示調用close方法,要讓EventLoopGroup自己主動關閉。
三、總結
本篇博文講解了Bootstrap,包括客戶端和服務端的引導,以及如何啟動標准的客戶端和服務端程序。也謝謝各位園友的觀看~