一、在具體實現之前,先來了解一下Zookeeper的監聽器的原理:
圖中Main()線程作為客戶端,當在主線程中創建Zookeeper客戶端時,會默認創建兩個子線程:Listener和connect,connect線程負責將某一操作對應的的監聽事件發送給Zookeeper服務集群。Zookeeper收到監聽事件后會在該操作對應的監聽器列表中注冊該事件。
比如圖中的獲取節點“/”的子節點getChildren這一事件,並設置了true,表示監聽此事件,那么Zookeeper就會在監聽器列表中注冊該事件。一旦“/”節點的子節點發生變化,getChildren的結果就隨之發生變化,Zookeeper就會通知客戶端的Listener線程,Listener就會去調用process方法對“/”的變化做出應對處理。“/”的變化可能是客戶端不能控制的,但是為了適應這種變化,客戶端在收到服務器的通知后可根據自身情況做出應對。
二、這樣說可能比較抽象,我們用一個案例來說明:
public class ZkDemo { private String connect = "hadoop101:2181"; private int timeout = 2000; private ZooKeeper zooKeeper = null; //1、獲取Zookeeper客戶端,用於連接Zookeeper集群,其功能類似於Linux中啟動./zkCli.sh @Before public void getClient() throws Exception{ zooKeeper = new ZooKeeper(connect, timeout, new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(event.getPath() + "已被修改,是否確定?"); try { zooKeeper.getChildren("/lsj",true); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }); } /** * 查看子節點 */ @Test public void testList() throws Exception { List<String> children = zooKeeper.getChildren("/lsj", true); for (String str: children) { System.out.println(str); } Thread.sleep(Long.MAX_VALUE); } }
附:Thread.sleep()的設置是為了讓testList方法從某種意義上更像是客戶端那樣持續保持連接,所以必須讓testList方法處在執行的過程中。
默認的監聽次數是一次(即List<String> children = zooKeeper.getChildren("/lsj", true);執行后,Zookeeper會在/lsj節點的監聽列表中注冊一個監聽事件,如果該節點發生變化,就通知給申請監聽的客戶端的listener,並將該監聽事件從節點的監聽列表中刪除),服務端並不會持續為某客戶端監聽某一節點的變化。如果被監聽的節點發生變化,會調用監聽器的process方法,可以在process方法中再次調用getChildre方法並申請對目標節點的監聽,通過這個小技巧使得監聽的次數變得無限多了。只需記住:這段代碼啟動后對Zookeeper來說也是一個客戶端,設為客戶端1。在zooKeeper.getChildren("/lsj", true)設置了監聽此事件,當/lsj節點下的子節點發生變化時getChildren方法結果會發生變化,會觸發監聽器Watcher(就是那個內部類)執行process方法,在process方法中執行了應對變化的處理后再次調用getChildren方法,這才使得監聽好像變的無限多了。
下圖所示:在Linux中啟動的客戶端(設為客戶端2)每一次對/lsj節點增加節點都會視為getChildren結果的變化,故而Zookeeper會通知客戶端1這種變化,進而觸發監聽器執行process方法。
三、下面進入Zookeeper的正題:如何利用Zookeeper監聽服務器集群的變化,並在服務器集群變化時通知客戶端呢?
原理和上面一樣一樣的,只需將客戶端1想象成下文的客戶端群體,把客戶端2想想成服務器群體。如果沒有Zookeeper這個角色,讓客戶端直接和服務器接觸,當客戶端請求的一台服務器正好宕機時,客戶端將無法獲取資源,但又不知道這是服務器宕機所造成的問題,也無法改變請求到另一台正常運行的服務器,那么這個問題如何解決呢?
思路是這樣的,每一台服務器上線時都會在Zookeeper上的/ServerCluster的節點下創建一個標識本機的子節點(臨時的 -e )。當,某一台服務器宕機時,那么其在/ServerCluster下創建的節點就會隨之消失。我們讓客戶端獲取服務器信息時,監聽/ServerCluster的子節點變化,那么當某一台服務器宕機時(臨時節點隨之消失),Zookeeper會通知客戶端/ServerCluster的變化,客戶端也就知道了具體的哪一台服務器當機,哪一台服務器正常運行可以訪問了。
在這個過程中,最重要的一點:服務器和客戶端對於Zookeeper集群來說,都是客戶端。
具體實現:
public class Server { private static String connect = "hadoop101:2181"; private static int timeout = 2000; private static ZooKeeper zooKeeper = null; private static String parentPath = "/ServerCluster"; public static void main(String[] args) throws Exception { //1、獲取一個Zookeeper的客戶端對象,用於服務器向Zookeeper集群注冊自己。 getClient(); //2、把本服務器的主機名注冊到Zookeeper中的特定節點中 registServer(args[0]); //3、服務器本身的業務邏輯 getBusiness(args[0]); } private static void getBusiness(String hostname) throws InterruptedException { System.out.println(hostname + " is working..."); Thread.sleep(Long.MAX_VALUE); } private static void registServer(String hostname) throws KeeperException, InterruptedException { //創建臨時節點 String path = zooKeeper.create(parentPath + "/server", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(hostname + " is online..."); } private static void getClient() throws Exception { zooKeeper = new ZooKeeper(connect, timeout, new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(event.getType() + "---" + event.getPath()); } }); } }
public class Client { private static String connect = "hadoop101:2181"; private static int timeout = 2000; private static ZooKeeper zooKeeper = null; private static String parentPath = "/ServerCluster";
public static void main(String[] args) throws Exception { //1、獲取一個Zookeeper的客戶端對象,用於服務器向Zookeeper集群注冊自己。 getClient(); //2、獲取服務器列表(主機名),並監聽 getServers(); //3、客戶端的業務邏輯 getBusiness(); } private static void getBusiness() throws InterruptedException { System.out.println("Client is working..."); Thread.sleep(Long.MAX_VALUE); } private static void getServers() throws KeeperException, InterruptedException { //向Zookeeper給getChildren方法注冊監聽,一旦parentPath節點發生變化,就會通知監聽器觸發process方法 List<String> children = zooKeeper.getChildren(parentPath, true); //此集合用於保存服務器主機名 ArrayList<String> hosts = new ArrayList(); for (String child: children) { byte[] data = zooKeeper.getData(parentPath + "/" + child, false, null); hosts.add(new String(data)); } System.out.println(hosts); } private static void getClient() throws Exception { zooKeeper = new ZooKeeper(connect, timeout, new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(event.getType() + "---" + event.getPath()); //執行到本方法就說明parentPath已經被修改了,即服務器列表發生變化,需要重新獲取。 try { getServers(); } catch (Exception e) { e.printStackTrace(); } } }); } }
通過配置參數開啟三個服務器hadoop101,開啟一個Client:
關閉hadoop101:
Client觀察到這種變化,打印新的服務器列表:
當然,如果在Linux中加入一個“服務器”,Client也可以監聽到: