1.JMS簡介
JMS的全稱是Java Message Service,即Java消息服務。它主要用於在生產者和消費者之間進行消息傳遞,生產者負責產生消息,而消費者負責接收消息。把它應用到實際的業務需求中可以在特定的時候利用生產者生成消息,並進行發送,對應的消費者在接收到對應的消息后去完成對應的業務邏輯。對於消息的傳遞有兩種類型,一種是點對點的,即一個生產者和一個消費者一一對應;另一種是發布/訂閱模式,即一個生產者產生消息並進行發送后,可以由多個消費者進行接收。
JMS編程模型
(1) ConnectionFactory
創建Connection對象的工廠,針對兩種不同的jms消息模型,分別有QueueConnectionFactory和TopicConnectionFactory兩種。可以通過JNDI來查找ConnectionFactory對象。
(2) Destination
Destination的意思是消息生產者的消息發送目標或者說消息消費者的消息來源。對於消息生產者來說,它的Destination是某個隊列(Queue)或某個主題(Topic);對於消息消費者來說,它的Destination也是某個隊列或主題(即消息來源)。所以,Destination實際上就是兩種類型的對象:Queue、Topic可以通過JNDI來查找Destination。
(3) Connection
Connection表示在客戶端和JMS系統之間建立的鏈接(對TCP/IP socket的包裝)。Connection可以產生一個或多個Session。跟ConnectionFactory一樣,Connection也有兩種類型:QueueConnection和TopicConnection。
(4) Session
Session是操作消息的接口。可以通過session創建生產者、消費者、消息等。Session提供了事務的功能。當需要使用session發送/接收多個消息時,可以將這些發送/接收動作放到一個事務中。同樣,也分QueueSession和TopicSession。
(5) 消息的生產者
消息生產者由Session創建,並用於將消息發送到Destination。同樣,消息生產者分兩種類型:QueueSender和TopicPublisher。可以調用消息生產者的方法(send或publish方法)發送消息。
(6) 消息消費者
消息消費者由Session創建,用於接收被發送到Destination的消息。兩種類型:QueueReceiver和TopicSubscriber。可分別通過session的createReceiver(Queue)或createSubscriber(Topic)來創建。當然,也可以session的creatDurableSubscriber方法來創建持久化的訂閱者。
(7) MessageListener
消息監聽器。如果注冊了消息監聽器,一旦消息到達,將自動調用監聽器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一種MessageListener。
2.ActiveMQ簡介
ActiveMQ是Apache軟件基金下的一個開源軟件,它遵循JMS規范(Java Message Service),是消息驅動中間件軟件(MOM)。它為企業消息傳遞提供高可用,出色性能,可擴展,穩定和安全保障。ActiveMQ使用Apache許可協議。因此,任何人都可以使用和修改它而不必反饋任何改變。這對於商業上將ActiveMQ用在重要用途的人尤為關鍵。MOM的工作是在分布式的各應用之間調度事件和消息,使之到達指定的接收者。所以高可用,高性能,高可擴展性尤為關鍵。
- ActiveMQ特性
⒈支持多種語言客戶端,如:Java,C,C++,C#,Ruby,Perl,Python,PHP。應用協議有 OpenWire,Stomp REST,WS Notification,XMPP,AMQP。
⒉ 完全支持JMS1.1和J2EE1.4規范,它們包括同步和異步消息傳遞,一次和只有一次的消息傳遞,對於預訂者的持久消息等。依附於JMS規范意味着,不論JMS消息提供者是誰,同樣的基本特性(持久化,XA消息,事務)都是有效的。
⒊ 對Spring的支持,ActiveMQ可以很容易內嵌到使用Spring的系統里面去。
⒋ 通過了常見J2EE服務器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何兼容J2EE 1.4 商業服務器上。
⒌ ActiveMQ提供各種連接選擇,包括HTTP,HTTPS,IP多點傳送,SSL,STOMP,TCP,UDP,XMPP等。大量的連接協議支持使之具有更好的靈活性。很多現有的系統使用一種特定協議並且不能改變,所以一個支持多種協議的消息平台降低了使用的門檻。雖然連接很重要,但是和其他容器集成也同樣重要。
6.ActiveMQ提供多種持久性方案可供選擇,也可以完全按自己需求定制驗證和授權。例如,ActiveMQ通過KahaDB提供自己的超快速消息持久方案(ultra-fast message persistence),但也支持標准的JDBC方案。ActiveMQ可以通過配置文件提供簡單的驗證和授權,也提供標准的JAAS登陸模塊。
7.ActiveMQ是為開發者設計的。它並不需要專門的管理工具,因為它提供各種易用且強大的管理特性。有很多方法去監控ActiveMQ的各個方面,可以通過JMX使用JConsole或ActiveMQ web console;可以運行ActiveMQ消息報告;可以用命令行腳本;可以通過日志。
8.代理器集群(Broker clustering)----為了利於擴展,多個ActiveMQ broker能夠聯合工作。這個方式就是network of brokers並且能支持多種拓撲結構;支持客戶端-服務器,點對點。
9.支持Ajax, 支持與Axis的整合
- ActiveMQ優勢
1.與OpenJMS、JbossMQ等開源jms provider相比,ActiveMQ有Apache的支持,持續發展的優勢明顯。
2.消息處理速度很快
3.提高系統資源的利用率,主要是任務的派發不是24小時平均的,而是高峰時期任務量很多,比如1秒1000多個,有的時候很低,比如十幾秒鍾才來一個。應用服務通過JMS隊列一個一個的取任務,做完一個再領一個,使系統資源的運用趨於平均。比如ActiveMQ在賽揚(2.40GHz)機器上能夠達到2000/s,消息大小為1-2k。好一些的服務器可以達到2萬以上/秒。
3.ActiveMQ安裝
ActiveMQ在linux服務上安裝操作如下:
1.在官網下載activemq安裝文件。地址:http://activemq.apache.org/download.html
2.上傳下載的tar.gz安裝文件到linux服務器上,並解壓到指定目錄:如 tar -xf apache-activemq-5.15.2-bin.tar.gz
3.運行activemq,進入到解壓的 apache-activemq-5.15.2/bin目錄,執行命令:activemq start
4.開放端口8161,61616,保證端口可訪問。
運行activemq截圖如下:
本機訪問啟動成功的activemq截圖如下:
4.ActiveMQ類別及開發流程
1)、Point-to-Point (點對點)消息模式開發流程
1、生產者(producer)開發流程:
1.1 創建Connection: 根據url,user和password創建一個jms Connection。
1.2 創建Session: 在connection的基礎上創建一個session,同時設置是否支持事務和ACKNOWLEDGE標識。
1.3 創建Destination對象: 需指定其對應的主題(subject)名稱,producer和consumer將根據subject來發送/接收對應的消息。
1.4 創建MessageProducer: 根據Destination創建MessageProducer對象,同時設置其持久模式。
1.5 發送消息到隊列(Queue): 封裝Message消息,使用MessageProducer的send方法將消息發送出去。
2、消費者(consumer)開發流程:
2.1 實現MessageListener接口: 消費者類必須實現MessageListener接口,然后在onMessage()方法中監聽消息的到達並處理。
2.2 創建Connection: 根據url,user和password創建一個jms Connection,如果是durable模式,還需要給connection設置一個clientId。
2.3 創建Session和Destination: 與ProducerTool.java中的流程類似,不再贅述。
2.4 創建replyProducer【可選】:可以用來將消息處理結果發送給producer。
2.5 創建MessageConsumer: 根據Destination創建MessageConsumer對象。
2.6 消費message: 在onMessage()方法中接收producer發送過來的消息進行處理,並可以通過replyProducer反饋信息給producer
樣例代碼:
在消息生產者中定義一個隊列,destination_request,提供消息,同時定義一個監聽消息的隊列擁有接受消費者回復的消息,destination_response。

1 package com.tiantian.springintejms.test; 2 3 import com.tiantian.springintejms.entity.Email; 4 import com.tiantian.springintejms.entity.TestMqBean; 5 import com.tiantian.springintejms.service.ProducerService; 6 import org.apache.activemq.ActiveMQConnectionFactory; 7 import org.junit.Test; 8 import org.junit.runner.RunWith; 9 import org.springframework.beans.factory.annotation.Autowired; 10 import org.springframework.beans.factory.annotation.Qualifier; 11 import org.springframework.test.context.ContextConfiguration; 12 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; 13 14 import javax.jms.*; 15 16 @RunWith(SpringJUnit4ClassRunner.class) 17 public class ProducerSendTest { 18 19 @Test 20 public static void main(String[] args) { 21 ConnectionFactory connectionFactory; 22 Connection connection; 23 Session session; 24 Destination destination_request,destination_response; 25 MessageProducer producer; 26 MessageConsumer consumer; 27 connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.210.128:61616"); 28 try { 29 connection = connectionFactory.createConnection(); 30 connection.start(); 31 //第一個參數是是否是事務型消息,設置為true,第二個參數無效 32 //第二個參數是 33 //Session.AUTO_ACKNOWLEDGE為自動確認,客戶端發送和接收消息不需要做額外的工作。異常也會確認消息,應該是在執行之前確認的 34 //Session.CLIENT_ACKNOWLEDGE為客戶端確認。客戶端接收到消息后,必須調用javax.jms.Message的acknowledge方法。jms服務器才會刪除消息。可以在失敗的 35 //時候不確認消息,不確認的話不會移出隊列,一直存在,下次啟動繼續接受。接收消息的連接不斷開,其他的消費者也不會接受(正常情況下隊列模式不存在其他消費者) 36 //DUPS_OK_ACKNOWLEDGE允許副本的確認模式。一旦接收方應用程序的方法調用從處理消息處返回,會話對象就會確認消息的接收;而且允許重復確認。在需要考慮資源使用時,這種模式非常有效。 37 //待測試 38 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 39 destination_request = session.createQueue("request-queue"); 40 destination_response = session.createQueue("response-queue"); 41 producer = session.createProducer(destination_request); 42 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 43 44 consumer = session.createConsumer(destination_response); 45 //優先級不能影響先進先出。。。那這個用處究竟是什么呢呢呢呢 46 TestMqBean bean = new TestMqBean(); 47 bean.setAge(13); 48 for (int i = 0; i < 10; i++) { 49 bean.setName("send to data -" + i); 50 producer.send(session.createObjectMessage(bean)); 51 } 52 producer.close(); 53 System.out.println("消息發送成功..."); 54 55 consumer.setMessageListener(new MessageListener() { 56 @Override 57 public void onMessage(Message message) { 58 try { 59 if (null != message) { 60 TextMessage textMsg = (TextMessage) message; 61 System.out.println("收到回饋消息" +textMsg.getText()); 62 } 63 } catch (Exception e) { 64 // TODO: handle exception 65 } 66 } 67 }); 68 69 } catch (JMSException e) { 70 e.printStackTrace(); 71 } 72 } 73 }
在消息消費者中定義一個隊列,destination_request,用於接受消息,同時定義一個回復收到消息的隊列回復生產者已經收到消息,destination_response。

1 package com.tiantian.springintejms.test; 2 3 import com.tiantian.springintejms.entity.TestMqBean; 4 import org.apache.activemq.ActiveMQConnectionFactory; 5 import org.junit.Test; 6 import org.junit.runner.RunWith; 7 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; 8 9 import javax.jms.*; 10 import java.util.Date; 11 12 @RunWith(SpringJUnit4ClassRunner.class) 13 public class ConsumerReceiveTest { 14 15 @Test 16 public static void main(String[] args) { 17 ConnectionFactory connectionFactory; 18 // Connection :JMS 客戶端到JMS Provider 的連接 19 Connection connection = null; 20 // Session: 一個發送或接收消息的線程 21 final Session session; 22 // Destination :消息的目的地;消息發送給誰. 23 Destination destination_request,destination_response; 24 // 消費者,消息接收者 25 MessageConsumer consumer; 26 //回復接收到的消息 27 final MessageProducer producer; 28 connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.210.128:61616"); 29 try { 30 // 構造從工廠得到連接對象 31 connection = connectionFactory.createConnection(); 32 // 啟動 33 connection.start(); 34 // 獲取操作連接 35 //這個最好還是有事務 36 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 37 // 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置 38 destination_request = session.createQueue("request-queue"); 39 destination_response = session.createQueue("response-queue"); 40 consumer = session.createConsumer(destination_request); 41 42 producer= session.createProducer(destination_response); 43 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 44 45 consumer.setMessageListener(new MessageListener() { 46 @Override 47 public void onMessage(Message message) { 48 try { 49 TestMqBean bean = (TestMqBean) ((ObjectMessage) message).getObject(); 50 System.out.println(bean); 51 if (null != message) { 52 System.out.println("收到消息" + bean.getName()); 53 Message textMessage = session.createTextMessage("已經成功收到消息,現在開始回復"+new Date().toString()); 54 producer.send(textMessage); 55 } 56 } catch (Exception e) { 57 // TODO: handle exception 58 } 59 } 60 }); 61 } catch (Exception e) { 62 e.printStackTrace(); 63 } 64 } 65 }
消息消費者收到消息,並打印出來,同時發送回復消息。截圖如下:
消息生產者生產消息,同時接受到消費者回復的消息並打印出來。截圖如下:
2)、Publisher/Subscriber(發布/訂閱者)消息模式開發流程
1、訂閱者(Subscriber)開發流程:
1.1 實現MessageListener接口: 在onMessage()方法中監聽發布者發出的消息隊列,並做相應處理。
1.2 創建Connection: 根據url,user和password創建一個jms Connection。
1.3 創建Session: 在connection的基礎上創建一個session,同時設置是否支持事務和ACKNOWLEDGE標識。
1.4 創建Topic: 創建2個Topic, topictest.messages用於接收發布者發出的消息,topictest.control 用於向發布者發送消息,實現雙方的交互。
1.5 創建consumer和producer對象:根據topictest.messages創建consumer,根據topictest.control創建 producer。
1.6 接收處理消息:在onMessage()方法中,對收到的消息進行處理,可直接簡單在本地顯示消息,或者根據消息內容不同處理對應的業務邏輯(比如:數據庫更新、文件操作等等),並且可以使用producer對象將處理結果返回給發布者。
2、發布者(Publisher)開發流程:
2.1 實現MessageListener接口:在onMessage()方法中接收訂閱者的反饋消息。
2.2 創建Connection: 根據url,user和password創建一個jms Connection。
2.3 創建Session: 在connection的基礎上創建一個session,同時設置是否支持事務和ACKNOWLEDGE標識。
2.4 創建Topic: 創建2個Topic,topictest.messages用於向訂閱者發布消息,topictest.control用於接 收訂閱者反饋的消息。這2個topic與訂閱者開發流程中的topic是一一對應的。
2.5 創建consumer和producer對象: 根據topictest.messages創建publisher; 根據topictest.control 創建consumer,同時監聽訂閱者反饋的消息。
2.6 給所有訂閱者發送消息,並接收反饋消息。 注:可同時運行多個訂閱者測試查看此模式效果 。
樣例代碼:
消息發布者發布消息,定義一個主題example.A

1 package com.tiantian.springintejms.test; 2 3 import com.tiantian.springintejms.entity.TestMqBean; 4 import org.apache.activemq.ActiveMQConnectionFactory; 5 import org.junit.Test; 6 import org.junit.runner.RunWith; 7 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; 8 9 import javax.jms.*; 10 11 @RunWith(SpringJUnit4ClassRunner.class) 12 public class TopicProducerSendTest { 13 14 private static String user = "admin"; 15 private static String password = "admin"; 16 private static String url = "tcp://192.168.210.128:61616"; 17 18 public static void main(String[] args)throws Exception { 19 // ConnectionFactory :連接工廠,JMS 用它創建連接 20 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url); 21 // Connection :JMS 客戶端到JMS Provider 的連接 22 Connection connection = connectionFactory.createConnection(); 23 // Connection 啟動 24 connection.start(); 25 System.out.println("Connection is start..."); 26 // Session: 一個發送或接收消息的線程 27 Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); 28 // Topicr :消息的目的地;消息發送給誰. 29 Topic destination = session.createTopic("example.A"); 30 // MessageProducer:消息發送者 31 MessageProducer producer = session.createProducer(destination); 32 // 設置不持久化,此處學習,實際根據項目決定 33 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 34 // 構造消息,此處寫死,項目就是參數,或者方法獲取 35 sendMessage(session, producer); 36 session.commit(); 37 38 connection.close(); 39 System.out.println("send text ok."); 40 } 41 42 public static void sendMessage(Session session, MessageProducer producer) 43 throws Exception { 44 for (int i = 1; i <= 10; i++) {//有限制 45 TextMessage message = session.createTextMessage("ActiveMq 發送的消息" + i); 46 // 發送消息到目的地方 47 System.out.println("發送消息:" + "ActiveMq 發送的消息" + i); 48 producer.send(message); 49 } 50 } 51 52 }
消息訂閱者接收消息,定義一個與發布者相對應的主題example.A。

1 package com.tiantian.springintejms.test; 2 3 import org.apache.activemq.ActiveMQConnectionFactory; 4 import org.junit.Test; 5 import org.junit.runner.RunWith; 6 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; 7 8 import javax.jms.*; 9 10 @RunWith(SpringJUnit4ClassRunner.class) 11 public class TopicSubscriberTest { 12 private static String user = "admin"; 13 private static String password = "admin"; 14 private static String url = "tcp://192.168.210.128:61616"; 15 public static void main(String[] args) throws Exception{ 16 // ConnectionFactory :連接工廠,JMS 用它創建連接 17 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url); 18 // Connection :JMS 客戶端到JMS Provider 的連接 19 Connection connection = connectionFactory.createConnection(); 20 connection.start(); 21 // Session: 一個發送或接收消息的線程 22 final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); 23 // Destination :消息的目的地;消息發送給誰. 24 Topic destination=session.createTopic("example.A"); 25 // 消費者,消息接收者 26 MessageConsumer consumer = session.createConsumer(destination); 27 consumer.setMessageListener(new MessageListener(){//有事務限制 28 @Override 29 public void onMessage(Message message) { 30 try { 31 TextMessage textMessage=(TextMessage)message; 32 System.out.println("接收到消息:"+textMessage.getText()); 33 } catch (JMSException e1) { 34 e1.printStackTrace(); 35 } 36 try { 37 session.commit(); 38 } catch (JMSException e) { 39 e.printStackTrace(); 40 } 41 } 42 }); 43 } 44 }
消息發布者發布消息,並打印截圖如下:
消息訂閱者接受消息並打印截圖如下:(消息訂閱者需在發布者之前啟動,可保證能取到訂閱的消息)
3)、延遲消息發送
有時候不希望消息馬上被broker投遞出去,而是想要消息停留一段時間以后發給消費者,或者想讓消息每隔一定時間投遞一次,一共投遞指定的次數。類似這種需求,ActiveMQ提供了一種broker端消息定時調度機制。開發者使用時只需要把幾個描述消息定時調度方式的參數作為屬性添加到消息,broker端的調度器就會按照定義的行為去處理消息。ActiveMQ定義調度消息參數為:
Property name | type | description |
---|---|---|
AMQ_SCHEDULED_DELAY | long | 延遲投遞的時間 |
AMQ_SCHEDULED_PERIOD | long | 重復投遞的時間間隔 |
AMQ_SCHEDULED_REPEAT | int | 重復投遞次數 |
AMQ_SCHEDULED_CRON | String | Cron表達式 |
//延遲60秒發送消息 MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage("test msg"); long time = 60 * 1000; message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); producer.send(message);
//基於CRON表達式定時投遞消息 MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage("test msg"); message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *"); producer.send(message);
5.源碼下載
在Git上面下載:https://github.com/wuya11/SpringinteJMSonActiveMQ