即時通信系統Openfire分析之八:集群管理


  前言

  在第六章《路由表》中,客戶端進行會話時,首先要獲取對方的Session實例。獲取Session實例的方法,是先查找本地路由表,若找不到,則通過路由表中的緩存數據,由集群定位器獲取。

  路由表中定義的緩存,如下:

public RoutingTableImpl() {
    super("Routing table");
    serversCache = CacheFactory.createCache(S2S_CACHE_NAME);
    componentsCache = CacheFactory.createCache(COMPONENT_CACHE_NAME);
    usersCache = CacheFactory.createCache(C2S_CACHE_NAME);
    anonymousUsersCache = CacheFactory.createCache(ANONYMOUS_C2S_CACHE_NAME);
    usersSessions = CacheFactory.createCache(C2S_SESSION_NAME);
    localRoutingTable = new LocalRoutingTable();
}

  這些緩存中,存儲了整個集群內的所有Session信息,用於做集群同步,Openfire實現了對集群的支持接口,可以通過插件的形式構建集群。

  集群的維護、數據在設備間的復制,由集群插件來實現,並為每一個Openfire實例開放數據訪問接口。而Openfire只要處理如何把數據遞交給集群插件即可。

  具體如何實現,下面來分析。本文使用的集群插件為Hazelcast。

  集群的關鍵類與接口

1、接口:
        RemoteSessionLocator        ----> Session遠程定位器接口,由具體的集群插件實現,用於從集群中獲取Session
        ClusterEventListener        ----> 集群加入、離開監聽接口
        CacheFactoryStrategy        ----> 緩存策略接口
2、類:
        ClusterManager              ----> 集群管理類,管理自身而非集群。集群內的Master節點的選取、緩存同步等由插件處理
        CacheFactory                ----> 緩存工廠類
        DefaultLocalCacheStrategy   ----> 本地緩存策略,實現CacheFactoryStrategy接口
        ClusteredCacheFactory       ----> 集群緩存策略,實現CacheFactoryStrategy接口

  集群插件:HazelcastPlugin

  集群插件在啟動時,由一個線程,調用集群管理的方法,啟動集群功能

public void initializePlugin(PluginManager manager, File pluginDirectory) {
    // start cluster using a separate thread after a short delay
    // this will allow other plugins to initialize during startup
    TaskEngine.getInstance().schedule(this, CLUSTER_STARTUP_DELAY_TIME*1000);
}

@Override
public void run() {
    System.out.println("Starting Hazelcast Clustering Plugin");

    // Check if another cluster is installed and stop loading this plugin if found
    File pluginDir = new File(JiveGlobals.getHomeDirectory(), "plugins");
    File[] jars = pluginDir.listFiles(new FileFilter() {
        public boolean accept(File pathname) {
            String fileName = pathname.getName().toLowerCase();
            return (fileName.equalsIgnoreCase("enterprise.jar") || 
                    fileName.equalsIgnoreCase("coherence.jar"));
        }
    });
    if (jars.length > 0) {
        // Do not load this plugin if a conflicting implementation exists
        logger.warn("Conflicting clustering plugins found; remove Coherence and/or Enterprise jar files");
        throw new IllegalStateException("Clustering plugin configuration conflict (Coherence)");
    }
    ClusterManager.startup();
}

  當系統關閉時,銷毀插件的同時,關閉集群

public void destroyPlugin() {
    // Shutdown is initiated by XMPPServer before unloading plugins
    if (!XMPPServer.getInstance().isShuttingDown()) {
        ClusterManager.shutdown();
    }
}

 集群管理:ClusterManager

  集群事件隊列  

  集群的管理主要圍繞如下兩個隊列進行,集群中發生的每個事件,都會載入隊列中,這是多個Openfire實例能夠協同響應的基礎:

private static Queue<ClusterEventListener> listeners = new ConcurrentLinkedQueue<>();
private static BlockingQueue<Event> events = new LinkedBlockingQueue<>(10000);

  listeners:用於通知所有注冊了ClusterEventListener事件的組件

  events:用於存儲集群中所有設備進、出集群的事件

  CluterManager相應的提供了如下幾個方法,用於操作這兩個隊列的增、刪操作:

public static void fireJoinedCluster(byte[] nodeID, boolean asynchronous) {
    try {
        Event event = new Event(EventType.joined_cluster, nodeID);
        events.put(event);
        if (!asynchronous) {
            while (!event.isProcessed()) {
                Thread.sleep(50);
            }
        }
    } catch (InterruptedException e) {
        // Should never happen
        Log.error(e.getMessage(), e);
    }
}

public static void fireLeftCluster(byte[] nodeID) {
    try {
        Event event = new Event(EventType.left_cluster, nodeID);
        events.put(event);
    } catch (InterruptedException e) {
        // Should never happen
        Log.error(e.getMessage(), e);
    }
}
public static void addListener(ClusterEventListener listener) {
    if (listener == null) {
        throw new NullPointerException();
    }
    listeners.add(listener);
}

public static void removeListener(ClusterEventListener listener) {
    listeners.remove(listener);
}

  集群的啟動

public static synchronized void startup() {
    if (isClusteringEnabled() && !isClusteringStarted()) {
        initEventDispatcher();
        CacheFactory.startClustering();
    }
}

  上面代碼中, initEventDispatcher()方法,啟動一個線程,根據events事件隊列,完成事件調度:當有設備加入、離開集群中時,調用CacheFactory.joinedCluster()、CacheFactory.leftCluster()處理緩存數據的同步,並啟用監聽器通知所有注冊了集群事件監聽的組件。

 

private static void initEventDispatcher() {
    if (dispatcher == null || !dispatcher.isAlive()) {
        dispatcher = new Thread("ClusterManager events dispatcher") {
            @Override
            public void run() {
                // exit thread if/when clustering is disabled
                while (ClusterManager.isClusteringEnabled()) {
                    try {
                        Event event = events.take();
                        EventType eventType = event.getType();
                        // Make sure that CacheFactory is getting this events first (to update cache structure)
                        if (event.getNodeID() == null) {
                            // Replace standalone caches with clustered caches and migrate data
                            if (eventType == EventType.joined_cluster) {
                                CacheFactory.joinedCluster();
                            } else if (eventType == EventType.left_cluster) {
                                CacheFactory.leftCluster();
                            }
                        }
                        // Now notify rest of the listeners
                        for (ClusterEventListener listener : listeners) {
                            try {
                                switch (eventType) {
                                    case joined_cluster: {
                                        if (event.getNodeID() == null) {
                                            listener.joinedCluster();
                                        }
                                        else {
                                            listener.joinedCluster(event.getNodeID());
                                        }
                                        break;
                                    }
                                    case left_cluster: {
                                        if (event.getNodeID() == null) {
                                            listener.leftCluster();
                                        }
                                        else {
                                            listener.leftCluster(event.getNodeID());
                                        }
                                        break;
                                    }
                                    case marked_senior_cluster_member: {
                                        listener.markedAsSeniorClusterMember();
                                        break;
                                    }
                                    default:
                                        break;
                                }
                            }
                            catch (Exception e) {
                                Log.error(e.getMessage(), e);
                            }
                        }
                        // Mark event as processed
                        event.setProcessed(true);
                    } catch (Exception e) {
                        Log.warn(e.getMessage(), e);
                    }
                }
            }
        };
        dispatcher.setDaemon(true);
        dispatcher.start();
    }
}

  集群的關閉

public static synchronized void shutdown() {
    if (isClusteringStarted()) {
        Log.debug("ClusterManager: Shutting down clustered cache service.");
        CacheFactory.stopClustering();
    }
}

  由上過程可以看出,集群功能的具體實現,是通過CacheFactory類實現。

  緩存工廠CacheFactory類,集群功能的上層實現

  緩存隊列

  集群功能,除了請求的均衡之外,最主要的是數據的同步。CacheFactory中為數據同步提供了一個緩存隊列,用於存儲所有通過createCache()方法生成的緩存:

private static Map<String, Cache> caches = new ConcurrentHashMap<>();

  通過調用指定的緩存策略構造緩存,並存入隊列中:

@SuppressWarnings("unchecked")
public static synchronized <T extends Cache> T createCache(String name) {
    T cache = (T) caches.get(name);
    if (cache != null) {
        return cache;
    }
    cache = (T) cacheFactoryStrategy.createCache(name);
    
    log.info("Created cache [" + cacheFactoryStrategy.getClass().getName() + "] for " + name);

    return wrapCache(cache, name);
}

  緩存策略切換

  Openfire定義的緩存策略有兩種,本地緩存、集群緩存。這兩種緩存策略對應的類名由Openfire預先定好。本地緩存由Openfire自身實現,集群緩存由集群插件按定好的類名規范實現。

  兩種緩存機制的類名如下:

static {
    localCacheFactoryClass = JiveGlobals.getProperty(LOCAL_CACHE_PROPERTY_NAME,
            "org.jivesoftware.util.cache.DefaultLocalCacheStrategy");
    clusteredCacheFactoryClass = JiveGlobals.getProperty(CLUSTERED_CACHE_PROPERTY_NAME,
            "org.jivesoftware.openfire.plugin.util.cache.ClusteredCacheFactory");
}

  無集群的情況,使用本地緩存:

public static synchronized void initialize() throws InitializationException {
    try {
        localCacheFactoryStrategy = (CacheFactoryStrategy) Class.forName(localCacheFactoryClass).newInstance();
        cacheFactoryStrategy = localCacheFactoryStrategy;
    } catch (Exception e) {
        log.error("Failed to instantiate local cache factory strategy: " + localCacheFactoryClass, e);
         throw new InitializationException(e);
    }
}

  當加入集群時,切換為集群緩存:

@SuppressWarnings("unchecked")
public static synchronized void joinedCluster() {
    cacheFactoryStrategy = clusteredCacheFactoryStrategy;
    // Loop through local caches and switch them to clustered cache (copy content)
    for (Cache cache : getAllCaches()) {
        // skip local-only caches
        if (localOnly.contains(cache.getName())) continue;
        CacheWrapper cacheWrapper = ((CacheWrapper) cache);
        Cache clusteredCache = cacheFactoryStrategy.createCache(cacheWrapper.getName());
        clusteredCache.putAll(cache);
        cacheWrapper.setWrappedCache(clusteredCache);
    }
    clusteringStarting = false;
    clusteringStarted = true;
    log.info("Clustering started; cache migration complete");
}

  切換的方法是將本地緩存使用集群緩存策略重新生成一次,這時,本地的緩存將會被同步到集群中的各個機器上。

  當離開集群時,又會切換為本地緩存:

@SuppressWarnings("unchecked")
public static synchronized void leftCluster() {
    clusteringStarted = false;
    cacheFactoryStrategy = localCacheFactoryStrategy;

    // Loop through clustered caches and change them to local caches (copy content)
    for (Cache cache : getAllCaches()) {
        // skip local-only caches
        if (localOnly.contains(cache.getName())) continue;
        CacheWrapper cacheWrapper = ((CacheWrapper) cache);
        Cache standaloneCache = cacheFactoryStrategy.createCache(cacheWrapper.getName());
        standaloneCache.putAll(cache);
        cacheWrapper.setWrappedCache(standaloneCache);
    }
    log.info("Clustering stopped; cache migration complete");
}

  集群緩存策略 ClusteredCacheFactory

  集群緩存策略,是Openfire與集群組件的過渡層。由Openfire制定了接口規范CacheFactoryStrategy,且包名必須為org.jivesoftware.openfire.plugin.util.cache.ClusteredCacheFactory,其中的方法,由具體的集群插件來完成。

  集群緩存的創建:

public Cache createCache(String name) {
    // Check if cluster is being started up
    while (state == State.starting) {
        // Wait until cluster is fully started (or failed)
        try {
            Thread.sleep(250);
        }
        catch (InterruptedException e) {
            // Ignore
        }
    }
    if (state == State.stopped) {
        throw new IllegalStateException("Cannot create clustered cache when not in a cluster");
    }
    return new ClusteredCache(name, hazelcast.getMap(name));
}

  其中,CluteredCache對象的生成,是實現數據同步的關鍵:

return new ClusteredCache(name, hazelcast.getMap(name));

  表明該緩存隊列是Hazelcast中定義的,當隊列發生變更時,實際上是更新了Hazelcast中的內容。

  啟動集群的方法

public boolean startCluster() {
    state = State.starting;
    
    // Set the serialization strategy to use for transmitting objects between node clusters
    serializationStrategy = ExternalizableUtil.getInstance().getStrategy();
    ExternalizableUtil.getInstance().setStrategy(new ClusterExternalizableUtil());
    // Set session locator to use when in a cluster
    XMPPServer.getInstance().setRemoteSessionLocator(new RemoteSessionLocator());
    // Set packet router to use to deliver packets to remote cluster nodes
    XMPPServer.getInstance().getRoutingTable().setRemotePacketRouter(new ClusterPacketRouter());

    ClassLoader oldLoader = null;
    // Store previous class loader (in case we change it)
    oldLoader = Thread.currentThread().getContextClassLoader();
    ClassLoader loader = new ClusterClassLoader();
    Thread.currentThread().setContextClassLoader(loader);
    int retry = 0;
    do {
        try {
            Config config = new ClasspathXmlConfig(HAZELCAST_CONFIG_FILE);
            config.setInstanceName("openfire");
            config.setClassLoader(loader);
            if (JMXManager.isEnabled() && HAZELCAST_JMX_ENABLED) {
                config.setProperty("hazelcast.jmx", "true");
                config.setProperty("hazelcast.jmx.detailed", "true");
            }
            hazelcast = Hazelcast.newHazelcastInstance(config);
            cluster = hazelcast.getCluster();

            // Update the running state of the cluster
            state = cluster != null ? State.started : State.stopped;

            // Set the ID of this cluster node
            XMPPServer.getInstance().setNodeID(NodeID.getInstance(getClusterMemberID()));
            // CacheFactory is now using clustered caches. We can add our listeners.
            clusterListener = new ClusterListener(cluster);
            lifecycleListener = hazelcast.getLifecycleService().addLifecycleListener(clusterListener);
            membershipListener = cluster.addMembershipListener(clusterListener);
            break;
        } catch (Exception e) {
            if (retry < CLUSTER_STARTUP_RETRY_COUNT) {
                logger.warn("Failed to start clustering (" +  e.getMessage() + "); " +
                        "will retry in " + CLUSTER_STARTUP_RETRY_TIME + " seconds");
                try { Thread.sleep(CLUSTER_STARTUP_RETRY_TIME*1000); }
                catch (InterruptedException ie) { /* ignore */ }
            } else {
                logger.error("Unable to start clustering - continuing in local mode", e);
                state = State.stopped;
            }
        }
    } while (retry++ < CLUSTER_STARTUP_RETRY_COUNT);
    
    if (oldLoader != null) {
        // Restore previous class loader
        Thread.currentThread().setContextClassLoader(oldLoader);
    }
    return cluster != null;
}

  停止集群的方法

public void stopCluster() {
    // Stop the cache services.
    cacheStats = null;
    // Update the running state of the cluster
    state = State.stopped;
    // Stop the cluster
    Hazelcast.shutdownAll();
    cluster = null;
    if (clusterListener != null) {
        // Wait until the server has updated its internal state
        while (!clusterListener.isDone()) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Ignore
            }
        }
        hazelcast.getLifecycleService().removeLifecycleListener(lifecycleListener);
        cluster.removeMembershipListener(membershipListener);
        lifecycleListener = null;
        membershipListener = null;
        clusterListener = null;
    }
    // Reset the node ID
    XMPPServer.getInstance().setNodeID(null);

    // Reset packet router to use to deliver packets to remote cluster nodes
    XMPPServer.getInstance().getRoutingTable().setRemotePacketRouter(null);
    // Reset the session locator to use
    XMPPServer.getInstance().setRemoteSessionLocator(null);
    // Set the old serialization strategy was using before clustering was loaded
    ExternalizableUtil.getInstance().setStrategy(serializationStrategy);
}

  集群的啟動、停止兩個方法,下面做一個綜合分析,主要執行了如下操作:

  (1)設置緩存序列化策略,序列化是為了使數據能夠在集群之間復制。

  設置之前,先對原有的序列化策略做備份

serializationStrategy = ExternalizableUtil.getInstance().getStrategy();
ExternalizableUtil.getInstance().setStrategy(new ClusterExternalizableUtil());

  在集群停止的時候,重置為原來的策略

ExternalizableUtil.getInstance().setStrategy(serializationStrategy);

  (2)設置遠程Session定位器。集群中的每台機器,都只保存了連接到本機的Session實例。當連接到不同機器的兩個客戶端發生通信時,就需要用定位器從集群中找到對方。

XMPPServer.getInstance().setRemoteSessionLocator(new RemoteSessionLocator());

  在集群停止的時候,置空即可

XMPPServer.getInstance().setRemoteSessionLocator(null);

  (3)添加遠程包路由器到路由表中,主要是用於數據同步。

XMPPServer.getInstance().getRoutingTable().setRemotePacketRouter(new ClusterPacketRouter());

  離開集群時,置空

XMPPServer.getInstance().getRoutingTable().setRemotePacketRouter(null);

  (4)根據配置文件,加載Hazelcast的實例

Config config = new ClasspathXmlConfig(HAZELCAST_CONFIG_FILE);
config.setInstanceName("openfire");
config.setClassLoader(loader);
if (JMXManager.isEnabled() && HAZELCAST_JMX_ENABLED) {
    config.setProperty("hazelcast.jmx", "true");
    config.setProperty("hazelcast.jmx.detailed", "true");
}
hazelcast = Hazelcast.newHazelcastInstance(config);
cluster = hazelcast.getCluster();

  (5)設置節點ID號

XMPPServer.getInstance().setNodeID(NodeID.getInstance(getClusterMemberID()));

  (6)設置監聽,當集群中狀態變化、成員變化時,實現回調

clusterListener = new ClusterListener(cluster);
lifecycleListener = hazelcast.getLifecycleService().addLifecycleListener(clusterListener);
membershipListener = cluster.addMembershipListener(clusterListener);

  ClusterListener中實現了MembershipListener,LifecycleListener接口,當收到回調時,會觸發集群管理CluterManager更新事件隊列events,並進行事件調度、建立集群緩存等工作,以此實現了集群的響應與管理。

  對集群響應的流程總體做一個描述

  1、初始狀態,Openfire系統啟動,並加載了集群插件,第一台完成啟動的機器,會被Hazelcast標記為master節點,此時的集群環境,與單機沒什么差別

  2、當Openfire系統陸續完成啟動,新的設備陸續加入、移出集群,Hazelcast本身會完成集群內各種數據同步,然后通過ClusterListener會回調到如下兩個方法:

public void memberAdded(MembershipEvent event) {
    .......
    ClusterManager.fireJoinedCluster(StringUtils.getBytes(event.getMember().getUuid()), true);
    ......
}
public void memberRemoved(MembershipEvent event) {
    ......
    ClusterManager.fireLeftCluster(nodeID);
    ......
}

  3、CluterManager中的fireJoinedCluster()與fireLeftCluster()方法會觸發事件隊列的events的更新

  4、CluterManager事件調度線程dispatcher中,在事件隊列更新時將執行CacheFactory.joinedCluster()或CacheFactory.leftCluster()方法更新緩存數據,並通知其他相關組件更新數據,如SessionManager、RouteTableIpml等

  5、當有新的客戶端發出登錄請求,在資源綁定時針將該客戶端的Session信息放入集群緩存隊列中,由Hazelcast完成數據同步。

  6、當集群內客戶端發生通信時,使用RemoteSessionLocator獲得對方的session實例,再由路由表完成消息路由。

  集群中的消息路由

  在第四章《消息路由》中,在路由表中,如果是遠程消息,將調用routeToRemoteDomain()方法實現消息路由。

  RouteTableImpl.routeToRemoteDomain()方法:

private boolean routeToRemoteDomain(JID jid, Packet packet,
        boolean routed) {
    byte[] nodeID = serversCache.get(jid.getDomain());
    if (nodeID != null) {
        if (server.getNodeID().equals(nodeID)) {
            // This is a route to a remote server connected from this node
            try {
                localRoutingTable.getRoute(jid.getDomain()).process(packet);
                routed = true;
            } catch (UnauthorizedException e) {
                Log.error("Unable to route packet " + packet.toXML(), e);
            }
        }
        else {
            // This is a route to a remote server connected from other node
            if (remotePacketRouter != null) {
                routed = remotePacketRouter.routePacket(nodeID, jid, packet);
            }
        }
    }
    else {
        // Return a promise of a remote session. This object will queue packets pending
        // to be sent to remote servers
        OutgoingSessionPromise.getInstance().process(packet);
        routed = true;
    }
    return routed;
}

  在集群啟動中,設置了ClusterPacketRouter作為路由器RemotePacketRouter,ClusterPacketRouter類:

public class ClusterPacketRouter implements RemotePacketRouter {

    private static Logger logger = LoggerFactory.getLogger(ClusterPacketRouter.class);

    public boolean routePacket(byte[] nodeID, JID receipient, Packet packet) {
        // Send the packet to the specified node and let the remote node deliver the packet to the recipient
        try {
            CacheFactory.doClusterTask(new RemotePacketExecution(receipient, packet), nodeID);
            return true;
        } catch (IllegalStateException  e) {
            logger.warn("Error while routing packet to remote node: " + e);
            return false;
        }
    }

    public void broadcastPacket(Message packet) {
        // Execute the broadcast task across the cluster
        CacheFactory.doClusterTask(new BroadcastMessage(packet));
    }
}

  使用集群中的計算任務,指定一個節點完成消息路由:

CacheFactory.doClusterTask(new RemotePacketExecution(receipient, packet), nodeID);

  而RemotePacketExecution實際是一個線程,其run()方法:

public void run() {
    XMPPServer.getInstance().getRoutingTable().routePacket(recipient, packet, false);
}

  也就是說,集群中的消息路由,如果通信雙方是分處於兩台機器上,那么將使用集群將消息指定由對應的主機執行消息路由。


 

  Over!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM