我們在生產環境使用了ActiveMQ作為消息中間件,消息中間件連接到數據庫對消息進行持久化。
最近發生了一個奇怪的事情,消費者端的生產日志總是報如下錯誤:
The JMS connection has failed: java.io.EOFException
Successfully refreshed JMS Connection
這個日志在生產環境大量重復,第一個錯誤是EOFException,當一個連接的遠端主動關閉連接時,本端會接收到這個異常。
第二行的消息咋一看,是“Successfully refreshed JMS Connection”,理論上此時連接已經恢復,但是消費者依然無法獲取信息。
將這個問題放到Baidu,Bing和Google上搜索,也看到了其他用戶遇到過類似情況,但是仔細研究一下發現並不是我們遇到的情況,他們提供的解決方案也無法解決我們的問題。
就在問題要陷入僵局的時候,我們發現:
1. 在錯誤消息刷出來之前,ActiveMQ報了錯,疑似它使用的持久化數據庫中途掛掉,且被重啟了
2. 同一個MQ有幾個組件作為消費端,但是只有這個組件刷錯誤日志,其他組件正常恢復了連接
初步分析是因為數據庫掛掉導致連接失效,因為發現MQ日志中,數據庫連接報錯之后很短的時間間隔內客戶端就開始刷日志。
很奇怪的是其他幾個組件沒有問題,所以我們對比了這幾個組件之間的配置文件差異,發現只有這個組件使用了SingleConnectionFactory,其他組件使用的都是CachingConnectionFactory,所以懷疑問題出在了這個配置上。
對比了下這兩個類的源代碼,這是CachingConnectionFactory.java的注釋部分和第一行代碼:
/** * {@link SingleConnectionFactory} subclass that adds {@link javax.jms.Session} * caching as well {@link javax.jms.MessageProducer} caching. This ConnectionFactory * also switches the {@link #setReconnectOnException "reconnectOnException" property} * to "true" by default, allowing for automatic recovery of the underlying Connection. * ...... * * @author Juergen Hoeller * @since 2.5.3 */ public class CachingConnectionFactory extends SingleConnectionFactory {
......
大家可以注意到,這個類就是擴展了SingleConnectionFactory類,並且將reconnectOnException設置為true
至此,問題已經有一點眉目了,如果連接出現異常,通過reconnectOnException決定是否reconnect(重連接),這個屬性在SingleConnectionFactory默認設置為false的(可以參見它的代碼,默認設置為false),但是在CachingConnectionFactory中設置為true,這就是為何連接失效了,客戶端卻沒能重新連上的原因。
進一步檢查日志中消息“Successfully refreshed JMS Connection”的來源,可以進一步印證我們的看法:
請查看DefaultMessageListenerContainer.java:
package org.springframework.jms.listener; ... public class DefaultMessageListenerContainer extends AbstractPollingMessageListenerContainer { ... protected void refreshConnectionUntilSuccessful() { while (true) { if (this.isRunning()) { try { if (this.sharedConnectionEnabled()) { this.refreshSharedConnection(); } else { Connection con = this.createConnection(); JmsUtils.closeConnection(con); } this.logger.info("Successfully refreshed JMS Connection"); } catch (Exception var3) { ... } ... }
可以看到,上面代碼中的消息,正是我們在日志中反復看到的信息,而通過檢查refreshSharedConnection()和createConnection(),我們發現:
refreshSharedConnection()調用鏈:
AbstractJmsListeningContainer.refreshSharedConnection()
AbstractJmsListeningContainer.createSharedConnection()
JmsAccessor.createConnection()
JmsAccessor.getConnectionFactory().createConnection()
createConnection()調用鏈:
JmsAccessor.createConnection()
JmsAccessor.getConnectionFactory().createConnection()
可以看出,這兩個分支最后都是到連接工廠中調用createConnection(),查看下代碼:
package org.springframework.jms.connection; public class SingleConnectionFactory ... public Connection createConnection() throws JMSException { Object var1 = this.connectionMonitor; synchronized (this.connectionMonitor) { if (this.connection == null) { this.initConnection(); } return this.connection; } } ... }
使用SingleConnectionFactory時,如果連接對象connection不為空,即使此時連接失效,依然不會進入initConnection,所以雖然connection返回了非null值,但是這個連接其實是壞的
至此,原因查明,將SingleConnectionFactory改為CachingConnectionFactory后,此問題消失,當客戶端發現連接失效后,能夠主動連接到MQ服務器
思考:為何設置了“reconnectOnException”屬性后,就能夠自動重連了呢?
找到代碼中使用了這個變量的位置:
package org.springframework.jms.connection; public class SingleConnectionFactory ... protected void prepareConnection(Connection con) throws JMSException { if (this.getClientId() != null) { con.setClientID(this.getClientId()); } if (this.getExceptionListener() != null || this.isReconnectOnException()) { ExceptionListener listenerToUse = this.getExceptionListener(); if (this.isReconnectOnException()) { listenerToUse = new InternalChainedExceptionListener(this, (ExceptionListener) listenerToUse); } con.setExceptionListener((ExceptionListener) listenerToUse); } } ... }
代碼中添加了一個異常監聽器,此監聽器觸發的代碼為:
package org.springframework.jms.connection; public class SingleConnectionFactory { ... public void onException(JMSException ex) { this.logger.warn("Encountered a JMSException - resetting the underlying JMS Connection", ex); this.resetConnection(); } public void resetConnection() { Object var1 = this.connectionMonitor; synchronized (this.connectionMonitor) { if (this.target != null) { this.closeConnection(this.target); } this.target = null; this.connection = null; } } ... }
可見,當發生異常時,異常監聽器調用了resetConnection()函數,此函數會將connection設置為null,然后DefaultMessageListenerContainer中的監視線程經過一段時間即能重新建立此連接