Netty 拆包粘包和服務啟動流程分析


Netty 拆包粘包和服務啟動流程分析

通過本章學習,筆者希望你能掌握EventLoopGroup的工作流程,ServerBootstrap的啟動流程,ChannelPipeline是如何操作管理Channel。只有清楚這些,才能更好的了解和使用Netty。還在等什么,快來學習吧!

知識結構圖:

Netty

技術:Netty,拆包粘包,服務啟動流程
說明:若你對NIO有一定的了解,對於本章知識來說有很大的幫助!NIO教程
源碼:https://github.com/ITDragonBlog/daydayup/tree/master/Netty/netty-stu

Netty 重要組件

這里讓你清楚了解 ChannelPipeline,ChannelHandlerContext,ChannelHandler,Channel 四者之間的關系。
這里讓你清楚了解 NioEventLoopGroup,NioEventLoop,Channel 三者之間的關系。
這里讓你清楚了解 ServerBootstrap,Channel 兩者之間的關系。
看懂了這塊的理論知識,后面Netty拆包粘包的代碼就非常的簡單。

Channel

Channel : Netty最核心的接口。NIO通訊模式中通過Channel進行Socket套接字的讀,寫和同時讀寫操作。
ChannelHandler : 因為直接使用Channel會比較麻煩,所以在Netty編程中通過ChannelHandler間接操作Channel,從而簡化開發。
ChannelPipeline : 可以理解為一個管理ChandlerHandler的鏈表。對Channel進行操作時,Pipeline負責從尾部依次調用每一個Handler進行處理。每個Channel都有一個屬於自己的ChannelPipeline。
ChannelHandlerContext : ChannelPipeline通過ChannelHandlerContext間接管理每個ChannelHandler。

如下圖所示,結合代碼,在服務器初始化和客戶端創建連接的過程中加了四個Handler,分別是日志事務,字符串分割解碼器,接受參數轉字符串解碼器,處理任務的Handler。

NioEventLoopGroup

EventLoopGroup : 本質是個線程池,繼承了ScheduledExecutorService 定時任務線程池。
NioEventLoopGroup : 是用來處理NIO通信模式的線程池。每個線程池有N個NioEventLoop來處理Channel事件,每一個NioEventLoop負責處理N個Channel。
NioEventLoop : 負責不停地輪詢IO事件,處理IO事件和執行任務,類比多路復用器,細化分三件事。
1 輪詢注冊到Selector上所有的Channel的IO事件
2 處理產生網絡IO事件的Channel
3 處理隊列中的任務

ServerBootstrap

本章重點,Netty是如何通過NIO輔助啟動類來初始化Channel的?先看下面的源碼。

@Override
void init(Channel channel) throws Exception {
	final Map<ChannelOption<?>, Object> options = options0();
	synchronized (options) {
		setChannelOptions(channel, options, logger);
	}
	final Map<AttributeKey<?>, Object> attrs = attrs0();
	synchronized (attrs) {
		for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
			@SuppressWarnings("unchecked")
			AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
			channel.attr(key).set(e.getValue());
		}
	}
	ChannelPipeline p = channel.pipeline();
	final EventLoopGroup currentChildGroup = childGroup;
	final ChannelHandler currentChildHandler = childHandler;
	final Entry<ChannelOption<?>, Object>[] currentChildOptions;
	final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
	synchronized (childOptions) {
		currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
	}
	synchronized (childAttrs) {
		currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
	}
	p.addLast(new ChannelInitializer<Channel>() {
		@Override
		public void initChannel(final Channel ch) throws Exception {
			final ChannelPipeline pipeline = ch.pipeline();
			ChannelHandler handler = config.handler();
			if (handler != null) {
				pipeline.addLast(handler);
			}
			ch.eventLoop().execute(new Runnable() {
				@Override
				public void run() {
					pipeline.addLast(new ServerBootstrapAcceptor(
							ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
				}
			});
		}
	});
}

服務器啟動和連接過程:
第一步:是給Channel設置options和attrs,
第二步:復制childGroup,childHandler,childOptions和childAttrs等待服務器和客戶端連接,
第三步:實例化一個ChannelInitializer,添加到Pipeline的末尾。
第四步:當Channel注冊到NioEventLoop時,ChannelInitializer觸發initChannel方法,pipeline裝入自定義的Handler,給Channel設置一下child配置。

小結:
1 group,options,attrs,handler,是在服務器端初始化時配置,是AbstractBootstrap的方法。
2 childGroup,childOption,childAttr,childHandler,是在服務器與客戶端建立Channel后配置,是ServerBootstrap的方法。
3 Bootstrap 和 ServerBootstrap 都繼承了AbstractBootstrap類。
4 若不設置childGroup,則默認取group值。
5 Bootstrap 和 ServerBootstrap 啟動服務時,都會執行驗證方法,判斷必填參數是否都有配置。

Netty 拆包粘包

這里通過介紹Netty拆包粘包問題來對Netty進行入門學習。
在基於流的傳輸中,即便客戶端發送獨立的數據包,操作系統也會將其轉換成一串字節隊列,而服務端一次讀取到的字節數又不確定。再加上網絡傳輸的快慢。服務端很難完整的接收到數據。
常見的拆包粘包方法有三種
1 服務端設置一次接收字節的長度。若服務端接收的字節長度不滿足要求則一直處於等待。客戶端為滿足傳輸的字節長度合格,可以考慮使用空格填充。
2 服務端設置特殊分隔符。客戶端通過特殊分隔符粘包,服務端通過特殊分隔符拆包。
3 自定義協議。數據傳輸一般分消息頭和消息體,消息頭中包含了數據的長度。服務端先接收到消息頭,得知需要接收N個數據,然后服務端接收直到數據為N個為止。
本章采用第二種,用特殊分隔符的方式。

創建服務端代碼流程

第一步:准備兩個線程池。一個用於接收事件的boss線程池,另一個用於處理事件的worker線程池。
第二步:服務端實例化ServerBootstrap NIO服務輔助啟動類。用於簡化提高開發效率。
第三步:配置服務器啟動參數。比如channel的類型,接收channel的EventLoop,初始化的日志打印事件,建立連接后的事件(拆包,對象轉字符串,自定義事件),初始化的配置和建立連接后的配置。
第四步:綁定端口,啟動服務。Netty會根據第三步配置的參數啟動服務。
第五步:關閉資源。

package com.itdragon.delimiter;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;  
public class ITDragonServer {  
      
    private static final Integer PORT = 8888; 							// 被監聽端口號
    private static final String DELIMITER = "_$"; 						// 拆包分隔符  
      
    public static void main(String[] args) {  
        EventLoopGroup bossGroup = new NioEventLoopGroup(); 			// 用於接收進來的連接
        EventLoopGroup workerGroup = new NioEventLoopGroup(); 			// 用於處理進來的連接
        try {  
            ServerBootstrap serverbootstrap = new ServerBootstrap(); 	// 啟動NIO服務的輔助啟動類
            serverbootstrap.group(bossGroup, workerGroup) 				// 分別設置bossGroup, workerGroup 順序不能反
            .channel(NioServerSocketChannel.class) 						// Channel的創建工廠,啟動服務時會通過反射的方式來創建一個NioServerSocketChannel對象
            .handler(new LoggingHandler(LogLevel.INFO))					// handler在初始化時就會執行,可以設置打印日志級別
            .childHandler(new ChannelInitializer<SocketChannel>() {  	// childHandler會在客戶端成功connect后才執行,這里實例化ChannelInitializer
                @Override  
                protected void initChannel(SocketChannel socketChannel) throws Exception { 	// initChannel方法執行后刪除實例ChannelInitializer,添加以下內容
                    ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER.getBytes());  		// 獲取特殊分隔符的ByteBuffer
                    socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(128, delimiter)); // 設置特殊分隔符用於拆包 
//                    socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(8));  設置指定長度分割
                    socketChannel.pipeline().addLast(new StringDecoder());  				// 設置字符串形式的解碼  
                    socketChannel.pipeline().addLast(new ITDragonServerHandler());			// 自定義的服務器處理類,負責處理事件
                }  
            })  
            .option(ChannelOption.SO_BACKLOG, 128) 						// option在初始化時就會執行,設置tcp緩沖區  
            .childOption(ChannelOption.SO_KEEPALIVE, true); 			// childOption會在客戶端成功connect后才執行,設置保持連接  
            ChannelFuture future = serverbootstrap.bind(PORT).sync(); 	// 綁定端口, 阻塞等待服務器啟動完成,調用sync()方法會一直阻塞等待channel的停止
            future.channel().closeFuture().sync(); 						// 等待關閉 ,等待服務器套接字關閉
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            workerGroup.shutdownGracefully(); 							// 關閉線程組,先打開的后關閉  
            bossGroup.shutdownGracefully();  
        }  
    }  
}  

核心參數說明

NioEventLoopGroup : 是用來處理I/O操作的多線程事件循環器。 Netty提供了許多不同的EventLoopGroup的實現用來處理不同傳輸協議。
ServerBootstrap : 啟動NIO服務的輔助啟動類。先配置Netty服務端啟動參數,執行bind(PORT)方法才算真正啟動服務。
group : 注冊EventLoopGroup
channel : channelFactory,用於配置通道的類型。
handler : 服務器始化時就會執行的事件。
childHandler : 服務器在和客戶端成功連接后會執行的事件。
initChannel : channelRegistered事件觸發后執行,刪除ChannelInitializer實例,添加該方法體中的handler。
option : 服務器始化的配置。
childOption : 服務器在和客戶端成功連接后的配置。
SocketChannel : 繼承了Channel,通過Channel可以對Socket進行各種操作。
ChannelHandler : 通過ChannelHandler來間接操縱Channel,簡化了開發。
ChannelPipeline : 可以看成是一個ChandlerHandler的鏈表。
ChannelHandlerContext : ChannelPipeline通過ChannelHandlerContext來間接管理ChannelHandler。

自定義服務器處理類

第一步:繼承 ChannelInboundHandlerAdapter,其父類已經實現了ChannelHandler接口,簡化了開發。
第二步:覆蓋 chanelRead()事件處理方法 ,每當服務器從客戶端收到新的數據時,該方法會在收到消息時被調用。
第三步:釋放 ByteBuffer,ByteBuf是一個引用計數對象,這個對象必須顯示地調用release()方法來釋放。
第四步:異常處理,即當Netty由於IO錯誤或者處理器在處理事件時拋出的異常時觸發。在大部分情況下,捕獲的異常應該被記錄下來並且把關聯的channel給關閉掉。

package com.itdragon.delimiter;
import com.itdragon.utils.ITDragonUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;  
  
public class ITDragonServerHandler extends ChannelInboundHandlerAdapter{  
	private static final String DELIMITER = "_$"; // 拆包分隔符  
    @Override  
    public void channelRead(ChannelHandlerContext chc, Object msg) {  
        try {  
        	// 普通讀寫數據
        	/* 設置字符串形式的解碼 new StringDecoder() 后可以直接使用
            ByteBuf buf = (ByteBuf) msg;  
            byte[] req = new byte[buf.readableBytes()];  
            buf.readBytes(req);  
            String body = new String(req, "utf-8");  
            */
        	System.out.println("Netty Server : " + msg.toString());
            // 分隔符拆包  
            String response = ITDragonUtil.cal(msg.toString())+ DELIMITER;  
            chc.channel().writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            ReferenceCountUtil.release(msg); // 寫入方法writeAndFlush ,Netty已經釋放了
        }  
    }  
    // 當出現Throwable對象才會被調用
    @Override  
    public void exceptionCaught(ChannelHandlerContext chc, Throwable cause) {  
        // 這個方法的處理方式會在遇到不同異常的情況下有不同的實現,比如你可能想在關閉連接之前發送一個錯誤碼的響應消息。  
        cause.printStackTrace();  
        chc.close();  
    }  
}  

客戶端啟動流程

第一步:創建一個用於發送請求的線程池。
第二步:客戶端實例化Bootstrap NIO服務啟動輔助類,簡化開發。
第三步:配置參數,粘包,發送請求。
第四步:關閉資源。
值得注意的是,和ServerBootstrap不同,它並沒有childHandler和childOption方法。

package com.itdragon.delimiter;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;  
import io.netty.channel.ChannelFuture;  
import io.netty.channel.ChannelInitializer;  
import io.netty.channel.ChannelOption;  
import io.netty.channel.nio.NioEventLoopGroup;  
import io.netty.channel.socket.SocketChannel;  
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;  
  
public class ITDragonClient {  
      
    private static final Integer PORT = 8888;  
    private static final String HOST = "127.0.0.1";  
    private static final String DELIMITER = "_$"; // 拆包分隔符  
      
    public static void main(String[] args) {  
        NioEventLoopGroup group = new NioEventLoopGroup();  
        try {  
            Bootstrap bootstrap = new Bootstrap();  
            bootstrap.group(group)  
            .channel(NioSocketChannel.class)  
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override  
                protected void initChannel(SocketChannel socketChannel) throws Exception { 
                	ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER.getBytes());  
                    // 設置特殊分隔符
                    socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(128, delimiter));  
                    // 設置指定長度分割  不推薦,兩者選其一
//                    socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(8));  
                    socketChannel.pipeline().addLast(new StringDecoder()); 
                    socketChannel.pipeline().addLast(new ITDragonClientHandler());  
                }  
            })  
            .option(ChannelOption.SO_KEEPALIVE, true);  
              
            ChannelFuture future = bootstrap.connect(HOST, PORT).sync(); // 建立連接  
            future.channel().writeAndFlush(Unpooled.copiedBuffer(("1+1"+DELIMITER).getBytes()));  
            future.channel().writeAndFlush(Unpooled.copiedBuffer(("6+1"+DELIMITER).getBytes()));  
            future.channel().closeFuture().sync();  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            group.shutdownGracefully();  
        }  
    }  
}  

客戶端請求接收類

和服務器處理類一樣,這里只負責打印數據。

package com.itdragon.delimiter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;  
public class ITDragonClientHandler extends ChannelInboundHandlerAdapter{  
          
	@Override 
    public void channelRead(ChannelHandlerContext chc, Object msg) {  
        try {  
        	/* 設置字符串形式的解碼 new StringDecoder() 后可以直接使用
            ByteBuf buf = (ByteBuf) msg;  
            byte[] req = new byte[buf.readableBytes()];  
            buf.readBytes(req);  
            String body = new String(req, "utf-8");  
            */
            System.out.println("Netty Client :" + msg);  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            ReferenceCountUtil.release(msg);
        }  
    }  
    public void exceptionCaught(ChannelHandlerContext chc, Throwable cause) {  
        cause.printStackTrace();  
        chc.close();  
    }  
}  

打印結果

一月 29, 2018 11:31:10 上午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0xcf3a3ac1] REGISTERED
一月 29, 2018 11:31:11 上午 io.netty.handler.logging.LoggingHandler bind
信息: [id: 0xcf3a3ac1] BIND: 0.0.0.0/0.0.0.0:8888
一月 29, 2018 11:31:11 上午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0xcf3a3ac1, L:/0:0:0:0:0:0:0:0:8888] ACTIVE
一月 29, 2018 11:31:18 上午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xcf3a3ac1, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0xf1b8096b, L:/127.0.0.1:8888 - R:/127.0.0.1:4777]
一月 29, 2018 11:31:18 上午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xcf3a3ac1, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
Netty Server : 1+1
Netty Server : 6+1

Netty Client :2
Netty Client :7

從日志中可以看出Channel的狀態從REGISTERED ---> ACTIVE ---> READ ---> READ COMPLETE。服務端也是按照特殊分割符拆包。

總結

看完本章,你必須要掌握的三個知識點:NioEventLoopGroup,ServerBootstrap,ChannelHandlerAdapter
1 NioEventLoopGroup 本質就是一個線程池,管理多個NioEventLoop,一個NioEventLoop管理多個Channel。
2 NioEventLoop 負責不停地輪詢IO事件,處理IO事件和執行任務。
3 ServerBootstrap 是NIO服務的輔助啟動類,先配置服務參數,后執行bind方法啟動服務。
4 Bootstrap 是NIO客戶端的輔助啟動類,用法和ServerBootstrap類似。
5 Netty 使用FixedLengthFrameDecoder 固定長度拆包,DelimiterBasedFrameDecoder 分隔符拆包。

到這里,Netty的拆包粘包,以及Netty的重要組件,服務器啟動流程到這里就結束了,如果覺得不錯可以點一個** "推薦" ** ,也可以** "關注" **我哦。

優質文章

http://blog.csdn.net/spiderdog/article/category/1800249
https://www.jianshu.com/p/c5068caab217


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM