基礎版
網上百度了一個簡單的socket服務端和客戶端監聽代碼 並且已經試驗完成。直接上代碼
服務端:
package com.whalecloud.uip.server.socket; import java.io.DataOutputStream; import java.net.ServerSocket; import java.net.Socket; /** * @author lin.hongwen2@iwhalecloud.com * @date 2020/3/10 16:07 */ public class SocketServer { private static final int PORT = 5209; public static void test() { ServerSocket server = null; Socket socket = null; DataOutputStream out = null; try { server = new ServerSocket(PORT); socket = server.accept(); out = new DataOutputStream(socket.getOutputStream()); while (true) { Thread.sleep(1000); out.writeUTF(getRandomStr()); out.flush(); } } catch (Exception e) { e.printStackTrace(); } } private static String getRandomStr() { String str = ""; int ID = (int) (Math.random() * 30); int x = (int) (Math.random() * 200); int y = (int) (Math.random() * 300); int z = (int) (Math.random() * 10); str = "ID:" + ID + "/x:" + x + "/y:" + y + "/z:" + z; return str; } public static void main(String[] args) { test(); } }
客戶端:
package com.whalecloud.uip.server.socket; import java.io.DataInputStream; import java.io.InputStream; import java.net.Socket; /** * @author lin.hongwen2@iwhalecloud.com * @date 2020/3/10 16:10 */ public class SocketClient { private static final String HOST = "127.0.0.1"; private static final int PORT = 5209; private static void test() { Socket socket = null; DataInputStream dis = null; InputStream is = null; try { socket = new Socket(HOST, PORT); is = socket.getInputStream(); dis = new DataInputStream(is); while (true) { System.out.println("receive_msg:" + dis.readUTF()); } } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { test(); } }
啟動兩個項目就可以在控制台看到接收到的信息了

完整進階版
客戶端:
ClientMain---啟動類
package com.whalecloud.uip.client.socket; import java.text.SimpleDateFormat; import java.util.Date; /** * @author lin.hongwen2@iwhalecloud.com * @date 2020/3/11 16:57 */ public class ClientMain { public static void main(String[] args) { SocketClientResponseInterface socketClientResponseInterface = new SocketClientResponseInterface() { @Override public void onSocketConnect() { SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String strDate = df.format(new Date()); System.out.println(strDate + "連接成功"); } @Override public void onSocketReceive(Object socketResult, int code) { SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String strDate = df.format(new Date()); System.out.println(strDate+"拿到消息:"+socketResult); } @Override public void onSocketDisable(String msg, int code) { SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String strDate = df.format(new Date()); System.out.println(strDate+"斷開連接"); } }; try { SocketClient socketClient = new SocketClient(socketClientResponseInterface); while (true) { socketClient.sendData("客戶端11發送測試數據"); Thread.sleep(20000); } } catch (Exception e) { e.printStackTrace(); } } }
SocketClientResponseInterface---返回信息接口類
package com.whalecloud.uip.client.socket; /** * @author lin.hongwen2@iwhalecloud.com * @date 2020/3/10 16:42 */ public interface SocketClientResponseInterface<T> { /** * 客戶端連接回調 */ void onSocketConnect(); /** * 客戶端收到服務端消息回調 * * @param socketResult * @param code */ void onSocketReceive(T socketResult, int code); /** * 客戶端關閉回調 * * @param msg * @param code */ void onSocketDisable(String msg, int code); }
SocketCloseInterface---連接關閉信息接口類
package com.whalecloud.uip.client.socket; /** * @author lin.hongwen2@iwhalecloud.com * @date 2020/3/10 17:24 */ public interface SocketCloseInterface { /** * 客戶端收到服務端消息回調 */ void onSocketShutdownInput(); /** * 客戶端關閉回調 */ void onSocketDisconnection(); }
SocketClient---客戶端調用類
package com.whalecloud.uip.client.socket; import org.apache.http.util.TextUtils; /** * @author lin.hongwen2@iwhalecloud.com * @date 2020/3/10 17:05 */ public class SocketClient { private static final String TAG = SocketClient.class.getSimpleName(); private SocketClientThread socketClientThread; public SocketClient(SocketClientResponseInterface socketClientResponseInterface) { socketClientThread = new SocketClientThread("socketClientThread", socketClientResponseInterface); socketClientThread.start(); //ThreadPoolUtil.getInstance().addExecuteTask(socketClientThread); } public <T> void sendData(T data) { //convert to string or serialize object String s = (String) data; if (TextUtils.isEmpty(s)) { System.out.print(TAG+"sendData: 消息不能為空"); return; } if (socketClientThread != null) { socketClientThread.sendMsg(s); } } public void stopSocket() { //一定要在子線程內執行關閉socket等IO操作 new Thread(() -> { socketClientThread.setReConnect(false); socketClientThread.stopThread(); }).start(); } }
SocketClientThread---客戶端線程類
package com.whalecloud.uip.client.socket; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketAddress; import javax.net.SocketFactory; /** * @author lin.hongwen2@iwhalecloud.com * @date 2020/3/10 16:10 * 寫數據采用死循環,沒有數據時wait,有新消息時notify * 連接線程 */ public class SocketClientThread extends Thread implements SocketCloseInterface{ private static final String TAG = SocketClientThread.class.getSimpleName(); private volatile String name; private boolean isLongConnection = true; private boolean isReConnect = true; private SocketSendThread mSocketSendThread; private SocketReceiveThread mSocketReceiveThread; private SocketHeartBeatThread mSocketHeartBeatThread; private Socket mSocket; private boolean isSocketAvailable; private SocketClientResponseInterface socketClientResponseInterface; public SocketClientThread(String name, SocketClientResponseInterface socketClientResponseInterface) { this.name = name; this.socketClientResponseInterface = socketClientResponseInterface; } @Override public void run() { final Thread currentThread = Thread.currentThread(); final String oldName = currentThread.getName(); currentThread.setName("Processing-" + name); try { initSocket(); System.out.print(TAG + "run: SocketClientThread end"); } finally { currentThread.setName(oldName); } } /** * 初始化socket客戶端 */ private void initSocket() { try { mSocket = SocketFactory.getDefault().createSocket(); SocketAddress socketAddress = new InetSocketAddress(SocketUtil.ADDRESS, SocketUtil.PORT); mSocket.connect(socketAddress, 10000); isSocketAvailable = true; //開啟接收線程 mSocketReceiveThread = new SocketReceiveThread("SocketReceiveThread", new BufferedReader(new InputStreamReader(mSocket.getInputStream(), "UTF-8")), socketClientResponseInterface, this); mSocketReceiveThread.start(); //開啟發送線程 PrintWriter printWriter = new PrintWriter(mSocket.getOutputStream(), true); System.out.print(TAG+"initSocket: " + printWriter); mSocketSendThread = new SocketSendThread("SocketSendThread", printWriter); mSocketSendThread.setCloseSendTask(false); mSocketSendThread.start(); //開啟心跳線程 if (isLongConnection) { mSocketHeartBeatThread = new SocketHeartBeatThread("SocketHeartBeatThread", printWriter, mSocket, this); mSocketHeartBeatThread.start(); } if (socketClientResponseInterface != null) { socketClientResponseInterface.onSocketConnect(); } } catch (ConnectException e) { failedMessage("服務器連接異常,請檢查網絡", SocketUtil.FAILED); e.printStackTrace(); stopThread(); } catch (IOException e) { failedMessage("網絡發生異常,請稍后重試", SocketUtil.FAILED); e.printStackTrace(); stopThread(); } } /** * 發送消息 */ public void sendMsg(String data) { if (mSocketSendThread != null) { mSocketSendThread.sendMsg(data); } } /** * 關閉socket客戶端 */ public synchronized void stopThread() { //關閉接收線程 closeReceiveTask(); //喚醒發送線程並關閉 wakeSendTask(); //關閉心跳線程 closeHeartBeatTask(); //關閉socket closeSocket(); //清除數據 clearData(); failedMessage("斷開連接", SocketUtil.FAILED); if (isReConnect) { SocketUtil.toWait(this, 15000); initSocket(); System.out.print(TAG + "stopThread: " + Thread.currentThread().getName()); } } /** * 喚醒后關閉發送線程 */ private void wakeSendTask() { if (mSocketSendThread != null) { mSocketSendThread.wakeSendTask(); } } /** * 關閉接收線程 */ private void closeReceiveTask() { if (mSocketReceiveThread != null) { mSocketReceiveThread.close(); mSocketReceiveThread = null; } } /** * 關閉心跳線程 */ private void closeHeartBeatTask() { if (mSocketHeartBeatThread != null) { mSocketHeartBeatThread.close(); } } /** * 關閉socket */ private void closeSocket() { if (mSocket != null) { if (!mSocket.isClosed() && mSocket.isConnected()) { try { mSocket.close(); } catch (IOException e) { e.printStackTrace(); } } isSocketAvailable = false; mSocket = null; } } /** * 清除數據 */ private void clearData() { if (mSocketSendThread != null) { mSocketSendThread.clearData(); } } /** * 連接失敗回調 */ private void failedMessage(String msg, int code) { if (socketClientResponseInterface != null) { socketClientResponseInterface.onSocketDisable(msg, code); } } @Override public void onSocketShutdownInput() { if (isSocketAvailable) { SocketUtil.inputStreamShutdown(mSocket); } } @Override public void onSocketDisconnection() { isSocketAvailable = false; stopThread(); } /** * 設置是否斷線重連 */ public void setReConnect(boolean reConnect) { isReConnect = reConnect; } }
SocketHeartBeatThread---心跳監聽類
package com.whalecloud.uip.client.socket; import java.io.PrintWriter; import java.net.Socket; /** * @author lin.hongwen2@iwhalecloud.com * @date 2020/3/10 17:01 * 心跳實現,頻率5秒 */ public class SocketHeartBeatThread extends Thread { private static final String TAG = SocketHeartBeatThread.class.getSimpleName(); private volatile String name; private static final int REPEAT_TIME = 120000; private boolean isCancel = false; private final PrintWriter printWriter; private Socket mSocket; private SocketCloseInterface socketCloseInterface; public SocketHeartBeatThread(String name, PrintWriter printWriter, Socket mSocket, SocketCloseInterface socketCloseInterface) { this.name = name; this.printWriter = printWriter; this.mSocket = mSocket; this.socketCloseInterface = socketCloseInterface; } @Override public void run() { final Thread currentThread = Thread.currentThread(); final String oldName = currentThread.getName(); currentThread.setName("Processing-" + name); try { while (!isCancel) { if (!isConnected()) { break; } if (printWriter != null) { synchronized (printWriter) { SocketUtil.write2Stream("ping", printWriter); } } try { Thread.sleep(REPEAT_TIME); } catch (InterruptedException e) { e.printStackTrace(); } } } finally { //循環結束則退出輸入流 if (printWriter != null) { synchronized (printWriter) { SocketUtil.closePrintWriter(printWriter); } } currentThread.setName(oldName); System.out.print(TAG+"SocketHeartBeatThread finish"); } } /** * 判斷本地socket連接狀態 */ private boolean isConnected() { if (mSocket.isClosed() || !mSocket.isConnected() || mSocket.isInputShutdown() || mSocket.isOutputShutdown()) { if (socketCloseInterface != null) { socketCloseInterface.onSocketDisconnection(); } return false; } return true; } public void close() { isCancel = true; if (printWriter != null) { synchronized (printWriter) { SocketUtil.closePrintWriter(printWriter); } } } }
SocketReceiveThread---消息接收類
package com.whalecloud.uip.client.socket; import java.io.BufferedReader; /** * @author lin.hongwen2@iwhalecloud.com * @date 2020/3/10 16:48 * 數據接收線程 */ public class SocketReceiveThread extends Thread { private static final String TAG = SocketReceiveThread.class.getSimpleName(); private volatile String name; private volatile boolean isCancel = false; private BufferedReader bufferedReader; private SocketCloseInterface socketCloseInterface; private SocketClientResponseInterface socketClientResponseInterface; public SocketReceiveThread(String name, BufferedReader bufferedReader, SocketClientResponseInterface socketClientResponseInterface, SocketCloseInterface socketCloseInterface) { this.name = name; this.bufferedReader = bufferedReader; this.socketClientResponseInterface = socketClientResponseInterface; this.socketCloseInterface = socketCloseInterface; } @Override public void run() { final Thread currentThread = Thread.currentThread(); final String oldName = currentThread.getName(); currentThread.setName("Processing-" + name); try { while (!isCancel) { if (bufferedReader != null) { String receiverData = SocketUtil.readFromStream(bufferedReader); if (receiverData != null) { successMessage(receiverData); } else { System.out.print(TAG + "run: receiverData==null"); break; } } } } finally { //循環結束則退出輸入流 SocketUtil.closeBufferedReader(bufferedReader); currentThread.setName(oldName); System.out.print(TAG + "SocketReceiveThread finish"); } } /** * 接收消息回調 */ private void successMessage(String data) { if (socketClientResponseInterface != null) { socketClientResponseInterface.onSocketReceive(data, SocketUtil.SUCCESS); } } public void close() { isCancel = true; this.interrupt(); if (bufferedReader != null) { if (socketCloseInterface != null) { socketCloseInterface.onSocketShutdownInput(); } SocketUtil.closeBufferedReader(bufferedReader); bufferedReader = null; } } }
SocketSendThread---發送線程類
package com.whalecloud.uip.client.socket; import java.io.PrintWriter; import java.util.concurrent.ConcurrentLinkedQueue; /** * @author lin.hongwen2@iwhalecloud.com * @date 2020/3/10 16:59 * 數據發送線程,當沒有發送數據時讓線程等待 */ public class SocketSendThread extends Thread { private static final String TAG = SocketSendThread.class.getSimpleName(); private volatile String name; private volatile boolean isCancel = false; private boolean closeSendTask; private final PrintWriter printWriter; protected volatile ConcurrentLinkedQueue<String> dataQueue = new ConcurrentLinkedQueue<>(); public SocketSendThread(String name, PrintWriter printWriter) { this.name = name; this.printWriter = printWriter; } @Override public void run() { final Thread currentThread = Thread.currentThread(); final String oldName = currentThread.getName(); currentThread.setName("Processing-" + name); try { while (!isCancel) { String dataContent = dataQueue.poll(); if (dataContent == null) { //沒有發送數據則等待 SocketUtil.toWait(dataQueue, 0); if (closeSendTask) { //notify()調用后,並不是馬上就釋放對象鎖的,所以在此處中斷發送線程 close(); } } else if (printWriter != null) { synchronized (printWriter) { SocketUtil.write2Stream(dataContent, printWriter); } } } } finally { //循環結束則退出輸出流 if (printWriter != null) { synchronized (printWriter) { SocketUtil.closePrintWriter(printWriter); } } currentThread.setName(oldName); System.out.print(TAG+"SocketSendThread finish"); } } /** * 發送消息 */ public void sendMsg(String data) { dataQueue.add(data); //有新增待發送數據,則喚醒發送線程 SocketUtil.toNotifyAll(dataQueue); } /** * 清除數據 */ public void clearData() { dataQueue.clear(); } public void close() { isCancel = true; this.interrupt(); if (printWriter != null) { //防止寫數據時停止,寫完再停 synchronized (printWriter) { SocketUtil.closePrintWriter(printWriter); } } } public void wakeSendTask() { closeSendTask = true; SocketUtil.toNotifyAll(dataQueue); } public void setCloseSendTask(boolean closeSendTask) { this.closeSendTask = closeSendTask; } }
SocketUtil---客戶端socket工具類
package com.whalecloud.uip.client.socket; import java.io.BufferedReader; import java.io.IOException; import java.io.PrintWriter; import java.net.Inet6Address; import java.net.InetAddress; import java.net.NetworkInterface; import java.net.Socket; import java.net.SocketException; import java.util.Enumeration; /** * @author lin.hongwen2@iwhalecloud.com * @date 2020/3/10 16:39 */ public class SocketUtil { private static final String TAG = SocketUtil.class.getSimpleName(); public static String ADDRESS = "127.0.0.1"; public static int PORT = 10086; public static final int SUCCESS = 100; public static final int FAILED = -1; /** * 讀數據 * * @param bufferedReader */ public static String readFromStream(BufferedReader bufferedReader) { try { String s; if ((s = bufferedReader.readLine()) != null) { return s; } } catch (IOException e) { e.printStackTrace(); } return null; } /** * 寫數據 * * @param data * @param printWriter */ public static void write2Stream(String data, PrintWriter printWriter) { if (data == null) { return; } if (printWriter != null) { printWriter.println(data); } } /** * 關閉輸入流 * * @param socket */ public static void inputStreamShutdown(Socket socket) { try { if (!socket.isClosed() && !socket.isInputShutdown()) { socket.shutdownInput(); } } catch (IOException e) { e.printStackTrace(); } } /** * 關閉BufferedReader * * @param br */ public static void closeBufferedReader(BufferedReader br) { try { if (br != null) { br.close(); } } catch (IOException e) { e.printStackTrace(); } } /** * 關閉PrintWriter * * @param pw */ public static void closePrintWriter(PrintWriter pw) { if (pw != null) { pw.close(); } } /** * 阻塞線程,millis為0則永久阻塞,知道調用notify() */ public static void toWait(Object o, long millis) { synchronized (o) { try { o.wait(millis); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * notify()調用后,並不是馬上就釋放對象鎖的,而是在相應的synchronized(){}語句塊執行結束,自動釋放鎖后 * * @param o */ public static void toNotifyAll(Object o) { synchronized (o) { o.notifyAll(); } } }
服務端:
ServerMain--服務端測試類
package com.whalecloud.uip.server.socket; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author lin.hongwen2@iwhalecloud.com * @date 2020/3/10 20:05 */ public class ServerMain { private static boolean isStart = true; private static ServerReceiveThread serverReceiveThread; public static void main(String[] args) { ServerSocket serverSocket = null; ExecutorService executorService = Executors.newCachedThreadPool(); System.out.println("服務端 " + SocketUtil.getIP() + " 運行中...\n"); try { serverSocket = new ServerSocket(SocketUtil.PORT); while (isStart) { Socket socket = serverSocket.accept(); //設定輸入流讀取阻塞超時時間(180秒收不到客戶端消息判定斷線) socket.setSoTimeout(180000); serverReceiveThread = new ServerReceiveThread(socket, new SocketServerResponseInterface() { @Override public void clientOffline() {// 對方不在線 System.out.println("offline"); } @Override public void clientOnline(String clientIp) { System.out.println(clientIp + " is online"); System.out.println("-----------------------------------------"); } }); if (socket.isConnected()) { executorService.execute(serverReceiveThread); } } serverSocket.close(); } catch (IOException e) { e.printStackTrace(); } finally { if (serverSocket != null) { try { isStart = false; serverSocket.close(); serverReceiveThread.stop(); } catch (IOException e) { e.printStackTrace(); } } } } }
SocketServerResponseInterface--回調接口類
package com.whalecloud.uip.server.socket; /** * @author lin.hongwen2@iwhalecloud.com * @date 2020/3/10 19:34 */ public interface SocketServerResponseInterface { /** * 客戶端斷線回調 */ void clientOffline(); /** * 客戶端上線回調 * * @param clientIp */ void clientOnline(String clientIp); }
ServerReceiveThread--消息接送類
package com.whalecloud.uip.server.socket; import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.Socket; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author lin.hongwen2@iwhalecloud.com * @date 2020/3/10 19:46 */ public class ServerReceiveThread implements Runnable { private ReceiveThread receiveThread; private static ServerSendThread serverSendThread; private Socket socket; private SocketServerResponseInterface socketServerResponseInterface; private volatile ConcurrentLinkedQueue<String> dataQueue = new ConcurrentLinkedQueue<>(); private static ConcurrentHashMap<String, Socket> onLineClient = new ConcurrentHashMap<>(); private long lastReceiveTime = System.currentTimeMillis(); private String userIP; public String getUserIP() { return userIP; } public ServerReceiveThread(Socket socket, SocketServerResponseInterface socketServerResponseInterface) { this.socket = socket; this.socketServerResponseInterface = socketServerResponseInterface; this.userIP = socket.getInetAddress().getHostAddress(); onLineClient.put(userIP, socket); System.out.println("用戶:" + userIP + " 加入了聊天室,當前在線人數:" + onLineClient.size()); } @Override public void run() { try { //開啟接收線程 receiveThread = new ReceiveThread(); receiveThread.bufferedReader = new BufferedReader( new InputStreamReader(socket.getInputStream(), "UTF-8")); receiveThread.start(); } catch (Exception e) { e.printStackTrace(); } } /** * 斷開socket連接 */ public void stop() { try { System.out.println("stop"); if (receiveThread != null) { receiveThread.isCancel = true; receiveThread.interrupt(); if (receiveThread.bufferedReader != null) { SocketUtil.inputStreamShutdown(socket); System.out.println("before closeBufferedReader"); SocketUtil.closeBufferedReader(receiveThread.bufferedReader); System.out.println("after closeBufferedReader"); receiveThread.bufferedReader = null; } receiveThread = null; System.out.println("stop receiveThread"); //停止消息發送線程 serverSendThread.stop(); } onLineClient.remove(userIP); System.out.println("用戶:" + userIP + " 退出,當前在線人數:" + onLineClient.size()); } catch (Exception e) { e.printStackTrace(); } } /** * 獲取已接連的客戶端 */ public Socket getConnectdClient(String clientID) { return onLineClient.get(clientID); } /** * 打印已經連接的客戶端 */ public static void printAllClient() { if (onLineClient == null) { return; } Iterator<String> inter = onLineClient.keySet().iterator(); while (inter.hasNext()) { System.out.println("client:" + inter.next()); } } /** * 阻塞線程,millis為0則永久阻塞,知道調用notify() */ public void toWaitAll(Object o) { synchronized (o) { try { o.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * notify()調用后,並不是馬上就釋放對象鎖的,而是在相應的synchronized(){}語句塊執行結束,自動釋放鎖后 */ public void toNotifyAll(Object obj) { synchronized (obj) { obj.notifyAll(); } } /** * 判斷本地socket連接狀態 */ private boolean isConnected() { if (socket.isClosed() || !socket.isConnected()) { onLineClient.remove(userIP); ServerReceiveThread.this.stop(); System.out.println("socket closed..."); return false; } return true; } /** * 數據接收線程 */ public class ReceiveThread extends Thread { private BufferedReader bufferedReader; private boolean isCancel; @Override public void run() { try { while (!isCancel) { if (!isConnected()) { isCancel = true; break; } String msg = SocketUtil.readFromStream(bufferedReader); if (msg != null) { if ("ping".equals(msg)) { System.out.println("收到心跳包"); lastReceiveTime = System.currentTimeMillis(); socketServerResponseInterface.clientOnline(userIP); } else { msg = "用戶" + userIP + " : " + msg; System.out.println("我是服務端,我收到數據:"+msg); //接收到消息 啟動發消息的線程 this.sendMsg(msg); socketServerResponseInterface.clientOnline(userIP); } } else { System.out.println("client is offline..."); ServerReceiveThread.this.stop(); socketServerResponseInterface.clientOffline(); break; } System.out.println("ReceiveThread"); } SocketUtil.inputStreamShutdown(socket); SocketUtil.closeBufferedReader(bufferedReader); System.out.println("ReceiveThread is finish"); } catch (Exception e) { e.printStackTrace(); } } public void sendMsg(String msg) { //開啟發送線程 這里最好優化成用線程池 ExecutorService executorService = Executors.newCachedThreadPool(); serverSendThread = new ServerSendThread(socket); serverSendThread.addMessage("服務端發送測試數據"); if (socket.isConnected()) { executorService.execute(serverSendThread); } } } }
ServerSendThread--消息發送類
package com.whalecloud.uip.server.socket; import java.io.PrintWriter; import java.net.Socket; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; /** * @author lin.hongwen2@iwhalecloud.com * @date 2020/3/12 15:12 */ public class ServerSendThread implements Runnable { private SendThread sendThread; private Socket socket; private volatile ConcurrentLinkedQueue<String> dataQueue = new ConcurrentLinkedQueue<>(); private static ConcurrentHashMap<String, Socket> onLineClient = new ConcurrentHashMap<>(); private long lastReceiveTime = System.currentTimeMillis(); private String userIP; public String getUserIP() { return userIP; } public ServerSendThread(Socket socket) { this.socket = socket; this.userIP = socket.getInetAddress().getHostAddress(); } @Override public void run() { try { //開啟發送線程 sendThread = new SendThread(); sendThread.printWriter = new PrintWriter(socket.getOutputStream(), true); sendThread.start(); } catch (Exception e) { e.printStackTrace(); } } /** * 斷開socket連接 */ public void stop() { try { System.out.println("stop"); if (sendThread != null) { sendThread.isCancel = true; toNotifyAll(sendThread); sendThread.interrupt(); if (sendThread.printWriter != null) { //防止寫數據時停止,寫完再停 synchronized (sendThread.printWriter) { SocketUtil.closePrintWriter(sendThread.printWriter); sendThread.printWriter = null; } } sendThread = null; System.out.println("stop sendThread"); } onLineClient.remove(userIP); System.out.println("用戶:" + userIP + " 退出,當前在線人數:" + onLineClient.size()); } catch (Exception e) { e.printStackTrace(); } } /** * 發送消息 */ public void addMessage(String data) { if (!isConnected()) { return; } dataQueue.offer(data); //有新增待發送數據,則喚醒發送線程 toNotifyAll(dataQueue); } /** * 獲取已接連的客戶端 */ public Socket getConnectdClient(String clientID) { return onLineClient.get(clientID); } /** * 打印已經連接的客戶端 */ public static void printAllClient() { if (onLineClient == null) { return; } Iterator<String> inter = onLineClient.keySet().iterator(); while (inter.hasNext()) { System.out.println("client:" + inter.next()); } } /** * 阻塞線程,millis為0則永久阻塞,知道調用notify() */ public void toWaitAll(Object o) { synchronized (o) { try { o.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * notify()調用后,並不是馬上就釋放對象鎖的,而是在相應的synchronized(){}語句塊執行結束,自動釋放鎖后 */ public void toNotifyAll(Object obj) { synchronized (obj) { obj.notifyAll(); } } /** * 判斷本地socket連接狀態 */ private boolean isConnected() { if (socket.isClosed() || !socket.isConnected()) { onLineClient.remove(userIP); ServerSendThread.this.stop(); System.out.println("socket closed..."); return false; } return true; } /** * 數據發送線程,當沒有發送數據時讓線程等待 */ public class SendThread extends Thread { private PrintWriter printWriter; private boolean isCancel; @Override public void run() { try { while (!isCancel) { if (!isConnected()) { isCancel = true; break; } String msg = dataQueue.poll(); if (msg == null) { toWaitAll(dataQueue); } else if (printWriter != null) { synchronized (printWriter) { SocketUtil.write2Stream(msg, printWriter); } } System.out.println("SendThread"); } SocketUtil.outputStreamShutdown(socket); SocketUtil.closePrintWriter(printWriter); System.out.println("SendThread is finish"); } catch (Exception e) { e.printStackTrace(); } } } }
測試后發現服務端可以收到心跳+客戶端發送過來的數據

客戶端發送的信息:

我這里嘗試了啟動兩個client。兩個client發不同的信息。然后server端根據接收到的信息發送不同的值。
測試后發現發送的數據不會socket互串。只會對應的客戶端收到信息


gitLab項目地址: https://github.com/linHongWenGithub/socketLearn
