兩個人的孤獨
兩個人的孤獨,大抵是,你每發出去一句話,都要經由無數網絡、由幾百個計算機處理后,出在他的面前,而他就在你不遠處。
連接建立之后
Openfire使用MINA網絡框架,並設置ConnectionHandler為MINA的處理器,連接的啟停、消息的收發,都在這個類中做中轉。這是我們上一章《連接管理》中分析的內容。
那么,當客戶端與服務器的建立起連接以后,信息交換中,消息到了ConnectionHandler之后,是如何實現路由的,本文來一探究竟。
ConnectionHandler類,MINA的處理器
ConnectionHandler是個抽象類,openfire中的有四種handle,分別為:ServerConnectionHandler、ClientConnectionHandler、ComponentConnectionHandler、MultiplexerConnectionHandler,代表了S2S、C2S等四種不同的消息類型,這四個handle都繼承自ConnectionHandler。
而ConnectionHandler繼承org.apache.mina.core.service.IoHandlerAdapter,IoHandlerAdapter實現了IoHandler接口。
類關系如下:
|-- IoHandler(接口)
|-- IoHandlerAdapter(默認的空實現,實際的handler繼承它就行)
|-- ConnectionHandler
|-- ServerConnectionHandler
|-- ClientConnectionHandler
|-- ComponentConnectionHandler
|-- MultiplexerConnectionHandler
IoHandler中定義了消息響應中所需要的一系列方法:
public interface IoHandler { //創建session public void sessionCreated(IoSession session) throws Exception //開啟session public void sessionOpened(IoSession iosession) throws Exception; //關閉session public void sessionClosed(IoSession iosession) throws Exception; //session空閑 public void sessionIdle(IoSession iosession, IdleStatus idlestatus) throws Exception; //異常處理 public void exceptionCaught(IoSession iosession, Throwable throwable) throws Exception; //接收消息 public void messageReceived(IoSession iosession, Object obj) throws Exception; //發送消息 public void messageSent(IoSession iosession, Object obj) throws Exception; }
ConnectionHandler中覆寫這些方法,並注入到MINA的適配器NioSocketAcceptor中,當接收到連接與進行交互時,將相應調用ConnectionHandler中覆寫的方法。
消息路由
下面分析ConnectionHandler的消息響應機制,以C2S的message消息為例。
ConnectionHandler除了實現IoHandler內定義的方法外,還定義了如下三個抽象方法:
// 創建NIOConnection abstract NIOConnection createNIOConnection(IoSession session); // 創建StanzaHandler abstract StanzaHandler createStanzaHandler(NIOConnection connection); // 從數據庫中獲取閑置timeout abstract int getMaxIdleTime();
這三個方法,在具體的Handler子類里面實現,在sessionOpened()中調用,根據連接類型創建不同的NIOConnection、StanzaHandler的對象。
ConnectionHandler.sessionOpened()
@Override public void sessionOpened(IoSession session) throws Exception { final XMLLightweightParser parser = new XMLLightweightParser(StandardCharsets.UTF_8); session.setAttribute(XML_PARSER, parser); final NIOConnection connection = createNIOConnection(session); session.setAttribute(CONNECTION, connection); session.setAttribute(HANDLER, createStanzaHandler(connection)); final int idleTime = getMaxIdleTime() / 2; if (idleTime > 0) { session.getConfig().setIdleTime(IdleStatus.READER_IDLE, idleTime); } }
其中,NIOConnection是對IoSession的一次包裝,將MINA框架的IoSession轉化為Openfire的Connection。StanzaHandler負責數據節的處理。
當服務器接收到客戶端發送的消息時,MINA框架調用IoHandler.messageReceived將消息傳遞到指定的處理器ConnectionHandler中的messageReceived()方法。
ConnectionHandler.messageReceived()
@Override public void messageReceived(IoSession session, Object message) throws Exception { StanzaHandler handler = (StanzaHandler) session.getAttribute(HANDLER); final XMPPPacketReader parser = PARSER_CACHE.get(); updateReadBytesCounter(session); try { handler.process((String) message, parser); } catch (Exception e) { final Connection connection = (Connection) session.getAttribute(CONNECTION); if (connection != null) { connection.close(); } } }
消息由StanzaHandler處理,C2S消息具體的實現類是ClientStanzaHandler。
StanzaHandler.process()方法如下:
public void process(String stanza, XMPPPacketReader reader) throws Exception { boolean initialStream = stanza.startsWith("<stream:stream") || stanza.startsWith("<flash:stream"); if (!sessionCreated || initialStream) { if (!initialStream) { ...... } // Found an stream:stream tag... if (!sessionCreated) { sessionCreated = true; MXParser parser = reader.getXPPParser(); parser.setInput(new StringReader(stanza)); createSession(parser); } ..... } ...... process(doc); } }
上面省略掉部分代碼,可以看到執行了如下操作:
(1)若Session未創建,則創建之
(2)調用本類的process(Element doc)
Session的創建中,涉及到Session的管理,路由表的構建等重點內容,在下一章再專門做講解。這里先提兩點:1、此處的Session,只是預創建,還未能用於通信;2、在與客戶端完成資源綁定的時候,該Session才真正可用。
而process(Element doc)如下,只保留了和message相關的代碼:
private void process(Element doc) throws UnauthorizedException { if (doc == null) { return; } // Ensure that connection was secured if TLS was required if (connection.getTlsPolicy() == Connection.TLSPolicy.required && !connection.isSecure()) { closeNeverSecuredConnection(); return; } String tag = doc.getName(); if ("message".equals(tag)) { Message packet; try { packet = new Message(doc, !validateJIDs()); } catch (IllegalArgumentException e) { Log.debug("Rejecting packet. JID malformed", e); // The original packet contains a malformed JID so answer with an error. Message reply = new Message(); reply.setID(doc.attributeValue("id")); reply.setTo(session.getAddress()); reply.getElement().addAttribute("from", doc.attributeValue("to")); reply.setError(PacketError.Condition.jid_malformed); session.process(reply); return; } processMessage(packet); } ...... }
將Element轉化為Message對象,然后在StanzaHandler.processMessage()中,調用包路由PacketRouterImpl模塊發送消息。
protected void processMessage(Message packet) throws UnauthorizedException { router.route(packet); session.incrementClientPacketCount(); }
Openfire有三種數據包:IQ、Message、Presence,對應的路由器也有三種:IQRouter、MessageRouter、PresenceRouter。
PacketRouterImpl是對這三種路由器統一做包裝,對於message消息,調用的是MessageRouter中的route()方法。
PacketRouterImpl.route()如下:
@Override public void route(Message packet) { messageRouter.route(packet); }
MessageRouter.route()中消息的發送,分如下兩步:
(1)調用路由表,將消息發給Message中指定的接收者ToJID。
(2)通過session,將消息原路返回給發送方(當發送方收到推送回來的消息,表示消息已發送成功)
MessageRouter.route()代碼如下:
public void route(Message packet) { if (packet == null) { throw new NullPointerException(); } ClientSession session = sessionManager.getSession(packet.getFrom()); try { // Invoke the interceptors before we process the read packet InterceptorManager.getInstance().invokeInterceptors(packet, session, true, false); if (session == null || session.getStatus() == Session.STATUS_AUTHENTICATED) { JID recipientJID = packet.getTo(); ...... boolean isAcceptable = true; if (session instanceof LocalClientSession) { ..... } if (isAcceptable) { boolean isPrivate = packet.getElement().element(QName.get("private", "urn:xmpp:carbons:2")) != null; try { // Deliver stanza to requested route routingTable.routePacket(recipientJID, packet, false); } catch (Exception e) { log.error("Failed to route packet: " + packet.toXML(), e); routingFailed(recipientJID, packet); } // Sent carbon copies to other resources of the sender: // When a client sends a <message/> of type "chat" if (packet.getType() == Message.Type.chat && !isPrivate && session != null) { List<JID> routes = routingTable.getRoutes(packet.getFrom().asBareJID(), null); for (JID route : routes) { if (!route.equals(session.getAddress())) { ClientSession clientSession = sessionManager.getSession(route); if (clientSession != null && clientSession.isMessageCarbonsEnabled()) { Message message = new Message(); message.setType(packet.getType()); message.setFrom(packet.getFrom().asBareJID()); message.setTo(route); message.addExtension(new Sent(new Forwarded(packet))); clientSession.process(message); } } } } } } ...... }
其中,routingTable.routePacket(recipientJID, packet, false)是發送消息的關鍵代碼。
路由模塊中,對消息的發送做了封裝,在任何需要發送消息的地方,例如自定義插件中,只需要調用下面這個方法,就能完成消息的發送:
XMPPServer.getInstance().getRoutingTable().routePacket(to, message, true);
路由表中保存了該連接的Session對象,Session中攜帶有連接創建時生成的Connection對象,而從上一章我們知道,Connection是對MINA的Iosession的封裝。
換言之,其實路由表的消息發送功能,就是通過Connection調用MINA底層來實現的。答案是否是如此?下面來看看。
路由表中的消息發送
路由表中的其他細節,我們暫時不關注過多,目前主要看它的消息發送流程:
消息發送的方法RoutingTableImpl.routePacket():
@Override public void routePacket(JID jid, Packet packet, boolean fromServer) throws PacketException { boolean routed = false; try { if (serverName.equals(jid.getDomain())) { // Packet sent to our domain. routed = routeToLocalDomain(jid, packet, fromServer); } else if (jid.getDomain().endsWith(serverName) && hasComponentRoute(jid)) { // Packet sent to component hosted in this server routed = routeToComponent(jid, packet, routed); } else { // Packet sent to remote server routed = routeToRemoteDomain(jid, packet, routed); } } catch (Exception ex) { Log.error("Primary packet routing failed", ex); } if (!routed) { if (Log.isDebugEnabled()) { Log.debug("Failed to route packet to JID: {} packet: {}", jid, packet.toXML()); } if (packet instanceof IQ) { iqRouter.routingFailed(jid, packet); } else if (packet instanceof Message) { messageRouter.routingFailed(jid, packet); } else if (packet instanceof Presence) { presenceRouter.routingFailed(jid, packet); } } }
這里有幾個分支:
|-- routeToLocalDomain 路由到本地
|-- routeToComponent 路由到組件
|-- routeToRemoteDomain 路由到遠程
對於單機情況的消息,調用的是routeToLocalDomain()。
RoutingTableImpl.routeToLocalDomain()
private boolean routeToLocalDomain(JID jid, Packet packet, boolean fromServer) { boolean routed = false; Element privateElement = packet.getElement().element(QName.get("private", "urn:xmpp:carbons:2")); boolean isPrivate = privateElement != null; // The receiving server and SHOULD remove the <private/> element before delivering to the recipient. packet.getElement().remove(privateElement); if (jid.getResource() == null) { // Packet sent to a bare JID of a user if (packet instanceof Message) { // Find best route of local user routed = routeToBareJID(jid, (Message) packet, isPrivate); } else { throw new PacketException("Cannot route packet of type IQ or Presence to bare JID: " + packet.toXML()); } } else { // Packet sent to local user (full JID) ClientRoute clientRoute = usersCache.get(jid.toString()); if (clientRoute == null) { clientRoute = anonymousUsersCache.get(jid.toString()); } if (clientRoute != null) { if (!clientRoute.isAvailable() && routeOnlyAvailable(packet, fromServer) && !presenceUpdateHandler.hasDirectPresence(packet.getTo(), packet.getFrom())) { Log.debug("Unable to route packet. Packet should only be sent to available sessions and the route is not available. {} ", packet.toXML()); routed = false; } else { if (localRoutingTable.isLocalRoute(jid)) { if (packet instanceof Message) { Message message = (Message) packet; if (message.getType() == Message.Type.chat && !isPrivate) { List<JID> routes = getRoutes(jid.asBareJID(), null); for (JID route : routes) { if (!route.equals(jid)) { ClientSession clientSession = getClientRoute(route); if (clientSession.isMessageCarbonsEnabled()) { Message carbon = new Message(); // The wrapping message SHOULD maintain the same 'type' attribute value; carbon.setType(message.getType()); // the 'from' attribute MUST be the Carbons-enabled user's bare JID carbon.setFrom(route.asBareJID()); // and the 'to' attribute MUST be the full JID of the resource receiving the copy carbon.setTo(route); carbon.addExtension(new Received(new Forwarded(message))); try { localRoutingTable.getRoute(route.toString()).process(carbon); } catch (UnauthorizedException e) { Log.error("Unable to route packet " + packet.toXML(), e); } } } } } } // This is a route to a local user hosted in this node try { localRoutingTable.getRoute(jid.toString()).process(packet); routed = true; } catch (UnauthorizedException e) { Log.error("Unable to route packet " + packet.toXML(), e); } } else { // This is a route to a local user hosted in other node if (remotePacketRouter != null) { routed = remotePacketRouter .routePacket(clientRoute.getNodeID().toByteArray(), jid, packet); if (!routed) { removeClientRoute(jid); // drop invalid client route } } } } } } return routed; }
上面的關鍵代碼中是這一段:
try { localRoutingTable.getRoute(route.toString()).process(carbon); } catch (UnauthorizedException e) { Log.error("Unable to route packet " + packet.toXML(), e); }
可以看出,RoutingTable的路由功能,是通過localRoutingTable實現的。
LocalRoutingTable中用一個容器保存了所有的路由:
Map<String, RoutableChannelHandler> routes = new ConcurrentHashMap<>();
RoutingTableImpl中通過調用LocalRoutingTable的add、get、remove等方法,實現對路由的管理。
localRoutingTable.getRoute()方法實現從路由表中獲取RoutableChannelHandler對象,那么具體消息是如何通過路由發出去的?
要解釋這個問題,先來看一下與RoutableChannelHandler相關的繼承和派生關系,如下:
|-- ChannelHandler
|-- RoutableChannelHandler
|-- Session
|-- LocalSession
|-- LocalClientSession
也就是說,其實localRoutingTable.getRoute(route.toString()).process(carbon)最終調用的是LacalSession.process()。
LacalSession.process()代碼如下:
@Override public void process(Packet packet) { // Check that the requested packet can be processed if (canProcess(packet)) { // Perform the actual processing of the packet. This usually implies sending // the packet to the entity try { InterceptorManager.getInstance().invokeInterceptors(packet, this, false, false); deliver(packet); InterceptorManager.getInstance().invokeInterceptors(packet, this, false, true); } catch (PacketRejectedException e) { // An interceptor rejected the packet so do nothing } catch (Exception e) { Log.error(LocaleUtils.getLocalizedString("admin.error"), e); } } ...... }
其中的deliver()是LacalSession定義的一個插象方法,由其子類來實現。
有一點值得提一下,在deliver()前后,都做了攔截,方便在發送的前后做一些額外的處理。
繼續講回deliver(),對於C2S連接類型來說,它是在LocalClientSession類中實現。
LocalClientSession.deliver()代碼如下:
@Override public void deliver(Packet packet) throws UnauthorizedException { conn.deliver(packet); streamManager.sentStanza(packet); }
此時的發送方法conn.deliver()中的conn,就是來自最初在sessionOpened()被調用時創建的NIOConnection對象。
NIOConnection.deliver():
@Override public void deliver(Packet packet) throws UnauthorizedException { if (isClosed()) { backupDeliverer.deliver(packet); } else { boolean errorDelivering = false; IoBuffer buffer = IoBuffer.allocate(4096); buffer.setAutoExpand(true); try { buffer.putString(packet.getElement().asXML(), encoder.get()); if (flashClient) { buffer.put((byte) '\0'); } buffer.flip(); ioSessionLock.lock(); try { ioSession.write(buffer); } finally { ioSessionLock.unlock(); } } catch (Exception e) { Log.debug("Error delivering packet:\n" + packet, e); errorDelivering = true; } if (errorDelivering) { close(); // Retry sending the packet again. Most probably if the packet is a // Message it will be stored offline backupDeliverer.deliver(packet); } else { session.incrementServerPacketCount(); } } }
NIOConnection.deliver()中,通過其內部包裝的IoSession對象,調用write()方法將數據流寫入網卡中,完成消息的發送。
ConnectionHandler.messageSent()
消息發送完成,MINA回調:
@Override public void messageSent(IoSession session, Object message) throws Exception { super.messageSent(session, message); // Update counter of written btyes updateWrittenBytesCounter(session); System.out.println("Fordestiny-SEND: "+ioBufferToString(message)); }
至此,系統完成了一條消息的接收、轉發。
其實消息的路由中,除了消息的整個流通路徑之外,怎么保證消息能夠准確的發送到對應的客戶端是至關重要的。這方面Openfire是如何處理的,在下個章節做解析,即Openfire的會話管理和路由表。Over!