zookeeper curator處理會話過期session expired


本文介紹在使用curator框架的時候如何handle session expire。

 

1、什么是zookeeper的會話過期?

 一般來說,我們使用zookeeper是集群形式,如下圖,client和zookeeper集群(3個實例)建立一個會話session。

 在這個會話session當中,client其實是隨機與其中一個zk provider建立的鏈接,並且互發心跳heartbeat。zk集群負責管理這個session,並且在所有的provider上維護這個session的信息,包括這個session中定義的臨時數據和監視點watcher。

如果再網絡不佳或者zk集群中某一台provider掛掉的情況下,有可能出現connection loss的情況,例如client和zk provider1連接斷開,這時候client不需要任何的操作(zookeeper api已經給我們做好了),只需要等待client與其他provider重新連接即可。這個過程可能導致兩個結果:

1)在session timeout之內連接成功

這個時候client成功切換到連接另一個provider例如是provider2,由於zk在所有的provider上同步了session相關的數據,此時可以認為無縫遷移了。

2)在session timeout之內沒有重新連接

這就是session expire的情況,這時候zookeeper集群會任務會話已經結束,並清除和這個session有關的所有數據,包括臨時節點和注冊的監視點Watcher。

在session超時之后,如果client重新連接上了zookeeper集群,很不幸,zookeeper會發出session expired異常,且不會重建session,也就是不會重建臨時數據和watcher。

 

2、如何使用curator實現session expired異常的捕獲和處理?

1)首先我們先創建一個鏈接

這里設置了重試策略retryPolicy和會話超時時間sessionTimeoutMs,並打開鏈接。

public void init() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
        client = CuratorFrameworkFactory.builder().connectString(zookeeperServer).retryPolicy(retryPolicy)
                .sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).build();
        client.start();
    }

 

2)客戶端注冊

public void register() {
        try {
            String rootPath = "/services";
            String hostAddress = InetAddress.getLocalHost().getHostAddress();
            String serviceInstance = "/prometheus" + "-" +  hostAddress + "-";
            String path = rootPath + serviceInstance;
            SessionConnectionListener sessionConnectionListener = new SessionConnectionListener(path, "");
            client.getConnectionStateListenable().addListener(sessionConnectionListener);
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
        } catch (Exception e) {
            logger.error("注冊出錯", e);
        }
    }

 這里我們創建了一個臨時有序節點node,這個節點將會在session expired觸發的時候被自動刪除。當session又重新恢復的時候,client只會收到session expired異常和不會自動將臨時節點添加到zookeeper中。

為了解決這個問題,我們增加了一個監聽器,

client.getConnectionStateListenable().addListener(sessionConnectionListener)

這個監聽器監聽session expired事件,並且在事件發生的時候進行處理,監聽器處理的流程如下。

注意:這個監聽器注冊是可以復用的,即如果多次session expired,不用重復注冊監聽器。

 

3、監聽器sessionConnectionListener

package com.xiaoju.dqa.prometheus.client.zookeeper;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SessionConnectionListener implements ConnectionStateListener {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private String path;
    private String data;

    public SessionConnectionListener(String path, String data) {
        this.path = path;
        this.data = data;
    }

    @Override
    public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState){
        if(connectionState == ConnectionState.LOST){
            logger.error("[負載均衡失敗]zk session超時");
            while(true){
                try {
                    if(curatorFramework.getZookeeperClient().blockUntilConnectedOrTimedOut()){
                        curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, data.getBytes("UTF-8"));
                        logger.info("[負載均衡修復]重連zk成功");
                        break;
                    }
                } catch (InterruptedException e) {
                    break;
                } catch (Exception e){

                }
            }
        }
    }
}

 這里的ConnectionState.LOST等同於session expired事件,對這個事件的處理是,在一個死循環中重試鏈接zk,知道鏈接成功才退出循環。

 

需要注意的是:一旦重新創建了會話,那么之前會話的所有觀察點都會失效,需要重新初始化觀察點。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM