即時通信系統Openfire分析之四:消息路由


  兩個人的孤獨

  兩個人的孤獨,大抵是,你每發出去一句話,都要經由無數網絡、由幾百個計算機處理后,出在他的面前,而他就在你不遠處。

  連接建立之后

  Openfire使用MINA網絡框架,並設置ConnectionHandler為MINA的處理器,連接的啟停、消息的收發,都在這個類中做中轉。這是我們上一章《連接管理》中分析的內容。

  那么,當客戶端與服務器的建立起連接以后,信息交換中,消息到了ConnectionHandler之后,是如何實現路由的,本文來一探究竟。

  ConnectionHandler類,MINA的處理器

  ConnectionHandler是個抽象類,openfire中的有四種handle,分別為:ServerConnectionHandler、ClientConnectionHandler、ComponentConnectionHandler、MultiplexerConnectionHandler,代表了S2S、C2S等四種不同的消息類型,這四個handle都繼承自ConnectionHandler。

  而ConnectionHandler繼承org.apache.mina.core.service.IoHandlerAdapter,IoHandlerAdapter實現了IoHandler接口。

  類關系如下:

|-- IoHandler(接口)
    |-- IoHandlerAdapter(默認的空實現,實際的handler繼承它就行)
        |-- ConnectionHandler
            |-- ServerConnectionHandler
            |-- ClientConnectionHandler
            |-- ComponentConnectionHandler
            |-- MultiplexerConnectionHandler

  IoHandler中定義了消息響應中所需要的一系列方法:

public interface IoHandler  
{  
    //創建session
    public void sessionCreated(IoSession session) throws Exception
    //開啟session  
    public void sessionOpened(IoSession iosession)  throws Exception;  
    //關閉session  
    public void sessionClosed(IoSession iosession)  throws Exception;  
    //session空閑  
    public void sessionIdle(IoSession iosession, IdleStatus idlestatus)  throws Exception;  
    //異常處理  
    public void exceptionCaught(IoSession iosession, Throwable throwable)  throws Exception;   
    //接收消息  
    public void messageReceived(IoSession iosession, Object obj)  throws Exception;  
    //發送消息  
    public void messageSent(IoSession iosession, Object obj)  throws Exception;  
}

  ConnectionHandler中覆寫這些方法,並注入到MINA的適配器NioSocketAcceptor中,當接收到連接與進行交互時,將相應調用ConnectionHandler中覆寫的方法。

  消息路由

  下面分析ConnectionHandler的消息響應機制,以C2S的message消息為例。

  ConnectionHandler除了實現IoHandler內定義的方法外,還定義了如下三個抽象方法: 

// 創建NIOConnection
abstract NIOConnection createNIOConnection(IoSession session);
// 創建StanzaHandler
abstract StanzaHandler createStanzaHandler(NIOConnection connection);
// 從數據庫中獲取閑置timeout
abstract int getMaxIdleTime();

  這三個方法,在具體的Handler子類里面實現,在sessionOpened()中調用,根據連接類型創建不同的NIOConnection、StanzaHandler的對象。

   ConnectionHandler.sessionOpened()

@Override
public void sessionOpened(IoSession session) throws Exception {

    final XMLLightweightParser parser = new XMLLightweightParser(StandardCharsets.UTF_8);
    session.setAttribute(XML_PARSER, parser);

    final NIOConnection connection = createNIOConnection(session);
    session.setAttribute(CONNECTION, connection);
    session.setAttribute(HANDLER, createStanzaHandler(connection));

    final int idleTime = getMaxIdleTime() / 2;
    if (idleTime > 0) {
        session.getConfig().setIdleTime(IdleStatus.READER_IDLE, idleTime);
    }
}

  其中,NIOConnection是對IoSession的一次包裝,將MINA框架的IoSession轉化為Openfire的Connection。StanzaHandler負責數據節的處理。 

  當服務器接收到客戶端發送的消息時,MINA框架調用IoHandler.messageReceived將消息傳遞到指定的處理器ConnectionHandler中的messageReceived()方法。

    ConnectionHandler.messageReceived()

@Override
public void messageReceived(IoSession session, Object message) throws Exception {
    StanzaHandler handler = (StanzaHandler) session.getAttribute(HANDLER);
    final XMPPPacketReader parser = PARSER_CACHE.get();
    updateReadBytesCounter(session);
    try {
        handler.process((String) message, parser);
    } catch (Exception e) {
        final Connection connection = (Connection) session.getAttribute(CONNECTION);
        if (connection != null) {
            connection.close();
        }
    }
}

  消息由StanzaHandler處理,C2S消息具體的實現類是ClientStanzaHandler。

  StanzaHandler.process()方法如下:

public void process(String stanza, XMPPPacketReader reader) throws Exception {
    boolean initialStream = stanza.startsWith("<stream:stream") || stanza.startsWith("<flash:stream");
    if (!sessionCreated || initialStream) {
        if (!initialStream) {
            ......
        }
        // Found an stream:stream tag...
        if (!sessionCreated) {
            sessionCreated = true;
            MXParser parser = reader.getXPPParser();
            parser.setInput(new StringReader(stanza));
            createSession(parser);
        }
        .....
    }

        ......
        process(doc);
    }
}  

  上面省略掉部分代碼,可以看到執行了如下操作:

  (1)若Session未創建,則創建之

  (2)調用本類的process(Element doc)

  Session的創建中,涉及到Session的管理,路由表的構建等重點內容,在下一章再專門做講解。這里先提兩點:1、此處的Session,只是預創建,還未能用於通信;2、在與客戶端完成資源綁定的時候,該Session才真正可用。

  而process(Element doc)如下,只保留了和message相關的代碼: 

private void process(Element doc) throws UnauthorizedException {
    if (doc == null) {
        return;
    }

    // Ensure that connection was secured if TLS was required
    if (connection.getTlsPolicy() == Connection.TLSPolicy.required &&
            !connection.isSecure()) {
        closeNeverSecuredConnection();
        return;
    }
    
    String tag = doc.getName();
    if ("message".equals(tag)) {
        Message packet;
        try {
            packet = new Message(doc, !validateJIDs());
        }
        catch (IllegalArgumentException e) {
            Log.debug("Rejecting packet. JID malformed", e);
            // The original packet contains a malformed JID so answer with an error.
            Message reply = new Message();
            reply.setID(doc.attributeValue("id"));
            reply.setTo(session.getAddress());
            reply.getElement().addAttribute("from", doc.attributeValue("to"));
            reply.setError(PacketError.Condition.jid_malformed);
            session.process(reply);
            return;
        }
        processMessage(packet);
    }
    ......
}

  將Element轉化為Message對象,然后在StanzaHandler.processMessage()中,調用包路由PacketRouterImpl模塊發送消息。

protected void processMessage(Message packet) throws UnauthorizedException {
    router.route(packet);
    session.incrementClientPacketCount();
}

  Openfire有三種數據包:IQ、Message、Presence,對應的路由器也有三種:IQRouter、MessageRouter、PresenceRouter。

  PacketRouterImpl是對這三種路由器統一做包裝,對於message消息,調用的是MessageRouter中的route()方法。

  PacketRouterImpl.route()如下:

@Override
public void route(Message packet) {
    messageRouter.route(packet);
}

  MessageRouter.route()中消息的發送,分如下兩步:

  (1)調用路由表,將消息發給Message中指定的接收者ToJID。

  (2)通過session,將消息原路返回給發送方(當發送方收到推送回來的消息,表示消息已發送成功)

  MessageRouter.route()代碼如下:

public void route(Message packet) {
    if (packet == null) {
        throw new NullPointerException();
    }
    ClientSession session = sessionManager.getSession(packet.getFrom());

    try {
        // Invoke the interceptors before we process the read packet
        InterceptorManager.getInstance().invokeInterceptors(packet, session, true, false);
        
        if (session == null || session.getStatus() == Session.STATUS_AUTHENTICATED) {
            JID recipientJID = packet.getTo();

            ......

            boolean isAcceptable = true;
            if (session instanceof LocalClientSession) {
                .....
            }
            if (isAcceptable) {
                boolean isPrivate = packet.getElement().element(QName.get("private", "urn:xmpp:carbons:2")) != null;
                try {
                    // Deliver stanza to requested route
                    routingTable.routePacket(recipientJID, packet, false);
                } catch (Exception e) {
                    log.error("Failed to route packet: " + packet.toXML(), e);
                routingFailed(recipientJID, packet);
                }
                // Sent carbon copies to other resources of the sender:
                // When a client sends a <message/> of type "chat"
                if (packet.getType() == Message.Type.chat && !isPrivate && session != null) { 
                    List<JID> routes = routingTable.getRoutes(packet.getFrom().asBareJID(), null);
                    for (JID route : routes) {
                        if (!route.equals(session.getAddress())) {
                            ClientSession clientSession = sessionManager.getSession(route);
                            if (clientSession != null && clientSession.isMessageCarbonsEnabled()) {
                                Message message = new Message();
                                message.setType(packet.getType());
                                message.setFrom(packet.getFrom().asBareJID());
                                message.setTo(route);
                                message.addExtension(new Sent(new Forwarded(packet)));
                                clientSession.process(message);
                            }
                        }
                    }
                }
            }
        }
    ......
}

  其中,routingTable.routePacket(recipientJID, packet, false)是發送消息的關鍵代碼。

  路由模塊中,對消息的發送做了封裝,在任何需要發送消息的地方,例如自定義插件中,只需要調用下面這個方法,就能完成消息的發送:

XMPPServer.getInstance().getRoutingTable().routePacket(to, message, true);

  路由表中保存了該連接的Session對象,Session中攜帶有連接創建時生成的Connection對象,而從上一章我們知道,Connection是對MINA的Iosession的封裝。

  換言之,其實路由表的消息發送功能,就是通過Connection調用MINA底層來實現的。答案是否是如此?下面來看看。

  路由表中的消息發送

  路由表中的其他細節,我們暫時不關注過多,目前主要看它的消息發送流程:

  消息發送的方法RoutingTableImpl.routePacket():

@Override
public void routePacket(JID jid, Packet packet, boolean fromServer) throws PacketException {
    boolean routed = false;
    try {
        if (serverName.equals(jid.getDomain())) {
            // Packet sent to our domain.
            routed = routeToLocalDomain(jid, packet, fromServer);
        }
        else if (jid.getDomain().endsWith(serverName) && hasComponentRoute(jid)) {
            // Packet sent to component hosted in this server
            routed = routeToComponent(jid, packet, routed);
        }
        else {
            // Packet sent to remote server
            routed = routeToRemoteDomain(jid, packet, routed);
        }
    } catch (Exception ex) {

        Log.error("Primary packet routing failed", ex); 
    }

    if (!routed) {
        if (Log.isDebugEnabled()) {
            Log.debug("Failed to route packet to JID: {} packet: {}", jid, packet.toXML());
        }
        if (packet instanceof IQ) {
            iqRouter.routingFailed(jid, packet);
        }
        else if (packet instanceof Message) {
            messageRouter.routingFailed(jid, packet);
        }
        else if (packet instanceof Presence) {
            presenceRouter.routingFailed(jid, packet);
        }
    }
}

  這里有幾個分支:

|-- routeToLocalDomain      路由到本地
|-- routeToComponent        路由到組件
|-- routeToRemoteDomain     路由到遠程

 

  對於單機情況的消息,調用的是routeToLocalDomain()。

  RoutingTableImpl.routeToLocalDomain()

private boolean routeToLocalDomain(JID jid, Packet packet, boolean fromServer) {
    boolean routed = false;
    Element privateElement = packet.getElement().element(QName.get("private", "urn:xmpp:carbons:2"));
    boolean isPrivate = privateElement != null;
    // The receiving server and SHOULD remove the <private/> element before delivering to the recipient.
    packet.getElement().remove(privateElement);

    if (jid.getResource() == null) {
        // Packet sent to a bare JID of a user
        if (packet instanceof Message) {
            // Find best route of local user
            routed = routeToBareJID(jid, (Message) packet, isPrivate);
        }
        else {
            throw new PacketException("Cannot route packet of type IQ or Presence to bare JID: " + packet.toXML());
        }
    }
    else {
        // Packet sent to local user (full JID)
        ClientRoute clientRoute = usersCache.get(jid.toString());
        if (clientRoute == null) {
            clientRoute = anonymousUsersCache.get(jid.toString());
        }
        if (clientRoute != null) {
            if (!clientRoute.isAvailable() && routeOnlyAvailable(packet, fromServer) &&
                    !presenceUpdateHandler.hasDirectPresence(packet.getTo(), packet.getFrom())) {
                Log.debug("Unable to route packet. Packet should only be sent to available sessions and the route is not available. {} ", packet.toXML());
                routed = false;
            } else {
                if (localRoutingTable.isLocalRoute(jid)) {
                    if (packet instanceof Message) {
                        Message message = (Message) packet;
                        if (message.getType() == Message.Type.chat && !isPrivate) {
                            List<JID> routes = getRoutes(jid.asBareJID(), null);
                            for (JID route : routes) {

                                if (!route.equals(jid)) {
                                    ClientSession clientSession = getClientRoute(route);
                                    if (clientSession.isMessageCarbonsEnabled()) {
                                        Message carbon = new Message();
                                        // The wrapping message SHOULD maintain the same 'type' attribute value;
                                        carbon.setType(message.getType());
                                        // the 'from' attribute MUST be the Carbons-enabled user's bare JID
                                        carbon.setFrom(route.asBareJID());
                                        // and the 'to' attribute MUST be the full JID of the resource receiving the copy
                                        carbon.setTo(route);
                                        carbon.addExtension(new Received(new Forwarded(message)));

                                        try {
                                            localRoutingTable.getRoute(route.toString()).process(carbon);
                                        } catch (UnauthorizedException e) {
                                            Log.error("Unable to route packet " + packet.toXML(), e);
                                        }
                                    }
                                }
                            }
                        }
                    }

                    // This is a route to a local user hosted in this node
                    try {
                        localRoutingTable.getRoute(jid.toString()).process(packet);
                        routed = true;
                    } catch (UnauthorizedException e) {
                        Log.error("Unable to route packet " + packet.toXML(), e);
                    }
                }
                else {
                    // This is a route to a local user hosted in other node
                    if (remotePacketRouter != null) {
                        routed = remotePacketRouter
                                .routePacket(clientRoute.getNodeID().toByteArray(), jid, packet);
                        if (!routed) {
                            removeClientRoute(jid); // drop invalid client route
                        }
                    }
                }
            }
        }
    }
    return routed;
}

  上面的關鍵代碼中是這一段:

try {
    localRoutingTable.getRoute(route.toString()).process(carbon);
} catch (UnauthorizedException e) {
    Log.error("Unable to route packet " + packet.toXML(), e);
}

  可以看出,RoutingTable的路由功能,是通過localRoutingTable實現的。

  LocalRoutingTable中用一個容器保存了所有的路由:

Map<String, RoutableChannelHandler> routes = new ConcurrentHashMap<>();

  RoutingTableImpl中通過調用LocalRoutingTable的add、get、remove等方法,實現對路由的管理。

  localRoutingTable.getRoute()方法實現從路由表中獲取RoutableChannelHandler對象,那么具體消息是如何通過路由發出去的?

  要解釋這個問題,先來看一下與RoutableChannelHandler相關的繼承和派生關系,如下:

|-- ChannelHandler
    |-- RoutableChannelHandler
        |-- Session
            |-- LocalSession
                |-- LocalClientSession

  也就是說,其實localRoutingTable.getRoute(route.toString()).process(carbon)最終調用的是LacalSession.process()。

  LacalSession.process()代碼如下:

@Override
public void process(Packet packet) {
    // Check that the requested packet can be processed
    if (canProcess(packet)) {
        // Perform the actual processing of the packet. This usually implies sending
        // the packet to the entity
        try {

            InterceptorManager.getInstance().invokeInterceptors(packet, this, false, false);
            
            deliver(packet);

            InterceptorManager.getInstance().invokeInterceptors(packet, this, false, true);
        }
        catch (PacketRejectedException e) {
            // An interceptor rejected the packet so do nothing
        }
        catch (Exception e) {
            Log.error(LocaleUtils.getLocalizedString("admin.error"), e);
        }
    }
    ......
}

  其中的deliver()是LacalSession定義的一個插象方法,由其子類來實現。

  有一點值得提一下,在deliver()前后,都做了攔截,方便在發送的前后做一些額外的處理。

  繼續講回deliver(),對於C2S連接類型來說,它是在LocalClientSession類中實現。

  LocalClientSession.deliver()代碼如下:

@Override
public void deliver(Packet packet) throws UnauthorizedException {

    conn.deliver(packet);
    streamManager.sentStanza(packet);
}

  此時的發送方法conn.deliver()中的conn,就是來自最初在sessionOpened()被調用時創建的NIOConnection對象。

  NIOConnection.deliver():

@Override
public void deliver(Packet packet) throws UnauthorizedException {
    if (isClosed()) {
        backupDeliverer.deliver(packet);
    }
    else {
        boolean errorDelivering = false;
        IoBuffer buffer = IoBuffer.allocate(4096);
        buffer.setAutoExpand(true);
        try {
            buffer.putString(packet.getElement().asXML(), encoder.get());
            if (flashClient) {
                buffer.put((byte) '\0');
            }
            buffer.flip();
            
            ioSessionLock.lock();
            try {
                ioSession.write(buffer);
            } finally {
                ioSessionLock.unlock();
            }
        }
        catch (Exception e) {
            Log.debug("Error delivering packet:\n" + packet, e);
            errorDelivering = true;
        }
        if (errorDelivering) {
            close();
            // Retry sending the packet again. Most probably if the packet is a
            // Message it will be stored offline
            backupDeliverer.deliver(packet);
        }
        else {
            session.incrementServerPacketCount();
        }
    }
}

  NIOConnection.deliver()中,通過其內部包裝的IoSession對象,調用write()方法將數據流寫入網卡中,完成消息的發送。

  ConnectionHandler.messageSent()

  消息發送完成,MINA回調:

@Override
public void messageSent(IoSession session, Object message) throws Exception {
    super.messageSent(session, message);
    // Update counter of written btyes
    updateWrittenBytesCounter(session);
    
    System.out.println("Fordestiny-SEND: "+ioBufferToString(message));
}

 

  至此,系統完成了一條消息的接收、轉發。


 

  其實消息的路由中,除了消息的整個流通路徑之外,怎么保證消息能夠准確的發送到對應的客戶端是至關重要的。這方面Openfire是如何處理的,在下個章節做解析,即Openfire的會話管理和路由表。Over!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM