spring-data-redis之RedisMessageListenerContainer源碼解析


簡介

首先,我們翻譯一下這個類的注釋:

RedisMessageListenerContainer 為Redis消息偵聽器 MessageListener 提供異步行為的容器。處理偵聽、轉換和消息分派的低級細節。
與低級別Redis(每個訂閱一個連接)相反,容器只使用一個連接,該連接對所有注冊的偵聽器都是“多路復用”的,消息調度是通過任務執行器完成的。
注意:容器以惰性方式使用連接(僅當至少配置了一個偵聽器時才使用連接)。
同時添加和刪除偵聽器會產生未定義的結果。強烈建議對這些方法進行相應的同步/排序。

public class RedisMessageListenerContainer implements InitializingBean, DisposableBean, BeanNameAware, SmartLifecycle

關於 InitializingBean, DisposableBean, SmartLifecycle ,建議閱讀以下文章加以了解:

start()

// 調用start()方法后,當前對象狀態為運行中;調用stop()方法后,當前對象狀態為不在運行。
// whether the container is running (or not)
private volatile boolean running = false;

public void start() {
  if (!running) {
    running = true;
    // wait 方法調用時,外層必須有 synchronized 關鍵字
    // wait for the subscription to start before returning
    // technically speaking we can only be notified right before the subscription starts
    synchronized (monitor) {
      // 惰性偵聽
      lazyListen();
      // lazyListen 中會異步啟動內部類SubscriptionTask的實例
      // 這里加上 wait 同步,主要就是想等到SubscriptionTask實例也運行起來了,再打印出 "Started RedisMessageListenerContainer" 的日志
      if (listening) {
        try {
          // wait up to 5 seconds for Subscription thread
          monitor.wait(initWait);
        } catch (InterruptedException e) {
          // 如果調用 start() 方法的線程被設置打斷標記導致wait方法被喚醒
          // 那啟動過程也就中止了,container 變為不在運行中的狀態
          // stop waiting
          Thread.currentThread().interrupt();
          running = false;
          return;
        }
      }
    }

    if (logger.isDebugEnabled()) {
      logger.debug("Started RedisMessageListenerContainer");
    }
  }
}

lazyListen

/**
 * 方法檢查是否確實需要偵聽消息(從而使用線程)並觸發它。
 * Method inspecting whether listening for messages (and thus using a thread) is actually needed and triggering
 */
private void lazyListen() {
  boolean debug = logger.isDebugEnabled();
  boolean started = false;
  if (isRunning()) {
    // “double check”:synchronized 之前之后都進行一次狀態判定,並發知識,不多說了
    if (!listening) {
      synchronized (monitor) {
        if (!listening) {
          // 這里就對應上了類的注釋:僅當至少配置了一個偵聽器時才使用連接
          if (channelMapping.size() > 0 || patternMapping.size() > 0) {
            // 使用訂閱線程池啟動訂閱任務
            subscriptionExecutor.execute(subscriptionTask);
            listening = true;
            started = true;
          }
        }
      }
      if (debug) {
        if (started) {
          logger.debug("Started listening for Redis messages");
        } else {
          logger.debug("Postpone listening for Redis messages until actual listeners are added");
	}
      }
    }
  }
}

RedisMessageListenerContainer 容器的惰性偵聽,有幾點:

  1. 至少配置了一個channel或者pattern才偵聽;
  2. 一個容器至多啟動一個subscriptionTask;
  3. 有三種情況會調用 lazyListen 檢查是否需要偵聽並啟動subscriptionTask:
    • 容器啟動時
    • 調用 addMessageListener 動態添加訂閱channel或者pattern時
    • SubscriptionTask.run() 異常退出時

channel 指的是字符串類型的訂閱“主題”,pattern 則是用通配符表示訂閱一類“主題”

內部類SubscriptionTask

SubscriptionTask 實現了 Runnable 接口,在 subscriptionExecutor.execute(subscriptionTask); 調用之后,就會異步執行 run 方法。

run()

// Redis連接
private volatile @Nullable RedisConnection connection;
private final Object localMonitor = new Object();
private boolean subscriptionTaskRunning = false;
public void run() {
  // 一個SubscriptionTask實例同時只允許一個線程執行接下來的代碼
  synchronized (localMonitor) {
    subscriptionTaskRunning = true;
  }
  try {
    // 獲取Redis連接
    connection = connectionFactory.getConnection();
    if (connection.isSubscribed()) {
      throw new IllegalStateException("Retrieved connection is already subscribed; aborting listening");
    }
    // 如果ConnectionFactory是LettuceConnectionFactory及其子類,就返回true
    boolean asyncConnection = ConnectionUtils.isAsync(connectionFactory);
    // NB: sync drivers' Xsubscribe calls block, so we notify the RDMLC before performing the actual subscription.
    if (!asyncConnection) {
      synchronized (monitor) {
        monitor.notify();
      }
    }
    // 訂閱代碼
    SubscriptionPresentCondition subscriptionPresent = eventuallyPerformSubscription();
    if (asyncConnection) {
      // 設置檢驗條件和超時時長的“自旋屏障”:
      // 要么條件滿足,結束自旋;
      // 要么等待超時,結束自旋;
      SpinBarrier.waitFor(subscriptionPresent, getMaxSubscriptionRegistrationWaitingTime());
      synchronized (monitor) {
        monitor.notify();
      }
    }
  } catch (Throwable t) {
    // 訂閱失敗的處理
    handleSubscriptionException(t);
  } finally {
    // 此塊在訂閱線程結束后執行
    // this block is executed once the subscription thread has ended, this may or may not mean
    // the connection has been unsubscribed, depending on driver
    synchronized (localMonitor) {
      subscriptionTaskRunning = false;
      localMonitor.notify();
    }
  }
}

eventuallyPerformSubscription

/**
 * Performs a potentially asynchronous registration of a subscription.
 *
 * @return #SubscriptionPresentCondition that can serve as a handle to check whether the subscription is ready.
 */
private SubscriptionPresentCondition eventuallyPerformSubscription() {
  SubscriptionPresentCondition condition = null;
  if (channelMapping.isEmpty()) {
    condition = new PatternSubscriptionPresentCondition();
    // 對應Redis命令中的 PSUBSCRIBE 訂閱一個或多個符合給定模式的頻道。
    connection.pSubscribe(new DispatchMessageListener(), unwrap(patternMapping.keySet()));
  } else {
    if (patternMapping.isEmpty()) {
      condition = new SubscriptionPresentCondition();
    } else {
      // channelMapping 和 patternMapping 都有數據時進入該分支
      // schedule the rest of the subscription
      // 模式訂閱交給 PatternSubscriptionTask 去做
      subscriptionExecutor.execute(new PatternSubscriptionTask());
      condition = new PatternSubscriptionPresentCondition();
    }
    // 對應Redis命令中的 SUBSCRIBE 訂閱給定的一個或多個頻道的信息。
    connection.subscribe(new DispatchMessageListener(), unwrap(channelMapping.keySet()));
  }
  return condition;
}

這里又多了兩個需要了解的細節:

  • *PatternSubscriptionTask 主要負責 模式訂閱
  • DispatchMessageListener 主要負責 消息分發

handleSubscriptionException

RedisMessageListenerContainer 的訂閱任務具有重試機制。如果Redis發生宕機重啟,定時重試,可以讓WEB應用服務器在Redis恢復時,重新訂閱channel或者pattern。

/**
 * 處理訂閱任務異常。如果異常是連接失敗(例如,Redis重新啟動),將嘗試重新啟動訂閱。
 * Handle subscription task exception. Will attempt to restart the subscription if the Exception is a connection
 * failure (for example, Redis was restarted).
 *
 * @param ex Throwable exception
 */
protected void handleSubscriptionException(Throwable ex) {
  // 修改當前容器的偵聽狀態
  listening = false;
  // 釋放Redis連接
  subscriptionTask.closeConnection();
  // 如果是Redis連接斷開的情況
  if (ex instanceof RedisConnectionFailureException) {
    if (isRunning()) {
      // 默認是睡眠 5 秒后重試
      logger.error("Connection failure occurred. Restarting subscription task after " + recoveryInterval + " ms");
      sleepBeforeRecoveryAttempt();
      // 重新執行“惰性偵聽”
      lazyListen();
    }
  } else {
    logger.error("SubscriptionTask aborted with exception:", ex);
  }
}

PatternSubscriptionTask

PatternSubscriptionTask 又是 SubscriptionTask的一個子類,也實現了 Runnable 接口,我們來看看它的 run 方法

run

public void run() {
  // done 表示等待 connection.subscribe(new DispatchMessageListener(), unwrap(channelMapping.keySet())); 訂閱完成
  // wait for subscription to be initialized
  boolean done = false;
  // 自旋三次,等待 Subscription 初始化完成,這里准備復用該對象 
  // wait 3 rounds for subscription to be initialized
  for (int i = 0; i < ROUNDS && !done; i++) {
    if (connection != null) {
      synchronized (localMonitor) {
        if (connection.isSubscribed()) {
          done = true;
	  connection.getSubscription().pSubscribe(unwrap(patternMapping.keySet()));
        } else {
          // 每次自旋等待 500 毫秒
          try {
            Thread.sleep(WAIT);
          } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return;
          }
        }
      }
    }
  }
}

DispatchMessageListener

private class DispatchMessageListener implements MessageListener {
  @Override
  public void onMessage(Message message, @Nullable byte[] pattern) {
    Collection<MessageListener> listeners = null;
    // if it's a pattern, disregard channel
    if (pattern != null && pattern.length > 0) {
      listeners = patternMapping.get(new ByteArrayWrapper(pattern));
    } else {
      pattern = null;
      // do channel matching first
      listeners = channelMapping.get(new ByteArrayWrapper(message.getChannel()));
    }
    if (!CollectionUtils.isEmpty(listeners)) {
      // 真正分發消息給容器管理的 MessageListener
      dispatchMessage(listeners, message, pattern);
    }
  }
}

dispatchMessage

private void dispatchMessage(Collection<MessageListener> listeners, Message message, @Nullable byte[] pattern) {
  byte[] source = (pattern != null ? pattern.clone() : message.getChannel());
  for (MessageListener messageListener : listeners) {
    taskExecutor.execute(() -> processMessage(messageListener, message, source));
  }
}

如果咱CRUD程序員沒有自定義 subscriptionExecutor,那么 taskExecutor 其實和 subscriptionExecutor 是同一個引用:

// RedisMessageListenerContainer 的成員方法
public void afterPropertiesSet() {
  if (taskExecutor == null) {
    manageExecutor = true;
    taskExecutor = createDefaultTaskExecutor();
  }
  if (subscriptionExecutor == null) {
    // 如果執行到這里,那么兩者就是同一個引用
    subscriptionExecutor = taskExecutor;
  }
  initialized = true;
}

processMessage

處理消息時,有一層 catch,如果捕獲到容器管理的 MessageListener 的異常,是可以通過容器的異常處理器來處理的。

// RedisMessageListenerContainer 的成員方法
protected void processMessage(MessageListener listener, Message message, byte[] pattern) {
  executeListener(listener, message, pattern);
}

protected void executeListener(MessageListener listener, Message message, byte[] pattern) {
  try {
    listener.onMessage(message, pattern);
  } catch (Throwable ex) {
    handleListenerException(ex);
  }
}

handleListenerException

可以通過給容器設置 ErrorHandler 實現對象,來統一處理捕獲到他管理的的 MessageListener 的異常。

protected void handleListenerException(Throwable ex) {
  if (isActive()) {
    // Regular case: failed while active.
    // Invoke ErrorHandler if available.
    invokeErrorHandler(ex);
  } else {
    // Rare case: listener thread failed after container shutdown.
    // Log at debug level, to avoid spamming the shutdown logger.
    logger.debug("Listener exception after container shutdown", ex);
  }
}

protected void invokeErrorHandler(Throwable ex) {
  if (this.errorHandler != null) {
    this.errorHandler.handleError(ex);
  } else if (logger.isWarnEnabled()) {
    logger.warn("Execution of message listener failed, and no ErrorHandler has been set.", ex);
  }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM