簡介
在前面一篇文章里討論過幾種應用系統集成的方式,發現實際上面向消息隊列的集成方案算是一個總體比較合理的選擇。這里,我們先針對具體的一個消息隊列Activemq的基本通信方式進行探討。activemq是JMS消息通信規范的一個實現。總的來說,消息規范里面定義最常見的幾種消息通信模式主要有發布-訂閱、點對點這兩種。另外,通過結合這些模式的具體應用,我們在處理某些應用場景的時候也衍生出來了一種請求應答的模式。下面,我們針對這幾種方式一一討論一下。
基礎流程
在討論具體方式的時候,我們先看看使用activemq需要啟動服務的主要過程。
按照JMS的規范,我們首先需要獲得一個JMS connection factory.,通過這個connection factory來創建connection.在這個基礎之上我們再創建session, destination, producer和consumer。因此主要的幾個步驟如下:
1. 獲得JMS connection factory. 通過我們提供特定環境的連接信息來構造factory。
2. 利用factory構造JMS connection
3. 啟動connection
4. 通過connection創建JMS session.
5. 指定JMS destination.
6. 創建JMS producer或者創建JMS message並提供destination.
7. 創建JMS consumer或注冊JMS message listener.
8. 發送和接收JMS message.
9. 關閉所有JMS資源,包括connection, session, producer, consumer等。
publish-subscribe
發布訂閱模式有點類似於我們日常生活中訂閱報紙。每年到年尾的時候,郵局就會發一本報紙集合讓我們來選擇訂閱哪一個。在這個表里頭列了所有出版發行的報紙,那么對於我們每一個訂閱者來說,我們可以選擇一份或者多份報紙。比如北京日報、瀟湘晨報等。那么這些個我們訂閱的報紙,就相當於發布訂閱模式里的topic。有很多個人訂閱報紙,也有人可能和我訂閱了相同的報紙。那么,在這里,相當於我們在同一個topic里注冊了。對於一份報紙發行方來說,它和所有的訂閱者就構成了一個1對多的關系。這種關系如下圖所示:
p2p
p2p的過程則理解起來更加簡單。它好比是兩個人打電話,這兩個人是獨享這一條通信鏈路的。一方發送消息,另外一方接收,就這么簡單。在實際應用中因為有多個用戶對使用p2p的鏈路,它的通信場景如下圖所示:
在p2p的場景里,相互通信的雙方是通過一個類似於隊列的方式來進行交流。和前面pub-sub的區別在於一個topic有一個發送者和多個接收者,而在p2p里一個queue只有一個發送者和一個接收者。
這兩種通信模式的代碼實現有很多相同之處,下面我們用一個例子來簡單實現這兩種通信方式:
發送者
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.StringReader; import java.util.StringTokenizer; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Publisher { public static final String url = "tcp://localhost:61616";// 缺省端口,如果要改,可在apache-activemq-5.13.3\conf中的activemq.xml中更改端口號 ConnectionFactory factory; Connection connection; Session session; MessageProducer producer; Destination[] destinations; ComunicateMode comunicateMode = ComunicateMode.pubsub; enum ComunicateMode { p2p, pubsub } public Publisher(ComunicateMode mode) throws JMSException { this.comunicateMode = mode; factory = new ActiveMQConnectionFactory(url);// 這里的url也可以不指定,java代碼將默認將端口賦值為61616 connection = factory.createConnection(); try { connection.start(); } catch (JMSException e) { connection.close(); throw e; } session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(null); } protected void setDestinations(String[] stocks) throws JMSException { destinations = new Destination[stocks.length]; for (int i = 0; i < stocks.length; i++) { destinations[i] = comunicateMode == ComunicateMode.pubsub ? session .createTopic("Topic." + stocks[i]) : session .createQueue("Queue." + stocks[i]); } } protected void sendMessage(String msg) throws JMSException { for (Destination item : destinations) { TextMessage msgMessage = session.createTextMessage(msg); producer.send(item, msgMessage); System.out.println(String.format("成功向Topic為【%s】發送消息【%s】", item.toString(), msgMessage.getText())); } } protected void close() throws JMSException { if (connection != null) connection.close(); } public static void main(String[] args) throws JMSException, InterruptedException, IOException { Publisher publisher = new Publisher(ComunicateMode.p2p);// 這里可以修改消息傳輸方式為pubsub publisher.setDestinations(new String[] { "1", "2", "3" }); BufferedReader reader = null; String contentString = ""; do { System.out.println("請輸入要發送的內容(exit退出):"); reader = new BufferedReader(new InputStreamReader(System.in)); contentString = reader.readLine(); if (contentString.equals("exit")) break; publisher.sendMessage(contentString); } while (!contentString.equals("exit")); reader.close(); publisher.close(); } }
接收者
import java.io.IOException; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Consumer { public static final String url = "tcp://localhost:61616";// 缺省端口,如果要改,可在apache-activemq-5.13.3\conf中的activemq.xml中更改端口號 ConnectionFactory factory; Connection connection; Session session; MessageConsumer[] consumers; ComunicateMode comunicateMode = ComunicateMode.pubsub; enum ComunicateMode { p2p, pubsub } public Consumer(ComunicateMode mode, String[] destinationNames) throws JMSException { this.comunicateMode = mode; factory = new ActiveMQConnectionFactory(url);// 這里的url也可以不指定,java代碼將默認將端口賦值為61616 connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); consumers = new MessageConsumer[destinationNames.length]; for (int i = 0; i < destinationNames.length; i++) { Destination destination = comunicateMode == ComunicateMode.pubsub ? session .createTopic("Topic." + destinationNames[i]) : session .createQueue("Queue." + destinationNames[i]); consumers[i] = session.createConsumer(destination); consumers[i].setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { System.out.println(String.format("收到消息【%s】", ((TextMessage) message).getText())); } catch (JMSException e) { e.printStackTrace(); } } }); } } public void close() throws JMSException { if (connection != null) connection.close(); } public static void main(String[] args) throws JMSException, IOException { Consumer consumer = new Consumer(ComunicateMode.p2p, new String[] { "2" });// 這里可以修改消息傳輸方式為pubsub System.in.read(); consumer.close(); } }
request-response
和前面兩種方式比較起來,request-response的通信方式很常見,但是不是默認提供的一種模式。在前面的兩種模式中都是一方負責發送消息而另外一方負責處理。而我們實際中的很多應用相當於一種一應一答的過程,需要雙方都能給對方發送消息。於是請求-應答的這種通信方式也很重要。它也應用的很普遍。
請求-應答方式並不是JMS規范系統默認提供的一種通信方式,而是通過在現有通信方式的基礎上稍微運用一點技巧實現的。下圖是典型的請求-應答方式的交互過程:
在JMS里面,如果要實現請求/應答的方式,可以利用JMSReplyTo和JMSCorrelationID消息頭來將通信的雙方關聯起來。另外,QueueRequestor和TopicRequestor能夠支持簡單的請求/應答過程。
現在,如果我們要實現這么一個過程,在發送請求消息並且等待返回結果的client端的流程如下:
client端創建一個臨時隊列並在發送的消息里指定了發送返回消息的destination以及correlationID。那么在處理消息的server端得到這個消息后就知道該發送給誰了。Server端的大致流程如下:
這里我們是用server端注冊MessageListener,通過設置返回信息的CorrelationID和JMSReplyTo將信息返回。
以上就是發送和接收消息的雙方的大致程序結構。具體的實現代碼如下:
Client:
這里的代碼除了初始化構造函數里的參數還同時設置了兩個destination,一個是自己要發送消息出去的destination,在session.createProducer(adminQueue);這一句設置。另外一個是自己要接收的消息destination, 通過Destination tempDest = session.createTemporaryQueue(); responseConsumer = session.createConsumer(tempDest); 這兩句指定了要接收消息的目的地。這里是用的一個臨時隊列。在前面指定了返回消息的通信隊列之后,我們需要通知server端知道發送返回消息給哪個隊列。於是txtMessage.setJMSReplyTo(tempDest);指定了這一部分,同時txtMessage.setJMSCorrelationID(correlationId);方法主要是為了保證每次發送回來請求的server端能夠知道對應的是哪個請求。這里一個請求和一個應答是相當於對應一個相同的序列號一樣。
同時,因為client端在發送消息之后還要接收server端返回的消息,所以它也要實現一個消息receiver的功能。這里采用實現MessageListener接口的方式:
public void onMessage(Message message) { String messageText = null; try { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; messageText = textMessage.getText(); System.out.println("messageText = " + messageText); } } catch (JMSException e) { //Handle the exception appropriately } }
Server:
這里server端要執行的過程和client端相反,它是先接收消息,在接收到消息后根據提供的JMSCorelationID來發送返回的消息:
前面,在replyProducer.send()方法里,message.getJMSReplyTo()就得到了要發送消息回去的destination。
另外,設置這些發送返回信息的replyProducer的信息主要在構造函數相關的方法里實現了:
總體來說,整個的交互過程並不復雜,只是比較繁瑣。對於請求/應答的方式來說,這種典型交互的過程就是Client端在設定正常發送請求的Queue同時也設定一個臨時的Queue。同時在要發送的message里頭指定要返回消息的destination以及CorelationID,這些就好比是一封信里面所帶的回執。根據這個信息人家才知道怎么給你回信。對於Server端來說則要額外創建一個producer,在處理接收到消息的方法里再利用producer將消息發回去。這一系列的過程看起來很像http協議里面請求-應答的方式,都是一問一答。
一些應用和改進
回顧前面三種基本的通信方式,我們會發現,他們都存在着一定的共同點,比如說都要初始化ConnectionFactory, Connection, Session等。在使用完之后都要將這些資源關閉。如果每一個實現它的通信端都這么寫一通的話,其實是一種簡單的重復。從工程的角度來看是完全沒有必要的。那么,我們有什么辦法可以減少這種重復呢?
一種簡單的方式就是通過工廠方法封裝這些對象的創建和銷毀,然后簡單的通過調用工廠方法的方式得到他們。另外,既然基本的流程都是在開頭創建資源在結尾銷毀,我們也可以采用Template Method模式的思路。通過繼承一個抽象類,在抽象類里提供了資源的封裝。所有繼承的類只要實現怎么去使用這些資源的方法就可以了。Spring中間的JMSTemplate就提供了這種類似思想的封裝。具體的實現可以參考這篇文章。
總結
activemq默認提供了pub-sub, p2p這兩種通信的方式。同時也提供了一些對request-response方式的支持。實際上,不僅僅是activemq,對於所有其他實現JMS規范的產品都能夠提供類似的功能。這里每種方式都不太復雜,主要是創建和管理資源的步驟顯得比較繁瑣。