轉載,原文連接:http://blog.csdn.net/autfish/article/details/51576695
ZooKeeper是一個分布式的,開放源碼的分布式應用程序協調服務,提供的功能包括配置維護、名字服務、分布式同步、組服務等。
ZooKeeper會維護一個樹形的數據結構,類似於Windows資源管理器目錄,其中EPHEMERAL類型的節點會隨着創建它的客戶端斷開而被刪除,利用這個特性很容易實現軟負載均衡。
基本原理是,每個應用的Server啟動時創建一個EPHEMERAL節點,應用客戶端通過讀取節點列表獲得可用服務器列表,並訂閱節點事件,有Server宕機斷開時觸發事件,客戶端監測到后把該Server從可用列表中刪除。
來看示例,這里用了BIO模型編寫了一個接收/應答的小程序用於演示效果,優點就是簡單。為了方便后面的改造,客戶端每次發送消息時都會讀取服務器列表並從新建立連接。后邊會看到只需要幾十行代碼即可改造為使用ZooKeeper的軟負載模式。
Server代碼
- import java.io.BufferedReader;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import java.io.PrintWriter;
- import java.net.ServerSocket;
- import java.net.Socket;
- public class SimpleServer implements Runnable {
- public static void main(String[] args) throws IOException {
- int port = 18080;
- SimpleServer server = new SimpleServer(port);
- Thread thread = new Thread(server);
- thread.start();
- }
- private int port;
- public SimpleServer(int port) {
- this.port = port;
- }
- @Override
- public void run() {
- ServerSocket server = null;
- try {
- server = new ServerSocket(port);
- System.out.println("Server started");
- Socket socket = null;
- while (true) {
- socket = server.accept();
- new Thread(new SimpleServerHandler(socket)).start();
- }
- } catch(IOException ex) {
- ex.printStackTrace();
- } finally {
- if (server != null) {
- try {
- server.close();
- } catch (IOException e) {}
- }
- }
- }
- }
- class SimpleServerHandler implements Runnable {
- private Socket socket;
- public SimpleServerHandler(Socket socket) {
- this.socket = socket;
- }
- @Override
- public void run() {
- BufferedReader in = null;
- PrintWriter out = null;
- try {
- in = new BufferedReader(new InputStreamReader(
- this.socket.getInputStream()));
- out = new PrintWriter(this.socket.getOutputStream(), true);
- String body = null;
- while (true) {
- body = in.readLine();
- if (body == null)
- break;
- System.out.println("Receive : " + body);
- out.println("Hello, " + body);
- }
- } catch (Exception e) {
- if (in != null) {
- try {
- in.close();
- } catch (IOException e1) {
- e1.printStackTrace();
- }
- }
- if (out != null) {
- out.close();
- }
- if (this.socket != null) {
- try {
- this.socket.close();
- } catch (IOException e1) {
- e1.printStackTrace();
- }
- this.socket = null;
- }
- }
- }
- }
客戶端代碼
- import java.io.BufferedReader;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import java.io.PrintWriter;
- import java.net.Socket;
- import java.util.ArrayList;
- import java.util.List;
- public class SimpleClient {
- private static List<String> servers = new ArrayList<>();
- public static void main(String[] args) {
- initServerList();
- SimpleClient client = new SimpleClient();
- BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
- while (true) {
- String name;
- try {
- name = console.readLine();
- if("exit".equals(name)) {
- System.exit(0);
- }
- client.send(name);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- private static void initServerList() {
- servers.clear();
- servers.add("127.0.0.1:18080");
- }
- public static String getServer() {
- return servers.get(0);
- }
- public SimpleClient() {
- }
- public void send(String name) {
- String server = SimpleClient.getServer();
- String[] cfg = server.split(":");
- Socket socket = null;
- BufferedReader in = null;
- PrintWriter out = null;
- try {
- socket = new Socket(cfg[0], Integer.parseInt(cfg[1]));
- in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
- out = new PrintWriter(socket.getOutputStream(), true);
- out.println(name);
- while(true) {
- String resp = in.readLine();
- if(resp == null)
- break;
- else if(resp.length() > 0) {
- System.out.println("Receive : " + resp);
- break;
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (out != null) {
- out.close();
- }
- if (in != null) {
- try {
- in.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- if (socket != null) {
- try {
- socket.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
運行測試,服務器端輸出截圖:

客戶端輸出截圖:

很好,一切運行正常。
接下來添加ZooKeeper部分。為了演示效果更好,修改一下ZooKeeper的配置文件,以便於服務器斷開后能更快的被監測到。主要是減小Session的超時時間
zookeeper/conf/zoo.cfg
- tickTime=2000
- initLimit=2
- syncLimit=5
- dataDir=D:\\ZooKeeper\\zookeeper-3.4.8\\data
- clientPort=2181
- http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
- minSessionTimeout=2000
- maxSessionTimeout=5000
在項目中添加zkclient的maven依賴
- <!-- http://mvnrepository.com/artifact/com.101tec/zkclient -->
- <dependency>
- <groupId>com.101tec</groupId>
- <artifactId>zkclient</artifactId>
- <version>0.8</version>
- </dependency>
Server代碼
- import java.io.BufferedReader;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import java.io.PrintWriter;
- import java.net.ServerSocket;
- import java.net.Socket;
- import org.I0Itec.zkclient.ZkClient;
- public class SimpleServer implements Runnable {
- public static void main(String[] args) throws IOException {
- int port = 18080;
- SimpleServer server = new SimpleServer(port);
- Thread thread = new Thread(server);
- thread.start();
- }
- private int port;
- public SimpleServer(int port) {
- this.port = port;
- }
- private void regServer() {
- //向ZooKeeper注冊當前服務器
- ZkClient client = new ZkClient("127.0.0.1:2181", 60000, 1000);
- String path = "/test/server" + port;
- if(client.exists(path))
- client.delete(path);
- client.createEphemeral(path, "127.0.0.1:" + port);
- }
- @Override
- public void run() {
- ServerSocket server = null;
- try {
- server = new ServerSocket(port);
- regServer();
- System.out.println("Server started at " + port);
- Socket socket = null;
- while (true) {
- socket = server.accept();
- new Thread(new SimpleServerHandler(socket)).start();
- }
- } catch(IOException ex) {
- ex.printStackTrace();
- } finally {
- if (server != null) {
- try {
- server.close();
- } catch (IOException e) {}
- }
- }
- }
- }
- //SimpleServerHandler略
客戶端代碼
- import java.io.BufferedReader;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import java.io.PrintWriter;
- import java.net.Socket;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.List;
- import java.util.Random;
- import org.I0Itec.zkclient.IZkChildListener;
- import org.I0Itec.zkclient.ZkClient;
- public class SimpleClient {
- private static List<String> servers = new ArrayList<>();
- public static void main(String[] args) {
- initServerList();
- SimpleClient client = new SimpleClient();
- BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
- while (true) {
- String name;
- try {
- name = console.readLine();
- if("exit".equals(name)) {
- System.exit(0);
- }
- client.send(name);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- private static void initServerList() {
- //啟動時從ZooKeeper讀取可用服務器
- String path = "/test";
- ZkClient zkClient = new ZkClient("127.0.0.1:2181", 60000, 1000);
- List<String> childs = zkClient.getChildren(path);
- servers.clear();
- for(String p : childs) {
- servers.add(zkClient.readData(path + "/" + p));
- }
- //訂閱節點變化事件
- zkClient.subscribeChildChanges("/test", new IZkChildListener() {
- @Override
- public void handleChildChange(String parentPath, List<String> currentChilds)
- throws Exception {
- System.out.println(String.format("[ZookeeperRegistry] service list change: path=%s, currentChilds=%s", parentPath, currentChilds.toString()));
- servers.clear();
- for(String p : currentChilds) {
- servers.add(zkClient.readData(path + "/" + p));
- }
- System.out.println("Servers: " + servers.toString());
- }
- });
- }
- public static String getServer() {
- return servers.get(new Random().nextInt(servers.size()));
- }
- //其他無變化, 略
- }
分別啟動Server和Client,然后修改Server的端口號,再啟動一個實例,可以看到客戶端檢測到了這個新服務器的存在

在客戶端發送一些消息,可以看到被隨機的分發到兩個Server上處理



接下來關閉其中一個Server,可以看到客戶端幾秒鍾后監測到這個事件並自動刪除了該服務器。

可以看到,基於ZooKeeper實現軟負載均衡非常簡單,與應用緊密結合,使用靈活。
