在ActiveMQ中使用SingleConnectionFactory遇到的坑


我們在生產環境使用了ActiveMQ作為消息中間件,消息中間件連接到數據庫對消息進行持久化。

最近發生了一個奇怪的事情,消費者端的生產日志總是報如下錯誤:

The JMS connection has failed: java.io.EOFException

Successfully refreshed JMS Connection

這個日志在生產環境大量重復,第一個錯誤是EOFException,當一個連接的遠端主動關閉連接時,本端會接收到這個異常。

第二行的消息咋一看,是“Successfully refreshed JMS Connection”,理論上此時連接已經恢復,但是消費者依然無法獲取信息。

將這個問題放到Baidu,Bing和Google上搜索,也看到了其他用戶遇到過類似情況,但是仔細研究一下發現並不是我們遇到的情況,他們提供的解決方案也無法解決我們的問題。

 

就在問題要陷入僵局的時候,我們發現:

1. 在錯誤消息刷出來之前,ActiveMQ報了錯,疑似它使用的持久化數據庫中途掛掉,且被重啟了

2. 同一個MQ有幾個組件作為消費端,但是只有這個組件刷錯誤日志,其他組件正常恢復了連接

 

初步分析是因為數據庫掛掉導致連接失效,因為發現MQ日志中,數據庫連接報錯之后很短的時間間隔內客戶端就開始刷日志。

很奇怪的是其他幾個組件沒有問題,所以我們對比了這幾個組件之間的配置文件差異,發現只有這個組件使用了SingleConnectionFactory,其他組件使用的都是CachingConnectionFactory,所以懷疑問題出在了這個配置上。

對比了下這兩個類的源代碼,這是CachingConnectionFactory.java的注釋部分和第一行代碼:

/**
 * {@link SingleConnectionFactory} subclass that adds {@link javax.jms.Session}
 * caching as well {@link javax.jms.MessageProducer} caching. This ConnectionFactory
 * also switches the {@link #setReconnectOnException "reconnectOnException" property}
 * to "true" by default, allowing for automatic recovery of the underlying Connection.
 *
......
 *
 * @author Juergen Hoeller
 * @since 2.5.3
 */

public class CachingConnectionFactory extends SingleConnectionFactory {
......

大家可以注意到,這個類就是擴展了SingleConnectionFactory類,並且將reconnectOnException設置為true

至此,問題已經有一點眉目了,如果連接出現異常,通過reconnectOnException決定是否reconnect(重連接),這個屬性在SingleConnectionFactory默認設置為false的(可以參見它的代碼,默認設置為false),但是在CachingConnectionFactory中設置為true,這就是為何連接失效了,客戶端卻沒能重新連上的原因。

進一步檢查日志中消息“Successfully refreshed JMS Connection”的來源,可以進一步印證我們的看法:

 請查看DefaultMessageListenerContainer.java:

package org.springframework.jms.listener;
...

public class DefaultMessageListenerContainer extends AbstractPollingMessageListenerContainer {
...
    protected void refreshConnectionUntilSuccessful() {
        while (true) {
            if (this.isRunning()) {
                try {
                    if (this.sharedConnectionEnabled()) {
                        this.refreshSharedConnection();
                    } else {
                        Connection con = this.createConnection();
                        JmsUtils.closeConnection(con);
                    }

                    this.logger.info("Successfully refreshed JMS Connection");
                } catch (Exception var3) {
        ...
    }
...
}

可以看到,上面代碼中的消息,正是我們在日志中反復看到的信息,而通過檢查refreshSharedConnection()和createConnection(),我們發現:

refreshSharedConnection()調用鏈:

AbstractJmsListeningContainer.refreshSharedConnection()
AbstractJmsListeningContainer.createSharedConnection()
JmsAccessor.createConnection()
JmsAccessor.getConnectionFactory().createConnection()

createConnection()調用鏈:

JmsAccessor.createConnection()
JmsAccessor.getConnectionFactory().createConnection()

可以看出,這兩個分支最后都是到連接工廠中調用createConnection(),查看下代碼:

package org.springframework.jms.connection;

public class SingleConnectionFactory
...
    public Connection createConnection() throws JMSException {
        Object var1 = this.connectionMonitor;
        synchronized (this.connectionMonitor) {
            if (this.connection == null) {
                this.initConnection();
            }

            return this.connection;
        }
    }
...
}

使用SingleConnectionFactory時,如果連接對象connection不為空,即使此時連接失效,依然不會進入initConnection,所以雖然connection返回了非null值,但是這個連接其實是壞的

至此,原因查明,將SingleConnectionFactory改為CachingConnectionFactory后,此問題消失,當客戶端發現連接失效后,能夠主動連接到MQ服務器

 

思考:為何設置了“reconnectOnException”屬性后,就能夠自動重連了呢?

找到代碼中使用了這個變量的位置:

package org.springframework.jms.connection;

public class SingleConnectionFactory
...
    protected void prepareConnection(Connection con) throws JMSException {
        if (this.getClientId() != null) {
            con.setClientID(this.getClientId());
        }

        if (this.getExceptionListener() != null || this.isReconnectOnException()) {
            ExceptionListener listenerToUse = this.getExceptionListener();
            if (this.isReconnectOnException()) {
                listenerToUse = new InternalChainedExceptionListener(this, (ExceptionListener) listenerToUse);
            }

            con.setExceptionListener((ExceptionListener) listenerToUse);
        }

    }
...
}

代碼中添加了一個異常監聽器,此監聽器觸發的代碼為:

package org.springframework.jms.connection;

public class SingleConnectionFactory
{
...

   public void onException(JMSException ex) {
        this.logger.warn("Encountered a JMSException - resetting the underlying JMS Connection", ex);
        this.resetConnection();
    }

    public void resetConnection() {
        Object var1 = this.connectionMonitor;
        synchronized (this.connectionMonitor) {
            if (this.target != null) {
                this.closeConnection(this.target);
            }

            this.target = null;
            this.connection = null;
        }
    }
...
}

可見,當發生異常時,異常監聽器調用了resetConnection()函數,此函數會將connection設置為null,然后DefaultMessageListenerContainer中的監視線程經過一段時間即能重新建立此連接


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM