使用UDP實現TCP協議 代碼示例


前幾天在群里看到這樣一個圖片,引起了我的興趣:如果要用UDP實現類似TCP的可靠傳輸,一般需要手工實現的機制有那些?接下來我就以我的理解來討論一下這個問題。

那么先說結論吧:

  • 1、添加seq/ack機制,確保數據發送到對端
  • 2、添加發送和接收緩沖區,主要是用戶超時重傳。
  • 3、添加超時重傳機制。

TCP詳解

我們都知道TCP是面向連接的,進行可靠性傳輸的協議。接下來我們來看看TCP使用哪些技術來實現可靠的傳輸協議呢?

三次握手四次揮手

所謂三次握手,就是在建立TCP連接時,需要客戶端和服務端總共發送3個包確認連接的建立。在socket編程中,這一過程有客戶端執行connect觸發。整個流程如下:

簡單來說,就是:

  1. 建立連接時,客戶端發送SYN包(SYN=i)到服務器,並進入到SYS_SENT狀態,等待服務器確認
  2. 服務器收到SYN包,必須確認客戶的SYN(ack=i+1),同時自己也發送一個SYN包(SYN=K),即SYN+ACK包,此時服務器進入SYN_RECV狀態
  3. 客戶端收到服務器的SYN+ACK包,想服務器發送確認報ACK(ack=K+1),此時發送完畢,客戶端和服務端進入ESTABLISHED狀態,完成三次握手,客戶端與服務端開始傳送數據。

通俗的說,加入你老大要跟你說你的程序有bug,給你打電話的過程如下:

老大:你聽的到嗎?

你:我聽的到,你聽得到嗎?

老大:嗯,我也聽得到。

確認雙方都能聽得到對方的講話,開始講正事。。。。

所謂四次揮手,即斷開TCP連接時,需要客戶端和服務端總共發送4個包以確認連接的斷開,在socket編程中,這一過程由客戶端或者服務端任一方執行close來觸發。由於TCP是雙全工的,因此每個方向都必須要單獨關閉,這一原則是當一方完成數據發送后,發送一個FIN來終止這一方的連接,收到一個FIN只是意味着這一方向上沒有數據流動了,即不會再發送數據了,但是在這個TCP連接上仍然能夠發送數據,直到靈一方向也發送了FIN。首先進行關閉的一方將執行主動關閉,而另一方則執行被動關閉。整個流程如下:

簡單來說,就是:

  1. Client發送一個FIN,用來關閉Client到Server的數據傳送,Client進入FIN_WAIT_1狀態。
  2. Server收到FIN后,發送一個ACK給Client,確認序號為收到序號+1(與SYN相同,一個FIN占用一個序號),Server進入CLOSE_WAIT狀態。
  3. Server發送一個FIN,用來關閉Server到Client的數據傳送,Server進入LAST_ACK狀態。
  4. Client收到FIN后,Client進入TIME_WAIT狀態,接着發送一個ACK給Server,確認序號為收到序號+1,Server進入CLOSED狀態,完成四次揮手。

TCP可靠性傳輸的幾種實現機制

1.確認應答(ACK)機制( 即上面講到的三次握手,四次揮手)

TCP將每個字節的數據都進行了編號,即為序列號(seq),確認序號=序號+1,每個ACK都有對應的確認序列號,意思是告訴發送者已經收到了數據,下一個數據應該從哪里發送

2.超時重傳機制(兩種情況)

  1. 如果主機A發送給主機B的報文,主機B在規定的時間內沒有及時收到主機A發送的報文,我們可以認為是ACK丟了,這時就需要觸發超時重傳機制。
  2. 如果主機A未收到B發來的確認應答,也可能是因為ACK丟了。因此主機B會收到很多重復的數據,那么,TCP協議需要能夠識別出那些包是重復的包,並且把重復的包丟棄,這時候我們可以用前面提到的序列號,很容易做到去重的效果

3.滑動窗口實現流量控制

所謂流量控制,就是讓發送方的發送速率不要太快,要讓對方來的及接受。

當發送一次數據,等到確認應答時才可以發送下一個數據段,這樣的效率會很低,我們利用滑動窗口,無需等待確認應答而可以繼續發送數據的最大值;收到第一個ACK后,滑動窗口向后移;操作系統為了維護這個滑動窗口,需要開辟發送緩沖區來記錄當前還有那些數據沒有應答,只有確認應答過的數據,才能從緩沖區刪除掉;窗口越大,網絡的吞吐率越高TCP為每一個連接設有一個持續計時器,只要TCP連接的一方收到對方的零窗口通知,就啟動持續計時器,若持續計時器設置的時間到期,就發送一個零窗口探測報文段(僅攜帶一個字節的數據),對方就在確認這個探測報文段時給出了現在的窗口 值,如果仍然是0,那么收到這個報文段的一方就重新設置持續計時器;如果窗口不是0,那么死鎖的僵局就可以打破

在這傳輸過程中,出現了丟包的情況,這里就不做解說了。

4.擁塞控制

雖然有了滑動窗口機制,如果一開始就發送大量數據,很有可能引發很多問題。所以TCP加入慢啟動機制,先發少量的數據探探路,看看當前網絡的擁塞狀態,再決定按照多大的速率進行傳送,剛開始時,定義擁塞窗口的大小為1,每次接收到一個ACK應答,擁塞窗口值加1,每次發送數據包的時候,將擁塞窗口和接收端主機反饋的窗口大小做比較,取較小的值作為實際發送的窗口。這樣 的擁塞窗口增長的速度是指數級別的,慢啟動只是指初始時慢,但是增長速度很快,不久就可以造成網 絡擁塞。為了不讓窗口一直加倍增長,我們引入一個慢啟動的閾值,當擁塞窗口超過這個閾值的時候,不在按指數方式增長,而是按照線性方式增長。

擁塞控制,歸根結底是TCP協議想盡可能快的把數據傳輸給對方,但又要避免給網絡造成最大壓力的最好方案

UDP詳解

UDP是User Datagram Protocol,一種無連接的傳輸層協議,提供面向事務的簡單不可靠信息傳送服務。可靠性由上層應用實現,所以要實現udp可靠性傳輸,必須通過應用層來實現和控制。像實時視頻,直播等要求以穩定的速度發送,能夠容忍一些數據的丟失,但是不允許又較大的時延,就會采用UDP協議。

UDP提供的是不可靠傳輸服務,具有TCP沒有的優勢:

  • UDP是無連接的,在時間上不存在建立連接需要的延時,在空間上,TCP需要在系統中維護連接狀態,需要一定的開銷。UDP不需要維護連接狀態,也不跟蹤這些參數,開銷小,時間和空間上都具有優勢。
  • 分組首部開銷小,TCP首部20字節,UDP首部8字節。
  • UDP沒有擁塞控制,應用層能夠更好的控制要發送的蘇劇和發送時間,網絡中的擁塞控制也不會影響主機的發送速率。
  • UDP提供盡最大的努力交付,不保證可靠交付。所以維護傳輸可靠性的工作需要用戶在應用層來完成,沒有TCP的確認機制,重傳機制。
  • UDP是面向報文的,對應用層傳下來的報文,添加首部信息后直接向下交付給IP層。既不合並,也不拆分(TCP有粘包拆包問題)。對於IP層交上來的UDP用戶數據報,在去除首部后就原封不動的交付給上層應用進程,報文不可分割,是UDP數據報的最小單位。
  • UDP常用一次性傳輸比較少量數據的網絡應用,如DNS,SNMP等,因為對於這些應用,若是采用TCP,為連接的創建,維護和拆除帶來不小的開銷。UDP也常用於多媒體應用(如IP電話,實時視 頻會議,流媒體等)數據的可靠傳輸對他們而言並不重要,TCP的擁塞控制會使他們有較大的延遲,也是不可容忍的。

代碼

package com.example.charon.entity;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.Arrays;

/**
 * @className: MessageEntity
 * @description: 請求發送消息實體的結構定義
 * @author: charon
 * @create: 2020-09-18 08:20
 */
public class RequestMessage {

    /**定義數據的長度*/
    private int totalLen;

    /**生成唯一的id*/
    private int id;

    /**數據*/
    private byte[] data;

    /**發送次數*/
    private int sendCount = 0;

    /**最后一次發送時間*/
    private Long lastSendTime = System.currentTimeMillis();

    /**發送者接受應答的地址*/
    private SocketAddress recvRespAddr;

    /**接收者的地址*/
    private SocketAddress remoteAddr;

    public RequestMessage(int id, byte[] data) {
        this.id = id;
        this.data = data;
        // 4+4是因為每個int類型占4個字節
        this.totalLen = 4 + 4 + data.length;
    }

    /**
     * 構造器將收到的udp數據解析為tcp的requestMessage對象
     * @param udpData udp數據
     */
    public RequestMessage(byte[] udpData){
        try {
            ByteArrayInputStream bais = new ByteArrayInputStream(udpData);
            DataInputStream dis = new DataInputStream(bais);
            this.totalLen = dis.readInt();
            this.id = dis.readInt();
            this.data = new byte[totalLen - 4 - 4];
            dis.readFully(data);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public byte[] toByte(){
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(baos);
            // 寫和讀需要一一對應
            dos.writeInt(totalLen);
            dos.writeInt(id);
            dos.write(data);
            dos.flush();
            return baos.toByteArray();
        }catch (IOException e){
            e.printStackTrace();
        }
        return null;
    }

    public int getTotalLen() {
        return totalLen;
    }

    public void setTotalLen(int totalLen) {
        this.totalLen = totalLen;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public byte[] getData() {
        return data;
    }

    public void setData(byte[] data) {
        this.data = data;
    }

    public int getSendCount() {
        return sendCount;
    }

    public void setSendCount(int sendCount) {
        this.sendCount = sendCount;
    }

    public Long getLastSendTime() {
        return lastSendTime;
    }

    public void setLastSendTime(Long lastSendTime) {
        this.lastSendTime = lastSendTime;
    }

    /**
     * Gets the value of recvRespAddr
     *
     * @return the value of recvRespAddr
     */
    public SocketAddress getRecvRespAddr() {
        return recvRespAddr;
    }

    /**
     * Sets the recvRespAddr
     *
     * @param recvRespAddr recvRespAddr
     */
    public void setRecvRespAddr(SocketAddress recvRespAddr) {
        this.recvRespAddr = recvRespAddr;
    }

    /**
     * Gets the value of remoteAddr
     *
     * @return the value of remoteAddr
     */
    public SocketAddress getRemoteAddr() {
        return remoteAddr;
    }

    /**
     * Sets the remoteAddr
     *
     * @param remoteAddr remoteAddr
     */
    public void setRemoteAddr(SocketAddress remoteAddr) {
        this.remoteAddr = remoteAddr;
    }

    @Override
    public String toString() {
        return "RequestMessage{" +
                "totalLen=" + totalLen +
                ", id=" + id +
                ", data=" + Arrays.toString(data) +
                ", sendCount=" + sendCount +
                ", lastSendTime=" + lastSendTime +
                '}';
    }
}

package com.example.charon.entity;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.Arrays;

/**
 * @className: ResponseMessage
 * @description: 響應信息實體的結構定義
 * @author: charon
 * @create: 2020-09-18 09:02
 */
public class ResponseMessage {

    /**總長度*/
    private int totalLen;

    /**對應接收到消息的id*/
    private int repId;

    /**響應的數據*/
    private byte[] data;

    /**接收狀態 0:正確接收 其他:錯誤 */
    private int state = 0;

    /**應答方的發送時間*/
    private Long resTime;

    /**發送次數*/
    private int sendCount;

    /**最后一次發送時間*/
    private Long lastSendTime = System.currentTimeMillis();

    /**接收者的地址*/
    private SocketAddress remoteAddr;

    public ResponseMessage(int repId, int state, byte[] data) {
        this.repId = repId;
        this.state = state;
        this.data = data;
        // 4+4+4是因為每個int類型占4個字節
        this.totalLen = 4 + 4 + 4 + data.length;
    }

    public ResponseMessage(byte[] udpData){
        try {
            ByteArrayInputStream bais = new ByteArrayInputStream(udpData);
            DataInputStream dis = new DataInputStream(bais);
            this.totalLen = dis.readInt();
            this.repId = dis.readInt();
            this.state = dis.readInt();
            this.data = new byte[totalLen - 4 - 4 -4 ];
            dis.readFully(data);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public byte[] toByte(){
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(baos);
            // 寫和讀需要一一對應
            dos.writeInt(totalLen);
            dos.writeInt(repId);
            dos.writeInt(state);
            dos.write(data);
            dos.flush();
            return baos.toByteArray();
        }catch (IOException e){
            e.printStackTrace();
        }
        return null;
    }



    public int getTotalLen() {
        return totalLen;
    }

    public void setTotalLen(int totalLen) {
        this.totalLen = totalLen;
    }

    public int getRepId() {
        return repId;
    }

    public void setRepId(int repId) {
        this.repId = repId;
    }

    public int getState() {
        return state;
    }

    public void setState(int state) {
        this.state = state;
    }

    public Long getResTime() {
        return resTime;
    }

    public void setResTime(Long resTime) {
        this.resTime = resTime;
    }

    /**
     * Gets the value of sendCount
     *
     * @return the value of sendCount
     */
    public int getSendCount() {
        return sendCount;
    }

    /**
     * Sets the sendCount
     *
     * @param sendCount sendCount
     */
    public void setSendCount(int sendCount) {
        this.sendCount = sendCount;
    }

    /**
     * Gets the value of lastSendTime
     *
     * @return the value of lastSendTime
     */
    public Long getLastSendTime() {
        return lastSendTime;
    }

    /**
     * Sets the lastSendTime
     *
     * @param lastSendTime lastSendTime
     */
    public void setLastSendTime(Long lastSendTime) {
        this.lastSendTime = lastSendTime;
    }

    /**
     * Gets the value of remoteAddr
     *
     * @return the value of remoteAddr
     */
    public SocketAddress getRemoteAddr() {
        return remoteAddr;
    }

    /**
     * Sets the remoteAddr
     *
     * @param remoteAddr remoteAddr
     */
    public void setRemoteAddr(SocketAddress remoteAddr) {
        this.remoteAddr = remoteAddr;
    }

    /**
     * Gets the value of data
     *
     * @return the value of data
     */
    public byte[] getData() {
        return data;
    }

    /**
     * Sets the data
     *
     * @param data data
     */
    public void setData(byte[] data) {
        this.data = data;
    }

    @Override
    public String toString() {
        return "ResponseMessage{" +
                "totalLen=" + totalLen +
                ", repId=" + repId +
                ", data=" + Arrays.toString(data) +
                ", state=" + state +
                ", resTime=" + resTime +
                ", sendCount=" + sendCount +
                ", lastSendTime=" + lastSendTime +
                '}';
    }
}

package com.example.charon;

import com.example.charon.entity.RequestMessage;
import com.example.charon.entity.ResponseMessage;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @className: DataGramSend
 * @description: 數據報文發送:
 *                1.發送消息線程負責發送,發送后將消息放入容器中等待應答,
 *                2.接受線程接收應答,並發送消息給接收端自己已收到信息,從容器中匹配后刪除
 *                3.重發線程負責重發,未收到應答的消息,發送3次后移出
 * @author: charon
 * @create: 2020-09-18 23:47
 */
public class DatagramSend {

    /**本地要發送的地址對象*/
    private SocketAddress localAddress;

    /**發送的socket對象*/
    private DatagramSocket datagramSender;

    /**目標地址*/
    private SocketAddress remoteAddress;

    /**本地緩存已發送的消息 Map key 為消息ID,value為消息對象本身*/
    private Map<Integer, RequestMessage> msgQueue = new ConcurrentHashMap<>();

    public static void main(String[] args) throws SocketException {
        new DatagramSend();
    }

    public DatagramSend() throws SocketException {
        localAddress = new InetSocketAddress("127.0.0.1",13000);
        datagramSender = new DatagramSocket(localAddress);
        remoteAddress = new InetSocketAddress("127.0.0.1",14000);

        // 啟動三個線程
        // 1.發送消息線程
        startSendThread();
        // 接收線程接收應答
        startRecvResponseThread();
        // 重發線程負責重發
        startReSendThread();
    }

    /**
     * 啟動重發線程
     */
    @SuppressWarnings("all")
    private void startReSendThread() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try{
                    while (true){
                        resendMsg();
                        Thread.sleep(1000);
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }).start();
    }

    /**
     * 重發業務:判斷map中的消息,如果超過3s未收到應答,則重發
     */
    private void resendMsg() {
        // 返回隊列中所有的key
        Set<Integer> keySet = msgQueue.keySet();
        Iterator<Integer> iterator = keySet.iterator();
        while(iterator.hasNext()){
            Integer key = iterator.next();
            RequestMessage requestMessage = msgQueue.get(key);

            // 如果重發超過3次,則移出
            if(requestMessage.getSendCount() >= 3){
                iterator.remove();
                System.out.println("發送端--檢測道丟失的消息:" + requestMessage);
            }

            long startTime =  System.currentTimeMillis();
            // 等待時間不超過3s
            if((startTime - requestMessage.getLastSendTime()) > 3000 && requestMessage.getSendCount() < 3 ){
                byte[] buffer = requestMessage.toByte();
                try {
                    DatagramPacket datagramPacket = new DatagramPacket(buffer,buffer.length,requestMessage.getRemoteAddr());
                    datagramSender.send(datagramPacket);
                    requestMessage.setSendCount(requestMessage.getSendCount()+1);
                    System.out.println("客戶端重新發送消息:"+requestMessage);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 啟動接受應答線程
     */
    @SuppressWarnings("all")
    private void startRecvResponseThread() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try{
                    recvResponse();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }).start();
    }

    /**
     * 接受應答消息
     */
    private void recvResponse() throws IOException {
        System.out.println("接收端-接受應答線程啟動");
        while (true){
            byte[] recvData = new byte[100];
            //創建接受數據包對象
            DatagramPacket recvRespPacket = new DatagramPacket(recvData,recvData.length);
            //發送
            datagramSender.receive(recvRespPacket);
            //接受返回數據
            RequestMessage requestMessage = new RequestMessage(recvRespPacket.getData());
            int repId = requestMessage.getId();
            RequestMessage requestMessage1 = msgQueue.get(new Integer(repId));
            if(requestMessage1 != null){
                System.out.println("發送端-原來發送的數據:"+requestMessage1);
                System.out.println("接受的數據:" + requestMessage);
                System.out.println("發送端-已收到接收端返回的信息:"+new String(requestMessage.getData()));
                msgQueue.remove(repId);
                //發送端需要告訴接收端,返回的數據已經收到
                //發送的數據
                byte[] msgData = (repId+" 數據已收到").getBytes();
                //創建要發送的消息對象
                RequestMessage sendMessage = new RequestMessage(repId,msgData);

                //要發送的數據,將要發送的數據轉為字節數組
                byte[] buffer = sendMessage.toByte();
                //創建書包,指定內容,指定目標地址
                DatagramPacket datagramSocket = new DatagramPacket(buffer,buffer.length,remoteAddress);
                //發送數據
                datagramSender.send(datagramSocket);
            }
        }
    }

    /**
     * 發送消息線程
     */
    @SuppressWarnings("all")
    private void startSendThread() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try{
                    send();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }).start();
    }

    /**
     * 模擬發送消息
     */
    private void send() throws IOException, InterruptedException {
        System.out.println("發送端-發送數據線程啟動");
        // 確認機制,id從0開始
        int id = 0;
        //模擬發送10個請求
        while (id < 1){
            id++;
            //發送的數據
            byte[] msgData = (id+" hello").getBytes();
            //創建要發送的消息對象
            RequestMessage sendMessage = new RequestMessage(id,msgData);

            //要發送的數據,將要發送的數據轉為字節數組
            byte[] buffer = sendMessage.toByte();
            //創建書包,指定內容,指定目標地址
            DatagramPacket datagramSocket = new DatagramPacket(buffer,buffer.length,remoteAddress);
            //發送數據
            datagramSender.send(datagramSocket);
            // 緩存當前發送的請求
            sendMessage.setSendCount(1);
            sendMessage.setRemoteAddr(remoteAddress);
            msgQueue.put(id,sendMessage);
            System.out.println("客戶端-數據已發送,緩存:"+sendMessage);
            Thread.sleep(1000);
        }
    }
}

package com.example.charon;

import com.example.charon.entity.RequestMessage;
import com.example.charon.entity.ResponseMessage;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @className: DatagramRecive
 * @description: 數據報文接受方
 * @author: charon
 * @create: 2020-09-20 17:56
 */
public class DatagramRecive {

    private SocketAddress localAddress;

    private DatagramSocket datagramSender;

    /**目標地址*/
    private SocketAddress remoteAddress;

    /**本地緩存已發送的消息 Map key 為消息ID,value為消息對象本身*/
    private Map<Integer, ResponseMessage> msgQueue = new ConcurrentHashMap<>();

    public static void main(String[] args) throws IOException {
        new DatagramRecive();
    }

    public DatagramRecive() throws SocketException {
        localAddress = new InetSocketAddress("127.0.0.1",14000);
        datagramSender = new DatagramSocket(localAddress);
        remoteAddress = new InetSocketAddress("127.0.0.1",13000);
        //  啟動接收線程
        startDecvThread();
        // 接收線程接收應答
        startDecvResponseThread();
        // 重發線程負責重發
        startReSendThread();
    }

    /**
     * 重發
     */
    @SuppressWarnings("all")
    private void startReSendThread() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try{
                    while (true){
                        resendMsg();
                        Thread.sleep(1000);
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }).start();
    }

    private void resendMsg() {
        // 返回隊列中所有的key
        Set<Integer> keySet = msgQueue.keySet();
        Iterator<Integer> iterator = keySet.iterator();
        while(iterator.hasNext()){
            Integer key = iterator.next();
            ResponseMessage responseMessage = msgQueue.get(key);

            // 如果重發超過3次,則移出
            if(responseMessage.getSendCount() >= 3){
                iterator.remove();
                System.out.println("發送端--檢測道丟失的消息:" + responseMessage);
            }

            long startTime =  System.currentTimeMillis();
            // 等待時間不超過3s
            if((startTime - responseMessage.getLastSendTime()) > 3000 && responseMessage.getSendCount() < 3 ){
                byte[] buffer = responseMessage.toByte();
                try {
                    DatagramPacket datagramPacket = new DatagramPacket(buffer,buffer.length,responseMessage.getRemoteAddr());
                    datagramSender.send(datagramPacket);
                    responseMessage.setSendCount(responseMessage.getSendCount()+1);
                    System.out.println("客戶端重新發送消息:"+responseMessage);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @SuppressWarnings("all")
    private void startDecvResponseThread() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try{
                    decvResponse();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }).start();
    }

    private void decvResponse() throws IOException {
        System.out.println("發送端-接受應答線程啟動");
        while (true){
            byte[] recvData = new byte[100];
            //創建接受數據包對象
            DatagramPacket recvRespPacket = new DatagramPacket(recvData,recvData.length);
            //接受數據
            datagramSender.receive(recvRespPacket);
            //接受返回數據
            RequestMessage requestMessage = new RequestMessage(recvRespPacket.getData());
            int repId = requestMessage.getId();
            System.out.println("接收端接受到的數據id:" + repId);
            ResponseMessage responseMessage = msgQueue.get(new Integer(repId));
            if(responseMessage != null){
                System.out.println("接收端發送的源數據:"+responseMessage);
                System.out.println("接收端已收到發送端返回的數據:"+ new String(requestMessage.getData()));
                msgQueue.remove(repId);
            }
        }
    }

    @SuppressWarnings("all")
    private void startDecvThread() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try{
                    recvMsg();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }).start();
    }

    private void recvMsg() throws IOException {
        System.out.println("啟動接收線程");
        while (true){
            // 接受發送端發送過來的數據 100表示緩存的長度
            byte[] recvData = new byte[100];
            DatagramPacket datagramPacket = new DatagramPacket(recvData,recvData.length);
            datagramSender.receive(datagramPacket);
            //獲取接收端發送的數據
            RequestMessage requestMessage = new RequestMessage(datagramPacket.getData());
            String requestMessageData = new String(requestMessage.getData());
            System.out.println("接收端收到發送端的數據:" + requestMessageData);

            // 將接收到的數據發送給發送端,
            byte[] responseData = (requestMessageData+" world").getBytes();
            ResponseMessage responseMessage = new ResponseMessage(requestMessage.getId(),0,responseData);
            System.out.println("接收端返回的數據:" + new String(responseMessage.getData()));
            byte[] data = responseMessage.toByte();
            DatagramPacket dp = new DatagramPacket(data,data.length,remoteAddress);
            datagramSender.send(dp);

            //將接收端返回的數據存入隊列中,用於后面監聽重發機制
            responseMessage.setLastSendTime(System.currentTimeMillis());
            responseMessage.setSendCount(1);
            responseMessage.setData(responseMessage.getData());
            //對於接收端來說,它需要返回的地址就是請求消息的本地
            responseMessage.setRemoteAddr(remoteAddress);
            msgQueue.put(requestMessage.getId(),responseMessage);

            System.out.println("接收端-已發送應答:" + responseMessage);
        }
    }
}

代碼連接:https://download.csdn.net/download/zj520_/12913781
參考文章:http://www.360doc.com/content/13/0602/11/11220452_289877920.shtml
https://blog.csdn.net/codes_first/article/details/78453713?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-4.channel_param&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-4.channel_param


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM