MQ---消息隊列概念和使用場景


消息隊列概念和使用場景

                       

        聲明:本文轉自:MQ入門總結(一)消息隊列概念和使用場景

       寫的很好,都不用自己在整理了,非常感謝該作者的用心。

 一、什么是消息隊列

        消息即是信息的載體。為了讓消息發送者和消息接收者都能夠明白消息所承載的信息(消息發送者需要知道如何構造消息;消息接收者需要知道如何解析消息),它們就需要按照一種統一的格式描述消息,這種統一的格式稱之為消息協議(JMS)。所以,有效的消息一定具有某一種格式;而沒有格式的消息是沒有意義的。

     而消息從發送者到接收者的方式也有兩種。一種我們可以稱為即時消息通訊,也就是說消息從一端發出后(消息發送者)立即就可以達到另一端(消息接收者),這種方式的具體實現就是RPC(當然單純的http通訊也滿足這個定義);另一種方式稱為延遲消息通訊,即消息從某一端發出后,首先進入一個容器進行臨時存儲,當達到某種條件后,再由這個容器發送給另一端。 這個容器的一種具體實現就是消息隊列

 

二、消息隊列的應用場景

       以下介紹消息隊列在實際應用中常用的使用場景。異步處理應用解耦流量削鋒消息通訊四個場景。

2.1、異步處理

   場景說明:用戶注冊后,需要發注冊郵件和注冊短信。傳統的做法有兩種1.串行的方式;2.並行方式

   (1)串行方式:將注冊信息寫入數據庫成功后,發送注冊郵件,再發送注冊短信。以上三個任務全部完成后,返回給客戶端。

   (2)並行方式:將注冊信息寫入數據庫成功后,發送注冊郵件的同時,發送注冊短信。以上三個任務完成后,返回給客戶端。與串行的差別是,並行的方式可以提高處理的時間。

      假設三個業務節點每個使用50毫秒鍾,不考慮網絡等其他開銷,則串行方式的時間是150毫秒,並行的時間可能是100毫秒。

因為CPU在單位時間內處理的請求數是一定的,假設CPU1秒內吞吐量是100次。則串行方式1秒內CPU可處理的請求量是7次(1000/150)。並行方式處理的請求量是10次(1000/100)。

小結:如以上案例描述,傳統的方式系統的性能(並發量,吞吐量,響應時間)會有瓶頸。如何解決這個問題呢?

   引入消息隊列,將不是必須的業務邏輯,異步處理。改造后的架構如下:

按照以上約定,用戶的響應時間相當於是注冊信息寫入數據庫的時間,也就是50毫秒。注冊郵件,發送短信寫入消息隊列后,直接返回,因此寫入消息隊列的速度很快,基本可以忽略,因此用戶的響應時間可能是50毫秒。因此架構改變后,系統的吞吐量提高到每秒20 QPS。比串行提高了3倍,比並行提高了兩倍。

2.2、應用解耦

 場景說明:用戶下單后,訂單系統需要通知庫存系統。傳統的做法是,訂單系統調用庫存系統的接口。如下圖:

傳統模式的缺點:

1)  假如庫存系統無法訪問,則訂單減庫存將失敗,從而導致訂單失敗;

2)  訂單系統與庫存系統耦合;

如何解決以上問題呢?引入應用消息隊列后的方案,如下圖:

  • 訂單系統:用戶下單后,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功。
  • 庫存系統:訂閱下單的消息,采用拉/推的方式,獲取下單信息,庫存系統根據下單信息,進行庫存操作。
  • 假如:在下單時庫存系統不能正常使用。也不影響正常下單,因為下單后,訂單系統寫入消息隊列就不再關心其他的后續操作了。實現訂單系統與庫存系統的應用解耦。

2.3、流量削鋒

     流量削鋒也是消息隊列中的常用場景,一般在秒殺或團搶活動中使用廣泛。

    應用場景:秒殺活動,一般會因為流量過大,導致流量暴增,應用掛掉。為解決這個問題,一般需要在應用前端加入消息隊列。

  1. 可以控制活動的人數;
  2. 可以緩解短時間內高流量壓垮應用;

  1. 用戶的請求,服務器接收后,首先寫入消息隊列。假如消息隊列長度超過最大數量,則直接拋棄用戶請求或跳轉到錯誤頁面;
  2. 秒殺業務根據消息隊列中的請求信息,再做后續處理。

2.4、日志處理

日志處理是指將消息隊列用在日志處理中,比如Kafka的應用,解決大量日志傳輸的問題。架構簡化如下:

  • 日志采集客戶端,負責日志數據采集,定時寫受寫入Kafka隊列;
  • Kafka消息隊列,負責日志數據的接收,存儲和轉發;
  • 日志處理應用:訂閱並消費kafka隊列中的日志數據;

以下是新浪kafka日志處理應用案例:

(1)Kafka:接收用戶日志的消息隊列。

(2)Logstash:做日志解析,統一成JSON輸出給Elasticsearch。

(3)Elasticsearch:實時日志分析服務的核心技術,一個schemaless,實時的數據存儲服務,通過index組織數據,兼具強大的搜索和統計功能。

(4)Kibana:基於Elasticsearch的數據可視化組件,超強的數據可視化能力是眾多公司選擇ELK stack的重要原因。

 

三、消息模式

    它有兩種消息模式:點對點模式發布訂閱模式

3.1、點對點模式

點對點模式包含三個角色:消息隊列(Queue)發送者(Sender)接收者(Receiver)。每個消息都被發送到一個特定的隊列,接收者從隊列中獲取消息。隊列保留着消息,直到他們被消費或超時。

點對點的特點

  1. 每個消息只有一個消費者(Consumer)(即一旦被消費,消息就不再在消息隊列中)
  2. 發送者和接收者之間在時間上沒有依賴性,也就是說當發送者發送了消息之后,不管接收者有沒有正在運行,它不會影響到消息被發送到隊列
  3. 接收者在成功接收消息之后需向隊列應答成功

 如果希望發送的每個消息都會被成功處理的話,那么需要P2P模式。

3.2、發布訂閱模式

包含三個角色:主題(Topic)發布者(Publisher)訂閱者(Subscriber) 。多個發布者將消息發送到Topic,系統將這些消息傳遞給多個訂閱者。

Pub/Sub的特點

  1. 每個消息可以有多個消費者
  2. 發布者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須創建一個訂閱者之后,才能消費發布者的消息。
  3. 為了消費消息,訂閱者必須保持運行的狀態。

     為了緩和這樣嚴格的時間相關性,JMS允許訂閱者創建一個可持久化的訂閱。這樣,即使訂閱者沒有被激活(運行),它也能接收到發布者的消息。
如果希望發送的消息可以不被做任何處理、或者只被一個消息者處理、或者可以被多個消費者處理的話,那么可以采用Pub/Sub模型。

 

JMS

一、理解JMS

  1、什么是JMS?

        JMSJava消息服務(Java Message Service)應用程序接口,API是一個消息服務的標准或者說是規范,允許應用程序組件基於JavaEE平台創建、發送、接收和讀取消息。它使分布式通信耦合度更低,消息服務更加可靠以及異步性。我們可以簡單的理解:兩個應用程序之間需要進行通信,我們使用一個JMS服務,進行中間的轉發,通過JMS 的使用,我們可以解除兩個程序之間的耦合。

       JMS不是消息隊列,更不是某種消息隊列協議。JMS是Java消息服務接口,是一套規范的JAVA API 接口。這套規范接口由SUN提出,並在2002年發布JMS規范的Version 1.1版本。JMS和消息中間件廠商無關,既然是一套接口規范,就代表這它需要各個廠商進行實現。好消息是,大部分消息中間件產品都支持JMS 接口規范。也就是說,您可以使用JMS API來連接Stomp協議的產品(例如ActiveMQ)。就像您可以使用JDBC API來連接ORACLE或者MYSQL一樣。

 

2、JMS的消息模型

 JMS具有兩種通信模式:(點對點)和(發布/訂閱模式)這個在上篇文章已經詳細講過。

 

3、JMS中消息的產生和消費

 在JMS中,消息的產生和消息是異步的。對於消費來說,JMS的消息者可以通過兩種方式來消費消息。 
○ 同步 :訂閱者或接收者調用receive方法來接收消息,receive方法在能夠接收到消息之前(或超時之前)將一直阻塞 
○ 異步 :訂閱者或接收者可以注冊為一個消息監聽器。當消息到達之后,系統自動調用監聽器的onMessage方法。

 

4、對象模型

(1) ConnectionFactory
創建Connection對象的工廠,針對兩種不同的jms消息模型,分別有QueueConnectionFactoryTopicConnectionFactory兩種。可以通過JNDI來查找ConnectionFactory對象。

(2) Destination
Destination的意思是消息生產者的消息發送目標或者說消息消費者的消息來源。對於消息生產者來說,它的Destination是某個隊列(Queue)或某個主題(Topic);對於消息消費者來說,它的Destination也是某個隊列或主題(即消息來源)。所以,Destination實際上就是兩種類型的對象:Queue、Topic。可以通過JNDI來查找Destination。

(3) Connection
Connection表示在客戶端和JMS系統之間建立的鏈接(對TCP/IP socket的包裝)。Connection可以產生一個或多個Session。跟ConnectionFactory一樣,Connection也有兩種類型:QueueConnectionTopicConnection

(4) Session
Session是我們操作消息的接口。可以通過session創建生產者、消費者、消息等。Session提供了事務的功能。當我們需要使用session發送/接收多個消息時,可以將這些發送/接收動作放到一個事務中。同樣,也分QueueSessionTopicSession

(5) 消息的生產者
消息生產者由Session創建,並用於將消息發送到Destination。同樣,消息生產者分兩種類型:QueueSenderTopicPublisher。可以調用消息生產者的方法(send或publish方法)發送消息。

(6) 消息消費者
消息消費者由Session創建,用於接收被發送到Destination的消息。兩種類型:QueueReceiverTopicSubscriber。可分別通過session的createReceiver(Queue)或createSubscriber(Topic)來創建。當然,也可以session的creatDurableSubscriber方法來創建持久化的訂閱者。

(7) MessageListener
消息監聽器。如果注冊了消息監聽器,一旦消息到達,將自動調用監聽器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一種MessageListener。

 

 5、消息的組成

 Message主要由三部分組成,分別是Header,PropertiesBody, 解釋如下:

  1. Header: 消息頭,所有類型的這部分格式都是一樣的
  2. Properties: 屬性,按類型可以分為應用設置的屬性,標准屬性和消息中間件定義的屬性
  3. Body: 消息正文,指我們具體需要消息傳輸的內容。

消息頭

序號 屬性名稱 說明 設置者

1

JMSDestination

消息發送的目的地,是一個Topic或Queue  send

2

JMSDeliveryMode

消息的發送模式,分為NON_PERSISTENT和PERSISTENT,即持久化的和非持久化的  send

3

JMSMessageID

消息ID,需要以ID:開頭     send

4

JMSTimestamp 

 消息發送時的時間,也可以理解為調用send()方法時的時間,而不是該消息發送完成的時間  send

5

JMSCorrelationID

 關聯的消息ID,這個通常用在需要回傳消息的時候  client

6

JMSReplyTo

 消息回復的目的地,其值為一個Topic或Queue, 這個由發送者設置,但是接收者可以決定是否響應  client

7

JMSRedelivered 

 消息是否重復發送過,如果該消息之前發送過,那么這個屬性的值需要被設置為true, 客戶端可以根據這個屬性的值來

確認這個消息是否重復發送過,以避免重復處理。

 Provider

8

JMSType

 由消息發送者設置的個消息類型,代表消息的結構,有的消息中間件可能會用到這個,但這個並不是是批消息的種類,比如

TextMessage之類的

 client

9

JMSExpiration 

 消息的過期時間,以毫秒為單位,根據定義,它應該是timeToLive的值再加上發送時的GMT時間,也就是說這個指的是過期

時間,而不是有效期

send 

10

JMSPriority

 消息的優先級,0-4為普通的優化級,而5-9為高優先級,通常情況下,高優化級的消息需要優先發送  send

 

 消息屬性

       消息屬性的主要作用是可以對頭信息進行一個額外的補充,畢竟消息頭信息一是有限,二是很多不能由應用程序設定。通常,消息屬性可以用在消息選擇器的表達式里,結合起來實現對消息的過濾。消息屬性的值只能是基本的類型,或者這些基本類型對應的包裝類型。也就是說,不能將一個自定義的對象作為屬性值。通常情況下,如果能夠放在body里的內容,就不必放在消息屬性里。

 消息體

       為了適應不同場景下的消息,提高消息存儲的靈活性,JMS定義了幾種具體類型的消息,不同的子類型的消息體也不一樣,需要注意的是,Message接口並沒有提供一個統一的getBody之類的方法。消息子接口定義如下。

1)TextMessage: 最簡單的消息接口,用於發送文本類的消息,設置/獲取其body的方法定義如下setText()/getText().
2)StreamMessage: 流式消息接口,里面定義了一系列的對基本類型的set/get方法,消息發送者可以通過這些方法寫入基本類型的數據,消息接收者需要按發送者的寫入順序來讀取相應的數據。
3)MapMessage:把消息內容存儲在Map里,本接口定義了一系列對基本類型的的set/get方法,與StreamMessage不同的是,每個值都對應了一個相應的key,所以消息接收者不必按順序去讀取數據。
4)ObjectMessage: 將對象作為消息的接口,提供了一個set/get 對象的方法,需要注意的是只能設置一個對象,這個對象可以是一個Collection,但必須是序列化的。
5)BytesMessage:以字節的形式來傳遞消息的接口,除了提供了對基本類型的set/get,還提供了按字節方式進行set/get。

 

Springboot整合Active消息隊列

       簡單理解:

       Active是Apache公司旗下的一個消息總線,ActiveMQ是一個開源兼容Java Message Service(JMS) 面向消息的中件間. 是一個提供松耦合的應用程序架構.

       主要用來在服務與服務之間進行異步通信的。

一、搭建步驟

    1、相應jar包

復制代碼
 <!-- 整合消息隊列ActiveMQ -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        
  <!-- 如果配置線程池則加入 -->
        <dependency>  
            <groupId>org.apache.activemq</groupId>  
            <artifactId>activemq-pool</artifactId>  
        </dependency>    
復制代碼

    2、application.properties文件

復制代碼
#整合jms測試,安裝在別的機器,防火牆和端口號記得開放
spring.activemq.broker-url=tcp://47.96.44.110:61616

spring.activemq.user=admin
spring.activemq.password=admin
#下列配置要增加依賴
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=100

#集群配置(后續需要在配上)
#spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617)
#消息隊列默認是點對點的,如果需要發布/訂閱模式那么需要加上下面注解(如果同時需要點對點發布訂閱這里也需注釋掉)
# spring.jms.pub-sub-domain=true
復制代碼

 3、Springboot主類

<!-- 主類需要多加一個@EnableJms注解,不過貌似我沒有加的時候,也能運行,為安全起見姑且加上 -->
@SpringBootApplication
@EnableJms

4.5.......根據不同消息模式來寫了。

 

二、點對點案例

   我在這里案例中創建了兩個點對點隊列,所以他會有兩個queue對象,同樣對應每個queue對象,都會有單一對應的消費者。

      1、Springboot主類

復制代碼
@SpringBootApplication
@EnableJms
public class Main {

    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }
    
    //新建一個的Queue對象,交給sringboot管理,這個queue的名稱叫"first.queue".
    @Bean
    public Queue queue(){
        return new ActiveMQQueue("first.queue");
    }
}
復制代碼

      2.1、first.queue對應消費者

復制代碼
@Component
public class FirstConsumer {

    //名為"first.queue"消息隊列的消費者,通過JmsListener進行監聽有沒有消息,有消息會立刻讀取過來
    @JmsListener(destination="first.queue")
    public void receiveQueue(String text){
        System.out.println("FirstConsumer收到的報文為:"+text);
    }
}
復制代碼

       2.2、two.queue對應消費者(后面會創建)

復制代碼
@Component
public class TwoConsumer {

    //名為"two.queue"消息隊列的消費者
    @JmsListener(destination="two.queue")
    public void receiveQueue(String text){
        System.out.println("TwoConsumer收到的報文為:"+text);
    }
}
復制代碼

      3、Service類

復制代碼
   /**
    * 功能描述:消息生產
    */
public interface ProducerService {

    // 功能描述:指定消息隊列,還有消息    
    public void sendMessage(Destination destination, final String message);
    

    // 功能描述:使用默認消息隊列, 發送消息
    public void sendMessage( final String message);

}
復制代碼

      4、ServiceImpl實現類

復制代碼
/**
 * 功能描述:消息生產者實現類
 */
@Service
public class ProducerServiceImpl implements ProducerService{

    //這個隊列就是Springboot主類中bean的對象
    @Autowired
    private Queue queue;
    
    //用來發送消息到broker的對象,可以理解連接數據庫的JDBC
    @Autowired
    private JmsMessagingTemplate jmsTemplate;    
    
    //發送消息,destination是發送到的隊列,message是待發送的消息
    @Override
    public void sendMessage(Destination destination, String message) {        
        jmsTemplate.convertAndSend(destination, message);    
    }
    
    //發送消息,queue是發送到的隊列,message是待發送的消息
    @Override
    public void sendMessage(final String message) { 
        jmsTemplate.convertAndSend(this.queue, message);    
    }        
}
復制代碼

     5.QueueController類

復制代碼
/**
 * 功能描述:點對點消息隊列控制層
 */
@RestController
@RequestMapping("/api/v1")
public class QueueController {
    
    @Autowired
    private ProducerService producerService;        

    // 這里后面調用的是Springboot主類的quene隊列
    @GetMapping("first")
    public Object common(String msg){
        producerService.sendMessage(msg);    
       return "Success";
    }        
    
    // 這個隊列是新建的一個名為two.queue的點對點消息隊列
    @GetMapping("two")
    public Object order(String msg){
        
        Destination destination = new ActiveMQQueue("two.queue");
        producerService.sendMessage(destination, msg);
        
       return "Success";
    }        
}
復制代碼

      6、案例演示:

從演示效果可以得出以下結論:

     1:當springboot啟動時候,就生成了這兩個隊列,而且他們都會有一個消費者

     2:當我通過頁面訪問的時候,就相當於生產者把消息放到隊列中,一旦放進去就會被消費者監聽到,就可以獲取生產者放進去的值並在后台打印出

順便對頁面中四個單詞進行解釋:

   Number Of Pending Messages :待處理消息的數量。我們每次都會被監聽處理掉,所以不存在待處理,如果存在就說這里面哪里出故障了,需要排查

   Number Of Consumers : 消費者數量

   Messages Enqueued:    消息排列,這個只增不見,代表已經處理多少消息

   Messages Dequeued:    消息出隊。

     

 三、發布/訂閱者模式

 在上面點對點代碼的基礎上,添加發布/訂閱相關代碼

     1.appliaction.properties文件

#消息隊列默認是點對點的,如果需要發布/訂閱模式那么需要加上下面注解(如果同時需要點對點發布訂閱這里也需注釋掉)
spring.jms.pub-sub-domain=true

      2.Springboot主類添加

//新建一個topic隊列
    @Bean
    public Topic topic(){
        return new ActiveMQTopic("video.topic");
    }

       3.添加多個消費者類

復制代碼
//這里定義了三個消費者
@Component
public class TopicSub {
    
    @JmsListener(destination="video.topic")
    public void receive1(String text){
        System.out.println("video.topic 消費者:receive1="+text);
    }
        
    @JmsListener(destination="video.topic")
    public void receive2(String text){
        System.out.println("video.topic 消費者:receive2="+text);
    }
        
    @JmsListener(destination="video.topic")
    public void receive3(String text){
        System.out.println("video.topic 消費者:receive3="+text);
    }   
}
復制代碼

       4.Service類

 //功能描述:消息發布者
    public void publish(String msg);

      5.ServiceImpl實現類

復制代碼
//=======發布訂閱相關代碼=========
    
        @Autowired
        private Topic topic;
                
         @Override
        public void publish(String msg) {
            this.jmsTemplate.convertAndSend(this.topic, msg);
            
        }
復制代碼

       6.Controller類

復制代碼
// 這個隊列是新建的一個名為two.queue的點對點消息隊列
        @GetMapping("topic")
        public Object topic(String msg){

            producerService.publish(msg);
            
           return "Success";
        }    
復制代碼

      7.演示效果:

    從演示效果總結如下:

     1:Springboot啟動的時候,在Topics目錄下,一共出現了5個消費者。first.queue一個消費者、two.queue一個消費者、video.topic三個消費者

      2:當我在控制台輸入信息后,video.topic的三個消費者都會監聽video.topic發布的消息,並在控制台打印。

 

四、如何讓點對點和發布訂閱同時有效

為什么這么說呢,因為當我向上面一樣同時開啟,會發現點對點模式已經失效了。

 效果演示

從演示效果,可以得出如下結論:

     1:我們發現我們在頁面輸入..../two?msg=555消息后,后台並沒有成功打印消息。再看Active界面發現,這個queue對象,確實有一條待處理的消息,但是我們發現,它對應的消費者數量是為0.

     2:然而我們在打開topic頁面發現,這里卻存在一個消費者。

所以我個人理解是,當同時啟動的時候,所產生的消費者默認都是Topic消費者,沒有Queue消費者,所以它監聽不到queue所待處理的消息。

當配置文件不加:spring.jms.pub-sub-domain=true  那么系統會默認支持quene(點對點模式),但一旦加上這段配置,系統又變成只支持發布訂閱模式。

 

那如何同時都可以成功呢?

 思路如下:

第一步:還是需要去掉配置文件中的:

#消息隊列默認是點對點的,如果需要發布/訂閱模式那么需要加上下面注解(如果同時需要點對點發布訂閱這里也需注釋掉)
#spring.jms.pub-sub-domain=true

第二步:在發布訂閱者的中消費者中指定獨立的containerFactory

因為你去掉上面的配置,那么系統就默認是queue,所以@JmsListener如果不指定獨立的containerFactory的話是只能消費queue消息

復制代碼
    @JmsListener(destination="video.topic", containerFactory="jmsListenerContainerTopic")
    public void receive1(String text){
        System.out.println("video.topic 消費者:receive1="+text);
    }
    
    
    @JmsListener(destination="video.topic", containerFactory="jmsListenerContainerTopic")
    public void receive2(String text){
        System.out.println("video.topic 消費者:receive2="+text);
    }
    
    //第三步我不添加containerFactory="jmsListenerContainerTopic"看等下是否會打印出
    @JmsListener(destination="video.topic")
    public void receive3(String text){
        System.out.println("video.topic 消費者:receive3="+text);
    }
復制代碼

第三步:定義獨立的topic定義獨立的JmsListenerContainer

在springboot主類中添加:

復制代碼
        @Bean
        public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
            DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
            bean.setPubSubDomain(true);
            bean.setConnectionFactory(activeMQConnectionFactory);
            return bean;
        }
復制代碼

效果:

得出結論:

    1:點對點,和發布訂閱都有用

   2:receive3沒有指定獨立的containerFactory一樣沒有打印出來。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM