4.2.6 MessageConsumer
MessageConsumer是一個由Session創建的對象,用來從Destination接收消息。
4.2.6.1 創建MessageConsumer
Java客戶端:
ActiveMQSession方法:
MessageConsumer createConsumer(Destination destination);
MessageConsumer createConsumer(Destination destination, String messageSelector);
MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal);
TopicSubscriber createDurableSubscriber(Topic topic, String name);
TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal);
其中messageSelector為消息選擇器;noLocal標志默認為false,當設置為true時限制消費者只能接收和自己相同的連接(Connection)所發布的消息,此標志只適用於主題,不適用於隊列;name標識訂閱主題所對應的訂閱名稱,持久訂閱時需要設置此參數。
例如:
MessageConsumer consumer = session.createConsumer(destination);
C++客戶端:
函數原型:
cms::MessageConsumer* ActiveMQSession::createConsumer(
const cms::Destination* destination );
cms::MessageConsumer* ActiveMQSession::createConsumer(
const cms::Destination* destination,
const std::string& selector )
throw ( cms::CMSException );
cms::MessageConsumer* ActiveMQSession::createConsumer(
const cms::Destination* destination,
const std::string& selector,
bool noLocal )
throw ( cms::CMSException );
cms::MessageConsumer* ActiveMQSession::createDurableConsumer(
const cms::Topic* destination,
const std::string& name,
const std::string& selector,
bool noLocal )
throw ( cms::CMSException );
例如:
MessageConsumer* consumer = session->createConsumer( destination );
4.2.6.2消息的同步和異步接收
消息的同步接收是指客戶端主動去接收消息,客戶端可以采用MessageConsumer 的receive方法去接收下一個消息。
消息的異步接收是指當消息到達時,ActiveMQ主動通知客戶端。客戶端可以通過注冊一個實現MessageListener 接口的對象到MessageConsumer。MessageListener只有一個必須實現的方法 —— onMessage,它只接收一個參數,即Message。在為每個發送到Destination的消息實現onMessage時,將調用該方法。
Java客戶端:
ActiveMQMessageConsumer方法:
Message receive()
Message receive(long timeout)
Message receiveNoWait()
其中timeout為等待時間,單位為毫秒。
或者
實現MessageListener接口,每當消息到達時,ActiveMQ會調用MessageListener中的onMessage 函數。
例如:
Message message = consumer.receive();
C++客戶端:
函數原型:
cms::Message* ActiveMQConsumer::receive() throw ( cms::CMSException )
cms::Message* ActiveMQConsumer::receive( int millisecs )
throw ( cms::CMSException );
cms::Message* ActiveMQConsumer::receiveNoWait(void)
throw ( cms::CMSException );
或者
實現MessageListener接口,每當消息到達時,ActiveMQ會調用MessageListener中的onMessage 函數。
例如:
Message *message = consumer->receive();
或者
consumer->setMessageListener( this );
virtual void onMessage( const Message* message ){
//process message
}
4.2.6.3消息選擇器
JMS提 供了一種機制,使用它,消息服務可根據消息選擇器中的標准來執行消息過濾。生產者可在消息中放入應用程序特有的屬性,而消費者可使用基於這些屬性的選擇標 准來表明對消息是否感興趣。這就簡化了客戶端的工作,並避免了向不需要這些消息的消費者傳送消息的開銷。然而,它也使得處理選擇標准的消息服務增加了一些 額外開銷。
消息選擇器是用於MessageConsumer的過濾器,可以用來過濾傳入消息的屬性和消息頭部分(但不過濾消息體),並確定是否將實際消費該消息。按照JMS文檔的說法,消息選擇器是一些字符串,它們基於某種語法,而這種語法是SQL-92的子集。可以將消息選擇器作為MessageConsumer創建的一部分。
Java客戶端:
例如:
public final String SELECTOR = “JMSType = ‘TOPIC_PUBLISHER’”;
該選擇器檢查了傳入消息的JMSType屬性,並確定了這個屬性的值是否等於TOPIC_PUBLISHER。如果相等,則消息被消費;如果不相等,那么消息會被忽略。
4.2.7 Message
JMS程序的最終目的是生產和消費的消息能被其他程序使用,JMS的 Message是一個既簡單又不乏靈活性的基本格式,允許創建不同平台上符合非JMS程序格式的消息。Message由以下幾部分組成:消息頭,屬性和消息體。
Java客戶端:
ActiveMQSession方法:
BlobMessage createBlobMessage(File file)
BlobMessage createBlobMessage(InputStream in)
BlobMessage createBlobMessage(URL url)
BlobMessage createBlobMessage(URL url, boolean deletedByBroker)
BytesMessage createBytesMessage()
MapMessage createMapMessage()
Message createMessage()
ObjectMessage createObjectMessage()
ObjectMessage createObjectMessage(Serializable object)
TextMessage createTextMessage()
TextMessage createTextMessage(String text)
例如:
下例演示創建並發送一個TextMessage到一個隊列:
TextMessage message = queueSession.createTextMessage();
message.setText(msg_text); // msg_text is a String
queueSender.send(message);
下例演示接收消息並轉換為合適的消息類型:
Message m = queueReceiver.receive();
if (m instanceof TextMessage) {
TextMessage message = (TextMessage) m;
System.out.println("Reading message: " + message.getText());
} else {
// Handle error
}
C++客戶端:
函數原型:
cms::Message* ActiveMQSession::createMessage(void)
throw ( cms::CMSException )
cms::BytesMessage* ActiveMQSession::createBytesMessage(void)
throw ( cms::CMSException )
cms::BytesMessage* ActiveMQSession::createBytesMessage(
const unsigned char* bytes,
unsigned long long bytesSize )
throw ( cms::CMSException )
cms::TextMessage* ActiveMQSession::createTextMessage(void)
throw ( cms::CMSException )
cms::TextMessage* ActiveMQSession::createTextMessage( const std::string& text )
throw ( cms::CMSException )
cms::MapMessage* ActiveMQSession::createMapMessage(void)
throw ( cms::CMSException )
例如:
下例演示創建並發送一個TextMessage到一個隊列:
TextMessage* message = session->createTextMessage( text ); // text is a string
producer->send( message );
delete message;
下例演示接收消息:
Message *message = consumer->receive();
const TextMessage* textMessage = dynamic_cast< const TextMessage* >( message );
string text = textMessage->getText();
printf( "Received: %s/n", text.c_str() );
delete message;
4.3 可靠性機制
發送消息最可靠的方法就是在事務中發送持久性的消息,ActiveMQ默認發送持久性消息。結束事務有兩種方法:提交或者回滾。當一個事務提交,消息被處理。如果事務中有一個步驟失敗,事務就回滾,這個事務中的已經執行的動作將被撤銷。
接收消息最可靠的方法就是在事務中接收信息,不管是從PTP模式的非臨時隊列接收消息還是從Pub/Sub模式持久訂閱中接收消息。
對於其他程序,低可靠性可以降低開銷和提高性能,例如發送消息時可以更改消息的優先級或者指定消息的過期時間。
消 息傳送的可靠性越高,需要的開銷和帶寬就越多。性能和可靠性之間的折衷是設計時要重點考慮的一個方面。可以選擇生成和使用非持久性消息來獲得最佳性能。另 一方面,也可以通過生成和使用持久性消息並使用事務會話來獲得最佳可靠性。在這兩種極端之間有許多選擇,這取決於應用程序的要求。
4.3.1 基本可靠性機制
4.3.1.1 控制消息的簽收(Acknowledgment)
客戶端成功接收一條消息的標志是這條消息被簽收。成功接收一條消息一般包括如下三個階段:
1.客戶端接收消息;
2.客戶端處理消息;
3.消息被簽收。簽收可以由ActiveMQ發起,也可以由客戶端發起,取決於Session簽收模式的設置。
在帶事務的Session中,簽收自動發生在事務提交時。如果事務回滾,所有已經接收的消息將會被再次傳送。
在不帶事務的Session中,一條消息何時和如何被簽收取決於Session的設置。
1.Session.AUTO_ACKNOWLEDGE
當客戶端從receive或onMessage成功返回時,Session自動簽收客戶端的這條消息的收條。在AUTO_ACKNOWLEDGE的Session中,同步接收receive是上述三個階段的一個例外,在這種情況下,收條和簽收緊隨在處理消息之后發生。
2.Session.CLIENT_ACKNOWLEDGE
客戶端通過調用消息的acknowledge方法簽收消息。在這種情況下,簽收發生在Session層面:簽收一個已消費的消息會自動地簽收這個Session所有已消費消息的收條。
3.Session.DUPS_OK_ACKNOWLEDGE
此選項指示Session不必確保對傳送消息的簽收。它可能引起消息的重復,但是降低了Session的開銷,所以只有客戶端能容忍重復的消息,才可使用(如果ActiveMQ再次傳送同一消息,那么消息頭中的JMSRedelivered將被設置為true)。
Java客戶端:
簽收模式分別為:
1. Session.AUTO_ACKNOWLEDGE
2. Session.CLIENT_ACKNOWLEDGE
3. Session.DUPS_OK_ACKNOWLEDGE
ActiveMQConnection方法:
Session createSession(boolean transacted, int acknowledgeMode);
例如:
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
C++客戶端:
簽收模式分別為:
1. Session::AUTO_ACKNOWLEDGE
2. Session::CLIENT_ACKNOWLEDGE
3. Session::DUPS_OK_ACKNOWLEDGE
4. Session::SESSION_TRANSACTED
函數原型:
cms::Session* ActiveMQConnection::createSession(
cms::Session::AcknowledgeMode ackMode )
throw ( cms::CMSException )
例如:
Session* session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
對隊列來說,如果當一個Session終止時它接收了消息但是沒有簽收,那么ActiveMQ將保留這些消息並將再次傳送給下一個進入隊列的消費者。
對主題來說,如果持久訂閱用戶終止時,它已消費未簽收的消息也將被保留,直到再次傳送給這個用戶。對於非持久訂閱,AtiveMQ在用戶Session關閉時將刪除這些消息。
如果使用隊列和持久訂閱,並且Session沒有使用事務,那么可以使用Session的recover方法停止Session,再次啟動后將收到它第一條沒有簽收的消息,事實上,重啟后Session一系列消息的傳送都是以上一次最后一條已簽收消息的下一條為起點。如果這時有消息過期或者高優先級的消息到來,那么這時消息的傳送將會和最初的有所不同。對於非持久訂閱用戶,重啟后,ActiveMQ有可能刪除所有沒有簽收的消息。
4.3.1.2 指定消息傳送模式
ActiveMQ支持兩種消息傳送模式:PERSISTENT和NON_PERSISTENT兩種。
1.PERSISTENT(持久性消息)
這是ActiveMQ的 默認傳送模式,此模式保證這些消息只被傳送一次和成功使用一次。對於這些消息,可靠性是優先考慮的因素。可靠性的另一個重要方面是確保持久性消息傳送至目 標后,消息服務在向消費者傳送它們之前不會丟失這些消息。這意味着在持久性消息傳送至目標時,消息服務將其放入持久性數據存儲。如果消息服務由於某種原因 導致失敗,它可以恢復此消息並將此消息傳送至相應的消費者。雖然這樣增加了消息傳送的開銷,但卻增加了可靠性。
2.NON_PERSISTENT(非持久性消息)
保證這些消息最多被傳送一次。對於這些消息,可靠性並非主要的考慮因素。此模式並不要求持久性的數據存儲,也不保證消息服務由於某種原因導致失敗后消息不會丟失。
有兩種方法指定傳送模式:
1.使用setDeliveryMode方法,這樣所有的消息都采用此傳送模式;
2.使用send方法為每一條消息設置傳送模式;
Java客戶端:
傳送模式分別為:
1. DeliveryMode.PERSISTENT
2. DeliveryMode.NON_PERSISTENT
ActiveMQMessageProducer方法:
void setDeliveryMode(int newDeliveryMode);
或者
void send(Destination destination, Message message, int deliveryMode, int priority,
long timeToLive);
void send(Message message, int deliveryMode, int priority, long timeToLive);
其中deliveryMode為傳送模式,priority為消息優先級,timeToLive為消息過期時間。
例如:
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
C++客戶端:
傳送模式分別為:
1. DeliveryMode::PERSISTANT
2. DeliveryMode::NON_PERSISTANT
函數原型:
void setDeliveryMode( int mode );
或者
void ActiveMQProducer::send( cms::Message* message, int deliveryMode,
int priority,
long long timeToLive )
throw ( cms::CMSException );
void ActiveMQProducer::send( const cms::Destination* destination,
cms::Message* message, int deliveryMode,
int priority, long long timeToLive)
throw ( cms::CMSException );
例如:
producer->setDeliveryMode( DeliveryMode::NON_PERSISTANT );
如果不指定傳送模式,那么默認是持久性消息。如果容忍消息丟失,那么使用非持久性消息可以改善性能和減少存儲的開銷。
4.3.1.3 設置消息優先級
通常,可以確保將單個會話向目標發送的所有消息按其發送順序傳送至消費者。然而,如果為這些消息分配了不同的優先級,消息傳送系統將首先嘗試傳送優先級較高的消息。
有兩種方法設置消息的優先級:
1.使用setDeliveryMode方法,這樣所有的消息都采用此傳送模式;
2.使用send方法為每一條消息設置傳送模式;
Java客戶端:
ActiveMQMessageProducer方法:
void setPriority(int newDefaultPriority);
或者
void send(Destination destination, Message message, int deliveryMode, int priority,
long timeToLive);
void send(Message message, int deliveryMode, int priority, long timeToLive);
其中deliveryMode為傳送模式,priority為消息優先級,timeToLive為消息過期時間。
例如:
producer.setPriority(4);
C++客戶端:
函數原型:
void setPriority( int priority );
或者
void ActiveMQProducer::send( cms::Message* message, int deliveryMode,
int priority,
long long timeToLive )
throw ( cms::CMSException );
void ActiveMQProducer::send( const cms::Destination* destination,
cms::Message* message, int deliveryMode,
int priority, long long timeToLive)
throw ( cms::CMSException );
例如:
producer-> setPriority(4);
消息優先級從0-9十個級別,0-4是普通消息,5-9是加急消息。如果不指定優先級,則默認為4。JMS不要求嚴格按照這十個優先級發送消息,但必須保證加急消息要先於普通消息到達。
4.3.1.4 允許消息過期
默認情況下,消息永不會過期。如果消息在特定周期內失去意義,那么可以設置過期時間。
有兩種方法設置消息的過期時間,時間單位為毫秒:
1.使用setTimeToLive方法為所有的消息設置過期時間;
2.使用send方法為每一條消息設置過期時間;
Java客戶端:
ActiveMQMessageProducer方法:
void setTimeToLive(long timeToLive);
或者
void send(Destination destination, Message message, int deliveryMode, int priority,
long timeToLive);
void send(Message message, int deliveryMode, int priority, long timeToLive);
其中deliveryMode為傳送模式,priority為消息優先級,timeToLive為消息過期時間。
例如:
producer.setTimeToLive(1000);
C++客戶端:
函數原型:
void setTimeToLive( long long time );
或者
void ActiveMQProducer::send( cms::Message* message, int deliveryMode,
int priority,
long long timeToLive )
throw ( cms::CMSException );
void ActiveMQProducer::send( const cms::Destination* destination,
cms::Message* message, int deliveryMode,
int priority, long long timeToLive)
throw ( cms::CMSException );
例如:
Producer->setTimeToLive(1000);
消息過期時間,send 方法中的timeToLive 值加上發送時刻的GMT 時間值。如果timeToLive值等於零,則JMSExpiration 被設為零,表示該消息永不過期。如果發送后,在消息過期時間之后消息還沒有被發送到目的地,則該消息被清除。
4.3.1.5 創建臨時目標
ActiveMQ通過createTemporaryQueue和createTemporaryTopic創建臨時目標,這些目標持續到創建它的Connection關閉。只有創建臨時目標的Connection所創建的客戶端才可以從臨時目標中接收消息,但是任何的生產者都可以向臨時目標中發送消息。如果關閉了創建此目標的Connection,那么臨時目標被關閉,內容也將消失。
Java客戶端:
ActiveMQSession方法:
TemporaryQueue createTemporaryQueue();
TemporaryTopic createTemporaryTopic();
C++客戶端:
函數原型:
cms::TemporaryQueue* ActiveMQSession::createTemporaryQueue(void)
throw ( cms::CMSException );
cms::TemporaryTopic* ActiveMQSession::createTemporaryTopic(void)
throw ( cms::CMSException );
某些客戶端需要一個目標來接收對發送至其他客戶端的消息的回復。這時可以使用臨時目標。Message的屬性之一是JMSReplyTo屬性,這個屬性就是用於這個目的的。可以創建一個臨時的Destination,並把它放入Message的JMSReplyTo屬性中,收到該消息的消費者可以用它來響應生產者。
Java客戶端:
如下所示代碼段,將創建臨時的Destination,並將它放置在TextMessage的JMSReplyTo屬性中:
// Create a temporary queue for replies...
Destination tempQueue = session.createTemporaryQueue();
// Set ReplyTo to temporary queue...
msg.setJMSReplyTo(tempQueue);
消費者接收這條消息時,會從JMSReplyTo字段中提取臨時Destination,並且會通過應用程序構造一個MessageProducer,以便將響應消息發送回生產者。這展示了如何使用JMS Message的屬性,並顯示了私有的臨時Destination的有用之處。它還展示了客戶端可以既是消息的生產者,又可以是消息的消費者。
// Get the temporary queue from the JMSReplyTo
// property of the message...
Destination tempQueue = msg.getJMSReplyTo();
...
// create a Sender for the temporary queue
MessageProducer Sender = session.createProducer(tempQueue);
TextMessage msg = session.createTextMessage();
msg.setText(REPLYTO_TEXT);
...
// Send the message to the temporary queue...
sender.send(msg);
4.3.2 高級可靠性機制
4.3.2.1 創建持久訂閱
通過為發布者設置PERSISTENT傳送模式,為訂閱者時使用持久訂閱,這樣可以保證Pub/Sub程序接收所有發布的消息。
消息訂閱分為非持久訂閱(non-durable subscription)和持久訂閱(durable subscription),非持久訂閱只有當客戶端處於激活狀態,也就是和ActiveMQ保持連接狀態才能收到發送到某個主題的消息,而當客戶端處於離線狀態,這個時間段發到主題的消息將會丟失,永遠不會收到。持久訂閱時,客戶端向ActiveMQ注冊一個識別自己身份的ID,當這個客戶端處於離線時,ActiveMQ會為這個ID 保存所有發送到主題的消息,當客戶端再次連接到ActiveMQ時,會根據自己的ID 得到所有當自己處於離線時發送到主題的消息。持久訂閱會增加開銷,同一時間在持久訂閱中只有一個激活的用戶。
建立持久訂閱的步驟:
1. 為連接設置一個客戶ID;
2. 為訂閱的主題指定一個訂閱名稱;
上述組合必須唯一。
4.3.2.1.1 創建持久訂閱
Java客戶端:
ActiveMQConnection方法:
void setClientID(String newClientID)
和
ActiveMQSession方法:
TopicSubscriber createDurableSubscriber(Topic topic, String name)
TopicSubscriber createDurableSubscriber(Topic topic, String name, String
messageSelector, boolean noLocal)
其中messageSelector為消息選擇器;noLocal標志默認為false,當設置為true時限制消費者只能接收和自己相同的連接(Connection)所發布的消息,此標志只適用於主題,不適用於隊列;name標識訂閱主題所對應的訂閱名稱,持久訂閱時需要設置此參數。
C++客戶端:
函數原型:
virtual void setClientId( const std::string& clientId );
和
cms::MessageConsumer* ActiveMQSession::createDurableConsumer(
const cms::Topic* destination,
const std::string& name,
const std::string& selector,
bool noLocal )
throw ( cms::CMSException )
4.3.2.1.2 刪除持久訂閱
Java客戶端:
ActiveMQSession方法:
void unsubscribe(String name);
4.3.2.2 使用本地事務
在事務中生成或使用消息時,ActiveMQ跟蹤各個發送和接收過程,並在客戶端發出提交事務的調用時完成這些操作。如果事務中特定的發送或接收操作失敗,則出現異常。客戶端代碼通過忽略異常、重試操作或回滾整個事務來處理異常。在事務提交時,將完成所有成功的操作。在事務進行回滾時,將取消所有成功的操作。
本地事務的范圍始終為一個會話。也就是說,可以將單個會話的上下文中執行的一個或多個生產者或消費者操作組成一個本地事務。
不但單個會話可以訪問 Queue 或 Topic (任一類型的 Destination ), 而且單個會話實例可以用來操縱一個或多個隊列以及一個或多個主題,一切都在單個事務中進行。這意味着單個會話可以(例如)創建隊列和主題中的生產者,然后 使用單個事務來同時發送隊列和主題中的消息。因為單個事務跨越兩個目標,所以,要么隊列和主題的消息都得到發送,要么都未得到發送。類似地,單個事務可以 用來接收隊列中的消息並將消息發送到主題上,反過來也可以。
由於事務的范圍只能為單個的會話,因此不存在既包括消息生成又包括消息使用的端對端事務。(換句話說,至目標的消息傳送和隨后進行的至客戶端的消息傳送不能放在同一個事務中。)
4.3.2.2.1 使用事務
Java客戶端:
ActiveMQConnection方法:
Session createSession(boolean transacted, int acknowledgeMode);
其中transacted為使用事務標識,acknowledgeMode為簽收模式。
例如:
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
C++客戶端:
函數原型:
cms::Session* ActiveMQConnection::createSession(
cms::Session::AcknowledgeMode ackMode );
其中AcknowledgeMode ackMode需指定為SESSION_TRANSACTED。
例如:
Session* session = connection->createSession( Session:: SESSION_TRANSACTED );
4.3.2.2.2 提交
Java客戶端:
ActiveMQSession方法:
void commit();
例如:
try {
producer.send(consumer.receive());
session.commit();
}
catch (JMSException ex) {
session.rollback();
}
C++客戶端:
函數原型:
void ActiveMQSession::commit(void) throw ( cms::CMSException )
4.3.2.2.3 回滾
Java客戶端:
ActiveMQSession方法:
void rollback();
C++客戶端:
函數原型:
void ActiveMQSession::rollback(void) throw ( cms::CMSException )
4.4 高級特征
4.4.1 異步發送消息
ActiveMQ支持生產者以同步或異步模式發送消息。使用不同的模式對send方法的反應時間有巨大的影響,反映時間是衡量ActiveMQ吞吐量的重要因素,使用異步發送可以提高系統的性能。
在默認大多數情況下,AcitveMQ是以異步模式發送消息。例外的情況:在沒有使用事務的情況下,生產者以PERSISTENT傳送模式發送消息。在這種情況下,send方法都是同步的,並且一直阻塞直到ActiveMQ發回確認消息:消息已經存儲在持久性數據存儲中。這種確認機制保證消息不會丟失,但會造成生產者阻塞從而影響反應時間。
高性能的程序一般都能容忍在故障情況下丟失少量數據。如果編寫這樣的程序,可以通過使用異步發送來提高吞吐量(甚至在使用PERSISTENT傳送模式的情況下)。
Java客戶端:
使用Connection URI配置異步發送:
cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");
在ConnectionFactory層面配置異步發送:
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
在Connection層面配置異步發送,此層面的設置將覆蓋ConnectionFactory層面的設置:
((ActiveMQConnection)connection).setUseAsyncSend(true);
4.4.2 消費者特色
4.4.2.1 消費者異步分派
在ActiveMQ4中,支持ActiveMQ以同步或異步模式向消費者分派消息。這樣的意義:可以以異步模式向處理消息慢的消費者分配消息;以同步模式向處理消息快的消費者分配消息。
ActiveMQ默認以同步模式分派消息,這樣的設置可以提高性能。但是對於處理消息慢的消費者,需要以異步模式分派。
Java客戶端:
在ConnectionFactory層面配置同步分派:
((ActiveMQConnectionFactory)connectionFactory).setDispatchAsync(false);
在Connection層面配置同步分派,此層面的設置將覆蓋ConnectionFactory層面的設置:
((ActiveMQConnection)connection).setDispatchAsync(false);
在消費者層面以Destination URI配置同步分派,此層面的設置將覆蓋ConnectionFactory和Connection層面的設置:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false");
consumer = session.createConsumer(queue);
4.4.2.2 消費者優先級
在ActveMQ分布式環境中,在有消費者存在的情況下,如果更希望ActveMQ發送消息給消費者而不是其他的ActveMQ到ActveMQ的傳送,可以如下設置:
Java客戶端:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.prority=10");
consumer = session.createConsumer(queue);
4.4.2.3 獨占的消費者
ActiveMQ維護隊列消息的順序並順序把消息分派給消費者。但是如果建立了多個Session和MessageConsumer,那么同一時刻多個線程同時從一個隊列中接收消息時就並不能保證處理時有序。
有時候有序處理消息是非常重要的。ActiveMQ4支持獨占的消費。ActiveMQ挑選一個MessageConsumer,並把一個隊列中所有消息按順序分派給它。如果消費者發生故障,那么ActiveMQ將自動故障轉移並選擇另一個消費者。可以如下設置:
Java客戶端:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
consumer = session.createConsumer(queue);
4.4.2.4 再次傳送策略
在以下三種情況中,消息會被再次傳送給消費者:
1.在使用事務的Session中,調用rollback()方法;
2.在使用事務的Session中,調用commit()方法之前就關閉了Session;
3.在Session中使用CLIENT_ACKNOWLEDGE簽收模式,並且調用了recover()方法。
可以通過設置ActiveMQConnectionFactory和ActiveMQConnection來定制想要的再次傳送策略。
屬性
|
默認值
|
描述
|
|||||||||||
collisionAvoidanceFactor
|
0.15
|
maximumRedeliveries |
6
|
initialRedeliveryDelay |
1000L
|
useCollisionAvoidance |
false
|
useExponentialBackOff |
false
|
backOffMultiplier
|
5
|
The back-off multiplier
|
4.4.3 目標特色
4.4.3.1 復合目標
在1.1版本之后,ActiveMQ支持混合目標技術。它允許在一個JMS目標中使用一組JMS目標。
例如可以利用混合目標在同一操作中用向12個隊列發送同一條消息或者在同一操作中向一個主題和一個隊列發送同一條消息。
在混合目標中,通過“,”來分隔不同的目標。
Java客戶端:
例如:
// send to 3 queues as one logical operation
Queue queue = new ActiveMQQueue("FOO.A,FOO.B,FOO.C");
producer.send(queue, someMessage);
如果在一個目標中混合不同類別的目標,可以通過使用“queue://”和“topic://”前綴來識別不同的目標。
例如:
// send to queues and topic one logical operation
Queue queue = new ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A");
producer.send(queue, someMessage);
4.4.3.2 目標選項
屬性
|
默認值
|
描述
|
||||||||||
consumer.prefetchSize
|
variable
|
consumer.maximumPendingMessageLimit |
0
|
consumer.noLocal |
false
|
consumer.dispatchAsync |
false
|
consumer.retroactive |
false
|
consumer.selector |
null
|
JMS Selector used with the consumer.
|
consumer.exclusive
|
false
|
Is this an Exclusive Consumer.
|
||||||||||
consumer.priority
|
0
|
Allows you to configure a Consumer Priority.
|
Java客戶端:
例如:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false&consumer.prefetchSize=10");
consumer = session.createConsumer(queue);
4.4.4 消息預取
ActiveMQ的目標之一就是高性能的數據傳送,所以ActiveMQ使用“預取限制”來控制有多少消息能及時的傳送給任何地方的消費者。
一旦預取數量達到限制,那么就不會有消息被分派給這個消費者直到它發回簽收消息(用來標識所有的消息已經被處理)。
可以為每個消費者指定消息預取。如果有大量的消息並且希望更高的性能,那么可以為這個消費者增大預取值。如果有少量的消息並且每條消息的處理都要花費很長的時間,那么可以設置預取值為1,這樣同一時間,ActiveMQ只會為這個消費者分派一條消息。
Java客戶端:
在ConnectionFactory層面為所有消費者配置預取值:
tcp://localhost:61616?jms.prefetchPolicy.all=50
在ConnectionFactory層面為隊列消費者配置預取值:
tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
使用“目標選項”為一個消費者配置預取值:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
consumer = session.createConsumer(queue);
4.4.5 配置連接URL
ActiveMQ支持通過Configuration URI明確的配置連接屬性。
例如:當要設置異步發送時,可以通過在Configuration URI中使用jms.$PROPERTY來設置。
tcp://localhost:61616?jms.useAsyncSend=true
以下的選項在URI必須以“jms.”為前綴。
屬性
|
默認值
|
描述
|
||||
alwaysSessionAsync
|
true
|
clientID |
null
|
closeTimeout |
15000 (milliseconds)
|
Sets the timeout before a close is considered complete. Normally a close() on a connection waits for confirmation from the broker; this allows that operation to timeout to save the client hanging if there is no broker. |
copyMessageOnSend
|
true
|
disableTimeStampsByDefault |
false
|
dispatchAsync |
false
|
Should the broker dispatch messages asynchronously to the consumer.
|
nestedMapAndListEnabled
|
true
|
Enables/disables whether or not Structured Message Properties and MapMessages are supported so that Message properties and MapMessage entries can contain nested Map and List objects. Available since version 4.1 onwards
|
||||
objectMessageSerializationDefered
|
false
|
When an object is set on an ObjectMessage, the JMS spec requires the object to be serialized by that set method. Enabling this flag causes the object to not get serialized. The object may subsequently get serialized if the message needs to be sent over a socket or stored to disk.
|
||||
optimizeAcknowledge
|
false
|
Enables an optimised acknowledgement mode where messages are acknowledged in batches rather than individually. Alternatively, you could use Session.DUPS_OK_ACKNOWLEDGE acknowledgement mode for the consumers which can often be faster. WARNING enabling this issue could cause some issues with auto-acknowledgement on reconnection
|
||||
optimizedMessageDispatch
|
true
|
If this flag is set then an larger prefetch limit is used - only applicable for durable topic subscribers
|
||||
useAsyncSend
|
false
|
Forces the use of Async Sends which adds a massive performance boost; but means that the send() method will return immediately whether the message has been sent or not which could lead to message loss.
|
||||
useCompression
|
false
|
Enables the use of compression of the message bodies
|
||||
useRetroactiveConsumer
|
false
|
Sets whether or not retroactive consumers are enabled. Retroactive consumers allow non-durable topic subscribers to receive old messages that were published before the non-durable subscriber started.
|
4.5 優化
優化部分請參閱:http://devzone.logicblaze.com/site/how-to-tune-activemq.html
5. ActiveMQ配置
5.1 配置文件
ActiveMQ配置文件:$AcrtiveMQ/conf/activemq.xml
5.2 配置ActiveMQ服務IP和端口
<transportConnectors>
<transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
<transportConnector name="ssl" uri="ssl://localhost:61617"/>
<transportConnector name="stomp" uri="stomp://localhost:61613"/>
</transportConnectors>
在transportConnectors標識中配置ActiveMQ服務IP和端口,其中name屬性指定協議的名稱,uri屬性指定協議所對應的協議名,IP地址和端口號。上述IP地址和端口可以根據實際需要指定。Java客戶端默認使用openwire協議,所以ActiveMQ服務地址為tcp://localhost:61616;目前C++客戶端僅支持stomp協議,所以ActiveMQ服務地址為tcp://localhost:61613。
5.3 分布式部署
分布式部署請參閱:http://activemq.apache.org/networks-of-brokers.html
5.4 監控ActiveMQ
本節將使用JXM和JXM控制台(JDK1.5控制台)監控ActiveMQ。
5.4.1 配置JXM
<broker brokerName="emv219" useJmx="true" xmlns="http://activemq.org/config/1.0">
…
<managementContext>
<managementContext connectorPort="1099" jmxDomainName="org.apache.activemq"/>
</managementContext>
…
</broker>
配置JXM步驟如下:
1. 設置broker標識的useJmx屬性為true;
2. 取消對managementContext標識的注釋(系統默認注釋managementContext標識),監控的默認端口為1099。
5.4.2 在Windows平台監控
進入%JAVA_HOME%/bin,雙擊jconsole.exe即出現如下畫面,在對話框中輸入ActiveMQ服務主機的地址,JXM的端口和主機登陸帳號。
6. 目前存在問題
6.1 C++客戶端丟失消息問題
ActiveMQ版本:ActiveMQ 4.1.1SNAPSHOT
C++客戶端版本:ActiveMQ CPP 1.1 Release
測試中發現,當C++客戶端異常退出時(即沒有正常調用close函數關閉連接),ActiveMQ並不能檢測到C++客戶端的連接已經中斷,這時如果向隊列中發送消息,那么第一條消息就會丟失,這時ActiveMQ才能檢測到這個連接是中斷的。
在ActiveMQ論壇反應此問題后,開發人員答復並建議使用CLIENT_ACKNOWLEDGE簽收模式。但是此模式會造成消息重復接收。
測試ActiveMQ 4.2SNAPSHOT時並未發現上述問題。
6.2 隊列消息堆積過多后有可能阻塞程序
默認activemq.xml中配置的內存是20M,這就意味着當消息堆積超過20M后,程序可能出現問題。在mial list中其他用戶對此問題的描述是:send方法會阻塞或拋出異常。ActiveMQ開發人員的答復:The memory model is different for ActiveMQ 4.1 in that for Queues, only small references to the Queue messages are held in memory. This means that the Queue depth can be considerably bigger than for ActiveMQ 3.2.x.However, our next major release (5.0 nee 4.2) has a more robust model in that Queue messages are paged in from storage only when space is available - hence Queue depth is now limited by how much disk space you have.
6.3 目前版本的C++客戶端僅支持stomp協議
目前版本的C++客戶端程序(ActiveMQ CPP 1.1 Release)僅支持stomp協議,因此傳輸消息的速度應該沒有使用openwire協議的Java客戶端快。ActiveMQ網站顯示不久將會有支持openwire協議的C++客戶端程序發布。
6.4 分布式部署問題
ActiveMQ版本:ActiveMQ 4.1.1SNAPSHOT和ActiveMQ 4.2SNAPSHOT
測試選用上述兩個未正式發布的版本,未選用正式發布的ActiveMQ 4.1.0 Release版本是因為此版本bug較多。
在測試中發現,如果重啟其中一台機器上的ActiveMQ,其他機器的ActiveMQ有可能會打印:
java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:358)
at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:267)
at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:156)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:136)
at java.lang.Thread.run(Thread.java:595)
WARN TransportConnection - Unexpected extra broker info command received: BrokerInfo {commandId = 6, responseRequired = false, brokerId = ID:emv219n-33945-1174458770157-1:0, brokerURL = tcp://emv219n:61616, slaveBroker = false, masterBroker = false, faultTolerantConfiguration = false, networkConnection = false, duplexConnection = false, peerBrokerInfos = [], brokerName = emv219, connectionId = 0}.
INFO FailoverTransport - Transport failed, attempting to automatically reconnect due to: java.io.EOFException。
這時分布式的消息傳輸就會出現問題,此問題目前還沒找到原因。
7. 附錄
7.1 完整的Java客戶端例子
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* Hello world!
*/
public class App {
public static void main(String[] args) throws Exception {
thread(new HelloWorldProducer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
Thread.sleep(1000);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
Thread.sleep(1000);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldProducer(), false);
Thread.sleep(1000);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldConsumer(), false);
thread(new HelloWorldProducer(), false);
}
public static void thread(Runnable runnable, boolean daemon) {
Thread brokerThread = new Thread(runnable);
brokerThread.setDaemon(daemon);
brokerThread.start();
}
public static class HelloWorldProducer implements Runnable {
public void run() {
try {
// Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// Create a Connection
Connection connection = connectionFactory.createConnection();
connection.start();
// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination (Topic or Queue)
Destination destination = session.createQueue("TEST.FOO");
// Create a MessageProducer from the Session to the Topic or Queue
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// Create a messages
String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
TextMessage message = session.createTextMessage(text);
// Tell the producer to send the message
System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());
producer.send(message);
// Clean up
session.close();
connection.close();
}
catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
}
public static class HelloWorldConsumer implements Runnable, ExceptionListener {
public void run() {
try {
// Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// Create a Connection
Connection connection = connectionFactory.createConnection();
connection.start();
connection.setExceptionListener(this);
// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination (Topic or Queue)
Destination destination = session.createQueue("TEST.FOO");
// Create a MessageConsumer from the Session to the Topic or Queue
MessageConsumer consumer = session.createConsumer(destination);
// Wait for a message
Message message = consumer.receive(1000);
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println("Received: " + text);
} else {
System.out.println("Received: " + message);
}
consumer.close();
session.close();
connection.close();
} catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
public synchronized void onException(JMSException ex) {
System.out.println("JMS Exception occured. Shutting down client.");
}
}
}
7.2 完整的C++客戶端例子
#include <activemq/concurrent/Thread.h>
#include <activemq/concurrent/Runnable.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Integer.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <iostream>
using namespace activemq::core;
using namespace activemq::util;
using namespace activemq::concurrent;
using namespace cms;
using namespace std;
class HelloWorldProducer : public Runnable {
private:
Connection* connection;
Session* session;
Destination* destination;
MessageProducer* producer;
int numMessages;
bool useTopic;
public:
HelloWorldProducer( int numMessages, bool useTopic = false ){
connection = NULL;
session = NULL;
destination = NULL;
producer = NULL;
this->numMessages = numMessages;
this->useTopic = useTopic;
}
virtual ~HelloWorldProducer(){
cleanup();
}
virtual void run() {
try {
// Create a ConnectionFactory
ActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61613");
// Create a Connection
connection = connectionFactory->createConnection();
connection->start();
// Create a Session
session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
// Create the destination (Topic or Queue)
if( useTopic ) {
destination = session->createTopic( "TEST.FOO" );
} else {
destination = session->createQueue( "TEST.FOO" );
}
// Create a MessageProducer from the Session to the Topic or Queue
producer = session->createProducer( destination );
producer->setDeliveryMode( DeliveryMode::NON_PERSISTANT );
// Create the Thread Id String
string threadIdStr = Integer::toString( Thread::getId() );
// Create a messages
string text = (string)"Hello world! from thread " + threadIdStr;
for( int ix=0; ix<numMessages; ++ix ){
TextMessage* message = session->createTextMessage( text );
// Tell the producer to send the message
printf( "Sent message from thread %s/n", threadIdStr.c_str() );
producer->send( message );
delete message;
}
}catch ( CMSException& e ) {
e.printStackTrace();
}
}
private:
void cleanup(){
// Destroy resources.
try{
if( destination != NULL ) delete destination;
}catch ( CMSException& e ) {}
destination = NULL;
try{
if( producer != NULL ) delete producer;
}catch ( CMSException& e ) {}
producer = NULL;
// Close open resources.
try{
if( session != NULL ) session->close();
if( connection != NULL ) connection->close();
}catch ( CMSException& e ) {}
try{
if( session != NULL ) delete session;
}catch ( CMSException& e ) {}
session = NULL;
try{
if( connection != NULL ) delete connection;
}catch ( CMSException& e ) {}
connection = NULL;
}
};
class HelloWorldConsumer : public ExceptionListener,
public MessageListener,
public Runnable {
private:
Connection* connection;
Session* session;
Destination* destination;
MessageConsumer* consumer;
long waitMillis;
bool useTopic;
public:
HelloWorldConsumer( long waitMillis, bool useTopic = false ){
connection = NULL;
session = NULL;
destination = NULL;
consumer = NULL;
this->waitMillis = waitMillis;
this->useTopic = useTopic;
}
virtual ~HelloWorldConsumer(){
cleanup();
}
virtual void run() {
try {
// Create a ConnectionFactory
ActiveMQConnectionFactory* connectionFactory =
new ActiveMQConnectionFactory( "tcp://127.0.0.1:61613" );
// Create a Connection
connection = connectionFactory->createConnection();
delete connectionFactory;
connection->start();
connection->setExceptionListener(this);
// Create a Session
session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
// Create the destination (Topic or Queue)
if( useTopic ) {
destination = session->createTopic( "TEST.FOO" );
} else {
destination = session->createQueue( "TEST.FOO" );
}
// Create a MessageConsumer from the Session to the Topic or Queue
consumer = session->createConsumer( destination );
consumer->setMessageListener( this );
// Sleep while asynchronous messages come in.
Thread::sleep( waitMillis );
} catch (CMSException& e) {
e.printStackTrace();
}
}
// Called from the consumer since this class is a registered MessageListener.
virtual void onMessage( const Message* message ){
static int count = 0;
try
{
count++;
const TextMessage* textMessage =
dynamic_cast< const TextMessage* >( message );
string text = textMessage->getText();
printf( "Message #%d Received: %s/n", count, text.c_str() );
} catch (CMSException& e) {
e.printStackTrace();
}
}
// If something bad happens you see it here as this class is also been
// registered as an ExceptionListener with the connection.
virtual void onException( const CMSException& ex ) {
printf("JMS Exception occured. Shutting down client./n");
}
private:
void cleanup(){
//*************************************************
// Always close destination, consumers and producers before
// you destroy their sessions and connection.
//*************************************************
// Destroy resources.
try{
if( destination != NULL ) delete destination;
}catch (CMSException& e) {}
destination = NULL;
try{
if( consumer != NULL ) delete consumer;
}catch (CMSException& e) {}
consumer = NULL;
// Close open resources.
try{
if( session != NULL ) session->close();
if( connection != NULL ) connection->close();
}catch (CMSException& e) {}
// Now Destroy them
try{
if( session != NULL ) delete session;
}catch (CMSException& e) {}
session = NULL;
try{
if( connection != NULL ) delete connection;
}catch (CMSException& e) {}
connection = NULL;
}
};
int main(int argc, char* argv[]) {
std::cout << "=====================================================/n";
std::cout << "Starting the example:" << std::endl;
std::cout << "-----------------------------------------------------/n";
//============================================================
// set to true to use topics instead of queues
// Note in the code above that this causes createTopic or
// createQueue to be used in both consumer an producer.
//============================================================
bool useTopics = false;
HelloWorldProducer producer( 1000, useTopics );
HelloWorldConsumer consumer( 8000, useTopics );
// Start the consumer thread.
Thread consumerThread( &consumer );
consumerThread.start();
// Start the producer thread.
Thread producerThread( &producer );
producerThread.start();
// Wait for the threads to complete.
producerThread.join();
consumerThread.join();
std::cout << "-----------------------------------------------------/n";
std::cout << "Finished with the example, ignore errors from this"
<< std::endl
<< "point on as the sockets breaks when we shutdown."
<< std::endl;
std::cout << "=====================================================/n";
}