ActiveMQ消息隊列,一對一推送,一對多訂閱


生產者:推送消息方,主要負責給用戶推送消息。(ActiveMQ接口)

消費者:接收消息方,主要查看推送過來的消息。(用戶)

ActiveMQ安裝非常簡單,在此我就不發了,安裝完ActiveMQ后,進入網址 http://localhost:8161/admin/ ,登錄管理后台,默認用戶名:admin        密碼:admin  到此結束,開始寫代碼。

寫代碼之前,還需要開啟ActiveMQ服務,在安裝文件夾里,打開bin文件夾,運行服務,如果是32位則選用32,64選64的bin。開啟服務后,就可以快樂的copy代碼了。

一、

  ①、一對一推送消息(Queue模式)生產者代碼:      ↓

  

 1 public class Sender {  
 2 
 3     public static void main(String[] args) throws JMSException, InterruptedException {  
 4         // ConnectionFactory :連接工廠,JMS 用它創建連接  
 5         //61616是ActiveMQ默認端口
 6         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(  
 7                                                   ActiveMQConnection.DEFAULT_USER,  
 8                                                   ActiveMQConnection.DEFAULT_PASSWORD,  
 9                                                   "tcp://localhost:61616");  
10 
11         // Connection :JMS 客戶端到JMS Provider 的連接  
12         Connection connection =  connectionFactory.createConnection();  
13 
14         connection.start();  
15         // Session: 一個發送或接收消息的線程  
16         Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);  
17 
18         // Destination :消息的目的地;消息發送給誰.  
19         Destination destination =  session.createQueue("my-queue");  
20 
21         // MessageProducer:消息發送者  
22         MessageProducer producer =  session.createProducer(destination);  
23 
24         // 設置不持久化,可以更改  
25         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  
26 
27         for(int i=0;i<10;i++){  
28             //創建文本消息  
29             TextMessage message = session.createTextMessage("hello.I am producer, this is a test message"+i);  
30 
31             Thread.sleep(1000);  
32             //發送消息  
33             producer.send(message);  
34         }  
35 
36         session.commit();  
37         session.close();  
38         connection.close();  
39     }  
40 
41 }  

 

 

 ②、消費者代碼(Queue模式)接收發送過來的消息:    ↓

 1 // ConnectionFactory :連接工廠,JMS 用它創建連接  
 2    private static ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,  
 3            ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");  
 4 
 5    public static void main(String[] args) throws JMSException {  
 6         // Connection :JMS 客戶端到JMS Provider 的連接  
 7         final Connection connection =  connectionFactory.createConnection();  
 8 
 9         connection.start();  
10         // Session: 一個發送或接收消息的線程  
11         final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);  
12         // Destination :消息的目的地;消息送誰那獲取.  
13         Destination destination =  session.createQueue("my-queue");  
14         // 消費者,消息接收者  
15         MessageConsumer consumer1 =  session.createConsumer(destination);  
16 
17         consumer1.setMessageListener(new MessageListener() {  
18                 @Override  
19                 public void onMessage(Message msg) {  
20 
21                     try {  
22 
23                         TextMessage message = (TextMessage)msg ;  
24                         System.out.println("consumerOne收到消息: "+message.getText());  
25                         session.commit();  
26                     } catch (JMSException e) {                
27                         e.printStackTrace();  
28                     }  
29 
30                 }  
31             });  
32 }  

運行之后控制台不會退出一直監聽消息庫,對於消息發送者的十條信息,控制輸出:

consumerOne收到消息: hello.I am producer, this is a test message0 
consumerOne收到消息: hello.I am producer, this is a test message1 
consumerOne收到消息: hello.I am producer, this is a test message2 
consumerOne收到消息: hello.I am producer, this is a test message3 
consumerOne收到消息: hello.I am producer, this is a test message4 
consumerOne收到消息: hello.I am producer, this is a test message5 
consumerOne收到消息: hello.I am producer, this is a test message6 
consumerOne收到消息: hello.I am producer, this is a test message7 
consumerOne收到消息: hello.I am producer, this is a test message8 
consumerOne收到消息: hello.I am producer, this is a test message9 
如果此時另外一個線程也存在消費者監聽該Queue,則兩者交換輸出。(注:Queue僅使用於 1V1 ,發送推送消息給個人

 

二、

  ①、一對多訂閱推送消息(Topic模式)生產者代碼:      ↓

 1 package com.ym.admin.config;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.ConnectionFactory;
 5 import javax.jms.DeliveryMode;
 6 import javax.jms.Destination;
 7 import javax.jms.JMSException;
 8 import javax.jms.MapMessage;
 9 import javax.jms.MessageProducer;
10 import javax.jms.Session;
11 import javax.jms.TextMessage;
12 
13 import org.apache.activemq.ActiveMQConnection;
14 import org.apache.activemq.ActiveMQConnectionFactory;
15 
16 //發送消息
17 public class Sender {  
18     public static void main(String[] args) throws JMSException, InterruptedException {
19         // ConnectionFactory :連接工廠,JMS 用它創建連接  
20         //61616是ActiveMQ默認端口
21         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(  
22                 "admin",  
23                 "admin",  
24                                                   "tcp://localhost:61616");  
25 
26         // Connection :JMS 客戶端到JMS Provider 的連接  
27         Connection connection =  connectionFactory.createConnection();  
28         connection.start();  
29         // Session: 一個發送或接收消息的線程  
30         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  //自動確認
31 
32         // Destination :消息的目的地;消息發送給誰.  
33         //Destination destination =  session.createQueue("my-queue");  
34         Destination destination = session.createTopic("STOCKS.myTopic"); //創建topic   myTopic
35         // MessageProducer:消息發送者  
36         MessageProducer producer =  session.createProducer(destination);  
37 
38         // 設置不持久化,可以更改  
39         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  
40 
41          for(int i=0;i<3;i++){  
42             //創建文本消息  
43             TextMessage message = session.createTextMessage("j"+i);  
44             //發送消息  
45             System.out.println("發送消息成功:"+i);
46             producer.send(message);  
47         }  
48  
49         session.close();  
50         connection.close();
51     }  
52 }  

  

 

  ②、消費者代碼(Topic模式)接收發送過來的消息:    ↓

 1 package com.ym.admin.config;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.ConnectionFactory;
 5 import javax.jms.ConnectionMetaData;
 6 import javax.jms.Destination;
 7 import javax.jms.JMSException;
 8 import javax.jms.MapMessage;
 9 import javax.jms.Message;
10 import javax.jms.MessageConsumer;
11 import javax.jms.MessageListener;
12 import javax.jms.Session;
13 import javax.jms.TextMessage;
14 import javax.jms.Topic;
15 
16 import org.apache.activemq.ActiveMQConnection;
17 import org.apache.activemq.ActiveMQConnectionFactory;
18 import org.springframework.jms.support.JmsUtils;
19 import org.springframework.util.StringUtils;
20 
21 //接收訂閱
22 public class Receiver {  
23     private static ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin",
24             "admin", "tcp://localhost:61616");
25     public static void main(String[] args) {  
26 
27             // Connection :JMS 客戶端到JMS Provider 的連接
28             try {
29                 final Connection connection = connectionFactory.createConnection();
30                 connection.setClientID("anpei"); //持久訂閱需要設置這個。
31                  connection.start();
32                 int INDIVIDUAL_ACK_TYPE = 4;
33                 // Session: 一個發送或接收消息的線程
34                 final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
35                  // Destination :消息的目的地;消息送誰那獲取.
36                 Topic topic = session.createTopic("STOCKS.myTopic");// 創建topic
37                  
38                  // 消費者,消息接收者
39             //    MessageConsumer consumer1 = session.createConsumer(destination); 普通訂閱
40                 MessageConsumer consumer1 = session.createDurableSubscriber(topic,"anpei");//持久化訂閱
41                 
42                 consumer1.setMessageListener(new MessageListener() {
43                     
44                     @Override
45                     public void onMessage(Message msg) {
46 
47                         try {
48                             
49                             TextMessage message = (TextMessage) msg;
50                             System.out.println("AAAAAAAAA收到消息: " + message.getText());54                         //    JmsUtils.commitIfNecessary(session);
55                             session.commit();
56                             
57                              
58                         } catch (JMSException e) {
59                             e.printStackTrace();
60                         }
61 
62                     }
63                 });
64 
65                 /*// 再來一個消費者,消息接收者
66                 MessageConsumer consumer2 = session.createConsumer(destination);
67 
68                 consumer2.setMessageListener(new MessageListener() {
69                     @Override
70                     public void onMessage(Message msg) {
71 
72                         try {
73 
74                             TextMessage message = (TextMessage) msg;
75                             System.out.println("BBBBBBBBB收到消息: " + message.getText());
76                             JmsUtils.commitIfNecessary(session);
77                           //  session.commit();
78                         } catch (JMSException e) {
79                             e.printStackTrace();
80                         }
81 
82                     }
83                 });*/
84             } catch (Exception e) {
85                 e.printStackTrace();
86             }
87     }  
88   
89 }  

 

上面程序運行后輸出以下信息:

AAAAAAAAA收到消息: j0

AAAAAAAAA收到消息: j1

AAAAAAAAA收到消息: j2

Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// Session: 一個發送或接收消息的線程

  一、事務

  這一句的 true 代表是否使用事務,true:使用   false:不使用。

  使用事務和不使用事務的區別:↓

  如果使用事務,下面一定要session.commit();提交。 如果不使用事務,則需要把這句話刪掉,因為沒有事務可以提交,會報錯。

  如果使用事務,一切都要按照事務的規則來。如果為false,則需要使用ACK機制進行確認接收。如果不確認接收,將一直收到消息通知。

  二、ACK機制

Session.AUTO_ACKNOWLEDGE  ACK機制,對接收的消息進行處理
  • AUTO_ACKNOWLEDGE = 1    自動確認
  • CLIENT_ACKNOWLEDGE = 2    客戶端手動確認   
  • DUPS_OK_ACKNOWLEDGE = 3    自動批量確認
  • SESSION_TRANSACTED = 0    事務提交並確認

    此外AcitveMQ補充了一個自定義的ACK_MODE:

  • INDIVIDUAL_ACKNOWLEDGE = 4    單條消息確認
  • 具體請參考 http://blog.csdn.net/lulongzhou_llz/article/details/42270113 

    三、持久化消息
    定義:消息持久性對於可靠消息傳遞來說應該是一種比較好的方法,有了消息持久化,即使發送者和接受者不是同時在線(服務同時啟動)或者消息中心在發送者發送消息后宕機了,在消息中心重新啟動后仍然可以將消息發送出去。(意思就是說,即使消費者處於離線狀態,在重新登陸后,依然可以收到消息)
  • 上面一對多訂閱的代碼,是我持久化過后的代碼,通過:
                    connection.setClientID("zhangsan"); //持久訂閱需要設置這個。
                    MessageConsumer consumer1 = session.createDurableSubscriber(topic,"zhangsan");//持久化訂閱

    如:我是張三,我是張三,請給我饅頭。我是王七,我是王七,請給我饅頭。對比一對一消息推送,很容易就看懂了。

    上面程序接收的消息,接收一次,就不能接收了,因為

  • AUTO_ACKNOWLEDGE = 1    自動確認
    程序在第一次運行,就已經自動確認已經接收了消息,ActiveMQ將自動刪除已經確認的消息。

    如果哪個地方有問題,歡迎留言,謝謝。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM