在學習Netty前的必需知識:NIO。如果不了解可以看這篇:手動搭建I/O網絡通信框架3:NIO編程模型,升級改造聊天室。
對於BIO和AIO可以只看文字了解一下,但是NIO編程模型最好還是動手實踐一下,畢竟NIO目前是使用最廣的。還有一篇Netty實戰SpringBoot+Netty+WebSocket實現實時通信。是實現個人網站的聊天室的一篇博客,文字內容很少,主要是代碼,最好粗略看看代碼,因為下面有幾個地方會和這篇代碼做一些比較,下面統稱Netty實戰。如果現在看不懂,等認真看到這篇博客的pipeline那里,應該都會看懂。Netty實戰中的客戶端是Web瀏覽器,配合一些前端IM框架,客戶端實現起來非常簡單。但是聊天通訊的功能在APP中多一些,所以下面會說到Netty在客戶端中的使用。
一、Netty是什么?
官方定義:Netty 是一個異步事件驅動的網絡應用框架
,用於快速開發可維護的高性能服務器和客戶端。
簡單地說Netty封裝了JDK的NIO,不用再寫一大堆復雜的代碼。代替了原生的NIO,優點主要有如下幾點:
1.Netty底層IO模型可以隨意切換,比如可以從NIO切換到BIO。
2.Netty自帶拆包解包,從NIO各種繁復的細節中脫離出來,讓開發者重點關心業務邏輯。
3.Netty解決NIO中Selector空輪詢BUG,這個BUG應該很多人聽說過,雖然官方聲明jdk1.6的update18修復了該問題,只不過是降低了發生的概率。
4.對Selector做了很多細小的優化,reactor線程模型能做到高效的並發處理。
二、服務端啟動類詳解
精簡的服務端Demo
,與上面那篇Netty實戰中的代碼相比只有一個啟動類,少了業務代碼和初始化器。
public class NettyServer {
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(bossGroup, workerGroup) // 配置線程組
.channel(NioServerSocketChannel.class) // 配置IO模型
.childHandler(new ChannelInitializer<NioSocketChannel>() { // 匿名內部類,處理邏輯
protected void initChannel(NioSocketChannel ch) {
// 處理邏輯
}
});
serverBootstrap.bind(8000); // 綁定端口
}
}
NioEventLoopGroup
,可以看作是線程組。bossGroup
用來監聽客戶端請求。workerGroup
用來處理每個連接的數據讀寫。ServerBootstrap
是引導類,作用是引導服務器的啟動工作。.group()
配置兩個線程組的角色,就是配置誰去監聽請求,誰去處理連接讀寫。示例只創建了兩個線程組,並沒有實際使用。.channel()
配置服務端的IO模型,示例配置的是NIO模型。也可以配置為BIO,如OioServerSocketChannel.class
在Netty中,這個類已經過時了。.childHandler()
配置每條連接的數據讀寫和業務邏輯等。示例代碼用的是匿名內部類,沒有編寫邏輯。實際使用中的規范,一般會寫一個類繼承ChannelInitializer
也就是初始化器,重寫初始化方法。如同Netty實戰那篇中的代碼一樣。- 綁定監聽端口
引導類(ServerBootstrap)最小化的參數配置就是如上四個:配置線程組、IO模型、處理邏輯、綁定端口。
引導類ServerBootstrap的其它方法
1.handler()方法:上面的cildHandler是處理客戶端連接的讀寫邏輯,這個是用於指定服務端啟動中的邏輯。
serverBootstrap.handler(new ChannelInitializer<NioServerSocketChannel>() {
protected void initChannel(NioServerSocketChannel ch) {
System.out.println("服務端啟動中");
}
})
2.attr()方法:給服務端的channel指定一些自定義屬性。然后通過channel.attr()取出這個屬性,其實就是給channel維護一個map。
3.childAttr()方法:作用和上面一樣,這個是針對客戶端的channel。
4.option()方法:給服務端的channel設置屬性,如serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024)
設置系統用於臨時存放已完成三次握手的請求的隊列的最大長度,如果連接建立頻繁,服務器處理創建新連接較慢,可以適當調大這個參數
5.childOption()方法:大家肯定已經明白了Netty的命名規律,這個是給每條客戶端連接設置TCP相關的屬性,如:
serverBootstrap
//開啟TCP底層心跳機制
.childOption(ChannelOption.SO_KEEPALIVE, true)
//開啟Nagle算法,如果要求高實時性,有數據發送時就馬上發送,就關閉,如果需要減少發送次數減少網絡交互,就開啟。
.childOption(ChannelOption.TCP_NODELAY, true)
三、客戶端啟動類詳解
那篇Netty實戰,里面的客戶端是Web,所以用到了WebSocket。主要重點還是在服務端上,客戶端實現起來相對容易,只用處理發送消息和接收消息邏輯。下面是一個精簡客戶端Demo,可以根據自己的項目類型來選擇客戶端的實現。
public class NettyClient {
public static void main(String[] args) {
NioEventLoopGroup workerGroup = new NioEventLoopGroup(); // 線程組
Bootstrap bootstrap = new Bootstrap(); // 啟動類
bootstrap
// 1.指定線程模型
.group(workerGroup)
// 2.指定 IO 類型為 NIO
.channel(NioSocketChannel.class)
// 3.IO 處理邏輯
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
// 處理邏輯
}
});
// 4.建立連接
bootstrap.connect("127.0.0.1", 8000).addListener(future -> {
if (future.isSuccess()) {
System.out.println("連接成功!");
} else {
System.err.println("連接失敗!");
//重新連接
}
});
}
}
客戶端重新連接
網絡環境差的情況下,客戶端第一次連接可能會失敗,所以需要設置嘗試重新連接策略。可以把連接connect上面的代碼封裝起來。一般情況下,連接失敗后不會馬上重連,而是會通過一個指數退避的方式,比如每隔1s、2s、4s、8s....重新連接。
int MAX_RETRY=5; // 最大次數
connect(bootstrap, "127.0.0.1", 8000, MAX_RETRY);
// 上方調用的connect連接方法的封裝細節
private static void connect(Bootstrap bootstrap, String host, int port, int retry) {
// 啟動器開始連接
bootstrap.connect(host, port).addListener(future -> {
if (future.isSuccess()) {
System.out.println("連接成功!");
} else if (retry == 0) {
System.err.println("重試次數已用完,放棄連接!");
} else {
// 第幾次重連
int order = (MAX_RETRY - retry) + 1;
// 本次重連的間隔,1<<order相當於1乘以2的order次方
int delay = 1 << order;
System.err.println(new Date() + ": 連接失敗,第" + order + "次重連……");
bootstrap.config().group()
.schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit.SECONDS);
}
});
}
實例代碼中,定時任務是調用 bootstrap.config().group().schedule(), 其中 bootstrap.config() 這個方法返回的是 BootstrapConfig,他是對 Bootstrap 配置參數的抽象,然后 bootstrap.config().group() 返回的就是開始時配置的線程模型workerGroup,調 workerGroup 的 schedule 方法即可實現定時任務邏輯。
在 schedule 方法的任務方法參數中,前面任務方法的三個參數原封不動地傳遞,最后一個重試次數參數減掉一,就是下一次建立連接時候的上下文信息。可以自行修改代碼,更改到一個連接不上的服務端 Host 或者 Port,查看控制台日志就可以看到5次重連日志。
bootstrap的其他方法,attr()和option()。作用和服務端的方法一樣。attr設置自定義屬性,option設置TCP相關的屬性。
四、數據傳輸的載體:ByteBuf
NIO中經常使用的ByteBuffer有一些缺陷:
1.長度固定。2.讀寫狀態需要通過filp()和rewind()手動轉換。3.功能有限。
看完上面NIO那篇博客中聊天室Demo,就會發現這些問題。長度設置固定的1024個字節,讀寫也要用filp()轉換。
Netty為了解決ByteBuffer的這些缺陷,設計了ByteBuf。結構如下:
-
ByteBuf 字節容器,容器分為三個部分,第一個部分是主動丟棄的字節,該數據是無效的;第二部分是可讀字節,這部分數據是 ByteBuf 的主體數據, 從ByteBuf 里面讀取的數據都來自這一部分;最后一部分的數據是可寫字節,所有寫到 ByteBuf 的數據都會寫到這一段。最后一部分虛線表示的是該 ByteBuf 最多還能擴容多少容量。
-
以上三段內容是被兩個指針給划分出來的,從左到右,依次是讀指針(readerIndex)、寫指針(writerIndex),然后還有一個變量 capacity,表示 ByteBuf 底層內存的總容量。
-
從 ByteBuf 中每讀取一個字節,readerIndex 自增1,ByteBuf 里面總共有 writerIndex-readerIndex 個字節可讀, 由此可以推論出當 readerIndex - writerIndex = 0 時,ByteBuf 沒有可讀內容。
-
寫數據是從 writerIndex 指向的部分開始寫,每寫一個字節,writerIndex 自增1,直到增到 capacity,這個時候,表示 ByteBuf 已經不可寫了。
-
ByteBuf 里面其實還有一個參數 maxCapacity,當向 ByteBuf 寫數據的時候,如果容量不足,那么這個時候可以進行擴容,直到 capacity 擴容到 maxCapacity,超過 maxCapacity 就會報錯。
ByteBuf的API
capacity():表示ByteBuf底層占用了多少字節,包括丟棄字節、可讀字節、可寫字節。
maxCapacity():表示ByteBuf最大能占用多少字節,也就是包括后面的可擴容的內存。
readableBytes() 與 isReadable():前者表示當前可讀字節數,也就是寫指針位置減去讀指針位置的值。后者表示是否可讀。
writableBytes()、 isWritable() 與 maxWritableBytes():第一個表示可寫字節數。第二個表示是否可寫。第三個表示最大可寫字節數。
readerIndex() 與 readerIndex(int):前者返回當前的讀指針。后者可以設置讀指針。
writeIndex() 與 writeIndex(int):和上面一樣,只是讀指針變成了寫指針。
markReaderIndex() 與 resetReaderIndex():前者表示把當前讀指針保存起來。后者表示把當前的讀指針恢復到保存時的值,它們的功能其實readerIndex() 與 readerIndex(int)一樣可以實現,但一般會選擇下面兩句,因為不用定義一個變量。
int readerIndex = buffer.readerIndex(); // 將指針位置存儲到變量中
buffer.readerIndex(readerIndex); // 設置讀指針位置
//和上面兩句等價
buffer.markReaderIndex();
buffer.resetReaderIndex();
buffer.writeBytes(byte[] src)
與buffer.readBytes(byte[] dst)
前者表示把src寫到ByteBuf。后者表示把ByteBuf全部數據讀取到dst。
buffer.writeByte(int value)
與 buffer.readByte():前者表示把字節value寫到ByteBuf。后者表示從ByteBuf讀取一個字節。類似的 API 還有 writeBoolean()、writeChar()、writeShort()、writeInt()、writeLong()、writeFloat()、writeDouble() 與 readBoolean()、readChar()、readShort()、readInt()、readLong()、readFloat()、readDouble() 此處就不一一說明了。
release() 與 retain():因Netty使用堆外內存,而堆外內存是不被jvm直接管理的,也就是說申請到的內存無法被JVM的垃圾回收器直接回收,所以需要手動回收。
Netty的ByteBuf是通過引用計數
算法來管理內存問題,如果一個 ByteBuf沒有在任何地方被引用到,那么久需要回收底層內存。默認情況下,當創建完一個 ByteBuf,它的引用為1,然后每次調用 retain() 方法, 它的引用就加一, release() 方法原理是將引用計數減一,減完之后如果發現引用計數為0,則直接回收 ByteBuf 底層的內存。
slice()、duplicate()、copy():這三個都會返回一個新的ByteBuf。第一個是截取讀指針到寫指針范圍內的一段內容。第二個是截取整個ByteBuf,包括數據和指針信息。第三個是拷貝所有信息,除了第二個API的內容還包括底層信息,因此拷貝后的新ByteBuf任何操作不會影響原始的ByteBuf。
五、實戰:服務端和客戶端全雙工通信
了解客戶端、服務端的啟動類和ByteBuf以后,可以進行一個簡單的實戰了。
首先看看前面的客戶端代碼,.handler
里重寫的initChannel
方法沒有實際邏輯。現在加上邏輯處理器,其實就是一個執行邏輯代碼的類,怎么叫無所謂,明白它的意思就行。
客戶端實現
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new FirstClientHandler());
}
});
ch.pipeline().addLast()就是添加一個邏輯處理器。在FirstClientHandler里添加對應的邏輯代碼就行。
public class FirstClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) { // 通信通道一旦激活,這個方法就會被調用
System.out.println("客戶端發送消息...");
// 1. 獲取數據
ByteBuf buffer = getByteBuf(ctx);
// 2. 往通道寫數據,寫到服務器端
ctx.channel().writeAndFlush(buffer);
}
private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
// 1. 獲取二進制抽象 ByteBuf
ByteBuf buffer = ctx.alloc().buffer();
// 2. 准備數據,指定字符串的字符集為 utf-8
byte[] bytes = ("【客戶端】:這是客戶端發送的消息:"+new Date()).getBytes(Charset.forName("utf-8"));
// 3. 填充數據到 ByteBuf
buffer.writeBytes(bytes);
return buffer;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
//接收服務端的消息並打印
System.out.println(byteBuf.toString(Charset.forName("utf-8")));
}
}
- .channelActive()方法會在客戶端與服務器建立連接后調用。所以可以在這里面編寫邏輯代碼
- .alloc().buffer()的作用是把字符串的二進制數據填充到ByteBuf。
- .writeBytes()的作用是把數據寫到服務器。
- .channelRead()在收到服務端返回的消息后調用。
服務端實現
同樣需要在initChannel()里添加一個邏輯處理器。
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new FirstServerHandler());
}
});
邏輯處理器里的代碼
public class FirstServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(new Date() + ": 服務端讀到數據 -> " + byteBuf.toString(Charset.forName("utf-8")));
//接收到客戶端的消息后我們再回復客戶端
ByteBuf out = getByteBuf(ctx);
ctx.channel().writeAndFlush(out);
}
private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
byte[] bytes = "【服務器】:我是服務器,我收到你的消息了!".getBytes(Charset.forName("utf-8"));
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(bytes);
return buffer;
}
}
channelRead()方法在接收到客戶端發來的數據后調用。
最后我們啟動客戶端和服務端的啟動類,效果如下:
六、自定義客戶端與服務端的通信協議
什么是通信協議?
TCP通信的數據包格式均為二進制,協議指的就是客戶端與服務端制定一個規則,這個規則規定了每個二進制數據包中每一段的字節代表什么含義。客戶端與服務端通信的過程如下。
客戶端首先把一個Java對象轉換成二進制數據,然后通過網絡傳輸給服務端。這里的傳輸過程屬於TCP/IP協議負責,與我們應用層無關。
通信協議的設計
1.第一個是魔數,可以理解為識別這條二進制數據類型的字段。在《深入理解Java虛擬機》中這么解釋:使用魔數而不是擴展名來進行識別主要是基於安全方面的考慮,因為文件擴展名可以隨意地改動。文件格式的制定者可以自由地選擇魔數值,只要這個魔數值還沒有被廣泛采用過,並且不會引起混淆即可。
2.第二個是版本號,就像IPV4和IPV6一樣。能夠支持協議的升級。
3.第三個表示如何把Java對象轉換成二進制數據和把二進制數據轉回Java對象。
4.第四個用於區分這條數據是干嘛的或者說叫數據類型,如:這是發送的消息,還是登錄的請求等。服務端就可以根據這個指令執行不同的邏輯代碼。
5.第五個代表后面的數據長度。
6.第六個代表發送的數據,如果指令表明這是個登錄數據,里面存儲的就是賬號密碼。
通信協議的實現
示例
以實現登錄為例,下面接口和類有點多,建議先把代碼拷貝到IDE里,分好包寫好注釋,助於理解它們的關系。
1.首先創建一個Java對象,這里以登錄時的請求響應為例
@Data
public abstract class Packet {
//協議版本
private Byte version = 1;
//獲取數據類型
public abstract Byte getCommand();
}
@Date注解由lombok提供,不了解的可以看看這個https://www.cnblogs.com/lbhym/p/12551021.html
public interface Command {
//定義登錄請求指令和響應指令為1和2,其他的指令同理如MESSAGE_REQUEST等
Byte LOGIN_REQUEST = 1;
Byte LOGIN_RESPONSE = 2;
}
// 這個是登錄請求數據包的Java對象,所以調用的是上面接口的登錄請求指令,其他類型的數據包同理
@Data
public class LoginRequestPacket extends Packet {
//定義用戶信息
private Integer userId;
private String username;
private String password;
@Override
public Byte getCommand() {
return LOGIN_REQUEST;
}
}
@Data
public class LoginResponsePacket extends Packet {
//是否登錄成功
private boolean success;
//如果失敗,返回的信息
private String reason;
@Override
public Byte getCommand() {
return LOGIN_RESPONSE;
}
}
2.Java對象創建完了,再定義Java對象轉換的規則
//序列化接口
public interface Serializer {
Serializer DEFAULT = new JSONSerializer();
//序列化算法
byte getSerializerAlogrithm();
//java 對象轉換成二進制
byte[] serialize(Object object);
//二進制轉換成 java 對象
<T> T deserialize(Class<T> clazz, byte[] bytes);
}
接口定義完后開始實現接口。這里的序列化算法使用的是fastjson里面的。需要在pom.xml里導入。
public interface SerializerAlgorithm {
//json序列化標識,如果你有其他的序列化方式可以在這注明標識,類似上面的登錄指令
byte JSON = 1;
}
//實現上面定義的序列化接口
public class JSONSerializer implements Serializer {
@Override
public byte getSerializerAlgorithm() {
//獲取上面的序列化標識
return SerializerAlgorithm.JSON;
}
@Override
public byte[] serialize(Object object) {
return JSON.toJSONBytes(object);
}
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
return JSON.parseObject(bytes, clazz);
}
}
3.創建一個類PacketCodeC,里面寫上編解碼的方法。這里再說一點,因為使用了@Data注解,所以有的get方法在語法檢測階段會報錯,可以在IDEA里面下載Lombok插件。
public class PacketCodeC {
//自定義一個魔數
private static final int MAGIC_NUMBER = 0x12345678;
//創建一個靜態實例供外部調用
public static final PacketCodeC INSTANCE=new PacketCodeC();
//創建兩個map,一個存儲數據類型,如:是登錄數據還是普通消息等。第二個是存儲序列化類型。
//這樣在解碼時就可以把數據轉換為對應的類型。如:這個byte數組是LOGIN_REQUEST類型,就把它轉換成LoginRequestPacket類型的Java對象
private final Map<Byte, Class<? extends Packet>> packetTypeMap;
private final Map<Byte, Serializer> serializerMap;
private PacketCodeC() {
//初始化map並添加數據類型和序列化類型,如果有其他數據類型,記得在這里添加
packetTypeMap = new HashMap<>();
packetTypeMap.put(LOGIN_REQUEST, LoginRequestPacket.class);
packetTypeMap.put(LOGIN_RESPONSE, LoginResponsePacket.class);
serializerMap = new HashMap<>();
Serializer serializer = new JSONSerializer();
serializerMap.put(serializer.getSerializerAlogrithm(), serializer);
}
//編碼
public ByteBuf encode(ByteBufAllocator bufAllocator,Packet packet) {
// 1. 創建 ByteBuf 對象
ByteBuf byteBuf = bufAllocator.ioBuffer();
// 2. 序列化 Java 對象
byte[] bytes = Serializer.DEFAULT.serialize(packet);
// 3. 實際編碼過程,把通信協議幾個部分,一一編碼
byteBuf.writeInt(MAGIC_NUMBER);
byteBuf.writeByte(packet.getVersion());
byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlogrithm());
byteBuf.writeByte(packet.getCommand());
byteBuf.writeInt(bytes.length);
byteBuf.writeBytes(bytes);
return byteBuf;
}
//解碼
public Packet decode(ByteBuf byteBuf) {
// 跳過魔數
byteBuf.skipBytes(4);
// 跳過版本號
byteBuf.skipBytes(1);
// 序列化算法標識
byte serializeAlgorithm = byteBuf.readByte();
// 指令
byte command = byteBuf.readByte();
// 數據包長度
int length = byteBuf.readInt();
byte[] bytes = new byte[length];
byteBuf.readBytes(bytes);
Class<? extends Packet> requestType = getRequestType(command);
Serializer serializer = getSerializer(serializeAlgorithm);
if (requestType != null && serializer != null) {
return serializer.deserialize(requestType, bytes);
}
return null;
}
//獲取序列化類型
private Serializer getSerializer(byte serializeAlgorithm) {
return serializerMap.get(serializeAlgorithm);
}
//獲取數據類型
private Class<? extends Packet> getRequestType(byte command) {
return packetTypeMap.get(command);
}
}
使用自定義通信協議
最后通過一個登錄示例,來使用一下上面自定義的通信協議。
基於上面的代碼,首先更換一下客戶端和服務端的邏輯處理器。
客戶端
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new ClientHandler());
}
});
客戶端在連接上服務端后立即登錄,下面為客戶端登錄代碼
public class ClientHandler extends ChannelInboundHandlerAdapter{
// 使用自定義協議發送登錄數據
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println(new Date() + ": 客戶端開始登錄");
// 創建登錄對象
LoginRequestPacket loginRequestPacket = new LoginRequestPacket();
loginRequestPacket.setUserId(new Random().nextInt(10000));
loginRequestPacket.setUsername("username");
loginRequestPacket.setPassword("pwd");
// 編碼
ByteBuf buffer = PacketCodeC.INSTANCE.encode(ctx.alloc(), loginRequestPacket);
// 寫數據
ctx.channel().writeAndFlush(buffer);
}
// 接收服務端信息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
Packet packet = PacketCodeC.INSTANCE.decode(byteBuf);
//如果數據類型是登錄,就進行登錄判斷
//同理可以判斷數據是否是普通消息,還是其他類型的數據
if (packet instanceof LoginResponsePacket) {
LoginResponsePacket loginResponsePacket = (LoginResponsePacket) packet;
if (loginResponsePacket.isSuccess()) {
System.out.println(new Date() + ": 客戶端登錄成功");
} else {
System.out.println(new Date() + ": 客戶端登錄失敗,原因:" + loginResponsePacket.getReason());
}
}
}
}
服務端
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new ServerHandler());
}
}
下面是服務端代碼
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf requestByteBuf = (ByteBuf) msg;
// 解碼
Packet packet = PacketCodeC.INSTANCE.decode(requestByteBuf);
// 判斷是否是登錄請求數據包
if (packet instanceof LoginRequestPacket) {
LoginRequestPacket loginRequestPacket = (LoginRequestPacket) packet;
LoginResponsePacket loginResponsePacket=new LoginResponsePacket();
loginResponsePacket.setVersion(packet.getVersion());
// 登錄校驗
if (valid(loginRequestPacket)) {
// 校驗成功
loginResponsePacket.setSuccess(true);
System.out.println("客戶端登錄成功!");
} else {
// 校驗失敗
loginResponsePacket.setReason("賬號或密碼錯誤");
loginResponsePacket.setSuccess(false);
System.out.println("客戶端登錄失敗!");
}
// 編碼,結果發送給客戶端
ByteBuf responseByteBuf = PacketCodeC.INSTANCE.encode(ctx.alloc(), loginResponsePacket);
ctx.channel().writeAndFlush(responseByteBuf);
}
}
private boolean valid(LoginRequestPacket loginRequestPacket) {
//這里可以查詢數據庫,驗證用戶的賬號密碼是否正確
return true;
}
}
最后演示效果如圖所示:
實現收發消息
按照上面的登錄功能,可以嘗試實現收發消息的功能。首先還是要定義一個收發消息的Java對象。還需要在Command里面加上收發消息的指令,在編解碼類里面的map添加Java對象。收發消息前,肯定需要登錄。那么怎么判斷一個客戶端是否登錄呢?可以通過ctx獲取channel,然后通過channel的attr方法設置屬性。如果登錄成功這個屬性就設為true。然后在客戶端的啟動類里連接成功后,新建一個線程專門監聽用戶的輸入,新建輸入線程前判斷一下登錄屬性是否為true就行了。其他的地方跟登錄就沒有太大差別了。
七、Pipeline與ChannelHandler
通過上面的一些實戰,可以發現所有的邏輯代碼都寫在了一個Handler類里面,幸好現在需要處理的業務不是很多。如果以后功能拓展,這個類會變得非常臃腫。Netty中的pipeline和channelHandler就是解決這個問題的,它們通過責任鏈設計模式來組織代碼邏輯,並且能夠支持邏輯的添加和刪除,能夠支持各類協議拓展,如HTTP、Websocket等。可以看看Netty實戰博客中的初始化器類,里面就是通過pipeline添加了各類協議和一些邏輯代碼。
pipeline與channelHandler的構成
一條連接就對應一個channel,這個channel的所有處理邏輯在一個ChannelPipeline對象里,就是上圖中的pipeline,這是它的對象名。然后這個對象里面是一個雙向鏈表結構,每個節點是一個ChannelHandlerContext對象。這個對象能拿到與channel相關的所有上下文信息,這個對象還包含一個重要的對象:ChannelHandler,它的分類如下。
簡單地說,它包含兩個接口和這兩個接口的實現類,圖中左邊的實現類是不是很熟悉,就是我們自己寫的邏輯處理器里的繼承的類。從名字就可以看出,它們的作用分別是讀數據和寫數據,或理解為入棧和出棧。最重要的兩個方法分別為channelRead():消息入站棧和write():消息出棧。
構建客戶端與服務端的pipeline
下面的代碼基於上面的登錄示例改造。
先了解一下ByteToMessageDecoder這個類。不論客戶端還是服務端收到數據后,都會先進行解碼,這個類就是Netty提供的專門做這個事情的。這個類有一個好處就是,ByteBuf默認使用的是堆外內存,而它會自動釋放內存,無需我們關心。使用如下:
public class PacketDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) {
out.add(PacketCodeC.INSTANCE.decode(in));
}
}
對應的Netty也准備了一個類來專門編碼:MessageToByteEncoder.
public class PacketEncoder extends MessageToByteEncoder<Packet> {
@Override
protected void encode(ChannelHandlerContext ctx, Packet packet, ByteBuf out) {
PacketCodeC.INSTANCE.encode(out, packet);
}
}
注意encode傳入的參數,最后一個參數變成了ByteBuf的類型,所以需要把PacketCodeC里的encode方法的參數改過來,也不需要第一行創建一個ByteBuf對象了。
如果不明白為什么要用到這兩個類的話,先展示一段Netty實戰博客里面的代碼:
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline= socketChannel.pipeline();
//http解碼器
pipeline.addLast(new HttpServerCodec());
//....
//websocket支持,設置路由
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
//添加自定義的助手類(邏輯處理器)
pipeline.addLast(new NettyHandler());
}
}
在這篇實戰中沒有實現自己的編解碼器,而是直接使用了http的解碼器。類似的可以把自定義的編解碼器也通過pepeline添加到邏輯鏈中。就像前面說的,可以添加刪除邏輯代碼,每個功能各司其職,而不是一股腦的全在一個地方。用這兩個類還有一個好處就是Netty會自動識別這兩個類,從而自動編解碼而不需要我們自己去調用。
編解碼的問題解決了,再看看邏輯處理器類。看看登錄的代碼,如果我們不止實現登錄功能,還有收發等其他功能,是不是要用大量的if else把各個消息類型分開,然后執行不同的邏輯。不同的邏輯都擠在一個方法中,顯然也太擁擠了。因此Netty基於這種考慮,抽象出了SimpleChannelInboundHandler。下面看看它是如何解決這個問題的:
public class ClientLoginHandler extends SimpleChannelInboundHandler<LoginResponsePacket>{
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("客戶端開始登錄....");
// 創建登錄對象
LoginRequestPacket loginRequestPacket = new LoginRequestPacket();
loginRequestPacket.setUserId(new Random().nextInt(10000));
loginRequestPacket.setUsername("username");
loginRequestPacket.setPassword("pwd");
// 寫數據
ctx.channel().writeAndFlush(loginRequestPacket);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginResponsePacket loginResponsePacket) {
if (loginResponsePacket.isSuccess()) {
System.out.println(new Date() + ": 客戶端登錄成功");
} else {
System.out.println(new Date() + ": 客戶端登錄失敗,原因:" + loginResponsePacket.getReason());
}
}
}
public class ServerLoginHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) {
//服務端登錄邏輯
ctx.channel().writeAndFlush(login(loginRequestPacket));
}
private LoginResponsePacket login(LoginRequestPacket loginRequestPacket) {
LoginResponsePacket loginResponsePacket = new LoginResponsePacket();
loginResponsePacket.setVersion(loginRequestPacket.getVersion());
// 登錄校驗(自行判斷用戶信息是否正確)
if (true) {
// 校驗成功
loginResponsePacket.setSuccess(true);
System.out.println("客戶端登錄成功!");
return loginResponsePacket;
} else {
// 校驗失敗
loginResponsePacket.setReason("賬號或密碼錯誤");
loginResponsePacket.setSuccess(false);
System.out.println("客戶端登錄失敗!");
return loginResponsePacket;
}
}
}
類似的,收發消息也可以這么做。Netty會自動根據抽象類后面的泛型來區分它要調用哪個類。比如我們發送的是一個SendMessage類型的Java對象,它就會在繼承了SimpleChannelInboundHandler的類中找到泛型為SendMessage的類去執行。
最后我們要把這些邏輯代碼根據服務端和客戶端不同的需求添加到它們的pipeline中。
- 客戶端
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
//解碼一定要放在第一個,在這里pipeline按順序執行,不然接收消息無法正常使用
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(new LoginResponseHandler());
ch.pipeline().addLast(new PacketEncoder());
}
});
- 服務端
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(new LoginRequestHandler());
ch.pipeline().addLast(new PacketEncoder());
}
});
最后的運行結果和登錄示例一樣。看到這里,Netty實戰中的代碼,應該可以全部看懂了。
八、ChannelHandler的生命周期
在重寫ChannelInboundHandlerAdapter或者SimpleChannelInboundHandler里的方法的時候,只用到了讀和Active,其他一大堆沒用上的方法是干嘛的?現在就一一說明一下,這些方法運作的整個過程,可以理解為這個channelHandler的生命周期。以ChannelInboundHandlerAdapter為例,SimpleChannelInboundHandler繼承於ChannelInboundHandlerAdapter,所以也差不多,個別方法名不一樣而已。下面的API,從上到下,就是觸發的順序。
handlerAdded():邏輯處理器被添加后觸發。
channelRegistered():channel綁定到線程組后觸發。
channelActive():channel准備就緒,或者說連接完成。
channelRead():channel有數據可讀。
channelReadComplete():channel某次數據讀完了。
channelInactive():channel被關閉
channelUnregistered():channel取消線程的綁定
handlerRemoved():邏輯處理器被移除。
九、拆包和粘包
以上面客戶端和服務端雙向通信的代碼為例。簡單修改一下,在建立連接后,客戶端用一個循環向服務器發送消息。然后服務端打印這些消息。等次數多了以后,服務端打印時會發現一些問題,比如發送的字符串為“123456789”,大部分打印的是123456789;有一部分變成了123456789123,這就是粘包;有一部分變為了1234,這就是拆包。
為什么會有這種現象?
雖然在代碼層面傳輸的數據單位是ByteBuf。但是到了更底層,用到了TCP協議,依然會按照字節流發送數據。而底層並不知道應用層數據的具體含義,它會根據TCP緩沖區的實際情況進行數據包的划分。所以最終到達服務端的數據產生上面的現象。
如何解決?
Netty提供了4種解決方法:
1.FixedLengthFrameDecoder:固定長度拆包器,每個數據包長度都是固定的。
2.LineBasedFrameDecoder:行拆包器,每個數據包之間以換行符作為分隔。
3.DelimiterBasedFrameDecoder:類似行拆包器,不過我們可以自定義分隔符。
4.LengthFieldBasedFrameDecoder:基於長度域拆包器,最常用的,只要你的自定義協議中包含數據長度這個部分,就可以使用。它需要三個參數,第一個是數據包最大長度、第二個是參數長度域偏移量、第三個是長度域長度。
看看通信協議的圖,所謂長度域就是數據長度占用的字節,這里是4。長度域偏移量就是數據長度這個部分在通信協議組成部分中的位置,前面幾個部分加起來是7,所以它的偏移量就是7。
使用LengthFieldBasedFrameDecoder
添加到客戶端和服務端pipeline中就行了,注意要放在第一個。以服務端為例。
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 7, 4));
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(new LoginRequestHandler());
ch.pipeline().addLast(new PacketEncoder());
如果客戶端用到的是WebSocket,沒有使用自定義的通信協議,就不用添加拆包器,Netty已經幫我們做好了。
十、心跳與空閑檢測
網絡應用程序普遍會遇到的一個問題就是連接假死。簡單地說就是因為網絡或其他問題導致某一端實際上(TCP)已經斷開連接,但是應用程序沒有檢測到,以為還連接着。對服務端來說,這會浪費系統資源,導致性能下降。對於客戶端來說,假死會造成數據發送超時,影響體驗。
如何解決?
只需要客戶端每隔一段時間打個招呼,表示它還活着就行了,就是所謂的心跳。Netty自帶的IdleStateHandler 就可以實現這個功能。下面就來實現它。
服務端
//心跳檢測類
public class IMIdleStateHandler extends IdleStateHandler {
//讀空閑時間,也就是多久沒讀到數據了
private static final int READER_IDLE_TIME = 15;
public IMIdleStateHandler() {
//調用父類構造函數,四個參數分別為:
//讀空閑時間、寫空閑時間、讀寫空閑時間、時間單位
super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
}
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
System.out.println(READER_IDLE_TIME + "秒內未讀到數據,關閉連接");
ctx.channel().close();
}
}
//回復客戶端發送的心跳數據包
public class HeartBeatRequestHandler extends SimpleChannelInboundHandler<HeartBeatRequestPacket> {
public static final HeartBeatRequestHandler INSTANCE = new HeartBeatRequestHandler();
private HeartBeatRequestHandler() {
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, HeartBeatRequestPacket requestPacket) {
ctx.writeAndFlush(new HeartBeatResponsePacket());
}
}
客戶端
服務端實現了檢測讀空閑,客戶端肯定就需要發送一個數據。
public class HeartBeatTimerHandler extends ChannelInboundHandlerAdapter {
//心跳數據包發送時間間隔,這里設為5秒,實際使用時建議服務端和客戶端都設成分鍾級別
private static final int HEARTBEAT_INTERVAL = 5;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
scheduleSendHeartBeat(ctx);
super.channelActive(ctx);
}
private void scheduleSendHeartBeat(ChannelHandlerContext ctx) {
//schedule類似延時任務,每隔5秒發送心跳數據包
ctx.executor().schedule(() -> {
if (ctx.channel().isActive()) {
ctx.writeAndFlush(new HeartBeatRequestPacket());
scheduleSendHeartBeat(ctx);
}
}, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
}
}
客戶端發送心跳數據包后,也需要檢測服務端是否回復了自己,所以也需要個檢測類,與服務端的代碼一樣,就不寫了。也需要和服務端一樣,pipeline添加到相同的位置。
最后:如果服務端在讀到數據后,不要再read方法里面直接訪問數據庫或者其他比較復雜的操作,可以把這些耗時的操作放進我們的業務線程池中去執行。如:
ThreadPool threadPool = xxx;
protected void channelRead0(ChannelHandlerContext ctx, T packet) {
threadPool.submit(new Runnable() {
// 1. balabala 一些邏輯
// 2. 數據庫或者網絡等一些耗時的操作
// 3. writeAndFlush()
})
}
如果想統計某個操作的響應時間,直接用System.currentTimeMillis()其實是不准確的,因為有些操作是異步的,它馬上就返回了,所以我們要判斷異步結果是否完成再計算結束時間。
protected void channelRead0(ChannelHandlerContext ctx, T packet) {
threadPool.submit(new Runnable() {
long begin = System.currentTimeMillis();
// 1. balabala 一些邏輯
// 2. 數據庫或者網絡等一些耗時的操作
// 3. writeAndFlush
xxx.writeAndFlush().addListener(future -> {
if (future.isDone()) {
long time = System.currentTimeMillis() - begin;
}
});
})
}