Java實現心跳機制


一、心跳機制簡介

     在分布式系統中,分布在不同主機上的節點需要檢測其他節點的狀態,如服務器節點需要檢測從節點是否失效。為了檢測對方節點的有效性,每隔固定時間就發送一個固定信息給對方,對方回復一個固定信息,如果長時間沒有收到對方的回復,則斷開與對方的連接。

     發包方既可以是服務端,也可以是客戶端,這要看具體實現。因為是每隔固定時間發送一次,類似心跳,所以發送的固定信息稱為心跳包。心跳包一般為比較小的包,可根據具體實現。心跳包主要應用於長連接的保持與短線鏈接。

      一般而言,應該客戶端主動向服務器發送心跳包,因為服務器向客戶端發送心跳包會影響服務器的性能。

二、心跳機制實現方式

    心跳機制有兩種實現方式,一種基於TCP自帶的心跳包,TCP的SO_KEEPALIVE選項可以,系統默認的默認跳幀頻率為2小時,超過2小時后,本地的TCP 實現會發送一個數據包給遠程的 Socket. 如果遠程Socket 沒有發回響應, TCP實現就會持續嘗試 11 分鍾, 直到接收到響應為止。 否則就會自動斷開Socket連接。但TCP自帶的心跳包無法檢測比較敏感地知道對方的狀態,默認2小時的空閑時間,對於大多數的應用而言太長了。可以手工開啟KeepAlive功能並設置合理的KeepAlive參數。

    另一種在應用層自己進行實現,基本步驟如下:

  1. Client使用定時器,不斷發送心跳;
  2. Server收到心跳后,回復一個包;
  3. Server為每個Client啟動超時定時器,如果在指定時間內沒有收到Client的心跳包,則Client失效。

三、Java實現心跳機制

    這里基於Java實現的簡單RPC框架實現心跳機制。Java實現代碼如下所示:

    心跳客戶端類:

public class HeartbeatClient implements Runnable {

    private String serverIP = "127.0.0.1";
    private int serverPort = 8089;
    private String nodeID = UUID.randomUUID().toString();
    private boolean isRunning = true;
    //  最近的心跳時間
    private long lastHeartbeat;
    // 心跳間隔時間
    private long heartBeatInterval = 10 * 1000;

    public void run() {
        try {
            while (isRunning) {
                HeartbeatHandler handler = RPClient.getRemoteProxyObj(HeartbeatHandler.class, new InetSocketAddress(serverIP, serverPort));
                long startTime = System.currentTimeMillis();
                // 是否達到發送心跳的周期時間
                if (startTime - lastHeartbeat > heartBeatInterval) {
                    System.out.println("send a heart beat");
                    lastHeartbeat = startTime;

                    HeartbeatEntity entity = new HeartbeatEntity();
                    entity.setTime(startTime);
                    entity.setNodeID(nodeID);

                    // 向服務器發送心跳,並返回需要執行的命令
                    Cmder cmds = handler.sendHeartBeat(entity);

                    if (!processCommand(cmds))
                        continue;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private boolean processCommand(Cmder cmds) {
        // ...
        return true;
    }

}

      心跳包實體類:

public class HeartbeatEntity implements Serializable {

    private long time;
    private String nodeID;
    private String error;
    private Map<String, Object> info = new HashMap<String, Object>();

    public String getNodeID() {
        return nodeID;
    }

    public void setNodeID(String nodeID) {
        this.nodeID = nodeID;
    }

    public String getError() {
        return error;
    }

    public void setError(String error) {
        this.error = error;
    }

    public Map<String, Object> getInfo() {
        return info;
    }

    public void setInfo(Map<String, Object> info) {
        this.info = info;
    }

    public long getTime() {
        return time;
    }

    public void setTime(long time) {
        this.time = time;
    }
}

  服務器接受心跳包返回的命令對象類:

public class Cmder implements Serializable {

    private String nodeID;
    private String error;
    private Map<String, Object> info = new HashMap<String, Object>();

    public String getNodeID() {
        return nodeID;
    }

    public void setNodeID(String nodeID) {
        this.nodeID = nodeID;
    }

    public String getError() {
        return error;
    }

    public void setError(String error) {
        this.error = error;
    }

    public Map<String, Object> getInfo() {
        return info;
    }

    public void setInfo(Map<String, Object> info) {
        this.info = info;
    }
}

  RPC服務注冊中心:

public class ServiceCenter {

    private ExecutorService executor = Executors.newFixedThreadPool(20);

    private final ConcurrentHashMap<String, Class> serviceRegistry = new ConcurrentHashMap<String, Class>();

    private AtomicBoolean isRunning = new AtomicBoolean(true);

    // 服務器監聽端口
    private int port = 8089;

    // 心跳監聽器
    HeartbeatLinstener linstener;

    // 單例模式
    private static class SingleHolder {
        private static final ServiceCenter INSTANCE = new ServiceCenter();
    }

    private ServiceCenter() {
    }

    public static ServiceCenter getInstance() {
        return SingleHolder.INSTANCE;
    }

    public void register(Class serviceInterface, Class impl) {
        System.out.println("regeist service " + serviceInterface.getName());
        serviceRegistry.put(serviceInterface.getName(), impl);
    }

    public void start() throws IOException {
        ServerSocket server = new ServerSocket();
        server.bind(new InetSocketAddress(port));
        System.out.println("start server");
        linstener = HeartbeatLinstener.getInstance();
        System.out.println("start listen heart beat");
        try {
            while (true) {
                // 1.監聽客戶端的TCP連接,接到TCP連接后將其封裝成task,由線程池執行
                executor.execute(new ServiceTask(server.accept()));
            }
        } finally {
            server.close();
        }
    }

    public void stop() {
        isRunning.set(false);
        executor.shutdown();
    }


    public boolean isRunning() {
        return isRunning.get();
    }

    public int getPort() {
        return port;
    }

    public void settPort(int port) {
        this.port = port;
    }

    public ConcurrentHashMap<String, Class> getServiceRegistry() {
        return serviceRegistry;
    }

    private class ServiceTask implements Runnable {
        Socket clent = null;

        public ServiceTask(Socket client) {
            this.clent = client;
        }

        public void run() {
            ObjectInputStream input = null;
            ObjectOutputStream output = null;
            try {
                // 2.將客戶端發送的碼流反序列化成對象,反射調用服務實現者,獲取執行結果
                input = new ObjectInputStream(clent.getInputStream());
                String serviceName = input.readUTF();
                String methodName = input.readUTF();
                Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                Object[] arguments = (Object[]) input.readObject();
                Class serviceClass = serviceRegistry.get(serviceName);
                if (serviceClass == null) {
                    throw new ClassNotFoundException(serviceName + " not found");
                }
                Method method = serviceClass.getMethod(methodName, parameterTypes);
                Object result = method.invoke(serviceClass.newInstance(), arguments);

                // 3.將執行結果反序列化,通過socket發送給客戶端
                output = new ObjectOutputStream(clent.getOutputStream());
                output.writeObject(result);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (output != null) {
                    try {
                        output.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (input != null) {
                    try {
                        input.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (clent != null) {
                    try {
                        clent.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }

        }
    }
}

  心跳監聽類:

package com.cang.heartbeat;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * 心跳監聽保存信息
 *
 * @author cang
 * @create_time 2016-09-28 11:40
 */
public class HeartbeatLinstener {

    private ExecutorService executor = Executors.newFixedThreadPool(20);

    private final ConcurrentHashMap<String, Object> nodes = new ConcurrentHashMap<String, Object>();
    private final ConcurrentHashMap<String, Long> nodeStatus = new ConcurrentHashMap<String, Long>();

    private long timeout = 10 * 1000;

    // 服務器監聽端口
    private int port = 8089;

    // 單例模式
    private static class SingleHolder {
        private static final HeartbeatLinstener INSTANCE = new HeartbeatLinstener();
    }

    private HeartbeatLinstener() {
    }

    public static HeartbeatLinstener getInstance() {
        return SingleHolder.INSTANCE;
    }

    public ConcurrentHashMap<String, Object> getNodes() {
        return nodes;
    }

    public void registerNode(String nodeId, Object nodeInfo) {
        nodes.put(nodeId, nodeInfo);
        nodeStatus.put(nodeId, System.currentTimeMillis());
    }

    public void removeNode(String nodeID) {
        if (nodes.containsKey(nodeID)) {
            nodes.remove(nodeID);
        }
    }

    // 檢測節點是否有效
    public boolean checkNodeValid(String key) {
        if (!nodes.containsKey(key) || !nodeStatus.containsKey(key)) return false;
        if ((System.currentTimeMillis() - nodeStatus.get(key)) > timeout) return false;
        return true;
    }

    // 刪除所有失效節點
    public void removeInValidNode() {
        Iterator<Map.Entry<String, Long>> it = nodeStatus.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if ((System.currentTimeMillis() - nodeStatus.get(e.getKey())) > timeout) {
                nodes.remove(e.getKey());
            }
        }
    }

}

  心跳處理類接口:

public interface HeartbeatHandler {
    public Cmder sendHeartBeat(HeartbeatEntity info);
}

   心跳處理實現類:

public class HeartbeatHandlerImpl implements HeartbeatHandler {
    public Cmder sendHeartBeat(HeartbeatEntity info) {
        HeartbeatLinstener linstener = HeartbeatLinstener.getInstance();

        // 添加節點
        if (!linstener.checkNodeValid(info.getNodeID())) {
            linstener.registerNode(info.getNodeID(), info);
        }

        // 其他操作
        Cmder cmder = new Cmder();
       cmder.setNodeID(info.getNodeID());
        // ...

        System.out.println("current all the nodes: ");
        Map<String, Object> nodes = linstener.getNodes();
        for (Map.Entry e : nodes.entrySet()) {
            System.out.println(e.getKey() + " : " + e.getValue());
        }
        System.out.println("hadle a heartbeat");
        return cmder;
    }
}

  測試類:

public class HeartbeatTest {

    public static void main(String[] args) {
        new Thread(new Runnable() {
            public void run() {
                try {
                    ServiceCenter serviceServer = ServiceCenter.getInstance();
                    serviceServer.register(HeartbeatHandler.class, HeartbeatHandlerImpl.class);
                    serviceServer.start();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        Thread client1 = new Thread(new HeartbeatClient());
        client1.start();
        Thread client2 = new Thread(new HeartbeatClient());
        client2.start();
    }
}

四、總結

    上面的代碼還有很多不足的地方,希望有空能進行改善:

  1.  配置為硬編碼;
  2.  命令類Cmder沒有實際實現,返回的Cmder對象沒有實際進行處理;

   其他小問題就暫時不管了,希望以后能重寫上面的代碼。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM