上文已經詳細介紹了點對點模式(Queue)下的消息隊列,今天就來再介紹一下消息隊列的另一種模式:訂閱模式。
一、訂閱模式的流程
生產者產生一條消息message放入一個topic中,該topic已經三個消費者訂閱了,那么被放入topic中的這條消息,就會同時被這三個消費者取走(當然他們必須都處於在線狀態),並進行“消費”。其實就類似現實生活中的手機接收推送。
二、訂閱模式的應用場景
發布訂閱模式下,當發布者消息量很大時,顯然單個訂閱者的處理能力是不足的。實際上現實場景中是多個訂閱者節點組成一個訂閱組負載均衡消費topic消息即分組訂閱,這樣訂閱者很容易實現消費能力線性擴展。可以看成是一個topic下有多個Queue,每個Queue是點對點的方式,Queue之間是發布訂閱方式。
三、具體實現
ActiveMq的配置以及pom導入的jar包可以參考上文;
1、創建生產者:
/**
*
* @author yuyan
* @create 2018-08-28 16:09
**/
@Service
public class Topic_Producer {
public void sendMessage(String msg){
try {
//創建連接工廠
ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
connFactory.setMaxThreadPoolSize(1);
//連接到JMS提供者
Connection conn = connFactory.createConnection();
// conn.setClientID("producer1");
conn.start();
//事務性會話,自動確認消息
Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
//消息的目的地
Destination destination = session.createTopic("topic1");
//消息生產者
MessageProducer producer = session.createProducer(destination);
// producer.setDeliveryMode(DeliveryMode.PERSISTENT); //持久化
// //文本消息
// TextMessage textMessage = session.createTextMessage("這是文本消息");
// producer.send(textMessage);
//鍵值對消息
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("reqDesc", msg);
producer.send(mapMessage);
//
// //流消息
// StreamMessage streamMessage = session.createStreamMessage();
// streamMessage.writeString("這是流消息");
// producer.send(streamMessage);
//
// //字節消息
// String s = "BytesMessage字節消息";
// BytesMessage bytesMessage = session.createBytesMessage();
// bytesMessage.writeBytes(s.getBytes());
// producer.send(bytesMessage);
//
// //對象消息
// User user = new User("obj_info", "對象消息"); //User對象必須實現Serializable接口
// ObjectMessage objectMessage = session.createObjectMessage();
// objectMessage.setObject(user);
// producer.send(objectMessage);
session.commit(); //提交會話,該條消息會進入"queue"隊列,生產者也完成了歷史使命
producer.close();
session.close();
conn.close();
//在事務性會話中,只有commit之后,消息才會真正到達目的地
}catch (Exception e){
e.printStackTrace();
}
}
}
2、創建消費者
package com.springjms.queue_message;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import javax.jms.*;
import java.util.Date;
/**
*
* @author
* @create 2018-09-06 9:55
**/
@Component
public class Topic_Consumer implements ApplicationRunner{
@Override
public void run(ApplicationArguments args) throws Exception {
init();
}
public void init() throws JMSException {
ConnectionFactory factory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616"
);
Connection conn = factory.createConnection();
// conn.setClientID("consumer1");
conn.start();
Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//與生產者的消息目的地相同
Destination dest = session.createTopic("topic1");
MessageConsumer messConsumer = session.createConsumer(dest);
messConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
MapMessage m = (MapMessage)message;
System.out.println("consumer1接收到"+m.getString("reqDesc")+"的請求並開始處理,時間是"+new Date());
System.out.println("這里會停頓5s,模擬系統處理請求,時間是"+new Date());
Thread.sleep(5000);
System.out.println("consumer1接收到"+m.getString("reqDesc")+"的請求並處理完畢,時間是"+new Date());
}catch (Exception e){
e.printStackTrace();
}
}
});
}
}
現在的生產者和消費者都處於同一個項目中,且是一對一的關系,如果想要驗證一個生產者對應多個消費者的情況,可以再新建一個項目,並且創建一個消費者,只要保證topic相同即可。
3、接口測試
@RequestMapping(value = "/SendMessageByTopic", method = RequestMethod.GET)
@ResponseBody
public void sendTopic(String msg) {
try {
System.out.println(msg+"開始發出一次請求,時間是"+new Date());
topic_producer.sendMessage(msg);
System.out.println(msg+"請求發送完成,時間是"+new Date());
}catch (Exception e){
e.printStackTrace();
}
}
測試結果:
可以看到兩個消費者consumer1、consumer2同時收到了來自topic的請求,並且同時完成了處理;
觀察http://localhost:8161/admin/topics.jsp:
未發請求時,topic1中有兩個消費者,入隊列與出隊列的消息數都是0:
發出請求后,topic1中有了一條消息,入隊列數為1,出隊列數為0:
請求處理完畢后,topic1中的出隊列數為2,入隊列數為1,證明這條消息分別被兩個消費者消費了:
這樣,消息隊列的兩種模式就已經介紹完了,文章中介紹的方式都是基於ActiveMq這種傳統的消息隊列,其實還有諸如rabbitMq、kafka、rocketMq等消息隊列,它們的原理和實現方式都不盡相同,以后有時間,還是需要再研究一下!
————————————————
版權聲明:本文為CSDN博主「superyu1992」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/superyu1992/java/article/details/82461200