先來個ActiveMQ介紹哈:
MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法,是一個消息中間件。
應用場景:為了實現系統之間的通信,把系統之間的調用耦合度降低就可以使用MQ。
1) activeMQ 是Apache出品,最流行的,能力強勁的開源消息總線。
2) avtiveMQ主要特點:完全支持JMS1.1和J2EE 1.4規范;支持spring,很容易內嵌到spring中;支持ajax。
3) activeMQ的消息形式:
a) 點對點形式,即生產者和接收者一一對應
b) 發布/訂閱形式,即一個生產者發布消息后,可以有多個接收者訂閱接收。
4) JMS的五種消息正文格式:
a) StreamMessage -- Java原始值的數據流
b) MapMessage--一套名稱-值對
c) TextMessage--一個字符串對象(常用)
d) ObjectMessage--一個序列化的 Java對象
e) BytesMessage--一個字節的數據流
好了直接我開始ActiveMQ的入門案例!
創建一個maven工程activeMQ_helloworld,提供兩個測試類進行演示.
pom文件導入的依賴
創建一個測試類來做生產者生產消息,這里我用的是隊列形式(queue),一對一的消費,我創建了一個隊列叫
HelloActiveMQ,並發送了十條消息.
1 public class ActiveMQProducer { 2 @Test 3 public void testProduceMsg() throws Exception{ 4 //連接工廠 5 //使用默認的用戶名,密碼,路徑 6 //路徑為 tcp://host:61616 7 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); 8 //獲取一個連接 9 Connection connection = connectionFactory.createConnection(); 10 //創建一個會話 11 Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); 12 //創建隊列或者話題 13 Queue queue = session.createQueue("HelloActiveMQ"); 14 //創建生產者或者消費者 15 MessageProducer producer = session.createProducer(queue); 16 //發送消息 17 for (int i = 0; i < 10; i++) { 18 producer.send(session.createTextMessage("activeMQ,你好!"+i)); 19 } 20 //提交操作 21 session.commit(); 22 } 23 }
熟悉ActiveMQ的API,根據API來發送消息,最后的commit不要忘了!!!
在創建一個消費者來對消息進行消費,消費者引用的隊列名為之前創建的生產者隊列名HelloActiveMQ
1 public class ActiveMQConsumer { 2 @Test 3 public void testConsumeMsg() throws Exception{ 4 // 連接工廠 5 // 使用默認用戶名、密碼、路徑 6 // 路徑 tcp://host:61616 7 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); 8 // 獲取一個連接 9 Connection connection = connectionFactory.createConnection(); 10 //開啟連接 11 connection.start(); 12 //建立會話,第一個參數是否開啟事務,為true時,最后需要session.conmit()的提交 13 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 14 // 創建隊列或者話題對象 15 Queue queue = session.createQueue("HelloActiveMQ"); 16 // 創建消費者 17 MessageConsumer messageConsumer = session.createConsumer(queue); 18 19 while (true) { 20 TextMessage message = (TextMessage) messageConsumer.receive(5000); 21 if (message != null) { 22 System.out.println(message.getText()); 23 } else { 24 break; 25 } 26 } 27 } 28 }
生產者和消費者都已經創建好,現在就可以開始愉快的測試了~~~
哦,還沒開啟呢...
安裝好的ActiveMQ在本地,進入bin選擇win64(我電腦64的),activemq.bat開啟
開啟后
進入Activemq管理頁面,地址http://服務器ip:8161,用戶名admin,密碼admin,如圖
這個消息管理頁面非常好用,用的很多,后面說~
現在執行一次生產者testProduceMsg(),生產了十條消息,可以在管理頁面看到(queues隊列)
顯然有十條消息生產了~
現在調用消費者testConsumeMsg(),去消費這十條消息!
控制台打印出十條消息,再去看看消息管理頁面>
十條消息已經消費了~~~ok
然而然而業務場景中用的最多的是監聽機制,對生產者的消息進行監聽,生產者一生產出消息,消費者立馬進行消費掉!!!
這里我再進行監聽測試>>
在消費者測試類里添加第二個方法(監聽消費的方法),線程得一直開着.
1 @Test 2 // 使用監聽器消費 3 public void testCosumeMQ2() throws Exception { 4 // 連接工廠 5 // 使用默認用戶名、密碼、路徑 6 // 路徑 tcp://host:61616 7 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); 8 // 獲取一個連接 9 Connection connection = connectionFactory.createConnection(); 10 // 開啟連接 11 connection.start(); 12 // 建立會話 13 // 第一個參數,是否使用事務,如果設置true,操作消息隊列后,必須使用 session.commit(); 14 Session session = connection.createSession(false, 15 Session.AUTO_ACKNOWLEDGE); 16 // 創建隊列或者話題對象 17 Queue queue = session.createQueue("HelloActiveMQ"); 18 // 創建消費者 19 MessageConsumer messageConsumer = session.createConsumer(queue); 20 messageConsumer.setMessageListener(new MessageListener() { 21 // 每次接收消息,自動調用 onMessage 22 public void onMessage(Message message) { 23 TextMessage textMessage = (TextMessage)message; 24 try { 25 System.out.println(textMessage.getText()); 26 } catch (JMSException e) { 27 e.printStackTrace(); 28 } 29 } 30 }); 31 //不能關閉線程 32 while(true){ 33 34 } 35 }
先執行這個方法使線程一直開啟監聽,再去執行生產者生產十條消息,可以發現>>>
消息一生產出來立馬被監聽到消費掉!
簡單的入門案例就寫到這里,Active整合Spring的簡單使用下面開寫~~~
注意了!!! 開始整合Spring了...
這次分別用Queue和Topic演示
創建maven工程activeMQ_spring
pom的依賴
1 <dependencies> 2 <dependency> 3 <groupId>org.springframework</groupId> 4 <artifactId>spring-context</artifactId> 5 <version>4.1.7.RELEASE</version> 6 </dependency> 7 <dependency> 8 <groupId>org.springframework</groupId> 9 <artifactId>spring-test</artifactId> 10 <version>4.1.7.RELEASE</version> 11 </dependency> 12 <dependency> 13 <groupId>junit</groupId> 14 <artifactId>junit</artifactId> 15 <version>4.12</version> 16 </dependency> 17 <dependency> 18 <groupId>org.apache.activemq</groupId> 19 <artifactId>activemq-all</artifactId> 20 <version>5.14.0</version> 21 </dependency> 22 <dependency> 23 <groupId>org.springframework</groupId> 24 <artifactId>spring-jms</artifactId> 25 <version>4.1.7.RELEASE</version> 26 </dependency> 27 </dependencies>
如果ActiveMQ整合spring使用不要使用activemq-all-5.12.0.jar包。建議使用5.11.2,5.14.2
此時用到spring-jms消息服務,jms模版和jms的監聽處理
在consumer包下創建兩個Queue消費者(隊列消費者)
QueueConsumer1:
1 package cn.bowen.activemq.consume; 2 3 import javax.jms.JMSException; 4 import javax.jms.Message; 5 import javax.jms.MessageListener; 6 import javax.jms.TextMessage; 7 8 import org.springframework.stereotype.Service; 9 @Service 10 public class QueueConsumer1 implements MessageListener{ 11 12 public void onMessage(Message message) { 13 TextMessage textMessage = (TextMessage)message; 14 try { 15 System.out.println("消費的QueueConsumer1獲取消息:"+textMessage.getText()); 16 } catch (JMSException e) { 17 e.printStackTrace(); 18 } 19 } 20 21 }
QueueConsumer2:
1 package cn.bowen.activemq.consume; 2 3 import javax.jms.JMSException; 4 import javax.jms.Message; 5 import javax.jms.MessageListener; 6 import javax.jms.TextMessage; 7 8 import org.springframework.stereotype.Service; 9 @Service 10 public class QueueConsumer2 implements MessageListener{ 11 12 public void onMessage(Message message) { 13 TextMessage textMessage = (TextMessage)message; 14 try { 15 System.out.println("消費的QueueConsumer2獲取消息:"+textMessage.getText()); 16 } catch (JMSException e) { 17 e.printStackTrace(); 18 } 19 } 20 21 }
創建兩個Topic消費者(話題/廣播消費者)
TopicConsumer1:
1 package cn.bowen.activemq.consume; 2 3 import javax.jms.JMSException; 4 import javax.jms.Message; 5 import javax.jms.MessageListener; 6 import javax.jms.TextMessage; 7 8 import org.springframework.stereotype.Service; 9 @Service 10 public class TopicConsumer1 implements MessageListener{ 11 12 public void onMessage(Message message) { 13 TextMessage textMessage = (TextMessage)message; 14 try { 15 System.out.println("消費的TopicConsumer1獲取消息:"+textMessage.getText()); 16 } catch (JMSException e) { 17 e.printStackTrace(); 18 } 19 } 20 21 }
TopicConsumer2:
1 package cn.bowen.activemq.consume; 2 3 import javax.jms.JMSException; 4 import javax.jms.Message; 5 import javax.jms.MessageListener; 6 import javax.jms.TextMessage; 7 8 import org.springframework.stereotype.Service; 9 @Service 10 public class TopicConsumer2 implements MessageListener{ 11 12 public void onMessage(Message message) { 13 TextMessage textMessage = (TextMessage)message; 14 try { 15 System.out.println("消費的TopicConsumer2獲取消息:"+textMessage.getText()); 16 } catch (JMSException e) { 17 e.printStackTrace(); 18 } 19 } 20 21 }
配置applicationContext-mq-consumer.xml,注釋說明配置信息~~~
1 <!-- 掃描包 --> 2 <context:component-scan base-package="cn.bowen.activemq.consume" /> 3 4 <!-- ActiveMQ 連接工廠 --> 5 <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供--> 6 <!-- 如果連接網絡:tcp://ip:61616;未連接網絡:tcp://localhost:61616 以及用戶名,密碼--> 7 <amq:connectionFactory id="amqConnectionFactory" 8 brokerURL="tcp://localhost:61616" userName="admin" password="admin" /> 9 10 <!-- Spring Caching連接工廠 --> 11 <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> 12 <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> 13 <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> 14 <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> 15 <!-- 同上,同理 --> 16 <!-- <constructor-arg ref="amqConnectionFactory" /> --> 17 <!-- Session緩存數量 --> 18 <property name="sessionCacheSize" value="100" /> 19 </bean> 20 21 <!-- 消息消費者 start--> 22 23 <!-- 定義Queue監聽器 --> 24 <jms:listener-container destination-type="queue" container-type="default" 25 connection-factory="connectionFactory" acknowledge="auto"> 26 <!-- destination是隊列或話題名稱 --> 27 <!-- 默認注冊bean名稱,應該是類名首字母小寫 --> 28 <jms:listener destination="springQueue" ref="queueConsumer1"/> 29 <jms:listener destination="springQueue" ref="queueConsumer2"/> 30 </jms:listener-container> 31 32 <!-- 定義Topic監聽器 --> 33 <jms:listener-container destination-type="topic" container-type="default" 34 connection-factory="connectionFactory" acknowledge="auto"> 35 <jms:listener destination="springTopic" ref="topicConsumer1"/> 36 <jms:listener destination="springTopic" ref="topicConsumer2"/> 37 </jms:listener-container> 38 39 <!-- 消息消費者 end -->
配置applicationContext-mq.xml,注釋說明配置信息
1 <!-- 掃描包 --> 2 <context:component-scan base-package="cn.bowen.activemq.produce" /> 3 4 <!-- ActiveMQ 連接工廠 --> 5 <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供--> 6 <!-- 如果連接網絡:tcp://ip:61616;未連接網絡:tcp://localhost:61616 以及用戶名,密碼--> 7 <amq:connectionFactory id="amqConnectionFactory" 8 brokerURL="tcp://localhost:61616" userName="admin" password="admin" /> 9 10 <!-- Spring Caching連接工廠 --> 11 <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> 12 <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> 13 <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> 14 <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> 15 <!-- 同上,同理 --> 16 <!-- <constructor-arg ref="amqConnectionFactory" /> --> 17 <!-- Session緩存數量 --> 18 <property name="sessionCacheSize" value="100" /> 19 </bean> 20 21 <!-- Spring JmsTemplate 的消息生產者 start--> 22 23 <!-- 定義JmsTemplate的Queue類型 --> 24 <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> 25 <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 --> 26 <constructor-arg ref="connectionFactory" /> 27 <!-- 非pub/sub模型(發布/訂閱),即隊列模式 --> 28 <property name="pubSubDomain" value="false" /> 29 </bean> 30 31 <!-- 定義JmsTemplate的Topic類型 --> 32 <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> 33 <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 --> 34 <constructor-arg ref="connectionFactory" /> 35 <!-- pub/sub模型(發布/訂閱) --> 36 <property name="pubSubDomain" value="true" /> 37 </bean> 38 39 <!--Spring JmsTemplate 的消息生產者 end-->
在produce包下創建QueueProducer生產者,引用模版的JmsTemplate的Queue類型
1 package cn.bowen.activemq.produce; 2 3 import javax.jms.JMSException; 4 import javax.jms.Message; 5 import javax.jms.Session; 6 7 import org.springframework.beans.factory.annotation.Autowired; 8 import org.springframework.beans.factory.annotation.Qualifier; 9 import org.springframework.jms.core.JmsTemplate; 10 import org.springframework.jms.core.MessageCreator; 11 import org.springframework.stereotype.Service; 12 13 @Service 14 public class QueueProducer { 15 @Autowired 16 @Qualifier("jmsQueueTemplate") 17 private JmsTemplate jmsTemplate; 18 19 public void send(String queueName,final String msg){ 20 jmsTemplate.send(queueName, new MessageCreator() { 21 22 public Message createMessage(Session session) throws JMSException { 23 return session.createTextMessage(msg); 24 } 25 }); 26 } 27 }
在produce包下創建TopicProducer生產者,引用模版的JmsTemplate的Topic類型
1 package cn.bowen.activemq.produce; 2 3 import javax.jms.JMSException; 4 import javax.jms.Message; 5 import javax.jms.Session; 6 7 import org.springframework.beans.factory.annotation.Autowired; 8 import org.springframework.beans.factory.annotation.Qualifier; 9 import org.springframework.jms.core.JmsTemplate; 10 import org.springframework.jms.core.MessageCreator; 11 import org.springframework.stereotype.Service; 12 13 @Service 14 public class TopicProducer { 15 @Autowired 16 @Qualifier("jmsTopicTemplate") 17 private JmsTemplate jmsTemplate; 18 19 public void send(String topicName,final String msg){ 20 jmsTemplate.send(topicName, new MessageCreator() { 21 22 public Message createMessage(Session session) throws JMSException { 23 return session.createTextMessage(msg); 24 } 25 }); 26 } 27 }
最后生產者和消費者的Queue和Topic倆種類型都准備好了~~~
准備測試>>>
測試我使用的是spring的JUnit4來進行注解測試
在test包下創建ConsumerTest(消費者監聽)
1 package cn.bowen.activemq; 2 3 import org.junit.Test; 4 import org.junit.runner.RunWith; 5 import org.springframework.test.context.ContextConfiguration; 6 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; 7 8 @RunWith(SpringJUnit4ClassRunner.class) 9 @ContextConfiguration(locations="classpath:applicationContext-mq-consumer.xml") 10 public class ConsumerTest { 11 12 @Test 13 public void testProduce(){ 14 //線程不能關閉 15 while(true){} 16 } 17 }
創建生產者ProducerTest生產
1 package cn.bowen.activemq; 2 3 import org.junit.Test; 4 import org.junit.runner.RunWith; 5 import org.springframework.beans.factory.annotation.Autowired; 6 import org.springframework.test.context.ContextConfiguration; 7 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; 8 9 import cn.bowen.activemq.produce.QueueProducer; 10 import cn.bowen.activemq.produce.TopicProducer; 11 12 @RunWith(SpringJUnit4ClassRunner.class) 13 @ContextConfiguration(locations="classpath:applicationContext-mq.xml") 14 public class ProducerTest { 15 @Autowired 16 private QueueProducer queueProducer; 17 18 @Autowired 19 private TopicProducer topicProducer; 20 21 @Test 22 public void testProduce(){ 23 queueProducer.send("springQueue", "這是一個隊列消息!"); 24 topicProducer.send("springTopic", "這是一個廣播/話題消息!"); 25 } 26 }
先執行消費者進行監聽>>>
在通過生產者生產第一次消息發現>>
在通過生產者生產第二次消息發現>>
在通過生產者生產第三次消息發現>>
不難發現Queue和Topic的區別???
發送消息類型為Topic時,是以廣播的形式,每一個消費者都能消費到~~~
而發送消息Queue類型時,是作為一對一隊列形式的消費,一條消息只能一個消費者消費~~~(兩個消費者又好像是輪流消費哈)
兩種類型應用的業務場景不一樣!
今天Active就寫到這里啦,第一次寫見笑見笑~~~別噴我哦~~~