zookeeper源碼分析之六session機制


zookeeper中session意味着一個物理連接,客戶端連接服務器成功之后,會發送一個連接型請求,此時就會有session 產生。

session由sessionTracker產生的,sessionTracker的實現有SessionTrackerImpl,LocalSessionTracker,LeaderSessionTracker(leader),LearnerSessionTracker(follow and oberser)四種實現。它們的分支由各自的zookeeperServer.startup()開始。

1.sessionTrackerImpl

標准zookeeperServer的實現

    public synchronized void startup() {
        if (sessionTracker == null) {
            createSessionTracker();
        }
        startSessionTracker();
        setupRequestProcessors();

        registerJMX();

        state = State.RUNNING;
        notifyAll();
    }

其實現由sessionTrackerImpl來實現,其官方說明

/**
* This is a full featured SessionTracker. It tracks session in grouped by tick
* interval. It always rounds up the tick interval to provide a sort of grace
* period. Sessions are thus expired in batches made up of sessions that expire
* in a given interval.
*/

    protected void createSessionTracker() {
        sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
                tickTime, 1, getZooKeeperServerListener());
    }

    protected void startSessionTracker() {
        ((SessionTrackerImpl)sessionTracker).start();
    }

sessionTrackerImpl是一個線程,其run方法是真實邏輯:

    @Override
    public void run() {
        try {
            while (running) {
                long waitTime = sessionExpiryQueue.getWaitTime();
                if (waitTime > 0) {
                    Thread.sleep(waitTime);
                    continue;
                }

                for (SessionImpl s : sessionExpiryQueue.poll()) {
                    setSessionClosing(s.sessionId);
                    expirer.expire(s);
                }
            }
        } catch (InterruptedException e) {
            handleException(this.getName(), e);
        }
        LOG.info("SessionTrackerImpl exited loop!");
    }

sessionTrackerImpl實現了session的各種操作:創建session,檢測session,刪除session等。我們以增加session為例,看一下session機制:

    public long createSession(int sessionTimeout) {
        long sessionId = nextSessionId.getAndIncrement();
        addSession(sessionId, sessionTimeout);
        return sessionId;
    }
 public synchronized boolean addSession(long id, int sessionTimeout) {
        sessionsWithTimeout.put(id, sessionTimeout);

        boolean added = false;

 SessionImpl session = sessionsById.get(id);
        if (session == null){
            session = new SessionImpl(id, sessionTimeout);
        }

        // findbugs2.0.3 complains about get after put.
        // long term strategy would be use computeIfAbsent after JDK 1.8
        SessionImpl existedSession = sessionsById.putIfAbsent(id, session);

        if (existedSession != null) {
            session = existedSession;
        } else {
            added = true;
            LOG.debug("Adding session 0x" + Long.toHexString(id));
        }

        if (LOG.isTraceEnabled()) {
            String actionStr = added ? "Adding" : "Existing";
            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                    "SessionTrackerImpl --- " + actionStr + " session 0x"
                    + Long.toHexString(id) + " " + sessionTimeout);
        }

        updateSessionExpiry(session, sessionTimeout);
        return added;
    }

上文中出現了一個nextSessionId,看一下其的生成方式:

    /**
     * Generates an initial sessionId. High order byte is serverId, next 5
     * 5 bytes are from timestamp, and low order 2 bytes are 0s.
     */
    public static long initializeNextSession(long id) {
        long nextSid;
        nextSid = (Time.currentElapsedTime() << 24) >>> 8;
        nextSid =  nextSid | (id <<56);
        return nextSid;
    }

創建sessionImpl

  紅色代碼所示。

更新過期時間並記錄

    private void updateSessionExpiry(SessionImpl s, int timeout) {
        logTraceTouchSession(s.sessionId, timeout, "");
        sessionExpiryQueue.update(s, timeout);
    }

    private void logTraceTouchSession(long sessionId, int timeout, String sessionStatus){
        if (!LOG.isTraceEnabled())
            return;

        String msg = MessageFormat.format(
                "SessionTrackerImpl --- Touch {0}session: 0x{1} with timeout {2}",
                sessionStatus, Long.toHexString(sessionId), Integer.toString(timeout));

        ZooTrace.logTraceMessage(LOG, ZooTrace.CLIENT_PING_TRACE_MASK, msg);
    }

從上面的代碼可以看出,session都存放在一個sessionById的map里面,其定義為:

protected final ConcurrentHashMap<Long, SessionImpl> sessionsById =
new ConcurrentHashMap<Long, SessionImpl>();

 

2. LeaderSessionTracker

官方說明:

/**
* The leader session tracker tracks local and global sessions on the leader.
*/

LeaderZooKeeperServer、LeaderZooKeeperServer、FollowerZooKeeperServer、ObserverZooKeeperServer均繼承自QuorumZooKeeperServer,

QuorumZooKeeperServer的startSessionTracker方法如下:

    @Override
    protected void startSessionTracker() {
        upgradeableSessionTracker = (UpgradeableSessionTracker) sessionTracker;
        upgradeableSessionTracker.start();
    }
UpgradeableSessionTracker的實現類有兩個:LeaderSessionTracker和LearnerSessionTracker,很顯然,對leaderZookeeper的實現為LeaderSessionTracker,LearnerSessionTracker對應FollowerZooKeeperServer、ObserverZooKeeperServer。
LeaderSessionTracker的構造函數為:
    public LeaderSessionTracker(SessionExpirer expirer,
            ConcurrentMap<Long, Integer> sessionsWithTimeouts,
            int tickTime, long id, boolean localSessionsEnabled,
            ZooKeeperServerListener listener) {

        this.globalSessionTracker = new SessionTrackerImpl(
            expirer, sessionsWithTimeouts, tickTime, id, listener);

        this.localSessionsEnabled = localSessionsEnabled;
        if (this.localSessionsEnabled) {
            createLocalSessionTracker(expirer, tickTime, id, listener);
        }
        serverId = id;
    }

其分為兩個sessionTracker,一個為globalSessionTracker,其實現為SessionTrackerImpl;另一個為localSessionTracker,其實現為:

    public void createLocalSessionTracker(SessionExpirer expirer,
            int tickTime, long id, ZooKeeperServerListener listener) {
        this.localSessionsWithTimeouts =
            new ConcurrentHashMap<Long, Integer>();
        this.localSessionTracker = new LocalSessionTracker(
            expirer, this.localSessionsWithTimeouts, tickTime, id, listener);
    }
LeaderSessionTracker啟動時同時啟動global和local:
    public void start() {
        globalSessionTracker.start();
        if (localSessionTracker != null) {
            localSessionTracker.start();
        }
    }

創建session的過程:

    public long createSession(int sessionTimeout) {
        if (localSessionsEnabled) {
            return localSessionTracker.createSession(sessionTimeout);
        }
        return globalSessionTracker.createSession(sessionTimeout);
    }
globalSessionTracker的創建session上面已經論述,且看localSessionTracker的生成session,進一步代碼發現LocalSessionTracker繼承了SessionTrackerImpl,沒有重寫其創建session方法,即global和local創建session的方法相同。

3. LearnerSessionTracker

官方說明:

/**
* The learner session tracker is used by learners (followers and observers) to
* track zookeeper sessions which may or may not be echoed to the leader. When
* a new session is created it is saved locally in a wrapped
* LocalSessionTracker. It can subsequently be upgraded to a global session
* as required. If an upgrade is requested the session is removed from local
* collections while keeping the same session ID. It is up to the caller to
* queue a session creation request for the leader.
* A secondary function of the learner session tracker is to remember sessions
* which have been touched in this service. This information is passed along
* to the leader with a ping.
*/


其構造方法是:
    public LearnerSessionTracker(SessionExpirer expirer,
            ConcurrentMap<Long, Integer> sessionsWithTimeouts,
            int tickTime, long id, boolean localSessionsEnabled,
            ZooKeeperServerListener listener) {
        this.expirer = expirer;
        this.touchTable.set(new ConcurrentHashMap<Long, Integer>());
        this.globalSessionsWithTimeouts = sessionsWithTimeouts;
        this.serverId = id;
        nextSessionId.set(SessionTrackerImpl.initializeNextSession(serverId));

        this.localSessionsEnabled = localSessionsEnabled;
        if (this.localSessionsEnabled) {
            createLocalSessionTracker(expirer, tickTime, id, listener);
        }
    }

啟動時只啟動了localSessionTracker:

    public void start() {
        if (localSessionTracker != null) {
            localSessionTracker.start();
        }
    }

創建session時也僅僅由localSessionTracker生成:

    public long createSession(int sessionTimeout) {
        if (localSessionsEnabled) {
            return localSessionTracker.createSession(sessionTimeout);
        }
        return nextSessionId.getAndIncrement();
    }

 4 小結

    根據服務器角色不同,ZooKeeperServer,LeaderZooKeeperServer,FollowerZooKeeperServer,ObserverZooKeeperServer分別代表單機服務器,集群leader服務器,集群Follower服務器,集群observer服務器,它們的sessionTracker實現是不同的。ZookeeperServer的對應sessionTracker實現是SessionTrackerImpl;LeaderZooKeeperServer的對應sessionTracker實現是LeaderSessionTracker,FollowerZooKeeperServer,ObserverZooKeeperServer的對應sessionTracker實現是LearnerSessionTracker。

  有可以分為globalSessionTracker和LocalSessionTracker,其中單機只有一個標准的SessionTrackerImpl,集群leader開啟globalSessionTracker和LocalSessionTracker,follower和observer只開啟LocalSessionTracker。globalSessionTracker由SessionTrackerImpl實現,LocalSessionTracker繼承並擴展了SessionTrackerImpl。

 

 

 

 

 



 

 

 






 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM