ActiveMQ發布-訂閱消息模式(同點對點模式的區別)


點對點與發布訂閱最初是由JMS定義的。這兩種模式主要區別或解決的問題就是發送到隊列的消息能否重復消費(多訂閱)

點對點: 
消息生產者生產消息發送到queue中,然后消息消費者從queue中取出並且消費消息。這里要注意: 
消息被消費以后,queue中不再有存儲,所以消息消費者不可能消費到已經被消費的消息。 
Queue支持存在多個消費者,但是對一個消息而言,只會有一個消費者可以消費。 
發布/訂閱 
消息生產者(發布)將消息發布到topic中,同時有多個消息消費者(訂閱)消費該消息。和點對點方式不同,發布到topic的消息會被所有訂閱者消費。

1、消息生產者-消息發布-Topic

[html]  view plain  copy
 
 
  1. /**  
  2.  * 消息生產者-消息發布者  
  3.  * @author Administrator  
  4.  *  
  5.  */  
  6. public class JMSProducer {  
  7.     private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默認的連接用戶名  
  8.     private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默認的連接密碼  
  9.     private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默認的連接地址  
  10.     private static final int SENDNUM=10; // 發送的消息數量  
  11.       
  12.     public static void main(String[] args) {  
  13.           
  14.         ConnectionFactory connectionFactory; // 連接工廠  
  15.         Connection connection = null; // 連接  
  16.         Session session; // 會話 接受或者發送消息的線程  
  17.         Destination destination; // 消息的目的地  
  18.         MessageProducer messageProducer; // 消息生產者         
  19.         // 實例化連接工廠  
  20.         connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);  
  21.           
  22.         try {  
  23.             connection=connectionFactory.createConnection(); // 通過連接工廠獲取連接  
  24.             connection.start(); // 啟動連接  
  25.             session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 創建Session  
  26.             // destination=session.createQueue("FirstQueue1"); // 創建消息隊列  
  27.             destination=session.createTopic("FirstTopic1");  
  28.             messageProducer=session.createProducer(destination); // 創建消息生產者  
  29.             sendMessage(session, messageProducer); // 發送消息  
  30.             session.commit();  
  31.         } catch (Exception e) {  
  32.             // TODO Auto-generated catch block  
  33.             e.printStackTrace();  
  34.         } finally{  
  35.             if(connection!=null){  
  36.                 try {  
  37.                     connection.close();  
  38.                 } catch (JMSException e) {  
  39.                     // TODO Auto-generated catch block  
  40.                     e.printStackTrace();  
  41.                 }  
  42.             }  
  43.         }  
  44.     }  
  45.       
  46.     /**  
  47.      * 發送消息  
  48.      * @param session  
  49.      * @param messageProducer  
  50.      * @throws Exception  
  51.      */  
  52.     public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{  
  53.         for(int i=0;i<JMSProducer.SENDNUM;i++){  
  54.             TextMessage message=session.createTextMessage("ActiveMQ 發送的消息"+i);  
  55.             System.out.println("發送消息:"+"ActiveMQ 發布的消息"+i);  
  56.             messageProducer.send(message);  
  57.         }  
  58.     }  
  59. }  

2、多個消息訂閱者-消息消費者

消息訂閱者一

[html]  view plain  copy
 
 
  1. /**  
  2.  * 消息消費者-消息訂閱者一  
  3.  * @author Administrator  
  4.  *  
  5.  */  
  6. public class JMSConsumer {  
  7.   
  8.     private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默認的連接用戶名  
  9.     private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默認的連接密碼  
  10.     private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默認的連接地址  
  11.       
  12.     public static void main(String[] args) {  
  13.         ConnectionFactory connectionFactory; // 連接工廠  
  14.         Connection connection = null; // 連接  
  15.         Session session; // 會話 接受或者發送消息的線程  
  16.         Destination destination; // 消息的目的地  
  17.         MessageConsumer messageConsumer; // 消息的消費者  
  18.           
  19.         // 實例化連接工廠  
  20.         connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);  
  21.                   
  22.         try {  
  23.             connection=connectionFactory.createConnection();  // 通過連接工廠獲取連接  
  24.             connection.start(); // 啟動連接  
  25.             session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 創建Session  
  26.             // destination=session.createQueue("FirstQueue1");  // 創建連接的消息隊列  
  27.             destination=session.createTopic("FirstTopic1");  
  28.             messageConsumer=session.createConsumer(destination); // 創建消息消費者  
  29.             messageConsumer.setMessageListener(new Listener()); // 注冊消息監聽  
  30.         } catch (JMSException e) {  
  31.             // TODO Auto-generated catch block  
  32.             e.printStackTrace();  
  33.         }   
  34.     }  
  35. }  

消息訂閱者二

[html]  view plain  copy
 
 
  1. /**  
  2.  * 消息消費者-消息訂閱者二  
  3.  * @author Administrator  
  4.  *  
  5.  */  
  6. public class JMSConsumer2 {  
  7.   
  8.     private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默認的連接用戶名  
  9.     private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默認的連接密碼  
  10.     private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默認的連接地址  
  11.       
  12.     public static void main(String[] args) {  
  13.         ConnectionFactory connectionFactory; // 連接工廠  
  14.         Connection connection = null; // 連接  
  15.         Session session; // 會話 接受或者發送消息的線程  
  16.         Destination destination; // 消息的目的地  
  17.         MessageConsumer messageConsumer; // 消息的消費者  
  18.           
  19.         // 實例化連接工廠  
  20.         connectionFactory=new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL);  
  21.                   
  22.         try {  
  23.             connection=connectionFactory.createConnection();  // 通過連接工廠獲取連接  
  24.             connection.start(); // 啟動連接  
  25.             session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 創建Session  
  26.             // destination=session.createQueue("FirstQueue1");  // 創建連接的消息隊列  
  27.             destination=session.createTopic("FirstTopic1");  
  28.             messageConsumer=session.createConsumer(destination); // 創建消息消費者  
  29.             messageConsumer.setMessageListener(new Listener2()); // 注冊消息監聽  
  30.         } catch (JMSException e) {  
  31.             // TODO Auto-generated catch block  
  32.             e.printStackTrace();  
  33.         }   
  34.     }  
  35. }  

     兩個Linsner用於打印不同的標識信息,故省略。

     注:發布訂閱模式適用於1個消息生產者,多個消費者場景,首先啟動消息訂閱方,在消息發布方開始執行后,接收該消息進行處理。在ActiveMQ管理界面會動態跟進消息產生-消費(入隊、出隊)情況;以及生產者個數,消費者個數。

http://blog.csdn.net/Daybreak1209/article/details/51672277

http://blog.csdn.net/zbw18297786698/article/details/53000605


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM