盡管消息接收可以使用消息監聽器的方式替代模版方法,但是在發送的時候是無法替代的,在Spring中必須要使用JmsTemplate提供的方法來進行發送操作,可見JmsTemplate類的重要性,那么我們對於Spring整合消息服務的分析就從JmsTemplate開始。
查看JmsTemplate的類型層級結構圖發現實現了InitializingBean接口,接口方法實現是在JmsAccessor類中。發現函數中只是一個驗證的功能,並沒有邏輯實現。
public void afterPropertiesSet() { if (getConnectionFactory() == null) { throw new IllegalArgumentException("Property 'connectionFactory' is required"); } }
在Spring中發送消息可以通過JmsTemplate中提供的方法來實現。
public void send(final String destinationName, final MessageCreator messageCreator) throws JmsException { execute(new SessionCallback<Object>() { @Override public Object doInJms(Session session) throws JMSException { Destination destination = resolveDestinationName(session, destinationName); doSend(session, destination, messageCreator); return null; } }, false); }
現在的風格不得不讓我們回想起JdbcTemplate的類實現風格,極為相似,都是提取一個公共的方法作為最底層、最通用的功能實現,然后又通過回調函數的不同來區分個性化的功能。我們首先查看通用代碼的抽取實現。
通用代碼抽取execute()
在execute中封裝了Connection以及Session的創建操作
public <T> T execute(SessionCallback<T> action, boolean startConnection) throws JmsException { Assert.notNull(action, "Callback object must not be null"); Connection conToClose = null; Session sessionToClose = null; try { //事務相關 Session sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession( getConnectionFactory(), this.transactionalResourceFactory, startConnection); if (sessionToUse == null) { //創建connection conToClose = createConnection(); //根據connection創建session sessionToClose = createSession(conToClose); //是否開啟向服務器推送連接信息,只有接收信息時需要,發送時不需要 if (startConnection) { conToClose.start(); } sessionToUse = sessionToClose; } if (logger.isDebugEnabled()) { logger.debug("Executing callback on JMS Session: " + sessionToUse); } //調用回調函數 return action.doInJms(sessionToUse); } catch (JMSException ex) { throw convertJmsAccessException(ex); } finally { //關閉session JmsUtils.closeSession(sessionToClose); //釋放連接 ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), startConnection); } }
為了發送一條消息需要做很多工作,需要很多的輔助代碼,而這些代碼又都是千篇一律的,沒有任何的差異,所以execute方法的目的就是幫助我們抽離這些冗余代碼使我們更加專注於業務邏輯的實現。從函數中看,這些冗余代碼包括創建Connection、創建Session、當然也包括關閉Session和關閉Connection。而在准備工作結束后,調用回調函數將程序引入用戶自定義實現的個性化處理。
發送消息的實現
有了基類輔助實現,使Spring更加專注於個性的處理,也就是說Spring使用execute方法中封裝了冗余代碼,而將個性化的代碼實現放在了回調函數doInJms函數中。在發送消息的功能中回調函數通過局部類實現。
protected void doSend(Session session, Destination destination, MessageCreator messageCreator) throws JMSException { Assert.notNull(messageCreator, "MessageCreator must not be null"); MessageProducer producer = createProducer(session, destination); try { Message message = messageCreator.createMessage(session); if (logger.isDebugEnabled()) { logger.debug("Sending created message: " + message); } doSend(producer, message); // Check commit - avoid commit call within a JTA transaction. if (session.getTransacted() && isSessionLocallyTransacted(session)) { // Transacted session created by this template -> commit. JmsUtils.commitIfNecessary(session); } } finally { JmsUtils.closeMessageProducer(producer); } }
在發送消息遵循着消息發送的規則,比如根據Destination創建MessageProducer、創建Message,並使用MessageProducer實例來發送消息。
接收消息
我們通常使用jmsTemplate.receive(destination)來接收簡單的消息,那么這個功能Spring是如何封裝的呢?
@Override public Message receive(Destination destination) throws JmsException { return receiveSelected(destination, null); } @Override public Message receiveSelected(final Destination destination, final String messageSelector) throws JmsException { return execute(new SessionCallback<Message>() { @Override public Message doInJms(Session session) throws JMSException { return doReceive(session, destination, messageSelector); } }, true); } protected Message doReceive(Session session, Destination destination, String messageSelector) throws JMSException { return doReceive(session, createConsumer(session, destination, messageSelector)); } protected Message doReceive(Session session, MessageConsumer consumer) throws JMSException { try { // Use transaction timeout (if available). long timeout = getReceiveTimeout(); JmsResourceHolder resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager.getResource(getConnectionFactory()); if (resourceHolder != null && resourceHolder.hasTimeout()) { timeout = Math.min(timeout, resourceHolder.getTimeToLiveInMillis()); } Message message = doReceive(consumer, timeout); if (session.getTransacted()) { // Commit necessary - but avoid commit call within a JTA transaction. if (isSessionLocallyTransacted(session)) { // Transacted session created by this template -> commit. JmsUtils.commitIfNecessary(session); } } else if (isClientAcknowledge(session)) { // Manually acknowledge message, if any. if (message != null) { message.acknowledge(); } } return message; } finally { JmsUtils.closeMessageConsumer(consumer); } }
實現的套路與發送差不多,同樣還是使用execute函數來封裝冗余的公共操作,而最終的目標還是通過consumer.receive()來接收消息,其中的過程就是對於MessageConsumer的創建以及一些輔助操作。