1.在這個項目中新增兩個java類,主題生產者和主題消費者:
2.和點對點的代碼差別並不大,所以將消費者和生產者的分別代碼拷入新增的java類中,再修改就好了。
appProducerTopic代碼:標紅字體是做出了修改,由創建隊列改為了創建主題。
package com.liu.jms; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class appProducerTopic { private static final String url = "tcp://127.0.0.1:61616";//actvemq的服務器tcp連接方式 private static final String topicName = "topic-test";//定義主題的名稱 public static void main(String[] args) throws JMSException { //1.創建connectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); //2.創建connection Connection connection = connectionFactory.createConnection(); //3.啟動連接 connection.start(); //4.創建session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.創建destination Destination destination = session.createTopic(topicName); //6.創建生產者 MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 100; i++) { TextMessage textMessage = session.createTextMessage("test" + i); //7.發送消息 producer.send(textMessage); System.out.println("發送消息" + textMessage.getText()); } //8.關閉連接 connection.close(); } }
appConsumerTopic代碼:標紅字體是做出了修改,由創建隊列改為了創建主題。
package com.liu.jms; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class appConsumerTopic { private static final String url = "tcp://127.0.0.1:61616"; private static final String topicName = "topic-test";//定義主題的名稱 public static void main(String[] args) throws JMSException { //1.創建connectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); //2.創建connection Connection connection = connectionFactory.createConnection(); //3.啟動連接 connection.start(); //4.創建session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.創建destination Destination destination = session.createTopic(topicName); //6.創建消費者 MessageConsumer consumer = session.createConsumer(destination); //7.創建一個監聽器 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; try { System.out.println("接收到的消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); //8.關閉連接(監聽器是異步的還沒有監聽到消息的時候,就關閉連接了) //connection.close(); } }
3.測試
首先啟動消費者這個java類,觀察控制台,如下圖:
接着啟動生產者的java類,觀察控制台,如下圖:生產了一百條消息。
此時切換至消費的控制台,觀察控制台,如下圖:已經打印出了一百條消息了,說明消費者已經接受到全部一百條消息。
6.打開activemq的控制台查看topics:(http://127.0.0.1:8161/admin/topics.jsp)如下圖所示:有一個名字是我們設置的queue-test的主題,消費者也有一個就是我們創建的那個消費者類,主題中有一百條消息,被移除了一百條,也就是上面所說的,消費者接收到了這100條全部的消息。
7.那么如果我啟動了兩個訂閱相同的消費者呢?為了結果能清晰,重啟activemq服務,關掉之前的Java類啟動,然后啟動兩邊消費者,再啟動一個生產者。如下圖:生產者生產了100條消息。
8.分別看看兩個消費者的接收消息,如下兩張圖:兩個消費者都接受到了一模一樣的100條消息。
9.總結:主題訂閱發布模式,有多個消費的訂閱相同時,消費者不會相互相互影響,都會分別接收到生產者的全部消息。