上幾次博客,我說了一下Zookeeper的簡單使用和API的使用,我們接下來看一下他的真實場景。
一、分布式集群管理✨✨✨
我們現在有這樣一個需求,請先拋開Zookeeper是集群還是單機的概念,下面提到的都是以Zookeeper集群來說的。
1. 主動查看線上服務節點
2. 查看服務節點資源使用情況
3. 服務離線通知
4. 服務資源(CPU、內存、硬盤)超出閥值通知

我們先來看一下代碼實現流程吧。主要分為兩個部分的,一個部分是寫入Zookeeper集群,另一部分是獲取Zookeeper集群內部的數據。
寫入Zookeeper集群部分:
寫入的信息包括該服務器的內存使用情況,CPU使用情況等信息。
public void init() { zkClient = new ZkClient(server, 5000, 10000); System.out.println("zk連接成功" + server); // 創建根節點 buildRoot(); // 創建臨時節點 createServerNode(); // 啟動更新的線程 stateThread = new Thread(() -> { while (true) { updateServerNode();//數據寫到 當前的臨時節點中去 try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } }, "zk_stateThread"); stateThread.setDaemon(true); stateThread.start(); }
//創建根節點,如果根節點已經存在,則不再重復創建 public void buildRoot() { if (!zkClient.exists(rootPath)) { zkClient.createPersistent(rootPath); } }
// 生成服務節點 public void createServerNode() { nodePath = zkClient.createEphemeralSequential(servicePath, getOsInfo()); System.out.println("創建節點:" + nodePath); }
每一個服務都有自己的唯一的臨時序號節點。
// 獲取服務節點狀態 public String getOsInfo() { OsBean bean = new OsBean(); bean.lastUpdateTime = System.currentTimeMillis(); bean.ip = getLocalIp(); bean.cpu = CPUMonitorCalc.getInstance().getProcessCpu(); MemoryUsage memoryUsag = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); bean.usedMemorySize = memoryUsag.getUsed() / 1024 / 1024; bean.usableMemorySize = memoryUsag.getMax() / 1024 / 1024; bean.pid = ManagementFactory.getRuntimeMXBean().getName(); ObjectMapper mapper = new ObjectMapper(); try { return mapper.writeValueAsString(bean); } catch (JsonProcessingException e) { throw new RuntimeException(e); } }
// 數據寫到 當前的臨時節點中去 public void updateServerNode() { zkClient.writeData(nodePath, getOsInfo()); }
當每個服務開啟的時候,我們就應該向我們的Zookeeper發送我們的信息,也就是優先在根節點下創建一個臨時序號節點,並且寫入服務器的相關信息。就這樣我們的Zookeeper集群中就有了我們的服務器相關的信息。
讀取Zookeeper集群信息部分:
我們對於我們的節點遞歸監聽就可以了。監聽過程可以寫入我們的閾值限制,從而達到報警的目的。
// 初始化訂閱事件 public void initSubscribeListener() { zkClient.unsubscribeAll(); // 獲取所有子節點 zkClient.getChildren(rootPath) .stream() .map(p -> rootPath + "/" + p)// 得出子節點完整路徑 .forEach(p -> { zkClient.subscribeDataChanges(p, new DataChanges());// 數據變更的監聽 }); // 監聽子節點,的變更 增加,刪除 zkClient.subscribeChildChanges(rootPath, (parentPath, currentChilds) -> initSubscribeListener()); }
我們也可以將我們獲取到的信息寫入到我們的web頁面中去。作為我們Zookeeper集群對於服務器健康信息管理的小程序。(我服務器到期了,要不就給你們一套完整的代碼演示了,過幾天補全)
總結:就是每個服務器往我們的Zookeeper寫入數據,在寫入之前創建根節點,然后創建我們的臨時序號節點再來寫入我們的數據,也是利用了臨時序號節點的特性,不會重復,而且斷開連接會清理掉。也可以將server服務器和我們的Zookeeper部署在同一個服務器也是不會影響的。(自行考慮內存,CPU,網絡等問題)
二、分布式注冊中心✨✨✨✨
很多分布式項目,並不是使用Spring Clould的Eureka的,自我覺得Eureka和Zookeeper平分秋色吧。我們來看一下我們的需求。
現有一個積分系統,由於使用人數巨大,我們需要同時部署四台服務器才能承載住我們的並發壓力。那么我們的請求來了,由誰來控制請求哪台服務器呢?這時就有了我們的Zookeeper注冊中心(需結合dubbo)。
我來大致用圖解的形式說一下原理,一會再說細節。
分布式注冊中心原理:
說到分布式注冊中心,我們需要知道幾個名詞。
注冊中心:注冊中心是指我們的Zookeeper集群,主要是用來存儲我們的接口信息和監聽我們服務提供者是否正常運行的。並且還保存了我們服務消費者的相關信息。
服務提供者:誰提供了這些接口,誰就是提供者。
服務消費者:誰想調用這些接口,誰就是消費者。
工作流程:
1.服務啟動,也就是我們的接口啟動了,優先去我們的注冊中心去注冊我們的接口信息,也就是用臨時序號節點來存我們的接口信息。
2.以后我們的服務提供者會持續的發送消息到注冊中心去,持續的告訴我們的注冊中心,我們還是可用的,還是活着的。
3.服務消費者來調用我們的接口了,第一次,需要到我們的注冊中心去找一個合適接口。(具體如何分配,並不是由Zookeeper來控制的),並將我們的注冊中心的提供服務IP列表緩存到自己的服務器上。
4.只要服務提供方節點沒有變動,我們的消費者以后的調用,只許讀取自己本地的緩存即可,不在需要去注冊中心讀取我們的服務提供者IP列表。

這里有一個最直觀的好處就是,原來我們寫接口需要指定去哪個IP調用,如果接口服務器IP變了,我們還需要調整我們的程序,這里我們只需要調用Zookeeper即可,不再需要調整程序了。

注意:保存消費者,我暫時理解的是為了方便直觀的看到當前都有哪些在調用我們的接口。
三、分布式JOB✨✨✨
分布式JOB,我第一次遇到這個名字的時候是懵的,我還記得我當時做項目要弄一個自動發送郵件的這樣一個需求,但是我們是橫向部署的,三台服務器都有這段代碼,每到半夜11.30的時候都會發三份完全一致的郵件,有人會提出,我們只寫一個自動任務,一台服務器部署不就可以了嗎?請你弄死他,我們要的高可用,你一台服務器怎么保證高可用,這樣的程序是明顯不合理的。說到這我們就有了我們的分布式Job,分布式Job就是要解決這樣類似的問題的。
還是先看一下實現原理和思路。

這樣我們就能保證只有master服務器能執行我們的自動任務,如果master宕機了,我們會有候補隊員保證我們的高可用。
四、分布式鎖✨✨✨✨✨✨
我們單機的程序,來使用synchronized關鍵字是可以實現多線程爭搶的問題,分布式鎖很多是redis集群來實現的,我們來使用Zookeeper也是可以的實現的。
程序內的鎖一般分為共享讀鎖和排它寫鎖,也就是我們加了共享讀鎖的時候,其它線程可以來讀,但是不能改,而我們的排它寫鎖,其它線程是不能進行任何操作的。
我們可以這樣來設計。

來一個線程就往我們的lock節點內添加一個臨時序號節點,值設置為readLock或者是writeLock,標記我們獲得是什么類型的鎖,當我們再來線城時,優先監聽我們的Lock節點的數據,來判斷我們是否可以得到鎖的資源,感覺還不錯,可以實現。但這樣的實現並不是很合理的,我們圖中畫了三個等待的線程還好,如果等待的線程是100個1000個的話,lock節點數據變化了,也就是上一個鎖釋放掉了,我們那1000個線程會瘋搶我們的鎖(羊群效應),可以想象1000個大媽在超市搶雞蛋的樣子,可怕....
我們換一個實現的思路再來試試。

我們這次改為只監聽比其小的節點數據即可,以圖為例來說,我們的Tread3想獲得寫鎖,必定等待Tread1和Tread2的讀鎖全部釋放,我們才可以給Tread3添加寫鎖,我們持續監聽Tread2線程,當Tread2線程鎖釋放掉,我們的Tread3會繼續監聽到Tread1的使用情況,直到沒有比他小的在使用鎖資源,我們才獲得我們的寫鎖資源。
感覺這個和我們的分布式JOB差不多,最小的序號獲得鎖。只不過有一個共享讀鎖和排它寫鎖的區別而已。
等我服務器續費的,上代碼,下次博客繼續來說說Zookeeper的源碼。


最進弄了一個公眾號,小菜技術,歡迎大家的加入

