瘋狂創客圈 Java 分布式聊天室【 億級流量】實戰系列之 -25【 博客園 總入口 】
寫在前面
大家好,我是作者尼恩。目前和幾個小伙伴一起,組織了一個高並發的實戰社群【瘋狂創客圈】。正在開始高並發、億級流程的 IM 聊天程序 學習和實戰
前面,已經完成一個高性能的 Java 聊天程序的四件大事:
接下來,需要進入到分布式開發的環節了。 分布式的中間件,瘋狂創客圈的小伙伴們,一致的選擇了zookeeper,不僅僅是由於其在大數據領域,太有名了。更重要的是,很多的著名框架,都使用了zk。
本篇介紹 ZK Curator 的事件監聽。
1.1. Curator 事件監聽
Curator 事件有兩種模式,一種是標准的觀察模式,一種是緩存監聽模式。標准的監聽模式是使用Watcher 監聽器。第二種緩存監聽模式引入了一種本地緩存視圖的Cache機制,來實現對Zookeeper服務端事件監聽。
Cache事件監聽可以理解為一個本地緩存視圖與遠程Zookeeper視圖的對比過程。Cache提供了反復注冊的功能。Cache是一種緩存機制,可以借助Cache實現監聽。簡單來說,Cache在客戶端緩存了znode的各種狀態,當感知到zk集群的znode狀態變化,會觸發event事件,注冊的監聽器會處理這些事件。
Watcher 監聽器比較簡單,只有一種。Cache事件監聽的種類有3種Path Cache,Node Cache,Tree Cache。
1.1.1. Watcher 標准的事件處理器
在ZooKeeper中,接口類Watcher用於表示一個標准的事件處理器,其定義了事件通知相關的邏輯,包含KeeperState和EventType兩個枚舉類,分別代表了通知狀態和事件類型。
Watcher接口定義了事件的回調方法:process(WatchedEvent event)。定義一個Watcher的實例很簡單,代碼如下:
Watcher w = new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
log.info("監聽器watchedEvent:" + watchedEvent);
}
};
使用Watcher監聽器實例的方式也很簡單,在Curator的調用鏈上,加上usingWatcher方法即可,代碼如下:
byte[] content = client.getData()
.usingWatcher(w).forPath(workerPath);
一個Watcher監聽器在向服務端完成注冊后,當服務端的一些事件觸發了這個Watcher,那么就會向指定客戶端發送一個事件通知,來實現分布式的通知功能。客戶收到服務器的通知后,Curator 會封裝一個WatchedEvent 事件實例,傳遞給監聽器的回調方法process(WatchedEvent event)。
WatchedEvent包含了三個基本屬性:
(1)通知狀態(keeperState)
(2)事件類型(EventType)
(3)節點路徑(path)
注意,WatchedEvent並不是直接從ZooKeeper集群直接傳遞過來的事件實例,而是Curator 封裝過的事件實例。WatchedEvent類型沒有實現序列化接口java.io.Serializable,因此不能用於網絡傳輸。ZooKeeper集群直接網絡傳輸傳遞過來的事件實例是啥呢? 是一個WatcherEvent類型的實例,這個傳輸實例和Curator 封裝過的WatchedEvent實例,在名稱上有一個字母之差,而且功能也是一樣的,都表示的是同一個事物,都是對一個服務端事件的封裝。
因此,這里只講Curator 封裝過的WatchedEvent實例。下邊列舉了ZooKeeper中最常見的幾個通知狀態和事件類型。
| KeeperState | EventType | 觸發條件 | 說明 |
|---|---|---|---|
| None (-1) | 客戶端與服務端成功建立連接 | ||
| SyncConnected (0) | NodeCreated (1) | Watcher監聽的對應數據節點被創建 | |
| NodeDeleted (2) | Watcher監聽的對應數據節點被刪除 | 此時客戶端和服務器處於連接狀態 | |
| NodeDataChanged (3) | Watcher監聽的對應數據節點的數據內容發生變更 | ||
| NodeChildChanged (4) | Wather監聽的對應數據節點的子節點列表發生變更 | ||
| Disconnected (0) | None (-1) | 客戶端與ZooKeeper服務器斷開連接 | 此時客戶端和服務器處於斷開連接狀態 |
| Expired (-112) | Node (-1) | 會話超時 | 此時客戶端會話失效,通常同時也會受到SessionExpiredException異常 |
| AuthFailed (4) | None (-1) | 通常有兩種情況,1:使用錯誤的schema進行權限檢查 2:SASL權限檢查失敗 | 通常同時也會收到AuthFailedException異常 |
利用Watcher來對節點進行監聽操作,但此監聽操作只能監聽一次。來看一個簡單的實例程序:
@Slf4j
@Data
public class ZkWatcherDemo {
private String workerPath = "/test/listener/node";
private String subWorkerPath = "/test/listener/node/id-";
@Test
public void testWatcher() {
CuratorFramework client = ZKclient.instance.getClient();
//檢查節點是否存在,沒有則創建
boolean isExist = ZKclient.instance.isNodeExist(workerPath);
if (!isExist) {
ZKclient.instance.createNode(workerPath, null);
}
try {
Watcher w = new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("監聽到的變化 watchedEvent = " + watchedEvent);
}
};
byte[] content = client.getData()
.usingWatcher(w).forPath(workerPath);
log.info("監聽節點內容:" + new String(content));
// 第一次變更節點數據
client.setData().forPath(workerPath, "第1次更改內容".getBytes());
// 第二次變更節點數據
client.setData().forPath(workerPath, "第2次更改內容".getBytes());
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
//....
}
運行代碼,輸出的結果如下:
監聽到的變化 watchedEvent = WatchedEvent state:SyncConnected type:NodeDataChanged path:/test/listener/node
程序中,對節點路徑 “/test/listener/node”注冊一個Watcher監聽器實例,隨后調用setData方法兩次改變節點內容,但是,監聽器僅僅監聽到了一個事件。也就是說,當第二次改變節點內容時,監聽已經失效,無法再次獲得節點變動事件。
也就是說,Watcher監聽器是一次性的,如果要反復使用,就需要反復的使用usingWatcher提前注冊。
所以,Watcher監聽器不能應用於節點的數據變動或者節點變動這樣的一般業務場景。而是適用於一些特殊的,比如會話超時、授權失敗等這樣的特殊場景。
既然Watcher監聽器是一次性的,在開發過程中需要反復注冊Watcher,比較繁瑣。Curator引入了Cache來監聽ZooKeeper服務端的事件。Cache對ZooKeeper事件監聽進行了封裝,能夠自動處理反復注冊監聽。
1.1.2. NodeCache 節點緩存的監聽
Curator引入的Cache緩存實現,是一個系列,包括了Node Cache 、Path Cache、Tree Cache三組類。其中Node Cache節點緩存可以用於ZNode節點的監聽,Path Cache子節點緩存用於ZNode的子節點的監聽,而Tree Cache樹緩存是Path Cache的增強,不光能監聽子節點,也能監聽ZNode節點自身。
Node Cache,可以用於監控本節點的新增,刪除,更新。
Node Cache使用的第一步,就是構造一個NodeCache緩存實例。
有兩個構造方法,具體如下:
NodeCache(CuratorFramework client, String path)
NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)
第一個參數就是傳入創建的Curator的框架客戶端,第二個參數就是監聽節點的路徑,第三個重載參數dataIsCompressed 表示是否對數據進行壓縮。
NodeCache使用的第二步,就是構造一個NodeCacheListener監聽器實例。該接口的定義如下:
package org.apache.curator.framework.recipes.cache;
public interface NodeCacheListener {
void nodeChanged() throws Exception;
}
NodeCacheListener監聽器接口,只定義了一個簡單的方法 nodeChanged,當節點變化時,這個方法就會被回調到。
在創建完NodeCacheListener的實例之后,需要將這個實例注冊到NodeCache緩存實例,使用緩存實例的addListener方法。 然后使用緩存實例nodeCache的start方法,啟動節點的事件監聽。
nodeCache.getListenable().addListener(l);
nodeCache.start();
強調下,需要調用nodeCache的start方法能進行緩存和事件監聽,這個方法有兩個版本:
void start()//Start the cache.
void start(boolean buildInitial) //true代表緩存當前節點
唯一的一個參數buildInitial代表着是否將該節點的數據立即進行緩存。如果設置為true的話,在start啟動時立即調用NodeCache的getCurrentData方法就能夠得到對應節點的信息ChildData類,如果設置為false的就得不到對應的信息。
使用NodeCache來監聽節點的事件,完整的實例代碼如下:
@Test
public void testNodeCache() {
//檢查節點是否存在,沒有則創建
boolean isExist = ZKclient.instance.isNodeExist(workerPath);
if (!isExist) {
ZKclient.instance.createNode(workerPath, null);
}
CuratorFramework client = ZKclient.instance.getClient();
try {
NodeCache nodeCache =
new NodeCache(client, workerPath, false);
NodeCacheListener l = new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
ChildData childData = nodeCache.getCurrentData();
log.info("ZNode節點狀態改變, path={}", childData.getPath());
log.info("ZNode節點狀態改變, data={}", new String(childData.getData(), "Utf-8"));
log.info("ZNode節點狀態改變, stat={}", childData.getStat());
}
};
nodeCache.getListenable().addListener(l);
nodeCache.start();
// 第1次變更節點數據
client.setData().forPath(workerPath, "第1次更改內容".getBytes());
Thread.sleep(1000);
// 第2次變更節點數據
client.setData().forPath(workerPath, "第2次更改內容".getBytes());
Thread.sleep(1000);
// 第3次變更節點數據
client.setData().forPath(workerPath, "第3次更改內容".getBytes());
Thread.sleep(1000);
// 第4次變更節點數據
// client.delete().forPath(workerPath);
Thread.sleep(Integer.MAX_VALUE);
} catch (Exception e) {
log.error("創建NodeCache監聽失敗, path={}", workerPath);
}
}
運行的結果是,NodeCashe節點緩存能夠重復的進行事件節點。代碼中的第三次監聽的輸出節選如下:
\- ZNode節點狀態改變, path=/test/listener/node
\- ZNode節點狀態改變, data=第3次更改內容
\- ZNode節點狀態改變, stat=17179869191,...
最后說明一下,如果NodeCache監聽的節點為空(也就是說傳入的路徑不存在)。那么如果我們后面創建了對應的節點,也是會觸發事件從而回調nodeChanged方法。
1.1.3. PathChildrenCache 子節點監聽
PathChildrenCache子節點緩存用於子節點的監聽,監控本節點的子節點被創建、更新或者刪除。需要強調兩點:
(1)只能監聽子節點,監聽不到當前節點
(2)不能遞歸監聽,子節點下的子節點不能遞歸監控
PathChildrenCache子節點緩存使用的第一步,就是構造一個緩存實例。
有多個重載版本的構造方法,選擇4個進行說明,具體如下:
public PathChildrenCache(CuratorFramework client, String path,boolean cacheData)
public PathChildrenCache(CuratorFramework client, String path,boolean cacheData,
boolean dataIsCompressed,final ExecutorService executorService)
public PathChildrenCache(CuratorFramework client, String path,boolean cacheData,
boolean dataIsCompressed,ThreadFactory threadFactory)
public PathChildrenCache(CuratorFramework client, String path,boolean cacheData,
ThreadFactory threadFactory)
所有的構造方法,前三個參數,都是一樣的。
第一個參數就是傳入創建的Curator的框架客戶端,第二個參數就是監聽節點的路徑,第三個重載參數cacheData表示是否把節點內容緩存起來。如果cacheData為true,那么接收到節點列表變更事件的同時,會將獲得節點內容。
dataIsCompressed參數(如果有),表示是否對節點數據進行壓縮。
executorService 和threadFactory參數差不多,表示通過傳入的線程池或者線程工廠,來異步處理監聽事件。
threadFactory參數(如果有)表示線程池工廠,當PathChildrenCache內部需要開啟新的線程執行時,使用該線程池工廠來創建線程。
PathChildrenCache子節點緩存使用的第二步,就是構造一個子節點緩存監聽器PathChildrenCacheListener實例。該接口的定義如下:
package org.apache.curator.framework.recipes.cache;
import org.apache.curator.framework.CuratorFramework;
public interface PathChildrenCacheListener {
void childEvent(CuratorFramework client, PathChildrenCacheEvent e) throws Exception;
}
PathChildrenCacheListener監聽器接口中,也只定義了一個簡單的方法 childEvent,當子節點有變化時,這個方法就會被回調到。
在創建完PathChildrenCacheListener的實例之后,需要將這個實例注冊到PathChildrenCache緩存實例,使用緩存實例的addListener方法。 然后使用緩存實例nodeCache的start方法,啟動節點的事件監聽。
這里的start方法,需要傳入啟動的模式。可以傳入三種模式,也就是API列表中看到的StartMode,其中定義了下面三種枚舉:
(1)NORMAL——異步初始化cache
(2)BUILD_INITIAL_CACHE——同步初始化cache
(3)POST_INITIALIZED_EVENT——異步初始化cache,並觸發完成事件
對於start模式的三種啟動方式,詳細的說明如下:
BUILD_INITIAL_CACHE:啟動時,同步初始化cache,以及創建cache后,就從服務器拉取對應的數據。
POST_INITIALIZED_EVENT:啟動時,異步初始化cache,初始化完成觸發PathChildrenCacheEvent.Type#INITIALIZED事件,cache中Listener會收到該事件的通知。
最后是第一個枚舉常量,NORMAL:啟動時,異步初始化cache,完成后不會發出通知。
使用PathChildrenCache來監聽節點的事件,完整的實例代碼如下:
@Test
public void testPathChildrenCache() {
//檢查節點是否存在,沒有則創建
boolean isExist = ZKclient.instance.isNodeExist(workerPath);
if (!isExist) {
ZKclient.instance.createNode(workerPath, null);
}
CuratorFramework client = ZKclient.instance.getClient();
try {
PathChildrenCache cache =
new PathChildrenCache(client, workerPath, true);
PathChildrenCacheListener l =
new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client,
PathChildrenCacheEvent event) {
try {
ChildData data = event.getData();
switch (event.getType()) {
case CHILD_ADDED:
log.info("子節點增加, path={}, data={}",
data.getPath(), new String(data.getData(), "UTF-8"));
break;
case CHILD_UPDATED:
log.info("子節點更新, path={}, data={}",
data.getPath(), new String(data.getData(), "UTF-8"));
break;
case CHILD_REMOVED:
log.info("子節點刪除, path={}, data={}",
data.getPath(), new String(data.getData(), "UTF-8"));
break;
default:
break;
}
} catch (
UnsupportedEncodingException e) {
e.printStackTrace();
}
}
};
cache.getListenable().addListener(l);
cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
Thread.sleep(1000);
for (int i = 0; i < 3; i++) {
ZKclient.instance.createNode(subWorkerPath + i, null);
}
Thread.sleep(1000);
for (int i = 0; i < 3; i++) {
ZKclient.instance.deleteNode(subWorkerPath + i);
}
} catch (Exception e) {
log.error("PathCache監聽失敗, path=", workerPath);
}
}
運行的結果如下:
\- 子節點增加, path=/test/listener/node/id-0, data=to set content
\- 子節點增加, path=/test/listener/node/id-2, data=to set content
\- 子節點增加, path=/test/listener/node/id-1, data=to set content
......
\- 子節點刪除, path=/test/listener/node/id-2, data=to set content
\- 子節點刪除, path=/test/listener/node/id-0, data=to set content
\- 子節點刪除, path=/test/listener/node/id-1, data=to set content
可以看到,PathChildrenCache 能夠反復的監聽到節點的新增和刪除。
簡單說下Curator的監聽原理,無論是PathChildrenCache,還是TreeCache,所謂的監聽,都是進行Curator本地緩存視圖和ZooKeeper服務器遠程的數據節點的對比。
在什么場景下觸發事件呢?
以節點增加事件NODE_ADDED為例,所在本地緩存視圖開始的時候,本地視圖為空,在數據同步的時候,本地的監聽器就能監聽到NODE_ADDED事件。這是因為,剛開始本地緩存並沒有內容,然后本地緩存和服務器緩存進行對比,發現ZooKeeper服務器有節點而本地緩存沒有,這才將服務器的節點緩存到本地,就會觸發本地緩存的NODE_ADDED事件。
1.1.4. Tree Cache 節點樹緩存
前面已經講完了兩個系列的緩存監聽。簡單回顧一下:
Node Cache用來觀察ZNode自身,如果ZNode節點本身被創建,更新或者刪除,那么Node Cache會更新緩存,並觸發事件給注冊的監聽器。Node Cache是通過NodeCache類來實現的,監聽器對應的接口為NodeCacheListener。
Path Cache子節點緩存用來觀察ZNode的子節點、並緩存子節點的狀態,如果ZNode的子節點被創建,更新或者刪除,那么Path Cache會更新緩存,並且觸發事件給注冊的監聽器。Path Cache是通過PathChildrenCache類來實現的,監聽器注冊是通過PathChildrenCacheListener。
最后的一個系列,是Tree Cache。Tree Cache可以看做是上兩種的合體,Tree Cache觀察的是當前ZNode節點的所有數據。而TreeCache節點樹緩存是PathChildrenCache的增強,不光能監聽子節點,也能監聽節點自身。
Tree Cache使用的第一步,就是構造一個TreeCache緩存實例。
有兩個構造方法,具體如下:
TreeCache(CuratorFramework client, String path)
TreeCache(CuratorFramework client, String path,
boolean cacheData, boolean dataIsCompressed, int maxDepth,
ExecutorService executorService, boolean createParentNodes,
TreeCacheSelector selector)
第一個參數就是傳入創建的Curator的框架客戶端,第二個參數就是監聽節點的路徑,第三個重載參數dataIsCompressed 表示是否對數據進行壓縮。maxDepth表示緩存的層次深度,默認為整數最大值。executorService 表示監聽的的執行線程池,默認會創建一個單一線程的線程池。createParentNodes 表示是否創建父親節點,默認為false。
一般情況下,使用第一個構造函數即可。
TreeCache使用的第二步,就是構造一個TreeCacheListener監聽器實例。該接口的定義如下:
package org.apache.curator.framework.recipes.cache;
import org.apache.curator.framework.CuratorFramework;
public interface TreeCacheListener {
void childEvent(CuratorFramework var1, TreeCacheEvent var2) throws Exception;
}
TreeCacheListener 監聽器接口中,也只定義了一個簡單的方法 childEvent,當子節點有變化時,這個方法就會被回調到。
在創建完TreeCacheListener 的實例之后,使用緩存實例的addListener方法,將TreeCacheListener 監聽器實例注冊到TreeCache 緩存實例。 然后使用緩存實例nodeCache的start方法,啟動節點的事件監聽。
整個實例的代碼如下:
@Test
public void testTreeCache() {
//檢查節點是否存在,沒有則創建
boolean isExist = ZKclient.instance.isNodeExist(workerPath);
if (!isExist) {
ZKclient.instance.createNode(workerPath, null);
}
CuratorFramework client = ZKclient.instance.getClient();
try {
TreeCache treeCache =
new TreeCache(client, workerPath);
TreeCacheListener l =
new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client,
TreeCacheEvent event) {
try {
ChildData data = event.getData();
if(data==null)
{
log.info("數據為空");
return;
}
switch (event.getType()) {
case NODE_ADDED:
log.info("[TreeCache]節點增加, path={}, data={}",
data.getPath(), new String(data.getData(), "UTF-8"));
break;
case NODE_UPDATED:
log.info("[TreeCache]節點更新, path={}, data={}",
data.getPath(), new String(data.getData(), "UTF-8"));
break;
case NODE_REMOVED:
log.info("[TreeCache]節點刪除, path={}, data={}",
data.getPath(), new String(data.getData(), "UTF-8"));
break;
default:
break;
}
} catch (
UnsupportedEncodingException e) {
e.printStackTrace();
}
}
};
treeCache.getListenable().addListener(l);
treeCache.start();
Thread.sleep(1000);
for (int i = 0; i < 3; i++) {
ZKclient.instance.createNode(subWorkerPath + i, null);
}
Thread.sleep(1000);
for (int i = 0; i < 3; i++) {
ZKclient.instance.deleteNode(subWorkerPath + i);
}
Thread.sleep(1000);
ZKclient.instance.deleteNode(workerPath);
Thread.sleep(Integer.MAX_VALUE);
} catch (Exception e) {
log.error("PathCache監聽失敗, path=", workerPath);
}
}
運行的結果如下:
\- [TreeCache]節點增加, path=/test/listener/node, data=to set content
\- [TreeCache]節點增加, path=/test/listener/node/id-0, data=to set content
\- [TreeCache]節點增加, path=/test/listener/node/id-1, data=to set content
\- [TreeCache]節點增加, path=/test/listener/node/id-2, data=to set content
\- [TreeCache]節點刪除, path=/test/listener/node/id-2, data=to set content
\- [TreeCache]節點刪除, path=/test/listener/node/id-1, data=to set content
\- [TreeCache]節點刪除, path=/test/listener/node/id-0, data=to set content
\- [TreeCache]節點刪除, path=/test/listener/node, data=to set content
最后,說明下事件的類型,對應於節點的增加、修改、刪除,TreeCache 的事件類型為:
(1)NODE_ADDED
(2)NODE_UPDATED
(3)NODE_REMOVED
這一點,與Path Cache 的事件類型不同,與Path Cache 的事件類型為:
(1)CHILD_ADDED
(2)CHILD_UPDATED
(3)CHILD_REMOVED
寫在最后
下一篇:基於zk,實現分布式鎖。
瘋狂創客圈 億級流量 高並發IM 實戰 系列
- Java (Netty) 聊天程序【 億級流量】實戰 開源項目實戰
- Netty 源碼、原理、JAVA NIO 原理
- Java 面試題 一網打盡
- 瘋狂創客圈 【 博客園 總入口 】
