學習ActiveMQ(三):發布/訂閱模式(topic)演示


 

  1.在這個項目中新增兩個java類,主題生產者和主題消費者:

  2.和點對點的代碼差別並不大,所以將消費者和生產者的分別代碼拷入新增的java類中,再修改就好了。

appProducerTopic代碼:標紅字體是做出了修改,由創建隊列改為了創建主題。
package com.liu.jms;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class appProducerTopic {

    private static final String url = "tcp://127.0.0.1:61616";//actvemq的服務器tcp連接方式
    private static final String topicName = "topic-test";//定義主題的名稱

    public static void main(String[] args) throws  JMSException {
        //1.創建connectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        //2.創建connection
        Connection connection = connectionFactory.createConnection();
        //3.啟動連接
        connection.start();
        //4.創建session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.創建destination
        Destination destination = session.createTopic(topicName);
        //6.創建生產者
        MessageProducer producer = session.createProducer(destination);

        for (int i = 0; i < 100; i++) {

            TextMessage textMessage = session.createTextMessage("test" + i);
            //7.發送消息
            producer.send(textMessage);

            System.out.println("發送消息" + textMessage.getText());

        }
        //8.關閉連接
        connection.close();
    }
}
appConsumerTopic代碼:標紅字體是做出了修改,由創建隊列改為了創建主題。
package com.liu.jms;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class appConsumerTopic {

    private static final String url = "tcp://127.0.0.1:61616";
    private static final String topicName = "topic-test";//定義主題的名稱

    public static void main(String[] args) throws  JMSException {
        //1.創建connectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        //2.創建connection
        Connection connection = connectionFactory.createConnection();
        //3.啟動連接
        connection.start();
        //4.創建session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.創建destination
        Destination destination = session.createTopic(topicName); //6.創建消費者
        MessageConsumer consumer = session.createConsumer(destination);
        //7.創建一個監聽器
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {

                TextMessage textMessage = (TextMessage)message;
                try {
                    System.out.println("接收到的消息:" + textMessage.getText());

                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //8.關閉連接(監聽器是異步的還沒有監聽到消息的時候,就關閉連接了)
        //connection.close();
    }
}

  3.測試

  首先啟動消費者這個java類,觀察控制台,如下圖:

 

  接着啟動生產者的java類,觀察控制台,如下圖:生產了一百條消息。

 

  此時切換至消費的控制台,觀察控制台,如下圖:已經打印出了一百條消息了,說明消費者已經接受到全部一百條消息。

   6.打開activemq的控制台查看topics:(http://127.0.0.1:8161/admin/topics.jsp)如下圖所示:有一個名字是我們設置的queue-test的主題,消費者也有一個就是我們創建的那個消費者類,主題中有一百條消息,被移除了一百條,也就是上面所說的,消費者接收到了這100條全部的消息。

  7.那么如果我啟動了兩個訂閱相同的消費者呢?為了結果能清晰,重啟activemq服務,關掉之前的Java類啟動,然后啟動兩邊消費者,再啟動一個生產者。如下圖:生產者生產了100條消息。

 

  8.分別看看兩個消費者的接收消息,如下兩張圖:兩個消費者都接受到了一模一樣的100條消息。

  9.總結:主題訂閱發布模式,有多個消費的訂閱相同時,消費者不會相互相互影響,都會分別接收到生產者的全部消息。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM