ActiveMQ入門案例以及整合Spring的簡單實用


先來個ActiveMQ介紹哈:

 MQ全稱為Message Queue, 消息隊列MQ)是一種應用程序對應用程序的通信方法,是一個消息中間件。   

 應用場景:為了實現系統之間的通信,把系統之間的調用耦合度降低就可以使用MQ

 

 

1) activeMQ 是Apache出品,最流行的,能力強勁的開源消息總線。

 

 

2) avtiveMQ主要特點:完全支持JMS1.1和J2EE 1.4規范;支持spring,很容易內嵌到spring中;支持ajax

 

 

3) activeMQ的消息形式:

 

a) 點對點形式,即生產者和接收者一一對應

 

b) 發布/訂閱形式,即一個生產者發布消息后,可以有多個接收者訂閱接收。

 

 

4) JMS五種消息正文格式:

 

a) StreamMessage -- Java原始值的數據流

 

b) MapMessage--一套名稱-值對

 

c) TextMessage--一個字符串對象(常用)

 

d) ObjectMessage--一個序列化的 Java對象

 

e) BytesMessage--一個字節的數據流

 

 

好了直接我開始ActiveMQ的入門案例!

創建一個maven工程activeMQ_helloworld,提供兩個測試類進行演示.

 

pom文件導入的依賴

 

創建一個測試類來做生產者生產消息,這里我用的是隊列形式(queue),一對一的消費,我創建了一個隊列叫

HelloActiveMQ,並發送了十條消息.

 

 1 public class ActiveMQProducer {
 2     @Test
 3     public void testProduceMsg() throws Exception{
 4         //連接工廠
 5         //使用默認的用戶名,密碼,路徑
 6         //路徑為 tcp://host:61616
 7         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
 8         //獲取一個連接
 9         Connection connection = connectionFactory.createConnection();
10         //創建一個會話
11         Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
12         //創建隊列或者話題
13         Queue queue = session.createQueue("HelloActiveMQ");
14         //創建生產者或者消費者
15         MessageProducer producer = session.createProducer(queue);
16         //發送消息
17         for (int i = 0; i < 10; i++) {
18             producer.send(session.createTextMessage("activeMQ,你好!"+i));
19         }
20         //提交操作
21         session.commit();
22     }
23 }

 

熟悉ActiveMQ的API,根據API來發送消息,最后的commit不要忘了!!!

在創建一個消費者來對消息進行消費,消費者引用的隊列名為之前創建的生產者隊列名HelloActiveMQ

 1 public class ActiveMQConsumer {
 2     @Test
 3     public void testConsumeMsg() throws Exception{
 4         // 連接工廠
 5         // 使用默認用戶名、密碼、路徑
 6         // 路徑 tcp://host:61616
 7         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
 8         // 獲取一個連接
 9         Connection connection = connectionFactory.createConnection();
10         //開啟連接
11         connection.start();
12         //建立會話,第一個參數是否開啟事務,為true時,最后需要session.conmit()的提交
13         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
14         // 創建隊列或者話題對象
15         Queue queue = session.createQueue("HelloActiveMQ");
16         // 創建消費者
17         MessageConsumer messageConsumer = session.createConsumer(queue);
18         
19         while (true) {
20             TextMessage message = (TextMessage) messageConsumer.receive(5000);
21             if (message != null) {
22                 System.out.println(message.getText());
23             } else {
24                 break;
25             }
26         }
27     }
28 }

生產者和消費者都已經創建好,現在就可以開始愉快的測試了~~~

哦,還沒開啟呢...

安裝好的ActiveMQ在本地,進入bin選擇win64(我電腦64的),activemq.bat開啟

開啟后

進入Activemq管理頁面,地址http://服務器ip:8161,用戶名admin,密碼admin,如圖

 

 

這個消息管理頁面非常好用,用的很多,后面說~

現在執行一次生產者testProduceMsg(),生產了十條消息,可以在管理頁面看到(queues隊列)

 

顯然有十條消息生產了~

現在調用消費者testConsumeMsg(),去消費這十條消息!

 

控制台打印出十條消息,再去看看消息管理頁面>

 

十條消息已經消費了~~~ok

 

然而然而業務場景中用的最多的是監聽機制,對生產者的消息進行監聽,生產者一生產出消息,消費者立馬進行消費掉!!!

這里我再進行監聽測試>>

在消費者測試類里添加第二個方法(監聽消費的方法),線程得一直開着.

 1 @Test
 2     // 使用監聽器消費
 3     public void testCosumeMQ2() throws Exception {
 4         // 連接工廠
 5         // 使用默認用戶名、密碼、路徑
 6         // 路徑 tcp://host:61616
 7         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
 8         // 獲取一個連接
 9         Connection connection = connectionFactory.createConnection();
10         // 開啟連接
11         connection.start();
12         // 建立會話
13         // 第一個參數,是否使用事務,如果設置true,操作消息隊列后,必須使用 session.commit();
14         Session session = connection.createSession(false,
15                 Session.AUTO_ACKNOWLEDGE);
16         // 創建隊列或者話題對象
17         Queue queue = session.createQueue("HelloActiveMQ");
18         // 創建消費者
19         MessageConsumer messageConsumer = session.createConsumer(queue);
20         messageConsumer.setMessageListener(new MessageListener() {
21             // 每次接收消息,自動調用 onMessage
22             public void onMessage(Message message) {
23                 TextMessage textMessage = (TextMessage)message;
24                 try {
25                     System.out.println(textMessage.getText());
26                 } catch (JMSException e) {
27                     e.printStackTrace();
28                 }
29             }
30         });
31         //不能關閉線程
32         while(true){
33             
34         }
35     }

 

先執行這個方法使線程一直開啟監聽,再去執行生產者生產十條消息,可以發現>>>

消息一生產出來立馬被監聽到消費掉!

簡單的入門案例就寫到這里,Active整合Spring的簡單使用下面開寫~~~

注意了!!!  開始整合Spring了...

這次分別用Queue和Topic演示

創建maven工程activeMQ_spring

pom的依賴

 

 1 <dependencies>
 2       <dependency>
 3           <groupId>org.springframework</groupId>
 4           <artifactId>spring-context</artifactId>
 5           <version>4.1.7.RELEASE</version>
 6       </dependency>
 7       <dependency>
 8           <groupId>org.springframework</groupId>
 9           <artifactId>spring-test</artifactId>
10           <version>4.1.7.RELEASE</version>
11       </dependency>
12       <dependency>
13           <groupId>junit</groupId>
14           <artifactId>junit</artifactId>
15           <version>4.12</version>
16       </dependency>
17       <dependency>
18           <groupId>org.apache.activemq</groupId>
19           <artifactId>activemq-all</artifactId>
20           <version>5.14.0</version>
21       </dependency>
22       <dependency>
23           <groupId>org.springframework</groupId>
24           <artifactId>spring-jms</artifactId>
25           <version>4.1.7.RELEASE</version>
26       </dependency>
27   </dependencies>

 

如果ActiveMQ整合spring使用不要使用activemq-all-5.12.0.jar包。建議使用5.11.2,5.14.2

此時用到spring-jms消息服務,jms模版和jms的監聽處理

在consumer包下創建兩個Queue消費者(隊列消費者)

QueueConsumer1:

 1 package cn.bowen.activemq.consume;
 2 
 3 import javax.jms.JMSException;
 4 import javax.jms.Message;
 5 import javax.jms.MessageListener;
 6 import javax.jms.TextMessage;
 7 
 8 import org.springframework.stereotype.Service;
 9 @Service
10 public class QueueConsumer1 implements MessageListener{
11 
12     public void onMessage(Message message) {
13         TextMessage textMessage = (TextMessage)message;
14         try {
15             System.out.println("消費的QueueConsumer1獲取消息:"+textMessage.getText());
16         } catch (JMSException e) {
17             e.printStackTrace();
18         }
19     }
20 
21 }

QueueConsumer2:

 1 package cn.bowen.activemq.consume;
 2 
 3 import javax.jms.JMSException;
 4 import javax.jms.Message;
 5 import javax.jms.MessageListener;
 6 import javax.jms.TextMessage;
 7 
 8 import org.springframework.stereotype.Service;
 9 @Service
10 public class QueueConsumer2 implements MessageListener{
11 
12     public void onMessage(Message message) {
13         TextMessage textMessage = (TextMessage)message;
14         try {
15             System.out.println("消費的QueueConsumer2獲取消息:"+textMessage.getText());
16         } catch (JMSException e) {
17             e.printStackTrace();
18         }
19     }
20 
21 }

 

創建兩個Topic消費者(話題/廣播消費者)

TopicConsumer1:

 1 package cn.bowen.activemq.consume;
 2 
 3 import javax.jms.JMSException;
 4 import javax.jms.Message;
 5 import javax.jms.MessageListener;
 6 import javax.jms.TextMessage;
 7 
 8 import org.springframework.stereotype.Service;
 9 @Service
10 public class TopicConsumer1 implements MessageListener{
11 
12     public void onMessage(Message message) {
13         TextMessage textMessage = (TextMessage)message;
14         try {
15             System.out.println("消費的TopicConsumer1獲取消息:"+textMessage.getText());
16         } catch (JMSException e) {
17             e.printStackTrace();
18         }
19     }
20 
21 }

TopicConsumer2:

 1 package cn.bowen.activemq.consume;
 2 
 3 import javax.jms.JMSException;
 4 import javax.jms.Message;
 5 import javax.jms.MessageListener;
 6 import javax.jms.TextMessage;
 7 
 8 import org.springframework.stereotype.Service;
 9 @Service
10 public class TopicConsumer2 implements MessageListener{
11 
12     public void onMessage(Message message) {
13         TextMessage textMessage = (TextMessage)message;
14         try {
15             System.out.println("消費的TopicConsumer2獲取消息:"+textMessage.getText());
16         } catch (JMSException e) {
17             e.printStackTrace();
18         }
19     }
20 
21 }

配置applicationContext-mq-consumer.xml,注釋說明配置信息~~~

 1     <!-- 掃描包 -->
 2     <context:component-scan base-package="cn.bowen.activemq.consume" />
 3     
 4     <!-- ActiveMQ 連接工廠 -->
 5     <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供-->
 6     <!-- 如果連接網絡:tcp://ip:61616;未連接網絡:tcp://localhost:61616 以及用戶名,密碼-->
 7     <amq:connectionFactory id="amqConnectionFactory"
 8         brokerURL="tcp://localhost:61616" userName="admin" password="admin"  />
 9 
10     <!-- Spring Caching連接工廠 -->
11     <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->  
12     <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
13         <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->  
14         <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
15         <!-- 同上,同理 -->
16         <!-- <constructor-arg ref="amqConnectionFactory" /> -->
17         <!-- Session緩存數量 -->
18         <property name="sessionCacheSize" value="100" />
19     </bean>
20     
21      <!-- 消息消費者 start-->
22 
23     <!-- 定義Queue監聽器 -->
24     <jms:listener-container destination-type="queue" container-type="default" 
25         connection-factory="connectionFactory" acknowledge="auto">
26         <!-- destination是隊列或話題名稱 -->
27         <!-- 默認注冊bean名稱,應該是類名首字母小寫  -->
28         <jms:listener destination="springQueue" ref="queueConsumer1"/>
29         <jms:listener destination="springQueue" ref="queueConsumer2"/>
30     </jms:listener-container>
31     
32     <!-- 定義Topic監聽器 -->
33     <jms:listener-container destination-type="topic" container-type="default" 
34         connection-factory="connectionFactory" acknowledge="auto">
35         <jms:listener destination="springTopic" ref="topicConsumer1"/>
36         <jms:listener destination="springTopic" ref="topicConsumer2"/>
37     </jms:listener-container>
38 
39     <!-- 消息消費者 end -->

配置applicationContext-mq.xml,注釋說明配置信息

 1     <!-- 掃描包 -->
 2     <context:component-scan base-package="cn.bowen.activemq.produce" />
 3     
 4     <!-- ActiveMQ 連接工廠 -->
 5     <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供-->
 6     <!-- 如果連接網絡:tcp://ip:61616;未連接網絡:tcp://localhost:61616 以及用戶名,密碼-->
 7     <amq:connectionFactory id="amqConnectionFactory"
 8         brokerURL="tcp://localhost:61616" userName="admin" password="admin"  />
 9 
10     <!-- Spring Caching連接工廠 -->
11     <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->  
12     <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
13         <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->  
14         <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
15         <!-- 同上,同理 -->
16         <!-- <constructor-arg ref="amqConnectionFactory" /> -->
17         <!-- Session緩存數量 -->
18         <property name="sessionCacheSize" value="100" />
19     </bean>
20     
21      <!-- Spring JmsTemplate 的消息生產者 start-->
22 
23     <!-- 定義JmsTemplate的Queue類型 -->
24     <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
25         <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 -->  
26         <constructor-arg ref="connectionFactory" />
27         <!-- 非pub/sub模型(發布/訂閱),即隊列模式 -->
28         <property name="pubSubDomain" value="false" />
29     </bean>
30 
31     <!-- 定義JmsTemplate的Topic類型 -->
32     <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
33          <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 -->  
34         <constructor-arg ref="connectionFactory" />
35         <!-- pub/sub模型(發布/訂閱) -->
36         <property name="pubSubDomain" value="true" />
37     </bean>
38 
39     <!--Spring JmsTemplate 的消息生產者 end-->

在produce包下創建QueueProducer生產者,引用模版的JmsTemplate的Queue類型

 1 package cn.bowen.activemq.produce;
 2 
 3 import javax.jms.JMSException;
 4 import javax.jms.Message;
 5 import javax.jms.Session;
 6 
 7 import org.springframework.beans.factory.annotation.Autowired;
 8 import org.springframework.beans.factory.annotation.Qualifier;
 9 import org.springframework.jms.core.JmsTemplate;
10 import org.springframework.jms.core.MessageCreator;
11 import org.springframework.stereotype.Service;
12 
13 @Service
14 public class QueueProducer {
15     @Autowired
16     @Qualifier("jmsQueueTemplate")
17     private JmsTemplate jmsTemplate;
18     
19     public void send(String queueName,final String msg){
20         jmsTemplate.send(queueName, new MessageCreator() {
21             
22             public Message createMessage(Session session) throws JMSException {
23                 return session.createTextMessage(msg);
24             }
25         });
26     }
27 }

在produce包下創建TopicProducer生產者,引用模版的JmsTemplate的Topic類型

 1 package cn.bowen.activemq.produce;
 2 
 3 import javax.jms.JMSException;
 4 import javax.jms.Message;
 5 import javax.jms.Session;
 6 
 7 import org.springframework.beans.factory.annotation.Autowired;
 8 import org.springframework.beans.factory.annotation.Qualifier;
 9 import org.springframework.jms.core.JmsTemplate;
10 import org.springframework.jms.core.MessageCreator;
11 import org.springframework.stereotype.Service;
12 
13 @Service
14 public class TopicProducer {
15     @Autowired
16     @Qualifier("jmsTopicTemplate")
17     private JmsTemplate jmsTemplate;
18     
19     public void send(String topicName,final String msg){
20         jmsTemplate.send(topicName, new MessageCreator() {
21             
22             public Message createMessage(Session session) throws JMSException {
23                 return session.createTextMessage(msg);
24             }
25         });
26     }
27 }

最后生產者和消費者的Queue和Topic倆種類型都准備好了~~~

准備測試>>>

測試我使用的是spring的JUnit4來進行注解測試

在test包下創建ConsumerTest(消費者監聽)

 1 package cn.bowen.activemq;
 2 
 3 import org.junit.Test;
 4 import org.junit.runner.RunWith;
 5 import org.springframework.test.context.ContextConfiguration;
 6 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 7 
 8 @RunWith(SpringJUnit4ClassRunner.class)
 9 @ContextConfiguration(locations="classpath:applicationContext-mq-consumer.xml")
10 public class ConsumerTest {
11 
12     @Test
13     public void testProduce(){
14         //線程不能關閉
15         while(true){}
16     }
17 }

創建生產者ProducerTest生產

 1 package cn.bowen.activemq;
 2 
 3 import org.junit.Test;
 4 import org.junit.runner.RunWith;
 5 import org.springframework.beans.factory.annotation.Autowired;
 6 import org.springframework.test.context.ContextConfiguration;
 7 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 8 
 9 import cn.bowen.activemq.produce.QueueProducer;
10 import cn.bowen.activemq.produce.TopicProducer;
11 
12 @RunWith(SpringJUnit4ClassRunner.class)
13 @ContextConfiguration(locations="classpath:applicationContext-mq.xml")
14 public class ProducerTest {
15     @Autowired
16     private QueueProducer queueProducer;
17     
18     @Autowired
19     private TopicProducer topicProducer;
20     
21     @Test
22     public void testProduce(){
23         queueProducer.send("springQueue", "這是一個隊列消息!");
24         topicProducer.send("springTopic", "這是一個廣播/話題消息!");
25     }
26 }

先執行消費者進行監聽>>>

在通過生產者生產第一次消息發現>>

在通過生產者生產第二次消息發現>>

在通過生產者生產第三次消息發現>>

 

不難發現Queue和Topic的區別???

發送消息類型為Topic時,是以廣播的形式,每一個消費者都能消費到~~~

而發送消息Queue類型時,是作為一對一隊列形式的消費,一條消息只能一個消費者消費~~~(兩個消費者又好像是輪流消費哈)

兩種類型應用的業務場景不一樣!

 

今天Active就寫到這里啦,第一次寫見笑見笑~~~別噴我哦~~~

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM