消息隊列概念和使用場景
聲明:本文轉自:MQ入門總結(一)消息隊列概念和使用場景
寫的很好,都不用自己在整理了,非常感謝該作者的用心。
一、什么是消息隊列
消息即是信息的載體。為了讓消息發送者和消息接收者都能夠明白消息所承載的信息(消息發送者需要知道如何構造消息;消息接收者需要知道如何解析消息),它們就需要按照一種統一的格式描述消息,這種統一的格式稱之為消息協議(JMS)。所以,有效的消息一定具有某一種格式;而沒有格式的消息是沒有意義的。
而消息從發送者到接收者的方式也有兩種。一種我們可以稱為即時消息通訊,也就是說消息從一端發出后(消息發送者)立即就可以達到另一端(消息接收者),這種方式的具體實現就是RPC(當然單純的http通訊也滿足這個定義);另一種方式稱為延遲消息通訊,即消息從某一端發出后,首先進入一個容器進行臨時存儲,當達到某種條件后,再由這個容器發送給另一端。 這個容器的一種具體實現就是消息隊列。
二、消息隊列的應用場景
以下介紹消息隊列在實際應用中常用的使用場景。異步處理,應用解耦,流量削鋒和消息通訊四個場景。
2.1、異步處理
場景說明:用戶注冊后,需要發注冊郵件和注冊短信。傳統的做法有兩種1.串行的方式;2.並行方式。
(1)串行方式:將注冊信息寫入數據庫成功后,發送注冊郵件,再發送注冊短信。以上三個任務全部完成后,返回給客戶端。
(2)並行方式:將注冊信息寫入數據庫成功后,發送注冊郵件的同時,發送注冊短信。以上三個任務完成后,返回給客戶端。與串行的差別是,並行的方式可以提高處理的時間。
假設三個業務節點每個使用50毫秒鍾,不考慮網絡等其他開銷,則串行方式的時間是150毫秒,並行的時間可能是100毫秒。
因為CPU在單位時間內處理的請求數是一定的,假設CPU1秒內吞吐量是100次。則串行方式1秒內CPU可處理的請求量是7次(1000/150)。並行方式處理的請求量是10次(1000/100)。
小結:如以上案例描述,傳統的方式系統的性能(並發量,吞吐量,響應時間)會有瓶頸。如何解決這個問題呢?
引入消息隊列,將不是必須的業務邏輯,異步處理。改造后的架構如下:
按照以上約定,用戶的響應時間相當於是注冊信息寫入數據庫的時間,也就是50毫秒。注冊郵件,發送短信寫入消息隊列后,直接返回,因此寫入消息隊列的速度很快,基本可以忽略,因此用戶的響應時間可能是50毫秒。因此架構改變后,系統的吞吐量提高到每秒20 QPS。比串行提高了3倍,比並行提高了兩倍。
2.2、應用解耦
場景說明:用戶下單后,訂單系統需要通知庫存系統。傳統的做法是,訂單系統調用庫存系統的接口。如下圖:
傳統模式的缺點:
1) 假如庫存系統無法訪問,則訂單減庫存將失敗,從而導致訂單失敗;
2) 訂單系統與庫存系統耦合;
如何解決以上問題呢?引入應用消息隊列后的方案,如下圖:
- 訂單系統:用戶下單后,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功。
- 庫存系統:訂閱下單的消息,采用拉/推的方式,獲取下單信息,庫存系統根據下單信息,進行庫存操作。
- 假如:在下單時庫存系統不能正常使用。也不影響正常下單,因為下單后,訂單系統寫入消息隊列就不再關心其他的后續操作了。實現訂單系統與庫存系統的應用解耦。
2.3、流量削鋒
流量削鋒也是消息隊列中的常用場景,一般在秒殺或團搶活動中使用廣泛。
應用場景:秒殺活動,一般會因為流量過大,導致流量暴增,應用掛掉。為解決這個問題,一般需要在應用前端加入消息隊列。
- 可以控制活動的人數;
- 可以緩解短時間內高流量壓垮應用;
- 用戶的請求,服務器接收后,首先寫入消息隊列。假如消息隊列長度超過最大數量,則直接拋棄用戶請求或跳轉到錯誤頁面;
- 秒殺業務根據消息隊列中的請求信息,再做后續處理。
2.4、日志處理
日志處理是指將消息隊列用在日志處理中,比如Kafka的應用,解決大量日志傳輸的問題。架構簡化如下:
- 日志采集客戶端,負責日志數據采集,定時寫受寫入Kafka隊列;
- Kafka消息隊列,負責日志數據的接收,存儲和轉發;
- 日志處理應用:訂閱並消費kafka隊列中的日志數據;
以下是新浪kafka日志處理應用案例:
(1)Kafka:接收用戶日志的消息隊列。
(2)Logstash:做日志解析,統一成JSON輸出給Elasticsearch。
(3)Elasticsearch:實時日志分析服務的核心技術,一個schemaless,實時的數據存儲服務,通過index組織數據,兼具強大的搜索和統計功能。
(4)Kibana:基於Elasticsearch的數據可視化組件,超強的數據可視化能力是眾多公司選擇ELK stack的重要原因。
三、消息模式
它有兩種消息模式:點對點模式和發布訂閱模式
3.1、點對點模式
點對點模式包含三個角色:消息隊列(Queue),發送者(Sender),接收者(Receiver)。每個消息都被發送到一個特定的隊列,接收者從隊列中獲取消息。隊列保留着消息,直到他們被消費或超時。
點對點的特點
- 每個消息只有一個消費者(Consumer)(即一旦被消費,消息就不再在消息隊列中)
- 發送者和接收者之間在時間上沒有依賴性,也就是說當發送者發送了消息之后,不管接收者有沒有正在運行,它不會影響到消息被發送到隊列
- 接收者在成功接收消息之后需向隊列應答成功
如果希望發送的每個消息都會被成功處理的話,那么需要P2P模式。
3.2、發布訂閱模式
包含三個角色:主題(Topic),發布者(Publisher),訂閱者(Subscriber) 。多個發布者將消息發送到Topic,系統將這些消息傳遞給多個訂閱者。
Pub/Sub的特點
- 每個消息可以有多個消費者
- 發布者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須創建一個訂閱者之后,才能消費發布者的消息。
- 為了消費消息,訂閱者必須保持運行的狀態。
為了緩和這樣嚴格的時間相關性,JMS允許訂閱者創建一個可持久化的訂閱。這樣,即使訂閱者沒有被激活(運行),它也能接收到發布者的消息。
如果希望發送的消息可以不被做任何處理、或者只被一個消息者處理、或者可以被多個消費者處理的話,那么可以采用Pub/Sub模型。
JMS
一、理解JMS
1、什么是JMS?
JMS即Java消息服務(Java Message Service)應用程序接口,API是一個消息服務的標准或者說是規范,允許應用程序組件基於JavaEE平台創建、發送、接收和讀取消息。它使分布式通信耦合度更低,消息服務更加可靠以及異步性。我們可以簡單的理解:兩個應用程序之間需要進行通信,我們使用一個JMS服務,進行中間的轉發,通過JMS 的使用,我們可以解除兩個程序之間的耦合。
JMS不是消息隊列,更不是某種消息隊列協議。JMS是Java消息服務接口,是一套規范的JAVA API 接口。這套規范接口由SUN提出,並在2002年發布JMS規范的Version 1.1版本。JMS和消息中間件廠商無關,既然是一套接口規范,就代表這它需要各個廠商進行實現。好消息是,大部分消息中間件產品都支持JMS 接口規范。也就是說,您可以使用JMS API來連接Stomp協議的產品(例如ActiveMQ)。就像您可以使用JDBC API來連接ORACLE或者MYSQL一樣。
2、JMS的消息模型
JMS具有兩種通信模式:(點對點)和(發布/訂閱模式)這個在上篇文章已經詳細講過。
3、JMS中消息的產生和消費
在JMS中,消息的產生和消息是異步的。對於消費來說,JMS的消息者可以通過兩種方式來消費消息。
○ 同步 :訂閱者或接收者調用receive方法來接收消息,receive方法在能夠接收到消息之前(或超時之前)將一直阻塞
○ 異步 :訂閱者或接收者可以注冊為一個消息監聽器。當消息到達之后,系統自動調用監聽器的onMessage方法。
4、對象模型
(1) ConnectionFactory
創建Connection對象的工廠,針對兩種不同的jms消息模型,分別有QueueConnectionFactory和TopicConnectionFactory兩種。可以通過JNDI來查找ConnectionFactory對象。
(2) Destination
Destination的意思是消息生產者的消息發送目標或者說消息消費者的消息來源。對於消息生產者來說,它的Destination是某個隊列(Queue)或某個主題(Topic);對於消息消費者來說,它的Destination也是某個隊列或主題(即消息來源)。所以,Destination實際上就是兩種類型的對象:Queue、Topic。可以通過JNDI來查找Destination。
(3) Connection
Connection表示在客戶端和JMS系統之間建立的鏈接(對TCP/IP socket的包裝)。Connection可以產生一個或多個Session。跟ConnectionFactory一樣,Connection也有兩種類型:QueueConnection和TopicConnection。
(4) Session
Session是我們操作消息的接口。可以通過session創建生產者、消費者、消息等。Session提供了事務的功能。當我們需要使用session發送/接收多個消息時,可以將這些發送/接收動作放到一個事務中。同樣,也分QueueSession和TopicSession。
(5) 消息的生產者
消息生產者由Session創建,並用於將消息發送到Destination。同樣,消息生產者分兩種類型:QueueSender和TopicPublisher。可以調用消息生產者的方法(send或publish方法)發送消息。
(6) 消息消費者
消息消費者由Session創建,用於接收被發送到Destination的消息。兩種類型:QueueReceiver和TopicSubscriber。可分別通過session的createReceiver(Queue)或createSubscriber(Topic)來創建。當然,也可以session的creatDurableSubscriber方法來創建持久化的訂閱者。
(7) MessageListener
消息監聽器。如果注冊了消息監聽器,一旦消息到達,將自動調用監聽器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一種MessageListener。
5、消息的組成
Message主要由三部分組成,分別是Header,Properties和Body, 解釋如下:
- Header: 消息頭,所有類型的這部分格式都是一樣的
- Properties: 屬性,按類型可以分為應用設置的屬性,標准屬性和消息中間件定義的屬性
- Body: 消息正文,指我們具體需要消息傳輸的內容。
消息頭
序號 | 屬性名稱 | 說明 | 設置者 |
1 |
JMSDestination |
消息發送的目的地,是一個Topic或Queue | send |
2 |
JMSDeliveryMode |
消息的發送模式,分為NON_PERSISTENT和PERSISTENT,即持久化的和非持久化的 | send |
3 |
JMSMessageID |
消息ID,需要以ID:開頭 | send |
4 |
JMSTimestamp |
消息發送時的時間,也可以理解為調用send()方法時的時間,而不是該消息發送完成的時間 | send |
5 |
JMSCorrelationID |
關聯的消息ID,這個通常用在需要回傳消息的時候 | client |
6 |
JMSReplyTo |
消息回復的目的地,其值為一個Topic或Queue, 這個由發送者設置,但是接收者可以決定是否響應 | client |
7 |
JMSRedelivered |
消息是否重復發送過,如果該消息之前發送過,那么這個屬性的值需要被設置為true, 客戶端可以根據這個屬性的值來 確認這個消息是否重復發送過,以避免重復處理。 |
Provider |
8 |
JMSType |
由消息發送者設置的個消息類型,代表消息的結構,有的消息中間件可能會用到這個,但這個並不是是批消息的種類,比如 TextMessage之類的 |
client |
9 |
JMSExpiration |
消息的過期時間,以毫秒為單位,根據定義,它應該是timeToLive的值再加上發送時的GMT時間,也就是說這個指的是過期 時間,而不是有效期 |
send |
10 |
JMSPriority |
消息的優先級,0-4為普通的優化級,而5-9為高優先級,通常情況下,高優化級的消息需要優先發送 | send |
消息屬性
消息屬性的主要作用是可以對頭信息進行一個額外的補充,畢竟消息頭信息一是有限,二是很多不能由應用程序設定。通常,消息屬性可以用在消息選擇器的表達式里,結合起來實現對消息的過濾。消息屬性的值只能是基本的類型,或者這些基本類型對應的包裝類型。也就是說,不能將一個自定義的對象作為屬性值。通常情況下,如果能夠放在body里的內容,就不必放在消息屬性里。
消息體
為了適應不同場景下的消息,提高消息存儲的靈活性,JMS定義了幾種具體類型的消息,不同的子類型的消息體也不一樣,需要注意的是,Message接口並沒有提供一個統一的getBody之類的方法。消息子接口定義如下。
1)TextMessage: 最簡單的消息接口,用於發送文本類的消息,設置/獲取其body的方法定義如下setText()/getText().
2)StreamMessage: 流式消息接口,里面定義了一系列的對基本類型的set/get方法,消息發送者可以通過這些方法寫入基本類型的數據,消息接收者需要按發送者的寫入順序來讀取相應的數據。
3)MapMessage:把消息內容存儲在Map里,本接口定義了一系列對基本類型的的set/get方法,與StreamMessage不同的是,每個值都對應了一個相應的key,所以消息接收者不必按順序去讀取數據。
4)ObjectMessage: 將對象作為消息的接口,提供了一個set/get 對象的方法,需要注意的是只能設置一個對象,這個對象可以是一個Collection,但必須是序列化的。
5)BytesMessage:以字節的形式來傳遞消息的接口,除了提供了對基本類型的set/get,還提供了按字節方式進行set/get。
Springboot整合Active消息隊列
簡單理解:
Active是Apache公司旗下的一個消息總線,ActiveMQ是一個開源兼容Java Message Service(JMS) 面向消息的中件間. 是一個提供松耦合的應用程序架構.
主要用來在服務與服務之間進行異步通信的。
一、搭建步驟
1、相應jar包
<!-- 整合消息隊列ActiveMQ --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <!-- 如果配置線程池則加入 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> </dependency>
2、application.properties文件
#整合jms測試,安裝在別的機器,防火牆和端口號記得開放 spring.activemq.broker-url=tcp://47.96.44.110:61616 spring.activemq.user=admin spring.activemq.password=admin #下列配置要增加依賴 spring.activemq.pool.enabled=true spring.activemq.pool.max-connections=100 #集群配置(后續需要在配上) #spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617) #消息隊列默認是點對點的,如果需要發布/訂閱模式那么需要加上下面注解(如果同時需要點對點發布訂閱這里也需注釋掉) # spring.jms.pub-sub-domain=true
3、Springboot主類
<!-- 主類需要多加一個@EnableJms注解,不過貌似我沒有加的時候,也能運行,為安全起見姑且加上 --> @SpringBootApplication @EnableJms
4.5.......根據不同消息模式來寫了。
二、點對點案例
我在這里案例中創建了兩個點對點隊列,所以他會有兩個queue對象,同樣對應每個queue對象,都會有單一對應的消費者。
1、Springboot主類
@SpringBootApplication @EnableJms public class Main { public static void main(String[] args) { SpringApplication.run(Main.class, args); } //新建一個的Queue對象,交給sringboot管理,這個queue的名稱叫"first.queue". @Bean public Queue queue(){ return new ActiveMQQueue("first.queue"); } }
2.1、first.queue對應消費者
@Component public class FirstConsumer { //名為"first.queue"消息隊列的消費者,通過JmsListener進行監聽有沒有消息,有消息會立刻讀取過來 @JmsListener(destination="first.queue") public void receiveQueue(String text){ System.out.println("FirstConsumer收到的報文為:"+text); } }
2.2、two.queue對應消費者(后面會創建)
@Component public class TwoConsumer { //名為"two.queue"消息隊列的消費者 @JmsListener(destination="two.queue") public void receiveQueue(String text){ System.out.println("TwoConsumer收到的報文為:"+text); } }
3、Service類
/** * 功能描述:消息生產 */ public interface ProducerService { // 功能描述:指定消息隊列,還有消息 public void sendMessage(Destination destination, final String message); // 功能描述:使用默認消息隊列, 發送消息 public void sendMessage( final String message); }
4、ServiceImpl實現類
/** * 功能描述:消息生產者實現類 */ @Service public class ProducerServiceImpl implements ProducerService{ //這個隊列就是Springboot主類中bean的對象 @Autowired private Queue queue; //用來發送消息到broker的對象,可以理解連接數據庫的JDBC @Autowired private JmsMessagingTemplate jmsTemplate; //發送消息,destination是發送到的隊列,message是待發送的消息 @Override public void sendMessage(Destination destination, String message) { jmsTemplate.convertAndSend(destination, message); } //發送消息,queue是發送到的隊列,message是待發送的消息 @Override public void sendMessage(final String message) { jmsTemplate.convertAndSend(this.queue, message); } }
5.QueueController類
/** * 功能描述:點對點消息隊列控制層 */ @RestController @RequestMapping("/api/v1") public class QueueController { @Autowired private ProducerService producerService; // 這里后面調用的是Springboot主類的quene隊列 @GetMapping("first") public Object common(String msg){ producerService.sendMessage(msg); return "Success"; } // 這個隊列是新建的一個名為two.queue的點對點消息隊列 @GetMapping("two") public Object order(String msg){ Destination destination = new ActiveMQQueue("two.queue"); producerService.sendMessage(destination, msg); return "Success"; } }
6、案例演示:
從演示效果可以得出以下結論:
1:當springboot啟動時候,就生成了這兩個隊列,而且他們都會有一個消費者
2:當我通過頁面訪問的時候,就相當於生產者把消息放到隊列中,一旦放進去就會被消費者監聽到,就可以獲取生產者放進去的值並在后台打印出
順便對頁面中四個單詞進行解釋:
Number Of Pending Messages :待處理消息的數量。我們每次都會被監聽處理掉,所以不存在待處理,如果存在就說這里面哪里出故障了,需要排查
Number Of Consumers : 消費者數量
Messages Enqueued: 消息排列,這個只增不見,代表已經處理多少消息
Messages Dequeued: 消息出隊。
三、發布/訂閱者模式
在上面點對點代碼的基礎上,添加發布/訂閱相關代碼
1.appliaction.properties文件
#消息隊列默認是點對點的,如果需要發布/訂閱模式那么需要加上下面注解(如果同時需要點對點發布訂閱這里也需注釋掉) spring.jms.pub-sub-domain=true
2.Springboot主類添加
//新建一個topic隊列 @Bean public Topic topic(){ return new ActiveMQTopic("video.topic"); }
3.添加多個消費者類
//這里定義了三個消費者 @Component public class TopicSub { @JmsListener(destination="video.topic") public void receive1(String text){ System.out.println("video.topic 消費者:receive1="+text); } @JmsListener(destination="video.topic") public void receive2(String text){ System.out.println("video.topic 消費者:receive2="+text); } @JmsListener(destination="video.topic") public void receive3(String text){ System.out.println("video.topic 消費者:receive3="+text); } }
4.Service類
//功能描述:消息發布者 public void publish(String msg);
5.ServiceImpl實現類
//=======發布訂閱相關代碼========= @Autowired private Topic topic; @Override public void publish(String msg) { this.jmsTemplate.convertAndSend(this.topic, msg); }
6.Controller類
// 這個隊列是新建的一個名為two.queue的點對點消息隊列 @GetMapping("topic") public Object topic(String msg){ producerService.publish(msg); return "Success"; }
7.演示效果:
從演示效果總結如下:
1:Springboot啟動的時候,在Topics目錄下,一共出現了5個消費者。first.queue一個消費者、two.queue一個消費者、video.topic三個消費者
2:當我在控制台輸入信息后,video.topic的三個消費者都會監聽video.topic發布的消息,並在控制台打印。
四、如何讓點對點和發布訂閱同時有效
為什么這么說呢,因為當我向上面一樣同時開啟,會發現點對點模式已經失效了。
效果演示
從演示效果,可以得出如下結論:
1:我們發現我們在頁面輸入..../two?msg=555消息后,后台並沒有成功打印消息。再看Active界面發現,這個queue對象,確實有一條待處理的消息,但是我們發現,它對應的消費者數量是為0.
2:然而我們在打開topic頁面發現,這里卻存在一個消費者。
所以我個人理解是,當同時啟動的時候,所產生的消費者默認都是Topic消費者,沒有Queue消費者,所以它監聽不到queue所待處理的消息。
當配置文件不加:spring.jms.pub-sub-domain=true 那么系統會默認支持quene(點對點模式),但一旦加上這段配置,系統又變成只支持發布訂閱模式。
那如何同時都可以成功呢?
思路如下:
第一步:還是需要去掉配置文件中的:
#消息隊列默認是點對點的,如果需要發布/訂閱模式那么需要加上下面注解(如果同時需要點對點發布訂閱這里也需注釋掉) #spring.jms.pub-sub-domain=true
第二步:在發布訂閱者的中消費者中指定獨立的containerFactory
因為你去掉上面的配置,那么系統就默認是queue,所以@JmsListener如果不指定獨立的containerFactory的話是只能消費queue消息
@JmsListener(destination="video.topic", containerFactory="jmsListenerContainerTopic") public void receive1(String text){ System.out.println("video.topic 消費者:receive1="+text); } @JmsListener(destination="video.topic", containerFactory="jmsListenerContainerTopic") public void receive2(String text){ System.out.println("video.topic 消費者:receive2="+text); } //第三步我不添加containerFactory="jmsListenerContainerTopic"看等下是否會打印出 @JmsListener(destination="video.topic") public void receive3(String text){ System.out.println("video.topic 消費者:receive3="+text); }
第三步:定義獨立的topic定義獨立的JmsListenerContainer
在springboot主類中添加:
@Bean public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setPubSubDomain(true); bean.setConnectionFactory(activeMQConnectionFactory); return bean; }
效果:
得出結論:
1:點對點,和發布訂閱都有用
2:receive3沒有指定獨立的containerFactory一樣沒有打印出來。