在使用redisson消息訂閱時,我針對門店商品庫存減扣進行訂閱的操作(在這里一個商品一個監聽隊列),當正式投入生產時,發現一直再報Subscribe timeout: (" + timeout + "ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters.
的錯誤,索性根據提示翻了翻源碼看看原因:
在redisson里先關注一個類:RedisPubSubConnection
該類繼承自RedisConnection
,根據名字我們可知它是一個典型的發布與訂閱的類。那么在redisson
使用時,會使用PubSubConnectionEntry
進行一次包裝:
public class PubSubConnectionEntry {
private final AtomicInteger subscribedChannelsAmount;
private final RedisPubSubConnection conn;
private final ConcurrentMap<ChannelName, SubscribeListener> subscribeChannelListeners = new ConcurrentHashMap<ChannelName, SubscribeListener>();
private final ConcurrentMap<ChannelName, Queue<RedisPubSubListener<?>>> channelListeners = new ConcurrentHashMap<ChannelName, Queue<RedisPubSubListener<?>>>();
public PubSubConnectionEntry(RedisPubSubConnection conn, int subscriptionsPerConnection) {
super();
this.conn = conn;
this.subscribedChannelsAmount = new AtomicInteger(subscriptionsPerConnection);
}
//.....省略其他代碼
}
在這里我們可以看到其有一個比較重要的屬性 subscribedChannelsAmount
,而這個值就是通過PublishSubscribeService
進行調用的:
private void connect(final Codec codec, final ChannelName channelName,
final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock, final RedisPubSubListener<?>... listeners) {
//....
RedisPubSubConnection conn = future.getNow();
final PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
//....
}
那么此屬性就是根據config的subscriptionsPerConnection
里設置的,那么此值就代表了每個連接的最大訂閱數。當tryAcqcurie
的時候會減少這個數量:
public int tryAcquire() {
while (true) {
int value = subscribedChannelsAmount.get();
if (value == 0) {
return -1;
}
if (subscribedChannelsAmount.compareAndSet(value, value - 1)) {
return value - 1;
}
}
}
如果當此值為0時,那么會重新獲取一個可用的連接,代碼如下:
int remainFreeAmount = freeEntry.tryAcquire();
if (remainFreeAmount == -1) {
throw new IllegalStateException();
}
final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);
if (oldEntry != null) {
freeEntry.release();
freePubSubLock.release();
subscribe(channelName, promise, type, lock, oldEntry, listeners);
return;
}
if (remainFreeAmount == 0) {
freePubSubConnections.poll();
}
freePubSubLock.release();
如果此時沒有可用的連接的話,恐怕此次操作就會等待新的連接直至超時,超時了就報上述的錯誤了,不過根據提示。我們此時的解決辦法是增大subscriptionsPerConnection或者subscriptionConnectionPoolSize的值。當我們使用springboot時可以通過設置spring.redis.redisson.config
(具體設置請參考官網)來指定redisson的配置文件或者重新創建RedissonClient:
@Bean(destroyMethod = "shutdown")
public RedissonClient redisson(RedissonProperties redissonProperties, RedisProperties redisProperties) throws IOException {
Config config = new Config();
String prefix = "redis://";
Method method = ReflectionUtils.findMethod(RedisProperties.class, "isSsl");
if (method != null && (Boolean) ReflectionUtils.invokeMethod(method, redisProperties)) {
prefix = "rediss://";
}
config.useSingleServer()
.setAddress(prefix + redisProperties.getHost() + ":" + redisProperties.getPort())
.setConnectTimeout(30000).setSubscriptionsPerConnection(5000) //在這里指定數目
.setDatabase(redisProperties.getDatabase())
.setPassword(redisProperties.getPassword());
return Redisson.create(config);
}