什么是Netty
- Netty是一個基於Java NIO的編寫客服端服務器的框架,是一個異步事件框架。
- 官網https://netty.io/
為什么選擇Netty
由於JAVA NIO編寫服務器的過程過於復雜且不易掌控,所以我們選擇Netty框架進行開發。
- 具有很高的的性能。
- 且比NIO更容易編碼和維護。
- 實踐者眾多,Elastic Search,dubbo,Akka,grpc等等
- 社區很成熟。
Netty的常用組件
- EventLoopGroup 相當於Reactor線程組,常用NioEventLoopGroup
- Channel 連接到網絡套接字或能夠進行I/O操作(如讀、寫、連接和綁定)的組件的連接。常用NioServerSocketChannel,NioSocketChannel,SocketChannel
- Bootstrap/ServerBootstrap 輔助類
- ChannelPipeline 處理或截取通道的入站事件和出站操作的通道處理程序列表
- ChannelHandler 處理I/O事件或攔截I/O操作,並將其轉發到其ChannelPipeline中的下一個處理程序。常用ChannelInboundHandlerAdapter和SimpleChannelInboundHandler,編碼器,解碼器。
- ChannelFuture 異步操作使用。
Netty實現一個服務器
服務器代碼:
/**
* @author monkjavaer
* @date 2019/7/18 14:56
*/
public class NettyServer {
private static Logger LOGGER = LoggerFactory.getLogger(NettyServer.class);
public static int PORT = 8080;
public static void connect(){
//配置兩個服務端的NIO線程組,一個用於接收客服端的鏈接,另一個用於進行SocketChannel的網絡讀寫。
//NioEventLoopGroup是一個處理I/O操作的多線程事件循環
//"boss":接收一個傳入連接
EventLoopGroup boss = new NioEventLoopGroup();
//"worker" : 當boss接收連接並把接收的連接注冊給worker,work就開始處理
EventLoopGroup worker = new NioEventLoopGroup();
try {
//ServerBootstrap是一個幫助類,可以設置服務器
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss,worker)
//NioServerSocketChannel用於實例化新通道來接收傳入的連接
.channel(NioServerSocketChannel.class)
//配置日志
.handler(new LoggingHandler(LogLevel.INFO))
//ChannelInitializer用於配置新通道
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//通過ChannelPipeline添加處理類ChannelHandler
//通常有很多處理類,可以將這個內部類new ChannelInitializer提為一個獨立的類
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new NettyServerHandler());
}
})
//ChannelOption和ChannelConfig可以設置各種參數
.option(ChannelOption.SO_BACKLOG, 128)
//option()用於接受傳入連接的NioServerSocketChannel,childOption()用於父ServerChannel接受的通道
.childOption(ChannelOption.SO_KEEPALIVE, true);
// Bind and start to accept incoming connections.
//異步地綁定服務器;調用 sync()方法阻塞等待直到綁定完成
ChannelFuture f = bootstrap.bind(PORT).sync();
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
public static void main(String[] args) {
NettyServer.connect();
}
}
- 首先創建兩個NioEventLoopGroup線程組,一個用於接收客服端的鏈接,另一個用於進行SocketChannel的網絡讀寫。
- 創建一個服務器幫助類ServerBootstrap,將boss,worker兩個線程組加入
- 通過ServerBootstrap流式設置服務器
- 設置通道為NioServerSocketChannel
- 通過ChannelInitializer設置自定義處理器NettyServerHandler,並將他加入ChannelPipeline,這里用內部類簡易實現,真實線上環境我們應該提取為相應的類。
- 通過option和childOption設置TCP相關參數。
- 異步地綁定服務器;調用 sync()方法阻塞等待直到綁定完成
- 最后關閉相關資源
服務器處理類:
客戶端的處理類和服務器類似。
/**
* ChannelHandler.Sharable 標注一個channel handler可以被多個channel安全地共享
* ChannelInboundHandlerAdapter實現了ChannelInboundHandler
* 回調事件處理類
*
* @author monkjavaer
* @date 2019/7/18 15:36
*/
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private static Logger LOGGER = LoggerFactory.getLogger(NettyServerHandler.class);
/**
* 新的連接被建立時調用
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
LOGGER.info("client {} connected.", ctx.channel().remoteAddress());
ctx.writeAndFlush(Unpooled.copiedBuffer("hello client!", CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
//獲取緩沖區可讀字節數
int readableBytes = byteBuf.readableBytes();
byte[] bytes = new byte[readableBytes];
byteBuf.readBytes(bytes);
LOGGER.info("readableBytes is{},server received message:{}", readableBytes, new String(bytes, StandardCharsets.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
// ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
// .addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LOGGER.error("server exceptionCaught,{}",cause.getMessage());
ctx.close();
}
}
- 自定義處理類繼承自ChannelInboundHandlerAdapter
- 重寫我們需要的方法,channelActive(新的連接被建立時調用),channelRead(讀取數據),channelReadComplete(讀取最后的一條信息),exceptionCaught(發生異常時調用)
Netty實現一個客戶端
客戶端
/**
* @author monkjavaer
* @date 2019/7/18 17:17
*/
public class NettyClient {
private static Logger LOGGER = LoggerFactory.getLogger(NettyClient.class);
public static String IP = "127.0.0.1";
public static int PORT = 8080;
public static void main(String[] args) {
EventLoopGroup client = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(client)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new NettyClientHandler());
}
});
ChannelFuture f = bootstrap.connect(IP,PORT).sync();
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally {
client.shutdownGracefully();
}
}
}
- 客戶端這里只創建一個線程組即可
- 幫助類這里使用Bootstrap
- 設置通道為NioSocketChannel
- 其他類容和服務器雷士
- 和服務器建立連接
客戶端處理類
/**
* @author monkjavaer
* @date 2019/7/18 17:26
*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private static Logger LOGGER = LoggerFactory.getLogger(NettyServerHandler.class);
/**
* 新的連接被建立時調用
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
LOGGER.info("server {} connected.", ctx.channel().remoteAddress());
ctx.writeAndFlush(Unpooled.copiedBuffer("hello server!", CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
//獲取緩沖區可讀字節數
int readableBytes = byteBuf.readableBytes();
byte[] bytes = new byte[readableBytes];
byteBuf.readBytes(bytes);
LOGGER.info("readableBytes is{},client received message:{}", readableBytes, new String(bytes, StandardCharsets.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LOGGER.error("server exceptionCaught,{}",cause.getMessage());
ctx.close();
}
}
