ActiveMQ--模式(隊列模式/主題模式)


兩種模式:隊列模式/主題模式

pom.xml

<dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.15.9</version>
</dependency>  

 

隊列模式,其實就是分食模式。

   比如生產方發了 10條消息到 activeMQ 服務器, 而此時有多個 消費方,那么這些消費方就會瓜分這些10條消息,一條消息只會被一個消費方得到。
主題模式,就是訂閱模式。

  比如生產方發了10條消息,而此時有多個消費方,那么多個消費方都能得到這 10條消息,就如同訂閱公眾號那樣。


 

隊列模式:

1. 首先運行兩次 TestConsumer 類,以啟動兩個不同的消費者
2. 運行一次 TestProducer, 以啟動 生產者

  生產者生產100個,兩個消費者瓜分

  消費者:

public class TestConsumer {
    //服務地址,端口默認61616
    private static final String url="tcp://127.0.0.1:61616";
    //這次消費的消息名稱
    private static final String topicName="queue_style";

    //消費者有可能是多個,為了區分不同的消費者,為其創建隨機名稱
    private static final String consumerName="consumer-" + RandomUtil.randomString(5);
    public static void main(String[] args) throws JMSException {
        //0. 先判斷端口是否啟動了 Active MQ 服務器
        ActiveMQUtil.checkServer();
        System.out.printf("%s 消費者啟動了。 %n", consumerName);

        //1.創建ConnectiongFactory,綁定地址
        ConnectionFactory factory=new ActiveMQConnectionFactory(url);
        //2.創建Connection
        Connection connection= factory.createConnection();
        //3.啟動連接
        connection.start();
        //4.創建會話
        Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.創建一個目標 (主題類型)
        Destination destination=session.createQueue(topicName);
        //6.創建一個消費者
        MessageConsumer consumer=session.createConsumer(destination);
        //7.創建一個監聽器
        consumer.setMessageListener(new MessageListener() {

            public void onMessage(Message arg0) {
                // TODO Auto-generated method stub
                TextMessage textMessage=(TextMessage)arg0;
                try {
                    System.out.println(consumerName +" 接收消息:"+textMessage.getText());
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

            }
        });
        
        //8. 因為不知道什么時候有,所以沒法主動關閉,就不關閉了,一直處於監聽狀態
        //connection.close();
    }
}

生產者:

public class TestProducer {

    //服務地址,端口默認61616
    private static final String url="tcp://127.0.0.1:61616";
    //這次發送的消息名稱
    private static final String topicName="queue_style";
    public static void main(String[] args) throws JMSException {
        //0. 先判斷端口是否啟動了  Active MQ 服務器
        ActiveMQUtil.checkServer();
        //1.創建ConnectiongFactory,綁定地址
        ConnectionFactory factory=new ActiveMQConnectionFactory(url);
        //2.創建Connection
        Connection connection= factory.createConnection();
        //3.啟動連接
        connection.start();
        //4.創建會話
        Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.創建一個目標 (隊列類型)
        Destination destination=session.createQueue(topicName);
        //6.創建一個生產者
        MessageProducer producer=session.createProducer(destination);


        for (int i = 0; i < 100; i++) {
            //7.創建消息
            TextMessage textMessage=session.createTextMessage("隊列消息-"+i);
            //8.發送消息
            producer.send(textMessage);
            System.out.println("發送:"+textMessage.getText());
        }
        //7. 關閉連接
        connection.close();
    }
}

2個consumer:

 

 

生產者生產:

 

 

 

 

 

 

 

 

 

 


 

主題模式:

 

  消費者,生產者

public class TestConsumer {
    //服務地址,端口默認61616
    private static final String url="tcp://127.0.0.1:61616";
    //這次消費的消息名稱
    private static final String topicName="topic_style";

    //消費者有可能是多個,為了區分不同的消費者,為其創建隨機名稱
    private static final String consumerName="consumer-" + RandomUtil.randomString(5);
    public static void main(String[] args) throws JMSException {
        

        //0. 先判斷端口是否啟動了 Active MQ 服務器
        ActiveMQUtil.checkServer();
        System.out.printf("%s 消費者啟動了。 %n", consumerName);
        //1.創建ConnectiongFactory,綁定地址
        ConnectionFactory factory=new ActiveMQConnectionFactory(url);
        //2.創建Connection
        Connection connection= factory.createConnection();
        //3.啟動連接
        connection.start();
        //4.創建會話
        Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //5.創建一個目標 (主題類型)
        Destination destination=session.createTopic(topicName);

        //6.創建一個消費者
        MessageConsumer consumer=session.createConsumer(destination);
        //7.創建一個監聽器
        consumer.setMessageListener(new MessageListener() {

            public void onMessage(Message arg0) {
                // TODO Auto-generated method stub
                TextMessage textMessage=(TextMessage)arg0;
                try {
                    System.out.println(consumerName +" 接收消息:"+textMessage.getText());
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

            }
        });
        
        //8. 因為不知道什么時候有,所以沒法主動關閉,就不關閉了,一直處於監聽狀態
        //connection.close();
    }
}
public class TestProducer {

    //服務地址,端口默認61616
    private static final String url="tcp://127.0.0.1:61616";
    //這次發送的消息名稱
    private static final String topicName="topic_style";
    public static void main(String[] args) throws JMSException {
        //0. 先判斷端口是否啟動了  Active MQ 服務器
        ActiveMQUtil.checkServer();
        //1.創建ConnectiongFactory,綁定地址
        ConnectionFactory factory=new ActiveMQConnectionFactory(url);
        //2.創建Connection
        Connection connection= factory.createConnection();
        //3.啟動連接
        connection.start();
        //4.創建會話
        Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.創建一個目標 (主題類型)
        Destination destination=session.createTopic(topicName);
        //6.創建一個生產者
        MessageProducer producer=session.createProducer(destination);


        for (int i = 0; i < 100; i++) {
            //7.創建消息
            TextMessage textMessage=session.createTextMessage("主題消息-"+i);
            //8.發送消息
            producer.send(textMessage);
            System.out.println("發送:"+textMessage.getText());
        }
        //7. 關閉連接
        connection.close();
    }
}

 

生產者生產100個,兩個消費者都分別接受了100個

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM