1,Java消息服務-JMS


一,消息服務

消息服務指的是兩個應用程序之間進行異步通信的API,它為標准消息協議和消息服務提供了一組通用接口,包括創建、發送、讀取消息等,用於支持應用程序開發。在Java中,當兩個應用程序使用JMS進行通信時,它們之間並不是直接相連的,而是通過一個共同的消息收發服務連接起來,可以達到解耦的效果。

二,JMS

2.1,簡介

JMS即Java消息服務(Java Message Service)應用程序接口,是一個Java平台中關於面向消息中間件(MOM-分布式系統的集成)的API,用於在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。

JMS是一種與廠商無關的 API,用來訪問消息收發系統消息,它類似於JDBC(Java Database Connectivity)。

2.2,體系架構

JMS由以下元素組成:

JMS提供者

連接面向消息中間件的,JMS接口的一個實現。提供者可以是Java平台的JMS實現,也可以是非Java平台的面向消息中間件的適配器。

JMS客戶

生產或消費基於消息的Java的應用程序或對象。

JMS生產者

創建並發送消息的JMS客戶。

JMS消費者

接收消息的JMS客戶。

JMS消息

包括可以在JMS客戶之間傳遞的數據的對象。

JMS隊列

一個容納那些被發送的等待閱讀的消息的區域。與隊列名字所暗示的意思不同,消息的接受順序並不一定要與消息的發送順序相同。一旦一個消息被閱讀,該消息將被從隊列中移走。

JMS主題

一種支持發送消息給多個訂閱者的機制。

2.3,JMS對象模型

ConnectionFactory

創建Connection對象的工廠,針對兩種不同的JMS消息模型,分別有QueueConnectionFactory和TopicConnectionFactory兩種。可以通過JNDI來查找ConnectionFactory對象。

Connection

Connection表示在客戶端和JMS系統之間建立的鏈接(對TCP/IP socket的包裝)。Connection可以產生一個或多個Session。跟ConnectionFactory一樣,Connection也有兩種類型:QueueConnection和TopicConnection。

Session

Session是操作消息的接口。可以通過session創建生產者、消費者、消息等。Session提供了事務的功能。當需要使用session發送/接收多個消息時,可以將這些發送/接收動作放到一個事務中。同樣,也分QueueSession和TopicSession。

MessageProducer

消息生產者由Session創建,並用於將消息發送到Destination。同樣,消息生產者分兩種類型:QueueSender和TopicPublisher。可以調用消息生產者的方法(send或publish方法)發送消息。

MessageConsumer

消息消費者由Session創建,用於接收被發送到Destination的消息。兩種類型:QueueReceiver和TopicSubscriber。可分別通過session的createReceiver(Queue)或createSubscriber(Topic)來創建。當然,也可以session的creatDurableSubscriber方法來創建持久化的訂閱者。

Destination

Destination的意思是消息生產者的消息發送目標或者說消息消費者的消息來源。對於消息生產者來說,它的Destination是某個隊列(Queue)或某個主題(Topic);對於消息消費者來說,它的Destination也是某個隊列或主題(即消息來源)。

2.4,JMS消息模型

在JMS標准中,有兩種消息模型PTP(Point to Point),Publish/Subscribe(Pub/Sub)。

2.4.1,PTP模式-點對點消息傳送模型

在點對點消息傳送模型中,應用程序由消息隊列,發送者,接收者組成。每一個消息發送給一個特殊的消息隊列,該隊列保存了所有發送給它的消息(除了被接收者消費掉的和過期的消息)。

PTP的特點

1,每個消息只有一個消費者(Consumer)(即一旦被消費,消息就不再在消息隊列中)。

2,發送者和接收者之間在時間上沒有依賴性,也就是說當發送者發送了消息之后,不管接收者有沒有正在運行,它不會影響到消息被發送到隊列。

3,接收者在成功接收消息之后需向隊列發送確認收到通知(acknowledgement)。

2.4.2,Pub/Sub-發布/訂閱消息傳遞模型

在發布/訂閱消息模型中,發布者發布一個消息,該消息通過topic傳遞給所有的客戶端。在這種模型中,發布者和訂閱者彼此不知道對方,是匿名的且可以動態發布和訂閱topic。

在發布/訂閱消息模型中,目的地被稱為主題(topic),topic主要用於保存和傳遞消息,且會一直保存消息直到消息被傳遞給客戶端。

Pub/Sub特點

1,每個消息可以有多個消費者。

2,發布者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須創建一個或多個訂閱者之后,才能消費發布者的消息,而且為了消費消息,訂閱者必須保持運行的狀態。

3,為了緩和這樣嚴格的時間相關性,JMS允許訂閱者創建一個可持久化的訂閱。這樣,即使訂閱者沒有被激活(運行),它也能接收到發布者的消息。

2.5,接收消息

在JMS中,消息的接收可以使用以下兩種方式:

同步

使用同步方式接收消息的話,消息訂閱者調用receive()方法。在receive()中,消息未到達或在到達指定時間之前,方法會阻塞,直到消息可用。

異步

使用異步方式接收消息的話,消息訂閱者需注冊一個消息監聽者,類似於事件監聽器,只要消息到達,JMS服務提供者會通過調用監聽器的onMessage()遞送消息。

2.6,JMS消息結構(Message)

Message主要由三部分組成,分別是Header,Properties,Body, 詳細如下:

Header

消息頭,所有類型的這部分格式都是一樣的

Properties

屬性,按類型可以分為應用設置的屬性,標准屬性和消息中間件定義的屬性

Body

消息正文,指我們具體需要消息傳輸的內容。

下面是Message接口的部分定義,它顯示了JMS消息頭使用的所有方法:

public interface Message {
    public Destination getJMSDestination() throws JMSException;
    public void setJMSDestination(Destination destination) throws JMSException;
    public int getJMSDeliveryMode() throws JMSException
    public void setJMSDeliveryMode(int deliveryMode) throws JMSException;
    public String getJMSMessageID() throws JMSException;
    public void setJMSMessageID(String id) throws JMSException;
    public long getJMSTimestamp() throws JMSException'
    public void setJMSTimestamp(long timestamp) throws JMSException;
    public long getJMSExpiration() throws JMSException;
    public void setJMSExpiration(long expiration) throws JMSException;
    public boolean getJMSRedelivered() throws JMSException;
    public void setJMSRedelivered(boolean redelivered) throws JMSException;
    public int getJMSPriority() throws JMSException;
    public void setJMSPriority(int priority) throws JMSException;
    public Destination getJMSReplyTo() throws JMSException;
    public void setJMSReplyTo(Destination replyTo) throws JMSException;
    public String getJMScorrelationID() throws JMSException;
    public void setJMSCorrelationID(String correlationID) throws JMSException;
    public byte[] getJMSCorrelationIDAsBytes() throws JMSException;
    public void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException;
    public String getJMSType() throws JMSException;
    public void setJMSType(String type) throws JMSException;
}

2.6.1,Header

header中的各個屬性,可以分為兩大類:

2.6.1.1,自動分配的消息頭:

這里這些JMS消息頭是自動分配的。

在傳送消息時,消息頭的值由JMS提供者來設置,因此開發者使用setJMSxxx()方法分配的值就被忽略了。換句話說,對於大多數自動分配的消息頭來說,使用賦值函數方法顯然是徒勞的。不過,這並非意味着開發者無法控制這些消息頭的值。一些自動分配的消息頭可以在創建Session和MessageProducer(也就是TopicPublisher)時,由開發者通過編程方式來設置。

屬性名稱        

說明        

設置者

JMSDeliveryMode

消息的發送模式,分為NON_PERSISTENTPERSISTENT,即非持久性模式的和持久性模式。默認設置為PERSISTENT(持久性)。

一條持久性消息應該被傳送一次(就一次),這就意味着如果JMS提供者出現故障,該消息並不會丟失; 它會在服務器恢復正常之后再次傳送。

一條非持久性消息最多只會傳送一次,這意味着如果JMS提供者出現故障,該消息可能會永久丟失。

在持久性和非持久性這兩種傳送模式中,消息服務器都不會將一條消息向同一消息者發送一次以上(成功算一次)。

//在消息生產者上設置JMS傳送模式

TopicPublisher topicPublisher = topicSession.createPublisher(topic);

topicPubiisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

send

JMSMessageID

消息ID,需要以ID:開頭,用於唯一地標識了一條消息

send

JMSTimestamp

消息發送時的時間。這條消息頭用於確定發送消息和它被消費者實際接收的時間間隔。時間戳是一個以毫秒來計算的Long類型時間值(自1970年1月1日算起)。

send

JMSExpiration

消息的過期時間,以毫秒為單位,用來防止把過期的消息傳送給消費者。任何直接通過編程方式來調用setJMSExpiration()方法都會被忽略。

TopicPublisher topicPublisher = topicSession.createPublisher(topic);

//將生存時間設置為1小時(1000毫秒 *60 60

topicPublisher.setTimeToLive(3600000);

send

JMSRedelivered

消息是否重復發送過,如果該消息之前發送過,那么這個屬性的值需要被設置為true, 客戶端可以根據這個屬性的值來確認這個消息是否重復發送過,以避免重復處理。

Provider

JMSPriority

消息的優先級,0-4為普通的優化級,而5-9為高優先級,通常情況下,高優化級的消息需要優先發送。任何直接通過編程方式調用setJMSPriority()方法都將被忽略。

TopicPublisher topicPublisher = TopicSession.createPublisher(someTopic);

//設置消息的優先級

topicPublisher.setPriority(9);

send

JMSDestination

消息發送的目的地,是一個Topic或Queue

send

2.6.1.2, 開發者分配的消息頭:

屬性名稱        

說明        

設置者

JMSCorrelationID

關聯的消息ID,這個通常用在需要回傳消息的時候

client

JMSReplyTo

消息回復的目的地,其值為一個Topic或Queue, 這個由發送者設置,但是接收者可以決定是否響應

client

JMSType

由消息發送者設置的消息類型,代表消息的結構,有的消息中間件可能會用到這個,但這個並不是是批消息的種類,比如TextMessage之類的

client

從上表中我們可以看到,系統提供的標准頭信息一共有10個屬性,其中有6個是由send方法在調用時設置的,有三個是由客戶端(client)設置的,還有一個是由消息中間件(Provider)設置的。

需要注意的是,這里的客戶端(client)不是指消費者,而是指使用JMS的客戶端,即開發者所寫的應用程序,即在生產消息時,這三個屬性是可以由應用程序來設定的,而其它的header要么由消息中間件設置,要么由發送方法來決定,開發者即使設置了,也是無效的。測試如下:

生產者:

//創建文本消息
TextMessage textMessage = session.createTextMessage("消息內容" + (i + 1 ));
//消息發送的目的地
textMessage.setJMSDestination(new Queue(){
    @Override
    public String getQueueName() throws JMSException {
        return name;
    }
});
//消息的發送模式
textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
//消息ID
textMessage.setJMSMessageID("ID:JMSMessageID");
//消息發送時的時間
textMessage.setJMSTimestamp(1000);
//關聯的消息ID
textMessage.setJMSCorrelationID("100:JMSCorrelationID");
//消息回復的目的地
textMessage.setJMSReplyTo(new Queue(){
    @Override
    public String getQueueName() throws JMSException {
        return name;
    }
});
//消息是否重復發送過
textMessage.setJMSRedelivered(true);
//消息類型,代表消息的結構
textMessage.setJMSType("type");
//消息的過期時間,以毫秒為單位
textMessage.setJMSExpiration(36000);
//消息的優先級,0-4為普通的優化級,而5-9為高優先級
textMessage.setJMSPriority(5);

消費者:

TextMessage msg = (TextMessage) messageConsumer.receive();
//獲得消息的發送模式
int jmsDeliveryMode = msg.getJMSDeliveryMode();
//獲得消息ID
String jmsMessageID = msg.getJMSMessageID();
//獲得消息發送時的時間
Long jmsTimestamp = msg.getJMSTimestamp();
//獲得關聯的消息ID
String jmsCorrelationID = msg.getJMSCorrelationID();
//獲得消息回復的目的地
String jmsReplyTo = ((Queue)msg.getJMSReplyTo()).getQueueName();
//獲得消息是否重復發送過
Boolean jmsRedelivered = msg.getJMSRedelivered();
//獲得消息類型,代表消息的結構
String jmsType = msg.getJMSType();
//獲得消息的過期時間,以毫秒為單位
Long jmsExpiration = msg.getJMSExpiration();
//獲得消息的優先級,0-4為普通的優化級,而5-9為高優先級
int jmsPriority = msg.getJMSPriority();
System.out.println("jmsDeliveryMode:" + jmsDeliveryMode);
System.out.println("jmsMessageID:" + jmsMessageID);
System.out.println("jmsTimestamp:" + jmsTimestamp);
System.out.println("jmsCorrelationID:" + jmsCorrelationID);
System.out.println("jmsReplyTo:" + jmsReplyTo);
System.out.println("jmsRedelivered:" + jmsRedelivered);
System.out.println("jmsType:" + jmsType);
System.out.println("jmsExpiration:" + jmsExpiration);
System.out.println("jmsPriority:" + jmsPriority);
System.out.println("----------------------------");

結果:

只有紅框的JmsType,ReplyTo,CorrelationId可以顯示設置,其它設置了都無效。

public interface Message {
    public Destination getJMSDestination() throws JMSException;
    public void setJMSDestination(Destination destination) throws JMSException;
    public int getJMSDeliveryMode() throws JMSException
    public void setJMSDeliveryMode(int deliveryMode) throws JMSException;
    public String getJMSMessageID() throws JMSException;
    public void setJMSMessageID(String id) throws JMSException;
    public long getJMSTimestamp() throws JMSException'
    public void setJMSTimestamp(long timestamp) throws JMSException;
    public long getJMSExpiration() throws JMSException;
    public void setJMSExpiration(long expiration) throws JMSException;
    public boolean getJMSRedelivered() throws JMSException;
    public void setJMSRedelivered(boolean redelivered) throws JMSException;
    public int getJMSPriority() throws JMSException;
    public void setJMSPriority(int priority) throws JMSException;
    public Destination getJMSReplyTo() throws JMSException;
    public void setJMSReplyTo(Destination replyTo) throws JMSException;
    public String getJMScorrelationID() throws JMSException;
    public void setJMSCorrelationID(String correlationID) throws JMSException;
    public byte[] getJMSCorrelationIDAsBytes() throws JMSException;
    public void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException;
    public String getJMSType() throws JMSException;
    public void setJMSType(String type) throws JMSException;
}

2.6.2,消息屬性

消息的屬性就像可以分配給一條消息的附加消息頭一樣。它們允許開發者添加有關消息的不透明附加信息。它們還用於暴露消息選擇器在消息過濾時使用的數據。

Message接口為讀取和寫入屬性提供了若干個取值函數和賦值函數方法。消息的屬性值可以是Stringboolean , byte,shortdoubleint ,longfloat型。Message接口為每種類型的屬性值都提供了取值函數和賦值方法。如下:

public interface Message {
    public String getStringProperty(String name) throws JMSException,MessageFormatException;
    public void setStringProperty(String name,String value) throws JMSException,MessageNotWriteableException;
    public int getIntProperty(String name) throws JMSException,MessageFormatException;
    public void setIntProperty(String name,int value) throws JMSException,MessageNotWriteableException;
    public boolean getBooleanProperty(String name) throws JMSException,MessageFormatException;
    public void setBooleanProperty(String name,boolean value) throws JMSException,MessageNotWriteableException;
    public double getDoubleProperty(String name) throws JMSException,MessageFormatException;
    public void setDoubleProperty(String name) throws JMSException,MessageFormatException;
    public float getFloatProperty (String name) throws JMSException,MessageFormatExdeption;
    public void setFloatProperty(String name,float value) throws JMSException,MessageNotWriteableException;
    public byte getByteProperty(String name) throws JMSException,MessageFormatException;
    public void setByteProperty(String name) throws JMSException,MessageNotWriteableException;
    public long getLongProperty(String name) throws JMSException,MessageNotWriteableException;
    public void setLongProperty(String name,long value) throws JMSException,MessageNotWriteableException;
    public short getShortProperty(String name) throws JMSException,MessageFormatException;
    public void setShortProperty(String name,short value) throws JMSException,MessageNotWriteableException;
    public Object getObjectProperty(String name) throws JMSException,MessageNotWriteableException;
    public void setObjectProperty(String name,Object value) throws JMSException,MessageNotWriteableException;
    ......
}

消息屬性有3種基本類型

2.6.2.1,應用程序特定的屬性

由應用程序開發者定義的所有屬性都可以作為一個應用程序特定的屬性。應用程序屬性在消息傳送之前進行設置。並不存在預先定義的應用程序屬性,開發者可以自由定義能夠滿足它們需要的任何屬性。例如,在一個應用中,可以添加一個特定的屬性,該屬性用於標識正在發送消息的用戶:

TextMessage message = pubSession.createTextMessage();

message.setStringProperty("username",username);//自定義屬性

publisher.publish(message);

作為一個應用程序的特定屬性,username一旦離開該應用程序就變得毫無意義,它專門用於應用程序根據發布者身份對消息進行過濾。

一旦一條消息發布或發送以后,它就變成了只讀屬性;消費者或生產者都無法修改它的屬性。如果消費者試圖設置某個屬性,該方法就會拋出一個javax.jms.MessageNotWriteableException。

2.6.2.2,JMS定義的屬性

JMS定義的屬性具有和應用程序屬性相同的特性,除了前者大多數在消息發送時由JMS提供者來設置之外。JMS定義的屬性可以作為可選的JMS消息頭,下面是JMS定義的9個屬性清單:

JMSXUserID

JMSXAppID

JMSXProducerTXID

JMSXConsumerTXID

JMSXRcvTimestamp

JMSXDeliveryCount

JMSXState

JMSXGroupID

JMSXGroupSeq

在這份清單中,只有JMSXGroupID和JMSXGroupSeq需要所有JMS提供者的支持。這些可選屬性用於聚合消息。

請注意:在Message接口中,您將無法找到對應的setJMSX<PROPERTY>()和getJMSX<PROPERTY>()方法定義,在使用這些方法時,必須使用和應用程序特定屬性相同的方法來設置它們,如下:

message.setStringProperty("JMSXGroupID","GroupID-001");

message.setIntProperty("JMSXGroupSeq",5);

2.6.2.3,提供者特定的屬性

每個JMS提供者都可以定義一組私有屬性,這些屬性可以由客戶端或提供者自動設置。提供者特定的屬性必須以前綴JMS開頭,后面緊接着是屬性名稱(JMS<vendor-property-name>),例如:JMSUserID。提供者特定的屬性,其作用就是支持廠商的私有特性。

2.6.3,消息體

為了適應不同場景下的消息,提高消息存儲的靈活性,JMS定義了幾種具體類型的消息,不同的子類型的消息體也不一樣,需要注意的是,Message接口並沒有提供一個統一的getBody之類的方法。消息子接口定義如下:

TextMessage

最簡單的消息接口,用於發送文本類的消息,設置/獲取其body的方法定義如下setText()/getText()。

StreamMessage

流式消息接口,里面定義了一系列的對基本類型的set/get方法,消息發送者可以通過這些方法寫入基本類型的數據,消息接收者需要按發送者的寫入順序來讀取相應的數據。

MapMessage

把消息內容存儲在Map里,本接口定義了一系列對基本類型的的set/get方法,與StreamMessage不同的是,每個值都對應了一個相應的key,所以消息接收者不必按順序去讀取數據。

ObjectMessage

將對象作為消息的接口,提供了一個set/get 對象的方法,需要注意的是只能設置一個對象,這個對象可以是一個Collection,但必須是序列化的。

BytesMessage

以字節的形式來傳遞消息的接口,除了提供了對基本類型的set/get,還提供了按字節方式進行set/get。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM