摘要
一般來說,如果為JmsTemplate做了事務配置,那么它將會與當前線程的數據庫事務掛鈎,並且僅在數據庫事務的afterCommit動作中提交。
但是,如果一個MessageListener在接收Jms消息的同時,也使用JmsTemplate發送了Jms消息;那么它發送的Jms消息將與數據庫事務無關(即使為JmsTemplate做了事務配置),而是與Listener接收消息保持在同一個事務中。
問題
問題是一位同事發現的。
賬務系統的墊付功能存在REST和MessageListener兩個入口;兩個入口中調用的是同一套代碼和業務邏輯。但是,REST入口中發送的Jms消息會隨着數據庫事務回滾而回滾;MessageListener中卻不會回滾。相關流程圖說明如下。
我們期望的結果是:在還款操作中發送的Jms消息,隨還款操作的數據庫事務回滾而取消(紅色底色部分的操作);而墊付操作中發送的Jms消息,則應隨墊付操作的數據庫事務提交而提交(綠色底色部分的操作)。這一點在REST入口的相關日志和數據中得到了驗證。但是,從MessageListener入口調用此服務時,卻出現了問題:雖然還款服務的數據庫事務確實回滾了,但是其中的Jms消息卻成功發送了出來(參見紅色字體部分)。


分析
首先,REST入口的操作、結果是正確的。這說明,當數據庫事務回滾時,Jms消息確實沒有提交。那么,可以肯定一點:一定是MessageListener后續處理中做了提交消息這個動作。
經過一系列的Debug和逐行執行、分析,我找到了這段代碼。
MessageListener接收到消息后,會進入org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute( Object invoker, Session session, MessageConsumer consumer)方法中。由於沒有配置transactionManager,我們會通過doReceiveAndExecute(invoker, session, consumer, null)來調用org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute( Object invoker, Session session, MessageConsumer consumer, TransactionStatus status) 方法。
/** * Execute the listener for a message received from the given consumer, * wrapping the entire operation in an external transaction if demanded. * @param session the JMS Session to work on * @param consumer the MessageConsumer to work on * @return whether a message has been received * @throws JMSException if thrown by JMS methods * @see #doReceiveAndExecute */ protected boolean receiveAndExecute(Object invoker, Session session, MessageConsumer consumer) throws JMSException { if ( this .transactionManager != null ) { // Execute receive within transaction. TransactionStatus status = this .transactionManager.getTransaction( this .transactionDefinition); boolean messageReceived; try { messageReceived = doReceiveAndExecute(invoker, session, consumer, status); } catch (JMSException ex) { rollbackOnException(status, ex); throw ex; } catch (RuntimeException ex) { rollbackOnException(status, ex); throw ex; } catch (Error err) { rollbackOnException(status, err); throw err; } this .transactionManager.commit(status); return messageReceived; } else { // Execute receive outside of transaction. return doReceiveAndExecute(invoker, session, consumer, null ); } } |
doReceiveAndExecute方法又會調用org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener( Session session, Message message)方法,以便於執行我們編寫的業務代碼,並處理Jms相關的事務。如果業務代碼中沒有拋出異常,那么就會進入org.springframework.jms.listener.AbstractMessageListenerContainer.commitIfNecessary( Session session, Message message)方法中。
/** * Execute the specified listener, * committing or rolling back the transaction afterwards (if necessary). * @param session the JMS Session to operate on * @param message the received JMS Message * @throws JMSException if thrown by JMS API methods * @see #invokeListener * @see #commitIfNecessary * @see #rollbackOnExceptionIfNecessary * @see #convertJmsAccessException */ protected void doExecuteListener(Session session, Message message) throws JMSException { if (!isAcceptMessagesWhileStopping() && !isRunning()) { if (logger.isWarnEnabled()) { logger.warn( "Rejecting received message because of the listener container " + "having been stopped in the meantime: " + message); } rollbackIfNecessary(session); throw new MessageRejectedWhileStoppingException(); } try { invokeListener(session, message); } catch (JMSException ex) { rollbackOnExceptionIfNecessary(session, ex); throw ex; } catch (RuntimeException ex) { rollbackOnExceptionIfNecessary(session, ex); throw ex; } catch (Error err) { rollbackOnExceptionIfNecessary(session, err); throw err; } commitIfNecessary(session, message); } |
commitIfNecessary方法幾經輾轉,最終會調用到org.apache.activemq.ActiveMQConnection.syncSendPacket( Command command) 方法。這個方法的作用,是將當前Connection中的數據同步到MQ服務端。也就是在這個方法執行完畢之后,不應當發送的消息被發送了出去。
public Response syncSendPacket(Command command) throws JMSException { if (isClosed()) { throw new ConnectionClosedException(); } else { try { Response response = (Response) this .transport.request(command); if (response.isException()) { ExceptionResponse er = (ExceptionResponse)response; if (er.getException() instanceof JMSException) { throw (JMSException)er.getException(); } else { if (isClosed()||closing.get()) { LOG.debug( "Received an exception but connection is closing" ); } JMSException jmsEx = null ; try { jmsEx = JMSExceptionSupport.create(er.getException()); } catch (Throwable e) { LOG.error( "Caught an exception trying to create a JMSException for " +er.getException(),e); } if (er.getException() instanceof SecurityException && command instanceof ConnectionInfo){ forceCloseOnSecurityException(er.getException()); } if (jmsEx != null ) { throw jmsEx; } } } return response; } catch (IOException e) { throw JMSExceptionSupport.create(e); } } } |
這就是消息被錯誤發送的原因:MessageListener在接收消息的時候,獲取了一個Connection;后續發送消息時,用的是同一個Connection。因此,盡管中間的數據庫事務回滾了,但由於這個Connection最終要提交(MessageListner中沒有拋出異常),用這個Connection發送的所有消息最終都被提交到了MQ上。
解決方案
方案一:使用JmsTransactionManager來管理Jms事務
可以通過以下配置,為MessageListner注入JmsTransactionManager:
< bean id = "jmsTransactionManager" class = "org.springframework.jms.connection.JmsTransactionManager" > < property name = "connectionFactory" ref = "jmsConnectionFactory" /> </ bean > < jms:listener-container destination-type = "queue" transaction-manager = "jmsTransactionManager" concurrency = "4" acknowledge = "transacted" connection-factory = "jmsConnectionFactory" > < jms:listener destination = "queue.thread.autopay" ref = "autoPayListener" /> </ jms:listener-container > |
但是在測試后發現,這樣配置並沒有用。我分析,JmsTransactionManager並不能為@Transactional(propagation = Propagation.REQUIRES_NEW)注解創建新的JmsConnection,因而,發送消息使用的仍然是接收消息時創建的connection。由於MessageListener中並沒有拋出異常,JmsTransactionManager仍然會提交這個connection中的數據,並最終導致消息提交。
方案二:手動將發送消息的操作放到數據庫事務的AfterCommit操作中
現有代碼中,我們是在事務體內執行JmsTemplate.send()操作;在事務的AfterCommit操作中執行Session.commit()。
如果我們將JmsTemplate.send()操作放到AfterCommit操作中,那么就可以確保只在數據庫事務提交后,才會提交Jms消息了。
此方案驗證可行。驗證代碼如下:
public void send(Event event, List<TransferParam> transferParams) { TransactionSynchronizationManager .registerSynchronization( new TransactionSynchronizationAdapter() { @Override public void afterCommit() { System.out.println( this .getClass() + " - " + event + "--" + transferParams); try { event.getTychoOperType().ifPresent( (value) -> { TychoProductor4Account. this .doSend(event, transferParams); }); } catch (Exception e) { System.out.println(e.getMessage()); TychoProductor4Account.LOGGER.error( "發送數據到tycho異常:{}" , e); } } }); } |
方案三:手動在數據庫事務的RollBack操作中回滾Jms消息
暫未找到實現方式。
方案四:嘗試為發送消息創建並使用新的Connection
代碼流程中之所以會使用同一個Connection,是因為接收、發送消息時,都是從線程上下文中嘗試獲取JmsResourceHolder,並從其中獲取連接的。
那么,簡單做法就是在接收到消息后,開啟一個子線程;復雜做法則是為JmsTransactionManager編寫識別@Transactional(propagation = Propagation.REQUIRES_NEW)注解的功能。
開啟子線程的方案可行。驗證代碼如下:
Future<Event> actualResult = this .keplerRestExecutor.submit(() -> { Event4Reserve event4Reserve = new Event4Reserve(); event4Reserve.setRecordId(recordId); event4Reserve.setUserId(ThreadConsts.SYSTEM_USER_ID); AutoPayListener4BaeEvent.LOGGER.debug( "event4Reserve={}" , event4Reserve); Event result = this .bizAccountEventService.handle(event4Reserve); AutoPayListener4BaeEvent.LOGGER.info( "result={}" , result); return result; }); try { actualResult.get(); } catch (InterruptedException e) { AutoPayListener4BaeEvent.LOGGER.error( "線程被中斷!" , e); throw new RuntimeException( "墊付線程中斷!" , e); } catch (ExecutionException e) { AutoPayListener4BaeEvent.LOGGER.error( "執行過程出錯!" , e); Throwable real = e.getCause(); if (real instanceof RuntimeException) { throw (RuntimeException) real; } else { throw new RuntimeException(real); } } |
方案五:使用org.springframework.jms.connection.CachingConnectionFactory
已驗證,方案無效。
測試配置如下:
< bean id = "jmsTransactionManager" class = "org.springframework.jms.connection.JmsTransactionManager" > < property name = "connectionFactory" ref = "jmsConnectionFactory" /> </ bean > < bean id = "jmsConnectionFactory" class = "org.springframework.jms.connection.CachingConnectionFactory" > < property name = "targetConnectionFactory" ref = "targetActiveMqConnectionFactory" /> < property name = "sessionCacheSize" value = "10" /> </ bean > < amq:connectionFactory id = "targetActiveMqConnectionFactory" brokerURL = "${jms.url.failover}" > < amq:redeliveryPolicyMap > < amq:redeliveryPolicyMap > < amq:defaultEntry > <!-- 5次,每次30秒 --> < amq:redeliveryPolicy maximumRedeliveries = "5" initialRedeliveryDelay = "30000" /> </ amq:defaultEntry > < amq:redeliveryPolicyEntries > <!-- 5次,每次10秒 --> < amq:redeliveryPolicy queue = "queue.thread.autopay" maximumRedeliveries = "5" initialRedeliveryDelay = "10000" /> </ amq:redeliveryPolicyEntries > < amq:redeliveryPolicyEntries > <!-- 銀聯實時划扣超時限制 ,5次,每次90秒 --> < amq:redeliveryPolicy queue = "queue.thread.instantUnionpay" maximumRedeliveries = "5" initialRedeliveryDelay = "90000" /> </ amq:redeliveryPolicyEntries > </ amq:redeliveryPolicyMap > </ amq:redeliveryPolicyMap > </ amq:connectionFactory > < jms:listener-container destination-type = "queue" transaction-manager = "jmsTransactionManager" concurrency = "4" acknowledge = "transacted" connection-factory = "jmsConnectionFactory" > < jms:listener destination = "queue.thread.autopay" ref = "autoPayListener" /> </ jms:listener-container > |
方案六:為jmsTemplate和MessageListener配置不同的ConnectionFactory
驗證可行。測試配置如下:
< bean id = "newJmsTemplate" class = "org.springframework.jms.core.JmsTemplate" > < property name = "connectionFactory" ref = "targetActiveMqConnectionFactory" /> < property name = "sessionTransacted" value = "true" /> < property name = "explicitQosEnabled" value = "${activemq.explicitQosEnabled}" /> < property name = "timeToLive" value = "86400000" /> </ bean > < amq:connectionFactory id = "targetActiveMqConnectionFactory" brokerURL = "${jms.url.failover}" > </ amq:connectionFactory > < jms:listener-container destination-type = "queue" concurrency = "4" acknowledge = "transacted" connection-factory = "jmsConnectionFactory" > < jms:listener destination = "queue.thread.autopay" ref = "autoPayListener" /> </ jms:listener-container > < amq:connectionFactory id = "jmsConnectionFactory" brokerURL = "${jms.url.failover}" > </ amq:connectionFactory > |
后續工作
基本已經驗證完畢。