Dubbo(七):redis注冊中心的應用


  上篇我們講了Dubbo中有一個非常本質和重要的功能,那就是服務的自動注冊與發現,而這個功能是通過注冊中心來實現的。上篇中使用zookeeper實現了注冊中心的功能,同時了提了dubbo中有其他許多的注冊中心的實現。

  今天我們就來看看另一個注冊中心的實現吧: redis 。

 

1. dubbo在 Redis 中的服務分布

  dubbo在zk中的服務體現是一個個的文件路徑形式,如 /dubbo/xxx.xx.XxxService/providers/xxx 。 而在redis中,則體現是一個個的緩存key-value。具體分布如下:

    /dubbo/xxx.xx.XxxService/providers: 以hash類型存放所有提供者列表, 每個hash的字段為 url -> expireTime
    /dubbo/xxx.xx.XxxService/consumers: 以hash類型存放所有消費者列表, 每個hash的字段為 url -> expireTime
    /dubbo/xxx.xx.XxxService/configurators: 存放配置信息
    /dubbo/xxx.xx.XxxService/routers: 存放路由配置信息

  如上,同樣,redis也是以service為粒度進行存儲划分的。

 

2. Redis 組件的接入

  你可能需要先引入redis注冊依賴包:

        <dependency>
            <groupId>org.apache.dubbo</groupId>
            <artifactId>dubbo-registry-redis</artifactId>
        </dependency>

  在配置dubbo服務時,需要將注冊中心換為 redis, 如下選合適的一個即可:

    <dubbo:registry address="redis://127.0.0.1:6379" cluster="failover" />
    <dubbo:registry address="redis://10.20.153.10:6379?backup=10.20.153.11:6379,10.20.153.12:6379" cluster="failover" />
    <dubbo:registry protocol="redis" address="127.0.0.1:6379" cluster="failover" />
    <dubbo:registry protocol="redis" address="10.20.153.10:6379,10.20.153.11:6379,10.20.153.12:6379" cluster="failover" />

  cluster 設置 redis 集群策略,缺省為 failover:(這個配置不會和集群容錯配置有誤會么,尷尬)

    failover: 失效轉移策略。只寫入和讀取任意一台,失敗時重試另一台,需要服務器端自行配置數據同步;

    replicate: 復制模式策略。在客戶端同時寫入所有服務器,只讀取單台,服務器端不需要同步,注冊中心集群增大,性能壓力也會更大;

  redis作為注冊中心與zk作為注冊的前置操作都是一樣的。都是一是作為服務提供者時會在 ServiceConfig#doExportUrlsFor1Protocol 中,進行遠程服務暴露時會拉起。二是在消費者在進行遠程調用時會 ReferenceConfig#createProxy 時拉取以便獲取提供者列表。

  只是在依賴注入 RegistryFactory 時,根據是 zookeeper/redis, 選擇了不一樣的 RegistryFactory, 所以創建了不同的注冊中心實例。

  redis 中根據SPI的配置創建, RedisRegistryFactory 工廠, 配置文件 META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory 的內容如下:

redis=org.apache.dubbo.registry.redis.RedisRegistryFactory
    /**
     * Get an instance of registry based on the address of invoker
     *
     * @param originInvoker
     * @return
     */
    protected Registry getRegistry(final Invoker<?> originInvoker) {
        URL registryUrl = getRegistryUrl(originInvoker);
        // RegistryFactory 又是通過 SPI 機制生成的    
        // 會根據具體的注冊中心的類型創建調用具體實例,如此處為: redis, 所以會調用 RedisRegistryFactory.getRegistry()
        return registryFactory.getRegistry(registryUrl);
    }
    // 所有 RegistryFactory 都會被包裝成 RegistryFactoryWrapper, 以便修飾
    // org.apache.dubbo.registry.RegistryFactoryWrapper#getRegistry
    @Override
    public Registry getRegistry(URL url) {
        // 對於zk, 會調用 RedisRegistryFactory
        return new ListenerRegistryWrapper(registryFactory.getRegistry(url),
                Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(RegistryServiceListener.class)
                        .getActivateExtension(url, "registry.listeners")));
    }
    // org.apache.dubbo.registry.support.AbstractRegistryFactory#getRegistry(org.apache.dubbo.common.URL)
    @Override
    public Registry getRegistry(URL url) {
        if (destroyed.get()) {
            LOGGER.warn("All registry instances have been destroyed, failed to fetch any instance. " +
                    "Usually, this means no need to try to do unnecessary redundant resource clearance, all registries has been taken care of.");
            return DEFAULT_NOP_REGISTRY;
        }

        url = URLBuilder.from(url)
                .setPath(RegistryService.class.getName())
                .addParameter(INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(EXPORT_KEY, REFER_KEY)
                .build();
        String key = createRegistryCacheKey(url);
        // Lock the registry access process to ensure a single instance of the registry
        LOCK.lock();
        try {
            Registry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
            //create registry by spi/ioc
            // 調用子類方法創建 registry 實例,此處為 RedisRegistryFactory.createRegistry
            registry = createRegistry(url);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }
    // org.apache.dubbo.registry.redis.RedisRegistryFactory#createRegistry
    @Override
    protected Registry createRegistry(URL url) {
        // 最終將redis組件接入到應用中了,后續就可以使用redis提供的相應功能了
        return new RedisRegistry(url);
    }

  至此,redis被接入了。我們先來看下 redis 注冊中心構造方法實現:

    // org.apache.dubbo.registry.redis.RedisRegistry#RedisRegistry
    public RedisRegistry(URL url) {
        // RedisRegistry 與zk一樣,同樣繼承了 FailbackRegistry
        // 所以,同樣會創建retryTimer, 同樣會創建緩存文件
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        // 使用redis連接池處理事務
        // 設置各配置項
        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
        config.setTestOnBorrow(url.getParameter("test.on.borrow", true));
        config.setTestOnReturn(url.getParameter("test.on.return", false));
        config.setTestWhileIdle(url.getParameter("test.while.idle", false));
        if (url.getParameter("max.idle", 0) > 0) {
            config.setMaxIdle(url.getParameter("max.idle", 0));
        }
        if (url.getParameter("min.idle", 0) > 0) {
            config.setMinIdle(url.getParameter("min.idle", 0));
        }
        if (url.getParameter("max.active", 0) > 0) {
            config.setMaxTotal(url.getParameter("max.active", 0));
        }
        if (url.getParameter("max.total", 0) > 0) {
            config.setMaxTotal(url.getParameter("max.total", 0));
        }
        if (url.getParameter("max.wait", url.getParameter("timeout", 0)) > 0) {
            config.setMaxWaitMillis(url.getParameter("max.wait", url.getParameter("timeout", 0)));
        }
        if (url.getParameter("num.tests.per.eviction.run", 0) > 0) {
            config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run", 0));
        }
        if (url.getParameter("time.between.eviction.runs.millis", 0) > 0) {
            config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis", 0));
        }
        if (url.getParameter("min.evictable.idle.time.millis", 0) > 0) {
            config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis", 0));
        }
        // redis 復用了cluster配置項?
        String cluster = url.getParameter("cluster", "failover");
        if (!"failover".equals(cluster) && !"replicate".equals(cluster)) {
            throw new IllegalArgumentException("Unsupported redis cluster: " + cluster + ". The redis cluster only supported failover or replicate.");
        }
        replicate = "replicate".equals(cluster);

        List<String> addresses = new ArrayList<>();
        addresses.add(url.getAddress());
        String[] backups = url.getParameter(RemotingConstants.BACKUP_KEY, new String[0]);
        if (ArrayUtils.isNotEmpty(backups)) {
            addresses.addAll(Arrays.asList(backups));
        }
        //獲得Redis主節點名稱
        String masterName = url.getParameter(REDIS_MASTER_NAME_KEY);
        if (StringUtils.isEmpty(masterName)) {
            //單機版redis
            for (String address : addresses) {
                int i = address.indexOf(':');
                String host;
                int port;
                if (i > 0) {
                    host = address.substring(0, i);
                    port = Integer.parseInt(address.substring(i + 1));
                } else {
                    host = address;
                    port = DEFAULT_REDIS_PORT;
                }
                this.jedisPools.put(address, new JedisPool(config, host, port,
                        url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT), StringUtils.isEmpty(url.getPassword()) ? null : url.getPassword(),
                        url.getParameter("db.index", 0)));
            }
        } else {
            //哨兵版redis
            Set<String> sentinelSet = new HashSet<>(addresses);
            int index = url.getParameter("db.index", 0);
            int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
            String password = StringUtils.isEmpty(url.getPassword()) ? null : url.getPassword();
            JedisSentinelPool pool = new JedisSentinelPool(masterName, sentinelSet, config, timeout, password, index);
            this.jedisPools.put(masterName, pool);
        }

        this.reconnectPeriod = url.getParameter(REGISTRY_RECONNECT_PERIOD_KEY, DEFAULT_REGISTRY_RECONNECT_PERIOD);
        String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(PATH_SEPARATOR)) {
            group = PATH_SEPARATOR + group;
        }
        if (!group.endsWith(PATH_SEPARATOR)) {
            group = group + PATH_SEPARATOR;
        }
        this.root = group;
        // session=60000, 默認1分鍾過期
        this.expirePeriod = url.getParameter(SESSION_TIMEOUT_KEY, DEFAULT_SESSION_TIMEOUT);
        // 使用定時任務刷新存活狀態,相當於心跳維護線程,定時任務頻率為 session有效其的1/2
        this.expireFuture = expireExecutor.scheduleWithFixedDelay(() -> {
            try {
                deferExpired(); // Extend the expiration time
            } catch (Throwable t) { // Defensive fault tolerance
                logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t);
            }
        }, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS);
    }

  RedisRegistry構造方法中,主要完成redis配置信息的轉換接入,創建連接池,默認使用0號數據庫。另外,每個客戶端都是單例的RedisRegistry, 所以也就是說會開啟一個過期掃描定時任務(可以稱之為心跳任務)。

 

3. Redis 服務提供者注冊

  與ZK過程類似,服務注冊主要就分兩步:1. 獲取registry實例(通過SPI機制); 2. 將服務的信息注冊到注冊中心。只是zk是路徑,redis是kv.

    // org.apache.dubbo.registry.redis.RedisRegistry#doRegister
    @Override
    public void doRegister(URL url) {
        // 與zk一致,按服務組裝key前綴
        String key = toCategoryPath(url);
        // 全服務路徑作為value
        String value = url.toFullString();
        String expire = String.valueOf(System.currentTimeMillis() + expirePeriod);
        boolean success = false;
        RpcException exception = null;
        for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
            Pool<Jedis> jedisPool = entry.getValue();
            try {
                try (Jedis jedis = jedisPool.getResource()) {
                    // 使用hash存儲提供者/消費者 標識,帶過期時間(該時間需后續主動判定,redis並不維護該狀態)
                    // 注冊好自向標識后,pub一條消息,以便其他客戶端可以sub感知到該服務
                    jedis.hset(key, value, expire);
                    jedis.publish(key, REGISTER);
                    success = true;
                    // 如果不是復制模式的redis 服務(即為failover模式),只需往一個redis寫數據即可,
                    // 剩余redis自行同步實際上這里應該是存在數據一致性問題的
                    if (!replicate) {
                        break; //  If the server side has synchronized data, just write a single machine
                    }
                }
            } catch (Throwable t) {
                exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
            }
        }
        // 只要有一個成功,即算成功
        if (exception != null) {
            if (success) {
                logger.warn(exception.getMessage(), exception);
            } else {
                throw exception;
            }
        }
    }

  以hash類型存放所有提供者列表, key為服務粒度的前綴信息: /dubbo/xxx.xx.XxxService/providers, hash中每個field->value表示,服務全路徑信息->過期時間。

  通過redis的 pub/sub 機制,通知其他客戶端變化。注冊時發布一條消息到提供者路徑, publish <key> register 。 

 

4. redis 消費者服務訂閱

  服務注冊的目的,主要是讓注冊中心及其他應用端可以發現自己。而服務訂閱則為了讓自己可以發現別的系統的變化。如查找所有提供者列表,接收應用上下線通知,開啟監聽等等。

    // org.apache.dubbo.registry.redis.RedisRegistry#doSubscribe
    @Override
    public void doSubscribe(final URL url, final NotifyListener listener) {
        String service = toServicePath(url);
        // 基於service開啟訂閱線程
        Notifier notifier = notifiers.get(service);
        if (notifier == null) {
            // 主動開啟一個 notifier 線程,進行subscribe處理
            // 如果service很多,那就意味着有很多的此類線程,這並不是件好事
            Notifier newNotifier = new Notifier(service);
            notifiers.putIfAbsent(service, newNotifier);
            notifier = notifiers.get(service);
            if (notifier == newNotifier) {
                notifier.start();
            }
        }
        boolean success = false;
        RpcException exception = null;
        for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
            Pool<Jedis> jedisPool = entry.getValue();
            try {
                try (Jedis jedis = jedisPool.getResource()) {
                    if (service.endsWith(ANY_VALUE)) {
                        admin = true;
                        Set<String> keys = jedis.keys(service);
                        if (CollectionUtils.isNotEmpty(keys)) {
                            Map<String, Set<String>> serviceKeys = new HashMap<>();
                            for (String key : keys) {
                                String serviceKey = toServicePath(key);
                                Set<String> sk = serviceKeys.computeIfAbsent(serviceKey, k -> new HashSet<>());
                                sk.add(key);
                            }
                            for (Set<String> sk : serviceKeys.values()) {
                                doNotify(jedis, sk, url, Collections.singletonList(listener));
                            }
                        }
                    } else {
                        // 首次訂閱,使用 keys xx/* 將所有服務信息存儲到本地
                        doNotify(jedis, jedis.keys(service + PATH_SEPARATOR + ANY_VALUE), url, Collections.singletonList(listener));
                    }
                    success = true;
                    break; // Just read one server's data
                }
            } catch (Throwable t) { // Try the next server
                exception = new RpcException("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
            }
        }
        if (exception != null) {
            if (success) {
                logger.warn(exception.getMessage(), exception);
            } else {
                throw exception;
            }
        }
    }

  與zk的直接調用zkClient.addChildListener()實現訂閱不同,redis中使用了多個獨立的訂閱線程,使用pub/sub機制進行處理。(因redis的pub/sub是基於channel進行的長連接通信,所以每個service只能使用單獨的線程,有點傷!)。 使用 doNotify() 將redis中的數據接入應用中。在做訂閱的同時,也拉取了提供者服務列表達到初始化的作用。

 

5. Redis 服務下線處理

  當應用要關閉,或者注冊失敗時,需要進行服務下線。當然,如果應用沒有及時做下線處理,zk會通過其自身的臨時節點過期機制,也會將該服務做下線處理。從而避免消費者或管理台看到無效的服務存在。

  應用服務的主動下線操作是由 ShutdownHookCallbacks 和在判斷服務不可用時進行的 invoker.destroy() 來實現優雅下線。

    // org.apache.dubbo.registry.integration.RegistryDirectory#destroy
    @Override
    public void destroy() {
        if (isDestroyed()) {
            return;
        }

        // unregister.
        try {
            if (getRegisteredConsumerUrl() != null && registry != null && registry.isAvailable()) {
                registry.unregister(getRegisteredConsumerUrl());
            }
        } catch (Throwable t) {
            logger.warn("unexpected error when unregister service " + serviceKey + "from registry" + registry.getUrl(), t);
        }
        // unsubscribe.
        try {
            if (getConsumerUrl() != null && registry != null && registry.isAvailable()) {
                registry.unsubscribe(getConsumerUrl(), this);
            }
            ExtensionLoader.getExtensionLoader(GovernanceRuleRepository.class).getDefaultExtension()
                    .removeListener(ApplicationModel.getApplication(), CONSUMER_CONFIGURATION_LISTENER);
        } catch (Throwable t) {
            logger.warn("unexpected error when unsubscribe service " + serviceKey + "from registry" + registry.getUrl(), t);
        }
        super.destroy(); // must be executed after unsubscribing
        try {
            destroyAllInvokers();
        } catch (Throwable t) {
            logger.warn("Failed to destroy service " + serviceKey, t);
        }
    }
    // org.apache.dubbo.registry.support.FailbackRegistry#unregister
    @Override
    public void unregister(URL url) {
        super.unregister(url);
        removeFailedRegistered(url);
        removeFailedUnregistered(url);
        try {
            // Sending a cancellation request to the server side
            doUnregister(url);
        } catch (Exception e) {
            Throwable t = e;

            // If the startup detection is opened, the Exception is thrown directly.
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && !CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to unregister " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to unregister " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }

            // Record a failed registration request to a failed list, retry regularly
            addFailedUnregistered(url);
        }
    }
    // org.apache.dubbo.registry.redis.RedisRegistry#doUnregister
    @Override
    public void doUnregister(URL url) {
        String key = toCategoryPath(url);
        String value = url.toFullString();
        RpcException exception = null;
        boolean success = false;
        for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
            Pool<Jedis> jedisPool = entry.getValue();
            try {
                try (Jedis jedis = jedisPool.getResource()) {
                    // 直接刪除當前服務對應的 key-field 信息
                    // 然后發布一條 UNREGISTER 消息,通知其他客戶端
                    jedis.hdel(key, value);
                    jedis.publish(key, UNREGISTER);
                    success = true;
                    // 如果redis 是復制模型,需要在每個redis上都做一次刪除
                    // 此時各應用端將會重復收到消息,重復處理,看起來並不是件好事
                    if (!replicate) {
                        break; //  If the server side has synchronized data, just write a single machine
                    }
                }
            } catch (Throwable t) {
                exception = new RpcException("Failed to unregister service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
            }
        }
        if (exception != null) {
            if (success) {
                logger.warn(exception.getMessage(), exception);
            } else {
                throw exception;
            }
        }
    }

  總結: 下線處理兩步驟: 1. 刪除對應的hash key-field; 2. publish 一個下線消息通知其他應用; 3. 針對redis的集群配置決定是刪除1次或n次,且反復通知操作;

 

6. redis 服務解除事件訂閱

  事實上,redis的 doUnsubscribe, 已不再處理任何事件。

    @Override
    public void doUnsubscribe(URL url, NotifyListener listener) {
    }

  那么,前面注冊的多個 Notifier 監聽線程就不管了嗎?那肯定是不行的,它會在 destroy() 被調用時進行收尾處理。實際上,它是 unregister() 的后續工作。

    // org.apache.dubbo.registry.support.AbstractRegistryFactory#destroyAll
    /**
     * Close all created registries
     */
    public static void destroyAll() {
        if (!destroyed.compareAndSet(false, true)) {
            return;
        }

        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Close all registries " + getRegistries());
        }
        // Lock up the registry shutdown process
        LOCK.lock();
        try {
            for (Registry registry : getRegistries()) {
                try {
                    registry.destroy();
                } catch (Throwable e) {
                    LOGGER.error(e.getMessage(), e);
                }
            }
            REGISTRIES.clear();
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }
    // org.apache.dubbo.registry.redis.RedisRegistry#destroy
    @Override
    public void destroy() {
        // 該方法甚至可以去調用 unregister(), unsubscribe() 方法
        super.destroy();
        try {
            expireFuture.cancel(true);
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
        try {
            // 遍歷所有 notifiers, 依次調用 shutdown, 即停止訂閱工作
            for (Notifier notifier : notifiers.values()) {
                notifier.shutdown();
            }
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
        for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
            Pool<Jedis> jedisPool = entry.getValue();
            try {
                jedisPool.destroy();
            } catch (Throwable t) {
                logger.warn("Failed to destroy the redis registry client. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
            }
        }
        // 最后優雅關閉過期掃描定時任務線程池,即 shutdown()..awaitTermination()的應用。
        ExecutorUtil.gracefulShutdown(expireExecutor, expirePeriod);
    }
        // 停止notifier
        // org.apache.dubbo.registry.redis.RedisRegistry.Notifier#shutdown
        public void shutdown() {
            try {
                // step1. 設置停止標識
                // step2. 斷開redis連接,這不只是一斷開的操作,它會停止psubscribe的調用,從而間接中止訂閱線程工作
                running = false;
                jedis.disconnect();
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    // 如下方法,即是其父類的 destroy(), 里面涵蓋了未關閉的 地址信息,則會觸發 unregister, unsubscribe
    // org.apache.dubbo.registry.support.AbstractRegistry#destroy
    @Override
    public void destroy() {
        if (logger.isInfoEnabled()) {
            logger.info("Destroy registry:" + getUrl());
        }
        Set<URL> destroyRegistered = new HashSet<>(getRegistered());
        // step1. unregister 未下線的服務
        if (!destroyRegistered.isEmpty()) {
            for (URL url : new HashSet<>(getRegistered())) {
                if (url.getParameter(DYNAMIC_KEY, true)) {
                    try {
                        unregister(url);
                        if (logger.isInfoEnabled()) {
                            logger.info("Destroy unregister url " + url);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                    }
                }
            }
        }
        // step2. unsubscribe 未取消訂閱的服務
        Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed());
        if (!destroySubscribed.isEmpty()) {
            for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
                URL url = entry.getKey();
                for (NotifyListener listener : entry.getValue()) {
                    try {
                        unsubscribe(url, listener);
                        if (logger.isInfoEnabled()) {
                            logger.info("Destroy unsubscribe url " + url);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                    }
                }
            }
        }
        // step3. 從已注冊列表中刪除當前實例
        AbstractRegistryFactory.removeDestroyedRegistry(this);
    }
    // org.apache.dubbo.registry.support.AbstractRegistryFactory#removeDestroyedRegistry
    public static void removeDestroyedRegistry(Registry toRm) {
        LOCK.lock();
        try {
            REGISTRIES.entrySet().removeIf(entry -> entry.getValue().equals(toRm));
        } finally {
            LOCK.unlock();
        }
    }

  總結:此處講了更多unregister,unsubscribe的前置操作。而 notifier.shutdown(); 才是關閉redis訂閱相關工作的關鍵。它是通過設置停止循環標識,以及關閉redis連接實現的。事實上,這各取消訂閱方式並沒有很優雅。

 

7. 服務心跳的維護處理

  redis本身只是一個緩存存儲系統,心跳邏輯需要自行實現。實際上,我們也可以依賴於redis的自動過期機制,進行心跳續期。那么,redis注冊中心是否也是這樣實現的呢?好像並不是!

    // 在 RedisRegistry 的構造方法中,初始化了一個定時任務的調度
     this.expireFuture = expireExecutor.scheduleWithFixedDelay(() -> {
            try {
                deferExpired(); // Extend the expiration time
            } catch (Throwable t) { // Defensive fault tolerance
                logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t);
            }
        }, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS);
    // org.apache.dubbo.registry.redis.RedisRegistry#deferExpired
    private void deferExpired() {
        for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
            Pool<Jedis> jedisPool = entry.getValue();
            try {
                try (Jedis jedis = jedisPool.getResource()) {
                    // 取出所有注冊了的服務,進行心跳更新
                    for (URL url : new HashSet<>(getRegistered())) {
                        if (url.getParameter(DYNAMIC_KEY, true)) {
                            String key = toCategoryPath(url);
                            // 增加過期時間+expirePeriod, url -> expireAt
                            if (jedis.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
                                // 如果是第一次新增該值,或者重新新增該值(可能由於原來的地址過期被刪除),則觸發一次regiter的消息發布,自會有相應訂閱者處理該變更
                                jedis.publish(key, REGISTER);
                            }
                        }
                    }
                    // 如果是管理類配置,interface=*, 則會開啟清理服務功能,注意此類操作會很重,將會消耗很大
                    // 該值會在subscribe()的時候置為 true
                    // 按文檔說明該操作會在 監控中心執行,而非存在於應用端
                    if (admin) {
                        clean(jedis);
                    }
                    if (!replicate) {
                        break;//  If the server side has synchronized data, just write a single machine
                    }
                }
            } catch (Throwable t) {
                logger.warn("Failed to write provider heartbeat to redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
            }
        }
    }
    // The monitoring center is responsible for deleting outdated dirty data
    private void clean(Jedis jedis) {
        // redis: keys * , 列舉所有相關的key, 根據服務數量來定該值多少
        Set<String> keys = jedis.keys(root + ANY_VALUE);
        if (CollectionUtils.isNotEmpty(keys)) {
            for (String key : keys) {
                // redis: hgetall <key>
                Map<String, String> values = jedis.hgetAll(key);
                if (CollectionUtils.isNotEmptyMap(values)) {
                    boolean delete = false;
                    long now = System.currentTimeMillis();
                    for (Map.Entry<String, String> entry : values.entrySet()) {
                        URL url = URL.valueOf(entry.getKey());
                        // 根據hash中value 指定的時間,判定是否過期,如果過期則做刪除操作
                        // redis: hdel <key> <field>
                        if (url.getParameter(DYNAMIC_KEY, true)) {
                            long expire = Long.parseLong(entry.getValue());
                            if (expire < now) {
                                jedis.hdel(key, entry.getKey());
                                delete = true;
                                if (logger.isWarnEnabled()) {
                                    logger.warn("Delete expired key: " + key + " -> value: " + entry.getKey() + ", expire: " + new Date(expire) + ", now: " + new Date(now));
                                }
                            }
                        }
                    }
                    // 只要有一個服務被判定為過期,則訂閱了該服務的客戶端都應該被通知到
                    // 多個服務下線只會被通知一次
                    if (delete) {
                        jedis.publish(key, UNREGISTER);
                    }
                }
            }
        }
    }

  deferExpired() 的作用,就是維護本實例的所有服務的有效性,做續期作用。兩個重量級操作: 1. 依次延期某service下的所有url的過期時間;2. 做全量清理過期服務url;keys xx* 的操作,也對redis提出了一些要求,因為有些redis出於安全限制可能會禁用keys命令。

 

8. 服務信息變更通知處理notify

  redis注冊中心其實不會主動發現服務變更,只有應用自己發布regiter或unregister消息后,其他應用才能感知到變化。前面在 doRegister() 時,我看到,應用是通過hash添加字段注冊自己,並同時發布 REGISTER 消息通知所有訂閱者。在 doSubscribe() 時開啟另一個服務線程處理subscribe();

    // org.apache.dubbo.registry.redis.RedisRegistry#doSubscribe
    @Override
    public void doSubscribe(final URL url, final NotifyListener listener) {
        String service = toServicePath(url);
        // 訂閱是基於服務處理的,每個服務一個訂閱處理線程
        Notifier notifier = notifiers.get(service);
        if (notifier == null) {
            Notifier newNotifier = new Notifier(service);
            notifiers.putIfAbsent(service, newNotifier);
            notifier = notifiers.get(service);
            // 此處應為防止並發所做的努力
            if (notifier == newNotifier) {
                notifier.start();
            }
        }
        boolean success = false;
        RpcException exception = null;
        for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
            Pool<Jedis> jedisPool = entry.getValue();
            try {
                try (Jedis jedis = jedisPool.getResource()) {
                    // 使用 /dubbo/* 代表是管理服務,其需要做清理過期key的作用
                    if (service.endsWith(ANY_VALUE)) {
                        admin = true;
                        ...
                    } else {
                        // 使用 keys xxx/* 命令,列舉出該服務下所有緩存key, 實際上就是 providers, consumers, configurators, routers
                        doNotify(jedis, jedis.keys(service + PATH_SEPARATOR + ANY_VALUE), url, Collections.singletonList(listener));
                    }
                    success = true;
                    break; // Just read one server's data
                }
            } catch (Throwable t) { // Try the next server
                exception = new RpcException("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
            }
        }
        if (exception != null) {
            if (success) {
                logger.warn(exception.getMessage(), exception);
            } else {
                throw exception;
            }
        }
    }
    // 根據列如上得到redis-key信息,做服務信息變更
    private void doNotify(Jedis jedis, Collection<String> keys, URL url, Collection<NotifyListener> listeners) {
        if (keys == null || keys.isEmpty()
                || listeners == null || listeners.isEmpty()) {
            return;
        }
        long now = System.currentTimeMillis();
        List<URL> result = new ArrayList<>();
        List<String> categories = Arrays.asList(url.getParameter(CATEGORY_KEY, new String[0]));
        String consumerService = url.getServiceInterface();
        for (String key : keys) {
            if (!ANY_VALUE.equals(consumerService)) {
                // 截取出 service
                String providerService = toServiceName(key);
                if (!providerService.equals(consumerService)) {
                    continue;
                }
            }
            String category = toCategoryName(key);
            // consumers應用只會處理, providers,routers,configurators 的服務, 從而忽略 consumers 下的數據
            if (!categories.contains(ANY_VALUE) && !categories.contains(category)) {
                continue;
            }
            List<URL> urls = new ArrayList<>();
            // 獲取所有hash值
            Map<String, String> values = jedis.hgetAll(key);
            if (CollectionUtils.isNotEmptyMap(values)) {
                for (Map.Entry<String, String> entry : values.entrySet()) {
                    URL u = URL.valueOf(entry.getKey());
                    // 判斷服務是否過期,過期且存在的服務將不會被利用,但不會做更多處理
                    if (!u.getParameter(DYNAMIC_KEY, true)
                            || Long.parseLong(entry.getValue()) >= now) {
                        if (UrlUtils.isMatch(url, u)) {
                            urls.add(u);
                        }
                    }
                }
            }
            // 如果沒有找到合適的可用服務,則添加一個 empty:// 的地址
            if (urls.isEmpty()) {
                urls.add(URLBuilder.from(url)
                        .setProtocol(EMPTY_PROTOCOL)
                        .setAddress(ANYHOST_VALUE)
                        .setPath(toServiceName(key))
                        .addParameter(CATEGORY_KEY, category)
                        .build());
            }
            result.addAll(urls);
            if (logger.isInfoEnabled()) {
                logger.info("redis notify: " + key + " = " + urls);
            }
        }
        if (CollectionUtils.isEmpty(result)) {
            return;
        }
        // 調用父類 FailbackRegistry.notify 方法,與zk調用一致了
        // 刷新提供者列表,路由,配置等本地緩存信息
        for (NotifyListener listener : listeners) {
            notify(url, listener, result);
        }
    }
    private String toServiceName(String categoryPath) {
        // 截取root+interfaceName
        // 截取 interfaceName
        String servicePath = toServicePath(categoryPath);
        return servicePath.startsWith(root) ? servicePath.substring(root.length()) : servicePath;
    }
    private String toServicePath(String categoryPath) {
        int i;
        // 排除root路徑,找到第一個'/', 取出servicePath
        if (categoryPath.startsWith(root)) {
            i = categoryPath.indexOf(PATH_SEPARATOR, root.length());
        } else {
            i = categoryPath.indexOf(PATH_SEPARATOR);
        }
        return i > 0 ? categoryPath.substring(0, i) : categoryPath;
    }
    // 另外,對於某個服務發生變更時,需要遍歷所有consumer, 確認是否需要刷新
    // 額,意義嘛,暫是沒太明白
    private void doNotify(Jedis jedis, String key) {
        for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<>(getSubscribed()).entrySet()) {
            doNotify(jedis, Collections.singletonList(key), entry.getKey(), new HashSet<>(entry.getValue()));
        }
    }

  總結: 

    1. redis 做初次subscribe時,notify會通過redis-keys 命令獲取所有需要的key, 然后依次將其提供者、路由、配置等信息都緩存起來。
    2. 針對每個服務,都會開啟相關的訂閱線程Notifier處理訂閱工作。
    3. 最終的listener處理默認會由 RegistryDirectory 處理。

  接下來,我們來看 Notifier 是如何處理訂閱的?

        // org.apache.dubbo.registry.redis.RedisRegistry.Notifier#run
        @Override
        public void run() {
            // 每個訂閱線程,死循環處理只是為了避免網絡等其他異常情況出現,以便重新嘗試連接redis 訂閱channel
            while (running) {
                try {
                    // 額,這是個優化,我不懂的
                    if (!isSkip()) {
                        try {
                            for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
                                Pool<Jedis> jedisPool = entry.getValue();
                                try {
                                    if (jedisPool.isClosed()) {
                                        continue;
                                    }
                                    jedis = jedisPool.getResource();
                                    if (!jedis.isConnected()) {
                                        continue;
                                    }
                                    try {
                                        if (service.endsWith(ANY_VALUE)) {
                                            if (first) {
                                                first = false;
                                                Set<String> keys = jedis.keys(service);
                                                if (CollectionUtils.isNotEmpty(keys)) {
                                                    for (String s : keys) {
                                                        doNotify(jedis, s);
                                                    }
                                                }
                                                resetSkip();
                                            }
                                            jedis.psubscribe(new NotifySub(jedisPool), service); // blocking
                                        } else {
                                            if (first) {
                                                // 首次處理,通知RegistryDirectory 按service刷新緩存
                                                first = false;
                                                doNotify(jedis, service);
                                                resetSkip();
                                            }
                                            // 使用 psubscribe channel 命令,阻塞監聽channel信息
                                            // 當消息返回時,使用 NotifySub 進行業務處理,實際就是調用 doNotify() 的過程
                                            // 訂閱的channel 為: /dubbo/xxx.xx.XxxService/*
                                            jedis.psubscribe(new NotifySub(jedisPool), service + PATH_SEPARATOR + ANY_VALUE); // blocking
                                        }
                                        break;
                                    } finally {
                                        jedis.close();
                                    }
                                } catch (Throwable t) { // Retry another server
                                    logger.warn("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
                                    // If you only have a single redis, you need to take a rest to avoid overtaking a lot of CPU resources
                                    sleep(reconnectPeriod);
                                }
                            }
                        } catch (Throwable t) {
                            logger.error(t.getMessage(), t);
                            // 異常發生后,sleep片刻再重試
                            sleep(reconnectPeriod);
                        }
                    }
                } catch (Throwable t) {
                    logger.error(t.getMessage(), t);
                }
            }
        }
        // org.apache.dubbo.registry.redis.RedisRegistry.NotifySub#onMessage
        @Override
        public void onMessage(String key, String msg) {
            if (logger.isInfoEnabled()) {
                logger.info("redis event: " + key + " = " + msg);
            }
            // 只關注 REGISTER / UNREGISTER, 兩個消息
            if (msg.equals(REGISTER)
                    || msg.equals(UNREGISTER)) {
                try {
                    Jedis jedis = jedisPool.getResource();
                    try {
                        // 復用 doNotify
                        doNotify(jedis, key);
                    } finally {
                        jedis.close();
                    }
                } catch (Throwable t) { // TODO Notification failure does not restore mechanism guarantee
                    logger.error(t.getMessage(), t);
                }
            }
        }
        // 最后還是來看下 isSkip() 的小優化吧
        // 雖然不懂為什么,但是感覺很厲害的樣子
        // org.apache.dubbo.registry.redis.RedisRegistry.Notifier#isSkip
        private boolean isSkip() {
            // connectSkip: 已經跳過連接的總次數, connectSkipped: 當前周期內已跳過連接的次數
            // step1. 在connectSkip < 10 情況下,直接用 connectSkipped 與其比較,connectSkipped<connectSkip, 則繼續跳過本次,否則不跳過,進入連接邏輯connectSkipped, connectSkip次數增加
            // step2. connectSkip >= 10, 不可再用其作為判定跳過次數, 使用一個10-20間的隨機值,作為跳過連接次數判定
            // step3. 如果本次判定為不跳過,則重置 connectSkipped已連接次數自增
            int skip = connectSkip.get(); // Growth of skipping times
            if (skip >= 10) { // If the number of skipping times increases by more than 10, take the random number
                if (connectRandom == 0) {
                    connectRandom = ThreadLocalRandom.current().nextInt(10);
                }
                skip = 10 + connectRandom;
            }
            if (connectSkipped.getAndIncrement() < skip) { // Check the number of skipping times
                return true;
            }
            connectSkip.incrementAndGet();
            connectSkipped.set(0);
            connectRandom = 0;
            return false;
        }

  監聽服務就做好一件事就行,調用 psubscribe命令訂閱channel, 發生變化時調用 doNotify() 回調listener處理刷新。為避免異常情況下訂閱功能仍然成立,使用外部的while循環包裹訂閱邏輯重試。

  注意其訂閱的redis channel 為 /dubbo/xxx.xx.XxxService/*, 所以相當於其自身的變更也被包含在內了。而是否要處理該事件,則依賴於url中的categorys配置,如消費為:category=providers,configurators,routers, 即它會處理這三種類型的key變更。

 

9. 一點感想

  dubbo用redis做注冊中心,可以看作是一個簡單的擴展實現。其核心是基於redis的 pub/sub 能力。

  但和zk比起來,redis功能實現會相對困難些,甚至看起來有些蹩腳(如其redis集群策略需要自行從外部保證同步,這恐怕不會是件容易的事,現有的主從,集群方案都完全無法cover其場景。既要保證任意寫,又要保證全同步(數據一致性),呵呵)。它需要單獨去維護一些心跳、過期類的事務。過多的服務會導致這類工作更加繁重。

  但這也許不能成為大家拒絕應用的理由,畢竟,按官方說明阿里內部是基於數據庫實現的注冊中心,自然有其道理。

    (事實上,redis版本的注冊中心,並非是完全優化的,你完全可以順手美化下再使用)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM