------------------------------------------------
開發一個JMS的基本步驟如下:
1.創建一個JMS connection factory
2.通過connection factory來創建JMS connection
3.啟動JMS connection
4.通過connection創建JMS session
5.創建JMS destination
6.創建JMS producer 或者創建JMS message,並設置destination
7.創建JMS consumer 或者注冊一個JMS message listener
8.發送或者接受JMS message
9.關閉所有的JMS資源(connection、session、producer、consumer等)
可以參考下圖:
非持久的Topic消息示例
對於非持久化的消息,當發送方發送消息的時候:
如果接收方不在線,則接收方永遠也收不到這些消息了
如果接收方在線,則接收方會收到這些消息
1、消息發送程序
1 import javax.jms.Connection; 2 import javax.jms.ConnectionFactory; 3 import javax.jms.Destination; 4 import javax.jms.MessageProducer; 5 import javax.jms.Session; 6 import javax.jms.TextMessage; 7 8 import org.apache.activemq.ActiveMQConnectionFactory; 9 10 /** 11 * 非持久化Topic消息發送者 12 * @author Administrator 13 * 14 */ 15 public class NoPersistenceSender { 16 public static void main(String[] args) throws Exception { 17 //創建一個JMS connection factory 18 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.81:61616"); 19 //通過connection factory來創建JMS connection 20 Connection connection = connectionFactory.createConnection(); 21 //啟動JMS connection 22 connection.start(); 23 //通過connection創建JMS session 24 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); 25 //創建JMS destination 26 Destination destination = session.createTopic("noPersistenceTopic"); 27 //創建JMS producer 28 MessageProducer producer = session.createProducer(destination); 29 30 for(int i = 0;i < 10;i++){ 31 TextMessage message = session.createTextMessage("message-"+i); 32 //發送message 33 producer.send(message); 34 } 35 //關閉所有的JMS資源 36 session.commit(); 37 session.close(); 38 connection.close(); 39 } 40 }
運行完消息發送程序后,可以訪問192.168.1.81:8161
2、消息接收程序
對於非持久的Topic消息的接收需要注意以下幾點:
a.接收程序必須在線,然后消息發送方再發送消息,接收程序才能接收到消息
b.由於不知道消息發送方要發送多少條消息,所以利用while循環的方式來接收消息
c.如果接收程序不在線,此時發送程序發送了消息的話,則該消息將永遠不會被接收方收到。
1 import javax.jms.Connection; 2 import javax.jms.ConnectionFactory; 3 import javax.jms.Destination; 4 import javax.jms.Message; 5 import javax.jms.MessageConsumer; 6 import javax.jms.Session; 7 import javax.jms.TextMessage; 8 9 import org.apache.activemq.ActiveMQConnectionFactory; 10 11 /** 12 * 非持久化Topic消息接收者 13 * @author Administrator 14 * 15 */ 16 public class NoPersistenceReceiver { 17 public static void main(String[] args) throws Exception { 18 //創建一個JMS connection factory 19 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.81:61616"); 20 //通過connection factory來創建JMS connection 21 Connection connection = connectionFactory.createConnection(); 22 //啟動JMS connection 23 connection.start(); 24 //通過connection創建JMS session 25 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); 26 //創建JMS destination 27 Destination destination = session.createTopic("noPersistenceTopic"); 28 //創建JMS consumer 29 MessageConsumer consumer = session.createConsumer(destination); 30 31 Message message = consumer.receive(); 32 while(message != null){ 33 TextMessage txtMsg = (TextMessage)message; 34 System.out.println("收到消息:"+txtMsg.getText()); 35 message = consumer.receive(); 36 } 37 //關閉所有的JMS資源 38 session.commit(); 39 session.close(); 40 connection.close(); 41 } 42 }
運行結果:
持久的Topic消息示例
1.消息發送程序
對於持久的Topic消息的發送方需要注意以下幾點:
a.要用持久化訂閱,發送消息者要用DeliveryMode.PERSISTENT模式來發送
1 import javax.jms.Connection; 2 import javax.jms.ConnectionFactory; 3 import javax.jms.DeliveryMode; 4 import javax.jms.Destination; 5 import javax.jms.MessageProducer; 6 import javax.jms.Session; 7 import javax.jms.TextMessage; 8 9 import org.apache.activemq.ActiveMQConnectionFactory; 10 11 /** 12 * 持久化Topic消息發送者 13 * @author Administrator 14 */ 15 public class PersistenceSender { 16 public static void main(String[] args) throws Exception { 17 //創建一個JMS connection factory 18 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.81:61616"); 19 //通過connection factory來創建JMS connection 20 Connection connection = connectionFactory.createConnection(); 21 //通過connection創建JMS session 22 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); 23 //創建JMS destination 24 Destination destination = session.createTopic("PersistenceTopic"); 25 //創建JMS producer 26 MessageProducer producer = session.createProducer(destination); 27 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 28 //啟動JMS connection 29 connection.start(); 30 for(int i = 0;i < 10;i++){ 31 TextMessage message = session.createTextMessage("message-"+i); 32 //發送message 33 producer.send(message); 34 } 35 //關閉所有的JMS資源 36 session.commit(); 37 session.close(); 38 connection.close(); 39 } 40 }
2.消息接收程序
對於持久的Topic消息的接收方需要注意以下幾點:
a.需要在連接上設置消費者id,用來識別消費者
b.需要創建TopicSubscriber來訂閱
c.一定要先運行一次該消費者程序,等於向消費服務中間件注冊這個消費者,然后再運行消息發送者來發送消息,這樣的話,無論消費者是否在線都會收到消息,如果不在線的話,則下次連接的時候會把沒有收過的消息都接收下來。
1 import javax.jms.Connection; 2 import javax.jms.ConnectionFactory; 3 import javax.jms.Message; 4 import javax.jms.Session; 5 import javax.jms.TextMessage; 6 import javax.jms.Topic; 7 import javax.jms.TopicSubscriber; 8 9 import org.apache.activemq.ActiveMQConnectionFactory; 10 11 /** 12 * 持久化Topic消息接收者 13 * @author Administrator 14 * 15 */ 16 public class PersistenceReceiver { 17 public static void main(String[] args) throws Exception { 18 //創建一個JMS connection factory 19 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://120.76.123.81:61616"); 20 //通過connection factory來創建JMS connection 21 Connection connection = connectionFactory.createConnection(); 22 connection.setClientID("con1"); 23 //通過connection創建JMS session 24 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); 25 //創建JMS destination 26 Topic destination = session.createTopic("PersistenceTopic"); 27 //創建JMS consumer 28 TopicSubscriber ts = session.createDurableSubscriber(destination, "TT"); 29 //啟動JMS connection 30 connection.start(); 31 Message message = ts.receive(); 32 while(message != null){ 33 TextMessage txtMsg = (TextMessage)message; 34 session.commit(); 35 System.out.println("收到消息:"+txtMsg.getText()); 36 message = ts.receive(1000L); 37 } 38 //關閉所有的JMS資源 39 session.close(); 40 connection.close(); 41 } 42 }
關於持久化和非持久化消息
有兩種方式指定傳送模式:
1.使用setDeliveryMode方法,這樣所有的消息都采用此傳送模式;如producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
2.使用send方法為每條消息設置傳送模式
持久化消息
這是ActiveMQ的默認傳送模式,此模式保證這些消息只被傳送一次和成功使用一次。對於這些消息,可靠性是優先考慮的因素。可靠性的另一個重要方面是確保持久性消息傳送至目標后,消息服務在向消費者傳送它們之前不會丟失這些消息。
這意味着在持久性消息傳送至目標時,消息服務將其放入持久性數據存儲。如果消息服務由於某種原因導致失敗,它可以恢復此消息並將此消息傳送至相應的消費者。雖然這樣增加了消息傳送的開銷,但是卻增加了可靠性。
非持久化消息
保證這些消息最多被傳送一次。對於這些消息,可靠性並非主要的考慮因素。此模式並不要求持久性的數據存儲,也不保證消息服務由於某種原因導致失敗后消息不會丟失。