心跳包:告訴別人,我還活着


心跳包是什么?有什么用?

心跳一般是指客戶端(也可以是服務器端)向對方每隔一段時間發送一個應用層的自定義指令,以確保連接的有效性。因為是固定間隔,同時是檢測存活,就像人的心跳一樣,顧名思義,稱為心跳包。一般是用於長連接,對消息實時性要求比較高的服務中,比如IM服務,推送服務。

長連接有什么用?

在即時通訊領域和推送服務中,對消息的實時性和可用性要求非常高,建立長連接,可以有效節省DNS解釋時間,TCP/IP三次握手時間,同時為了保證連接是可用的,不至於經常發了消息對方無法收到,必須要有一種機制檢測連接的有效性。TCP是一個基於連接的協議,連接是由一個狀態機進行維護,當連接建立成功后,雙方都處於established ,除非我們進行主動調用,否則狀態一直不會變化,即使中間路由已經崩潰,網線已經被剪斷。TCP有一種KeepAlive機制,TCP層在定時時間發送相應的KeepAlive探針以確保連接的可用性,默認每7200秒發送一次,超過75秒沒有返回就超時,超時后重試10次,雖然可以修改默認值,但仍然無法滿足要求。尤其是考慮到一種特殊情況,TCP連接存活,但是主機不處於存活狀態,比如CPU負載到100%,無法響應任何請求。這時候,就需要客戶端主動切斷連接,主動切換到其他備用機。

移動端面臨的挑戰

通常,我們一個家庭里面只接入一根網線,所有設備通過路由器共用一個出口IP,路由器就是一個NAT設備,NAT設備在IP封包流過設備的時候,自動修改源和目標地址,家用路由器甚至基於NAPT修改端口號,路由器內部會維護一個NAT映射表
比如內網里面的172.1.1.2:7777 對應外網221.22.2.1:8888等。我們的手機接入的蜂窩網絡后,運營商就會給我們分配一個內網IP(類似10.2.2.3),由運營商的網管維護一個NAT的映射表,確保手機能接入互聯網。大部分運營商會在手機一段時間沒有數據通訊的時候,會把設備從NAT表中剔除,造成了連接中斷,但是對TCP連接的雙方是不可感知的,服務端就無法給客戶端發送消息。像中國移動和中國聯通的NAT超時時間是5分鍾,國際上運營商普遍都是大於28分鍾。

實現方案

合理間隔

心跳太短保證不了可靠性,太頻繁會帶來高耗電和大量的流量消耗,這在移動設備上面是不可接受的。最合理的解決方案是設定一個合理的間隔,一般可以根據程序狀態進行調整,逐步拉長心跳間隔,5分鍾,10分鍾,甚至15分鍾。服務端進行可靠性判斷的時候也可以放寬標准,只有N次超時才被認為是連接已經斷開。心跳的周期以最后一條指令為准,而非固定間隔。

自定義應用層實現協議

在DEMO中,雙方約定一個協議,發送方先對管道寫入一個8位的byte值,接收方只要一接收到數據,馬上按照byte類型標准讀取前8位,通過這一個字節的值來確定對方現在發過來的是什么類型的數據。為什么要選擇byte呢?因為byte足夠短,只占用一個字節,盡量減少數據傳輸量,可以通過一個字節表達256種情況。當然根據實際業務需求,選擇int,long類型也是完全沒問題的。
在這個例子中,我們約定byte的值是1的話,那么我們解釋為心跳包,后面不再有數據,直接在屏幕中打印收到客戶端的心跳包,byte的值是2的話,我們知道對方要發一個字符串過來,那么需要進一步處理,再次調用readUTF方法,讀取一個UTF-8字符串

下面是一個用JAVA實現的心跳包DEMO,主要用了多線程和Socket

服務端代碼

服務端建立一個類,采用同步多線程模式,主類負責接收socket請求,子線程Worker類負責處理業務邏輯

public class Server {
    public static void main(String[] args) {
        try {
            ServerSocket serverSocket = new ServerSocket(30000); //實例化ServerSocket,綁定監聽本機的30000端口
            while(true){  
                Socket socket = serverSocket.accept();  //這個是阻塞方法,只有監聽到客戶端連接過來了,才會繼續往下走。
                System.out.println(socket.getInetAddress().getHostName()+"連接到服務器...");
                //Worker線程啟動代碼
                Worker worker = new Worker(socket);
                new Thread(worker).start();
            }
        }catch (Exception e){
            System.out.println("主線程拋出異常");
            e.printStackTrace();
        }
    }
}

Worker線程

class Worker implements Runnable{
    private Socket socket;
    private InputStream in;
    private OutputStream out;
    private ObjectInputStream ois;
    private boolean flag = true;

    public Worker(Socket socket){
        try{
            this.socket = socket; //要獲得一個從主線程傳過來的客戶端socket實例,每個客戶端都不一樣
            in = socket.getInputStream(); //從客戶端實例中,獲取輸入流實例
            out = socket.getOutputStream(); //獲取輸出流實例
            ois = new ObjectInputStream(in); //實例化ObjectInputStream
        }catch (Exception e){
            System.out.println("worker構造函數拋出異常");
            e.printStackTrace();
        }
    }

    public void run(){
        try{
            while(flag) {
                //協議的第一位是數字,先讀取第一位
                int type = ois.readByte();
                if(type == 1){
                    //第一位是1的話,就直接當心跳包處理
                    System.out.println("收到"+socket.getInetAddress().getHostAddress()+"發送過來的心跳包");
                }else if(type == 2){
                    //第一位是2的話,我們可以知道,對方發過來的是UTF-8格式的String,所以可以調用readUTF方法繼續讀取
                    System.out.println(socket.getInetAddress().getHostAddress()+"說:"+ois.readUTF());
                }
            }
        }catch (EOFException e){
            System.out.println("對方已關閉連接");
            flag = false;
        } catch (IOException ioe) {
            ioe.printStackTrace();
        }finally {
            if(socket != null) {
                try{
                    socket.close();
                }catch (Exception e){
                    ;
                }
            }
        }
    }
}

客戶端代碼

public class Client {
    private static final String host = "127.0.0.1";  //目標地址,這里是本機
    private static final int port = 30000; //目標端口

    public static void main(String[] args) {
        Socket socket = new Socket();
       try{
           socket.connect(new InetSocketAddress(host,port)); //建立socket連接
           OutputStream out = socket.getOutputStream(); //從socket中獲取讀取流的實例
           ObjectOutputStream oos = new ObjectOutputStream(out); //實例化ObjectOutputStream ,用於自定義的傳輸協議
           BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));  //用來讀取鍵盤輸入,用了緩沖類
           TimeStore timeStore = new TimeStore(); //建立一個類來存儲數據最后發送時間

           new Thread(new SendHeartbeat(oos,timeStore)).start(); //啟動心跳業務線程

           String line = new String();
           //下面幾行代碼用於獲取用戶輸入
           while((line = bufferedReader.readLine()) != null){
               oos.writeByte(2); //要發送的是自定義協議的字符串,先寫入一個2,告訴服務端,准備發送字符串數據
               oos.writeUTF(line); //寫入一個UTF字符串到流中
               oos.flush();
               timeStore.setLastSendTime(System.currentTimeMillis()); //記錄最后的寫入時間到時間存儲類
           }
           oos.close();
           bufferedReader.close();
       }catch (IOException e){
           System.out.println("數據寫入IO異常");
       }finally {
           try {
               socket.close();
           }catch (IOException e2){
               e2.printStackTrace();
           }
       }
    }
}

發送心跳的邏輯

心跳專門開一條線程來發送,這樣不受主線程業務的堵塞代碼影響

class SendHeartbeat implements Runnable{
    private ObjectOutputStream oos;
    private TimeStore timeStore;

    public SendHeartbeat(ObjectOutputStream oos,TimeStore timeStore){
        this.oos = oos;
        this.timeStore = timeStore;
    }

    public void run(){
        try{
            while(true){
                Thread.sleep(1000); //死循環,每秒啟動一次
                 //當上次發送時間是在10秒或之前,才發送心跳
                if((System.currentTimeMillis() - timeStore.getLastSendTime()) >= 10*1000){  
                    //寫入1,告訴服務端發送的是心跳包
                    oos.writeByte(1);
                    oos.flush();
                    //記錄時間
                    timeStore.setLastSendTime(System.currentTimeMillis());
                }
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

時間存儲類,用於記錄最后發送的時間

class TimeStore{
    private long lastSendTime;
    
    //多線程下讀取需要加鎖
    public synchronized long getLastSendTime() {
        return lastSendTime;
    }

    //同樣,多線程下寫入需要加鎖
    public synchronized void setLastSendTime(long lastSendTime) {
        this.lastSendTime = lastSendTime; //把時間放到私有屬性
        System.out.println("最后一次發包時間"+ new java.text.SimpleDateFormat("dd/MM/yyyy HH:mm:ss").format(new java.util.Date(lastSendTime))); //把發包時間打印到屏幕上
    }
}

效果

心跳包的挑戰:信令風暴

2013年,中國移動曾把刀口指向了微信,正是因為心跳包可能會引起的信令風暴,微信占用了中移動60%的信令資源,但僅帶來10%的移動數據流量。每次發送心跳包,都需要移動通信網絡為用戶分配資源,分配的過程體現在信令的發送和接收上。一次心跳包的發送過程,牽涉的信令多達幾十條。后來微信對心跳間隔進行了優化才暫時平息了這場風波。微信采用的方案是當微信處於前台活躍狀態時,使用固定心跳。微信進入后台(或者前台關屏)時,先用幾次最小心跳維持長鏈接。然后進入后台自適應心跳計算。這樣做的目的是盡量選擇用戶不活躍的時間段,來減少心跳計算可能產生的消息不及時收取影響。詳看微信心跳包優化方案


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM