java Netty tcp通訊


本篇簡單介紹java基於高性能網絡框架Nettytcp通訊。

Netty

Netty的強大之處在於,它的高度抽象和封裝。使用者無需關心內部實現。只需要修改相關handler類即可。

客戶端


package tcp;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;


public class TcpClient {

    /* Server Ip */
	public static String HOST = "127.0.0.1";
	/* Server Port */
	public static int PORT = 12340;

	public static Bootstrap bootstrap = getBootstrap();
	public static Channel channel = getChannel(HOST, PORT);

	// 初始化 `Bootstrap`
	public static final Bootstrap getBootstrap() {
		EventLoopGroup group = new NioEventLoopGroup();
		Bootstrap b = new Bootstrap();
		b.group(group).channel(NioSocketChannel.class);
		b.handler(new ChannelInitializer<Channel>() {
			@Override
			protected void initChannel(Channel ch) throws Exception {
			    
				// 每個 `Channel` 都關聯一個 `ChannelPipeline`
				/* 發送和接收的 `object`通過`ObjectDecoder` `ObjectEncoder`進行加解密
				 * 注:對應`object`類,必須實現`Serializable`接口
				 *
				 * `netty`框架本身自帶了很多`Encode`和`DeCode`
				 *  例如:字符串的 `StringDecoder` `StringEncoder`
				 */
				 
				ChannelPipeline pipeline = ch.pipeline();
				pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
				pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
				pipeline.addLast(new ObjectEncoder());
				pipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
				pipeline.addLast("handler", new TcpClientHandler());
			}
		});
		b.option(ChannelOption.SO_KEEPALIVE, true);
		return b;
	}

	// 建立連接
	public static final Channel getChannel(String host, int port) {
		Channel channel = null;
		try {
			channel = bootstrap.connect(host, port).sync().channel();
		} catch (Exception e) {
			System.out.println("連接Server(IP{},PORT{})失敗");
			return null;
		}
		return channel;
	}

	// 向服務器發送消息
	public static void sendMsg(Object msg) throws Exception {
		if (channel != null) {
			channel.writeAndFlush(msg).sync();
		} else {
			System.out.println("消息發送失敗,連接尚未建立!");
		}
	}

}


客戶端對應的handler


package tcp;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;


public class TcpClientHandler extends SimpleChannelInboundHandler<Object> {

    // 從服務器接收到的信息 `Object`
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
		
	}
}

服務端


package tcp;


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;


public class TcpServer {
	private static final String IP = "192.168.1.154";
	private static final int PORT = 12340;
	/** 用於分配處理業務線程的線程組個數 */
	protected static final int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors() * 2; // 默認
	/** 業務出現線程大小 */
	protected static final int BIZTHREADSIZE = 4;
	/*
	 * NioEventLoopGroup實際上就是個線程池,
	 * NioEventLoopGroup在后台啟動了n個NioEventLoop來處理Channel事件,
	 * 每一個NioEventLoop負責處理m個Channel,
	 * NioEventLoopGroup從NioEventLoop數組里挨個取出NioEventLoop來處理Channel
	 */
	private static final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE);
	private static final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE);

	public static void run() throws Exception {
		ServerBootstrap b = new ServerBootstrap();
		b.group(bossGroup, workerGroup);
		b.channel(NioServerSocketChannel.class);
		b.childHandler(new ChannelInitializer<SocketChannel>() {
			@Override
			public void initChannel(SocketChannel ch) throws Exception {
				ChannelPipeline pipeline = ch.pipeline();
				pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
				pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
				pipeline.addLast(new ObjectEncoder());
				pipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));

				pipeline.addLast(new TcpServerHandler());
			}
		});

		b.bind(IP, PORT).sync();

		System.out.println("TCP服務器已啟動");
	}

	protected static void shutdown() {
		workerGroup.shutdownGracefully();
		bossGroup.shutdownGracefully();
	}

	public static void main(String[] args) throws Exception {
		System.out.println("啟動TCP服務器...");
		TcpServer.run();
		// TcpServer.shutdown();
	}
}

服務器對應的handler


package tcp;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;


public class TcpServerHandler extends SimpleChannelInboundHandler<Object> {

    // 從客戶端接收到的消息
	/*  
	 *  服務器向指定客戶端發送消息,只需要通過`map`將客戶端的`id`和`channel`存起來
	 *  在需要的時候通過`writeAndFlush`方法發送即可
	 */
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
		
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		ctx.close();
	}


}

SimpleChannelInboundHandler生命周期

如上開始所說我們只需要處理相應的handler即可ChannelHandler
我們只需要根據業務需要在相應的方法里面做業務處理即可。

ChannelHandler的子接口ChannelInboundHandler處理進站數據,ChannelOutboundHandler 處理出站數據,允許攔截各種操作。

ChannelInboundHandler

ChannelInboundHandler 生命周期對應的方法

  • channelRegistered channel被注冊到EventLoop並且可以處理io

  • channelUnregistered channel從EventLoop卸載,並且不能處理io

  • channelActive channel變為active模式,通道connected/boundb准備好了

  • channelInactive channel不活躍,不再連接遠程的

  • channelReadComplete channel上的讀操作完成了

  • channelRead 數據從Channel中讀出了

  • channelWritabilityChanged Channel的讀寫性改變時調用,

  • userEventTriggered(...) 用戶調用Channel.fireUserEventTriggered(...),從ChannelPipeline傳遞特定的消息

ChannelOutboundHandler

ChannelOutboundHandler供了出站的方法,這些方法會被Channel, ChannelPipeline, 和 ChannelHandlerContext調用

  • bind 請求綁定Channel到一個本地地址

  • connect 請求連接Channel到遠端

  • disconnect 請求從遠端斷開Channel

  • close 請求關閉Channel

  • deregister 請求Channel從它的EventLoop上解除注冊

  • read 請求從Channel中讀更多的數據

  • write 請求通過Channel刷隊列數據到遠端

  • flush 請求通過Channel寫數據到遠端


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM