netty4.1.32學習(持續更新)


netty4.1.32學習

官方api:https://netty.io/4.1/api/index.html

Netty 實戰(精髓):https://waylau.gitbooks.io/essential-netty-in-action/

 

一、簡單介紹

本文是根據李林峰書籍《Netty權威指南》(第2版)總結而來。

二、Netty簡介

Netty是一個高性能、異步事件驅動的NIO框架,提供了對TCP、UDP和文件傳輸的支持,作為一個異步NIO框架,Netty的所有IO操作都是異步非阻塞的,通過Future-Listener機制,用戶可以方便的主動獲取或者通過通知機制獲得IO操作結果。
作為當前最流行的NIO框架,Netty在互聯網領域、大數據分布式計算領域、游戲行業、通信行業等獲得了廣泛的應用,一些業界著名的開源組件也基於Netty構建,比如RPC框架、zookeeper等。

三、Netty學習路線(與書本順序基本一致)

1、NIO入門

2、Netty入門應用介紹(helloworld:時間服務器)

3、TCP粘包/拆包問題解決方法:

  (1)LineBasedFromeDecoder+StringDecoder:按行切換的文本解碼器
  (2)DelimiterBasedFrameDecoder+StringDecoder:自動完成以分隔符做結束標志的消息的解碼器
  (3)FixedLengthFrameDecoder+StringDecoder:自動完成對定長消息的解碼器

4、Netty序列化問題解決方法:

  (1)介紹Java序列化的缺點

  (2)MessagePack

  (3)Google的ProtoBuf

    ①ProtoBuf入門與使用

    ②ProtoBuf在netty中使用

  (4)Facebook的Thrift

  (5)JBoss Marshalling

5、Netty多協議開發和應用

  (1)HTTP協議介紹

  (2)Netty HTTP+XML協議棧開發

四、Netty實例(源碼)

1、解決粘包/拆包問題(LineBasedFromeDecoder實現)

 1 package com.rs.test.timeserver;
 2 
 3 import io.netty.bootstrap.ServerBootstrap;
 4 import io.netty.channel.ChannelFuture;
 5 import io.netty.channel.ChannelInitializer;
 6 import io.netty.channel.ChannelOption;
 7 import io.netty.channel.EventLoopGroup;
 8 import io.netty.channel.nio.NioEventLoopGroup;
 9 import io.netty.channel.socket.SocketChannel;
10 import io.netty.channel.socket.nio.NioServerSocketChannel;
11 import io.netty.handler.codec.LineBasedFrameDecoder;
12 import io.netty.handler.codec.string.StringDecoder;
13 
14 public class TimeServer {
15 
16     public void bind(int port) throws Exception {
17         // 配置服務端的NIO線程組
18         EventLoopGroup bossGroup = new NioEventLoopGroup();
19         EventLoopGroup workerGroup = new NioEventLoopGroup();
20 
21         try {
22             ServerBootstrap b = new ServerBootstrap();
23             b.group(bossGroup, workerGroup)
24                 .channel(NioServerSocketChannel.class)
25                 .option(ChannelOption.SO_BACKLOG, 1024)
26                 .childHandler(new ChildChannelHandler());
27 
28             // 綁定端口,同步等待成功
29             ChannelFuture f = b.bind(port).sync();
30 
31             // 等待服務端監聽端口關閉
32             f.channel().closeFuture().sync();
33         } finally {
34             // 優雅退出,釋放線程池資源
35             bossGroup.shutdownGracefully();
36             workerGroup.shutdownGracefully();
37         }
38     }
39 
40     private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
41 
42         @Override
43         protected void initChannel(SocketChannel ch) throws Exception {
44             // 核心在下面兩行,加入了LineBasedFrameDecoder和StringDecoder兩個解碼器
45             // 所以當消息到達我們的業務處理handler即TimerServerHandler,所看到的消息
46             // 都是前面兩個解碼器經過處理之后的結果
47             ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
48             ch.pipeline().addLast(new StringDecoder());
49             ch.pipeline().addLast(new TimeServerHandler());
50         }
51 
52     }
53 
54     public static void main(String[] args) throws Exception {
55         int port = 8080;
56         if(args != null && args.length > 0) {
57             try {
58                 port = Integer.valueOf(port);
59             } catch (NumberFormatException e) {
60                 // TODO: handle exception
61             }
62         }
63         new TimeServer().bind(port);
64     }
65 
66 }
TimeServer
 1 package com.rs.test.timeserver;
 2 
 3 import java.sql.Date;
 4 
 5 import io.netty.buffer.ByteBuf;
 6 import io.netty.buffer.Unpooled;
 7 import io.netty.channel.ChannelHandlerAdapter;
 8 import io.netty.channel.ChannelHandlerContext;
 9 import io.netty.channel.ChannelInboundHandlerAdapter;
10 
11 public class TimeServerHandler extends ChannelInboundHandlerAdapter {
12 
13     private int counter = 0;
14 
15     @Override
16     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
17         String body = (String) msg;
18         // counter的作用是標記這是第幾次收到客戶端的請求
19         System.out.println("The time server receive order : " + body + " ; the counter is : " + ++counter);
20         String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? 
21                 new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
22         currentTime = currentTime + System.getProperty("line.separator");
23         ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
24         ctx.write(resp);
25     }
26 
27     @Override
28     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
29         ctx.flush();
30     }
31 
32     @Override
33     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
34         ctx.close();
35     }
36 }
TimeServerHandler
 1 package com.rs.test.timeserver;
 2 import io.netty.bootstrap.Bootstrap;
 3 import io.netty.channel.ChannelFuture;
 4 import io.netty.channel.ChannelInitializer;
 5 import io.netty.channel.ChannelOption;
 6 import io.netty.channel.EventLoopGroup;
 7 import io.netty.channel.nio.NioEventLoopGroup;
 8 import io.netty.channel.socket.SocketChannel;
 9 import io.netty.channel.socket.nio.NioSocketChannel;
10 import io.netty.handler.codec.LineBasedFrameDecoder;
11 import io.netty.handler.codec.string.StringDecoder;
12 
13 public class TimeClient {
14 
15     public void connect(int port, String host) throws Exception {
16         // 配置客戶端NIO線程組
17         EventLoopGroup group = new NioEventLoopGroup();
18         try {
19             Bootstrap b = new Bootstrap();
20             b.group(group).channel(NioSocketChannel.class)
21                 .option(ChannelOption.TCP_NODELAY, true)
22                 .handler(new ChannelInitializer<SocketChannel>() {
23 
24                     @Override
25                     protected void initChannel(SocketChannel ch) throws Exception {
26                         // 核心在下面兩行,加入了LineBasedFrameDecoder和StringDecoder兩個解碼器
27                         // 所以當消息到達我們的業務處理handler即TimerServerHandler,所看到的消息
28                         // 都是前面兩個解碼器經過處理之后的結果
29                         ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
30                         ch.pipeline().addLast(new StringDecoder());
31                         ch.pipeline().addLast(new TimeClientHandler());
32                     }
33                 });
34             // 發起異步連接操作
35             ChannelFuture f = b.connect(host, port).sync();
36 
37             // 等待客戶端鏈路關閉
38             f.channel().closeFuture().sync();
39         } finally {
40             // 優雅退出,釋放NIO線程組
41             group.shutdownGracefully();
42         }
43     }
44 
45     public static void main(String[] args) throws Exception {
46         int port = 8080;
47         if(args != null && args.length > 0) {
48             try {
49                 port = Integer.valueOf(port);
50             } catch (NumberFormatException e) {
51                 // 采用默認值
52             }
53         }
54         new TimeClient().connect(port, "localhost");
55     }
56 }
TimeClient
 1 package com.rs.test.timeserver;
 2 import java.util.logging.Logger;
 3 
 4 import io.netty.buffer.ByteBuf;
 5 import io.netty.buffer.Unpooled;
 6 import io.netty.channel.ChannelHandlerAdapter;
 7 import io.netty.channel.ChannelHandlerContext;
 8 import io.netty.channel.ChannelInboundHandlerAdapter;
 9 
10 public class TimeClientHandler extends ChannelInboundHandlerAdapter {
11 
12     private static final Logger logger = Logger.getLogger(TimeServerHandler.class.getName());
13 
14     private int counter;
15 
16     private byte[] req;
17 
18     public TimeClientHandler() {
19         req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
20     }
21 
22     @Override
23     public void channelActive(ChannelHandlerContext ctx) {
24         ByteBuf message = null;
25         for(int i = 0; i < 100; i++) {
26             message = Unpooled.buffer(req.length);
27             message.writeBytes(req);
28             ctx.writeAndFlush(message);
29         }
30     }
31 
32     @Override
33     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
34         String body = (String) msg;
35         // counter的作用是標記這是第幾次收到客戶端的請求
36         System.out.println("Now is : " + body + " ; the counter is : " + ++counter);
37     }
38 
39     @Override
40     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
41         logger.warning("Unexpected exception from downstream : ");
42         ctx.close();
43     }
44 
45 }
TimeClientHandler

2.使用protobuf作為編解碼技術(包括半包處理及序列化),心跳機制動態將客戶端關閉

https://github.com/carsonWuu/Netty/tree/master/src/com/rs/test/protobuf

五、相關技術學習

1、Netty斷線重連解決方案(client端):①使用心跳機制去檢測②啟動時連接重試③運行中連接斷開時重試

https://www.jianshu.com/p/c78b37a2ca47


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM