本文介紹在使用curator框架的時候如何handle session expire。
1、什么是zookeeper的會話過期?
一般來說,我們使用zookeeper是集群形式,如下圖,client和zookeeper集群(3個實例)建立一個會話session。
在這個會話session當中,client其實是隨機與其中一個zk provider建立的鏈接,並且互發心跳heartbeat。zk集群負責管理這個session,並且在所有的provider上維護這個session的信息,包括這個session中定義的臨時數據和監視點watcher。
如果再網絡不佳或者zk集群中某一台provider掛掉的情況下,有可能出現connection loss的情況,例如client和zk provider1連接斷開,這時候client不需要任何的操作(zookeeper api已經給我們做好了),只需要等待client與其他provider重新連接即可。這個過程可能導致兩個結果:
1)在session timeout之內連接成功
這個時候client成功切換到連接另一個provider例如是provider2,由於zk在所有的provider上同步了session相關的數據,此時可以認為無縫遷移了。
2)在session timeout之內沒有重新連接
這就是session expire的情況,這時候zookeeper集群會任務會話已經結束,並清除和這個session有關的所有數據,包括臨時節點和注冊的監視點Watcher。
在session超時之后,如果client重新連接上了zookeeper集群,很不幸,zookeeper會發出session expired異常,且不會重建session,也就是不會重建臨時數據和watcher。
2、如何使用curator實現session expired異常的捕獲和處理?
1)首先我們先創建一個鏈接
這里設置了重試策略retryPolicy和會話超時時間sessionTimeoutMs,並打開鏈接。
public void init() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries); client = CuratorFrameworkFactory.builder().connectString(zookeeperServer).retryPolicy(retryPolicy) .sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).build(); client.start(); }
2)客戶端注冊
public void register() { try { String rootPath = "/services"; String hostAddress = InetAddress.getLocalHost().getHostAddress(); String serviceInstance = "/prometheus" + "-" + hostAddress + "-"; String path = rootPath + serviceInstance; SessionConnectionListener sessionConnectionListener = new SessionConnectionListener(path, ""); client.getConnectionStateListenable().addListener(sessionConnectionListener); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path); } catch (Exception e) { logger.error("注冊出錯", e); } }
這里我們創建了一個臨時有序節點node,這個節點將會在session expired觸發的時候被自動刪除。當session又重新恢復的時候,client只會收到session expired異常和不會自動將臨時節點添加到zookeeper中。
為了解決這個問題,我們增加了一個監聽器,
client.getConnectionStateListenable().addListener(sessionConnectionListener)
這個監聽器監聽session expired事件,並且在事件發生的時候進行處理,監聽器處理的流程如下。
注意:這個監聽器注冊是可以復用的,即如果多次session expired,不用重復注冊監聽器。
3、監聽器sessionConnectionListener
package com.xiaoju.dqa.prometheus.client.zookeeper; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.zookeeper.CreateMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SessionConnectionListener implements ConnectionStateListener { private final Logger logger = LoggerFactory.getLogger(this.getClass()); private String path; private String data; public SessionConnectionListener(String path, String data) { this.path = path; this.data = data; } @Override public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState){ if(connectionState == ConnectionState.LOST){ logger.error("[負載均衡失敗]zk session超時"); while(true){ try { if(curatorFramework.getZookeeperClient().blockUntilConnectedOrTimedOut()){ curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, data.getBytes("UTF-8")); logger.info("[負載均衡修復]重連zk成功"); break; } } catch (InterruptedException e) { break; } catch (Exception e){ } } } } }
這里的ConnectionState.LOST等同於session expired事件,對這個事件的處理是,在一個死循環中重試鏈接zk,知道鏈接成功才退出循環。
需要注意的是:一旦重新創建了會話,那么之前會話的所有觀察點都會失效,需要重新初始化觀察點。