什么是會話?
A撥了B的電話
電話接通
A問道:Are you OK?
B回復:I have a bug!
A掛了電話
上面所喻整個過程就是所謂的會話。
會話(Session)是一個客戶與服務器之間的不中斷的請求響應序列。注意其中“不中斷”一詞。
Openfire的通信,是以服務器為中轉站的消息轉發機制,客戶端與服務器要實現通信,必須保持連接,即持有會話。Session的管理,集中在SessionManager模塊中。
SessionManager
SessionManager提供了一系列與Session生命周期相關的管理功能,例如:
// 創建 public LocalClientSession createClientSession(Connection conn, StreamID id, Locale language) ; // 添加 public void addSession(LocalClientSession session) ; // 獲取 public ClientSession getSession(JID from) ; // 移除 public boolean removeSession(LocalClientSession session) ; ......
Session的整個生命周期,大致的講可以分為:預創建、認證、移除
- 預創建:在連接打開后,服務端收到客戶端的第一個消息請求(即初始化流)時完成,此時的Session還不能用於通信
- 認證:在資源綁定時完成,此時的Session被添加到會話管理隊列以及路由表中,象征着已具備通信功能
- 移除:當連接空閑或者關閉時,Session被移除
特別注意的一點:
預創建、認證這兩個過程,是在客戶端登錄的時候完成,從后面分析中看到的回應報文,以及第一章《Openfire與XMPP協議》中提到登錄報文協議,可以清楚的看到這一點。而移除則是在客戶端掉線的時候完成。
下面,就重點來看看,Openfire是具體是如何實現對Session的管理。
Session 預創建
回顧一下上一章的內容:ConnectionHandler類作為MINA的處理器,ConnectionHandler中的messageReceived()方法是消息的接收入口,接收到的消息交由StanzaHandler類處理。
StanzaHandler.process()方法在處理消息時,首先調用本類中createSession()方法,完成了對Session的預創建。
abstract boolean createSession(String namespace, String serverName, XmlPullParser xpp, Connection connection) throws XmlPullParserException;
上面的createSession()是一個抽象方法,由其子類完成。本文我們以C2S通信為研究對象,故其實現子類為:ClientStanzaHandler類
ClientStanzaHandler.createSession()方法代碼如下:
@Override boolean createSession(String namespace, String serverName, XmlPullParser xpp, Connection connection) throws XmlPullParserException { if ("jabber:client".equals(namespace)) { // The connected client is a regular client so create a ClientSession session = LocalClientSession.createSession(serverName, xpp, connection); return true; } return false; }
這里創建了一個LocalClientSession類型的Session對象。
LocalClientSession.createSession()方法如下,只保留與創建流程相關的代碼:
public static LocalClientSession createSession(String serverName, XmlPullParser xpp, Connection connection) throws XmlPullParserException { ...... // Create a ClientSession for this user. LocalClientSession session = SessionManager.getInstance().createClientSession(connection, language); // Build the start packet response StringBuilder sb = new StringBuilder(200); sb.append("<?xml version='1.0' encoding='"); sb.append(CHARSET); sb.append("'?>"); if (isFlashClient) { sb.append("<flash:stream xmlns:flash=\"http://www.jabber.com/streams/flash\" "); } else { sb.append("<stream:stream "); } sb.append("xmlns:stream=\"http://etherx.jabber.org/streams\" xmlns=\"jabber:client\" from=\""); sb.append(serverName); sb.append("\" id=\""); sb.append(session.getStreamID().toString()); sb.append("\" xml:lang=\""); sb.append(language.toLanguageTag()); // Don't include version info if the version is 0.0. if (majorVersion != 0) { sb.append("\" version=\""); sb.append(majorVersion).append('.').append(minorVersion); } sb.append("\">"); connection.deliverRawText(sb.toString()); // If this is a "Jabber" connection, the session is now initialized and we can // return to allow normal packet parsing. if (majorVersion == 0) { return session; } // Otherwise, this is at least XMPP 1.0 so we need to announce stream features. sb = new StringBuilder(490); sb.append("<stream:features>"); if (connection.getTlsPolicy() != Connection.TLSPolicy.disabled) { sb.append("<starttls xmlns=\"urn:ietf:params:xml:ns:xmpp-tls\">"); if (connection.getTlsPolicy() == Connection.TLSPolicy.required) { sb.append("<required/>"); } sb.append("</starttls>"); } // Include available SASL Mechanisms sb.append(SASLAuthentication.getSASLMechanisms(session)); // Include Stream features String specificFeatures = session.getAvailableStreamFeatures(); if (specificFeatures != null) { sb.append(specificFeatures); } sb.append("</stream:features>"); connection.deliverRawText(sb.toString()); return session; }
創建了一個LocalClientSession對象之后,服務端調用了兩次deliverRawText()給客戶端發送報文。從協議報文的內容來看,其實就是登錄過程中,服務端收到客戶端第一個初始化流之后的兩個應答:第一個是流回復,第二個是通知客戶端進行STL協商。
而LocalClientSession是由SessionManager生成。
SessionManager.createClientSession()代碼如下:
public LocalClientSession createClientSession(Connection conn, Locale language) { return createClientSession(conn, nextStreamID(), language); }
public LocalClientSession createClientSession(Connection conn, StreamID id, Locale language) { if (serverName == null) { throw new IllegalStateException("Server not initialized"); } LocalClientSession session = new LocalClientSession(serverName, conn, id, language); conn.init(session); // Register to receive close notification on this session so we can // remove and also send an unavailable presence if it wasn't // sent before conn.registerCloseListener(clientSessionListener, session); // Add to pre-authenticated sessions. localSessionManager.getPreAuthenticatedSessions().put(session.getAddress().getResource(), session); // Increment the counter of user sessions connectionsCounter.incrementAndGet(); return session; }
由上面兩個方法總結起來,Session的預創建流程為:
(1)生成一個新streamID,並創建一個LocalClientSession對象的session
(2)調用conn.registerCloseListener(),注冊了Session的關閉監聽。作用是當Connection關掉時,Session也相應清除掉
(3)將生成的session添加到preAuthenticatedSessions隊列中,表示預創建完成。但此時的Session並沒有加入到路由表,還不能用來通信
Session 認證
在第一章,《Openfire與XMPP協議》一文中已經介紹,資源綁定其實是用戶登錄過程的其中一步。亦即,在這里完成了Session的認證。
資源綁定是一個IQ消息,結合上一章《消息路由》中的分析,對於IQ消息,PacketRouterImpl模塊使用IQRouter來完成路由。
IQRouter.route()方法如下,其中只保留資源綁定部分代碼:
public void route(IQ packet) { ...... try { ...... else if (session == null || session.getStatus() == Session.STATUS_AUTHENTICATED || ( childElement != null && isLocalServer(to) && ( "jabber:iq:auth".equals(childElement.getNamespaceURI()) || "jabber:iq:register".equals(childElement.getNamespaceURI()) || "urn:ietf:params:xml:ns:xmpp-bind".equals(childElement.getNamespaceURI())))) { handle(packet); } ...... } catch (PacketRejectedException e) { ...... } }
其中,handle()方法創建了處理該IQ的IQHandler,並調用IQandler中的process()進行包處理。
IQRouter.handle():
private void handle(IQ packet) { JID recipientJID = packet.getTo(); ...... if (isLocalServer(recipientJID)) { if (namespace == null) { ...... } else { IQHandler handler = getHandler(namespace); if (handler == null) { ...... } else { handler.process(packet); } } } ...... }
傳入的參數"namespace",是IQ的唯一標識碼,將決定了這個IQ由誰來處理。
資源綁定的namespace為:urn:ietf:params:xml:ns:xmpp-bind,也就是說,這個IQ,最終將交給IQBindHandler來處理。
可以看到,IQBindHandler的構造方法:
public IQBindHandler() { super("Resource Binding handler"); info = new IQHandlerInfo("bind", "urn:ietf:params:xml:ns:xmpp-bind"); }
包處理方法IQHandler.process():
@Override public void process(Packet packet) throws PacketException { IQ iq = (IQ) packet; try { IQ reply = handleIQ(iq); if (reply != null) { deliverer.deliver(reply); } } ...... }
IQBindHandler.handleIQ()中,setAuthToken()方法實現對Session認證。
@Override public IQ handleIQ(IQ packet) throws UnauthorizedException { LocalClientSession session = (LocalClientSession) sessionManager.getSession(packet.getFrom()); IQ reply = IQ.createResultIQ(packet); Element child = reply.setChildElement("bind", "urn:ietf:params:xml:ns:xmpp-bind"); ...... if (authToken.isAnonymous()) { // User used ANONYMOUS SASL so initialize the session as an anonymous login session.setAnonymousAuth(); } else { ...... session.setAuthToken(authToken, resource); } child.addElement("jid").setText(session.getAddress().toString()); // Send the response directly since a route does not exist at this point. session.process(reply); // After the client has been informed, inform all listeners as well. SessionEventDispatcher.dispatchEvent(session, SessionEventDispatcher.EventType.resource_bound); return null; }
Session的認證后,其實就是將Session加入SessionManager中,如下:
LocalClientSession.setAuthToken(): public void setAuthToken(AuthToken auth, String resource) { ...... sessionManager.addSession(this); }
在第四章分析消息路由時,發送消息前,首先用ToJID從路由表中獲取Session,接着再進行消息路由。也就是說,一條消息能否被接收到,取決於接收者的Session是否存在於路由表中。
而SessionManager.addSession()剛好就是將Session加入路由表,如下:
SessionManager.addSession():
public void addSession(LocalClientSession session) { routingTable.addClientRoute(session.getAddress(), session); .... }
此時,就代表了這個Session擁有了全部的功能,可以用來進行通信了。
Session 移除
移除工作就相對簡單一些了,當監聽到Connection關閉時,應清除掉相應的Session。
在SessionManager的私有類ClientSessionListener實現了ConnectionCloseListener,能及時地監聽到Connection關閉並進行Session的清除工作。監聽是在Session預創建時注冊,上文已經介紹。
Session的關閉監聽,ClientSessionListener類如下:
private class ClientSessionListener implements ConnectionCloseListener { /** * Handle a session that just closed. * * @param handback The session that just closed */ @Override public void onConnectionClose(Object handback) { try { LocalClientSession session = (LocalClientSession) handback; try { if ((session.getPresence().isAvailable() || !session.wasAvailable()) && routingTable.hasClientRoute(session.getAddress())) { // Send an unavailable presence to the user's subscribers // Note: This gives us a chance to send an unavailable presence to the // entities that the user sent directed presences Presence presence = new Presence(); presence.setType(Presence.Type.unavailable); presence.setFrom(session.getAddress()); router.route(presence); } session.getStreamManager().onClose(router, serverAddress); } finally { // Remove the session removeSession(session); } } catch (Exception e) { // Can't do anything about this problem... Log.error(LocaleUtils.getLocalizedString("admin.error.close"), e); } } }
先關閉Sessoin,然后移除出隊列,清除完畢!
Over!