目錄
- session建立的主要過程
- 客戶端發起連接
- 服務端創建session
session建立的主要過程
用一張圖來說明session建立過程中client和server的交互
主要流程
- 服務端啟動,客戶端啟動
- 客戶端發起socket連接
- 服務端accept socket連接,socket連接建立
- 客戶端發送ConnectRequest給server
- server收到后初始化ServerCnxn,代表一個和客戶端的連接,即session,server發送ConnectResponse給client
- client處理ConnectResponse,session建立完成
客戶發起連接
和server建立socket連接
客戶端要發起連接要先啟動,不論是使用curator client還是zkClient,都是初始化org.apache.zookeeper.ZooKeeper#ZooKeeper
。
Zookeeper初始化的主要工作是初始化自己的一些關鍵組件
- Watcher,外部構造好傳入
- 初始化StaticHostProvider,決定客戶端選擇連接哪一個server
- ClientCnxn,客戶端網絡通信的組件,主要啟動邏輯就是啟動這個類
ClientCnxn包含兩個線程
- SendThread,負責client端消息的發送和接收
- EventThread,負責處理event
ClientCnxn初始化的過程就是初始化啟動這兩個線程,客戶端發起連接的主要邏輯在SendThread線程中
// org.apache.zookeeper.ClientCnxn.SendThread#run
@Override
public void run() {
clientCnxnSocket.introduce(this,sessionId);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = System.currentTimeMillis();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
while (state.isAlive()) {
try {
// client是否連接到server,如果沒有連接到則連接server
if (!clientCnxnSocket.isConnected()) {
if(!isFirstConnect){
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
// don't re-establish connection if we are closing
if (closing || !state.isAlive()) {
break;
}
// 這個里面去連接server
startConnect();
clientCnxnSocket.updateLastSendAndHeard();
}
// 省略中間代碼...
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
// 省略中間代碼...
}
SendThread#run是一個while循環,只要client沒有被關閉會一直循環,每次循環判斷當前client是否連接到server,如果沒有則發起連接,發起連接調用了startConnect
private void startConnect() throws IOException {
state = States.CONNECTING;
InetSocketAddress addr;
if (rwServerAddress != null) {
addr = rwServerAddress;
rwServerAddress = null;
} else {
// 通過hostProvider來獲取一個server地址
addr = hostProvider.next(1000);
}
// 省略中間代碼...
// 建立client與server的連接
clientCnxnSocket.connect(addr);
}
到這里client發起了socket連接,server監聽的端口收到client的連接請求后和client建立連接。
通過一個request來建立session連接
socket連接建立后,client會向server發送一個ConnectRequest來建立session連接。兩種情況會發送ConnectRequest
- 在上面的connect方法中會判斷是否socket已經建立成功,如果建立成功就會發送ConnectRequest
- 如果socket沒有立即建立成功(socket連接建立是異步的),則發送這個packet要延后到doTransport中
發送ConnectRequest是在下面的方法中
// org.apache.zookeeper.ClientCnxn.SendThread#primeConnection
void primeConnection() throws IOException {
LOG.info("Socket connection established to "
+ clientCnxnSocket.getRemoteSocketAddress()
+ ", initiating session");
isFirstConnect = false;
long sessId = (seenRwServerBefore) ? sessionId : 0;
ConnectRequest conReq = new ConnectRequest(0, lastZxid,
sessionTimeout, sessId, sessionPasswd);
// 省略中間代碼...
// 將conReq封裝為packet放入outgoingQueue等待發送
outgoingQueue.addFirst(new Packet(null, null, conReq,
null, null, readOnly));
}
clientCnxnSocket.enableReadWriteOnly();
if (LOG.isDebugEnabled()) {
LOG.debug("Session establishment request sent on "
+ clientCnxnSocket.getRemoteSocketAddress());
}
}
請求中帶的參數
- lastZxid:上一個事務的id
- sessionTimeout:client端配置的sessionTimeout
- sessId:sessionId,如果之前建立過連接取的是上一次連接的sessionId
- sessionPasswd:session的密碼
服務端創建session
和client建立socket連接
在server啟動的過程中除了會啟動用於選舉的網絡組件還會啟動用於處理client請求的網絡組件
org.apache.zookeeper.server.NIOServerCnxnFactory
主要啟動了三個線程:
- AcceptThread:用於接收client的連接請求,建立連接后交給SelectorThread線程處理
- SelectorThread:用於處理讀寫請求
- ConnectionExpirerThread:檢查session連接是否過期
client發起socket連接的時候,server監聽了該端口,接收到client的連接請求,然后把建立練級的SocketChannel放入隊列里面,交給SelectorThread處理
// org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#addAcceptedConnection
public boolean addAcceptedConnection(SocketChannel accepted) {
if (stopped || !acceptedQueue.offer(accepted)) {
return false;
}
wakeupSelector();
return true;
}
建立session連接
SelectorThread是一個不斷循環的線程,每次循環都會處理剛剛建立的socket連接
// org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#run
while (!stopped) {
try {
select();
// 處理對立中的socket
processAcceptedConnections();
processInterestOpsUpdateRequests();
} catch (RuntimeException e) {
LOG.warn("Ignoring unexpected runtime exception", e);
} catch (Exception e) {
LOG.warn("Ignoring unexpected exception", e);
}
}
// org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#processAcceptedConnections
private void processAcceptedConnections() {
SocketChannel accepted;
while (!stopped && (accepted = acceptedQueue.poll()) != null) {
SelectionKey key = null;
try {
// 向該socket注冊讀事件
key = accepted.register(selector, SelectionKey.OP_READ);
// 創建一個NIOServerCnxn維護session
NIOServerCnxn cnxn = createConnection(accepted, key, this);
key.attach(cnxn);
addCnxn(cnxn);
// 省略中間代碼...
}
}
說了這么久,我們說的session究竟是什么還沒有解釋,session中文翻譯是會話,在這里就是zk的server和client維護的一個具有一些特別屬性的網絡連接,網絡連接這里就是socket連接,一些特別的屬性包括
- sessionId:唯一標示一個會話
- sessionTimeout:這個連接的超時時間,超過這個時間server就會把連接斷開
所以session建立的兩步就是
- 建立socket連接
- client發起建立session請求,server建立一個實例來維護這個連接
server收到ConnectRequest之后,按照正常處理io的方式處理這個request,server端的主要操作是
- 反序列化為ConnectRequest
- 根據request中的sessionId來判斷是新的session連接還是session重連
- 如果是新連接
- 生成sessionId
- 創建新的SessionImpl並放入org.apache.zookeeper.server.SessionTrackerImpl#sessionExpiryQueue
- 封裝該請求為新的request在processorChain中傳遞,最后交給FinalRequestProcessor處理
- 如果是重連
- 關閉sessionId對應的原來的session
- 關閉原來的socket連接
- sessionImp會在sessionExpiryQueue中由於過期被清理
- 重新打開一個session
- 將原來的sessionId設置到當前的NIOServerCnxn實例中,作為新的連接的sessionId
- 校驗密碼是否正確密碼錯誤的時候直接返回給客戶端,不可用的session
- 密碼正確的話,新建SessionImpl
- 返回給客戶端sessionId
- 關閉sessionId對應的原來的session
- 如果是新連接
總體流程是
其中有一個session生成算法我們來看下
public static long initializeNextSession(long id) {
// sessionId是long類型,共8個字節,64位
long nextSid;
// 取時間戳的的低40位作為初始化sessionId的第16-55位,這里使用的是無符號右移,不會出現負數
nextSid = (Time.currentElapsedTime() << 24) >>> 8;
// 使用serverId(配置文件中指定的myid)作為高8位
nextSid = nextSid | (id <<56);
// nextSid為long的最小值,這中情況不可能出現,這里只是作為一個case列在這里
if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) {
++nextSid; // this is an unlikely edge case, but check it just in case
}
return nextSid;
}
初始化sessionId的組成
myid(1字節)+截取的時間戳低40位(5個字節)+2個字節(初始化都是0)
每個server再基於這個id不斷自增,這樣的算法就保證了每個server的sessionId是全局唯一的。
總結
session在zk框架中是一個重要概念,很多功能都依賴於session,比如臨時節點,session關閉后就自動刪除了。本文主要介紹了session的建立過程中client和server各自的處理方式。