還是從會話管理說起
上一章,Session經過預創建、認證之后,才正常可用。認證時,最重要的操作,就是將Session加入到路由表,使之擁用了通信功能。
添加到至路由表的操作,是在SessionManager中操作的,如下:
SessionManager.addSession(LocalClientSession session):
public void addSession(LocalClientSession session) { // Add session to the routing table (routing table will know session is not available yet) routingTable.addClientRoute(session.getAddress(), session); // Remove the pre-Authenticated session but remember to use the temporary ID as the key localSessionManager.getPreAuthenticatedSessions().remove(session.getStreamID().toString()); SessionEventDispatcher.EventType event = session.getAuthToken().isAnonymous() ? SessionEventDispatcher.EventType.anonymous_session_created : SessionEventDispatcher.EventType.session_created; // Fire session created event. SessionEventDispatcher.dispatchEvent(session, event); if (ClusterManager.isClusteringStarted()) { // Track information about the session and share it with other cluster nodes sessionInfoCache.put(session.getAddress().toString(), new ClientSessionInfo(session)); } }
進入路由表模塊, RoutingTableImpl.addClientRoute(session.getAddress(), session)方法:
public boolean addClientRoute(JID route, LocalClientSession destination) { boolean added; boolean available = destination.getPresence().isAvailable(); localRoutingTable.addRoute(route.toString(), destination); ...... return added; }
從這里可以看出,路由表的底層,是借助LocalRoutingTable類來實現。
路由表的底層數據結構
LocalRoutingTable類的成員構成,非常的簡單:
Map<String, RoutableChannelHandler> routes = new ConcurrentHashMap<>();
也就是說,路由表的實質,就是一個Map的數據結構,其Key為JID地址,Velue為RoutableChannelHandler類型報文處理器。
查看路由表RoutingTableImpl模塊中的路由添加方法,可以看到表中存儲的是以RoutableChannelHandler衍生出來的幾個Session類型,總共提供了三種:
LocalOutgoingServerSession(用於存儲連接本機的遠程服務端)、LocalClientSession(用於存儲連接到本機的客戶端)、RoutableChannelHandler(用於存儲組件),類結構如下:
|-- RoutableChannelHandler |-- Session |-- LocalSession |-- LocalClientSession |-- LocalServerSession |-- LocalOutgoingServerSession
而LocalRoutingTable內的所有方法,就是一系列對這個Map結構的操作函數,核心的如下幾個:
添加路由:
boolean addRoute(String address, RoutableChannelHandler route) { return routes.put(address, route) != route; }
獲取路由:
RoutableChannelHandler getRoute(String address) { return routes.get(address); }
獲取客戶端的Session列表:
Collection<LocalClientSession> getClientRoutes() { List<LocalClientSession> sessions = new ArrayList<>(); for (RoutableChannelHandler route : routes.values()) { if (route instanceof LocalClientSession) { sessions.add((LocalClientSession) route); } } return sessions; }
移除路由
void removeRoute(String address) { routes.remove(address); }
還有一個每3分鍾一次的定時任務,查詢並關閉被閑置了的遠程服務器Session,在路由表中啟動該任務
public void start() { int period = 3 * 60 * 1000; TaskEngine.getInstance().scheduleAtFixedRate(new ServerCleanupTask(), period, period); }
路由表模塊 RoutingTable
路由表是Openfire的核心module之一,RoutingTable接口定義了一系列操作標准,主要圍繞路由表進行,提供添加,刪除,查詢,消息路由等操作,而RoutingTableImpl負責具體實現。
先來看看RoutingTableImpl的成員列表
/** * 緩存外部遠程服務器session * Key: server domain, Value: nodeID */ private Cache<String, byte[]> serversCache; /** * 緩存服務器的組件 * Key: component domain, Value: list of nodeIDs hosting the component */ private Cache<String, Set<NodeID>> componentsCache; /** * 緩存已認證的客戶端session * Key: full JID, Value: {nodeID, available/unavailable} */ private Cache<String, ClientRoute> usersCache; /** * 緩存已認證匿名的客戶端session * Key: full JID, Value: {nodeID, available/unavailable} */ private Cache<String, ClientRoute> anonymousUsersCache; /** * 緩存已認證(包括匿名)的客戶端Resource,一個用戶,在每一端登錄,都會有一個resource * Key: bare JID, Value: list of full JIDs of the user */ private Cache<String, Collection<String>> usersSessions; private String serverName; // 服務器的域名 private XMPPServer server; // XMPP服務 private LocalRoutingTable localRoutingTable; // 路由表底層 private RemotePacketRouter remotePacketRouter; // 遠程包路由器 private IQRouter iqRouter; // IQ包路由器 private MessageRouter messageRouter; // Message包路由器 private PresenceRouter presenceRouter; // Presence包路由器 private PresenceUpdateHandler presenceUpdateHandler; // 在線狀態更新處理器
成員列表中,除了LocalRoutingTable之外,還定義了一堆的緩存。這些緩存干嘛用?
Openfire支持集群機制,即在多台服務器上分別運行一個Openfire實例,並使各個實例的數據同步。算法一致,數據一致,用戶不管連接到任意一台服務器,效果就都一樣。
集群中的數據同步,除了數據庫之外,其他的都是用通過緩存來處理,而上面的這些緩存正是集群同步的一部分,用於同步用戶路由信息,每個服務器都會有緩存的副本。
總的來說,LocalRoutingTable用於存儲本機的路由數據,而Cache中是存儲了整個集群的路由數據。
但是,需要注意的一點,LocalRoutingTable與Cache,這兩者的數據結構並不相同:
(1)LocalRoutingTable中記錄了本機中所有的Session實例,可以用來通信
(2)Cache中只存儲了用戶路由節點信息,需要通過集群管理組件來獲取Session實例
路由表的操作
路由表的操作,實際上就是在會話管理中,對會話實例的操作。為免與上面混淆,這一節的功能說明,以會話代稱。
添加路由(會話)
代碼如下:
@Override public boolean addClientRoute(JID route, LocalClientSession destination) { boolean added; boolean available = destination.getPresence().isAvailable(); // 加入到路由表 localRoutingTable.addRoute(route.toString(), destination); // 若為匿名客戶端,添加到anonymousUsersCache、usersSessions緩存隊列中 if (destination.getAuthToken().isAnonymous()) { Lock lockAn = CacheFactory.getLock(route.toString(), anonymousUsersCache); try { lockAn.lock(); added = anonymousUsersCache.put(route.toString(), new ClientRoute(server.getNodeID(), available)) == null; } finally { lockAn.unlock(); } // Add the session to the list of user sessions if (route.getResource() != null && (!available || added)) { Lock lock = CacheFactory.getLock(route.toBareJID(), usersSessions); try { lock.lock(); usersSessions.put(route.toBareJID(), Arrays.asList(route.toString())); } finally { lock.unlock(); } } } // 非匿名客戶端,添加到usersCache、usersSessions緩存隊列中 else { Lock lockU = CacheFactory.getLock(route.toString(), usersCache); try { lockU.lock(); added = usersCache.put(route.toString(), new ClientRoute(server.getNodeID(), available)) == null; } finally { lockU.unlock(); } // Add the session to the list of user sessions if (route.getResource() != null && (!available || added)) { Lock lock = CacheFactory.getLock(route.toBareJID(), usersSessions); try { lock.lock(); Collection<String> jids = usersSessions.get(route.toBareJID()); if (jids == null) { // Optimization - use different class depending on current setup if (ClusterManager.isClusteringStarted()) { jids = new HashSet<>(); } else { jids = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); } } jids.add(route.toString()); usersSessions.put(route.toBareJID(), jids); } finally { lock.unlock(); } } } return added; }
主要兩步:
(1)添加到路由表
(2)添加到對應的緩存中
移除路由(會話)
代碼如下:
@Override public boolean removeClientRoute(JID route) { boolean anonymous = false; String address = route.toString(); ClientRoute clientRoute = null; // 從緩存中移除客戶端的Session信息 Lock lockU = CacheFactory.getLock(address, usersCache); try { lockU.lock(); clientRoute = usersCache.remove(address); } finally { lockU.unlock(); } if (clientRoute == null) { Lock lockA = CacheFactory.getLock(address, anonymousUsersCache); try { lockA.lock(); clientRoute = anonymousUsersCache.remove(address); anonymous = true; } finally { lockA.unlock(); } } if (clientRoute != null && route.getResource() != null) { Lock lock = CacheFactory.getLock(route.toBareJID(), usersSessions); try { lock.lock(); if (anonymous) { usersSessions.remove(route.toBareJID()); } else { Collection<String> jids = usersSessions.get(route.toBareJID()); if (jids != null) { jids.remove(route.toString()); if (!jids.isEmpty()) { usersSessions.put(route.toBareJID(), jids); } else { usersSessions.remove(route.toBareJID()); } } } } finally { lock.unlock(); } } // 將對應客戶端的Session信息,移出路由表 localRoutingTable.removeRoute(address); return clientRoute != null; }
操作與添加類似:
(1)移除緩存里的路由信息
(2)移除路由表中的信息
獲取路由(會話)
@Override public ClientSession getClientRoute(JID jid) { // Check if this session is hosted by this cluster node ClientSession session = (ClientSession) localRoutingTable.getRoute(jid.toString()); if (session == null) { // The session is not in this JVM so assume remote RemoteSessionLocator locator = server.getRemoteSessionLocator(); if (locator != null) { // Check if the session is hosted by other cluster node ClientRoute route = usersCache.get(jid.toString()); if (route == null) { route = anonymousUsersCache.get(jid.toString()); } if (route != null) { session = locator.getClientSession(route.getNodeID().toByteArray(), jid); } } } return session; }
從上面的方法代碼中可以看到,獲取路由的方法是:先查找本地路由表,若獲取不到對應Session時,則通過集群獲取。RemoteSessionLocator是用於適配不同的集群組件所抽象的接口,為不同集群組件提供了透明處理。
至於如何從集群中獲取Session,主要就在於sersCache和anonymousUsersCache這兩個cache,它們記錄了每個客戶端的路由節點信息,通過它可以取得對應的Session實例。詳見第八章《集群管理》
消息路由
根據發送的形式,分為兩種:一是廣播、二是單點路由
1、以廣播的形式,向所有在線的客戶端發送消息
@Override public void broadcastPacket(Message packet, boolean onlyLocal) { // Send the message to client sessions connected to this JVM for(ClientSession session : localRoutingTable.getClientRoutes()) { session.process(packet); } // Check if we need to broadcast the message to client sessions connected to remote cluter nodes if (!onlyLocal && remotePacketRouter != null) { remotePacketRouter.broadcastPacket(packet); } }
2、單點發送的形式,向某個指定的客戶端發送消息
@Override public void routePacket(JID jid, Packet packet, boolean fromServer) throws PacketException { boolean routed = false; try { if (serverName.equals(jid.getDomain())) { // Packet sent to our domain. routed = routeToLocalDomain(jid, packet, fromServer); Log.info("routeToLocalDomain"); } else if (jid.getDomain().endsWith(serverName) && hasComponentRoute(jid)) { // Packet sent to component hosted in this server routed = routeToComponent(jid, packet, routed); Log.info("routeToComponent"); } else { // Packet sent to remote server routed = routeToRemoteDomain(jid, packet, routed); Log.info("routeToRemoteDomain"); } } catch (Exception ex) { // Catch here to ensure that all packets get handled, despite various processing // exceptions, rather than letting any fall through the cracks. For example, // an IAE could be thrown when running in a cluster if a remote member becomes // unavailable before the routing caches are updated to remove the defunct node. // We have also occasionally seen various flavors of NPE and other oddities, // typically due to unexpected environment or logic breakdowns. Log.error("Primary packet routing failed", ex); } if (!routed) { if (Log.isDebugEnabled()) { Log.debug("Failed to route packet to JID: {} packet: {}", jid, packet.toXML()); } if (packet instanceof IQ) { iqRouter.routingFailed(jid, packet); } else if (packet instanceof Message) { messageRouter.routingFailed(jid, packet); } else if (packet instanceof Presence) { presenceRouter.routingFailed(jid, packet); } } }
路由表中的功能,最后由SessionManager集中處理,詳見上一章的分析,這里不再贅述。
特點提一點,比較有用:路由表做為一個module已經在Openfire主服務啟動時完成實例化,所以,在自定義的插件、或者其他任何需要發送消息的地方,只需選擇調用如下兩個方法中之一,即可完成消息發送:
XMPPServer.getInstance().getRoutingTable().routePacket(jid, packet, fromServer);
XMPPServer.getInstance().getRoutingTable().broadcastPacket(packet, onlyLocal);
而消息發送中,最后消息如何送到網卡實現發送,在第三章《消息路由》中已經詳細分析,同樣不再贅述。
本章就到此結束,OVER!