基本介紹與思路
收發並行
前一篇博客中,完成了客戶端與服務端的簡單TCP交互,但這種交互是觸發式的:客戶端發送一條消息,服務端收到后再回送一條。沒有做到收發並行。收發並行的字面意思很容易理解,即數據的發送與接收互相不干擾,相互獨立。當然,要保證服務端和客戶端都能做到收發並行。
業務邏輯
脫離業務邏輯的實踐是毫無意義的,先描述一下本實踐中的業務邏輯:一個服務端接受多個客戶端的連接,連接后,向各個客戶端定時發送時間戳數據,同時在並行條件下,接受各個客戶端發送來的數據並顯示;客戶端鍵盤輸入字符串,發送給服務端,同時在並行條件下,接收服務器發來的時間戳數據並顯示。
實現思路
實現發送與接收並行,思路其實非常直觀,即建立兩個線程,分別用來實現輸入流和輸出流。我的代碼的設計方案如下圖所示:
- 服務端:創建一個監聽客戶端連接的線程,線程中一旦接收到請求,創建一個對應該客戶端收發處理的對象,對象中創建輸入流線程,並使用單例線程池創建輸出流線程。主線程使用鍵盤輸入流System.in來進行阻塞。同時主線程中創建Timer定時器,定時向輸出流發送數據。
- 客戶端:主線程發送連接請求,與服務器建立連接。使用鍵盤輸入流System.in來阻塞主線程,同時作為輸出流使用;創建一個輸入流線程,異步運行,接收服務器數據。
代碼分析
源代碼文件結構如下圖所示
服務端
服務器端分為三個部分,分別是Server.java,TCPServer.java和ClientHandler.java
Server.java
package Server;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.text.SimpleDateFormat;
import java.util.TimerTask;
import java.util.Timer;
import java.util.Date;
public class Server {
private static SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd_HH:mm:ss");
public static void main(String[] args){
try {
TCPServer.accept();
new Timer("Timer").schedule(new TimerTask() {
@Override
public void run() {
TCPServer.broadcast(df.format(new Date()));
}
}, 1000,5000);
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
String str;
//因為ClientListen是異步線程,使用鍵盤輸入流將主線程阻塞住,保證跟ClientListen線程同步,同時可控制ClientListen服務的退出
do{
str = bufferedReader.readLine();
}while (str.equalsIgnoreCase("serverExit"));
}catch (Exception e){
System.out.println("監聽請求過程中異常退出");
}
try {
TCPServer.stop();
} catch (IOException e) {
System.out.println("關閉套接字過程中出現異常");
} finally {
System.out.println("服務器端套接字已關閉!");
}
}
}
TCPServer.java
package Server;
import java.io.IOException;
import java.net.*;
import java.util.ArrayList;
import java.util.UUID;
class TCPServer {
private static int LOCAL_PORT = 3001;
private static ClientListenHandle clientListenHandle;
private static ArrayList<ClientHandler> clientHandlerList = new ArrayList<ClientHandler>();
static void accept() throws IOException {
//創建服務器端套接字
ServerSocket serverSocket = createSocket();
InitSocket(serverSocket);
System.out.println("服務器准備就緒 addr: " + Inet4Address.getLocalHost() + " /port: " + LOCAL_PORT);
System.out.println("開始監聽客戶端連接...");
//創建線程監聽客戶端請求
clientListenHandle = new ClientListenHandle(serverSocket);
clientListenHandle.start();
}
static void stop() throws IOException {
for (ClientHandler clientHandler : clientHandlerList) {
clientHandler.socketClose();
}
clientHandlerList.clear();
clientListenHandle.exit();
}
private static ServerSocket createSocket() throws IOException {
ServerSocket socket = new ServerSocket(LOCAL_PORT, 50);
return socket;
}
private static void InitSocket(ServerSocket socket) throws SocketException {
// 是否復用未完全關閉的地址端口
socket.setReuseAddress(true);
// 等效Socket#setReceiveBufferSize
socket.setReceiveBufferSize(64 * 1024 * 1024);
// 設置serverSocket#accept超時時間,不設置即永久等待
// serverSocket.setSoTimeout(2000);
// 設置性能參數:短鏈接,延遲,帶寬的相對重要性
socket.setPerformancePreferences(1, 1, 1);
}
static void broadcast(String msg) {
for (ClientHandler clientHandler : clientHandlerList) {
clientHandler.write(msg);
}
}
/**
* 監聽客戶端請求的線程
*/
static class ClientListenHandle extends Thread {
private final ServerSocket serverSocket;
private Boolean done = false;
ClientListenHandle(ServerSocket serverSocket) {
this.serverSocket = serverSocket;
}
@Override
public void run() {
super.run();
try {
do {
Socket client;
try {
client = serverSocket.accept();
} catch (Exception e) {
continue;//某一個客戶端連接失敗,要保證其它客戶端能正常連接
}
String uuid = UUID.randomUUID().toString();//為客戶端生成唯一標識
System.out.println("已接受連接client:"+uuid+" /Addr:"+client.getInetAddress()+" /Port:"+client.getPort());
//為該客戶端實例化一個ClientHandler對象,注入對象刪除操作的lambda表達式
ClientHandler clientHandle = new ClientHandler(client, handler -> clientHandlerList.remove(handler), uuid);
clientHandle.read();
clientHandlerList.add(clientHandle);
} while (!done);
} catch (Exception e) {
if (!done) {
System.out.println("異常退出!");
}
}
}
void exit() throws IOException {
done = true;
serverSocket.close();
}
}
}
ClientHandler.java
package Server;
import java.io.*;
import java.net.Socket;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ClientHandler {
private final Socket client;
private final ReadHandler readHandler;
private final WriteHandle writeHandler;
private final Removable removable;
private final String uid;
ClientHandler(Socket socket, Removable removable, String uid) throws IOException {
this.client = socket;
this.readHandler = new ReadHandler(socket.getInputStream());
this.writeHandler = new WriteHandle(socket.getOutputStream());
this.removable = removable;
this.uid = uid;
}
void read() {
readHandler.start();
}
void write(String msg) {
System.out.println("Server -->> " + uid + " : " + msg);
writeHandler.write(msg);
}
/**
* 把輸入輸出流和套接字都關閉
*/
void socketClose(){
try {
readHandler.exit();
writeHandler.exit();
client.close();
} catch (IOException e) {
e.printStackTrace();
}finally {
System.out.println("客戶端:"+uid+" 套接字連接已關閉");
}
}
/**
* 把自身從對象列表中清除掉,具體方法是使用lambda表達式來注入的
*/
void removeClientHandler() {
removable.removeClientHandle(this);
}
/**
* 定義一個接口,接收lambda表達式
*/
interface Removable {
void removeClientHandle(ClientHandler clientHandler);
}
/**
* 輸入流操作線程
*/
class ReadHandler extends Thread {
private final InputStream inputStream;
private Boolean flag = true;
ReadHandler(InputStream inputStream) {
this.inputStream = inputStream;
}
@Override
public void run() {
super.run();
BufferedReader socketInput = null;
try {
socketInput = new BufferedReader(new InputStreamReader(inputStream));
do {
String str = socketInput.readLine();
//不知道為什么,客戶端關閉時,這里直接報異常,獲取不到null
if (str.equalsIgnoreCase("exit")) {
System.out.println("已無法讀取客戶端數據!");
throw new Exception();
}
System.out.println(uid + " -->> server : " + str);
} while (flag);
} catch (Exception e) {
if (flag) {
System.out.println("讀取客戶端過程中異常退出");
ClientHandler.this.removeClientHandler();
ClientHandler.this.socketClose();
}
}
}
void exit() throws IOException {
flag = false;
inputStream.close();
}
}
/**
* 輸出流操作線程,使用單例線程池,可以自動等待任務並處理,無需人工添加阻塞操作
*/
class WriteHandle {
private final OutputStream outputStream;
private final ExecutorService executorService;
WriteHandle(OutputStream outputStream) {
this.outputStream = outputStream;
this.executorService = Executors.newSingleThreadExecutor();
}
private void write(String msg){
executorService.execute(new WriteRunnable(msg,outputStream));
}
void exit() throws IOException{
outputStream.close();
executorService.shutdown();
}
class WriteRunnable implements Runnable{
private final String msg;
private final PrintStream printStream;
WriteRunnable(String msg, OutputStream outputStream) {
this.msg = msg;
this.printStream = new PrintStream(outputStream);
}
@Override
public void run() {
try {
printStream.println(msg);
} catch (Exception e) {
System.out.println("打印輸出異常!");
}
}
}
}
}
客戶端
Client.java
package Client;
import java.io.*;
import java.util.UUID;
import Client.bean.ServerInfo;
public class Client {
public static void main(String[] args)throws IOException {
ServerInfo serverInfo = new ServerInfo(UUID.randomUUID().toString(),"127.0.2.16",3001);
System.out.println("准備發起服務器連接...");
System.out.println("服務器信息:Addr:"+serverInfo.getAddress()+" /Port:"+serverInfo.getPort());
try {
TCPClient.connect(serverInfo);
}catch (Exception e){
System.out.println("連接失敗,退出");
}
}
}
TCPClient.java
package Client;
import Client.bean.ServerInfo;
import java.io.*;
import java.net.*;
class TCPClient {
static void connect(ServerInfo serverInfo) throws IOException {
Socket clientSocket = createSocket();//建立套接字
InitSocket(clientSocket);//初始化套接字
//連接遠程服務器
clientSocket.connect(new InetSocketAddress(serverInfo.getAddress(), serverInfo.getPort()), 3000);
System.out.println("已連接server");
try {
//輸入流線程
ReadHandle readHandle = new ReadHandle(clientSocket.getInputStream());
readHandle.start();
//輸出流
write(clientSocket);
//當輸出流結束時,關閉輸入流
readHandle.exit();
} catch (Exception e) {
System.out.println("出現異常!");
} finally {
clientSocket.close();
System.out.println("客戶端結束");
}
}
private static Socket createSocket() throws IOException {
Socket socket = new Socket();
return socket;
}
private static void InitSocket(Socket socket) throws SocketException {
// 設置讀取超時時間為2秒,超過2秒未獲得數據時readline報超時異常;不設置即進行永久等待
//socket.setSoTimeout(2000);
// 是否復用未完全關閉的Socket地址,對於指定bind操作后的套接字有效
socket.setReuseAddress(true);
// 是否開啟Nagle算法
socket.setTcpNoDelay(true);
// 是否需要在長時無數據響應時發送確認數據(類似心跳包),時間大約為2小時
socket.setKeepAlive(true);
// 對於close關閉操作行為進行怎樣的處理;默認為false,0
// false、0:默認情況,關閉時立即返回,底層系統接管輸出流,將緩沖區內的數據發送完成
// true、0:關閉時立即返回,緩沖區數據拋棄,直接發送RST結束命令到對方,並無需經過2MSL等待
// true、200:關閉時最長阻塞200毫秒,隨后按第二情況處理
socket.setSoLinger(true, 20);
// 是否讓緊急數據內斂,默認false;緊急數據通過 socket.sendUrgentData(1);發送
socket.setOOBInline(true);
// 設置接收發送緩沖器大小
socket.setReceiveBufferSize(64 * 1024 * 1024);
socket.setSendBufferSize(64 * 1024 * 1024);
// 設置性能參數:短鏈接,延遲,帶寬的相對重要性
socket.setPerformancePreferences(1, 1, 1);
}
/**
* 輸出流方法
*/
private static void write(Socket socket) throws IOException {
//構建鍵盤輸入流
InputStream in = System.in;
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in));
//得到socket輸出流並轉化為打印流
OutputStream outputStream = socket.getOutputStream();
PrintStream printStream = new PrintStream(outputStream);
for(;;){
String str = bufferedReader.readLine();//從鍵盤輸入獲取內容
printStream.println(str);//通過打印流輸出
if(str.equalsIgnoreCase("exit")){
break;
}
}
printStream.close();
System.out.println("輸出流關閉");
}
/**
* 輸入流線程
*/
static class ReadHandle extends Thread {
private final InputStream inputStream;
private Boolean done = false;
ReadHandle(InputStream inputStream) {
this.inputStream = inputStream;
}
@Override
public void run() {
super.run();
try {
//獲取輸入流
BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));
do {
String str;
str = socketInput.readLine();
if (str==null) {
break;
}
System.out.println("From server: "+ str);
} while (!done);
} catch (Exception e) {
if (!done) {
System.out.println("異常斷開,或者輸入異常");
}
}
}
void exit() {
done = true;
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}finally {
System.out.println("輸入流關閉");
}
}
}
}
關於代碼的具體分析,由於代碼已有很多注釋,博文中便不再贅述。
運行結果
運行結果如下所示
-
服務端
連接成功后,服務端每隔5秒向各個客戶端發送時間戳信息,同時接收兩個客戶端發來的信息 -
客戶端1
輸入“I am client1”並向服務端發送,同時接收服務端發來的時間戳信息 -
客戶端2
輸入“I am client2”並向服務端發送,同時接收服務端發來的時間戳信息
本篇博客記錄一次實踐學習,使用多線程+socket編程,實現了單服務器與多客戶端之間的數據收發並行,除此之外,通過思維流程圖,整理了代碼的設計思路並展示出來。