Preface
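Chapter 6, "Routing Table", showed that before a client can talk to another party, the server first has to obtain the other party's Session instance. It looks in the local routing table first. If the session is not there, it uses the cached data in the routing table to find the cluster node that holds the session, and then fetches the session through the cluster locator.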
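The lookup order just described can be sketched in plain Java. This is a minimal model under stated assumptions, not Openfire's code: sessions are plain strings, `usersCache` stands in for the clustered user cache (JID to the ID of the node holding the session), and `RemoteLocator` is a hypothetical one-method stand-in for RemoteSessionLocator.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for RemoteSessionLocator: fetches a session that
// lives on another cluster node.
interface RemoteLocator {
    String getClientSession(String nodeId, String jid);
}

// Sketch of "local routing table first, then the cluster".
class SessionLookup {
    private final String localNodeId;
    private final Map<String, String> localSessions = new ConcurrentHashMap<>();
    private final Map<String, String> usersCache;   // shared: JID -> owning node ID
    private final RemoteLocator remoteLocator;      // null when not clustered

    SessionLookup(String localNodeId, Map<String, String> usersCache, RemoteLocator remoteLocator) {
        this.localNodeId = localNodeId;
        this.usersCache = usersCache;
        this.remoteLocator = remoteLocator;
    }

    // Called when a client binds a resource on this node.
    void addLocal(String jid, String session) {
        localSessions.put(jid, session);
        usersCache.put(jid, localNodeId);
    }

    // Returns the session, or null if no node in the cluster holds one.
    String getSession(String jid) {
        String session = localSessions.get(jid);
        if (session != null) {
            return session;                       // client is connected to this node
        }
        String nodeId = usersCache.get(jid);      // which node owns the session?
        if (nodeId == null || nodeId.equals(localNodeId) || remoteLocator == null) {
            return null;
        }
        return remoteLocator.getClientSession(nodeId, jid);
    }
}
```

Keeping only the node ID in the shared cache, rather than the session object itself, matches the idea that each node holds only the sessions of its own connections.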
The routing table defines the following caches:
public RoutingTableImpl() {
    super("Routing table");
    serversCache = CacheFactory.createCache(S2S_CACHE_NAME);
    componentsCache = CacheFactory.createCache(COMPONENT_CACHE_NAME);
    usersCache = CacheFactory.createCache(C2S_CACHE_NAME);
    anonymousUsersCache = CacheFactory.createCache(ANONYMOUS_C2S_CACHE_NAME);
    usersSessions = CacheFactory.createCache(C2S_SESSION_NAME);
    localRoutingTable = new LocalRoutingTable();
}
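These caches hold the Session information of the whole cluster and are what keeps the nodes in sync. Openfire provides a set of interfaces for clustering support, so a cluster can be built by installing a plugin.
Maintaining the cluster and replicating data between machines is the clustering plugin's job, and the plugin also exposes data-access interfaces to every Openfire instance. Openfire itself only has to hand its data over to the plugin.
Let's look at how this works. The clustering plugin used in this article is Hazelcast.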
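Key classes and interfaces for clustering
1. Interfaces:
RemoteSessionLocator ----> remote Session locator interface; implemented by the clustering plugin and used to fetch Sessions from the cluster
ClusterEventListener ----> listener interface for joining and leaving the cluster
CacheFactoryStrategy ----> cache strategy interface
2. Classes:
ClusterManager ----> cluster management class; it manages this node's participation, not the cluster as a whole. Electing the master node, synchronizing caches and so on are handled by the plugin
CacheFactory ----> cache factory class
DefaultLocalCacheStrategy ----> local cache strategy, implements CacheFactoryStrategy
ClusteredCacheFactory ----> clustered cache strategy, implements CacheFactoryStrategy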
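Clustering plugin: HazelcastPlugin
When the clustering plugin is initialized, it schedules itself to run on a separate thread after a short delay, and that thread calls ClusterManager to start clustering: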
public void initializePlugin(PluginManager manager, File pluginDirectory) {
    // start cluster using a separate thread after a short delay
    // this will allow other plugins to initialize during startup
    TaskEngine.getInstance().schedule(this, CLUSTER_STARTUP_DELAY_TIME*1000);
}
@Override
public void run() {
    System.out.println("Starting Hazelcast Clustering Plugin");
    // Check if another cluster is installed and stop loading this plugin if found
    File pluginDir = new File(JiveGlobals.getHomeDirectory(), "plugins");
    File[] jars = pluginDir.listFiles(new FileFilter() {
        public boolean accept(File pathname) {
            String fileName = pathname.getName().toLowerCase();
            return (fileName.equalsIgnoreCase("enterprise.jar") ||
                    fileName.equalsIgnoreCase("coherence.jar"));
        }
    });
    if (jars.length > 0) {
        // Do not load this plugin if a conflicting implementation exists
        logger.warn("Conflicting clustering plugins found; remove Coherence and/or Enterprise jar files");
        throw new IllegalStateException("Clustering plugin configuration conflict (Coherence)");
    }
    ClusterManager.startup();
}
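The two ideas in run(), delaying start-up so that other plugins can initialize and refusing to start when a conflicting clustering jar is present, can be modelled with the JDK's ScheduledExecutorService in place of Openfire's TaskEngine. This is only a sketch: `DelayedClusterStartup` and its methods are hypothetical, and only the two jar names come from the quoted code.

```java
import java.util.Arrays;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class DelayedClusterStartup {
    // True if any plugin file name matches a known conflicting clustering plugin.
    static boolean hasConflictingPlugin(String[] pluginFileNames) {
        return Arrays.stream(pluginFileNames)
                .anyMatch(n -> n.equalsIgnoreCase("enterprise.jar") || n.equalsIgnoreCase("coherence.jar"));
    }

    // Runs the conflict check and then startCluster after delaySeconds,
    // on the scheduler's thread instead of the caller's thread.
    static ScheduledFuture<?> schedule(ScheduledExecutorService scheduler, String[] pluginFileNames,
                                       Runnable startCluster, long delaySeconds) {
        return scheduler.schedule(() -> {
            if (hasConflictingPlugin(pluginFileNames)) {
                throw new IllegalStateException("Clustering plugin configuration conflict");
            }
            startCluster.run();
        }, delaySeconds, TimeUnit.SECONDS);
    }
}
```

Running the check inside the scheduled task, as the original does, means the plugin's initializePlugin() returns immediately and a conflict surfaces only on the background thread.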
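When the plugin is destroyed, it also shuts clustering down. During a full server shutdown this call is skipped, because XMPPServer has already initiated the cluster shutdown before unloading plugins: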
public void destroyPlugin() {
    // Shutdown is initiated by XMPPServer before unloading plugins
    if (!XMPPServer.getInstance().isShuttingDown()) {
        ClusterManager.shutdown();
    }
}
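Cluster management: ClusterManager
Cluster event queues
Cluster management revolves around the following two queues. Every cluster event is put on the event queue, and this is what allows multiple Openfire instances to respond in concert: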
private static Queue<ClusterEventListener> listeners = new ConcurrentLinkedQueue<>();
private static BlockingQueue<Event> events = new LinkedBlockingQueue<>(10000);
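listeners: holds every component that registered a ClusterEventListener, so that all of them can be notified
events: holds the cluster events, such as nodes joining or leaving the cluster (bounded at 10000 entries)
ClusterManager accordingly provides the following methods for adding to and removing from these two queues: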
public static void fireJoinedCluster(byte[] nodeID, boolean asynchronous) {
    try {
        Event event = new Event(EventType.joined_cluster, nodeID);
        events.put(event);
        if (!asynchronous) {
            while (!event.isProcessed()) {
                Thread.sleep(50);
            }
        }
    } catch (InterruptedException e) {
        // Should never happen
        Log.error(e.getMessage(), e);
    }
}
public static void fireLeftCluster(byte[] nodeID) {
    try {
        Event event = new Event(EventType.left_cluster, nodeID);
        events.put(event);
    } catch (InterruptedException e) {
        // Should never happen
        Log.error(e.getMessage(), e);
    }
}
public static void addListener(ClusterEventListener listener) {
    if (listener == null) {
        throw new NullPointerException();
    }
    listeners.add(listener);
}
public static void removeListener(ClusterEventListener listener) {
    listeners.remove(listener);
}
Starting clustering
public static synchronized void startup() {
    if (isClusteringEnabled() && !isClusteringStarted()) {
        initEventDispatcher();
        CacheFactory.startClustering();
    }
}
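In the code above, initEventDispatcher() starts a thread that dispatches the events taken from the events queue. For an event about this node itself (node ID null), it first calls CacheFactory.joinedCluster() or CacheFactory.leftCluster() to switch the caches and migrate their data; then, for every event, it notifies all components that registered a cluster event listener.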
private static void initEventDispatcher() {
    if (dispatcher == null || !dispatcher.isAlive()) {
        dispatcher = new Thread("ClusterManager events dispatcher") {
            @Override
            public void run() {
                // exit thread if/when clustering is disabled
                while (ClusterManager.isClusteringEnabled()) {
                    try {
                        Event event = events.take();
                        EventType eventType = event.getType();
                        // Make sure that CacheFactory is getting this events first (to update cache structure)
                        if (event.getNodeID() == null) {
                            // Replace standalone caches with clustered caches and migrate data
                            if (eventType == EventType.joined_cluster) {
                                CacheFactory.joinedCluster();
                            } else if (eventType == EventType.left_cluster) {
                                CacheFactory.leftCluster();
                            }
                        }
                        // Now notify rest of the listeners
                        for (ClusterEventListener listener : listeners) {
                            try {
                                switch (eventType) {
                                    case joined_cluster: {
                                        if (event.getNodeID() == null) {
                                            listener.joinedCluster();
                                        } else {
                                            listener.joinedCluster(event.getNodeID());
                                        }
                                        break;
                                    }
                                    case left_cluster: {
                                        if (event.getNodeID() == null) {
                                            listener.leftCluster();
                                        } else {
                                            listener.leftCluster(event.getNodeID());
                                        }
                                        break;
                                    }
                                    case marked_senior_cluster_member: {
                                        listener.markedAsSeniorClusterMember();
                                        break;
                                    }
                                    default:
                                        break;
                                }
                            } catch (Exception e) {
                                Log.error(e.getMessage(), e);
                            }
                        }
                        // Mark event as processed
                        event.setProcessed(true);
                    } catch (Exception e) {
                        Log.warn(e.getMessage(), e);
                    }
                }
            }
        };
        dispatcher.setDaemon(true);
        dispatcher.start();
    }
}
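To see how the listener queue, the event queue and the dispatcher thread fit together, here is a self-contained model in plain Java. It is a sketch, not ClusterManager itself: the event types are reduced to JOINED and LEFT, `Listener` is a hypothetical two-method version of ClusterEventListener, and the two `Runnable`s stand in for CacheFactory.joinedCluster() and CacheFactory.leftCluster().

```java
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

class MiniClusterManager {
    enum EventType { JOINED, LEFT }

    static final class Event {
        final EventType type;
        final String nodeId;              // null means "this node"
        volatile boolean processed;
        Event(EventType type, String nodeId) { this.type = type; this.nodeId = nodeId; }
    }

    interface Listener {
        void joined(String nodeId);
        void left(String nodeId);
    }

    private final Queue<Listener> listeners = new ConcurrentLinkedQueue<>();
    private final BlockingQueue<Event> events = new LinkedBlockingQueue<>(10000);
    private final Runnable onLocalJoin;   // plays the role of CacheFactory.joinedCluster()
    private final Runnable onLocalLeave;  // plays the role of CacheFactory.leftCluster()

    MiniClusterManager(Runnable onLocalJoin, Runnable onLocalLeave) {
        this.onLocalJoin = onLocalJoin;
        this.onLocalLeave = onLocalLeave;
        Thread dispatcher = new Thread(this::dispatchLoop, "events dispatcher");
        dispatcher.setDaemon(true);
        dispatcher.start();
    }

    void addListener(Listener listener) {
        listeners.add(Objects.requireNonNull(listener));
    }

    // Queues an event; when asynchronous is false, waits until it has been dispatched.
    void fire(EventType type, String nodeId, boolean asynchronous) {
        Event event = new Event(type, nodeId);
        try {
            events.put(event);
            while (!asynchronous && !event.processed) {
                Thread.sleep(10);         // same polling idea as fireJoinedCluster()
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void dispatchLoop() {
        while (true) {
            Event event;
            try {
                event = events.take();
            } catch (InterruptedException e) {
                return;
            }
            try {
                // Caches are switched first, and only for this node's own join/leave.
                if (event.nodeId == null) {
                    (event.type == EventType.JOINED ? onLocalJoin : onLocalLeave).run();
                }
                for (Listener listener : listeners) {
                    try {
                        if (event.type == EventType.JOINED) listener.joined(event.nodeId);
                        else listener.left(event.nodeId);
                    } catch (RuntimeException e) {
                        // one failing listener must not stop the others
                    }
                }
                event.processed = true;   // as in the original, only set on success
            } catch (RuntimeException e) {
                // the original logs and keeps the dispatcher running
            }
        }
    }
}
```

As in the original, a synchronous fire() returns only after the cache switch and all listeners have run, so a caller that fires its own join synchronously can rely on the caches already being clustered.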
Shutting down clustering
public static synchronized void shutdown() {
    if (isClusteringStarted()) {
        Log.debug("ClusterManager: Shutting down clustered cache service.");
        CacheFactory.stopClustering();
    }
}
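As the steps above show, the actual clustering work is done by the CacheFactory class.
CacheFactory: the upper layer of the clustering implementation
The cache map
Besides balancing requests, the most important job of a cluster is keeping data in sync. For this, CacheFactory keeps a map of every cache created through createCache():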
private static Map<String, Cache> caches = new ConcurrentHashMap<>();
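Each cache is built by the current cache strategy and then stored in this map (the storing happens in wrapCache(), which is not shown here):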
@SuppressWarnings("unchecked")
public static synchronized <T extends Cache> T createCache(String name) {
    T cache = (T) caches.get(name);
    if (cache != null) {
        return cache;
    }
    cache = (T) cacheFactoryStrategy.createCache(name);
    log.info("Created cache [" + cacheFactoryStrategy.getClass().getName() + "] for " + name);
    return wrapCache(cache, name);
}
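The pattern in createCache(), at most one cache per name, built on first request by the current strategy and then registered, can be sketched without Openfire. `CacheRegistry`, its `CacheWrapper` and the `Function` used as a strategy are hypothetical simplifications; the registration step is assumed to correspond to what wrapCache() does.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class CacheRegistry {
    // Hypothetical wrapper: callers keep this object, the backing map may change later.
    static final class CacheWrapper {
        final String name;
        volatile Map<String, Object> wrapped;
        CacheWrapper(String name, Map<String, Object> wrapped) {
            this.name = name;
            this.wrapped = wrapped;
        }
    }

    private final Map<String, CacheWrapper> caches = new ConcurrentHashMap<>();
    private final Function<String, Map<String, Object>> strategy;  // the current cache strategy

    CacheRegistry(Function<String, Map<String, Object>> strategy) {
        this.strategy = strategy;
    }

    synchronized CacheWrapper createCache(String name) {
        CacheWrapper existing = caches.get(name);
        if (existing != null) {
            return existing;                       // same name, same instance
        }
        CacheWrapper wrapper = new CacheWrapper(name, strategy.apply(name));
        caches.put(name, wrapper);                 // register so later lookups find it
        return wrapper;
    }
}
```

Handing out wrappers rather than raw maps is what later allows the backing store to be swapped without every caller having to fetch its cache again.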
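Switching cache strategies
Openfire defines two cache strategies: local caching and clustered caching. The class names of both are fixed in advance by Openfire (as defaults that can be overridden through properties). The local strategy is implemented by Openfire itself; the clustered strategy is implemented by the clustering plugin under the agreed class name.
The class names of the two strategies are: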
static {
    localCacheFactoryClass = JiveGlobals.getProperty(LOCAL_CACHE_PROPERTY_NAME,
            "org.jivesoftware.util.cache.DefaultLocalCacheStrategy");
    clusteredCacheFactoryClass = JiveGlobals.getProperty(CLUSTERED_CACHE_PROPERTY_NAME,
            "org.jivesoftware.openfire.plugin.util.cache.ClusteredCacheFactory");
}
Without clustering, the local cache strategy is used:
public static synchronized void initialize() throws InitializationException {
    try {
        localCacheFactoryStrategy = (CacheFactoryStrategy) Class.forName(localCacheFactoryClass).newInstance();
        cacheFactoryStrategy = localCacheFactoryStrategy;
    } catch (Exception e) {
        log.error("Failed to instantiate local cache factory strategy: " + localCacheFactoryClass, e);
        throw new InitializationException(e);
    }
}
When the node joins the cluster, it switches to the clustered cache strategy:
@SuppressWarnings("unchecked")
public static synchronized void joinedCluster() {
    cacheFactoryStrategy = clusteredCacheFactoryStrategy;
    // Loop through local caches and switch them to clustered cache (copy content)
    for (Cache cache : getAllCaches()) {
        // skip local-only caches
        if (localOnly.contains(cache.getName())) continue;
        CacheWrapper cacheWrapper = ((CacheWrapper) cache);
        Cache clusteredCache = cacheFactoryStrategy.createCache(cacheWrapper.getName());
        clusteredCache.putAll(cache);
        cacheWrapper.setWrappedCache(clusteredCache);
    }
    clusteringStarting = false;
    clusteringStarted = true;
    log.info("Clustering started; cache migration complete");
}
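The switch recreates every cache (except the local-only ones) with the clustered strategy, copies the local contents into the new clustered cache with putAll(), and points the existing CacheWrapper at it. From then on the data is replicated to the other machines in the cluster, while callers keep using the same wrapper object.
When the node leaves the cluster, it switches back to local caching: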
@SuppressWarnings("unchecked")
public static synchronized void leftCluster() {
    clusteringStarted = false;
    cacheFactoryStrategy = localCacheFactoryStrategy;
    // Loop through clustered caches and change them to local caches (copy content)
    for (Cache cache : getAllCaches()) {
        // skip local-only caches
        if (localOnly.contains(cache.getName())) continue;
        CacheWrapper cacheWrapper = ((CacheWrapper) cache);
        Cache standaloneCache = cacheFactoryStrategy.createCache(cacheWrapper.getName());
        standaloneCache.putAll(cache);
        cacheWrapper.setWrappedCache(standaloneCache);
    }
    log.info("Clustering stopped; cache migration complete");
}
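The join/leave migration can be modelled in a few lines: each registered wrapper gets a fresh backing map from the target strategy, the current contents are copied with putAll(), and the wrapper is re-pointed. This is a sketch under simplifying assumptions: `StrategySwitcher` is hypothetical, strategies are plain `Function`s returning maps, and in the usage check a shared map of maps stands in for the cluster.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class StrategySwitcher {
    static final class CacheWrapper {
        final String name;
        volatile Map<String, Object> wrapped;
        CacheWrapper(String name, Map<String, Object> wrapped) {
            this.name = name;
            this.wrapped = wrapped;
        }
    }

    private final Map<String, CacheWrapper> caches = new ConcurrentHashMap<>();
    private final Function<String, Map<String, Object>> localStrategy;
    private final Function<String, Map<String, Object>> clusteredStrategy;
    private final Set<String> localOnly;
    private Function<String, Map<String, Object>> current;

    StrategySwitcher(Function<String, Map<String, Object>> localStrategy,
                     Function<String, Map<String, Object>> clusteredStrategy,
                     Set<String> localOnly) {
        this.localStrategy = localStrategy;
        this.clusteredStrategy = clusteredStrategy;
        this.localOnly = localOnly;
        this.current = localStrategy;            // standalone until the node joins a cluster
    }

    synchronized CacheWrapper createCache(String name) {
        return caches.computeIfAbsent(name, n ->
                new CacheWrapper(n, localOnly.contains(n) ? localStrategy.apply(n) : current.apply(n)));
    }

    synchronized void joinedCluster() { switchTo(clusteredStrategy); }

    synchronized void leftCluster() { switchTo(localStrategy); }

    private void switchTo(Function<String, Map<String, Object>> target) {
        current = target;
        for (CacheWrapper wrapper : caches.values()) {
            if (localOnly.contains(wrapper.name)) {
                continue;                        // local-only caches never move
            }
            Map<String, Object> replacement = target.apply(wrapper.name);
            replacement.putAll(wrapper.wrapped); // copy the current content
            wrapper.wrapped = replacement;       // swap behind the same wrapper
        }
    }
}
```

The point of the wrapper indirection shows up in the check below: the `users` reference obtained before joining keeps working after both switches.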
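Clustered cache strategy: ClusteredCacheFactory
The clustered cache strategy is the bridge between Openfire and the clustering component. Openfire defines the CacheFactoryStrategy interface, and the plugin's implementation must use the fully qualified class name org.jivesoftware.openfire.plugin.util.cache.ClusteredCacheFactory (the default shown above). Its methods are implemented by the specific clustering plugin.
Creating a clustered cache: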
public Cache createCache(String name) {
    // Check if cluster is being started up
    while (state == State.starting) {
        // Wait until cluster is fully started (or failed)
        try {
            Thread.sleep(250);
        } catch (InterruptedException e) {
            // Ignore
        }
    }
    if (state == State.stopped) {
        throw new IllegalStateException("Cannot create clustered cache when not in a cluster");
    }
    return new ClusteredCache(name, hazelcast.getMap(name));
}
The creation of the ClusteredCache object is the key to data synchronization:
return new ClusteredCache(name, hazelcast.getMap(name));
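This means the cache is backed by a map managed by Hazelcast (hazelcast.getMap(name)): whenever the cache changes, what actually changes is the data held in Hazelcast, which Hazelcast shares across the cluster.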
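A rough model of createCache() and of what "backed by Hazelcast" buys: the factory waits while start-up is in progress, refuses to work outside a cluster, and returns the named shared map itself, so two simulated members see each other's writes without any copying. The class name is hypothetical, and a `ConcurrentHashMap` of maps merely stands in for the Hazelcast instance; real Hazelcast maps are distributed across machines, which this single-JVM sketch does not show.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class SketchClusteredCacheFactory {
    enum State { STOPPED, STARTING, STARTED }

    private volatile State state = State.STOPPED;
    // Stands in for the Hazelcast instance; the same object is shared by all simulated members.
    private final Map<String, Map<String, String>> grid;

    SketchClusteredCacheFactory(Map<String, Map<String, String>> grid) {
        this.grid = grid;
    }

    void setState(State newState) {
        state = newState;
    }

    Map<String, String> createCache(String name) {
        while (state == State.STARTING) {           // wait until start-up finishes or fails
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;                              // give up waiting; the check below then fails
            }
        }
        if (state != State.STARTED) {
            throw new IllegalStateException("Cannot create clustered cache when not in a cluster");
        }
        // Every member gets the same named map, so a write through one member
        // is visible to the others; nothing is copied here.
        return grid.computeIfAbsent(name, k -> new ConcurrentHashMap<>());
    }
}
```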
Starting the cluster
public boolean startCluster() {
    state = State.starting;
    // Set the serialization strategy to use for transmitting objects between node clusters
    serializationStrategy = ExternalizableUtil.getInstance().getStrategy();
    ExternalizableUtil.getInstance().setStrategy(new ClusterExternalizableUtil());
    // Set session locator to use when in a cluster
    XMPPServer.getInstance().setRemoteSessionLocator(new RemoteSessionLocator());
    // Set packet router to use to deliver packets to remote cluster nodes
    XMPPServer.getInstance().getRoutingTable().setRemotePacketRouter(new ClusterPacketRouter());
    ClassLoader oldLoader = null;
    // Store previous class loader (in case we change it)
    oldLoader = Thread.currentThread().getContextClassLoader();
    ClassLoader loader = new ClusterClassLoader();
    Thread.currentThread().setContextClassLoader(loader);
    int retry = 0;
    do {
        try {
            Config config = new ClasspathXmlConfig(HAZELCAST_CONFIG_FILE);
            config.setInstanceName("openfire");
            config.setClassLoader(loader);
            if (JMXManager.isEnabled() && HAZELCAST_JMX_ENABLED) {
                config.setProperty("hazelcast.jmx", "true");
                config.setProperty("hazelcast.jmx.detailed", "true");
            }
            hazelcast = Hazelcast.newHazelcastInstance(config);
            cluster = hazelcast.getCluster();
            // Update the running state of the cluster
            state = cluster != null ? State.started : State.stopped;
            // Set the ID of this cluster node
            XMPPServer.getInstance().setNodeID(NodeID.getInstance(getClusterMemberID()));
            // CacheFactory is now using clustered caches. We can add our listeners.
            clusterListener = new ClusterListener(cluster);
            lifecycleListener = hazelcast.getLifecycleService().addLifecycleListener(clusterListener);
            membershipListener = cluster.addMembershipListener(clusterListener);
            break;
        } catch (Exception e) {
            if (retry < CLUSTER_STARTUP_RETRY_COUNT) {
                logger.warn("Failed to start clustering (" + e.getMessage() + "); " +
                        "will retry in " + CLUSTER_STARTUP_RETRY_TIME + " seconds");
                try {
                    Thread.sleep(CLUSTER_STARTUP_RETRY_TIME*1000);
                } catch (InterruptedException ie) { /* ignore */ }
            } else {
                logger.error("Unable to start clustering - continuing in local mode", e);
                state = State.stopped;
            }
        }
    } while (retry++ < CLUSTER_STARTUP_RETRY_COUNT);
    if (oldLoader != null) {
        // Restore previous class loader
        Thread.currentThread().setContextClassLoader(oldLoader);
    }
    return cluster != null;
}
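The retry logic in startCluster() boils down to: swap in a context class loader, make up to 1 + retryCount attempts with a pause between failures, and put the old loader back. The helper below is a hypothetical sketch of that pattern; unlike the quoted code it restores the loader in a `finally` block and signals "continue in local mode" by returning null.

```java
import java.util.concurrent.Callable;

class RetryingStarter {
    // Tries start up to (1 + retryCount) times with the given context class loader installed.
    // Returns the result of the first successful attempt, or null if every attempt failed.
    static <T> T startWithRetry(Callable<T> start, ClassLoader loader, int retryCount, long retryDelayMillis) {
        Thread current = Thread.currentThread();
        ClassLoader oldLoader = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            for (int attempt = 0; ; attempt++) {
                try {
                    return start.call();
                } catch (Exception e) {
                    if (attempt >= retryCount) {
                        return null;              // give up: the caller continues in local mode
                    }
                    try {
                        Thread.sleep(retryDelayMillis);
                    } catch (InterruptedException ie) {
                        current.interrupt();
                        return null;
                    }
                }
            }
        } finally {
            current.setContextClassLoader(oldLoader);  // always restore the previous loader
        }
    }
}
```

The attempt count matches the original `do { ... } while (retry++ < CLUSTER_STARTUP_RETRY_COUNT)` loop, which also makes one initial attempt plus CLUSTER_STARTUP_RETRY_COUNT retries.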
Stopping the cluster
public void stopCluster() {
    // Stop the cache services.
    cacheStats = null;
    // Update the running state of the cluster
    state = State.stopped;
    // Stop the cluster
    Hazelcast.shutdownAll();
    cluster = null;
    if (clusterListener != null) {
        // Wait until the server has updated its internal state
        while (!clusterListener.isDone()) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Ignore
            }
        }
        hazelcast.getLifecycleService().removeLifecycleListener(lifecycleListener);
        // Note: cluster was set to null a few lines above, so as quoted this
        // call throws a NullPointerException whenever clusterListener != null
        cluster.removeMembershipListener(membershipListener);
        lifecycleListener = null;
        membershipListener = null;
        clusterListener = null;
    }
    // Reset the node ID
    XMPPServer.getInstance().setNodeID(null);
    // Reset packet router to use to deliver packets to remote cluster nodes
    XMPPServer.getInstance().getRoutingTable().setRemotePacketRouter(null);
    // Reset the session locator to use
    XMPPServer.getInstance().setRemoteSessionLocator(null);
    // Set the old serialization strategy was using before clustering was loaded
    ExternalizableUtil.getInstance().setStrategy(serializationStrategy);
}
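Taking the start and stop methods together, they mainly do the following:
(1) Set the serialization strategy. Serialization is what allows data to be replicated between cluster nodes.
Before switching, the existing serialization strategy is saved: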
serializationStrategy = ExternalizableUtil.getInstance().getStrategy();
ExternalizableUtil.getInstance().setStrategy(new ClusterExternalizableUtil());
When the cluster stops, the original strategy is restored:
ExternalizableUtil.getInstance().setStrategy(serializationStrategy);
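(2) Set the remote Session locator. Each machine in the cluster holds only the Session instances of the clients connected to it. When two clients connected to different machines communicate, the locator is needed to find the other party in the cluster.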
XMPPServer.getInstance().setRemoteSessionLocator(new RemoteSessionLocator());
When the cluster stops, it is simply set to null:
XMPPServer.getInstance().setRemoteSessionLocator(null);
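(3) Add a remote packet router to the routing table. It is used to deliver packets to clients and servers connected to other cluster nodes.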
XMPPServer.getInstance().getRoutingTable().setRemotePacketRouter(new ClusterPacketRouter());
When leaving the cluster, it is set to null:
XMPPServer.getInstance().getRoutingTable().setRemotePacketRouter(null);
(4) Create the Hazelcast instance from the configuration file:
Config config = new ClasspathXmlConfig(HAZELCAST_CONFIG_FILE);
config.setInstanceName("openfire");
config.setClassLoader(loader);
if (JMXManager.isEnabled() && HAZELCAST_JMX_ENABLED) {
    config.setProperty("hazelcast.jmx", "true");
    config.setProperty("hazelcast.jmx.detailed", "true");
}
hazelcast = Hazelcast.newHazelcastInstance(config);
cluster = hazelcast.getCluster();
(5) Set the node ID:
XMPPServer.getInstance().setNodeID(NodeID.getInstance(getClusterMemberID()));
(6) Register listeners, so that callbacks fire when the cluster's lifecycle state or membership changes:
clusterListener = new ClusterListener(cluster);
lifecycleListener = hazelcast.getLifecycleService().addLifecycleListener(clusterListener);
membershipListener = cluster.addMembershipListener(clusterListener);
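ClusterListener implements both the MembershipListener and LifecycleListener interfaces. When it receives a callback, it makes ClusterManager update the events queue, which in turn drives event dispatching, the switch to clustered caches and so on. This is how the cluster responds to changes and is managed.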
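Steps (1) to (3) above follow a strict install/restore symmetry: back up what is replaced, install the cluster versions, and undo everything in reverse order on stop. The sketch below models only that symmetry; `ServerHooks` and `ClusterHooksLifecycle` are hypothetical, plain strings stand in for the real strategy, locator and router objects, and the `installed` guard against a double start is an addition not present in the original.

```java
// Hypothetical holder for the three hooks that startCluster()/stopCluster() change.
class ServerHooks {
    Object serializationStrategy = "local serialization";
    Object remoteSessionLocator;      // null means standalone
    Object remotePacketRouter;        // null means standalone
}

class ClusterHooksLifecycle {
    private final ServerHooks hooks;
    private Object savedSerializationStrategy;
    private boolean installed;

    ClusterHooksLifecycle(ServerHooks hooks) {
        this.hooks = hooks;
    }

    synchronized void start(Object clusterSerialization, Object locator, Object router) {
        if (installed) {
            return;                                           // never overwrite the backup
        }
        savedSerializationStrategy = hooks.serializationStrategy;  // (1) back up, then replace
        hooks.serializationStrategy = clusterSerialization;
        hooks.remoteSessionLocator = locator;                 // (2) remote Session locator
        hooks.remotePacketRouter = router;                    // (3) remote packet router
        installed = true;
    }

    synchronized void stop() {
        if (!installed) {
            return;
        }
        hooks.remotePacketRouter = null;                      // undo in reverse order
        hooks.remoteSessionLocator = null;
        hooks.serializationStrategy = savedSerializationStrategy;
        installed = false;
    }
}
```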
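An overview of how the cluster responds
1. Initially, Openfire starts and loads the clustering plugin. The first machine to finish starting is marked by Hazelcast as the master node; at this point the cluster behaves no differently from a single machine.
2. As more Openfire servers finish starting, nodes join and leave the cluster. Hazelcast itself synchronizes the various data within the cluster, and ClusterListener then receives callbacks in the following two methods: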
public void memberAdded(MembershipEvent event) { ....... ClusterManager.fireJoinedCluster(StringUtils.getBytes(event.getMember().getUuid()), true); ...... }
public void memberRemoved(MembershipEvent event) { ...... ClusterManager.fireLeftCluster(nodeID); ...... }
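3. ClusterManager's fireJoinedCluster() and fireLeftCluster() put new events on the events queue.
4. When the event queue is updated, ClusterManager's dispatcher thread calls CacheFactory.joinedCluster() or CacheFactory.leftCluster() to update the cache data (for events about this node itself), and notifies the other interested components, such as SessionManager and RoutingTableImpl, so they can update their own data.
5. When a new client logs in, its Session information is put into the clustered cache during resource binding, and Hazelcast synchronizes it across the cluster.
6. When clients in the cluster communicate, RemoteSessionLocator is used to obtain the other party's Session instance, and the routing table then routes the message.
Message routing in a cluster
Chapter 4, "Message Routing", showed that when a message is addressed to a remote domain, the routing table calls routeToRemoteDomain() to route it.
RoutingTableImpl.routeToRemoteDomain():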
private boolean routeToRemoteDomain(JID jid, Packet packet, boolean routed) {
    byte[] nodeID = serversCache.get(jid.getDomain());
    if (nodeID != null) {
        if (server.getNodeID().equals(nodeID)) {
            // This is a route to a remote server connected from this node
            try {
                localRoutingTable.getRoute(jid.getDomain()).process(packet);
                routed = true;
            } catch (UnauthorizedException e) {
                Log.error("Unable to route packet " + packet.toXML(), e);
            }
        } else {
            // This is a route to a remote server connected from other node
            if (remotePacketRouter != null) {
                routed = remotePacketRouter.routePacket(nodeID, jid, packet);
            }
        }
    } else {
        // Return a promise of a remote session. This object will queue packets pending
        // to be sent to remote servers
        OutgoingSessionPromise.getInstance().process(packet);
        routed = true;
    }
    return routed;
}
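The three-way decision in routeToRemoteDomain() can be isolated from Openfire's types. In this hypothetical sketch `serversCache` maps a remote domain to the ID of the node holding the server-to-server connection, `NodeRouter` stands in for RemotePacketRouter, and instead of delivering or queueing anything the method just reports which branch was taken.

```java
import java.util.Map;

enum RouteOutcome { DELIVERED_LOCALLY, FORWARDED_TO_NODE, QUEUED_PENDING_SESSION, NOT_ROUTED }

// Hypothetical stand-in for RemotePacketRouter.
interface NodeRouter {
    boolean routePacket(String nodeId, String domain, String packet);
}

class RemoteDomainRouter {
    private final String localNodeId;
    private final Map<String, String> serversCache;  // remote domain -> node holding the s2s connection
    private final NodeRouter remotePacketRouter;     // null when not clustered

    RemoteDomainRouter(String localNodeId, Map<String, String> serversCache, NodeRouter remotePacketRouter) {
        this.localNodeId = localNodeId;
        this.serversCache = serversCache;
        this.remotePacketRouter = remotePacketRouter;
    }

    RouteOutcome route(String domain, String packet) {
        String nodeId = serversCache.get(domain);
        if (nodeId == null) {
            // No node has a connection yet; the original queues the packet in OutgoingSessionPromise
            return RouteOutcome.QUEUED_PENDING_SESSION;
        }
        if (nodeId.equals(localNodeId)) {
            // This node owns the server-to-server session and delivers directly
            return RouteOutcome.DELIVERED_LOCALLY;
        }
        // Another node owns it: hand the packet to the remote packet router, if any
        if (remotePacketRouter != null && remotePacketRouter.routePacket(nodeId, domain, packet)) {
            return RouteOutcome.FORWARDED_TO_NODE;
        }
        return RouteOutcome.NOT_ROUTED;
    }
}
```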
When clustering starts, ClusterPacketRouter is installed as the RemotePacketRouter. The ClusterPacketRouter class:
public class ClusterPacketRouter implements RemotePacketRouter {
    private static Logger logger = LoggerFactory.getLogger(ClusterPacketRouter.class);
    public boolean routePacket(byte[] nodeID, JID receipient, Packet packet) {
        // Send the packet to the specified node and let the remote node deliver the packet to the recipient
        try {
            CacheFactory.doClusterTask(new RemotePacketExecution(receipient, packet), nodeID);
            return true;
        } catch (IllegalStateException e) {
            logger.warn("Error while routing packet to remote node: " + e);
            return false;
        }
    }
    public void broadcastPacket(Message packet) {
        // Execute the broadcast task across the cluster
        CacheFactory.doClusterTask(new BroadcastMessage(packet));
    }
}
It uses a cluster task to have a specific node carry out the message routing:
CacheFactory.doClusterTask(new RemotePacketExecution(receipient, packet), nodeID);
RemotePacketExecution is a task that runs on the target node (a Runnable, not a thread of its own). Its run() method is:
public void run() {
    XMPPServer.getInstance().getRoutingTable().routePacket(recipient, packet, false);
}
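In other words, when the two parties of a conversation are connected to different machines, the cluster hands the message to the node that owns the recipient's connection, and that node performs the routing.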
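The whole round trip, a session location published in a shared cache and delivery executed as a task on the owning node, can be simulated inside one JVM. In this sketch each hypothetical `SimulatedNode` owns a single-thread executor that plays the role of the node receiving cluster tasks, `routePacket()` loosely mirrors ClusterPacketRouter.routePacket(), and the submitted lambda loosely mirrors RemotePacketExecution.run(). No Hazelcast APIs are used or implied.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// One simulated Openfire node: an executor for cluster tasks and the
// sessions (modelled as inboxes) of clients connected to it.
class SimulatedNode {
    final String id;
    final ExecutorService executor = Executors.newSingleThreadExecutor();
    final Map<String, List<String>> inboxes = new ConcurrentHashMap<>();

    SimulatedNode(String id) { this.id = id; }

    // Delivery only works for clients connected to this node.
    void deliverLocally(String jid, String packet) {
        List<String> inbox = inboxes.get(jid);
        if (inbox != null) inbox.add(packet);
    }
}

class SimulatedCluster {
    private final Map<String, SimulatedNode> nodes = new ConcurrentHashMap<>();
    private final Map<String, String> usersCache = new ConcurrentHashMap<>(); // JID -> node ID

    SimulatedNode addNode(String id) {
        SimulatedNode node = new SimulatedNode(id);
        nodes.put(id, node);
        return node;
    }

    // Resource binding: the session stays on its node, its location goes into the shared cache.
    void login(SimulatedNode node, String jid) {
        node.inboxes.put(jid, new CopyOnWriteArrayList<>());
        usersCache.put(jid, node.id);
    }

    // Loose counterpart of ClusterPacketRouter.routePacket(): run the delivery on nodeId.
    boolean routePacket(String nodeId, String jid, String packet) {
        SimulatedNode target = nodes.get(nodeId);
        if (target == null) return false;
        // Loose counterpart of RemotePacketExecution.run(): route again, on the owning node.
        target.executor.submit(() -> target.deliverLocally(jid, packet));
        return true;
    }

    boolean send(SimulatedNode from, String jid, String packet) {
        String owner = usersCache.get(jid);
        if (owner == null) return false;
        if (owner.equals(from.id)) {
            from.deliverLocally(jid, packet);    // both parties on the same node
            return true;
        }
        return routePacket(owner, jid, packet);
    }

    void shutdownAndWait() {
        for (SimulatedNode node : nodes.values()) {
            node.executor.shutdown();
            try {
                node.executor.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

Delivery to another node is asynchronous here, which is why the check below shuts the executors down and waits before inspecting the inboxes.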
Over!