1 queue與topic的技術特點對比
對比項 |
Topic |
Queue |
概要 |
Publish Subscribe messaging 發布訂閱消息 |
Point-to-Point 點對點 |
有無狀態 |
topic數據默認不落地,是無狀態的。 |
Queue數據默認會在mq服務器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存儲。 |
完整性保障 |
並不保證publisher發布的每條數據,Subscriber都能接受到。 |
Queue保證每條數據都能被receiver接收。 |
消息是否會丟失 |
一般來說publisher發布消息到某一個topic時,只有正在監聽該topic地址的sub能夠接收到消息;如果沒有sub在監聽,該topic就丟失了。 |
Sender發送消息到目標Queue,receiver可以異步接收這個Queue上的消息。Queue上的消息如果暫時沒有receiver來取,也不會丟失。 |
消息發布接收策略 |
一對多的消息發布接收策略,監聽同一個topic地址的多個sub都能收到publisher發送的消息。Sub接收完通知mq服務器 |
一對一的消息發布接收策略,一個sender發送的消息,只能有一個receiver接收。receiver接收完后,通知mq服務器已接收,mq服務器對queue里的消息采取刪除或其他操作。 |
Topic和queue的最大區別在於topic是以廣播的形式,通知所有在線監聽的客戶端有新的消息,沒有監聽的客戶端將收不到消息;而queue則是以點對點的形式通知多個處於監聽狀態的客戶端中的一個。
2 topic和queue方式的消息處理效率比較
通過增加監聽客戶端的並發數來驗證,topic的消息推送,是否會因為監聽客戶端的並發上升而出現明顯的下降,測試環境的服務器為ci環境的ActiveMQ,客戶端為我的本機。
從實測的結果來看,topic方式發送的消息,發送和接收的效率,在一個訂閱者和100個訂閱者的前提下沒有明顯差異,但在500個訂閱者(線程)並發的前提下,效率差異很明顯(由於500線程並發的情況下,我本機的cpu占用率已高達70-90%,所以無法確認是我本機測試造成的性能瓶頸還是topic消息發送方式存在性能瓶頸,造成效率下降如此明顯)。
Topic方式發送的消息與queue方式發送的消息,發送和接收的效率,在一個訂閱者和100個訂閱者的前提下沒有明顯差異,但在500個訂閱者並發的前提下,topic方式的效率明顯低於queue。
Queue方式發送的消息,在一個訂閱者、100個訂閱者和500個訂閱者的前提下,發送和接收的效率沒有明顯變化。
Topic實測數據:
|
發送者發送的消息總數 |
所有訂閱者接收到消息的總數 |
消息發送和接收平均耗時 |
單訂閱者 |
100 |
100 |
101ms |
100訂閱者 |
100 |
10000 |
103ms |
500訂閱者 |
100 |
50000 |
14162ms |
Queue實測數據:
|
發送者發送的消息總數 |
所有訂閱者接收到消息的總數 |
消息發送和接收平均耗時 |
單訂閱者 |
100 |
100 |
96ms |
100訂閱者 |
100 |
100 |
96ms |
500訂閱者 |
100 |
100 |
100ms |
3 topic方式的消息處理示例
3.1 通過客戶端代碼調用來發送一個topic的消息:
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
publicclass SendTopic {
privatestaticfinalintSEND_NUMBER = 5;
publicstaticvoid sendMessage(Session session, MessageProducer producer)
throws Exception {
for (int i = 1; i <=SEND_NUMBER; i++) {
TextMessage message = session
.createTextMessage("ActiveMq發送的消息" + i);
//發送消息到目的地方
System.out.println("發送消息:" + "ActiveMq 發送的消息" + i);
producer.send(message);
}
}
publicstaticvoid main(String[] args) {
// ConnectionFactory:連接工廠,JMS用它創建連接
ConnectionFactory connectionFactory;
// Connection:JMS客戶端到JMS Provider的連接
Connection connection = null;
// Session:一個發送或接收消息的線程
Session session;
// Destination:消息的目的地;消息發送給誰.
Destination destination;
// MessageProducer:消息發送者
MessageProducer producer;
// TextMessage message;
//構造ConnectionFactory實例對象,此處采用ActiveMq的實現jar
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://10.20.8.198:61616");
try {
//構造從工廠得到連接對象
connection = connectionFactory.createConnection();
//啟動
connection.start();
//獲取操作連接
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//獲取session注意參數值FirstTopic是一個服務器的topic(與queue消息的發送相比,這里是唯一的不同)
destination = session.createTopic("FirstTopic");
//得到消息生成者【發送者】
producer = session.createProducer(destination);
//設置不持久化,此處學習,實際根據項目決定
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
//構造消息,此處寫死,項目就是參數,或者方法獲取
sendMessage(session, producer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
}
3.2 啟動多個客戶端監聽來接收topic的消息:
publicclass ReceiveTopicimplements Runnable {
private StringthreadName;
ReceiveTopic(String threadName) {
this.threadName = threadName;
}
publicvoid run() {
// ConnectionFactory:連接工廠,JMS用它創建連接
ConnectionFactory connectionFactory;
// Connection:JMS客戶端到JMS Provider的連接
Connection connection =null;
// Session:一個發送或接收消息的線程
Session session;
// Destination:消息的目的地;消息發送給誰.
Destination destination;
//消費者,消息接收者
MessageConsumer consumer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,"tcp://10.20.8.198:61616");
try {
//構造從工廠得到連接對象
connection = connectionFactory.createConnection();
//啟動
connection.start();
//獲取操作連接,默認自動向服務器發送接收成功的響應
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//獲取session注意參數值FirstTopic是一個服務器的topic
destination = session.createTopic("FirstTopic");
consumer = session.createConsumer(destination);
while (true) {
//設置接收者接收消息的時間,為了便於測試,這里設定為100s
TextMessage message = (TextMessage) consumer
.receive(100 * 1000);
if (null != message) {
System.out.println("線程"+threadName+"收到消息:" + message.getText());
} else {
continue;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
publicstaticvoid main(String[] args) {
//這里啟動3個線程來監聽FirstTopic的消息,與queue的方式不一樣三個線程都能收到同樣的消息
ReceiveTopic receive1=new ReceiveTopic("thread1");
ReceiveTopic receive2=new ReceiveTopic("thread2");
ReceiveTopic receive3=new ReceiveTopic("thread3");
Thread thread1=new Thread(receive1);
Thread thread2=new Thread(receive2);
Thread thread3=new Thread(receive3);
thread1.start();
thread2.start();
thread3.start();
}
}