如下所示,我們寫一個簡單的Netty Demo,實現客戶端與服務端進行通訊。
1、Netty 服務端啟動類
/** * (1)、 初始化用於Acceptor的主"線程池"以及用於I/O工作的從"線程池"; * (2)、 初始化ServerBootstrap實例, 此實例是netty服務端應用開發的入口; * (3)、 通過ServerBootstrap的group方法,設置(1)中初始化的主從"線程池"; * (4)、 指定通道channel的類型,由於是服務端,故而是NioServerSocketChannel; * (5)、 設置ServerSocketChannel的處理器 * (6)、 設置子通道也就是SocketChannel的處理器, 其內部是實際業務開發的"主戰場" * (8)、 配置子通道也就是SocketChannel的選項 * (9)、 綁定並偵聽某個端口 */ public class SimpleNettyServer { public void bind(int port) throws Exception { // 服務器端應用程序使用兩個NioEventLoopGroup創建兩個EventLoop的組,EventLoop這個相當於一個處理線程,是Netty接收請求和處理IO請求的線程。 // 主線程組, 用於接受客戶端的連接,但是不做任何處理,跟老板一樣,不做事 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 從線程組, 當boss接受連接並注冊被接受的連接到worker時,處理被接受連接的流量。 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // netty服務器啟動類的創建, 輔助工具類,用於服務器通道的一系列配置 ServerBootstrap serverBootstrap = new ServerBootstrap(); /** * 使用了多少線程以及如何將它們映射到創建的通道取決於EventLoopGroup實現,甚至可以通過構造函數進行配置。 * 設置循環線程組,前者用於處理客戶端連接事件,后者用於處理網絡IO(server使用兩個參數這個) * public ServerBootstrap group(EventLoopGroup group) * public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) */ serverBootstrap.group(bossGroup, workerGroup) //綁定兩個線程組 // 用於構造socketchannel工廠 .channel(NioServerSocketChannel.class) //指定NIO的模式 /** * @Description: 初始化器,channel注冊后,會執行里面的相應的初始化方法,傳入自定義客戶端Handle(服務端在這里操作) * @Override protected void initChannel(SocketChannel channel) throws Exception { // 通過SocketChannel去獲得對應的管道 ChannelPipeline pipeline = channel.pipeline(); // 通過管道,添加handler pipeline.addLast("nettyServerOutBoundHandler", new NettyServerOutBoundHandler()); pipeline.addLast("nettyServerHandler", new NettyServerHandler()); } * 子處理器也可以通過下面的內部方法來實現。 */ .childHandler(new ChannelInitializer<SocketChannel>() { // 子處理器,用於處理workerGroup protected void initChannel(SocketChannel socketChannel) throws Exception { // socketChannel.pipeline().addLast(new NettyServerOutBoundHandler()); socketChannel.pipeline().addLast(new SimpleNettyServerHandler()); } }); // 啟動server,綁定端口,開始接收進來的連接,設置8088為啟動的端口號,同時啟動方式為同步 ChannelFuture channelFuture = serverBootstrap.bind(8088).sync(); System.out.println("server start"); // 監聽關閉的channel,等待服務器 socket 關閉 。設置位同步方式 channelFuture.channel().closeFuture().sync(); } finally { //退出線程組 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; new netty.server.SimpleNettyServer().bind(port); } }
2、Netty 服務端處理類Handler
public class SimpleNettyServerHandler extends ChannelInboundHandlerAdapter { /** * 本方法用於讀取客戶端發送的信息 * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("SimpleNettyServerHandler.channelRead"); ByteBuf result = (ByteBuf) msg; byte[] bytesMsg = new byte[result.readableBytes()]; // msg中存儲的是ByteBuf類型的數據,把數據讀取到byte[]中 result.readBytes(bytesMsg); String resultStr = new String(bytesMsg); // 接收並打印客戶端的信息 System.out.println("Client said:" + resultStr); // 釋放資源,這行很關鍵 result.release(); // 向客戶端發送消息 String response = "hello client!"; // 在當前場景下,發送的數據必須轉換成ByteBuf數組 ByteBuf encoded = ctx.alloc().buffer(4 * response.length()); encoded.writeBytes(response.getBytes()); ctx.write(encoded); ctx.flush(); } /** * 本方法用作處理異常 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 當出現異常就關閉連接 cause.printStackTrace(); ctx.close(); } /** * 信息獲取完畢后操作 * * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }
3、Netty 客戶端啟動類
public class SimpleNettyClient { public void connect(String host, int port) throws Exception { EventLoopGroup worker = new NioEventLoopGroup(); try { // 客戶端啟動類程序 Bootstrap bootstrap = new Bootstrap(); /** *EventLoop的組 */ bootstrap.group(worker); /** * 用於構造socketchannel工廠 */ bootstrap.channel(NioSocketChannel.class); /**設置選項 * 參數:Socket的標准參數(key,value),可自行百度 保持呼吸,不要斷氣! * */ bootstrap.option(ChannelOption.SO_KEEPALIVE, true); /** * 自定義客戶端Handle(客戶端在這里搞事情) */ bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new SimpleNettyClientHandler()); } }); /** 開啟客戶端監聽,連接到遠程節點,阻塞等待直到連接完成*/ ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); /**阻塞等待數據,直到channel關閉(客戶端關閉)*/ channelFuture.channel().closeFuture().sync(); } finally { worker.shutdownGracefully(); } } public static void main(String[] args) throws Exception { SimpleNettyClient client = new SimpleNettyClient(); client.connect("127.0.0.1", 8088); } }
4、客戶端處理類Handler
public class SimpleNettyClientHandler extends ChannelInboundHandlerAdapter { /** * 本方法用於接收服務端發送過來的消息 * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("SimpleClientHandler.channelRead"); ByteBuf result = (ByteBuf) msg; byte[] result1 = new byte[result.readableBytes()]; result.readBytes(result1); System.out.println("Server said:" + new String(result1)); result.release(); } /** * 本方法用於處理異常 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 當出現異常就關閉連接 cause.printStackTrace(); ctx.close(); } /** * 本方法用於向服務端發送信息 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { String msg = "hello Server!"; ByteBuf encoded = ctx.alloc().buffer(4 * msg.length()); encoded.writeBytes(msg.getBytes()); ctx.write(encoded); ctx.flush(); } }
先啟動服務端,然后啟動客戶端,可分別在Console得到以下輸出:
服務端:
server start
SimpleNettyServerHandler.channelRead
Client said:hello Server!
客戶端:
SimpleClientHandler.channelRead
Server said:hello client!
由此,一個Netty的簡單Demo即搭建完成。