在上一篇《ActiveMQ入門系列二:入門代碼實例(點對點模式)》中提到了ActiveMQ中的兩種模式:點對點模式(PTP)和發布/訂閱模式(Pub & Sub),詳細介紹了點對點模式並用代碼實例進行說明,今天就介紹下發布/訂閱模式。
一、理論基礎
發布/訂閱模式的工作示意圖:
- 消息生產者將消息(發布)到topic中,可以同時有多個消息消費者(訂閱)消費該消息。
- 和點對點方式不同,發布到topic的消息會被所有訂閱者消費。
- 當生產者發布消息,不管是否有消費者,都不會保存消息。
- 一定要先有消息的消費者,后有消息的生產者。
二、代碼實現
- 生產者
package com.sam.topic; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @author JAVA開發老菜鳥 * */ public class TopicProducer { public static final String QUEUE_NAME = "topic-demo";//隊列名 public void producer(String message) throws JMSException { ConnectionFactory factory = null; Connection connection = null; Session session = null; MessageProducer producer = null; try { /** * 1.創建連接工廠 * 創建工廠,構造方法有三個參數:分別是用戶名、密碼、連接地址 * 無參構造:有默認的連接地址,localhost * 一個參數:無驗證模式,無用戶的認證 * 三個參數:有認證和連接地址 */ factory = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616"); /** * 2.創建連接 * 無參數 * 有參數:用戶名、密碼 */ connection = factory.createConnection(); /** * 3.啟動連接 * 生產者可以不啟動,因為在發送消息的時候回進行檢查 * 如果未啟動連接,會自動啟動 * 如果有特殊配置,需要配置完成后再啟動連接 */ connection.start(); /** * 4.用連接創建會話 * 有兩個參數:是否需要事務、消息確認機制 * 如果支持事務,對於生產者來說第二個參數就無效了,建議傳入Session.SESSION_TRANSACTED * 如果不支持事務,第二個參數必須傳遞且有效 * * AUTO_ACKNOWLEDGE:自動確認,消息處理后自動確認(商業開發不推薦) * CLIENT_ACKNOWLEDGE:客戶端手動確認,消費者處理后必須手動確認 * DUPS_OK_ACKNOWLEDGE:有副本的客戶端手動確認,消息可以多次處理(不建議) */ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); /** * 5.用會話創建目的地(主題)、生產者、消息 * 隊列名是隊列的唯一標記 * 創建生產者的時候可以不指定目的地,可以在發送的時候指定 */ Destination destination = session.createTopic(QUEUE_NAME); producer = session.createProducer(destination); TextMessage textMessage = session.createTextMessage(message); /** * 6.生產者發送消息到目的地 */ producer.send(textMessage); System.out.println("消息發送成功"); } catch(Exception ex){ throw ex; } finally { /** * 7.釋放資源 */ if(producer != null){ producer.close(); } if(session != null){ session.close(); } if(connection != null){ connection.close(); } } } public static void main(String[] args){ TopicProducer producer = new TopicProducer(); try{ producer.producer("hello, activemq"); } catch (Exception ex){ ex.printStackTrace(); } } }
發布/訂閱模式的生產者和點對點模式的代碼主要區別就是Destination的創建方式,點對點模式是調用session.createQueue(QUEUE_NAME),而發布/訂閱模式是調用session.createTopic(QUEUE_NAME)。
- 消費者
package com.sam.topic; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.io.IOException; /** * @author JAVA開發老菜鳥 * * 觀察者消費--監聽消費 */ public class TopicConsumer { public void consumer() throws JMSException, IOException { ConnectionFactory factory = null; Connection connection = null; Session session = null; MessageConsumer consumer = null; try { factory = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616"); connection = factory.createConnection(); /** * 消費者必須啟動連接,否則無法消費 */ connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic(TopicProducer.QUEUE_NAME); consumer = session.createConsumer(destination); /** * 注冊監聽器,隊列中的消息變化會自動觸發監聽器,接收並自動處理消息 * * 監聽器一旦注冊,永久有效,一直到程序關閉 * 監聽器可以注冊多個,相當於集群 * activemq自動輪詢多個監聽器,實現並行處理 */ consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { TextMessage om = (TextMessage) message; String data = om.getText(); System.out.println(data); } catch (JMSException e) { e.printStackTrace(); } } }); } catch(Exception ex){ throw ex; } } public static void main(String[] args){ TopicConsumer consumer = new TopicConsumer(); try{ consumer.consumer(); } catch (Exception ex){ ex.printStackTrace(); } } }
消費者在點對點監聽消費的基礎上進行變化,主要區別有兩個:1.同生產者一樣,也是Destination的創建方式不同; 2.消息無需手動確認,直接采用自動確認機制
代碼寫完了,接下來進行測試,由於subscribe可以有多個,而且每個都可以消費到相同的消息,因此我們消費者啟動兩個。
先執行生產者
在控制台頁面的Topics下出現了我定義的topic並且有1條消息發送成功且未消費
然后執行兩個消費者,兩個消費者都沒有消費到任何消息
並且,控制台頁面只是多了2個消費者,已經消費的消息還是0
為什么呢?還記得前面的理論基礎說的嗎?就是這個原因
繼續,我們在兩個消費者啟動好的前提下,再執行生產者, 這個時候會發現兩個消費者都消費了該消息
再看下控制台頁面
已消費消息這里是2,這個2並不是說之前發的兩個消息都消費了,而是說第二個消息消費了2次, 1 * 2 = 2
不信的話,可以再執行一遍生產者,這個時候就是4,而不是3
累計發送過3條消息,消息消費了4次,這里的4就是后面兩條分別被消費了2次, 2 * 2 = 4
三、兩種模式比較
好,到這里,發布/訂閱模式就介紹完了。
如果有收獲,就點個贊唄