SpringJMS解析3-監聽器


消息監聽器容器是一個用於查看JMS目標等待消息到達的特殊bean,一旦消息到達它就可以獲取到消息,並通過調用onMessage()方法將消息傳遞給一個MessageListener實現。Spring中消息監聽器容器的類型如下。

SimpleMessageListenerContainer:最簡單的消息監聽器容器,只能處理固定數量的JMS會話,且不支持事務。

DefaultMessageListenerContainer:這個消息監聽器容器建立在SimpleMessageListenerContainer容器之上,添加了對事務的支持。

serversession.ServerSessionMessage.ListenerContainer:這是功能最強大的消息監聽器,與DefaultMessageListenerContainer相同,它支持事務,但是它還允許動態地管理JMS會話。

下面以DefaultMessageListenerContainer為例進行分析,看看消息監聽器容器的實現。在使用消息監聽器容器時一定要將自定義的消息監聽器置入到容器中,這樣才可以在收到信息時,容器把消息轉向監聽器處理。查看DefaultMessageListenerContainer層次結構圖,我們看到此類實現了InitializingBean接口,按照以往的風格我們還是首先查看接口方法afterPropertiesSet()中的邏輯,其方法實現在其父類AbstractJmsListeningContainer中。

  @Override
  public void afterPropertiesSet() {
    //驗證connectionFactory
    super.afterPropertiesSet();
    //驗證配置文件
    validateConfiguration();
    //初始化
    initialize();
  }

監聽器容器的初始化只包含了三句代碼,其中前兩句只用於屬性的驗證,比如connectionFacory或者destination等屬性是否為空等,而真正用於初始化的操作委托在initialize()中執行。

  public void initialize() throws JmsException {
    try {
      //lifecycleMonitor用於控制生命周期的同步處理
      synchronized (this.lifecycleMonitor) {
        this.active = true;
        this.lifecycleMonitor.notifyAll();
      }
      doInitialize();
    }
    catch (JMSException ex) {
      synchronized (this.sharedConnectionMonitor) {
        ConnectionFactoryUtils.releaseConnection(this.sharedConnection, getConnectionFactory(), this.autoStartup);
        this.sharedConnection = null;
      }
      throw convertJmsAccessException(ex);
    }
  }
  @Override
  protected void doInitialize() throws JMSException {
    synchronized (this.lifecycleMonitor) {
      for (int i = 0; i < this.concurrentConsumers; i++) {
        scheduleNewInvoker();
      }
    }
  }

這里用到了concurrentConsumers屬性,消息監聽器允許創建多個Session和MessageConsumer來接收消息。具體的個數由concurrentConsumers屬性指定。需要注意的是,應該只是在Destination為Queue的時候才使用多個MessageConsumer(Queue中的一個消息只能被一個Consumer接收),雖然使用多個MessageConsumer會提高消息處理的性能,但是消息處理的順序卻得不到保證。消息被接收的順序仍然是消息發送時的順序,但是由於消息可能會被並發處理,因此消息處理的順序可能和消息發送的順序不同。此外,不應該在Destination為Topic的時候使用多個MessageConsumer,因為多個MessageConsumer會接收到同樣的消息。

對於具體的實現邏輯我們只能繼續查看源碼:

  private void scheduleNewInvoker() {
    AsyncMessageListenerInvoker invoker = new AsyncMessageListenerInvoker();
    if (rescheduleTaskIfNecessary(invoker)) {
      // This should always be true, since we're only calling this when active.
      this.scheduledInvokers.add(invoker);
    }
  }
  protected final boolean rescheduleTaskIfNecessary(Object task) {
  if (this.running) {
    try {
      doRescheduleTask(task);
    }
    catch (RuntimeException ex) {
      logRejectedTask(task, ex);
      this.pausedTasks.add(task);
    }
    return true;
  }
  else if (this.active) {
    this.pausedTasks.add(task);
    return true;
  }
  else {
    return false;
  }
}
@Override
protected void doRescheduleTask(Object task) {
this.taskExecutor.execute((Runnable) task);
}
 

分析源碼得知,根據concurrentConsumers數量建立了對應數量的線程,即使讀者不了解線程池的使用,至少根據以上代碼可以推斷出doRescheduleTask函數其實是在開啟一個線程執行Runnable。我們反追蹤這個傳入的參數,可以看到這個參數其實是AsyncMessageListenerInvoker類型實例。因此我們可以推斷,Spring根據concurrentConsumers數量建立了對應數量的線程,而每個線程都作為一個獨立的接收者在循環接收消息。於是我們把所有的焦點轉向AsyncMessageListenerInvoker這個類的實現,由於它是作為一個Runnable角色去執行,所以對以這個類的分析從run方法開始。

@Override
public void run() {
  //並發控制
  synchronized (lifecycleMonitor) {
    activeInvokerCount++;
    lifecycleMonitor.notifyAll();
  }
  boolean messageReceived = false;
  try {
    //根據每個任務設置的最大處理消息數量而作不同處理
    //小於0默認為無限制,一直接收消息
    if (maxMessagesPerTask < 0) {
      messageReceived = executeOngoingLoop();
    }
    else {
      int messageCount = 0;
      //消息數量控制,一旦超出數量則停止循環
      while (isRunning() && messageCount < maxMessagesPerTask) {
        messageReceived = (invokeListener() || messageReceived);
        messageCount++;
      }
    }
  }
  catch (Throwable ex) {
    //清理操作,包括關閉session等
    clearResources();
    if (!this.lastMessageSucceeded) {
      // We failed more than once in a row or on startup - sleep before
      // first recovery attempt.
      sleepBeforeRecoveryAttempt();
    }
    this.lastMessageSucceeded = false;
    boolean alreadyRecovered = false;
    synchronized (recoveryMonitor) {
      if (this.lastRecoveryMarker == currentRecoveryMarker) {
        handleListenerSetupFailure(ex, false);
        recoverAfterListenerSetupFailure();
        currentRecoveryMarker = new Object();
      }
      else {
        alreadyRecovered = true;
      }
    }
    if (alreadyRecovered) {
      handleListenerSetupFailure(ex, true);
    }
  }
  finally {
    synchronized (lifecycleMonitor) {
      decreaseActiveInvokerCount();
      lifecycleMonitor.notifyAll();
    }
    if (!messageReceived) {
      this.idleTaskExecutionCount++;
    }
    else {
      this.idleTaskExecutionCount = 0;
    }
    synchronized (lifecycleMonitor) {
      if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) {
        // We're shutting down completely.
        scheduledInvokers.remove(this);
        if (logger.isDebugEnabled()) {
          logger.debug("Lowered scheduled invoker count: " + scheduledInvokers.size());
        }
        lifecycleMonitor.notifyAll();
        clearResources();
      }
      else if (isRunning()) {
        int nonPausedConsumers = getScheduledConsumerCount() - getPausedTaskCount();
        if (nonPausedConsumers < 1) {
          logger.error("All scheduled consumers have been paused, probably due to tasks having been rejected. " +
              "Check your thread pool configuration! Manual recovery necessary through a start() call.");
        }
        else if (nonPausedConsumers < getConcurrentConsumers()) {
          logger.warn("Number of scheduled consumers has dropped below concurrentConsumers limit, probably " +
              "due to tasks having been rejected. Check your thread pool configuration! Automatic recovery " +
              "to be triggered by remaining consumers.");
        }
      }
    }
  }
}

以上函數中主要根據變量maxMessagesPerTask的值來分為不同的情況處理,當然,函數中還使用了大量的代碼處理異常機制的數據維護,我們更加關注程序的正常流程是如何處理的。

其實核心的處理就是調用invokeListener來接收消息並激活消息監聽器,但是之所以兩種情況分開處理,正是考慮到在無限制循環接收消息的情況下,用戶可以通過設置標志位running來控制消息接收的暫停與恢復,並維護當前消息監聽器的數量。

        private boolean executeOngoingLoop() throws JMSException {
            boolean messageReceived = false;
            boolean active = true;
            while (active) {
                synchronized (lifecycleMonitor) {
                    boolean interrupted = false;
                    boolean wasWaiting = false;
                    //如果當前任務已經處於激活狀態但是卻給了暫時終止的命令
                    while ((active = isActive()) && !isRunning()) {
                        if (interrupted) {
                            throw new IllegalStateException("Thread was interrupted while waiting for " +
                                    "a restart of the listener container, but container is still stopped");
                        }
                        //如果並非處於等待狀態則說明是第一次執行,需要將激活任務數量減少
                        if (!wasWaiting) {
                            decreaseActiveInvokerCount();
                        }
                        wasWaiting = true;
                        //開始進入等待狀態,等待任務的恢復命令
                        try {
                            //通過wait等待,也就是等待notify或者notifyAll
                            lifecycleMonitor.wait();
                        }
                        catch (InterruptedException ex) {
                            // Re-interrupt current thread, to allow other threads to react.
                            Thread.currentThread().interrupt();
                            interrupted = true;
                        }
                    }
                    if (wasWaiting) {
                        activeInvokerCount++;
                    }
                    if (scheduledInvokers.size() > maxConcurrentConsumers) {
                        active = false;
                    }
                }
                //正常處理流程
                if (active) {
                    messageReceived = (invokeListener() || messageReceived);
                }
            }
            return messageReceived;
        }

如果按照正常的流程其實是不會進入while循環中的,而是直接進入函數invokeListener()來接收消息並激活監聽器,但是,我們不可能讓循環一直持續下去,我們要考慮到暫停線程或者恢復線程的情況,這時,isRunning()函數就派上用場了。

isRunning()用來檢測標志位this.running狀態進而判斷是否需要進入while循環。由於要維護當前線程激活數量,所以引入了wasWaiting變量,用來判斷線程是否處於等待狀態。如果線程首次進入等待狀態,則需要減少線程激活數量計數器。

當然,還有一個地方需要提一下,就是線程等待不是一味地采用while循環來控制,因為如果單純地采用while循環會浪費CPU的始終周期,給資源造成巨大的浪費。這里,Spring采用的是使用全局控制變量lifecycleMonitor的wait()方法來暫停線程,所以,如果終止線程需要再次恢復的話,除了更改this.running標志位外,還需要調用lifecycleMonitor.notify或者lifecycleMonitor.notifyAll來使線程恢復。

接下來就是消息接收的處理了invokeListener。

    private boolean invokeListener() throws JMSException {
      //初始化資源包括首次創建的時候創建session與consumer
      initResourcesIfNecessary();
      boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);
      //改變標志位,信息成功處理
      this.lastMessageSucceeded = true;
      return messageReceived;
    }
    private void initResourcesIfNecessary() throws JMSException {
      if (getCacheLevel() <= CACHE_CONNECTION) {
        updateRecoveryMarker();
      }
      else {
        if (this.session == null && getCacheLevel() >= CACHE_SESSION) {
          updateRecoveryMarker();
          this.session = createSession(getSharedConnection());
        }
        if (this.consumer == null && getCacheLevel() >= CACHE_CONSUMER) {
          this.consumer = createListenerConsumer(this.session);
          synchronized (lifecycleMonitor) {
            registeredWithDestination++;
          }
        }
      }
    }
    protected boolean receiveAndExecute(Object invoker, Session session, MessageConsumer consumer)
        throws JMSException {

      if (this.transactionManager != null) {
        // Execute receive within transaction.
        TransactionStatus status = this.transactionManager.getTransaction(this.transactionDefinition);
        boolean messageReceived;
        try {
          messageReceived = doReceiveAndExecute(invoker, session, consumer, status);
        }
        catch (JMSException ex) {
          rollbackOnException(status, ex);
          throw ex;
        }
        catch (RuntimeException ex) {
          rollbackOnException(status, ex);
          throw ex;
        }
        catch (Error err) {
          rollbackOnException(status, err);
          throw err;
        }
        this.transactionManager.commit(status);
        return messageReceived;
      }

      else {
        // Execute receive outside of transaction.
        return doReceiveAndExecute(invoker, session, consumer, null);
      }
    }

在介紹消息監聽器容器的分類時,已介紹了DefaultMessageListenerContainer消息監聽器容器建立在SimpleMessageListenerContainer容器之上,添加了對事務的支持,那么此時,事務特性的實現已經開始了。如果用戶配置了this.transactionManager ,也就是配置了事務,那么,消息的接收會被控制在事務之內,一旦出現任何異常都會被回滾,而回滾操作也會交由事務管理器統一處理,比如this.transactionManager.rollback(status)。

doReceiveAndExecute包含了整個消息的接收處理過程,由於參雜着事務,所以並沒有復用模板中的方法。

  protected boolean doReceiveAndExecute(
      Object invoker, Session session, MessageConsumer consumer, TransactionStatus status)
      throws JMSException {

    Connection conToClose = null;
    Session sessionToClose = null;
    MessageConsumer consumerToClose = null;
    try {
      Session sessionToUse = session;
      boolean transactional = false;
      if (sessionToUse == null) {
        sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(
            getConnectionFactory(), this.transactionalResourceFactory, true);
        transactional = (sessionToUse != null);
      }
      if (sessionToUse == null) {
        Connection conToUse;
        if (sharedConnectionEnabled()) {
          conToUse = getSharedConnection();
        }
        else {
          conToUse = createConnection();
          conToClose = conToUse;
          conToUse.start();
        }
        sessionToUse = createSession(conToUse);
        sessionToClose = sessionToUse;
      }
      MessageConsumer consumerToUse = consumer;
      if (consumerToUse == null) {
        consumerToUse = createListenerConsumer(sessionToUse);
        consumerToClose = consumerToUse;
      }
      //接收消息
      Message message = receiveMessage(consumerToUse);
      if (message != null) {
        if (logger.isDebugEnabled()) {
          logger.debug("Received message of type [" + message.getClass() + "] from consumer [" +
              consumerToUse + "] of " + (transactional ? "transactional " : "") + "session [" +
              sessionToUse + "]");
        }
        //模板方法,當消息接收且在未處理前給子類機會做相應處理,當前空實現
        messageReceived(invoker, sessionToUse);
        boolean exposeResource = (!transactional && isExposeListenerSession() &&
            !TransactionSynchronizationManager.hasResource(getConnectionFactory()));
        if (exposeResource) {
          TransactionSynchronizationManager.bindResource(
              getConnectionFactory(), new LocallyExposedJmsResourceHolder(sessionToUse));
        }
        try {
          //激活監聽器
          doExecuteListener(sessionToUse, message);
        }
        catch (Throwable ex) {
          if (status != null) {
            if (logger.isDebugEnabled()) {
              logger.debug("Rolling back transaction because of listener exception thrown: " + ex);
            }
            status.setRollbackOnly();
          }
          handleListenerException(ex);
          // Rethrow JMSException to indicate an infrastructure problem
          // that may have to trigger recovery...
          if (ex instanceof JMSException) {
            throw (JMSException) ex;
          }
        }
        finally {
          if (exposeResource) {
            TransactionSynchronizationManager.unbindResource(getConnectionFactory());
          }
        }
        // Indicate that a message has been received.
        return true;
      }
      else {
        if (logger.isTraceEnabled()) {
          logger.trace("Consumer [" + consumerToUse + "] of " + (transactional ? "transactional " : "") +
              "session [" + sessionToUse + "] did not receive a message");
        }
        //接收到空消息的處理
        noMessageReceived(invoker, sessionToUse);
        // Nevertheless call commit, in order to reset the transaction timeout (if any).
        // However, don't do this on Tibco since this may lead to a deadlock there.
        if (shouldCommitAfterNoMessageReceived(sessionToUse)) {
          commitIfNecessary(sessionToUse, message);
        }
        // Indicate that no message has been received.
        return false;
      }
    }
    finally {
      JmsUtils.closeMessageConsumer(consumerToClose);
      JmsUtils.closeSession(sessionToClose);
      ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true);
    }
  }
  //監聽器的激活處理
  protected void doExecuteListener(Session session, Message message) throws JMSException {
    if (!isAcceptMessagesWhileStopping() && !isRunning()) {
      if (logger.isWarnEnabled()) {
        logger.warn("Rejecting received message because of the listener container " +
            "having been stopped in the meantime: " + message);
      }
      rollbackIfNecessary(session);
      throw new MessageRejectedWhileStoppingException();
    }

    try {
      invokeListener(session, message);
    }
    catch (JMSException ex) {
      rollbackOnExceptionIfNecessary(session, ex);
      throw ex;
    }
    catch (RuntimeException ex) {
      rollbackOnExceptionIfNecessary(session, ex);
      throw ex;
    }
    catch (Error err) {
      rollbackOnExceptionIfNecessary(session, err);
      throw err;
    }
    commitIfNecessary(session, message);
  }
  protected void invokeListener(Session session, Message message) throws JMSException {
    Object listener = getMessageListener();
    if (listener instanceof SessionAwareMessageListener) {
      doInvokeListener((SessionAwareMessageListener) listener, session, message);
    }
    else if (listener instanceof MessageListener) {
      doInvokeListener((MessageListener) listener, message);
    }
    else if (listener != null) {
      throw new IllegalArgumentException(
          "Only MessageListener and SessionAwareMessageListener supported: " + listener);
    }
    else {
      throw new IllegalStateException("No message listener specified - see property 'messageListener'");
    }
  }
  protected void doInvokeListener(MessageListener listener, Message message) throws JMSException {
    listener.onMessage(message);
  }

通過層層調用,最終提取監聽器並使用listener.onMessage(message)激活了監聽器,也就是激活了用戶自定義的監聽器邏輯。這里還有一句重要的代碼很容易被忽略掉,commitIfNecessary(session, message),完成的功能是session.commit()。完成消息服務的事務提交,涉及兩個事務,我們常說的DefaultMessageListenerContainer增加了事務的支持,是通用的事務,也就是說我們在消息接收過程中如果產生其他操作,比如向數據庫中插入數據,一旦出現異常時就需要全部回滾,包括回滾插入數據庫中的數據。但是,除了我們常說的事務之外,對於消息本身還有一個事務,當接收一個消息的時候,必須使用事務提交的方式,這是在告訴消息服務器本地已經正常接收消息,消息服務器接收到本地的事務提交后便可以將此消息刪除,否則,當前消息會被其他接收者重新接收。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM