ActiveMQ的queue以及topic兩種消息處理機制分析


 

       

1    queue與topic的技術特點對比

 

 

     對比項

Topic

Queue

概要

Publish Subscribe messaging 發布訂閱消息

Point-to-Point 點對點

有無狀態

topic數據默認不落地,是無狀態的。

Queue數據默認會在mq服務器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存儲。

完整性保障

並不保證publisher發布的每條數據,Subscriber都能接受到。

Queue保證每條數據都能被receiver接收。

消息是否會丟失

一般來說publisher發布消息到某一個topic時,只有正在監聽該topic地址的sub能夠接收到消息;如果沒有sub在監聽,該topic就丟失了。

Sender發送消息到目標Queue,receiver可以異步接收這個Queue上的消息。Queue上的消息如果暫時沒有receiver來取,也不會丟失。

消息發布接收策略

一對多的消息發布接收策略,監聽同一個topic地址的多個sub都能收到publisher發送的消息。Sub接收完通知mq服務器

一對一的消息發布接收策略,一個sender發送的消息,只能有一個receiver接收。receiver接收完后,通知mq服務器已接收,mq服務器對queue里的消息采取刪除或其他操作。

          Topic和queue的最大區別在於topic是以廣播的形式,通知所有在線監聽的客戶端有新的消息,沒有監聽的客戶端將收不到消息;而queue則是以點對點的形式通知多個處於監聽狀態的客戶端中的一個。

 

2    topic和queue方式的消息處理效率比較

        通過增加監聽客戶端的並發數來驗證,topic的消息推送,是否會因為監聽客戶端的並發上升而出現明顯的下降,測試環境的服務器為ci環境的ActiveMQ,客戶端為我的本機。

        從實測的結果來看,topic方式發送的消息,發送和接收的效率,在一個訂閱者和100個訂閱者的前提下沒有明顯差異,但在500個訂閱者(線程)並發的前提下,效率差異很明顯(由於500線程並發的情況下,我本機的cpu占用率已高達70-90%,所以無法確認是我本機測試造成的性能瓶頸還是topic消息發送方式存在性能瓶頸,造成效率下降如此明顯)。

        Topic方式發送的消息與queue方式發送的消息,發送和接收的效率,在一個訂閱者和100個訂閱者的前提下沒有明顯差異,但在500個訂閱者並發的前提下,topic方式的效率明顯低於queue。

        Queue方式發送的消息,在一個訂閱者、100個訂閱者和500個訂閱者的前提下,發送和接收的效率沒有明顯變化。

Topic實測數據:

 

 

發送者發送的消息總數

所有訂閱者接收到消息的總數

消息發送和接收平均耗時

單訂閱者

100

100

101ms

100訂閱者

100

10000

103ms

500訂閱者

100

50000

14162ms

 

Queue實測數據:

 

 

發送者發送的消息總數

所有訂閱者接收到消息的總數

消息發送和接收平均耗時

單訂閱者

100

100

96ms

100訂閱者

100

100

96ms

500訂閱者

100

100

100ms

 

3     topic方式的消息處理示例
3.1     通過客戶端代碼調用來發送一個topic的消息:

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.DeliveryMode;

import javax.jms.Destination;

import javax.jms.MessageProducer;

import javax.jms.Session;

import javax.jms.TextMessage;

 

import org.apache.activemq.ActiveMQConnection;

import org.apache.activemq.ActiveMQConnectionFactory;

 

publicclass SendTopic {

    privatestaticfinalintSEND_NUMBER = 5;

    publicstaticvoid sendMessage(Session session, MessageProducer producer)

            throws Exception {

        for (int i = 1; i <=SEND_NUMBER; i++) {

            TextMessage message = session

                    .createTextMessage("ActiveMq發送的消息" + i);

            //發送消息到目的地方

            System.out.println("發送消息:" + "ActiveMq 發送的消息" + i);

            producer.send(message);

        }

    }

   

    publicstaticvoid main(String[] args) {

        // ConnectionFactory:連接工廠,JMS用它創建連接

        ConnectionFactory connectionFactory;

        // Connection:JMS客戶端到JMS Provider的連接

        Connection connection = null;

        // Session:一個發送或接收消息的線程

        Session session;

        // Destination:消息的目的地;消息發送給誰.

        Destination destination;

        // MessageProducer:消息發送者

        MessageProducer producer;

        // TextMessage message;

        //構造ConnectionFactory實例對象,此處采用ActiveMq的實現jar

        connectionFactory = new ActiveMQConnectionFactory(

                ActiveMQConnection.DEFAULT_USER,

                ActiveMQConnection.DEFAULT_PASSWORD,

                "tcp://10.20.8.198:61616");

        try {

            //構造從工廠得到連接對象

            connection = connectionFactory.createConnection();

            //啟動

            connection.start();

            //獲取操作連接

            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

            //獲取session注意參數值FirstTopic是一個服務器的topic(與queue消息的發送相比,這里是唯一的不同)

            destination = session.createTopic("FirstTopic");

            //得到消息生成者【發送者】

            producer = session.createProducer(destination);

            //設置不持久化,此處學習,實際根據項目決定

            producer.setDeliveryMode(DeliveryMode.PERSISTENT);

            //構造消息,此處寫死,項目就是參數,或者方法獲取

            sendMessage(session, producer);

            session.commit();

        } catch (Exception e) {

            e.printStackTrace();

        } finally {

            try {

                if (null != connection)

                    connection.close();

            } catch (Throwable ignore) {

            }

        }

    }

}

 

3.2     啟動多個客戶端監聽來接收topic的消息:

publicclass ReceiveTopicimplements Runnable {

      private StringthreadName;

 

      ReceiveTopic(String threadName) {

           this.threadName = threadName;

      }

 

      publicvoid run() {

           // ConnectionFactory:連接工廠,JMS用它創建連接

           ConnectionFactory connectionFactory;

           // Connection:JMS客戶端到JMS Provider的連接

           Connection connection =null;

           // Session:一個發送或接收消息的線程

           Session session;

           // Destination:消息的目的地;消息發送給誰.

           Destination destination;

           //消費者,消息接收者

           MessageConsumer consumer;

           connectionFactory = new ActiveMQConnectionFactory(

                      ActiveMQConnection.DEFAULT_USER,

                      ActiveMQConnection.DEFAULT_PASSWORD,"tcp://10.20.8.198:61616");

           try {

                 //構造從工廠得到連接對象

                 connection = connectionFactory.createConnection();

                 //啟動

                 connection.start();

                 //獲取操作連接,默認自動向服務器發送接收成功的響應

                 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                 //獲取session注意參數值FirstTopic是一個服務器的topic

                 destination = session.createTopic("FirstTopic");

                 consumer = session.createConsumer(destination);

                 while (true) {

                      //設置接收者接收消息的時間,為了便於測試,這里設定為100s

                      TextMessage message = (TextMessage) consumer

                                  .receive(100 * 1000);

                      if (null != message) {

                            System.out.println("線程"+threadName+"收到消息:" + message.getText());

                      } else {

                            continue;

                      }

                 }

           } catch (Exception e) {

                 e.printStackTrace();

           } finally {

                 try {

                      if (null != connection)

                            connection.close();

                 } catch (Throwable ignore) {

                 }

           }

      }

 

      publicstaticvoid main(String[] args) {

            //這里啟動3個線程來監聽FirstTopic的消息,與queue的方式不一樣三個線程都能收到同樣的消息

           ReceiveTopic receive1=new ReceiveTopic("thread1");

           ReceiveTopic receive2=new ReceiveTopic("thread2");

           ReceiveTopic receive3=new ReceiveTopic("thread3");

           Thread thread1=new Thread(receive1);

           Thread thread2=new Thread(receive2);

           Thread thread3=new Thread(receive3);

           thread1.start();

           thread2.start();

           thread3.start();

      }

}

 

4     queue方式的消息處理示例


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM