socket服務端和客戶端互發和心跳檢測實例


基礎版

在網上找到了一個簡單的socket服務端和客戶端監聽代碼,並且已經試驗通過。直接上代碼:

服務端:

package com.whalecloud.uip.server.socket;

import java.io.DataOutputStream;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * Minimal demo server: accepts a single client on {@link #PORT} and pushes one
 * random status string to it every second until the connection breaks.
 *
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/10 16:07
 */
public class SocketServer {

    private static final int PORT = 5209;


    /**
     * Listens on {@link #PORT}, waits for one client and then writes a random
     * message (modified UTF-8, via {@link DataOutputStream#writeUTF}) once per second.
     * Returns when the connection fails or the thread is interrupted; errors are
     * printed to stderr.
     */
    public static void test() {
        // try-with-resources closes the stream, the client socket and the
        // listening socket on every exit path (the original leaked all three).
        try (ServerSocket server = new ServerSocket(PORT);
             Socket socket = server.accept();
             DataOutputStream out = new DataOutputStream(socket.getOutputStream())) {
            while (true) {
                Thread.sleep(1000);
                out.writeUTF(getRandomStr());
                out.flush();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the flag so the caller can still observe the interruption.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    /**
     * Builds a random message of the form {@code ID:<0-29>/x:<0-199>/y:<0-299>/z:<0-9>}.
     *
     * @return the formatted random message
     */
    private static String getRandomStr() {
        int id = (int) (Math.random() * 30);
        int x = (int) (Math.random() * 200);
        int y = (int) (Math.random() * 300);
        int z = (int) (Math.random() * 10);
        return "ID:" + id + "/x:" + x + "/y:" + y + "/z:" + z;
    }


    public static void main(String[] args) {
        test();
    }
}

客戶端:

package com.whalecloud.uip.server.socket;

import java.io.DataInputStream;
import java.io.InputStream;
import java.net.Socket;

/**
 * Minimal demo client: connects to {@link #HOST}:{@link #PORT} and prints every
 * message it receives until the connection ends.
 *
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/10 16:10
 */
public class SocketClient {
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 5209;

    /**
     * Connects to the server and prints each string read with
     * {@link DataInputStream#readUTF()}. Returns when reading fails (for example
     * when the server closes the connection); the error is printed to stderr.
     */
    private static void test() {
        // try-with-resources guarantees the socket and its stream are closed
        // when the loop ends with an exception (the original never closed them).
        try (Socket socket = new Socket(HOST, PORT);
             DataInputStream dis = new DataInputStream(socket.getInputStream())) {
            while (true) {
                System.out.println("receive_msg:" + dis.readUTF());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        test();
    }
}

 

啟動兩個項目就可以在控制台看到接收到的信息了

 

 

 

 

完整進階版

客戶端:

ClientMain---啟動類
package com.whalecloud.uip.client.socket;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Launcher for the demo client: registers console-logging callbacks, starts a
 * {@code SocketClient} and sends a test message every 20 seconds.
 *
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/11 16:57
 */
public class ClientMain {

    /** Pattern of the timestamp that prefixes every console line. */
    private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    public static void main(String[] args) {
        SocketClientResponseInterface listener = new SocketClientResponseInterface() {
            @Override
            public void onSocketConnect() {
                System.out.println(now() + "連接成功");
            }

            @Override
            public void onSocketReceive(Object socketResult, int code) {
                System.out.println(now() + "拿到消息:" + socketResult);
            }

            @Override
            public void onSocketDisable(String msg, int code) {
                System.out.println(now() + "斷開連接");
            }
        };

        try {
            SocketClient client = new SocketClient(listener);
            // Keep sending the same test payload, pausing 20 s between sends.
            for (;;) {
                client.sendData("客戶端11發送測試數據");
                Thread.sleep(20000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    /**
     * Formats the current time with {@link #TIME_PATTERN}.
     *
     * @return the current time as text
     */
    private static String now() {
        return new SimpleDateFormat(TIME_PATTERN).format(new Date());
    }
}
View Code

SocketClientResponseInterface---返回信息接口類
package com.whalecloud.uip.client.socket;


/**
 * Callbacks through which the socket client reports connection events
 * (connected, message received, connection closed) to its user.
 *
 * @param <T> type of the received message handed to {@link #onSocketReceive}
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/10 16:42
 */
public interface SocketClientResponseInterface<T> {
    /**
     * Called when the client has connected.
     */
    void onSocketConnect();

    /**
     * Called when the client receives a message from the server.
     *
     * @param socketResult the received message
     * @param code status code accompanying the message
     */
    void onSocketReceive(T socketResult, int code);

    /**
     * Called when the client connection is closed or could not be established.
     *
     * @param msg human-readable description of the event
     * @param code status code accompanying the message
     */
    void onSocketDisable(String msg, int code);
}
View Code

SocketCloseInterface---連接關閉信息接口類
package com.whalecloud.uip.client.socket;

/**
 * Callbacks the client worker threads use to ask the owner of the socket to
 * shut down its input side or to tear the connection down.
 *
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/10 17:24
 */
public interface SocketCloseInterface {

    /**
     * Requests that the input side of the socket be shut down.
     * SocketReceiveThread.close() invokes this before closing its reader.
     * (The original comment, "client received a server message", was a
     * copy-paste leftover and did not describe this method.)
     */
    void onSocketShutdownInput();

    /**
     * Called when the client connection is found to be closed or broken;
     * SocketHeartBeatThread invokes this when its socket state check fails.
     */
    void onSocketDisconnection();
}
View Code

SocketClient---客戶端調用類
package com.whalecloud.uip.client.socket;

import org.apache.http.util.TextUtils;

/**
 * Facade used by callers of the socket client: starts the connection thread on
 * construction and offers methods to queue outgoing data and to stop the client.
 *
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/10 17:05
 */
public class SocketClient {

    private static final String TAG = SocketClient.class.getSimpleName();

    private final SocketClientThread socketClientThread;

    /**
     * Creates the client and immediately starts its connection thread.
     *
     * @param socketClientResponseInterface callbacks notified about connection events
     */
    public SocketClient(SocketClientResponseInterface socketClientResponseInterface) {
        socketClientThread = new SocketClientThread("socketClientThread", socketClientResponseInterface);
        socketClientThread.start();
    }

    /**
     * Queues {@code data} for sending. The payload is converted with
     * {@link String#valueOf(Object)}; {@code null} or empty payloads are rejected
     * with a console message.
     *
     * @param data payload to send
     * @param <T>  payload type
     */
    public <T> void sendData(T data) {
        // String.valueOf instead of a blind (String) cast: the method is generic,
        // so a non-String argument used to fail with ClassCastException.
        String s = (data == null) ? null : String.valueOf(data);
        if (TextUtils.isEmpty(s)) {
            System.out.print(TAG+"sendData: 消息不能為空");
            return;
        }
        if (socketClientThread != null) {
            socketClientThread.sendMsg(s);
        }
    }

    /**
     * Disables reconnection and stops the connection thread. The work is done on
     * a separate thread because closing the socket and its streams is blocking I/O.
     */
    public void stopSocket() {
        new Thread(() -> {
            socketClientThread.setReConnect(false);
            socketClientThread.stopThread();
        }).start();
    }
}
View Code

 

SocketClientThread---客戶端線程類
package com.whalecloud.uip.client.socket;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;

import javax.net.SocketFactory;

/**
 * Connection thread of the socket client.
 *
 * <p>{@link #run()} opens the socket and starts the worker threads: a
 * {@link SocketReceiveThread}, a {@link SocketSendThread} (which waits while it
 * has nothing to send and is notified when a message is queued) and, when
 * {@code isLongConnection} is set, a {@link SocketHeartBeatThread}. It also
 * implements {@link SocketCloseInterface} so the workers can ask it to shut down
 * the input stream or to tear the connection down.
 *
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/10 16:10
 */
public class SocketClientThread extends Thread implements SocketCloseInterface {

    private static final String TAG = SocketClientThread.class.getSimpleName();

    private volatile String name;

    private boolean isLongConnection = true;
    // Written through setReConnect() by other threads and read in stopThread().
    private volatile boolean isReConnect = true;
    private SocketSendThread mSocketSendThread;
    private SocketReceiveThread mSocketReceiveThread;
    private SocketHeartBeatThread mSocketHeartBeatThread;
    private Socket mSocket;

    // Set by the connecting thread, read from the close callbacks.
    private volatile boolean isSocketAvailable;

    private SocketClientResponseInterface socketClientResponseInterface;

    /**
     * @param name                          label used in this thread's name while it runs
     * @param socketClientResponseInterface callbacks notified about connection events; may be null
     */
    public SocketClientThread(String name, SocketClientResponseInterface socketClientResponseInterface) {
        this.name = name;
        this.socketClientResponseInterface = socketClientResponseInterface;
    }

    /**
     * Renames the thread for the duration of the call, connects via
     * {@link #initSocket()} and restores the previous thread name afterwards.
     */
    @Override
    public void run() {
        final Thread currentThread = Thread.currentThread();
        final String oldName = currentThread.getName();
        currentThread.setName("Processing-" + name);
        try {
            initSocket();
            System.out.print(TAG + "run: SocketClientThread end");
        } finally {
            currentThread.setName(oldName);
        }
    }

    /**
     * Connects to {@code SocketUtil.ADDRESS:SocketUtil.PORT} (10 s connect
     * timeout) and starts the receive, send and heartbeat threads. On failure the
     * listener is informed and {@link #stopThread()} is invoked.
     */
    private void initSocket() {
        try {
            mSocket = SocketFactory.getDefault().createSocket();
            SocketAddress socketAddress = new InetSocketAddress(SocketUtil.ADDRESS, SocketUtil.PORT);
            mSocket.connect(socketAddress, 10000);

            isSocketAvailable = true;

            // Start the receive thread.
            mSocketReceiveThread = new SocketReceiveThread("SocketReceiveThread",
                new BufferedReader(new InputStreamReader(mSocket.getInputStream(), "UTF-8")),
                socketClientResponseInterface, this);
            mSocketReceiveThread.start();

            // Start the send thread. The writer encodes as UTF-8 to match the
            // reader above; PrintWriter(OutputStream, boolean) would have used the
            // platform default charset and garbled non-ASCII text on some systems.
            PrintWriter printWriter = new PrintWriter(
                new java.io.OutputStreamWriter(mSocket.getOutputStream(), "UTF-8"), true);
            System.out.print(TAG+"initSocket: " + printWriter);
            mSocketSendThread = new SocketSendThread("SocketSendThread", printWriter);
            mSocketSendThread.setCloseSendTask(false);
            mSocketSendThread.start();

            // Start the heartbeat thread for long-lived connections.
            if (isLongConnection) {
                mSocketHeartBeatThread = new SocketHeartBeatThread("SocketHeartBeatThread",
                    printWriter, mSocket, this);
                mSocketHeartBeatThread.start();
            }

            if (socketClientResponseInterface != null) {
                socketClientResponseInterface.onSocketConnect();
            }
        } catch (ConnectException e) {
            failedMessage("服務器連接異常,請檢查網絡", SocketUtil.FAILED);
            e.printStackTrace();
            stopThread();
        } catch (IOException e) {
            failedMessage("網絡發生異常,請稍后重試", SocketUtil.FAILED);
            e.printStackTrace();
            stopThread();
        }
    }

    /**
     * Queues a message for the send thread; ignored when no send thread exists.
     *
     * @param data text to send
     */
    public void sendMsg(String data) {
        if (mSocketSendThread != null) {
            mSocketSendThread.sendMsg(data);
        }
    }

    /**
     * Stops the worker threads, closes the socket, reports the disconnection and,
     * when reconnecting is enabled, waits 15 s and connects again.
     *
     * <p>NOTE(review): a failed reconnect calls this method again from inside
     * {@link #initSocket()}, so the call stack grows by two frames per failed
     * attempt while the server stays unreachable.
     */
    public synchronized void stopThread() {
        // Stop the receive thread.
        closeReceiveTask();
        // Wake the send thread so it can shut itself down.
        wakeSendTask();
        // Stop the heartbeat thread.
        closeHeartBeatTask();
        // Close the socket.
        closeSocket();
        // Drop messages that were still queued.
        clearData();
        failedMessage("斷開連接", SocketUtil.FAILED);
        if (isReConnect) {
            SocketUtil.toWait(this, 15000);
            initSocket();
            System.out.print(TAG + "stopThread: " + Thread.currentThread().getName());
        }
    }

    /**
     * Wakes the send thread and flags it to close.
     */
    private void wakeSendTask() {
        if (mSocketSendThread != null) {
            mSocketSendThread.wakeSendTask();
        }
    }

    /**
     * Stops the receive thread and drops the reference to it.
     */
    private void closeReceiveTask() {
        if (mSocketReceiveThread != null) {
            mSocketReceiveThread.close();
            mSocketReceiveThread = null;
        }
    }

    /**
     * Stops the heartbeat thread and drops the reference to it (mirrors
     * {@link #closeReceiveTask()} so a stopped heartbeat is not closed twice).
     */
    private void closeHeartBeatTask() {
        if (mSocketHeartBeatThread != null) {
            mSocketHeartBeatThread.close();
            mSocketHeartBeatThread = null;
        }
    }

    /**
     * Closes the socket if it is still open and forgets it.
     */
    private void closeSocket() {
        if (mSocket != null) {
            // Close whenever the socket is not closed yet: a socket whose
            // connect() failed is not "connected" but must still be released.
            if (!mSocket.isClosed()) {
                try {
                    mSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            isSocketAvailable = false;
            mSocket = null;
        }
    }

    /**
     * Clears the messages still queued in the send thread.
     */
    private void clearData() {
        if (mSocketSendThread != null) {
            mSocketSendThread.clearData();
        }
    }

    /**
     * Reports a failure or disconnection to the listener, if one is set.
     *
     * @param msg  description passed to the listener
     * @param code status code passed to the listener
     */
    private void failedMessage(String msg, int code) {
        if (socketClientResponseInterface != null) {
            socketClientResponseInterface.onSocketDisable(msg, code);
        }
    }

    /**
     * Shuts down the socket's input stream while the socket is marked available.
     */
    @Override
    public void onSocketShutdownInput() {
        if (isSocketAvailable) {
            SocketUtil.inputStreamShutdown(mSocket);
        }
    }

    /**
     * Marks the socket unavailable and runs {@link #stopThread()}.
     */
    @Override
    public void onSocketDisconnection() {
        isSocketAvailable = false;
        stopThread();
    }

    /**
     * Enables or disables automatic reconnection after a disconnect.
     *
     * @param reConnect {@code true} to reconnect after the connection is stopped
     */
    public void setReConnect(boolean reConnect) {
        isReConnect = reConnect;
    }

}
View Code

 

SocketHeartBeatThread---心跳監聽類
package com.whalecloud.uip.client.socket;

import java.io.PrintWriter;
import java.net.Socket;

/**
 * Heartbeat thread: while the socket looks healthy it writes {@code "ping"} to
 * the shared writer every {@link #REPEAT_TIME} milliseconds (120 s).
 *
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/10 17:01
 */
public class SocketHeartBeatThread extends Thread {

    private static final String TAG = SocketHeartBeatThread.class.getSimpleName();

    private volatile String name;

    private static final int REPEAT_TIME = 120000;
    // volatile: set by close() on another thread, polled by run()
    // (consistent with the receive and send threads).
    private volatile boolean isCancel = false;
    private final PrintWriter printWriter;
    private Socket mSocket;

    private SocketCloseInterface socketCloseInterface;

    /**
     * @param name                 label used in this thread's name while it runs
     * @param printWriter          writer the heartbeat is written to; may be null
     * @param mSocket              socket whose state is checked before each heartbeat
     * @param socketCloseInterface notified when the socket is found disconnected; may be null
     */
    public SocketHeartBeatThread(String name, PrintWriter printWriter,
                                 Socket mSocket, SocketCloseInterface socketCloseInterface) {
        this.name = name;
        this.printWriter = printWriter;
        this.mSocket = mSocket;
        this.socketCloseInterface = socketCloseInterface;
    }

    /**
     * Sends heartbeats until cancelled or until the socket is found disconnected,
     * then closes the writer and restores the previous thread name.
     */
    @Override
    public void run() {
        final Thread currentThread = Thread.currentThread();
        final String oldName = currentThread.getName();
        currentThread.setName("Processing-" + name);
        try {
            while (!isCancel) {
                if (!isConnected()) {
                    break;
                }

                if (printWriter != null) {
                    synchronized (printWriter) {
                        SocketUtil.write2Stream("ping", printWriter);
                    }
                }
                try {
                    Thread.sleep(REPEAT_TIME);
                } catch (InterruptedException e) {
                    if (isCancel) {
                        // Expected wake-up from close(): leave quietly.
                        break;
                    }
                    e.printStackTrace();
                }
            }
        } finally {
            // The loop has ended: close the output writer.
            if (printWriter != null) {
                synchronized (printWriter) {
                    SocketUtil.closePrintWriter(printWriter);
                }
            }
            currentThread.setName(oldName);
            System.out.print(TAG+"SocketHeartBeatThread finish");
        }
    }

    /**
     * Checks the local socket state and notifies the close callback when the
     * socket is closed, not connected, or has either direction shut down.
     *
     * @return {@code true} when the socket still looks usable
     */
    private boolean isConnected() {
        if (mSocket.isClosed() || !mSocket.isConnected() ||
            mSocket.isInputShutdown() || mSocket.isOutputShutdown()) {
            if (socketCloseInterface != null) {
                socketCloseInterface.onSocketDisconnection();
            }
            return false;
        }
        return true;
    }

    /**
     * Cancels the heartbeat and closes the writer.
     */
    public void close() {
        isCancel = true;
        // Wake the thread from its 120 s sleep so it terminates promptly instead
        // of lingering. Skipped when the heartbeat thread closes itself (via the
        // disconnection callback): a pending interrupt would cut short any wait
        // the callback performs afterwards on this same thread.
        if (Thread.currentThread() != this) {
            interrupt();
        }
        if (printWriter != null) {
            synchronized (printWriter) {
                SocketUtil.closePrintWriter(printWriter);
            }
        }
    }

}
View Code

 

SocketReceiveThread---消息接收類
package com.whalecloud.uip.client.socket;

import java.io.BufferedReader;

/**
 * Receive thread: reads lines from the server and forwards each one to the
 * response callback until cancelled or until the stream ends.
 *
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/10 16:48
 */
public class SocketReceiveThread extends Thread {
    private static final String TAG = SocketReceiveThread.class.getSimpleName();

    private volatile String name;

    private volatile boolean isCancel = false;

    // volatile: close() clears this field from another thread.
    private volatile BufferedReader bufferedReader;

    private SocketCloseInterface socketCloseInterface;

    private SocketClientResponseInterface socketClientResponseInterface;

    /**
     * @param name                          label used in this thread's name while it runs
     * @param bufferedReader                reader the incoming lines are read from
     * @param socketClientResponseInterface receives every line read; may be null
     * @param socketCloseInterface          asked to shut down the socket input on close; may be null
     */
    public SocketReceiveThread(String name, BufferedReader bufferedReader,
                               SocketClientResponseInterface socketClientResponseInterface,
                               SocketCloseInterface socketCloseInterface) {
        this.name = name;
        this.bufferedReader = bufferedReader;
        this.socketClientResponseInterface = socketClientResponseInterface;
        this.socketCloseInterface = socketCloseInterface;
    }

    /**
     * Reads lines until cancelled or until a read returns {@code null}, then
     * closes the reader and restores the previous thread name.
     */
    @Override
    public void run() {
        final Thread currentThread = Thread.currentThread();
        final String oldName = currentThread.getName();
        currentThread.setName("Processing-" + name);
        // Work on a snapshot: close() sets the field to null concurrently, which
        // could otherwise slip in between the null check and the read. A null
        // reader now ends the loop instead of spinning.
        final BufferedReader reader = bufferedReader;
        try {
            while (!isCancel && reader != null) {
                String receiverData = SocketUtil.readFromStream(reader);
                if (receiverData != null) {
                    successMessage(receiverData);
                } else {
                    System.out.print(TAG + "run: receiverData==null");
                    break;
                }
            }
        } finally {
            // The loop has ended: close the input reader.
            SocketUtil.closeBufferedReader(reader);
            currentThread.setName(oldName);
            System.out.print(TAG + "SocketReceiveThread finish");
        }
    }

    /**
     * Forwards a received line to the response callback with
     * {@code SocketUtil.SUCCESS}.
     *
     * @param data the line that was read
     */
    private void successMessage(String data) {
        if (socketClientResponseInterface != null) {
            socketClientResponseInterface.onSocketReceive(data, SocketUtil.SUCCESS);
        }
    }

    /**
     * Cancels the thread, asks the owner to shut down the socket input and
     * closes the reader.
     */
    public void close() {
        isCancel = true;
        this.interrupt();
        final BufferedReader reader = bufferedReader;
        if (reader != null) {
            // Shut down the socket input before closing the reader.
            if (socketCloseInterface != null) {
                socketCloseInterface.onSocketShutdownInput();
            }
            SocketUtil.closeBufferedReader(reader);
            bufferedReader = null;
        }
    }
}
View Code

 

SocketSendThread---發送線程類
package com.whalecloud.uip.client.socket;

import java.io.PrintWriter;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Send thread: writes queued messages to the server and waits on the queue's
 * monitor while there is nothing to send.
 *
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/10 16:59
 */
public class SocketSendThread extends Thread {
    private static final String TAG = SocketSendThread.class.getSimpleName();

    private volatile String name;

    private volatile boolean isCancel = false;
    // volatile: set by wakeSendTask() on another thread, read by run().
    private volatile boolean closeSendTask;
    private final PrintWriter printWriter;

    protected volatile ConcurrentLinkedQueue<String> dataQueue = new ConcurrentLinkedQueue<>();

    /**
     * @param name        label used in this thread's name while it runs
     * @param printWriter writer the messages are written to; may be null
     */
    public SocketSendThread(String name, PrintWriter printWriter) {
        this.name = name;
        this.printWriter = printWriter;
    }

    /**
     * Drains the queue to the writer until cancelled, then closes the writer and
     * restores the previous thread name.
     */
    @Override
    public void run() {
        final Thread currentThread = Thread.currentThread();
        final String oldName = currentThread.getName();
        currentThread.setName("Processing-" + name);
        try {
            while (!isCancel) {

                String dataContent = dataQueue.poll();
                if (dataContent == null) {
                    // Nothing to send: wait for a message or a shutdown request.
                    awaitWork();
                    if (closeSendTask) {
                        // Shutdown was requested through wakeSendTask().
                        close();
                    }
                } else if (printWriter != null) {
                    synchronized (printWriter) {
                        SocketUtil.write2Stream(dataContent, printWriter);
                    }
                }
            }
        } finally {
            // The loop has ended: close the output writer.
            if (printWriter != null) {
                synchronized (printWriter) {
                    SocketUtil.closePrintWriter(printWriter);
                }
            }
            currentThread.setName(oldName);
            System.out.print(TAG+"SocketSendThread finish");
        }
    }

    /**
     * Blocks until a message is queued, shutdown is requested, or the thread is
     * interrupted.
     *
     * <p>The condition is re-checked while holding the monitor that
     * {@link #sendMsg(String)} and {@link #wakeSendTask()} notify on. Waiting
     * unconditionally after an unsynchronized {@code poll()} could miss a
     * notification that arrived in between and leave the message unsent until
     * the next one was queued.
     */
    private void awaitWork() {
        synchronized (dataQueue) {
            while (!isCancel && !closeSendTask && dataQueue.isEmpty()) {
                try {
                    dataQueue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    return;
                }
            }
        }
    }

    /**
     * Queues a message and wakes the send thread.
     *
     * @param data text to send
     */
    public void sendMsg(String data) {
        dataQueue.add(data);
        // New data is pending: wake the send thread.
        SocketUtil.toNotifyAll(dataQueue);
    }

    /**
     * Discards all queued messages.
     */
    public void clearData() {
        dataQueue.clear();
    }

    /**
     * Cancels the thread and closes the writer.
     */
    public void close() {
        isCancel = true;
        this.interrupt();
        if (printWriter != null) {
            // Taking the writer's lock lets an in-progress write finish first.
            synchronized (printWriter) {
                SocketUtil.closePrintWriter(printWriter);
            }
        }
    }

    /**
     * Requests shutdown: sets the close flag and wakes the send thread so it can
     * close itself.
     */
    public void wakeSendTask() {
        closeSendTask = true;
        SocketUtil.toNotifyAll(dataQueue);
    }

    /**
     * Sets or clears the shutdown request flag.
     *
     * @param closeSendTask {@code true} to make the thread close itself at its next wake-up
     */
    public void setCloseSendTask(boolean closeSendTask) {
        this.closeSendTask = closeSendTask;
    }
}
View Code

 

SocketUtil---客戶端socket工具類
package com.whalecloud.uip.client.socket;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.Socket;
import java.net.SocketException;
import java.util.Enumeration;

/**
 * Static helpers shared by the client socket threads: line-based reading and
 * writing, quiet closing of streams, and thin wrappers around
 * {@link Object#wait(long)} and {@link Object#notifyAll()}.
 *
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/10 16:39
 */
public class SocketUtil {
    private static final String TAG = SocketUtil.class.getSimpleName();
    public static String ADDRESS = "127.0.0.1";
    public static int PORT = 10086;

    public static final int SUCCESS = 100;
    public static final int FAILED = -1;

    /**
     * Reads one line from the reader.
     *
     * @param bufferedReader reader to read from; may be null
     * @return the line read, or {@code null} when the reader is null, the stream
     *         has ended, or reading failed (the error is printed to stderr)
     */
    public static String readFromStream(BufferedReader bufferedReader) {
        if (bufferedReader == null) {
            return null;
        }
        try {
            return bufferedReader.readLine();
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Writes {@code data} followed by a line terminator. Does nothing when
     * either argument is null.
     *
     * @param data        text to write
     * @param printWriter writer to write to
     */
    public static void write2Stream(String data, PrintWriter printWriter) {
        if (data == null || printWriter == null) {
            return;
        }
        printWriter.println(data);
    }


    /**
     * Shuts down the input side of the socket unless the socket is null, closed,
     * or its input is already shut down.
     *
     * @param socket socket whose input is shut down; may be null
     */
    public static void inputStreamShutdown(Socket socket) {
        // Null guard: callers may race with the code that clears the socket.
        if (socket == null) {
            return;
        }
        try {
            if (!socket.isClosed() && !socket.isInputShutdown()) {
                socket.shutdownInput();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Closes the reader; a null reader is ignored and I/O errors are printed.
     *
     * @param br reader to close; may be null
     */
    public static void closeBufferedReader(BufferedReader br) {
        try {
            if (br != null) {
                br.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Closes the writer; a null writer is ignored.
     *
     * @param pw writer to close; may be null
     */
    public static void closePrintWriter(PrintWriter pw) {
        if (pw != null) {
            pw.close();
        }
    }

    /**
     * Blocks the calling thread on {@code o}'s monitor for up to {@code millis}
     * milliseconds; {@code 0} waits until notified. If the wait is interrupted
     * the stack trace is printed and the thread's interrupt flag is restored.
     *
     * @param o      object whose monitor is waited on
     * @param millis maximum time to wait in milliseconds, {@code 0} for no limit
     */
    public static void toWait(Object o, long millis) {
        synchronized (o) {
            try {
                o.wait(millis);
            } catch (InterruptedException e) {
                e.printStackTrace();
                // wait() clears the flag when it throws; restore it so callers
                // can still detect that the thread was interrupted.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Wakes all threads waiting on {@code o}'s monitor. The waiters resume only
     * after this method leaves its synchronized block and releases the monitor.
     *
     * @param o object whose waiters are notified
     */
    public static void toNotifyAll(Object o) {
        synchronized (o) {
            o.notifyAll();
        }
    }

}
View Code

 

 

服務端:

ServerMain--服務端測試類
package com.whalecloud.uip.server.socket;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Demo server launcher: listens on {@code SocketUtil.PORT} and hands every
 * accepted connection to a {@link ServerReceiveThread} run on a cached thread pool.
 *
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/10 20:05
 */
public class ServerMain {

    private static volatile boolean isStart = true;
    // Receive task of the most recently accepted client; null until one connects.
    private static ServerReceiveThread serverReceiveThread;

    /**
     * Accepts clients until {@code isStart} is cleared or accepting fails, then
     * stops the most recent receive task and shuts the executor down.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        System.out.println("服務端 " + SocketUtil.getIP() + " 運行中...\n");
        // try-with-resources closes the listening socket on every exit path.
        try (ServerSocket serverSocket = new ServerSocket(SocketUtil.PORT)) {
            while (isStart) {
                Socket socket = serverSocket.accept();

                // Read timeout: a client that sends nothing for 180 s is treated
                // as disconnected.
                socket.setSoTimeout(180000);
                serverReceiveThread = new ServerReceiveThread(socket,
                    new SocketServerResponseInterface() {
                        @Override
                        public void clientOffline() {
                            // The peer is no longer online.
                            System.out.println("offline");
                        }

                        @Override
                        public void clientOnline(String clientIp) {
                            System.out.println(clientIp + " is online");
                            System.out.println("-----------------------------------------");
                        }
                    });

                if (socket.isConnected()) {
                    executorService.execute(serverReceiveThread);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            isStart = false;
            // Null when accept() failed before any client connected; the original
            // dereferenced it unconditionally and threw NullPointerException.
            if (serverReceiveThread != null) {
                serverReceiveThread.stop();
            }
            // Let the pool's worker threads end so they do not keep the JVM alive.
            executorService.shutdown();
        }
    }
}
View Code
 
        
SocketServerResponseInterface--回調接口類
package com.whalecloud.uip.server.socket;

/**
 * Callbacks through which the server reports the online/offline state of a client.
 *
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/10 19:34
 */
public interface SocketServerResponseInterface {
    /**
     * Called when a client has gone offline (its connection was lost).
     */
    void clientOffline();

    /**
     * Called when a client is online.
     *
     * @param clientIp IP address of the client
     */
    void clientOnline(String clientIp);
}
View Code


ServerReceiveThread--消息接收類
package com.whalecloud.uip.server.socket;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.Socket;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Server-side handler for one client connection. Running it starts an inner
 * {@link ReceiveThread} that reads lines from the client, treats {@code "ping"}
 * as a heartbeat and answers every other message through a {@link ServerSendThread}.
 *
 * <p>Connected clients are tracked in a static map keyed by IP address, so two
 * connections from the same address share one entry.
 *
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/10 19:46
 */
public class ServerReceiveThread implements Runnable {
    /**
     * Pool shared by all connections for running sender tasks; the original
     * created a brand-new pool for every received message and never shut it down.
     */
    private static final ExecutorService SEND_EXECUTOR = Executors.newCachedThreadPool();

    private volatile ReceiveThread receiveThread;
    // Per connection (the original field was static, so stopping one client
    // stopped whichever sender had been created last for any client).
    private volatile ServerSendThread serverSendThread;
    private Socket socket;
    private SocketServerResponseInterface socketServerResponseInterface;

    private volatile ConcurrentLinkedQueue<String> dataQueue = new ConcurrentLinkedQueue<>();
    private static ConcurrentHashMap<String, Socket> onLineClient = new ConcurrentHashMap<>();

    private long lastReceiveTime = System.currentTimeMillis();

    private String userIP;

    /**
     * @return IP address of the connected client
     */
    public String getUserIP() {
        return userIP;
    }

    /**
     * Registers the client in the online map and announces it on the console.
     *
     * @param socket                        accepted client socket
     * @param socketServerResponseInterface callbacks notified about the client's state
     */
    public ServerReceiveThread(Socket socket, SocketServerResponseInterface socketServerResponseInterface) {
        this.socket = socket;
        this.socketServerResponseInterface = socketServerResponseInterface;
        this.userIP = socket.getInetAddress().getHostAddress();
        onLineClient.put(userIP, socket);
        System.out.println("用戶:" + userIP
            + " 加入了聊天室,當前在線人數:" + onLineClient.size());
    }

    /**
     * Creates the UTF-8 reader for the socket and starts the receive thread.
     */
    @Override
    public void run() {
        try {
            // Fully initialise the thread before publishing it to the field, so
            // a concurrent stop() never sees it half set up.
            ReceiveThread receiver = new ReceiveThread();
            receiver.bufferedReader = new BufferedReader(
                new InputStreamReader(socket.getInputStream(), "UTF-8"));
            receiveThread = receiver;
            receiver.start();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Stops the receive thread and the sender (if any), then removes the client
     * from the online map.
     */
    public void stop() {
        try {
            System.out.println("stop");
            ReceiveThread receiver = receiveThread;
            if (receiver != null) {
                receiver.isCancel = true;
                receiver.interrupt();
                if (receiver.bufferedReader != null) {
                    SocketUtil.inputStreamShutdown(socket);
                    System.out.println("before closeBufferedReader");
                    SocketUtil.closeBufferedReader(receiver.bufferedReader);
                    System.out.println("after closeBufferedReader");
                    receiver.bufferedReader = null;
                }
                receiveThread = null;
                System.out.println("stop receiveThread");
                // Stop the sender. It is null when the client never sent a
                // non-heartbeat message; the original then threw
                // NullPointerException and skipped the map cleanup below.
                ServerSendThread sender = serverSendThread;
                if (sender != null) {
                    sender.stop();
                }
            }
            onLineClient.remove(userIP);
            System.out.println("用戶:" + userIP
                + " 退出,當前在線人數:" + onLineClient.size());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Looks up a connected client.
     *
     * @param clientID key of the client (its IP address)
     * @return the client's socket, or {@code null} if it is not registered
     */
    public Socket getConnectdClient(String clientID) {
        return onLineClient.get(clientID);
    }

    /**
     * Prints the key of every connected client.
     */
    public static void printAllClient() {
        if (onLineClient == null) {
            return;
        }
        Iterator<String> inter = onLineClient.keySet().iterator();
        while (inter.hasNext()) {
            System.out.println("client:" + inter.next());
        }
    }

    /**
     * Blocks the calling thread on {@code o}'s monitor until it is notified.
     *
     * @param o object whose monitor is waited on
     */
    public void toWaitAll(Object o) {
        synchronized (o) {
            try {
                o.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Wakes all threads waiting on {@code obj}'s monitor; they resume once this
     * method leaves its synchronized block.
     *
     * @param obj object whose waiters are notified
     */
    public void toNotifyAll(Object obj) {
        synchronized (obj) {
            obj.notifyAll();
        }
    }

    /**
     * Checks the local socket state; when the socket is closed or not connected
     * the client is removed from the online map and this handler is stopped.
     *
     * @return {@code true} when the socket still looks usable
     */
    private boolean isConnected() {
        if (socket.isClosed() || !socket.isConnected()) {
            onLineClient.remove(userIP);
            ServerReceiveThread.this.stop();
            System.out.println("socket closed...");
            return false;
        }
        return true;
    }

    /**
     * Thread that reads the client's messages line by line.
     */
    public class ReceiveThread extends Thread {

        // volatile: both fields are modified by stop() on other threads.
        private volatile BufferedReader bufferedReader;
        private volatile boolean isCancel;

        /**
         * Reads until cancelled, until the socket is found closed, or until a
         * read returns {@code null}; then shuts down the input and closes the reader.
         */
        @Override
        public void run() {
            // Snapshot: stop() sets the field to null from other threads.
            final BufferedReader reader = bufferedReader;
            try {
                while (!isCancel) {
                    if (!isConnected()) {
                        isCancel = true;
                        break;
                    }

                    String msg = SocketUtil.readFromStream(reader);
                    if (msg != null) {
                        if ("ping".equals(msg)) {
                            System.out.println("收到心跳包");
                            lastReceiveTime = System.currentTimeMillis();
                            socketServerResponseInterface.clientOnline(userIP);
                        } else {
                            msg = "用戶" + userIP + " : " + msg;
                            System.out.println("我是服務端,我收到數據:"+msg);
                            // A message arrived: answer through a sender task.
                            this.sendMsg(msg);
                            socketServerResponseInterface.clientOnline(userIP);
                        }
                    } else {
                        System.out.println("client is offline...");
                        ServerReceiveThread.this.stop();
                        socketServerResponseInterface.clientOffline();
                        break;
                    }
                    System.out.println("ReceiveThread");
                }

                SocketUtil.inputStreamShutdown(socket);
                SocketUtil.closeBufferedReader(reader);
                System.out.println("ReceiveThread is finish");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Replies to the client with a fixed test message through a new
         * {@link ServerSendThread} run on the shared executor.
         *
         * @param msg the received message (currently not included in the reply)
         */
        public void sendMsg(String msg) {
            ServerSendThread sender = new ServerSendThread(socket);
            serverSendThread = sender;
            sender.addMessage("服務端發送測試數據");

            if (socket.isConnected()) {
                SEND_EXECUTOR.execute(sender);
            }
        }
    }



}
View Code

 

ServerSendThread--消息發送類
package com.whalecloud.uip.server.socket;

import java.io.PrintWriter;
import java.net.Socket;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Sends queued text messages to one client socket.
 *
 * <p>{@link #run()} starts a dedicated {@link SendThread} that drains the message
 * queue and writes each entry to the socket's output stream.
 * {@link #addMessage(String)} enqueues data and wakes that thread, and
 * {@link #stop()} cancels it and closes the writer.
 *
 * @author lin.hongwen2@iwhalecloud.com
 * @date 2020/3/12 15:12
 */
public class ServerSendThread implements Runnable  {
    /** Current sender; written by {@link #run()} and {@link #stop()}, possibly on different threads. */
    private volatile SendThread sendThread;
    private final Socket socket;

    /** Pending outgoing messages; its monitor is also used to park and wake the sender. */
    private final ConcurrentLinkedQueue<String> dataQueue = new ConcurrentLinkedQueue<>();
    private static final ConcurrentHashMap<String, Socket> onLineClient = new ConcurrentHashMap<>();

    private final String userIP;

    /**
     * @return the host address of the client this sender was created for
     */
    public String getUserIP() {
        return userIP;
    }

    /**
     * @param socket the client connection to write to; its remote host address
     *               is captured as the user IP
     */
    public ServerSendThread(Socket socket) {
        this.socket = socket;
        this.userIP = socket.getInetAddress().getHostAddress();
    }

    /**
     * Creates the sender thread with an auto-flushing writer on the socket's
     * output stream and starts it. Failures are printed, not propagated.
     */
    @Override
    public void run() {
        try {
            SendThread worker = new SendThread();
            worker.printWriter = new PrintWriter(socket.getOutputStream(), true);
            // Publish only a fully initialised worker so stop() never sees a
            // sender without its writer.
            sendThread = worker;
            worker.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Cancels the sender thread, closes its writer and removes this client from
     * the online-client map.
     */
    public void stop() {
        try {
            System.out.println("stop");
            SendThread worker = sendThread;
            if (worker != null) {
                worker.isCancel = true;
                // The sender parks on dataQueue's monitor, so that is the object
                // that must be notified for it to observe the cancel flag.
                toNotifyAll(dataQueue);
                worker.interrupt();
                PrintWriter writer = worker.printWriter;
                if (writer != null) {
                    // Wait for an in-progress write to finish before closing.
                    synchronized (writer) {
                        SocketUtil.closePrintWriter(writer);
                        worker.printWriter = null;
                    }
                }
                sendThread = null;
                System.out.println("stop sendThread");
            }
            onLineClient.remove(userIP);
            System.out.println("用戶:" + userIP
                + " 退出,當前在線人數:" + onLineClient.size());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Queues a message for sending and wakes the sender thread. The message is
     * dropped if the socket is no longer connected.
     *
     * @param data the text to send
     */
    public void addMessage(String data) {
        if (!isConnected()) {
            return;
        }

        dataQueue.offer(data);
        // New data is pending: wake the sender if it is parked.
        toNotifyAll(dataQueue);
    }

    /**
     * Looks up a connected client.
     *
     * @param clientID key into the online-client map
     * @return the client's socket, or {@code null} if no entry exists
     */
    public Socket getConnectdClient(String clientID) {
        return onLineClient.get(clientID);
    }

    /**
     * Prints the key of every entry in the online-client map.
     */
    public static void printAllClient() {
        Iterator<String> inter = onLineClient.keySet().iterator();
        while (inter.hasNext()) {
            System.out.println("client:" + inter.next());
        }
    }

    /**
     * Blocks the calling thread on {@code o}'s monitor until it is notified or
     * interrupted. On interruption the thread's interrupt status is restored so
     * callers can react to it.
     *
     * @param o the object whose monitor to wait on
     */
    public void toWaitAll(Object o) {
        synchronized (o) {
            try {
                o.wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Wakes every thread waiting on {@code obj}'s monitor. The woken threads
     * resume only after this method's synchronized block has released the lock.
     *
     * @param obj the object whose waiters to wake
     */
    public void toNotifyAll(Object obj) {
        synchronized (obj) {
            obj.notifyAll();
        }
    }

    /**
     * Checks the local view of the socket state; when the socket is closed or
     * was never connected, tears this sender down via {@link #stop()}.
     *
     * @return {@code true} if the socket is connected and not closed
     */
    private boolean isConnected() {
        if (socket.isClosed() || !socket.isConnected()) {
            onLineClient.remove(userIP);
            ServerSendThread.this.stop();
            System.out.println("socket closed...");
            return false;
        }
        return true;
    }

    /**
     * Worker that drains the message queue and parks while it is empty.
     */
    public class SendThread extends Thread {

        // Both fields are written by stop() from other threads.
        private volatile PrintWriter printWriter;
        private volatile boolean isCancel;

        @Override
        public void run() {
            try {
                while (!isCancel) {
                    if (!isConnected()) {
                        isCancel = true;
                        break;
                    }

                    String msg = dataQueue.poll();
                    if (msg == null) {
                        awaitMessage();
                    } else {
                        // Snapshot the field: stop() may null it between the
                        // check and the lock.
                        PrintWriter writer = printWriter;
                        if (writer != null) {
                            synchronized (writer) {
                                SocketUtil.write2Stream(msg, writer);
                            }
                        }
                    }
                    System.out.println("SendThread");
                }

                SocketUtil.outputStreamShutdown(socket);
                SocketUtil.closePrintWriter(printWriter);
                System.out.println("SendThread is finish");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Parks until a message is queued or the thread is cancelled. The
         * condition is re-checked while holding the queue's monitor, so a
         * notify sent between poll() and wait() cannot be missed, and spurious
         * wake-ups are tolerated. An interrupt is treated as cancellation.
         */
        private void awaitMessage() {
            synchronized (dataQueue) {
                while (dataQueue.isEmpty() && !isCancel) {
                    try {
                        dataQueue.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        isCancel = true;
                        return;
                    }
                }
            }
        }
    }
}
View Code

 

 

 
        

測試后發現服務端可以收到心跳+客戶端發送過來的數據

 

客戶端發送的信息:

 

 我這里嘗試了啟動兩個client。兩個client發不同的信息。然后server端根據接收到的信息發送不同的值。

測試后發現,發送的數據不會在不同的socket之間互串,只有對應的客戶端會收到信息。

 

GitHub項目地址:  https://github.com/linHongWenGithub/socketLearn


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM