使用redisson時關於訂閱數的問題


在使用redisson消息訂閱時,我針對門店商品庫存減扣進行訂閱的操作(在這里一個商品一個監聽隊列),當正式投入生產時,發現一直再報Subscribe timeout: (" + timeout + "ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters.的錯誤,索性根據提示翻了翻源碼看看原因:

在redisson里先關注一個類:RedisPubSubConnection該類繼承自RedisConnection,根據名字我們可知它是一個典型的發布與訂閱的類。那么在redisson使用時,會使用PubSubConnectionEntry進行一次包裝:

    public class PubSubConnectionEntry {
    
        private final AtomicInteger subscribedChannelsAmount;
        private final RedisPubSubConnection conn;
    
        private final ConcurrentMap<ChannelName, SubscribeListener> subscribeChannelListeners = new ConcurrentHashMap<ChannelName, SubscribeListener>();
        private final ConcurrentMap<ChannelName, Queue<RedisPubSubListener<?>>> channelListeners = new ConcurrentHashMap<ChannelName, Queue<RedisPubSubListener<?>>>();
    
        public PubSubConnectionEntry(RedisPubSubConnection conn, int subscriptionsPerConnection) {
            super();
            this.conn = conn;
            this.subscribedChannelsAmount = new AtomicInteger(subscriptionsPerConnection);
        }
      
     //.....省略其他代碼 
    }

在這里我們可以看到其有一個比較重要的屬性 subscribedChannelsAmount,而這個值就是通過PublishSubscribeService進行調用的:

        private void connect(final Codec codec, final ChannelName channelName,
                final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener<?>... listeners) {
          //....
          
          
             RedisPubSubConnection conn = future.getNow();
                    
                    final PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
                    entry.tryAcquire();
          //....
        }

那么此屬性就是根據config的subscriptionsPerConnection里設置的,那么此值就代表了每個連接的最大訂閱數。當tryAcqcurie的時候會減少這個數量:

      public int tryAcquire() {
            while (true) {
                int value = subscribedChannelsAmount.get();
                if (value == 0) {
                    return -1;
                }
                
                if (subscribedChannelsAmount.compareAndSet(value, value - 1)) {
                    return value - 1;
                }
            }
        }

如果當此值為0時,那么會重新獲取一個可用的連接,代碼如下:

     int remainFreeAmount = freeEntry.tryAcquire();
                    if (remainFreeAmount == -1) {
                        throw new IllegalStateException();
                    }
                    
                    final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);
                    if (oldEntry != null) {
                        freeEntry.release();
                        freePubSubLock.release();
                        subscribe(channelName, promise, type, lock, oldEntry, listeners);
                        return;
                    }
                    
                    if (remainFreeAmount == 0) {
                        freePubSubConnections.poll();
                    }
                    freePubSubLock.release();

如果此時沒有可用的連接的話,恐怕此次操作就會等待新的連接直至超時,超時了就報上述的錯誤了,不過根據提示。我們此時的解決辦法是增大subscriptionsPerConnection或者subscriptionConnectionPoolSize的值。當我們使用springboot時可以通過設置spring.redis.redisson.config(具體設置請參考官網)來指定redisson的配置文件或者重新創建RedissonClient:

            @Bean(destroyMethod = "shutdown")
            public RedissonClient redisson(RedissonProperties redissonProperties, RedisProperties redisProperties) throws IOException {
    
                Config config = new Config();
                String prefix = "redis://";
                Method method = ReflectionUtils.findMethod(RedisProperties.class, "isSsl");
                if (method != null && (Boolean) ReflectionUtils.invokeMethod(method, redisProperties)) {
                    prefix = "rediss://";
                }
    
                config.useSingleServer()
                        .setAddress(prefix + redisProperties.getHost() + ":" + redisProperties.getPort())
                        .setConnectTimeout(30000).setSubscriptionsPerConnection(5000) //在這里指定數目
                        .setDatabase(redisProperties.getDatabase())
                        .setPassword(redisProperties.getPassword());
    
                return Redisson.create(config);
            }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM