1、什么是ActiveMQ
1 ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現,盡管JMS規范出台已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。 2 主要特點: 3 1). 多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP 4 2). 完全支持JMS1.1和J2EE 1.4規范 (持久化,XA消息,事務)。 5 3.) 對Spring的支持,ActiveMQ可以很容易內嵌到使用Spring的系統里面去,而且也支持Spring2.0的特性。 6 4.) 通過了常見J2EE服務器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何兼容J2EE 1.4 商業服務器上。 7 5). 支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA。 8 6). 支持通過JDBC和journal提供高速的消息持久化。 9 7). 從設計上保證了高性能的集群,客戶端-服務器,點對點。 10 8). 支持Ajax。 11 9). 支持與Axis的整合。 12 10). 可以很容易得調用內嵌JMS provider,進行測試。
2、JMS介紹:
1 1)、JMS的全稱是Java Message Service,即Java消息服務。用於在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。 2 3 2)、它主要用於在生產者和消費者之間進行消息傳遞,生產者負責產生消息,而消費者負責接收消息。把它應用到實際的業務需求中的話我們可以在特定的時候利用生產者生成一消息,並進行發送,對應的消費者在接收到對應的消息后去完成對應的業務邏輯。
3、ActiveMQ的兩種消息形式。
1 1)、對於消息的傳遞有兩種類型。 2 a)、一種是點對點的,即一個生產者和一個消費者一一對應。 3 b)、另一種是發布/訂閱模式,即一個生產者產生消息並進行發送后,可以由多個消費者進行接收。 4 5 2)、JMS定義了五種不同的消息正文格式,以及調用的消息類型,允許你發送並接收以一些不同形式的數據,提供現有消息格式的一些級別的兼容性。 6 a)、 StreamMessage -- Java原始值的數據流。 7 b)、 MapMessage--一套名稱-值對。 8 c)、 TextMessage--一個字符串對象。 9 d)、 ObjectMessage--一個序列化的 Java對象。 10 e)、 BytesMessage--一個字節的數據流。
4、ActiveMQ的安裝。官方網址:http://activemq.apache.org/
由於ActiveMQ是java開發的,所以需要先安裝jdk(注意:安裝jdk,需要jdk1.7以上版本)的哦。這里使用的是apache-activemq-5.12.0-bin.tar.gz版本的。
開始進行解壓縮操作。
1 [root@localhost package]# ls 2 apache-activemq-5.12.0-bin.tar.gz apache-activemq-5.12.0-bin.zip apache-tomcat-7.0.47.tar.gz IK Analyzer 2012FF_hf1 IK Analyzer 2012FF_hf1.rar jdk-7u55-linux-i586.tar.gz solr-4.10.3.tgz.tgz zookeeper-3.4.6.tar.gz 3 [root@localhost package]# tar -zxvf apache-activemq-5.12.0-bin.tar.gz -C /home/hadoop/soft/
解壓縮完以后進入bin目錄。開始進行啟動操作。
啟動:[root@localhost bin]# ./activemq start
停止:[root@localhost bin]# ./activemq stop
查看狀態:[root@localhost bin]# ./activemq status
1 [root@localhost soft]# cd apache-activemq-5.12.0/ 2 [root@localhost apache-activemq-5.12.0]# ls 3 activemq-all-5.12.0.jar bin conf data docs examples lib LICENSE NOTICE README.txt webapps webapps-demo 4 [root@localhost apache-activemq-5.12.0]# ll 5 total 9384 6 -rwxr-xr-x. 1 root root 9524668 Aug 10 2015 activemq-all-5.12.0.jar 7 drwxr-xr-x. 5 root root 4096 Sep 15 00:39 bin 8 drwxr-xr-x. 2 root root 4096 Sep 15 00:39 conf 9 drwxr-xr-x. 2 root root 4096 Sep 15 00:39 data 10 drwxr-xr-x. 2 root root 4096 Sep 15 00:39 docs 11 drwxr-xr-x. 8 root root 4096 Sep 15 00:39 examples 12 drwxr-xr-x. 6 root root 4096 Sep 15 00:39 lib 13 -rw-r--r--. 1 root root 40580 Aug 10 2015 LICENSE 14 -rw-r--r--. 1 root root 3334 Aug 10 2015 NOTICE 15 -rw-r--r--. 1 root root 2610 Aug 10 2015 README.txt 16 drwxr-xr-x. 7 root root 4096 Sep 15 00:39 webapps 17 drwxr-xr-x. 3 root root 4096 Sep 15 00:39 webapps-demo 18 [root@localhost apache-activemq-5.12.0]# cd bin/ 19 [root@localhost bin]# ls 20 activemq activemq-diag activemq.jar env linux-x86-32 linux-x86-64 macosx wrapper.jar 21 [root@localhost bin]# ./activemq start 22 INFO: Loading '/home/hadoop/soft/apache-activemq-5.12.0//bin/env' 23 INFO: Using java '/home/hadoop/soft/jdk1.7.0_55/bin/java' 24 INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details 25 INFO: pidfile created : '/home/hadoop/soft/apache-activemq-5.12.0//data/activemq.pid' (pid '9318') 26 [root@localhost bin]# ./activemq status 27 INFO: Loading '/home/hadoop/soft/apache-activemq-5.12.0//bin/env' 28 INFO: Using java '/home/hadoop/soft/jdk1.7.0_55/bin/java' 29 ActiveMQ is running (pid '9318') 30 [root@localhost bin]#
然后你可以訪問后台管理界面,賬號和密碼默認都是admin的。訪問地址:http://192.168.110.142:8161/admin
Home是當前的歡迎頁,Queues是點到點形式,Topics是發布訂閱模式,Subscribers話題消息的發布與訂閱,Connections客戶端鏈接,Network當前網絡的鏈接狀態,Scheduled計划任務,Send可以測試發送消息。
5、ActiveMQ的使用方法,JMS消息發送模式。
注意:
1)、在點對點或隊列模型下,一個生產者向一個特定的隊列發布消息,一個消費者從該隊列中讀取消息。這里,生產者知道消費者的隊列,並直接將消息發送到消費者的隊列。這種模式被概括為:只有一個消費者將獲得消息。生產者不需要在接收者消費該消息期間處於運行狀態,接收者也同樣不需要在消息發送時處於運行狀態。每一個成功處理的消息都由接收者簽收。
2)、發布者/訂閱者模型支持向一個特定的消息主題發布消息。0或多個訂閱者可能對接收來自特定消息主題的消息感興趣。在這種模型下,發布者和訂閱者彼此不知道對方。這種模式好比是匿名公告板。這種模式被概括為:多個消費者可以獲得消息,在發布者和訂閱者之間存在時間依賴性。發布者需要建立一個訂閱(subscription),以便客戶能夠購訂閱。訂閱者必須保持持續的活動狀態以接收消息,除非訂閱者建立了持久的訂閱。在那種情況下,在訂閱者未連接時發布的消息將在訂閱者重新連接時重新發布。
6、JMS應用程序接口。
1 1)、ConnectionFactory 接口(連接工廠) 2 用戶用來創建到JMS提供者的連接的被管對象。JMS客戶通過可移植的接口訪問連接,這樣當下層的實現改變時,代碼不需要進行修改。 管理員在JNDI名字空間中配置連接工廠,這樣,JMS客戶才能夠查找到它們。根據消息類型的不同,用戶將使用隊列連接工廠,或者主題連接工廠。 3 2)、Connection 接口(連接) 4 連接代表了應用程序和消息服務器之間的通信鏈路。在獲得了連接工廠后,就可以創建一個與JMS提供者的連接。根據不同的連接類型,連接允許用戶創建會話,以發送和接收隊列和主題到目標。 5 3)、Destination 接口(目標) 6 目標是一個包裝了消息目標標識符的被管對象,消息目標是指消息發布和接收的地點,或者是隊列,或者是主題。JMS管理員創建這些對象,然后用戶通過JNDI發現它們。和連接工廠一樣,管理員可以創建兩種類型的目標,點對點模型的隊列,以及發布者/訂閱者模型的主題。 7 4)、MessageConsumer 接口(消息消費者) 8 由會話創建的對象,用於接收發送到目標的消息。消費者可以同步地(阻塞模式),或異步(非阻塞)接收隊列和主題類型的消息。 9 5)、MessageProducer 接口(消息生產者) 10 由會話創建的對象,用於發送消息到目標。用戶可以創建某個目標的發送者,也可以創建一個通用的發送者,在發送消息時指定目標。 11 6)、Message 接口(消息) 12 是在消費者和生產者之間傳送的對象,也就是說從一個應用程序創送到另一個應用程序。一個消息有三個主要部分: 13 消息頭(必須):包含用於識別和為消息尋找路由的操作設置。 14 一組消息屬性(可選):包含額外的屬性,支持其他提供者和用戶的兼容。可以創建定制的字段和過濾器(消息選擇器)。 15 一個消息體(可選):允許用戶創建五種類型的消息(文本消息,映射消息,字節消息,流消息和對象消息)。 16 消息接口非常靈活,並提供了許多方式來定制消息的內容。 17 7)、Session 接口(會話) 18 表示一個單線程的上下文,用於發送和接收消息。由於會話是單線程的,所以消息是連續的,就是說消息是按照發送的順序一個一個接收的。會話的好處是它支持事務。如果用戶選擇了事務支持,會話上下文將保存一組消息,直到事務被提交才發送這些消息。在提交事務之前,用戶可以使用回滾操作取消這些消息。一個會話允許用戶創建消息生產者來發送消息,創建消息消費者來接收消息。
7、如何使用java操作activeMQ呢,把ActiveMQ依賴的jar包添加到工程中。
使用maven工程,則添加jar包的依賴:
1 <dependency> 2 <groupId>org.apache.activemq</groupId> 3 <artifactId>activemq-all</artifactId> 4 <version>5.11.2</version> 5 </dependency>
然后你就可以愉快得開發了。是不是很開森呢。
8、ActiveMQ點對點模式(point-to-point)。
ActiveMq的點對點生產者。
1 package com.taotao.activemq; 2 3 import javax.jms.Connection; 4 import javax.jms.ConnectionFactory; 5 import javax.jms.JMSException; 6 import javax.jms.MessageProducer; 7 import javax.jms.Queue; 8 import javax.jms.Session; 9 import javax.jms.TextMessage; 10 11 import org.apache.activemq.ActiveMQConnectionFactory; 12 import org.apache.activemq.command.ActiveMQTextMessage; 13 import org.junit.Test; 14 15 /** 16 * 17 * @ClassName: ActiveMqMain.java 18 * @author: biehl 19 * @since: 2019年9月15日 下午4:44:57 20 * @Copyright: ©2019 biehl 版權所有 21 * @version: 0.0.1 22 * @Description: 23 */ 24 public class ActiveMqMain { 25 26 // activeMq得點對點生產者 27 @Test 28 public void queueProducer() throws JMSException { 29 // 1、創建一個連接工廠對象ConnectionFactory對象。需要指定mq服務得ip以及端口號61616。 30 String brokerURL = "tcp://192.168.110.142:61616"; 31 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); 32 // 2、使用ConnectionFactory創建一個連接Connection對象。 33 Connection connection = connectionFactory.createConnection(); 34 // 3、開啟連接。調用Connection對象得start方法。 35 connection.start(); 36 // 4、使用Connection對象創建一個Session對象。 37 // 參數一是否開啟事務,一般不開啟事務,保證數據得最終一致性,可以使用消息隊列實現數據最終一致性。如果第一個參數為true,第二個參數自動忽略 38 // 參數二是消息得應答模式。兩種模式,自動應答和手動應答。一般使用自動應答。 39 boolean transacted = false;// 不開啟事務 40 int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;// 1 41 Session session = connection.createSession(transacted, acknowledgeMode); 42 // 5、使用Session對象創建一個Destination對象。兩種形式queue、topic。現在應該使用queue。 43 String queueName = "queue1";// 當前消息隊列得名稱 44 Queue queue = session.createQueue(queueName); 45 // 6、使用Session對象創建一個Producer對象。 46 // interface Queue extends Destination。destination是一個接口。 47 MessageProducer producer = session.createProducer(queue); 48 // 7、創建一個TextMessage對象。 49 // 創建TextMessage方式一 50 // TextMessage textMessage = new ActiveMQTextMessage(); 51 // textMessage.setText("hello activeMq......"); 52 // 方式二 53 TextMessage textMessage = session.createTextMessage("hello activeMq......"); 54 // 8、發送消息。 55 producer.send(textMessage); 56 // 9、關閉資源。 57 producer.close();// 關閉producer 58 session.close();// 關閉session 59 connection.close();// 關閉connection 60 } 61 62 }
ActiveMQ的點對點消息生產成功以后,可以在ActiveMQ提供的web界面可以看到一些信息。
activeMq的點對點消費者。
1 package com.taotao.activemq; 2 3 import java.io.IOException; 4 5 import javax.jms.Connection; 6 import javax.jms.ConnectionFactory; 7 import javax.jms.JMSException; 8 import javax.jms.Message; 9 import javax.jms.MessageConsumer; 10 import javax.jms.MessageListener; 11 import javax.jms.MessageProducer; 12 import javax.jms.Queue; 13 import javax.jms.Session; 14 import javax.jms.TextMessage; 15 16 import org.apache.activemq.ActiveMQConnectionFactory; 17 import org.apache.activemq.command.ActiveMQTextMessage; 18 import org.junit.Test; 19 20 /** 21 * 22 * @ClassName: ActiveMqMain.java 23 * @author: biehl 24 * @since: 2019年9月15日 下午4:44:57 25 * @Copyright: ©2019 biehl 版權所有 26 * @version: 0.0.1 27 * @Description: 28 */ 29 public class ActiveMqMain { 30 31 // activeMq的點對點生產者 32 @Test 33 public void queueProducer() throws JMSException { 34 // 1、創建一個連接工廠對象ConnectionFactory對象。需要指定mq服務得ip以及端口號61616。 35 String brokerURL = "tcp://192.168.110.142:61616"; 36 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); 37 // 2、使用ConnectionFactory創建一個連接Connection對象。 38 Connection connection = connectionFactory.createConnection(); 39 // 3、開啟連接。調用Connection對象得start方法。 40 connection.start(); 41 // 4、使用Connection對象創建一個Session對象。 42 // 參數一是否開啟事務,一般不開啟事務,保證數據得最終一致性,可以使用消息隊列實現數據最終一致性。如果第一個參數為true,第二個參數自動忽略 43 // 參數二是消息得應答模式。兩種模式,自動應答和手動應答。一般使用自動應答。 44 boolean transacted = false;// 不開啟事務 45 int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;// 1 46 Session session = connection.createSession(transacted, acknowledgeMode); 47 // 5、使用Session對象創建一個Destination對象。兩種形式queue、topic。現在應該使用queue。 48 String queueName = "queue1";// 當前消息隊列得名稱 49 Queue queue = session.createQueue(queueName); 50 // 6、使用Session對象創建一個Producer對象。 51 // interface Queue extends Destination。destination是一個接口。 52 MessageProducer producer = session.createProducer(queue); 53 // 7、創建一個TextMessage對象。 54 // 創建TextMessage方式一 55 // TextMessage textMessage = new ActiveMQTextMessage(); 56 // textMessage.setText("hello activeMq......"); 57 // 方式二 58 TextMessage textMessage = session.createTextMessage("hello activeMq......"); 59 // 8、發送消息。 60 producer.send(textMessage); 61 // 9、關閉資源。 62 producer.close();// 關閉producer 63 session.close();// 關閉session 64 connection.close();// 關閉connection 65 } 66 67 // activeMq的點對點消費者 68 @Test 69 public void queueConsumer() throws JMSException { 70 // 1、創建一個連接工廠ConnectionFactory 對象 71 String brokerURL = "tcp://192.168.110.142:61616"; 72 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); 73 // 2、使用連接工廠對象創建一個連接 74 Connection connection = connectionFactory.createConnection(); 75 // 3、開啟連接 76 connection.start(); 77 // 4、使用連接對象創建一個Session對象 78 boolean transacted = false;// 關閉事務 79 int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;// 自動響應 80 Session session = connection.createSession(transacted, acknowledgeMode); 81 // 5、使用Session創建一個Destination,Destination應該和消息的發送端一致的。 82 String queueName = "queue1"; 83 Queue queue = session.createQueue(queueName); 84 // 6、使用Session創建一個Consumer對象。 85 MessageConsumer consumer = session.createConsumer(queue); 86 // 7、向Consumer對象中設置一個MessageListener對象,用來接受消息。 87 // 匿名內部類,new 接口,后面加上{},相當於實現了這個接口的實現類。然后創建這個實現類的對象listener。 88 MessageListener listener = new MessageListener() { 89 90 @Override 91 public void onMessage(Message message) { 92 // 接受事件的。當消息到達就可以在這里接受到消息了的。 93 // 8、取出消息的內容。 94 if (message instanceof TextMessage) { 95 TextMessage textMessage = (TextMessage) message; 96 // 9、打印消息內容。 97 try { 98 String text = textMessage.getText(); 99 System.out.println(text); 100 } catch (JMSException e) { 101 e.printStackTrace(); 102 } 103 } 104 } 105 }; 106 consumer.setMessageListener(listener); 107 108 // 關閉資源以前,系統等待,等待接受消息。 109 /*while (true) { 110 try { 111 Thread.sleep(100); 112 } catch (InterruptedException e) { 113 e.printStackTrace(); 114 } 115 }*/ 116 117 // 等待鍵盤輸入。才回接着向下執行的。 118 try { 119 System.in.read(); 120 } catch (IOException e) { 121 e.printStackTrace(); 122 } 123 124 125 // 10、關閉資源。 126 consumer.close();// 關閉consumer 127 session.close();// 關閉session 128 connection.close();// 關閉connection 129 } 130 131 }
執行了activeMq的點對點消費者。可以在界面看到變化。可以看到有一個消費者,然后生產了7條消息,7條消息進隊和7條消息出隊。
9、ActiveMQ發布訂閱模式(publish/subscribe)。
消費者有兩種消費方法(這里使用異步消費):
a、同步消費。通過調用消費者的receive方法從目的地中顯式提取消息。receive方法可以一直阻塞到消息到達。
b、異步消費。客戶可以為消費者注冊一個消息監聽器,以定義在消息到達時所采取的動作。
實現MessageListener接口,在MessageListener()方法中實現消息的處理邏輯。
1 package com.taotao.activemq; 2 3 import java.io.IOException; 4 5 import javax.jms.Connection; 6 import javax.jms.ConnectionFactory; 7 import javax.jms.JMSException; 8 import javax.jms.Message; 9 import javax.jms.MessageConsumer; 10 import javax.jms.MessageListener; 11 import javax.jms.MessageProducer; 12 import javax.jms.Session; 13 import javax.jms.TextMessage; 14 import javax.jms.Topic; 15 16 import org.apache.activemq.ActiveMQConnectionFactory; 17 import org.junit.Test; 18 19 /** 20 * Active的發布訂閱模式 21 * 22 * @ClassName: ActiveMqTopics.java 23 * @author: biehl 24 * @since: 2019年9月19日 上午10:51:14 25 * @Copyright: ©2019 biehl 版權所有 26 * @version: 0.0.1 27 * @Description: 28 */ 29 public class ActiveMqTopics { 30 31 // 發布訂閱模式,生產者。topic生產者生產消息默認不持久化客戶端的。 32 @Test 33 public void topicProducer() { 34 try { 35 // 1、創建一個連接工廠對象。需要指定mq服務的ip地址以及端口號61616 36 String brikerURL = "tcp://192.168.110.142:61616"; 37 // 創建ConnectionFactory接口對象,實現類ActiveMQConnectionFactory 38 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brikerURL); 39 40 // 2、創建Connection連接 41 Connection connection = connectionFactory.createConnection(); 42 43 // 3、開啟連接,調用Connection的start方法。 44 connection.start(); 45 46 // 4、創建Session,使用Connection對象創建一個session 47 // 參數一是否開啟事務,一般不開啟事務,保證數據得最終一致性,可以使用消息隊列實現數據最終一致性。如果第一個參數為true,第二個參數自動忽略 48 boolean transacted = false; 49 // 參數二是消息得應答模式。兩種模式,自動應答和手動應答。一般使用自動應答。 50 int acknowledgeMode = Session.AUTO_ACKNOWLEDGE; 51 Session session = connection.createSession(transacted, acknowledgeMode); 52 53 // 5、創建Destination,應該使用topic,區別於點對點的queue 54 String topicName = "topic01"; 55 Topic topic = session.createTopic(topicName); 56 57 // 6、創建一個Producer對象 58 // interface Topic extends Destination. 59 // Destination是一個接口,Topic接口繼承Destination這個接口。 60 MessageProducer producer = session.createProducer(topic); 61 62 // 7、創建一個TextMessage對象 63 String message = null; 64 TextMessage textMessage = null; 65 for (int i = 0; i < 100; i++) { 66 message = i + " ActiveMQ topics......"; 67 textMessage = session.createTextMessage(message); 68 69 // 8、發送消息 70 producer.send(textMessage); 71 } 72 73 // 9、關閉資源 74 producer.close();// 關閉producer 75 session.close();// 關閉session 76 connection.close();// 關閉connection 77 } catch (JMSException e) { 78 e.printStackTrace(); 79 } 80 } 81 82 // 發布訂閱模式,消費者必須一直等待生產者生產的消息,因為發布訂閱模式不持久化。 83 @Test 84 public void topicConsumer() { 85 try { 86 // 1、創建一個連接工廠對象 87 String brokerURL = "tcp://192.168.110.142:61616"; 88 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); 89 90 // 2、使用連接工廠對象創建一個連接 91 Connection connection = connectionFactory.createConnection(); 92 93 // 3、開啟連接 94 connection.start(); 95 96 // 4、使用連接對象創建一個Session對象 97 // 參數一是否開啟事務,一般不開啟事務,保證數據得最終一致性,可以使用消息隊列實現數據最終一致性。如果第一個參數為true,第二個參數自動忽略 98 boolean transacted = false; 99 // 參數二是消息得應答模式。兩種模式,自動應答和手動應答。一般使用自動應答。 100 int acknowledgeMode = Session.AUTO_ACKNOWLEDGE; 101 Session session = connection.createSession(transacted, acknowledgeMode); 102 103 // 5、使用session創建destination,注意,destination應該和消息的發送端一致的。 104 String topicName = "topic01"; 105 Topic topic = session.createTopic(topicName); 106 107 // 6、使用session創建一個consumer對象 108 MessageConsumer consumer = session.createConsumer(topic); 109 110 // 7、向Consumer對象中設置一個MessageListener對象,用來接受消息。 111 // 匿名內部類,new 接口,后面加上{},相當於實現了這個接口的實現類。然后創建這個實現類的對象listener。 112 MessageListener listener = new MessageListener() { 113 // 接受事件的。當消息到達就可以在這里接受到消息了的。 114 // 8、取出消息的內容。 115 @Override 116 public void onMessage(Message message) { 117 if (message instanceof TextMessage) { 118 TextMessage textMessage = (TextMessage) message; 119 // 9、打印消息內容。 120 try { 121 String text = textMessage.getText(); 122 System.out.println(text); 123 } catch (JMSException e) { 124 e.printStackTrace(); 125 } 126 } 127 } 128 }; 129 consumer.setMessageListener(listener); 130 131 // 啟動三次,模擬是三個消費者 132 System.out.println("消費者1......."); 133 // System.out.println("消費者2......."); 134 // System.out.println("消費者3......."); 135 136 // 等待鍵盤輸入。才回接着向下執行的。 137 try { 138 System.in.read(); 139 } catch (IOException e) { 140 e.printStackTrace(); 141 } 142 143 // 9、關閉資源 144 consumer.close();// 關閉producer 145 session.close();// 關閉session 146 connection.close();// 關閉connection 147 } catch (JMSException e) { 148 e.printStackTrace(); 149 } 150 151 } 152 153 }
執行了activeMq的發布訂閱模式。可以在界面看到變化。可以看到有三個消費者,然后生產了201條消息,201條消息進隊和603條消息出隊。
10、ActiveMQ與Spring整合如下所示:
在pom.xml配置文件中引入自己的依賴的jar包。
1 <dependency> 2 <groupId>org.springframework</groupId> 3 <artifactId>spring-jms</artifactId> 4 </dependency> 5 <dependency> 6 <groupId>org.springframework</groupId> 7 <artifactId>spring-context-support</artifactId> 8 </dependency>
在配置文件applicationContext-activemq.xml里面配置ConnectionFactory。如下所示:
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xmlns:p="http://www.springframework.org/schema/p" 5 xmlns:aop="http://www.springframework.org/schema/aop" 6 xmlns:tx="http://www.springframework.org/schema/tx" 7 xmlns:jms="http://www.springframework.org/schema/jms" 8 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 9 xsi:schemaLocation="http://www.springframework.org/schema/beans 10 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 11 http://www.springframework.org/schema/context 12 http://www.springframework.org/schema/context/spring-context-4.0.xsd 13 http://www.springframework.org/schema/aop 14 http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 15 http://www.springframework.org/schema/tx 16 http://www.springframework.org/schema/tx/spring-tx-4.0.xsd 17 http://www.springframework.org/schema/jms 18 http://www.springframework.org/schema/jms/spring-jms-4.0.xsd 19 http://www.springframework.org/schema/util 20 http://www.springframework.org/schema/util/spring-util-4.0.xsd"> 21 22 23 <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> 24 <bean id="targetConnectionFactory" 25 class="org.apache.activemq.ActiveMQConnectionFactory"> 26 <property name="brokerURL" 27 value="tcp://192.168.110.142:61616" /> 28 </bean> 29 30 <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> 31 <bean id="connectionFactory" 32 class="org.springframework.jms.connection.SingleConnectionFactory"> 33 <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> 34 <property name="targetConnectionFactory" 35 ref="targetConnectionFactory" /> 36 </bean> 37 </beans>
開始配置生產者的spring配置。
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xmlns:p="http://www.springframework.org/schema/p" 5 xmlns:aop="http://www.springframework.org/schema/aop" 6 xmlns:tx="http://www.springframework.org/schema/tx" 7 xmlns:jms="http://www.springframework.org/schema/jms" 8 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 9 xsi:schemaLocation="http://www.springframework.org/schema/beans 10 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 11 http://www.springframework.org/schema/context 12 http://www.springframework.org/schema/context/spring-context-4.0.xsd 13 http://www.springframework.org/schema/aop 14 http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 15 http://www.springframework.org/schema/tx 16 http://www.springframework.org/schema/tx/spring-tx-4.0.xsd 17 http://www.springframework.org/schema/jms 18 http://www.springframework.org/schema/jms/spring-jms-4.0.xsd 19 http://www.springframework.org/schema/util 20 http://www.springframework.org/schema/util/spring-util-4.0.xsd"> 21 22 23 <!-- 1、真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> 24 <bean id="targetConnectionFactory" 25 class="org.apache.activemq.ActiveMQConnectionFactory"> 26 <property name="brokerURL" 27 value="tcp://192.168.110.142:61616" /> 28 </bean> 29 30 <!-- 2、Spring用於管理真正的ConnectionFactory的ConnectionFactory --> 31 <bean id="connectionFactory" 32 class="org.springframework.jms.connection.SingleConnectionFactory"> 33 <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> 34 <!-- 給屬性targetConnectionFactory傳值 --> 35 <property name="targetConnectionFactory" 36 ref="targetConnectionFactory" /> 37 </bean> 38 39 <!-- 3、開始配置生產者配置 --> 40 <!-- 配置生產者 --> 41 <!-- Spring提供的JMS工具類,它可以進行消息發送、接收等 --> 42 <bean id="jmsTemplate" 43 class="org.springframework.jms.core.JmsTemplate"> 44 <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 --> 45 <!-- 給屬性connectionFactory傳值 --> 46 <property name="connectionFactory" ref="connectionFactory" /> 47 </bean> 48 49 <!-- 4、配置消息的Destination對象 --> 50 <!-- 點對點模式 --> 51 <!-- 這個是隊列目的地,點對點的。 --> 52 <bean id="queueDestination" 53 class="org.apache.activemq.command.ActiveMQQueue"> 54 <constructor-arg> 55 <!-- 給ActiveMQQueue構造參數傳遞一個值為queue --> 56 <value>queue</value> 57 </constructor-arg> 58 </bean> 59 60 <!-- 發布訂閱模式 --> 61 <!-- 這個是主題目的地,一對多的。 --> 62 <bean id="topicDestination" 63 class="org.apache.activemq.command.ActiveMQTopic"> 64 <!-- 給ActiveMQTopic構造參數傳遞一個值為topic --> 65 <constructor-arg value="topic" /> 66 </bean> 67 68 </beans>
生產者測試代碼如下所示:
可以根據之前的消費者測試一下,消息的消費。
1 package com.taotao.activemq; 2 3 import javax.jms.Destination; 4 import javax.jms.JMSException; 5 import javax.jms.Message; 6 import javax.jms.Session; 7 import javax.jms.TextMessage; 8 9 import org.junit.Test; 10 import org.springframework.context.ApplicationContext; 11 import org.springframework.context.support.ClassPathXmlApplicationContext; 12 import org.springframework.jms.core.JmsTemplate; 13 import org.springframework.jms.core.MessageCreator; 14 15 /** 16 * 17 * @ClassName: SpringActiveMQ.java 18 * @author: biehl 19 * @since: 2019年9月19日 下午7:01:43 20 * @Copyright: ©2019 biehl 版權所有 21 * @version: 0.0.1 22 * @Description: 23 */ 24 public class SpringActiveMQ { 25 26 // 使用spring與activemq整合,是喲個jmsTemplate發送消息 27 @Test 28 public void jmsTemplateProducer() { 29 // 1、初始化spring容器 30 ApplicationContext applicationContext = new ClassPathXmlApplicationContext( 31 "classpath:/spring/applicationContext-activemq.xml"); 32 // 2、從容器中獲得jmsTemplate對象。根據類型獲取到bean的對象 33 JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class); 34 // 3、從容器中獲得Destination對象。根據名稱獲取到bean的對象 35 Destination destination = (Destination) applicationContext.getBean("queueDestination"); 36 37 // 4、發送消息 38 jmsTemplate.send(destination, new MessageCreator() { 39 40 @Override 41 public Message createMessage(Session session) throws JMSException { 42 // 定義一個消息 43 String message = "hello activeMq......"; 44 // 發送消息 45 TextMessage textMessage = session.createTextMessage(message); 46 return textMessage; 47 } 48 }); 49 } 50 51 }
效果如下所示:
開始配置消費者的spring配置。
1)、注意:那么消費者是通過Spring為我們封裝的消息監聽容器MessageListenerContainer實現的,它負責接收信息,並把接收到的信息分發給真正的MessageListener進行處理。每個消費者對應每個目的地都需要有對應的MessageListenerContainer。
2)、對於消息監聽容器而言,除了要知道監聽哪個目的地之外,還需要知道到哪里去監聽,也就是說它還需要知道去監聽哪個JMS服務器,這是通過在配置MessageConnectionFactory的時候往里面注入一個ConnectionFactory來實現的。
3)、所以在配置一個MessageListenerContainer的時候有三個屬性必須指定:
a、一個是表示從哪里監聽的ConnectionFactory
b、一個是表示監聽什么的Destination;
c、一個是接收到消息以后進行消息處理的MessageListener。
4)、常用的MessageListenerContainer實現類是DefaultMessageListenerContainer。
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xmlns:p="http://www.springframework.org/schema/p" 5 xmlns:aop="http://www.springframework.org/schema/aop" 6 xmlns:tx="http://www.springframework.org/schema/tx" 7 xmlns:jms="http://www.springframework.org/schema/jms" 8 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 9 xsi:schemaLocation="http://www.springframework.org/schema/beans 10 http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 11 http://www.springframework.org/schema/context 12 http://www.springframework.org/schema/context/spring-context-4.0.xsd 13 http://www.springframework.org/schema/aop 14 http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 15 http://www.springframework.org/schema/tx 16 http://www.springframework.org/schema/tx/spring-tx-4.0.xsd 17 http://www.springframework.org/schema/jms 18 http://www.springframework.org/schema/jms/spring-jms-4.0.xsd 19 http://www.springframework.org/schema/util 20 http://www.springframework.org/schema/util/spring-util-4.0.xsd"> 21 22 23 <!-- 1、真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> 24 <bean id="targetConnectionFactory" 25 class="org.apache.activemq.ActiveMQConnectionFactory"> 26 <property name="brokerURL" 27 value="tcp://192.168.110.142:61616" /> 28 </bean> 29 30 <!-- 2、Spring用於管理真正的ConnectionFactory的ConnectionFactory --> 31 <bean id="connectionFactory" 32 class="org.springframework.jms.connection.SingleConnectionFactory"> 33 <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> 34 <!-- 給屬性targetConnectionFactory傳值 --> 35 <property name="targetConnectionFactory" 36 ref="targetConnectionFactory" /> 37 </bean> 38 39 <!-- 3、配置消息的Destination對象。接受消息的目的地。 --> 40 <!-- 點對點模式 --> 41 <!-- 這個是隊列目的地,點對點的。 --> 42 <bean id="queueDestination" 43 class="org.apache.activemq.command.ActiveMQQueue"> 44 <constructor-arg> 45 <!-- 給ActiveMQQueue構造參數傳遞一個值為queue --> 46 <value>queue</value> 47 </constructor-arg> 48 </bean> 49 50 <!-- 發布訂閱模式 --> 51 <!-- 這個是主題目的地,一對多的。 --> 52 <bean id="topicDestination" 53 class="org.apache.activemq.command.ActiveMQTopic"> 54 <!-- 給ActiveMQTopic構造參數傳遞一個值為topic --> 55 <constructor-arg value="topic" /> 56 </bean> 57 58 <!-- 4、配置消息接收者 --> 59 <!-- 配置一個監聽器 --> 60 <bean id="activeMqMessageListener" 61 class="com.taotao.search.listener.ActiveMqMessageListener" /> 62 63 <!-- 配置監聽容器 --> 64 <bean 65 class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 66 <!-- 屬性設置 --> 67 <!-- 一個是表示從哪里監聽的ConnectionFactory --> 68 <property name="connectionFactory" ref="connectionFactory" /> 69 <!-- 一個是表示監聽什么的Destination --> 70 <property name="destination" ref="queueDestination" /> 71 <!-- 一個是接收到消息以后進行消息處理的MessageListener --> 72 <property name="messageListener" ref="activeMqMessageListener" /> 73 </bean> 74 75 76 </beans>
然后可以寫消息監聽器,用來監聽生產者生產的消息,以便實現自己的業務邏輯。
1 package com.taotao.search.listener; 2 3 import java.text.SimpleDateFormat; 4 import java.util.Date; 5 6 import javax.jms.JMSException; 7 import javax.jms.Message; 8 import javax.jms.MessageListener; 9 import javax.jms.TextMessage; 10 11 /** 12 * 接受ActiveMQ發送的消息. 13 * 14 * @ClassName: ActiveMqMessageListener.java 15 * @author: biehl 16 * @since: 2019年9月19日 下午7:55:24 17 * @Copyright: ©2019 biehl 版權所有 18 * @version: 0.0.1 19 * @Description: 20 */ 21 public class ActiveMqMessageListener implements MessageListener { 22 23 @Override 24 public void onMessage(Message message) { 25 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss"); 26 System.out.println("監聽生產者生產的消息,消費者進行消息消費......."); 27 // 消息到了onMessage就接受到了消息 28 if (message instanceof TextMessage) { 29 TextMessage textMessage = (TextMessage) message; 30 try { 31 String text = textMessage.getText(); 32 System.out.println(sdf.format(new Date()) + " : " + text); 33 } catch (JMSException e) { 34 e.printStackTrace(); 35 } 36 } 37 } 38 39 }
由於這里只是簡單的測試,如果是正式項目的話,直接加載這個配置文件,然后就可以進行消息的監聽消費,我這里只是加載一下這個配置文件即可。
1 package com.taotao.search.service; 2 3 import java.io.IOException; 4 5 import org.springframework.context.ApplicationContext; 6 import org.springframework.context.support.ClassPathXmlApplicationContext; 7 8 /** 9 * 10 * @ClassName: ActiveMqConsumer.java 11 * @author: biehl 12 * @since: 2019年9月19日 下午8:10:55 13 * @Copyright: ©2019 biehl 版權所有 14 * @version: 0.0.1 15 * @Description: 16 */ 17 public class ActiveMqConsumer { 18 19 // 啟動spring容器。就可以實現監聽生產者發送消息,消費者消費小的目的地。 20 public static void main(String[] args) { 21 // 初始化spring容器 22 ApplicationContext applicationContext = new ClassPathXmlApplicationContext( 23 "classpath:/spring/applicationContext-activemq.xml"); 24 System.out.println("spring容器加載完畢,開始監聽生產者生產的消息......."); 25 try { 26 System.in.read(); 27 } catch (IOException e) { 28 e.printStackTrace(); 29 } 30 } 31 }
實現效果如下所示:
控制台打印如下所示,只要你生產消息,這里就可以進行消息的消費。
待續......