java-socket-demo的實現


前言

最近公司在預研設備app端與服務端的交互方案,主要方案有

  • 服務端和app端通過阿里iot套件實現消息的收發;
  • 服務端通過極光推送主動給app端推消息,app通過rest接口與服務端進行交互;
  • 服務端與app通過mqtt消息隊列來實現彼此的消息交互;
  • 服務端與app通過原生socket長連接交互。

雖然上面的一些成熟方案肯定更利於上生產環境,但它們通訊基礎也都是socket長連接,所以本人主要是預研了一下socket長連接的交互,寫了個簡單demo,采用了BIO的多線程方案,實現了自定義簡單協議,心跳機制,socket客戶端身份強制驗證,socket客戶端斷線獲知等功能,並暴露了一些接口,可通過接口簡單實現客戶端與服務端的socket交互。

Github 地址點此

IO通訊模型

IO通訊模型簡介

IO通訊模型主要包括阻塞式同步IO(BIO),非阻塞式同步IO,多路復用IO以及異步IO。大神博客請點此

1. 阻塞式同步IO

BIO就是:blocking IO。最容易理解、最容易實現的IO工作方式,應用程序向操作系統請求網絡IO操作,這時應用程序會一直等待;另一方面,操作系統收到請求后,也會等待,直到網絡上有數據傳到監聽端口;操作系統在收集數據后,會把數據發送給應用程序;最后應用程序受到數據,並解除等待狀態。

BIO示意圖

2. 非阻塞式同步IO

這種模式下,應用程序的線程不再一直等待操作系統的IO狀態,而是在等待一段時間后,就解除阻塞。如果沒有得到想要的結果,則再次進行相同的操作。這樣的工作方式,暴增了應用程序的線程可以不會一直阻塞,而是可以進行一些其他工作。

非阻塞式同步IO示意圖

3. 多路復用IO(阻塞+非阻塞)

多路復用io示意圖

目前流程的多路復用IO實現主要包括四種:select、poll、epoll、kqueue。下表是他們的一些重要特性的比較:

多路復用io模式比較

4. 異步IO

異步IO則是采用“訂閱-通知”模式:即應用程序向操作系統注冊IO監聽,然后繼續做自己的事情。當操作系統發生IO事件,並且准備好數據后,在主動通知應用程序,觸發相應的函數。

異步IO示意圖

  • 和同步IO一樣,異步IO也是由操作系統進行支持的。微軟的windows系統提供了一種異步IO技術:IOCP(I/O Completion Port,I/O完成端口);
  • Linux下由於沒有這種異步IO技術,所以使用的是epoll(上文介紹過的一種多路復用IO技術的實現)對異步IO進行模擬。

Java對IO模型的支持

  • Java對阻塞式同步IO的支持主要是java.net包中的Socket套接字實現;
  • Java中非阻塞同步IO模式通過設置serverSocket.setSoTimeout(100);即可實現;
  • Java 1.4中引入了NIO框架(java.nio包)可以構建多路復用、同步非阻塞IO程序;
  • Java 7中對NIO進行了進一步改進,即NIO2,引入了異步非阻塞IO方式。

由於是要實現socket長連接的demo,主要關注其一些實現注意點及方案,所以本demo采用了BIO的多線程方案,該方案代碼比較簡單、直觀,引入了多線程技術后,IO的處理吞吐量也大大提高了。下面是BIO多線程方案server端的簡單實現:

 public static void main(String[] args) throws Exception{
        ServerSocket serverSocket = new ServerSocket(83);
        try {
            while(true) {
                Socket socket = null;
                socket = serverSocket.accept();
                //這邊獲得socket連接后開啟一個線程監聽處理數據
                SocketServerThread socketServerThread = new SocketServerThread(socket);
                new Thread(socketServerThread).start();
            }
        } catch(Exception e) {
            log.error("Socket accept failed. Exception:{}", e.getMessage());
        } finally {
            if(serverSocket != null) {
                serverSocket.close();
            }
        }
    }
}
@slf4j
class SocketServerThread implements Runnable {

    private Socket socket;

    public SocketServerThread (Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        InputStream in = null;
        OutputStream out = null;
        try {
            in = socket.getInputStream();
            out = socket.getOutputStream();
            Integer sourcePort = socket.getPort();
            int maxLen = 2048;
            byte[] contextBytes = new byte[maxLen];
            int realLen;
            StringBuffer message = new StringBuffer();
            BIORead:while(true) {
                try {
                    while((realLen = in.read(contextBytes, 0, maxLen)) != -1) {
                        message.append(new String(contextBytes , 0 , realLen));
                        /*
                         * 我們假設讀取到“over”關鍵字,
                         * 表示客戶端的所有信息在經過若干次傳送后,完成
                         * */
                        if(message.indexOf("over") != -1) {
                            break BIORead;
                        }
                    }
            }
            //下面打印信息
           log.info("服務器(收到來自於端口:" + sourcePort + "的信息:" + message);
            //下面開始發送信息
            out.write("回發響應信息!".getBytes());
            //關閉
            out.close();
            in.close();
            this.socket.close();
        } catch(Exception e) {
           log.error("Socket read failed. Exception:{}", e.getMessage());
        }
    }
}

注意點及實現方案

TCP粘包/拆包

1. 問題說明

假設客戶端分別發送了兩個數據包D1和D2給服務端,由於服務端一次讀取到的字節數是不確定的,故可能存在以下4種情況。

  1. 服務端分兩次讀取到了兩個獨立的數據包,分別是D1和D2,沒有粘包和拆包;
  2. 服務端一次接收到了兩個數據包,D1和D2粘合在一起,被稱為TCP粘包;
  3. 服務端分兩次讀取到了兩個數據包,第一次讀取到了完整的D1包和D2包的部分內容,第二次讀取到了D2包的剩余內容,這被稱為TCP拆包;
  4. 服務端分兩次讀取到了兩個數據包,第一次讀取到了D1包的部分內容D1_1,第二次讀取到了D1包的剩余內容D1_2和D2包的整包。如果此時服務端TCP接收滑窗非常小,而數據包D1和D2比較大,很有可能會發生第五種可能,即服務端分多次才能將D1和D2包接收完全,期間發生多次拆包。

2. 解決思路

由於底層的TCP無法理解上層的業務數據,所以在底層是無法保證數據包不被拆分和重組的,這個問題只能通過上層的應用協議棧設計來解決,根據業界的主流協議的解決方案,可以歸納如下:

  1. 消息定長,例如每個報文的大小為固定長度200字節,如果不夠,空位補空格;
  2. 在包尾增加回車換行符進行分割,例如FTP協議;
  3. 將消息分為消息頭和消息體,消息頭中包含表示消息總長度(或者消息體長度)的字段,通常設計思路為消息頭的第一個字段使用int32來表示消息的總長度;
  4. 更復雜的應用層協議。

3. demo方案

作為socket長連接的demo,使用了上述的解決思路2,即在包尾增加回車換行符進行數據的分割,同時整體數據使用約定的Json體進行作為消息的傳輸格式。

使用換行符進行數據分割,可如下進行數據的單行讀取:

BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String message;
while ((message = reader.readLine()) != null) {
//....
}

可如下進行數據的單行寫入:

PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()), true);
writer.println(message);

Json消息格式如下:

  1. 服務端接收消息實體類
@Data
public class ServerReceiveDto implements Serializable {

	private static final long serialVersionUID = 6600253865619639317L;

	/**
	 * 功能碼 0 心跳 1 登陸 2 登出 3 發送消息
	 */
	private Integer functionCode;

	/**
	 * 用戶id
	 */
	private String userId;

	/**
	 * 這邊假設是string的消息體
	 */
	private String message;

}
  1. 服務端發送消息實體類
@Data
public class ServerSendDto implements Serializable {

	private static final long serialVersionUID = -7453297551797390215L;

	/**
	 * 狀態碼 20000 成功,否則有errorMessage
	 */
	private Integer statusCode;

	private String message;

	/**
	 * 功能碼
	 */
	private Integer functionCode;

	/**
	 * 錯誤消息
	 */
	private String errorMessage;
}
  1. 客戶端發送消息實體類
@Data
public class ClientSendDto implements Serializable {

	private static final long serialVersionUID = 97085384412852967L;

	/**
	 * 功能碼 0 心跳 1 登陸 2 登出 3 發送消息
	 */
	private Integer functionCode;

	/**
	 * 用戶id
	 */
	private String userId;

	/**
	 * 這邊假設是string的消息體
	 */
	private String message;

}

客戶端或服務端掉線檢測功能

1. 實現思路

通過自定義心跳包來實現掉線檢測功能,具體思路如下:

客戶端連接上服務端后,在服務端會維護一個在線客戶端列表。客戶端每隔一段時間,向服務端發送一個心跳包,服務端受收到包以后,會更新客戶端最近一次在線時間。一旦服務端超過規定時間沒有接收到客戶端發來的包,則視為掉線。

2. 代碼實現

維護一個客戶端map,其中key代表用戶的唯一id(用戶唯一id的身份驗證下面會說明),value代表用戶對應的一個實體

/**
 * 存儲當前由用戶信息活躍的的socket線程
 */
private ConcurrentMap<String, Connection> existSocketMap = new ConcurrentHashMap<>();

其中Connection對象包含的信息如下:

@Slf4j
@Data
public class Connection {

	/**
	 * 當前的socket連接實例
	 */
	private Socket socket;

	/**
	 * 當前連接線程
	 */
	private ConnectionThread connectionThread;

	/**
	 * 當前連接是否登陸
	 */
	private boolean isLogin;

	/**
	 * 存儲當前的user信息
	 */
	private String userId;

	/**
	 * 創建時間
	 */
	private Date createTime;

	/**
	 * 最后一次更新時間,用於判斷心跳
	 */
	private Date lastOnTime;
}

主要關注其中的lastOnTime字段,每次服務端接收到標識是心跳數據,會更新當前的lastOnTime字段,代碼如下:

if (functionCode.equals(FunctionCodeEnum.HEART.getValue())) {
	//心跳類型
	connection.setLastOnTime(new Date());
	//發送同樣的心跳數據給客戶端
	ServerSendDto dto = new ServerSendDto();
	dto.setFunctionCode(FunctionCodeEnum.HEART.getValue());
	connection.println(JSONObject.toJSONString(dto));
}

額外會有一個監測進程,以一定頻率來監測上述維護的map中的每一個Connection對象,如果當前時間與lastOnTime的時間間隔超過自定義的長度,則自動將其對應的socket連接關閉,代碼如下:

Date now = new Date();
Date lastOnTime = connectionThread.getConnection().getLastOnTime();
long heartDuration = now.getTime() - lastOnTime.getTime();
if (heartDuration > SocketConstant.HEART_RATE) {
	//心跳超時,關閉當前線程
	log.error("心跳超時");
	connectionThread.stopRunning();
}

在上面代碼中,服務端收到標識是心跳數據的時候,除了更新該socket對應的lastOnTime,還會同樣同樣心跳類型的數據給客戶端,客戶端收到標識是心跳數據的時候也會更新自己的lastOnTime字段,同時也有一個心跳監測線程在監測當前的socket連接心跳是否超時

客戶端身份獲知、強制身份驗證

1. 實現思路

通過代碼socket = serverSocket.accept()獲得的一個socket連接我們僅僅只能知道其客戶端的ip以及端口號,並不能獲知這個socket連接對應的到底是哪一個客戶端,因此必須得先獲得客戶端的身份並且驗證通過其身份才能讓其正常連接。

具體的實現思路是:

自定義一個登陸處理接口,當server端受到標識是用戶登陸的時候(此時會攜帶用戶信息或者token,此處簡化為用戶id),調用用戶的登陸驗證,驗證通過的話則將該socket連接與用戶信息綁定,設置其為已登錄,並且封裝對應的對象放入前面提的客戶端map中,由此可獲得具體用戶對應的哪一個socket連接。

為了實現socket連接的強制驗證,在監測線程中,也會判斷當前用戶多長時間內沒有實現登錄態,若超時則認為該socket連接為非法連接,主動關閉該socket連接。

2. 代碼實現

自定義登陸處理接口,這邊簡單以userId來判斷是否允許登陸:

public interface LoginHandler {

	/**
	 * client登陸的處理函數
	 *
	 * @param userId 用戶id
	 *
	 * @return 是否驗證通過
	 */
	boolean canLogin(String userId);
}

收到客戶端發來的數據時候的處理:

if (functionCode.equals(FunctionCodeEnum.LOGIN.getValue())) {
    //登陸,身份驗證
    String userId = receiveDto.getUserId();
    if (socketServer.getLoginHandler().canLogin(userId)) {
        //設置用戶對象已登錄狀態
    	connection.setLogin(true);
    	connection.setUserId(userId);
    	if (socketServer.getExistSocketMap().containsKey(userId)) {
    		//存在已登錄的用戶,發送登出指令並主動關閉該socket
    		Connection existConnection = socketServer.getExistSocketMap().get(userId);
    		ServerSendDto dto = new ServerSendDto();
    		dto.setStatusCode(999);
    		dto.setFunctionCode(FunctionCodeEnum.MESSAGE.getValue());
    		dto.setErrorMessage("force logout");
    		existConnection.println(JSONObject.toJSONString(dto));
    		existConnection.getConnectionThread().stopRunning();
    		log.error("用戶被客戶端重入踢出,userId:{}", userId);
    	}
    	//添加到已登錄map中
    	socketServer.getExistSocketMap().put(userId, connection);
} 

監測線程判斷用戶是否完成身份驗證:

if (!connectionThread.getConnection().isLogin()) {
	//還沒有用戶登陸成功
	Date createTime = connectionThread.getConnection().getCreateTime();
	long loginDuration = now.getTime() - createTime.getTime();
	if (loginDuration > SocketConstant.LOGIN_DELAY) {
		//身份驗證超時
		log.error("身份驗證超時");
		connectionThread.stopRunning();
	}
}

socket異常處理與垃圾線程回收

1. 實現思路

socket在讀取數據或者發送數據的時候會出現各種異常,比如客戶端的socket已斷開連接(正常斷開或物理連接斷開等),但是服務端還在發送數據或者還在接受數據的過程中,此時socket會拋出相關異常,對於該異常的處理需要將自身的socket連接關閉,避免資源的浪費,同時由於是多線程方案,還需將該socket對應的線程正常清理。

2. 代碼實現

下面以server端發送數據為例,改代碼中加入了重試機制:

public void println(String message) {
	int count = 0;
	PrintWriter writer;
	do {
		try {
			writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()), true);
			writer.println(message);
			break;
		} catch (IOException e) {
			count++;
			if (count >= RETRY_COUNT) {
				//重試多次失敗,說明client端socket異常
				this.connectionThread.stopRunning();
			}
		}
		try {
			Thread.sleep(2 * 1000);
		} catch (InterruptedException e1) {
			log.error("Connection.println.IOException interrupt,userId:{}", userId);
		}
	} while (count < 3);
}

上述調用的this.connectionThread.stopRunning();代碼如下:

public void stopRunning() {
    //設置線程對象狀態,便於線程清理
	isRunning = false;
	try {
	    //異常情況需要將該socket資源釋放
		socket.close();
	} catch (IOException e) {
		log.error("ConnectionThread.stopRunning failed.exception:{}", e);
	}
}

上述代碼中設置了線程對象的狀態,下述代碼在監測線程中執行,將沒有運行的線程給清理掉

/**
 * 存儲只要有socket處理的線程
 */
private List<ConnectionThread> existConnectionThreadList = Collections.synchronizedList(new ArrayList<>());

/**
 * 中間list,用於遍歷的時候刪除
 */
private List<ConnectionThread> noConnectionThreadList = Collections.synchronizedList(new ArrayList<>());

//...

//刪除list中沒有用的thread引用
existConnectionThreadList.forEach(connectionThread -> {
	if (!connectionThread.isRunning()) {
		noConnectionThreadList.add(connectionThread);
	}
});
noConnectionThreadList.forEach(connectionThread -> {
	existConnectionThreadList.remove(connectionThread);
	if (connectionThread.getConnection().isLogin()) {
		//說明用戶已經身份驗證成功了,需要刪除map
		this.existSocketMap.remove(connectionThread.getConnection().getUserId());
	}
});
noConnectionThreadList.clear();

項目結構

由於使用了springboot框架來實現該demo,所以項目結構如下:

整體目錄

socket工具包目錄如下:

socket工具包目錄

pom文件主要添加了springboot的相關依賴,以及json工具和lombok工具等,依賴如下:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.3.RELEASE</version>
    <relativePath/>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.36</version>
    </dependency>
</dependencies>

自己寫的socket工具包的使用方式如下:

@Configuration
@Slf4j
public class SocketServerConfig {

@Bean
public SocketServer socketServer() {
	SocketServer socketServer = new SocketServer(60000);
	socketServer.setLoginHandler(userId -> {
		log.info("處理socket用戶身份驗證,userId:{}", userId);
		//用戶名中包含了dingxu則允許登陸
		return userId.contains("dingxu");

	});
	socketServer.setMessageHandler((connection, receiveDto) -> log
			.info("處理socket消息,userId:{},receiveDto:{}", connection.getUserId(),
					JSONObject.toJSONString(receiveDto)));
	socketServer.start();
	return socketServer;
}
}

該demo中主要提供了以下幾個接口進行測試:

  • 服務端:獲得當前用戶列表,發送一個消息
  • 客戶端:開始一個socket客戶端,發送一個消息,關閉一個socket客戶端,查看已開啟的客戶端

具體的postman文件也放已在項目中,具體可點此鏈接獲得

demo中還提供了一個簡單壓測函數,如下:

@Slf4j
public class SocketClientTest {

	public static void main(String[] args) {
		ExecutorService clientService = Executors.newCachedThreadPool();
		String userId = "dingxu";
		for (int i = 0; i < 1000; i++) {
			int index = i;
			clientService.execute(() -> {
				try {
					SocketClient client;
					client = new SocketClient(InetAddress.getByName("127.0.0.1"), 60000);
					//登陸
					ClientSendDto dto = new ClientSendDto();
					dto.setFunctionCode(FunctionCodeEnum.LOGIN.getValue());
					dto.setUserId(userId + index);
					client.println(JSONObject.toJSONString(dto));
					ScheduledExecutorService clientHeartExecutor = Executors.newSingleThreadScheduledExecutor(
							r -> new Thread(r, "socket_client+heart_" + r.hashCode()));
					clientHeartExecutor.scheduleWithFixedDelay(() -> {
						try {
							ClientSendDto heartDto = new ClientSendDto();
							heartDto.setFunctionCode(FunctionCodeEnum.HEART.getValue());
							client.println(JSONObject.toJSONString(heartDto));
						} catch (Exception e) {
							log.error("客戶端異常,userId:{},exception:{}", userId, e.getMessage());
							client.close();
						}
					}, 0, 5, TimeUnit.SECONDS);
					while (true){

					}
				} catch (Exception e) {
					log.error(e.getMessage());
				}

			});
		}
	}

}

參考


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM