很多小伙伴搞不清楚為啥要學習 Netty ,今天這篇文章開始之前,簡單說一下自己的看法:
@
覺得不錯的話,歡迎 star!ღ( ´・ᴗ・` )比心
- Netty 從入門到實戰系列文章地址:https://github.com/Snailclimb/netty-practical-tutorial 。
- RPC 框架源碼地址:https://github.com/Snailclimb/guide-rpc-framework
下面,我會帶着大家搭建自己的第一個 Netty 版的 Hello World 小程序。
首先,讓我們來創建服務端。
服務端
我們可以通過 ServerBootstrap
來引導我們啟動一個簡單的 Netty 服務端,為此,你必須要為其指定下面三類屬性:
- 線程組(一般需要兩個線程組,一個負責接處理客戶端的連接,一個負責具體的 IO 處理)
- IO 模型(BIO/NIO)
- 自定義
ChannelHandler
(處理客戶端發過來的數據並返回數據給客戶端)
創建服務端
/**
* @author shuang.kou
* @createTime 2020年05月14日 20:28:00
*/
public final class HelloServer {
private final int port;
public HelloServer(int port) {
this.port = port;
}
private void start() throws InterruptedException {
// 1.bossGroup 用於接收連接,workerGroup 用於具體的處理
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//2.創建服務端啟動引導/輔助類:ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
//3.給引導類配置兩大線程組,確定了線程模型
b.group(bossGroup, workerGroup)
// (非必備)打印日志
.handler(new LoggingHandler(LogLevel.INFO))
// 4.指定 IO 模型
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
//5.可以自定義客戶端消息的業務處理邏輯
p.addLast(new HelloServerHandler());
}
});
// 6.綁定端口,調用 sync 方法阻塞知道綁定完成
ChannelFuture f = b.bind(port).sync();
// 7.阻塞等待直到服務器Channel關閉(closeFuture()方法獲取Channel 的CloseFuture對象,然后調用sync()方法)
f.channel().closeFuture().sync();
} finally {
//8.優雅關閉相關線程組資源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new HelloServer(8080).start();
}
}
簡單解析一下服務端的創建過程具體是怎樣的:
1.創建了兩個 NioEventLoopGroup
對象實例:bossGroup
和 workerGroup
。
bossGroup
: 用於處理客戶端的 TCP 連接請求。workerGroup
: 負責每一條連接的具體讀寫數據的處理邏輯,真正負責 I/O 讀寫操作,交由對應的 Handler 處理。
舉個例子:我們把公司的老板當做 bossGroup,員工當做 workerGroup,bossGroup 在外面接完活之后,扔給 workerGroup 去處理。一般情況下我們會指定 bossGroup 的 線程數為 1(並發連接量不大的時候) ,workGroup 的線程數量為 CPU 核心數 *2 。另外,根據源碼來看,使用 NioEventLoopGroup
類的無參構造函數設置線程數量的默認值就是 CPU 核心數 *2 。
2.創建一個服務端啟動引導/輔助類: ServerBootstrap
,這個類將引導我們進行服務端的啟動工作。
3.通過 .group()
方法給引導類 ServerBootstrap
配置兩大線程組,確定了線程模型。
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
4.通過channel()
方法給引導類 ServerBootstrap
指定了 IO 模型為NIO
NioServerSocketChannel
:指定服務端的 IO 模型為 NIO,與 BIO 編程模型中的ServerSocket
對應NioSocketChannel
: 指定客戶端的 IO 模型為 NIO, 與 BIO 編程模型中的Socket
對應
5.通過 .childHandler()
給引導類創建一個ChannelInitializer
,然后指定了服務端消息的業務處理邏輯也就是自定義的ChannelHandler
對象
6.調用 ServerBootstrap
類的 bind()
方法綁定端口 。
//bind()是異步的,但是,你可以通過 `sync()`方法將其變為同步。
ChannelFuture f = b.bind(port).sync();
自定義服務端 ChannelHandler 處理消息
HelloServerHandler.java
/**
* @author shuang.kou
* @createTime 2020年05月14日 20:39:00
*/
@Sharable
public class HelloServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
ByteBuf in = (ByteBuf) msg;
System.out.println("message from client:" + in.toString(CharsetUtil.UTF_8));
// 發送消息給客戶端
ctx.writeAndFlush(Unpooled.copiedBuffer("你也好!", CharsetUtil.UTF_8));
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
這個邏輯處理器繼承自ChannelInboundHandlerAdapter
並重寫了下面 2 個方法:
channelRead()
:服務端接收客戶端發送數據調用的方法exceptionCaught()
:處理客戶端消息發生異常的時候被調用
客戶端
創建客戶端
public final class HelloClient {
private final String host;
private final int port;
private final String message;
public HelloClient(String host, int port, String message) {
this.host = host;
this.port = port;
this.message = message;
}
private void start() throws InterruptedException {
//1.創建一個 NioEventLoopGroup 對象實例
EventLoopGroup group = new NioEventLoopGroup();
try {
//2.創建客戶端啟動引導/輔助類:Bootstrap
Bootstrap b = new Bootstrap();
//3.指定線程組
b.group(group)
//4.指定 IO 模型
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
// 5.這里可以自定義消息的業務處理邏輯
p.addLast(new HelloClientHandler(message));
}
});
// 6.嘗試建立連接
ChannelFuture f = b.connect(host, port).sync();
// 7.等待連接關閉(阻塞,直到Channel關閉)
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new HelloClient("127.0.0.1",8080, "你好,你真帥啊!哥哥!").start();
}
}
繼續分析一下客戶端的創建流程:
1.創建一個 NioEventLoopGroup
對象實例 (服務端創建了兩個 NioEventLoopGroup
對象)
2.創建客戶端啟動的引導類是 Bootstrap
3.通過 .group()
方法給引導類 Bootstrap
配置一個線程組
4.通過channel()
方法給引導類 Bootstrap
指定了 IO 模型為NIO
5.通過 .childHandler()
給引導類創建一個ChannelInitializer
,然后指定了客戶端消息的業務處理邏輯也就是自定義的ChannelHandler
對象
6.調用 Bootstrap
類的 connect()
方法連接服務端,這個方法需要指定兩個參數:
inetHost
: ip 地址inetPort
: 端口號
public ChannelFuture connect(String inetHost, int inetPort) {
return this.connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
}
public ChannelFuture connect(SocketAddress remoteAddress) {
ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
this.validate();
return this.doResolveAndConnect(remoteAddress, this.config.localAddress());
}
connect
方法返回的是一個 Future
類型的對象
public interface ChannelFuture extends Future<Void> {
......
}
也就是說這個方是異步的,我們通過 addListener
方法可以監聽到連接是否成功,進而打印出連接信息。具體做法很簡單,只需要對代碼進行以下改動:
ChannelFuture f = b.connect(host, port).addListener(future -> {
if (future.isSuccess()) {
System.out.println("連接成功!");
} else {
System.err.println("連接失敗!");
}
}).sync();
自定義客戶端 ChannelHandler 處理消息
HelloClientHandler.java
/**
* @author shuang.kou
* @createTime 2020年05月14日 20:46:00
*/
@Sharable
public class HelloClientHandler extends ChannelInboundHandlerAdapter {
private final String message;
public HelloClientHandler(String message) {
this.message = message;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("client sen msg to server " + message);
ctx.writeAndFlush(Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
try {
System.out.println("client receive msg from server: " + in.toString(CharsetUtil.UTF_8));
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
這個邏輯處理器繼承自 ChannelInboundHandlerAdapter
,並且覆蓋了下面三個方法:
channelActive()
:客戶端和服務端的連接建立之后就會被調用channelRead
:客戶端接收服務端發送數據調用的方法exceptionCaught
:處理消息發生異常的時候被調用
運行程序
首先運行服務端 ,然后再運行客戶端。
如果你看到,服務端控制台打印出:
message from client:你好,你真帥啊!哥哥!
客戶端控制台打印出:
client sen msg to server 你好,你真帥啊!哥哥!
client receive msg from server: 你也好!
說明你的 Netty 版的 Hello World 已經完成了!
總結
這篇文章我們自己實現了一個 Netty 版的 Hello World,並且詳細介紹了服務端和客戶端的創建流程。客戶端和服務端這塊的創建流程,套路基本都差不多,差別可能就在相關配置方面。
文中涉及的代碼,你可以在這里找到:https://github.com/Snailclimb/guide-rpc-framework-learning/tree/master/src/main/java/github/javaguide/netty/echo 。