問題背景
NIO是面向緩沖區進行通信的,不是面向流的。我們都知道,既然是緩沖區,那它一定存在一個固定大小。這樣一來通常會遇到兩個問題:
- 消息粘包:當緩沖區足夠大,由於網絡不穩定種種原因,可能會有多條消息從通道讀入緩沖區,此時如果無法分清數據包之間的界限,就會導致粘包問題;
- 消息不完整:若消息沒有接收完,緩沖區就被填滿了,會導致從緩沖區取出的消息不完整,即半包的現象。
介紹這個問題之前,務必要提一下我代碼整體架構。
代碼參見GitHub倉庫
https://github.com/CuriousLei/smyl-im
在這個項目中,我的NIO核心庫設計思路流程圖如下所示
介紹:
- 服務端為每一個連接上的客戶端建立一個Connector對象,為其提供IO服務;
- ioArgs對象內部實例域引用了緩沖區buffer,作為直接與channel進行數據交互的緩沖區;
- 兩個線程池,分別操控ioArgs進行讀和寫操作;
- connector與ioArgs關系:(1)輸入,線程池處理讀事件,數據寫入ioArgs,並回調給connector;(2)輸出,connector將數據寫入ioArgs,將ioArgs傳入Runnable對象,供線程池處理;
- 兩個selector線程,分別監聽channel的讀和寫事件。事件就緒,則觸發線程池工作。
思路
光這樣實現,必然會有粘包、半包問題。要重現這兩個問題也很簡單。
- ioArgs中把緩沖區設置小一點,發送一條大於該長度的數據,服務端會當成兩條消息讀取,即消息不完整;
- 在線程代碼中,加一個Thread.sleep()延時等待,客戶端連續發幾條消息(總長度小於緩沖區大小),也可以重現粘包現象。
這個問題實質上是消息體與緩沖區數據不一一對應導致的。那么,如何解決呢?
固定頭部方案
可以采用固定頭部方案來解決,頭部設置四個字節,存儲一個int值,記錄后面數據的長度。以此來標記一個消息體。
- 讀取數據時,根據頭部的長度信息,按序讀取ioArgs緩沖區中的數據,若沒有達到長度要求,繼續讀下一個ioArgs。這樣自然不會出現粘包、半包問題。
- 輸出數據時,也采用同樣的機制封裝數據,首部四個字節記錄長度。
我的工程項目中,客戶端和服務端共用一個nio核心包,即niohdl,可保證收發數據格式一致。
設計方案
要實現以上設想,必須在connector和ioArgs之間加一層Dispatcher類,用於處理消息體與緩沖區之間的轉化關系(消息體取個名字:Packet)。根據輸入和輸出的不同,分別叫ReceiveDispatcher和SendDispatcher。即通過它們來操作Packet與ioArgs之間的轉化。
Packet
定義這個消息體,繼承關系如下圖所示:
Packet是基類,代碼如下:
package cn.buptleida.niohdl.core;
import java.io.Closeable;
import java.io.IOException;
/**
* 公共的數據封裝
* 提供了類型以及基本的長度的定義
*/
public class Packet implements Closeable {
protected byte type;
protected int length;
public byte type(){
return type;
}
public int length(){
return length;
}
@Override
public void close() throws IOException {
}
}
SendPacket和ReceivePacket分別代表發送消息體和接收消息體。StringReceivePacket和StringSendPacket代表字符串類的消息,因為本次實踐只限於字符串消息的收發,今后可能有文件之類的,有待擴展。
代碼中必然會涉及到字節數組的操作,所以,以StringSendPacket為例,需要提供將String轉化為byte[]的方法。代碼如下所示:
package cn.buptleida.niohdl.box;
import cn.buptleida.niohdl.core.SendPacket;
public class StringSendPacket extends SendPacket {
private final byte[] bytes;
public StringSendPacket(String msg) {
this.bytes = msg.getBytes();
this.length = bytes.length;//父類中的實例域
}
@Override
public byte[] bytes() {
return bytes;
}
}
SendDispatcher
在connector對象的實例域中會引用一個SendDispatcher對象。發送數據時,會通過SendDispatcher中的方法對數據進行封裝和處理。其大致的關系圖如下所示:
SendDispatcher中設置任務隊列Queue
這個過程的程序框圖如下所示:
在代碼中,SendDispatcher實際上是一個接口,我用AsyncSendDispatcher實現此接口,代碼如下:
package cn.buptleida.niohdl.impl.async;
import cn.buptleida.niohdl.core.*;
import cn.buptleida.utils.CloseUtil;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
public class AsyncSendDispatcher implements SendDispatcher {
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private Sender sender;
private Queue<SendPacket> queue = new ConcurrentLinkedDeque<>();
private AtomicBoolean isSending = new AtomicBoolean();
private ioArgs ioArgs = new ioArgs();
private SendPacket packetTemp;
//當前發送的packet大小以及進度
private int total;
private int position;
public AsyncSendDispatcher(Sender sender) {
this.sender = sender;
}
/**
* connector將數據封裝進packet后,調用這個方法
* @param packet
*/
@Override
public void send(SendPacket packet) {
queue.offer(packet);//將數據放進隊列中
if (isSending.compareAndSet(false, true)) {
sendNextPacket();
}
}
@Override
public void cancel(SendPacket packet) {
}
/**
* 從隊列中取數據
* @return
*/
private SendPacket takePacket() {
SendPacket packet = queue.poll();
if (packet != null && packet.isCanceled()) {
//已經取消不用發送
return takePacket();
}
return packet;
}
private void sendNextPacket() {
SendPacket temp = packetTemp;
if (temp != null) {
CloseUtil.close(temp);
}
SendPacket packet = packetTemp = takePacket();
if (packet == null) {
//隊列為空,取消發送狀態
isSending.set(false);
return;
}
total = packet.length();
position = 0;
sendCurrentPacket();
}
private void sendCurrentPacket() {
ioArgs args = ioArgs;
args.startWriting();//將ioArgs緩沖區中的指針設置好
if (position >= total) {
sendNextPacket();
return;
} else if (position == 0) {
//首包,需要攜帶長度信息
args.writeLength(total);
}
byte[] bytes = packetTemp.bytes();
//把bytes的數據寫入到IoArgs中
int count = args.readFrom(bytes, position);
position += count;
//完成封裝
args.finishWriting();//flip()操作
//向通道注冊OP_write,將Args附加到runnable中;selector線程監聽到就緒即可觸發線程池進行消息發送
try {
sender.sendAsync(args, ioArgsEventListener);
} catch (IOException e) {
closeAndNotify();
}
}
private void closeAndNotify() {
CloseUtil.close(this);
}
@Override
public void close(){
if (isClosed.compareAndSet(false, true)) {
isSending.set(false);
SendPacket packet = packetTemp;
if (packet != null) {
packetTemp = null;
CloseUtil.close(packet);
}
}
}
/**
* 接收回調,來自writeHandler輸出線程
*/
private ioArgs.IoArgsEventListener ioArgsEventListener = new ioArgs.IoArgsEventListener() {
@Override
public void onStarted(ioArgs args) {
}
@Override
public void onCompleted(ioArgs args) {
//繼續發送當前包packetTemp,因為可能一個包沒發完
sendCurrentPacket();
}
};
}
ReceiveDispatcher
同樣,ReceiveDispatcher也是一個接口,代碼中用AsyncReceiveDispatcher實現。在connector對象的實例域中會引用一個AsyncReceiveDispatcher對象。接收數據時,會通過ReceiveDispatcher中的方法對接收到的數據進行拆包處理。其大致的關系圖如下所示:
每一個消息體的首部會有一個四字節的int字段,代表消息的長度值,按照這個長度來進行讀取。如若一個ioArgs未滿足這個長度,就讀取下一個ioArgs,保證數據包的完整性。這個流程就不畫程序框圖了,偷個懶hhhh。其實看下面代碼注釋已經很清晰了,容易理解。
AsyncReceiveDispatcher的代碼如下所示:
package cn.buptleida.niohdl.impl.async;
import cn.buptleida.niohdl.box.StringReceivePacket;
import cn.buptleida.niohdl.core.ReceiveDispatcher;
import cn.buptleida.niohdl.core.ReceivePacket;
import cn.buptleida.niohdl.core.Receiver;
import cn.buptleida.niohdl.core.ioArgs;
import cn.buptleida.utils.CloseUtil;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
public class AsyncReceiveDispatcher implements ReceiveDispatcher {
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final Receiver receiver;
private final ReceivePacketCallback callback;
private ioArgs args = new ioArgs();
private ReceivePacket packetTemp;
private byte[] buffer;
private int total;
private int position;
public AsyncReceiveDispatcher(Receiver receiver, ReceivePacketCallback callback) {
this.receiver = receiver;
this.receiver.setReceiveListener(ioArgsEventListener);
this.callback = callback;
}
/**
* connector中調用該方法進行
*/
@Override
public void start() {
registerReceive();
}
private void registerReceive() {
try {
receiver.receiveAsync(args);
} catch (IOException e) {
closeAndNotify();
}
}
private void closeAndNotify() {
CloseUtil.close(this);
}
@Override
public void stop() {
}
@Override
public void close() throws IOException {
if(isClosed.compareAndSet(false,true)){
ReceivePacket packet = packetTemp;
if(packet!=null){
packetTemp = null;
CloseUtil.close(packet);
}
}
}
/**
* 回調方法,從readHandler輸入線程中回調
*/
private ioArgs.IoArgsEventListener ioArgsEventListener = new ioArgs.IoArgsEventListener() {
@Override
public void onStarted(ioArgs args) {
int receiveSize;
if (packetTemp == null) {
receiveSize = 4;
} else {
receiveSize = Math.min(total - position, args.capacity());
}
//設置接受數據大小
args.setLimit(receiveSize);
}
@Override
public void onCompleted(ioArgs args) {
assemblePacket(args);
//繼續接受下一條數據,因為可能同一個消息可能分隔在兩份IoArgs中
registerReceive();
}
};
/**
* 解析數據到packet
* @param args
*/
private void assemblePacket(ioArgs args) {
if (packetTemp == null) {
int length = args.readLength();
packetTemp = new StringReceivePacket(length);
buffer = new byte[length];
total = length;
position = 0;
}
//將args中的數據寫進外面buffer中
int count = args.writeTo(buffer,0);
if(count>0){
//將數據存進StringReceivePacket的buffer當中
packetTemp.save(buffer,count);
position+=count;
if(position == total){
completePacket();
packetTemp = null;
}
}
}
private void completePacket() {
ReceivePacket packet = this.packetTemp;
CloseUtil.close(packet);
callback.onReceivePacketCompleted(packet);
}
}
總結
其實粘包、半包的解決方案並沒有什么奧秘,單純地復雜而已。方法核心就是自定義一個消息體Packet,完成Packet中的byte數組與緩沖區數組之間的復制轉化即可。當然,position、limit等等指針的輔助很重要。
總結這個博客,也是將目前為止的工作進行梳理和記錄。我將通過smyl-im這個項目來持續學習+實踐。因為之前學習過程中有很多零碎的知識點,都躺在我的有道雲筆記里,感覺沒必要總結成博客。本次博客講的內容剛好是一個成體系的東西,正好可以將這個項目背景帶出來,后續的博客就可以在這基礎上衍生拓展了。