Java消息機制
1、問: 什么是 Java 消息服務?
答: Java 消息服務(Java Message Service,JMS) API 是一個用於訪問企業消息傳遞系統的 API。是 Java 2 Platform, Enterprise(J2EE)的一部分。
2、目前流行的消息傳送產品有哪些?
答:目前流行的有ActiveMQ、IBM WebSphere MQ、SonicMQ等
3、什么時候可以用到java消息機制?
答:(1)異構系統集成,整合現有資源,提高資源的利用率
(2)異步請求處理,減輕或消除系統瓶頸,提高用戶生產率和系統的整體可伸縮性
(3)組件解偶,增加系統的靈活性
4、消息傳送的兩種模型
發布/訂閱模型
客戶端發送消息到一個名為主題(topic)的虛擬通道中,每個訂閱該主題的消費者都會接收到每條消息的一個副本。
點對點模型
客戶端通過隊列(queue)這個虛擬通道來同步和異步發送、接收消息,發送到隊列的消息只能被一個接收者所接收,即使有多個消費者時也只能有一個消費者處理消息
5、JMS包含哪些接口?
JMS API可以分為3個主要部分:公共API、點對點API和發布/訂閱API。在JMS1.1中,公共API可被用於向一個隊列或一個主題發送消息,或從其中接收消息。點對點API專門用於使用隊列的消息傳送,而發布/訂閱API則專門用於使用主題的消息傳送。
在JMS公共API內部,和發送和接收JMS消息有關的JMS API接口主要有7個:
l ConnectionFactory
l Destination
l Connection
l Session
l Message
l MessageProducer
l MessageConsumer
在這些公共接口中,ConnectionFactory和Destination必須使用JNDI(遵照JMS規范要求)從提供者處獲得。其他接口則可以通過工廠方法在不同的API接口中創建。舉例來說,一旦有了一個ConnectionFactory,就可以創建一個 Connection。一旦有了一個Connection,就可以創建一個Session。而一旦有了一個Session,就可以創建一個 Message、MessageProducer和MessageConsumer。這7個主要的JMS公共API接口之間的關系如圖1-5所示。
在JMS中,是Session對象保存着用於消息傳送的事務性工作單元(transactional unit),而不是Connection對象。這和JDBC不同,JDBC中是Connection對象保存事務性工作單元。這就意味着在使用JMS時,一個應用程序通常只會有一個Connection對象,但是它可以有一個Session對象池。
另外,還有和異常處理、消息優先級及消息持久性有關的其他接口
6、java消息分為哪些部分?
消息頭、消息屬性、消息自身
7、消息過濾
消息訂閱者需要對消息進行過濾,否則訂閱者就會接受到主題或隊列的每一條消息,浪費了不必要的資源(CPU、內存等),而使用消息過濾技術,能讓訂閱者只接受它需要的消息。(消息過濾對於隊列消費尤其重要,因為一個隊列消費者消費消息后其他消費者就不再可用,此時如果不對消息進行過濾處理,這條消息就很可能被浪費掉)。
消息選擇器使用消息屬性和消息頭作為條件表達式的傳送載體(消息體不能作為消息選擇器的參考對象)。
消息選擇器由標識符、常量和比較運算符組成:
例:
創建一個消息如下:
TextMessage textMessage = Session.createTextMessage();
textMessage.setText(“mytestMsg”);
textMessage.setStringProperty(“city”,”hangzhou”);
textMessage.setStringProperty(“company”,”mycompany”);
這條消息中設置的消息屬性名“city”和“company”代表消息選擇器的標識符,”hangzhou”和”mycompany”代表常量
在消費端創建一個選擇器:
String selector = “city = 'hangzhou' AND company='mycompany'”;
QueueReceiver qReceiver = qSession.createReceiver(testQ,selector);
其中“=”和“AND”為比較運算符,其他常用比較運算符還有:
算數比較運算符(=、>、<、<=、<>等)
like運算符、BETWEEN運算符、IN運算符、IS NULL運算符等
8 、消息過濾
消息訂閱者需要對消息進行過濾,否則訂閱者就會接受到主題或隊列的每一條消息,浪費了不必要的資源( CPU、內存等),而使用消息過濾技術,能讓訂閱者只接受它需要的消息。(消息過濾對於隊列消費尤其重要,因為一個隊列消費者消費消息后其他消費者就不再可用,此時如果不對消息進行過濾處理,這條消息就很可能被浪費掉)。
消息選擇器使用消息屬性和消息頭作為條件表達式的傳送載體(消息體不能作為消息選擇器的參考對象)。
消息選擇器由標識符、常量和比較運算符組成:
例:
創建一個消息如下:
TextMessage textMessage = Session.createTextMessage();
textMessage.setText(“mytestMsg”);
textMessage.setStringProperty(“city”,”hangzhou”);
textMessage.setStringProperty(“company”,”alibaba”);
這條消息中設置的消息屬性名 “city” 和 “company” 代表消息選擇器的標識符, ”hangzhou” 和 ”alibaba”代表常量
在消費端創建一個選擇器:
String selector = “city = 'hangzhou' AND company='alibaba' ”;
QueueReceiver qReceiver = qSession.createReceiver(testQ,selector);
其中 “=” 和 “AND” 為比較運算符,其他常用比較運算符還有:
算數比較運算符( = 、 > 、 < 、 <= 、 <> 等)
like 運算符、 BETWEEN 運算符、 IN 運算符、 IS NULL 運算符等
9 、消息傳送的可靠性
在消息的傳送過程中由於網絡、軟硬件故障等都會導致消息的發送失敗, jms 為保證消息的傳送定義了 3 條法則:
( 1 )消息自主性,消息是自包含的自主性實體,當發送端發出這條消息后這條消息就不再受發送端的限制,它可以在多個進程間被多次發送。
( 2 )存儲轉發,當消息被標記位持久性消息時,就由 jms 提供者利用 “ 保存並轉發 ” 機制,將消息保存在可信的介質上,防止發生故障時仍然可以正常恢復
( 3 )消息確認機制,服務器確認已經從發送端收到了消息,消費者則從確認從服務器接收了消息,對消息傳送過程的監控,保證了消息的可靠傳送。 消息確認的 3 種模式:
1 、 AUTO_ACKNOWLEDGE
從消息生產者角度:發送消息后就開始阻塞,直到從消息服務器收到回復,期間如發生異常則認為消息未被傳送
從消息服務器角度:非持久消息在接受到消息后通知生產者,並將消息存入內存,持久性消息在接受道消息后先存入磁盤,然后通知生產者
從消費者角度:接受到消息后就向服務器發送確認信息,如果服務器沒有收到確認,會重新發送
2 、 CLIENT_ACKNOWLEDGE
消費者可在處理完業務邏輯后在代碼重顯示調用 message.acknowledge() 通知 jms 提供者已成功接收道消息
3 、 DUPS_ACKNOWLEDGE
可將一條消息向同一目的地發送兩次以上
這里介紹兩個概念:
( 1 )持久化消息:消息持久化就是在發送者將消息發送出去后,消息中心首先將消息存儲到本地數據文件、內存數據庫或者遠程數據庫等,然后試圖將消息發送給接收 者,發送成功則將消息從存儲中刪除,失敗則繼續嘗試。消息中心啟動以后首先要檢查制定的存儲位置,如果有未發送成功的消息,則需要把消息發送出去。通過在消息頭設置來實現:如
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
(2) 持久訂閱者和非持久訂閱者:非持久訂閱者是指某個訂閱者由於某種原因停止運行,那么在停止運行期間發布到該訂閱者所訂閱主題的消息就會無法獲得,持久訂閱者則剛好相反,持久訂閱者會接收發送到訂閱主題的所有消息,無論訂閱者是否正常運行,電子郵件就是類似的一個例子。
持久化消息和持久訂閱者在服務器和消費者端之間的消息傳送保證機制比較類似,但在有一種情況下他們還是存在區別的,對持久訂閱者來說,當消息服務器向發送者發送確認消息之后,並為當前未運行的持久訂閱者將消息保存到介質之間如果發送故障,該消息就會丟失,而持久化消息則是先保存消息道介質,然后才向發送者發送確認消息,所以不存在這個問題,因此嚴格來說持久化消息的可靠性會更高。
10 、事務性消息
jms 事務性保證了一組發送的消息或接收的消息要么全部成功要么全部失敗,概念上和我們在java 中使用的jta 相似,但jms 事務是由jms 提供者來管理的,而不是jta 。
使用方法如下:
// 此處用true ,表示使用事務性消息
Session session =connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 用來發送的3 個消息
MessageProducer sender = session.createProducer(“queue/testQueue”);
try{
TextMessage message1 = session.createTextMessage(“ 要發送的消息1”);
sender.send(message);
TextMessage message2 = session.createTextMessage(“ 要發送的消息2”);
sender.send(message);
TextMessage message3 = session.createTextMessage(“ 要發送的消息3”);
sender.send(message);
session.commit();
}catch(Exception e){
try{
session.rollback();
}catch(JMSException e){
}
}
