即時通信系統Openfire分析之五:會話管理


  什么是會話?

  A撥了B的電話
  電話接通
  A問道:Are you OK? 
  B回復:I have a bug!
  A掛了電話

  上面所喻整個過程就是所謂的會話。

 

  會話(Session)是一個客戶與服務器之間的不中斷的請求響應序列。注意其中“不中斷”一詞。

  Openfire的通信,是以服務器為中轉站的消息轉發機制,客戶端與服務器要實現通信,必須保持連接,即持有會話。Session的管理,集中在SessionManager模塊中。

  SessionManager

  SessionManager提供了一系列與Session生命周期相關的管理功能,例如:

// 創建
public LocalClientSession createClientSession(Connection conn, StreamID id, Locale language) ;
// 添加
public void addSession(LocalClientSession session) ;
// 獲取
public ClientSession getSession(JID from) ;
// 移除
public boolean removeSession(LocalClientSession session) ;

......

  Session的整個生命周期,大致的講可以分為:預創建、認證、移除

  •   預創建:在連接打開后,服務端收到客戶端的第一個消息請求(即初始化流)時完成,此時的Session還不能用於通信
  •   認證:在資源綁定時完成,此時的Session被添加到會話管理隊列以及路由表中,象征着已具備通信功能
  •   移除:當連接空閑或者關閉時,Session被移除

  特別注意的一點:

  預創建、認證這兩個過程,是在客戶端登錄的時候完成,從后面分析中看到的回應報文,以及第一章《Openfire與XMPP協議》中提到登錄報文協議,可以清楚的看到這一點。而移除則是在客戶端掉線的時候完成。

 

  下面,就重點來看看,Openfire是具體是如何實現對Session的管理。

  Session 預創建

  回顧一下上一章的內容:ConnectionHandler類作為MINA的處理器,ConnectionHandler中的messageReceived()方法是消息的接收入口,接收到的消息交由StanzaHandler類處理。

  StanzaHandler.process()方法在處理消息時,首先調用本類中createSession()方法,完成了對Session的預創建。

abstract boolean createSession(String namespace, String serverName, XmlPullParser xpp, Connection connection)
throws XmlPullParserException;

  上面的createSession()是一個抽象方法,由其子類完成。本文我們以C2S通信為研究對象,故其實現子類為:ClientStanzaHandler類

  ClientStanzaHandler.createSession()方法代碼如下:

@Override
boolean createSession(String namespace, String serverName, XmlPullParser xpp, Connection connection)
        throws XmlPullParserException {
    if ("jabber:client".equals(namespace)) {
        // The connected client is a regular client so create a ClientSession
        session = LocalClientSession.createSession(serverName, xpp, connection);
        return true;
    }
    return false;
}

這里創建了一個LocalClientSession類型的Session對象。

LocalClientSession.createSession()方法如下,只保留與創建流程相關的代碼:

public static LocalClientSession createSession(String serverName, XmlPullParser xpp, Connection connection)
        throws XmlPullParserException {
  
    ......

    // Create a ClientSession for this user.
    LocalClientSession session = SessionManager.getInstance().createClientSession(connection, language);
    
        // Build the start packet response
    StringBuilder sb = new StringBuilder(200);
    sb.append("<?xml version='1.0' encoding='");
    sb.append(CHARSET);
    sb.append("'?>");
    if (isFlashClient) {
        sb.append("<flash:stream xmlns:flash=\"http://www.jabber.com/streams/flash\" ");
    }
    else {
        sb.append("<stream:stream ");
    }
    sb.append("xmlns:stream=\"http://etherx.jabber.org/streams\" xmlns=\"jabber:client\" from=\"");
    sb.append(serverName);
    sb.append("\" id=\"");
    sb.append(session.getStreamID().toString());
    sb.append("\" xml:lang=\"");
    sb.append(language.toLanguageTag());
    // Don't include version info if the version is 0.0.
    if (majorVersion != 0) {
        sb.append("\" version=\"");
        sb.append(majorVersion).append('.').append(minorVersion);
    }
    sb.append("\">");
    connection.deliverRawText(sb.toString());

    // If this is a "Jabber" connection, the session is now initialized and we can
    // return to allow normal packet parsing.
    if (majorVersion == 0) {
        return session;
    }
    // Otherwise, this is at least XMPP 1.0 so we need to announce stream features.

    sb = new StringBuilder(490);
    sb.append("<stream:features>");
    if (connection.getTlsPolicy() != Connection.TLSPolicy.disabled) {
        sb.append("<starttls xmlns=\"urn:ietf:params:xml:ns:xmpp-tls\">");
        if (connection.getTlsPolicy() == Connection.TLSPolicy.required) {
            sb.append("<required/>");
        }
        sb.append("</starttls>");
    }
    // Include available SASL Mechanisms
    sb.append(SASLAuthentication.getSASLMechanisms(session));
    // Include Stream features
    String specificFeatures = session.getAvailableStreamFeatures();
    if (specificFeatures != null) {
        sb.append(specificFeatures);
    }
    sb.append("</stream:features>");

    connection.deliverRawText(sb.toString());
    
    return session;
}

  創建了一個LocalClientSession對象之后,服務端調用了兩次deliverRawText()給客戶端發送報文。從協議報文的內容來看,其實就是登錄過程中,服務端收到客戶端第一個初始化流之后的兩個應答:第一個是流回復,第二個是通知客戶端進行STL協商。

  而LocalClientSession是由SessionManager生成。

  SessionManager.createClientSession()代碼如下:

 

public LocalClientSession createClientSession(Connection conn, Locale language) {
    return createClientSession(conn, nextStreamID(), language);
}

 

public LocalClientSession createClientSession(Connection conn, StreamID id, Locale language) {
    if (serverName == null) {
        throw new IllegalStateException("Server not initialized");
    }
    LocalClientSession session = new LocalClientSession(serverName, conn, id, language);
    conn.init(session);
    // Register to receive close notification on this session so we can
    // remove  and also send an unavailable presence if it wasn't
    // sent before
    conn.registerCloseListener(clientSessionListener, session);

    // Add to pre-authenticated sessions.
    localSessionManager.getPreAuthenticatedSessions().put(session.getAddress().getResource(), session);
    // Increment the counter of user sessions
    connectionsCounter.incrementAndGet();
    return session;
} 

  由上面兩個方法總結起來,Session的預創建流程為:

  (1)生成一個新streamID,並創建一個LocalClientSession對象的session

  (2)調用conn.registerCloseListener(),注冊了Session的關閉監聽。作用是當Connection關掉時,Session也相應清除掉

  (3)將生成的session添加到preAuthenticatedSessions隊列中,表示預創建完成。但此時的Session並沒有加入到路由表,還不能用來通信

  Session 認證

  在第一章,《Openfire與XMPP協議》一文中已經介紹,資源綁定其實是用戶登錄過程的其中一步。亦即,在這里完成了Session的認證。

  資源綁定是一個IQ消息,結合上一章《消息路由》中的分析,對於IQ消息,PacketRouterImpl模塊使用IQRouter來完成路由。

  IQRouter.route()方法如下,其中只保留資源綁定部分代碼:

public void route(IQ packet) {
    ......
    try {
        ......
        else if (session == null || session.getStatus() == Session.STATUS_AUTHENTICATED || (
                childElement != null && isLocalServer(to) && (
                    "jabber:iq:auth".equals(childElement.getNamespaceURI()) ||
                    "jabber:iq:register".equals(childElement.getNamespaceURI()) ||
                    "urn:ietf:params:xml:ns:xmpp-bind".equals(childElement.getNamespaceURI())))) {
            handle(packet);
        } 
        ......
    }
    catch (PacketRejectedException e) {
        ......
    }
}

  其中,handle()方法創建了處理該IQ的IQHandler,並調用IQandler中的process()進行包處理。

  IQRouter.handle():

private void handle(IQ packet) {
    JID recipientJID = packet.getTo();
    ......
    if (isLocalServer(recipientJID)) {
        if (namespace == null) {
            ......
        }
        else {
            IQHandler handler = getHandler(namespace);
            if (handler == null) {
               ......
            }
            else {
                handler.process(packet);
            }
        }
    }
    ......
}

  傳入的參數"namespace",是IQ的唯一標識碼,將決定了這個IQ由誰來處理。

  資源綁定的namespace為:urn:ietf:params:xml:ns:xmpp-bind,也就是說,這個IQ,最終將交給IQBindHandler來處理。

  可以看到,IQBindHandler的構造方法:

public IQBindHandler() {
    super("Resource Binding handler");
    info = new IQHandlerInfo("bind", "urn:ietf:params:xml:ns:xmpp-bind");
}

  包處理方法IQHandler.process():

@Override
public void process(Packet packet) throws PacketException {
    IQ iq = (IQ) packet;
    try {
        IQ reply = handleIQ(iq);
        if (reply != null) {
            deliverer.deliver(reply);
        }
    }
   ......
}

  IQBindHandler.handleIQ()中,setAuthToken()方法實現對Session認證。

@Override
public IQ handleIQ(IQ packet) throws UnauthorizedException {
    LocalClientSession session = (LocalClientSession) sessionManager.getSession(packet.getFrom());
    IQ reply = IQ.createResultIQ(packet);
    Element child = reply.setChildElement("bind", "urn:ietf:params:xml:ns:xmpp-bind");
    ......
    if (authToken.isAnonymous()) {
        // User used ANONYMOUS SASL so initialize the session as an anonymous login
        session.setAnonymousAuth();
    }
    else {
        ......
        session.setAuthToken(authToken, resource);
    }
    child.addElement("jid").setText(session.getAddress().toString());
    // Send the response directly since a route does not exist at this point.
    session.process(reply);
    // After the client has been informed, inform all listeners as well.
    SessionEventDispatcher.dispatchEvent(session, SessionEventDispatcher.EventType.resource_bound);
    return null;
}

  Session的認證后,其實就是將Session加入SessionManager中,如下:

LocalClientSession.setAuthToken():

public void setAuthToken(AuthToken auth, String resource) {
    ......
    sessionManager.addSession(this);
}

  在第四章分析消息路由時,發送消息前,首先用ToJID從路由表中獲取Session,接着再進行消息路由。也就是說,一條消息能否被接收到,取決於接收者的Session是否存在於路由表中。
  而SessionManager.addSession()剛好就是將Session加入路由表,如下:

  SessionManager.addSession():

public void addSession(LocalClientSession session) {

    routingTable.addClientRoute(session.getAddress(), session);
    ....
}

  此時,就代表了這個Session擁有了全部的功能,可以用來進行通信了。

  Session 移除

  移除工作就相對簡單一些了,當監聽到Connection關閉時,應清除掉相應的Session。

  在SessionManager的私有類ClientSessionListener實現了ConnectionCloseListener,能及時地監聽到Connection關閉並進行Session的清除工作。監聽是在Session預創建時注冊,上文已經介紹。

  Session的關閉監聽,ClientSessionListener類如下:

private class ClientSessionListener implements ConnectionCloseListener {
    /**
     * Handle a session that just closed.
     *
     * @param handback The session that just closed
     */
    @Override
    public void onConnectionClose(Object handback) {
        try {
            LocalClientSession session = (LocalClientSession) handback;
            try {
                if ((session.getPresence().isAvailable() || !session.wasAvailable()) &&
                        routingTable.hasClientRoute(session.getAddress())) {
                    // Send an unavailable presence to the user's subscribers
                    // Note: This gives us a chance to send an unavailable presence to the
                    // entities that the user sent directed presences
                    Presence presence = new Presence();
                    presence.setType(Presence.Type.unavailable);
                    presence.setFrom(session.getAddress());
                    router.route(presence);
                }

                session.getStreamManager().onClose(router, serverAddress);
            }
            finally {
                // Remove the session
                removeSession(session);
            }
        }
        catch (Exception e) {
            // Can't do anything about this problem...
            Log.error(LocaleUtils.getLocalizedString("admin.error.close"), e);
        }
    }
}

  先關閉Sessoin,然后移除出隊列,清除完畢!


Over!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM