使用Zookeeper實現負載均衡原理


 

思路

使用Zookeeper實現負載均衡原理,服務器端將啟動的服務注冊到,zk注冊中心上,采用臨時節點。客戶端從zk節點上獲取最新服務節點信息,本地使用負載均衡算法,隨機分配服務器。

創建項目工程

Maven依賴

<dependencies>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.8</version>
        </dependency>
    </dependencies>

創建Server服務

ZkServerScoekt服務

//##ServerScoekt服務端
public class ZkServerScoekt implements Runnable {
    private int port = 18080;

    public static void main(String[] args) throws IOException {
        int port = 18080;
        ZkServerScoekt server = new ZkServerScoekt(port);
        Thread thread = new Thread(server);
        thread.start();
    }

    public ZkServerScoekt(int port) {
        this.port = port;
    }

    public void run() {
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(port);
            System.out.println("Server start port:" + port);
            Socket socket = null;
            while (true) {
                socket = serverSocket.accept();
                new Thread(new ServerHandler(socket)).start();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (serverSocket != null) {
                    serverSocket.close();
                }
            } catch (Exception e2) {

            }
        }
    }

}

ZkServerClient

public class ZkServerClient {
    public static List<String> listServer = new ArrayList<String>();

    public static void main(String[] args) {
        initServer();
        ZkServerClient     client= new ZkServerClient();
        BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
        while (true) {
            String name;
            try {
                name = console.readLine();
                if ("exit".equals(name)) {
                    System.exit(0);
                }
                client.send(name);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    // 注冊所有server
    public static void initServer() {
        listServer.clear();
        listServer.add("127.0.0.1:18080");
    }

    // 獲取當前server信息
    public static String getServer() {
        return listServer.get(0);
    }
    
    public void send(String name) {

        String server = ZkServerClient.getServer();
        String[] cfg = server.split(":");

        Socket socket = null;
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            socket = new Socket(cfg[0], Integer.parseInt(cfg[1]));
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream(), true);

            out.println(name);
            while (true) {
                String resp = in.readLine();
                if (resp == null)
                    break;
                else if (resp.length() > 0) {
                    System.out.println("Receive : " + resp);
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (out != null) {
                out.close();
            }
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

ServerHandler

public class ServerHandler implements Runnable {
    private Socket socket;

    public ServerHandler(Socket socket) {
        this.socket = socket;
    }

    public void run() {
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
            out = new PrintWriter(this.socket.getOutputStream(), true);
            String body = null;
            while (true) {
                body = in.readLine();
                if (body == null)
                    break;
                System.out.println("Receive : " + body);
                out.println("Hello, " + body);
            }

        } catch (Exception e) {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
            if (out != null) {
                out.close();
            }
            if (this.socket != null) {
                try {
                    this.socket.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
                this.socket = null;
            }
        }
    }
}

改造ZkServerScoekt

public class ZkServerScoekt implements Runnable {
    private static int port = 18081;

    public static void main(String[] args) throws IOException {
        ZkServerScoekt server = new ZkServerScoekt(port);
        Thread thread = new Thread(server);
        thread.start();
    }

    public ZkServerScoekt(int port) {
        this.port = port;
    }

    public void regServer() {
        // 向ZooKeeper注冊當前服務器
        ZkClient client = new ZkClient("127.0.0.1:2181", 60000, 1000);
        String path = "/test/server" + port;
        if (client.exists(path))
            client.delete(path);
        client.createEphemeral(path, "127.0.0.1:" + port);
    }

    public void run() {
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(port);
            regServer();
            System.out.println("Server start port:" + port);
            Socket socket = null;
            while (true) {
                socket = serverSocket.accept();
                new Thread(new ServerHandler(socket)).start();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (serverSocket != null) {
                    serverSocket.close();
                }
            } catch (Exception e2) {

            }
        }
    }

}

改造ZkServerClientScoekt

public class ZkServerClient {
    public static List<String> listServer = new ArrayList<String>();

    public static void main(String[] args) {
        initServer();
        ZkServerClient client = new ZkServerClient();
        BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
        while (true) {
            String name;
            try {
                name = console.readLine();
                if ("exit".equals(name)) {
                    System.exit(0);
                }
                client.send(name);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    // 注冊所有server
    public static void initServer() {
        String path = "/test";
        final ZkClient zkClient = new ZkClient("127.0.0.1:2181", 6000, 1000);
        List<String> children = zkClient.getChildren(path);
        for (String ipServer : children) {
            listServer.add((String) zkClient.readData(path + "/" + ipServer));
        }
        System.out.println("####從注冊中心獲取服務信息####listServer:" + listServer.toString());
        // 監聽事件
        zkClient.subscribeChildChanges(path, new IZkChildListener() {

            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                listServer.clear();
                for (String ctPath : currentChilds) {
                    listServer.add(zkClient.readData(parentPath) + "/" + ctPath);
                }
                System.out.println("####節點事件監聽發生變化### listServer:" + listServer.toString());

            }
        });
        // listServer.clear();
        // listServer.add("127.0.0.1:8080");
    }

    private static int reqestCount = 1;

    // 獲取當前server信息
    public static String getServer() {
        int serverCount = listServer.size();
        String serverHost = listServer.get(reqestCount / serverCount);
        reqestCount++;
        return serverHost;
    }

    public void send(String name) {

        String server = ZkServerClient.getServer();
        String[] cfg = server.split(":");

        Socket socket = null;
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            socket = new Socket(cfg[0], Integer.parseInt(cfg[1]));
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream(), true);

            out.println(name);
            while (true) {
                String resp = in.readLine();
                if (resp == null)
                    break;
                else if (resp.length() > 0) {
                    System.out.println("Receive : " + resp);
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (out != null) {
                out.close();
            }
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
} 

使用Zookeeper實現選舉策略

 

場景

  有一個向外提供的服務,服務必須7*24小時提供服務,不能有單點故障。所以采用集群的方式,采用master、slave的結構。一台主機多台備機。主機向外提供服務,備機負責監聽主機的狀態,一旦主機宕機,備機要迅速接代主機繼續向外提供服務。從備機選擇一台作為主機,就是master選舉。

 

原理分析

 右邊三台主機會嘗試創建master節點,誰創建成功了,就是master,向外提供。其他兩台就是slave

所有slave必須關注master的刪除事件(臨時節點,如果服務器宕機了,Zookeeper會自動把master節點刪除)。如果master宕機了,會進行新一輪的master選舉。本次我們主要關注master選舉,服務注冊、發現先不討論。

使用Zookeeper原理

» 領導者(leader),負責進行投票的發起和決議,更新系統狀態
  » 學習者(learner),包括跟隨者(follower)和觀察者(observer),follower用於接受客戶端請求並想客戶端返回結果,在選主過程中參與投票
  » Observer可以接受客戶端連接,將寫請求轉發給leader,但observer不參加投票過程,只同步leader的狀態,observer的目的是為了擴展系統,提高讀取速度
  » 客戶端(client),請求發起方

• Zookeeper的核心是原子廣播,這個機制保證了各個Server之間的同步。實現這個機制的協議叫做Zab
     議。Zab協議有兩種模式,它們分別是恢復模式(選主)和廣播模式(同步)。當服務啟動或者在領導者
   崩潰后,Zab就進入了恢復模式,當領導者被選舉出來,且大多數Server完成了和leader的狀態同步以后
    ,恢復模式就結束了。狀態同步保證了leaderServer具有相同的系統狀態。

  為了保證事務的順序一致性,zookeeper采用了遞增的事務id號(zxid)來標識事務。所有的提議(
   proposal)都在被提出的時候加上了zxid。實現中zxid是一個64位的數字,它高32位是epoch用來標識
     leader關系是否改變,每次一個leader被選出來,它都會有一個新的epoch,標識當前屬於那個leader
   統治時期。低32位用於遞增計數。
  每個Server在工作過程中有三種狀態:
    LOOKING:當前Server不知道leader是誰,正在搜尋
    LEADING:當前Server即為選舉出來的leader
    FOLLOWINGleader已經選舉出來,當前Server與之同步

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM