zookeeper源碼 — 四、session建立


目錄

  • session建立的主要過程
  • 客戶端發起連接
  • 服務端創建session

session建立的主要過程

用一張圖來說明session建立過程中client和server的交互

主要流程

  • 服務端啟動,客戶端啟動
  • 客戶端發起socket連接
  • 服務端accept socket連接,socket連接建立
  • 客戶端發送ConnectRequest給server
  • server收到后初始化ServerCnxn,代表一個和客戶端的連接,即session,server發送ConnectResponse給client
  • client處理ConnectResponse,session建立完成

客戶發起連接

和server建立socket連接

客戶端要發起連接要先啟動,不論是使用curator client還是zkClient,都是初始化org.apache.zookeeper.ZooKeeper#ZooKeeper

Zookeeper初始化的主要工作是初始化自己的一些關鍵組件

  • Watcher,外部構造好傳入
  • 初始化StaticHostProvider,決定客戶端選擇連接哪一個server
  • ClientCnxn,客戶端網絡通信的組件,主要啟動邏輯就是啟動這個類

ClientCnxn包含兩個線程

  • SendThread,負責client端消息的發送和接收
  • EventThread,負責處理event

ClientCnxn初始化的過程就是初始化啟動這兩個線程,客戶端發起連接的主要邏輯在SendThread線程中

// org.apache.zookeeper.ClientCnxn.SendThread#run
@Override
public void run() {
    clientCnxnSocket.introduce(this,sessionId);
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
    int to;
    long lastPingRwServer = System.currentTimeMillis();
    final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
    while (state.isAlive()) {
        try {
            // client是否連接到server,如果沒有連接到則連接server
            if (!clientCnxnSocket.isConnected()) {
                if(!isFirstConnect){
                    try {
                        Thread.sleep(r.nextInt(1000));
                    } catch (InterruptedException e) {
                        LOG.warn("Unexpected exception", e);
                    }
                }
                // don't re-establish connection if we are closing
                if (closing || !state.isAlive()) {
                    break;
                }
                // 這個里面去連接server
                startConnect();
                clientCnxnSocket.updateLastSendAndHeard();
            }

            // 省略中間代碼...
            clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
            // 省略中間代碼...
}

SendThread#run是一個while循環,只要client沒有被關閉會一直循環,每次循環判斷當前client是否連接到server,如果沒有則發起連接,發起連接調用了startConnect

private void startConnect() throws IOException {
    state = States.CONNECTING;

    InetSocketAddress addr;
    if (rwServerAddress != null) {
        addr = rwServerAddress;
        rwServerAddress = null;
    } else {
        // 通過hostProvider來獲取一個server地址
        addr = hostProvider.next(1000);
    }
	// 省略中間代碼...

    // 建立client與server的連接
    clientCnxnSocket.connect(addr);
}

到這里client發起了socket連接,server監聽的端口收到client的連接請求后和client建立連接。

通過一個request來建立session連接

socket連接建立后,client會向server發送一個ConnectRequest來建立session連接。兩種情況會發送ConnectRequest

  • 在上面的connect方法中會判斷是否socket已經建立成功,如果建立成功就會發送ConnectRequest
  • 如果socket沒有立即建立成功(socket連接建立是異步的),則發送這個packet要延后到doTransport中

發送ConnectRequest是在下面的方法中

// org.apache.zookeeper.ClientCnxn.SendThread#primeConnection
void primeConnection() throws IOException {
    LOG.info("Socket connection established to "
             + clientCnxnSocket.getRemoteSocketAddress()
             + ", initiating session");
    isFirstConnect = false;
    long sessId = (seenRwServerBefore) ? sessionId : 0;
    ConnectRequest conReq = new ConnectRequest(0, lastZxid,
                                               sessionTimeout, sessId, sessionPasswd);
    	// 省略中間代碼...
    	// 將conReq封裝為packet放入outgoingQueue等待發送
        outgoingQueue.addFirst(new Packet(null, null, conReq,
                                          null, null, readOnly));
    }
    clientCnxnSocket.enableReadWriteOnly();
    if (LOG.isDebugEnabled()) {
        LOG.debug("Session establishment request sent on "
                  + clientCnxnSocket.getRemoteSocketAddress());
    }
}

請求中帶的參數

  • lastZxid:上一個事務的id
  • sessionTimeout:client端配置的sessionTimeout
  • sessId:sessionId,如果之前建立過連接取的是上一次連接的sessionId
  • sessionPasswd:session的密碼

服務端創建session

和client建立socket連接

在server啟動的過程中除了會啟動用於選舉的網絡組件還會啟動用於處理client請求的網絡組件

org.apache.zookeeper.server.NIOServerCnxnFactory

主要啟動了三個線程:

  • AcceptThread:用於接收client的連接請求,建立連接后交給SelectorThread線程處理
  • SelectorThread:用於處理讀寫請求
  • ConnectionExpirerThread:檢查session連接是否過期

client發起socket連接的時候,server監聽了該端口,接收到client的連接請求,然后把建立練級的SocketChannel放入隊列里面,交給SelectorThread處理

// org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#addAcceptedConnection
public boolean addAcceptedConnection(SocketChannel accepted) {
    if (stopped || !acceptedQueue.offer(accepted)) {
        return false;
    }
    wakeupSelector();
    return true;
}

建立session連接

SelectorThread是一個不斷循環的線程,每次循環都會處理剛剛建立的socket連接

// org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#run
while (!stopped) {
    try {
        select();
        //  處理對立中的socket
        processAcceptedConnections();
        processInterestOpsUpdateRequests();
    } catch (RuntimeException e) {
        LOG.warn("Ignoring unexpected runtime exception", e);
    } catch (Exception e) {
        LOG.warn("Ignoring unexpected exception", e);
    }
}

// org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#processAcceptedConnections
private void processAcceptedConnections() {
    SocketChannel accepted;
    while (!stopped && (accepted = acceptedQueue.poll()) != null) {
        SelectionKey key = null;
        try {
            // 向該socket注冊讀事件
            key = accepted.register(selector, SelectionKey.OP_READ);
            // 創建一個NIOServerCnxn維護session
            NIOServerCnxn cnxn = createConnection(accepted, key, this);
            key.attach(cnxn);
            addCnxn(cnxn);
        // 省略中間代碼...
    }
}

說了這么久,我們說的session究竟是什么還沒有解釋,session中文翻譯是會話,在這里就是zk的server和client維護的一個具有一些特別屬性的網絡連接,網絡連接這里就是socket連接,一些特別的屬性包括

  • sessionId:唯一標示一個會話
  • sessionTimeout:這個連接的超時時間,超過這個時間server就會把連接斷開

所以session建立的兩步就是

  • 建立socket連接
  • client發起建立session請求,server建立一個實例來維護這個連接

server收到ConnectRequest之后,按照正常處理io的方式處理這個request,server端的主要操作是

  • 反序列化為ConnectRequest
  • 根據request中的sessionId來判斷是新的session連接還是session重連
    • 如果是新連接
      • 生成sessionId
      • 創建新的SessionImpl並放入org.apache.zookeeper.server.SessionTrackerImpl#sessionExpiryQueue
      • 封裝該請求為新的request在processorChain中傳遞,最后交給FinalRequestProcessor處理
    • 如果是重連
      • 關閉sessionId對應的原來的session
        • 關閉原來的socket連接
        • sessionImp會在sessionExpiryQueue中由於過期被清理
      • 重新打開一個session
        • 將原來的sessionId設置到當前的NIOServerCnxn實例中,作為新的連接的sessionId
        • 校驗密碼是否正確密碼錯誤的時候直接返回給客戶端,不可用的session
        • 密碼正確的話,新建SessionImpl
        • 返回給客戶端sessionId

總體流程是

其中有一個session生成算法我們來看下

public static long initializeNextSession(long id) {
    // sessionId是long類型,共8個字節,64位
    long nextSid;
    // 取時間戳的的低40位作為初始化sessionId的第16-55位,這里使用的是無符號右移,不會出現負數
    nextSid = (Time.currentElapsedTime() << 24) >>> 8;
    // 使用serverId(配置文件中指定的myid)作為高8位
    nextSid =  nextSid | (id <<56);
    // nextSid為long的最小值,這中情況不可能出現,這里只是作為一個case列在這里
    if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) {
        ++nextSid;  // this is an unlikely edge case, but check it just in case
    }
    return nextSid;
}

初始化sessionId的組成

myid(1字節)+截取的時間戳低40位(5個字節)+2個字節(初始化都是0)

每個server再基於這個id不斷自增,這樣的算法就保證了每個server的sessionId是全局唯一的。

總結

session在zk框架中是一個重要概念,很多功能都依賴於session,比如臨時節點,session關閉后就自動刪除了。本文主要介紹了session的建立過程中client和server各自的處理方式。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM