本篇簡單介紹java
基於高性能網絡框架Netty
的tcp
通訊。
Netty
Netty
的強大之處在於,它的高度抽象和封裝。使用者無需關心內部實現。只需要修改相關handler
類即可。
客戶端
package tcp;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
public class TcpClient {
/* Server Ip */
public static String HOST = "127.0.0.1";
/* Server Port */
public static int PORT = 12340;
public static Bootstrap bootstrap = getBootstrap();
public static Channel channel = getChannel(HOST, PORT);
// 初始化 `Bootstrap`
public static final Bootstrap getBootstrap() {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class);
b.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
// 每個 `Channel` 都關聯一個 `ChannelPipeline`
/* 發送和接收的 `object`通過`ObjectDecoder` `ObjectEncoder`進行加解密
* 注:對應`object`類,必須實現`Serializable`接口
*
* `netty`框架本身自帶了很多`Encode`和`DeCode`
* 例如:字符串的 `StringDecoder` `StringEncoder`
*/
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
pipeline.addLast("handler", new TcpClientHandler());
}
});
b.option(ChannelOption.SO_KEEPALIVE, true);
return b;
}
// 建立連接
public static final Channel getChannel(String host, int port) {
Channel channel = null;
try {
channel = bootstrap.connect(host, port).sync().channel();
} catch (Exception e) {
System.out.println("連接Server(IP{},PORT{})失敗");
return null;
}
return channel;
}
// 向服務器發送消息
public static void sendMsg(Object msg) throws Exception {
if (channel != null) {
channel.writeAndFlush(msg).sync();
} else {
System.out.println("消息發送失敗,連接尚未建立!");
}
}
}
客戶端對應的handler
。
package tcp;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class TcpClientHandler extends SimpleChannelInboundHandler<Object> {
// 從服務器接收到的信息 `Object`
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
}
}
服務端
package tcp;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
public class TcpServer {
private static final String IP = "192.168.1.154";
private static final int PORT = 12340;
/** 用於分配處理業務線程的線程組個數 */
protected static final int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors() * 2; // 默認
/** 業務出現線程大小 */
protected static final int BIZTHREADSIZE = 4;
/*
* NioEventLoopGroup實際上就是個線程池,
* NioEventLoopGroup在后台啟動了n個NioEventLoop來處理Channel事件,
* 每一個NioEventLoop負責處理m個Channel,
* NioEventLoopGroup從NioEventLoop數組里挨個取出NioEventLoop來處理Channel
*/
private static final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE);
private static final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE);
public static void run() throws Exception {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class);
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
pipeline.addLast(new TcpServerHandler());
}
});
b.bind(IP, PORT).sync();
System.out.println("TCP服務器已啟動");
}
protected static void shutdown() {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
public static void main(String[] args) throws Exception {
System.out.println("啟動TCP服務器...");
TcpServer.run();
// TcpServer.shutdown();
}
}
服務器對應的handler
。
package tcp;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
public class TcpServerHandler extends SimpleChannelInboundHandler<Object> {
// 從客戶端接收到的消息
/*
* 服務器向指定客戶端發送消息,只需要通過`map`將客戶端的`id`和`channel`存起來
* 在需要的時候通過`writeAndFlush`方法發送即可
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
SimpleChannelInboundHandler
生命周期
如上開始所說我們只需要處理相應的handler
即可ChannelHandler
。
我們只需要根據業務需要在相應的方法里面做業務處理即可。
ChannelHandler
的子接口ChannelInboundHandler
處理進站數據,ChannelOutboundHandler
處理出站數據,允許攔截各種操作。
ChannelInboundHandler
ChannelInboundHandler
生命周期對應的方法
-
channelRegistered channel被注冊到EventLoop並且可以處理io
-
channelUnregistered channel從EventLoop卸載,並且不能處理io
-
channelActive channel變為active模式,通道connected/boundb准備好了
-
channelInactive channel不活躍,不再連接遠程的
-
channelReadComplete channel上的讀操作完成了
-
channelRead 數據從Channel中讀出了
-
channelWritabilityChanged Channel的讀寫性改變時調用,
-
userEventTriggered(...) 用戶調用Channel.fireUserEventTriggered(...),從ChannelPipeline傳遞特定的消息
ChannelOutboundHandler
ChannelOutboundHandler
供了出站的方法,這些方法會被Channel
, ChannelPipeline
, 和 ChannelHandlerContext
調用
-
bind 請求綁定Channel到一個本地地址
-
connect 請求連接Channel到遠端
-
disconnect 請求從遠端斷開Channel
-
close 請求關閉Channel
-
deregister 請求Channel從它的EventLoop上解除注冊
-
read 請求從Channel中讀更多的數據
-
write 請求通過Channel刷隊列數據到遠端
-
flush 請求通過Channel寫數據到遠端