三.JMS API簡析
頂級接口 |
P2P |
Pub/sub |
備注 |
ConnectionFactory |
QueueConnectionFactory |
TopicConnectionFactory |
基於工廠模式,創建和JMS提供者之間的鏈接,需要制定鏈接的URL或者協議;任何JMS客戶端和JMS提供者之間的交互,都必須基於制定的連接. |
Destination |
Queue |
Topic |
“目的地”,用於描述消息的通道類型,是JMS提供者用於標記消息歸屬類型的”標記”. |
Connection |
QueueConnection |
TopicConnection |
“鏈接”,用於描述一個實際的網絡通訊鏈接,比如TCP/UDP等,任何交互數據,都必須通過”鏈接”進行傳輸,JMS實現者負責定義數據格式(協議);在物理層用於區分JMS客戶端,一般而言,一個應用只會有一個”鏈接”. |
Session |
QueueSession |
TopicSession |
“會話”,在邏輯上用於區分JMS客戶端,因為”鏈接”可被公用以提高網絡利用率;每個session可以支持相互獨立的”事務”和相關屬性.每個session都有ID. |
Message |
-- |
-- |
“消息”,JMS API中提供了多種類型Message,它們有各自的”序列化/反序列化”機制;消息中可以包含多種JMS屬性以及客戶端自定義的消息屬性和內容. |
MessageProducer |
QueueSender |
TopicPublisher |
“生產者”,一種可以向JMS提供者提交消息的客戶端類型. |
MessageConsumer |
QueueReceiver |
TopicSubscriber |
“消費者”,一種可以向JMS提供獲取消息的客戶端類型. |
PTP(point-to-point)即點對點消息傳輸模型。PTP如下圖所示:
我們可以看到,一個或多個生產者發送消息,消息m2先抵達了queue,然后m1也發出了,並一同存在於一個先進先出的queue里面。消費者也存在一個或多個,對queue里的消息進行消費。但消息被隨機的一個消費者消費且僅消費一次。
在pub/sub消息模型中,消息被廣播給所有訂閱者。如下圖:
接口關系如下:可以看到一個JMS應用的
發送端的標准流程是:創建連接工廠>創建連接>創建session>創建發送者>創建消息體>發送消息到Destination(queue或topic)。
接收端則為:創建連接工廠>創建連接>創建session>創建接收者>創建消息監聽器監聽某Destination的消息>獲取消息並執行業務邏輯
四. JMS-API詳解部分
1. ConnectionFactory接口:
鏈接工廠,用於創建"鏈接",此處所指的連接為底層實際物理連接,即TCP連接(Socket通道).此接口有多個子接口:QueueConnectionFactory(用來創建Queue消息類型的鏈接),TopicConnectFactory,XAQueueConnectionFactory(基於XA分布式事務的Queue鏈接),XATopicConnectionFactory.
ConnectionFactory接口中並沒有約束建立鏈接所使用的協議、URL、安全策略等;這一切就交給JMS Provider去實現。通常情況下ConnectionFactory實例為單例,而且推薦以JNDI的方式獲取.
Connection createConnection()Connection createConnection(String username,String password):使用簡單的密碼校驗方式來創建鏈接.如果JMS Server端對受托管的queue/topic配置了需要授權才能訪問,那么在建立相應的鏈接時需要交付密碼.
2. Connection接口:
代表底層一個物理(或者邏輯上)的一個Socket鏈接通道,此鏈接將會保持活躍直到JMS Client關閉或者JMS提供者(即JMS server端)在socket上阻塞超時.通常情況下,Connection為一個TCP鏈接(長連接),一個JMS Client建議維持一個Connection即可,事實上Queue/Topic不同類型的Client,那么也將創建不同的Connection.對於Server而言,Socket的資源開支是昂貴的,盡量避免一個Client創建多個Connection的情況.
消息將通過"數據格式協議"在socket通道中傳輸,同一個connection中消息的發送/接受是有順序的,這受限於Socket本身對流數據操作的特性.
JMS Provider會為每個Connection生成ID,用來監控鏈接、消息分組等;JMS server端將會維護當前所有存活的Connection列表。
void start(): "開啟"消息接收,此后即可接收消息;不過對於Producer而言,無論鏈接處於何種狀態,均可以發送消息;此方法主要對消費者有效.此操作對Connection上所有的session都有效.
void stop():"終止"消息接收,此后將不能接收到消息;當鏈接重新被start之后,將仍然可以繼續接收.
void close():關閉鏈接,底層為直接關閉socket;與Connection有關的臨時(temporary)目的地都將被刪除(包括TemporaryQueue,TemporaryTopic),以及此Connection有關的Sessions/productor/consumer都將被關閉.JMS規范要求,如果close方法返回意味着connection中所有的send操作已經結束,消息接收的receive方法已經返回(或中斷);close方法會導致事務中的session無法繼續,可能會rollback.Session createSession(boolean transacted,int acknowledgeMode):創建會話,並指定此Session的事務性和消息確認模式.
String getClientID():獲得當前JMS Client的ID;每個Connection,對於JMS提供者而言,就被認為是一個Client,clientId用來表示全局中唯一的一個鏈接,有server端生成;不同的JMS提供者對此ID的生成策略有所不同.
void setClientID():設置ClientID,此操作需要在建立Connection之后,未使用Connection進行任何實際操作之前(包括創建session)進行;否則將會拋出異常.因為clientID是JMS server用來唯一標記鏈接的,因此在全局中不能重復,如果嘗試設定一個已有的ClientId,將會拋出InvalidClientIDException.不過此方法可能在某些JMS Provider上不被支持.
void setExceptionListener(ExceptionListener listener):設定Connection失效異常監聽器,當JMS Client檢測到鏈接異常,比如鏈接異常斷開,將會通知此listener,可以在listener中做一些日志記錄/補救措施,比如重新建立鏈接/會話恢復等.
ConnectionMetaData getMetaData():獲取JMS Provider中有關的元數據信息,比如版本號等.
3. DeliveryMode接口:
"消息傳輸模式",此屬性可以在session中指定,也可以在發送消息時指定;用來標記此消息是否需要被持久化,對於JMS而言,支持2種(作為DeliveryMode的靜態屬性):
1) PERSISTENT: 表示消息持久化,對於JMS Provider而言,這種類型的消息將會被存儲在磁盤上;以確保在server故障恢復后,消息仍然保留.
2) NON_PERSISTENT:表示消息非持久化,消息有可能被優先存儲在內存中,或者存儲在磁盤上某個臨時文件上;當server故障失效后,消息將不能被恢復.
4.Destination接口:
用來表示一個虛擬通道,或者說"目的地",消息的發送或者接收,都需要指定的destination.常見的2個子接口為:Queue和Topic;根據Destination的約束條件不同,可以分為"受托管Destination"、"動態Destination"、“臨時Destination”。
其中"受托管Destination"為JMS Provider中通過配置的方式聲明的,無法通過外部API直接修改,此destination的使用需要受到JMS server管理員的授權等.
"動態Destination"為通過JMS API方式創建的,比如session.createQueue(String queueName);這種類型的destination可以給JMS Client使用者提供了更多的自由空間,更加常用.
"臨時Destination"相對於"durable"(耐久的),這種類型的destination只能被當前Client感知到,它的生命周期和使用范圍局限於Connection;即只有創建它的session所屬的Connection中的其他消費者或者生產者才能使用它.這種destination在某些場景下很有用.
5. ExceptionListener接口:
主要用來處理connection級別異常,比如鏈接異常斷開;你可以在此listener中增加比如"鏈接重建"/"會話恢復"等措施;但是對於業務異常,此listener將不負責管理.
void onMessage(JMSException exception)
6. Session接口:
最常用接口,考慮到Connection的資源開支較大,那么JMS提供了API級別的邏輯上的"鏈接",即Session;它可以更小粒度的控制消息屬性(比如,確認模式,事務支持)以及消息的發送和接收.
session通常在單線程中使用,無論是MessageProducer還是Consumer;而且通常情況下,一個Session只維護一個Producer實例或者Consumer實例;此外session的創建是非常便捷的.
通過Session,可以創建多種類型的Producer和Consumer,而且事務的支持,也被控制在session級別.因此在支持事務的情況下,多個consumer或者Producer公用一個Session實例是不明智的.當然這也不是錯誤,不過前提需要注意:session中數據的操作並非在多線程下,能夠得到預期的效果.
如果你期望一個Producer持續的send消息,而另一個Consumer能夠通過listener的方式接收消息,那么你應該將它們放在兩個session中.此外如果你的消費者是基於listener異步接收消息,那么你應該為每個listener使用不同的session.
在支持事務的Session中,多個消息的發送或者接收作為一個原子性單元,當事務提交后,消息的"確認"也是作為原子性單元(此處消息的確認包括事務中多個發送的消息,或者消費者中連續消費的多個消息);如果事務回滾,將導致此事務中發送的消息被銷毀(JMS server端做刪除操作),對於消費者而言,事務中接收的消息將會被恢復(即認為消息未被收到,將會被重發).由此可見,在事務類型的session中,消息的確認時機將和事務提交的時機保持一致.
在事務類型的Session中,如果事務沒有提交,那么生產者send的消息對其他消費者不可見;對於消費者而言,如果事務沒有提交,那么消息將不會從queue中刪除,對於topic類型,消息將不會從自己的"消息副本"中刪除.
session並沒有start()方法,默認每次commit之后就會開啟一個新的事務.
static int AUTO_ACKNOWLEDGE: 消息自動確認,即消費者從receive()方法成功返回時或者當messageListener.onMessage(..)方法成功返回時,進行消息確認.如果receive()方法內部或者onMessage方法內部拋出異常(未捕獲),將會導致此消息不能被"確認";那么對於JMS Provider而言,則認為此消息消費失敗,將會重發.(對於某些Provider而言,將會在重發多次后仍然失敗,將會考慮將消息轉發給其他Connection).
static int CLIENT_ACKNOWLEDGE: 客戶端確認,即消息的確認需要client端選擇時機手動去觸發。這個方式給消息的確認提供了更加自由的方式;此方式針對消息消費者,消費者可以在接收到消息后,在任意時間調用此message.acknowledge()方法來確認消息。
static int DUPS_OK_ACKNOWLEDGE: "可重復消息確認",此模式可以允許JMS提供者將一條消息向同一個目的地發送兩次以上。
void close(): 關閉session;直接導致與此session有關的資源被釋放,如果消費者的recevie()正在接收消息(而不是wait阻塞)或者messageListener.onMessage()方法正在執行,那么session關閉將會被阻塞。session關閉將會導致尚未提交的事務被回滾。session關閉后,此session創建的Producer或者Consumer將無法繼續工作。
BytesMessage createByteMessage():MapMessage createMapMessage():創建一條消息void commit():提交事務。
void rollback():事務回滾。void recover():此方法只會被JMS Provider調用,JMS提供者將會暫停消息的發送,將此前已經發送但沒有“確認”消息標記為“redelivered”,然后按照消息的原始順序,將消息發送給Client端(包括redelivered和新消息)。
MessageProducer createProducer(Destination des): 創建一個消息生產者。
MessageConsumer createConsumer(Destination des): 創建一個消息消費者。
MessageConsumer createConsumer(Destination des,String selector): 指定消息選擇器。
MessageConsumer createConsumer(Destination des,String selector,boolean noLocal): 指明當前消費者是否可以接受"本地"消息.noLocal參數只對Topic類型的"目的地"有效,如果消息消費者和生產者有一個Connection創建(即它們具有同一個ClientID,或者說底層是一個TCP鏈接),即使它們是在不同的session中,它們均被認為是“本地”。
Queue createQueue(String queueName):創建一個“隊列”類型的目的地(動態隊列)。
Topic createTopic(String tpoicName):創建一個“主題”類型的目的地(動態主題)。
TopicSubscriber createDurableSubscriber(Topic topic,String name): 創建一個“耐久訂閱者”,並指定訂閱者的名稱(name,需要全局唯一);如果一個Client需要接收Topic的全部信息,即使當Client的鏈接失效時也需要JMS 提供者保存它“錯過”的消息。考慮到JMS 提供者內部的機制(消息副本),需要為此“耐久訂閱者”指定一個名稱,且名稱不能改動,否則此后消息不能接收到。創建“耐久訂閱者”有個必要的先決條件:ClientID必須一致,你在創建“durableSubscriber”時,需要顯示的指定“connection.setClientID(xxx)”,且ClientID在每次啟動時必須一樣,否則將無法使用“耐久訂閱者”。(You cannot create a durable subscriber without specifying a unique clientID on a Connection)。
QueueBrowser createBrowser(Queue queue,String selector): 類似於創建一個Consumer,不過此時創建的是一個“Browser”,只能查看消息隊列,但不能消費(也不能干擾消費)。
TemporaryQueue createTemporaryQueue(): 創建一個“臨時隊列”,其生命周期為當前Connection;如果當前session或者Connection關閉,那么queue也將不可用;同時此Queue只能被當前Connection下的其他session使用,對於其他Connection,此Queue是不可見的。(不能跨Connection,同時此方法並沒有指名參數;Don't understand null destinations)
TemporaryTopic createTemporaryTopic(): 同上void unsubscribe(String name): 取消“耐久訂閱者”,此后JMS Provider將不會對此訂閱者保留消息副本。
7. MessageConsumer接口:
消息消費者頂級接口,其子接口有QueueReceiver和TopicSubscriber;可以在Session接口中通過各種方式創建MessageConsumer.消息接收可以是同步的,也可以是異步的.這取決於編碼方式.
String getMessageSelector(): 獲取當前消息的"消息選擇器";Session接口中,已經提供了基於選擇器創建Consumer的方法,注意:消息選擇器必須在創建Consumer時指定,且整個session期間將無法再次改動,當然MessageConsumer接口中也沒有提供設置選擇器的方法;這個和JMS Provider對消息選擇器的使用機制有關(后端過濾機制),稍后有專題專門介紹"消息選擇器"的原理.
Message receive(): 以同步的方式接收消息,同步意味着"阻塞",當在connection活躍期間,當前"目的地"中沒有消息push過來(對於Topic)或者沒有偵聽到消息(對於Queue,普遍采用polling策略),那么此方法將會阻塞,直到接收到消息,或者consumer被關閉(close方法),或者connection/session被關閉,此時將會返回null.前文已經提到,在事務類型的session中,此方法返回時表示此消息已經被確認.
Message receive(long timeout): 已同步的方式接收消息,不過指定了方法阻塞的時間;如果在超時時仍未收到消息,將會返回null。
Message receiveNoWait(): 以非阻塞的方式接收消息,不過此處的非阻塞,只是嘗試去獲取,如果此時有消息亟待接收,將會返回message.否則返回null。此方法在一定程度上要求consumer使用"輪詢"的方式獲取消息,比如在while循環中.
void close(): 關閉消費者,此方法會阻塞,直到正在接收消息的receive方法返回,或者基於messageListener.onMessage()方法執行結束.如果receive方法尚未收到消息,則返回null.
8. MessageProducer接口:
消息生產者的頂級接口,負責向指定的"目的地"發送消息.兩個子接口:QueneSender和TopicPublisher
void setDisableMessageID(boolean value): 是否關閉"messageID"選項,不過首先聲明並不是所有的JMS Provider都會此選項感興趣,有些JMS Provider會忽略此選項,會對所有的消息都生成messageID.有些JMS Provider則反其道而行之,將會忽略所有的MessageID,以便減少此值帶來的額外的開支(比如網絡傳輸數據量).在很多場景下,我們需要跟蹤消息(或者過濾消息,"請求應答"模式),那么MessageID對我們將會非常有效.JMS API規定,任何一個MessageID必須為全局唯一的,以便Client端可以使用MessageID作為某種檢索/存儲的主鍵.
setDisableMessageTimestamp(): 是否關閉"messageTimestamp"選項,它的機制和messageID一樣;messageTimestamp用來表示此消息被發送的時間戳(send方法執行時);此選項通常可以用來跟蹤消息被創建的時間.
void setDeliveryMode(int mode): 設置消息的"傳輸模式";此設置將直接影響此Producer下所有發送的消息.
void setPriority( int priority): 設置消息的"權重",共0~9個級別,默認為4,其中0~4表示普通優先級,5~9表示高優先級;優先級越高,將會導致此消息被優先發送給消費者.此后我們會詳解"消息權重"和系統設計.
void setTimeToLive(long timeToLive): 設置消息的最大存活時間,毫秒數.默認為0表示永不過期;JMS Provider在發送消息時都會檢測消息的"存活有效性",如果消息過期,將會直接刪除,而不發送給消費者.其中mesageTimestamp + timeToLive最終表示為消息過期的時間.在JMS Provider將消息發送給Client時,將使用過期時間和server的本地時間比較,以決定其是否過期.
void close(): 關閉消息生產者.
void send(Message message): 發送消息 ,不過此時將會使用MessageProducer指定的priority/deliveryMode等.
void send(Message message,int deliveryMode,int priority,long timeToLive); 發送消息,並為此消息指定"傳輸模式"和"消息權重".如果某條消息需要特定聲明這些屬性,那么你可以使用此方法.
void send(Destinantion des,Message message): 向指定的目的地,發送消息;這個方法並不常用,通常用在"請求-應答"模式中:即消息消費者接收到消息之后,需要臨時創建一個Producer並將"應答消息"發送到指定的"replyTo"地址.
---稍后介紹JMS API中所關聯的具體知識點.