簡介
首先,我們翻譯一下這個類的注釋:
RedisMessageListenerContainer 為Redis消息偵聽器 MessageListener 提供異步行為的容器。處理偵聽、轉換和消息分派的低級細節。
與低級別Redis(每個訂閱一個連接)相反,容器只使用一個連接,該連接對所有注冊的偵聽器都是“多路復用”的,消息調度是通過任務執行器完成的。
注意:容器以惰性方式使用連接(僅當至少配置了一個偵聽器時才使用連接)。
同時添加和刪除偵聽器會產生未定義的結果。強烈建議對這些方法進行相應的同步/排序。
public class RedisMessageListenerContainer implements InitializingBean, DisposableBean, BeanNameAware, SmartLifecycle
關於 InitializingBean, DisposableBean, SmartLifecycle ,建議閱讀以下文章加以了解:
start()
// 調用start()方法后,當前對象狀態為運行中;調用stop()方法后,當前對象狀態為不在運行。
// whether the container is running (or not)
private volatile boolean running = false;
public void start() {
if (!running) {
running = true;
// wait 方法調用時,外層必須有 synchronized 關鍵字
// wait for the subscription to start before returning
// technically speaking we can only be notified right before the subscription starts
synchronized (monitor) {
// 惰性偵聽
lazyListen();
// lazyListen 中會異步啟動內部類SubscriptionTask的實例
// 這里加上 wait 同步,主要就是想等到SubscriptionTask實例也運行起來了,再打印出 "Started RedisMessageListenerContainer" 的日志
if (listening) {
try {
// wait up to 5 seconds for Subscription thread
monitor.wait(initWait);
} catch (InterruptedException e) {
// 如果調用 start() 方法的線程被設置打斷標記導致wait方法被喚醒
// 那啟動過程也就中止了,container 變為不在運行中的狀態
// stop waiting
Thread.currentThread().interrupt();
running = false;
return;
}
}
}
if (logger.isDebugEnabled()) {
logger.debug("Started RedisMessageListenerContainer");
}
}
}
lazyListen
/**
* 方法檢查是否確實需要偵聽消息(從而使用線程)並觸發它。
* Method inspecting whether listening for messages (and thus using a thread) is actually needed and triggering
*/
private void lazyListen() {
boolean debug = logger.isDebugEnabled();
boolean started = false;
if (isRunning()) {
// “double check”:synchronized 之前之后都進行一次狀態判定,並發知識,不多說了
if (!listening) {
synchronized (monitor) {
if (!listening) {
// 這里就對應上了類的注釋:僅當至少配置了一個偵聽器時才使用連接
if (channelMapping.size() > 0 || patternMapping.size() > 0) {
// 使用訂閱線程池啟動訂閱任務
subscriptionExecutor.execute(subscriptionTask);
listening = true;
started = true;
}
}
}
if (debug) {
if (started) {
logger.debug("Started listening for Redis messages");
} else {
logger.debug("Postpone listening for Redis messages until actual listeners are added");
}
}
}
}
}
RedisMessageListenerContainer 容器的惰性偵聽,有幾點:
- 至少配置了一個channel或者pattern才偵聽;
- 一個容器至多啟動一個subscriptionTask;
- 有三種情況會調用 lazyListen 檢查是否需要偵聽並啟動subscriptionTask:
- 容器啟動時
- 調用 addMessageListener 動態添加訂閱channel或者pattern時
- SubscriptionTask.run() 異常退出時
channel 指的是字符串類型的訂閱“主題”,pattern 則是用通配符表示訂閱一類“主題”
內部類SubscriptionTask
SubscriptionTask 實現了 Runnable 接口,在 subscriptionExecutor.execute(subscriptionTask); 調用之后,就會異步執行 run 方法。
run()
// Redis連接
private volatile @Nullable RedisConnection connection;
private final Object localMonitor = new Object();
private boolean subscriptionTaskRunning = false;
public void run() {
// 一個SubscriptionTask實例同時只允許一個線程執行接下來的代碼
synchronized (localMonitor) {
subscriptionTaskRunning = true;
}
try {
// 獲取Redis連接
connection = connectionFactory.getConnection();
if (connection.isSubscribed()) {
throw new IllegalStateException("Retrieved connection is already subscribed; aborting listening");
}
// 如果ConnectionFactory是LettuceConnectionFactory及其子類,就返回true
boolean asyncConnection = ConnectionUtils.isAsync(connectionFactory);
// NB: sync drivers' Xsubscribe calls block, so we notify the RDMLC before performing the actual subscription.
if (!asyncConnection) {
synchronized (monitor) {
monitor.notify();
}
}
// 訂閱代碼
SubscriptionPresentCondition subscriptionPresent = eventuallyPerformSubscription();
if (asyncConnection) {
// 設置檢驗條件和超時時長的“自旋屏障”:
// 要么條件滿足,結束自旋;
// 要么等待超時,結束自旋;
SpinBarrier.waitFor(subscriptionPresent, getMaxSubscriptionRegistrationWaitingTime());
synchronized (monitor) {
monitor.notify();
}
}
} catch (Throwable t) {
// 訂閱失敗的處理
handleSubscriptionException(t);
} finally {
// 此塊在訂閱線程結束后執行
// this block is executed once the subscription thread has ended, this may or may not mean
// the connection has been unsubscribed, depending on driver
synchronized (localMonitor) {
subscriptionTaskRunning = false;
localMonitor.notify();
}
}
}
eventuallyPerformSubscription
/**
* Performs a potentially asynchronous registration of a subscription.
*
* @return #SubscriptionPresentCondition that can serve as a handle to check whether the subscription is ready.
*/
private SubscriptionPresentCondition eventuallyPerformSubscription() {
SubscriptionPresentCondition condition = null;
if (channelMapping.isEmpty()) {
condition = new PatternSubscriptionPresentCondition();
// 對應Redis命令中的 PSUBSCRIBE 訂閱一個或多個符合給定模式的頻道。
connection.pSubscribe(new DispatchMessageListener(), unwrap(patternMapping.keySet()));
} else {
if (patternMapping.isEmpty()) {
condition = new SubscriptionPresentCondition();
} else {
// channelMapping 和 patternMapping 都有數據時進入該分支
// schedule the rest of the subscription
// 模式訂閱交給 PatternSubscriptionTask 去做
subscriptionExecutor.execute(new PatternSubscriptionTask());
condition = new PatternSubscriptionPresentCondition();
}
// 對應Redis命令中的 SUBSCRIBE 訂閱給定的一個或多個頻道的信息。
connection.subscribe(new DispatchMessageListener(), unwrap(channelMapping.keySet()));
}
return condition;
}
這里又多了兩個需要了解的細節:
- *PatternSubscriptionTask 主要負責 模式訂閱;
- DispatchMessageListener 主要負責 消息分發;
handleSubscriptionException
RedisMessageListenerContainer 的訂閱任務具有重試機制。如果Redis發生宕機重啟,定時重試,可以讓WEB應用服務器在Redis恢復時,重新訂閱channel或者pattern。
/**
* 處理訂閱任務異常。如果異常是連接失敗(例如,Redis重新啟動),將嘗試重新啟動訂閱。
* Handle subscription task exception. Will attempt to restart the subscription if the Exception is a connection
* failure (for example, Redis was restarted).
*
* @param ex Throwable exception
*/
protected void handleSubscriptionException(Throwable ex) {
// 修改當前容器的偵聽狀態
listening = false;
// 釋放Redis連接
subscriptionTask.closeConnection();
// 如果是Redis連接斷開的情況
if (ex instanceof RedisConnectionFailureException) {
if (isRunning()) {
// 默認是睡眠 5 秒后重試
logger.error("Connection failure occurred. Restarting subscription task after " + recoveryInterval + " ms");
sleepBeforeRecoveryAttempt();
// 重新執行“惰性偵聽”
lazyListen();
}
} else {
logger.error("SubscriptionTask aborted with exception:", ex);
}
}
PatternSubscriptionTask
PatternSubscriptionTask 又是 SubscriptionTask的一個子類,也實現了 Runnable 接口,我們來看看它的 run 方法
run
public void run() {
// done 表示等待 connection.subscribe(new DispatchMessageListener(), unwrap(channelMapping.keySet())); 訂閱完成
// wait for subscription to be initialized
boolean done = false;
// 自旋三次,等待 Subscription 初始化完成,這里准備復用該對象
// wait 3 rounds for subscription to be initialized
for (int i = 0; i < ROUNDS && !done; i++) {
if (connection != null) {
synchronized (localMonitor) {
if (connection.isSubscribed()) {
done = true;
connection.getSubscription().pSubscribe(unwrap(patternMapping.keySet()));
} else {
// 每次自旋等待 500 毫秒
try {
Thread.sleep(WAIT);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
return;
}
}
}
}
}
}
DispatchMessageListener
private class DispatchMessageListener implements MessageListener {
@Override
public void onMessage(Message message, @Nullable byte[] pattern) {
Collection<MessageListener> listeners = null;
// if it's a pattern, disregard channel
if (pattern != null && pattern.length > 0) {
listeners = patternMapping.get(new ByteArrayWrapper(pattern));
} else {
pattern = null;
// do channel matching first
listeners = channelMapping.get(new ByteArrayWrapper(message.getChannel()));
}
if (!CollectionUtils.isEmpty(listeners)) {
// 真正分發消息給容器管理的 MessageListener
dispatchMessage(listeners, message, pattern);
}
}
}
dispatchMessage
private void dispatchMessage(Collection<MessageListener> listeners, Message message, @Nullable byte[] pattern) {
byte[] source = (pattern != null ? pattern.clone() : message.getChannel());
for (MessageListener messageListener : listeners) {
taskExecutor.execute(() -> processMessage(messageListener, message, source));
}
}
如果咱CRUD程序員沒有自定義 subscriptionExecutor,那么 taskExecutor 其實和 subscriptionExecutor 是同一個引用:
// RedisMessageListenerContainer 的成員方法
public void afterPropertiesSet() {
if (taskExecutor == null) {
manageExecutor = true;
taskExecutor = createDefaultTaskExecutor();
}
if (subscriptionExecutor == null) {
// 如果執行到這里,那么兩者就是同一個引用
subscriptionExecutor = taskExecutor;
}
initialized = true;
}
processMessage
處理消息時,有一層 catch,如果捕獲到容器管理的 MessageListener 的異常,是可以通過容器的異常處理器來處理的。
// RedisMessageListenerContainer 的成員方法
protected void processMessage(MessageListener listener, Message message, byte[] pattern) {
executeListener(listener, message, pattern);
}
protected void executeListener(MessageListener listener, Message message, byte[] pattern) {
try {
listener.onMessage(message, pattern);
} catch (Throwable ex) {
handleListenerException(ex);
}
}
handleListenerException
可以通過給容器設置 ErrorHandler 實現對象,來統一處理捕獲到他管理的的 MessageListener 的異常。
protected void handleListenerException(Throwable ex) {
if (isActive()) {
// Regular case: failed while active.
// Invoke ErrorHandler if available.
invokeErrorHandler(ex);
} else {
// Rare case: listener thread failed after container shutdown.
// Log at debug level, to avoid spamming the shutdown logger.
logger.debug("Listener exception after container shutdown", ex);
}
}
protected void invokeErrorHandler(Throwable ex) {
if (this.errorHandler != null) {
this.errorHandler.handleError(ex);
} else if (logger.isWarnEnabled()) {
logger.warn("Execution of message listener failed, and no ErrorHandler has been set.", ex);
}
}