RabbitMQ-初見


目錄

什么是中間件

一個企業可能同時運行着多個不同的業務系統,

這些系統可能基於不同的操作系統、不同的數據庫、異構的網絡環境。

現在的問題是,如何把這些信息系統結合成一個有機地協同工作的整體,真正實現企業跨平台、分布式應用。

中間件便是解決之道,它用自己的復雜換取了企業應用的簡單。

中間件(Middleware)是處於操作系統和應用程序之間的軟件,也有人認為它應該屬於操作系統中的一部分。

人們在使用中間件時,往往是一組中間件集成在一起,構成一個平台(包括開發平台和運行平台),

但在這組中間件中必須要有一個通信中間件,

即中間件=平台+通信,

這個定義也限定了只有用於分布式系統中才能稱為中間件,同時還可以把它與支撐軟件和實用軟件區分開來。

消息隊列協議

比如我MQ發送一個信息,是以什么數據格式發送到隊列中,然后每個部分的含義是什么,

發送完畢以后的執行的動作,以及消費者消費消息的動作,消費完畢的響應結果和反饋是什么,

然后按照對應的執行順序進行處理。

例如:大家每天都在接觸的http請求協議:

1:語法:http規定了請求報文和響應報文的格式。
2:語義:客戶端主動發起請求稱之為請求。(這是一種定義,同時你發起的是post/get請求)
3:時序:一個請求對應一個響應。(一定先有請求在有響應,這個是時序)

而消息中間件采用的並不是http協議,

而常見的消息中間件協議有:OpenWire、AMQP、MQTT、Kafka,OpenMessage協議。

為什么消息中間件不直接使用http協議呢?

  • 因為http請求報文頭和響應報文頭是比較復雜的,包含了cookie,數據的加密解密,狀態碼,響應碼等附加的功能,

    但是對於一個消息而言,我們並不需要這么復雜,

    它其實就是負責數據傳遞,存儲,分發就行,

    一定要追求的是高性能。盡量簡潔,快速。

  • 大部分情況下http大部分都是短鏈接,

    一個請求到響應很有可能會中斷,中斷以后就不會就行持久化,就會造成請求的丟失。

    這樣就不利於消息中間件的業務場景,因為消息中間件可能是一個長期的獲取消息的過程,出現問題和故障要對數據或消息就行持久化等,

    目的是為了保證消息和數據的高可靠和穩健的運行。

AMQP協議

AMQP:(全稱:Advanced Message Queuing Protocol) 是高級消息隊列協議。

是一個提供統一消息服務的應用層標准高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計。

基於此協議的客戶端與消息中間件可傳遞消息,並不受客戶端/中間件不同產品,不同的開發語言等條件的限制。

特性:

  • 分布式事務支持。
  • 消息的持久化支持。
  • 高性能和高可靠的消息處理優勢。

MQTT協議

MQTT協議:(Message Queueing Telemetry Transport)消息隊列是IBM開放的一個即時通訊協議,物聯網系統架構中的重要組成部分。

特點:

  • 輕量

  • 結構簡單

  • 傳輸快,不支持事務

  • 沒有持久化設計。

應用場景:

  • 適用於計算能力有限
  • 低帶寬
  • 網絡不穩定的場景。

OpenMessage協議

由阿里、雅虎和滴滴出行、Stremalio等公司共同參與創立的分布式消息中間件、流處理等領域的應用開發標准。

特點:

  • 結構簡單
  • 解析速度快
  • 支持事務和持久化設計

Kafka協議

Kafka協議是基於TCP/IP的二進制協議。

消息內部是通過長度來分割,由一些基本數據類型組成。

特點是:

  • 結構簡單
  • 解析速度快
  • 無事務支持
  • 有持久化設計

消息隊列持久化

簡單來說就是將數據存入磁盤,而不是存在內存中隨服務器重啟斷開而消失,使數據能夠永久保存。

ActiveMQ RabbitMQ Kafka RocketMQ
文件存儲 支持 支持 支持 支持
數據庫 支持 / / /

消息的分發策略

MQ消息隊列有如下幾個角色

  • 生產者
  • 存儲消息
  • 消費者

那么生產者生成消息以后,MQ進行存儲,消費者是如何獲取消息的呢?

一般獲取數據的方式無外乎推(push)或者拉(pull)兩種方式,典型的git就有推拉機制,

我們發送的http請求就是一種典型的拉取數據庫數據返回的過程。

而消息隊列MQ是一種推送的過程,而這些推機制會適用到很多的業務場景也有很多對應推機制策略。

ActiveMQ RabbitMQ Kafka RocketMQ
發布訂閱 支持 支持 支持 支持
輪詢分發 支持 支持 支持 /
公平分發 / 支持 支持 /
重發 支持 支持 / 支持
消息拉取 / 支持 支持 支持

消息隊列高可用和高可靠

所謂高可用:是指產品在規定的條件和規定的時刻或時間內處於可執行規定功能狀態的能力。

當業務量增加時,請求也過大,一台消息中間件服務器的會觸及硬件(CPU,內存,磁盤)的極限,

一台消息服務器你已經無法滿足業務的需求,所以消息中間件必須支持集群部署。來達到高可用的目的。

什么是高可用機制

集群模式1 - Master-slave主從共享數據的部署方式

解說:生產者講消費發送到Master節點,所有的都連接這個消息隊列共享這塊數據區域,Master節點負責寫入,一旦Master掛掉,slave節點繼續服務。從而形成高可用,

集群模式2 - Master- slave主從同步部署方式

解釋:這種模式寫入消息同樣在Master主節點上,但是主節點會同步數據到slave節點形成副本,和zookeeper或者redis主從機制很類同。

這樣可以達到負載均衡的效果,如果消費者有多個這樣就可以去不同的節點就行消費,以為消息的拷貝和同步會暫用很大的帶寬和網絡資源。

集群模式3 - 多主集群同步部署模式

解釋:和上面的區別不是特別的大,但是它的寫入可以往任意節點去寫入。

集群模式4 - 多主集群轉發部署模式

解釋:如果你插入的數據是broker-1中,元數據信息會存儲數據的相關描述和記錄存放的位置(隊列)。

它會對描述信息也就是元數據信息就行同步,如果消費者在broker-2中進行消費,發現自己幾點沒有對應的消息,

可以從對應的元數據信息中去查詢,然后返回對應的消息信息

場景:比如買火車票或者黃牛買演唱會門票,比如第一個黃牛有顧客說要買的演唱會門票,但是沒有但是他會去聯系其他的黃牛詢問,如果有就返回。

集群模式5 Master-slave與Breoker-cluster組合的方案

解釋:實現多主多從的熱備機制來完成消息的高可用以及數據的熱備機制,在生產規模達到一定的階段的時候,這種使用的頻率比較高。

這么多集群模式,他們的最終目的都是為保證:消息服務器不會掛掉,出現了故障依然可以抱着消息服務繼續使用。

三句話:

  1. 要么消息共享,
  2. 要么消息同步
  3. 要么元數據共享

什么是高可靠機制

所謂高可用是指:是指系統可以無故障低持續運行,比如一個系統突然崩潰,報錯,異常等等並不影響線上業務的正常運行,出錯的幾率極低,就稱之為:高可靠。

在高並發的業務場景中,如果不能保證系統的高可靠,那造成的隱患和損失是非常嚴重的。

如何保證中間件消息的可靠性呢?可以從兩個方面考慮:

  1. 消息的傳輸:通過協議來保證系統間數據解析的正確性。
  2. 消息的存儲可靠:通過持久化來保證消息的可靠性。

RabbitMQ入門及安裝(Win)

官網:https://www.rabbitmq.com/

什么是RabbitMQ,官方給出來這樣的解釋:

RabbitMQ是部署最廣泛的開源消息代理。
RabbitMQ擁有成千上萬的用戶,是最受歡迎的開源消息代理之一。從T-Mobile 到Runtastic,RabbitMQ在全球范圍內的小型初創企業和大型企業中都得到使用。
RabbitMQ輕巧,易於在內部和雲中部署。它支持多種消息傳遞協議。RabbitMQ可以部署在分布式和聯合配置中,以滿足大規模,高可用性的要求。
RabbitMQ可在許多操作系統和雲環境上運行,並為大多數流行語言提供了廣泛的開發人員工具。


簡單概述:
RabbitMQ是一個開源的遵循AMQP協議實現的基於Erlang語言編寫,支持多種客戶端(語言)。用於在分布式系統中存儲消息,轉發消息,具有高可用,高可擴性,易用性等特征。

安裝RabbitMQ

下載地址:https://www.rabbitmq.com/download.html

環境准備:CentOS7.x+ / Erlang

RabbitMQ是采用Erlang語言開發的,所以系統環境必須提供Erlang環境,第一步就是安裝Erlang。

Erlang和RabbitMQ版本的按照比較: https://www.rabbitmq.com/which-erlang.html

Erlang安裝

安裝Erlang 時要注意安裝的RabbityMQ 所依賴的Erlang版本,根據RabbitMQ的要求選擇一個版本,下載Erlang安裝包后直接安裝就可以了。

設置ERLANG_HOME 環境變量

在開始菜單查找Erlang,點擊啟動 打開如下界面,那么Erlang就安裝成功了。

安裝RabbitMQ

可以在RabbitMQ的官方網站下載最新版本的RabbitMQ服務器安裝程序,RabbitMQ下載地址

RabbitMQ安裝好后是作為windows service 運行在后台。

設置環境變量 RABBITQM_SERVER變量

然后在系統的path變量中配置如下: 加入sbin的路徑

啟動 rabbitmq-server

安裝 rabbitmq_management

用下列命令安裝rabbitmq_management插件,這款插件是可以可視化的方式查看RabbitMQ 服務器實例的狀態,以及操控RabbitMQ服務器。

rabbitmq-plugins enable rabbitmq_management

Web頁面

現在我們在瀏覽器中輸入:http://localhost:15672 可以看到一個登錄界面

這里可以使用默認賬號guest/guest登錄

在瀏覽器中輸入 http://localhost:15672/api/ 就可以看到 RabbitMQ Management HTTP API 文檔

相關端口

5672:RabbitMQ的通訊端口

25672:RabbitMQ的節點間的CLI通訊端口是

15672:RabbitMQ HTTP_API的端口,管理員用戶才能訪問,用於管理RabbitMQ,需要啟動Management插件。

1883,8883:MQTT插件啟動時的端口。

61613、61614:STOMP客戶端插件啟用的時候的端口。

15674、15675:基於webscoket的STOMP端口和MOTT端口

授權賬號和密碼

新增用戶

rabbitmqctl add_user admin admin

設置用戶分配操作權限

rabbitmqctl set_user_tags admin administrator

用戶級別:

  • 1、administrator 可以登錄控制台、查看所有信息、可以對rabbitmq進行管理
  • 2、monitoring 監控者 登錄控制台,查看所有信息
  • 3、policymaker 策略制定者 登錄控制台,指定策略
  • 4、managment 普通管理員 登錄控制台

為用戶添加資源權限

rabbitmqctl.bat set_permissions -p / admin ".*" ".*" ".*"
rabbitmqctl add_user 賬號 密碼

rabbitmqctl set_user_tags 賬號 administrator

rabbitmqctl change_password Username Newpassword 修改密碼

rabbitmqctl delete_user Username 刪除用戶

rabbitmqctl list_users 查看用戶清單

rabbitmqctl set_permissions -p / 用戶名 ".*" ".*" ".*" 為用戶設置administrator角色

rabbitmqctl set_permissions -p / root ".*" ".*" ".*"

RabbitMQ入門及安裝(Linux)

[root@iZm5eauu5f1ulwtdgwqnsbZ ~]# lsb_release -a
LSB Version:    :core-4.1-amd64:core-4.1-noarch
Distributor ID: CentOS
Description:    CentOS Linux release 8.3.2011
Release:        8.3.2011
Codename:       n/a



wget https://packages.erlang-solutions.com/erlang-solutions-2.0-1.noarch.rpm
rpm -Uvh erlang-solutions-2.0-1.noarch.rpm



yum install -y erlang



erl -v



yum install -y socat



> wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.13/rabbitmq-server-3.8.13-1.el8.noarch.rpm
> rpm -Uvh rabbitmq-server-3.8.13-1.el8.noarch.rpm



# 啟動服務
> systemctl start rabbitmq-server
# 查看服務狀態
> systemctl status rabbitmq-server
# 停止服務
> systemctl stop rabbitmq-server
# 開機啟動服務
> systemctl enable rabbitmq-server



rabbitmq-plugins enable rabbitmq_management



systemctl restart rabbitmq-server



rabbitmqctl add_user admin admin



rabbitmqctl set_user_tags admin administrator



rabbitmqctl.bat set_permissions -p / admin ".*" ".*" ".*"

docker pull rabbitmq:management

docker run -di --name=myrabbit -p 15672:15672 rabbitmq:management

docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management


docker logs -f myrabbit



> more xxx.log  查看日記信息
> netstat -naop | grep 5672 查看端口是否被占用
> ps -ef | grep 5672  查看進程
> systemctl stop 服務




RabbitMQ的角色分類

1:none:

  • 不能訪問management plugin

2:management:查看自己相關節點信息

  • 列出自己可以通過AMQP登入的虛擬機
  • 查看自己的虛擬機節點 virtual hosts的queues,exchanges和bindings信息
  • 查看和關閉自己的channels和connections
  • 查看有關自己的虛擬機節點virtual hosts的統計信息。包括其他用戶在這個節點virtual hosts中的活動信息。

3:Policymaker

  • 包含management所有權限
  • 查看和創建和刪除自己的virtual hosts所屬的policies和parameters信息。

4:Monitoring

  • 包含management所有權限
  • 羅列出所有的virtual hosts,包括不能登錄的virtual hosts。
  • 查看其他用戶的connections和channels信息
  • 查看節點級別的數據如clustering和memory使用情況
  • 查看所有的virtual hosts的全局統計信息。

5:Administrator

  • 最高權限
  • 可以創建和刪除virtual hosts
  • 可以查看,創建和刪除users
  • 查看創建permisssions
  • 關閉所有用戶的connections

項目地址

https://gitee.com/zwtgit/rabbit-mq

RabbitMQ入門案例 - Simple 簡單模式

1:jdk1.8

2:構建一個maven工程

3:導入rabbitmq的maven依賴

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-amqp</artifactId>
    <version>2.2.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.2.5.RELEASE</version>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.5.2</version>
</dependency>

4:啟動rabbitmq-server服務

systemctl start rabbitmq-server
或者
docker start myrabbit

5:定義生產者

package com.zwt.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author: ML李嘉圖
 * @description: Producer 簡單隊列生產者
 * @Date : 2021/3/2
 */
public class Producer {
    public static void main(String[] args) {
        // 1: 創建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();

        // 2: 設置連接屬性,本機的地址,默認端口5679
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        Connection connection = null;
        Channel channel = null;

        try {
            // 3: 從連接工廠中獲取連接
            connection = connectionFactory.newConnection("生產者");
            // 4: 從連接中獲取通道channel
            channel = connection.createChannel();
            // 5: 申明隊列queue存儲消息
            /*
             *  如果隊列不存在,則會創建
             *  Rabbitmq不允許創建兩個相同的隊列名稱,否則會報錯。
             *
             *  @params1: queue 隊列的名稱
             *  @params2: durable 隊列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果為true,會對當前隊列加鎖,其他的通道不能訪問,並且連接自動關閉
             *  @params4: autoDelete 是否自動刪除,當最后一個消費者斷開連接之后是否自動刪除消息。
             *  @params5: arguments 可以設置隊列附加參數,設置隊列的有效期,消息的最大長度,隊列的消息生命周期等等。
             * */
            channel.queueDeclare("queue1", false, false, false, null);
            // 6: 准備發送消息的內容
            String message = "你好,Ml李嘉圖!!!";
            // 7: 發送消息給中間件rabbitmq-server
            // @params1: 交換機exchange
            // @params2: 隊列名稱/routing
            // @params3: 屬性配置
            // @params4: 發送消息的內容
            channel.basicPublish("", "queue1", null, message.getBytes());
            System.out.println("消息發送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("發送消息出現異常...");
        } finally {
            // 7: 釋放連接關閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

6:定義消費者

package com.zwt.simple;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author: ML李嘉圖
 * @description: Producer 簡單隊列生產者
 * @Date : 2021/3/2
 */
// 簡單模式----消費者
public class Consumer {
    public static void main(String[] args) {
        // 所有的中間件技術都是基於tcp/ip協議基礎上構建新型的協議規范,rabbitmq遵循的是amqp
        // 協議遵循 ip port

        // 1、創建連接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 通過連接工廠設置賬號密碼
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 虛擬訪問節點
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            // 2、創建連接connection
            connection = connectionFactory.newConnection("生產者");
            // 3、通過連接獲取通道channel
            channel = connection.createChannel();
            // 4、 通過創建交換機、聲明隊列,綁定關系,路由key,發送消息和接受消息
            channel.basicConsume("queue1", true, new DeliverCallback() {
                public void handle(String consumerTag, Delivery message) throws IOException {
                    System.out.println("收到消息是:" + new String(message.getBody(), "UTF-8"));
                }
            }, new CancelCallback() {
                public void handle(String consumerTag) throws IOException {
                    System.out.println("接受消息失敗了。。。。");
                }
            });
            System.out.println("開始接受消息");
            // 進行阻斷,接受消息不關閉
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 7、關閉通道(先關通道,再關連接)
            if(channel!=null && channel.isOpen()){
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // 8、關閉連接
            if(connection!=null && connection.isOpen()){
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

7:觀察消息的在rabbitmq-server服務中的過程

什么是AMQP

AMQP全稱:Advanced Message Queuing Protocol(高級消息隊列協議)。是應用層協議的一個開發標准,為面向消息的中間件設計。

RabbitMQ的核心組成部分

核心概念:
Server:又稱Broker ,接受客戶端的連接,實現AMQP實體服務。 安裝rabbitmq-server
Connection:連接,應用程序與Broker的網絡連接 TCP/IP/ 三次握手和四次揮手
Channel:網絡信道,幾乎所有的操作都在Channel中進行,Channel是進行消息讀寫的通道,客戶端可以建立對各Channel,每個Channel代表一個會話任務。
Message :消息:服務與應用程序之間傳送的數據,由Properties和body組成,Properties可是對消息進行修飾,比如消息的優先級,延遲等高級特性,Body則就是消息體的內容。
Virtual Host 虛擬地址,用於進行邏輯隔離,最上層的消息路由,一個虛擬主機理由可以有若干個Exhange和Queueu,同一個虛擬主機里面不能有相同名字的Exchange
Exchange:交換機,接受消息,根據路由鍵發送消息到綁定的隊列。(不具備消息存儲的能力)
Bindings:Exchange和Queue之間的虛擬連接,binding中可以保護多個routing key.
Routing key:是一個路由規則,虛擬機可以用它來確定如何路由一個特定消息。
Queue:隊列:也成為Message Queue,消息隊列,保存消息並將它們轉發給消費者。

RabbitMQ整體架構是什么樣子的?

RabbitMQ的運行流程

RabbitMQ支持消息的模式

參考官網:https://www.rabbitmq.com/getstarted.html

簡單模式 Simple

  • 參考上面

工作模式 Work

  • 類型:無
  • 特點:分發機制

發布訂閱模式

  • 類型:fanout
  • 特點:Fanout—發布與訂閱模式,是一種廣播機制,它是沒有路由key的模式。

路由模式

  • 類型:direct
  • 特點:有routing-key的匹配模式

主題Topic模式

  • 類型:topic
  • 特點:模糊的routing-key的匹配模式

參數模式

  • 類型:headers
  • 特點:參數匹配模式

rabbitmq發送消息一定有一個交換機

RabbitMQ入門案例 - fanout模式

發布訂閱模式

  • 類型:fanout
  • 特點:Fanout—發布與訂閱模式,是一種廣播機制,它是沒有路由key的模式。

代碼都是與簡單模式類似,只是調用的函數以及參數改變了

發布訂閱模式:大概就是咱們的交換機選定fanout模式,binding你自己需要使用的隊列,然后就可以通過這個交換機發布消息了。

RabbitMQ入門案例 - Direct模式

路由模式(Routing key)

例如:我只想發給微信用戶,不發QQ用戶,就是增加了 過濾的條件

  • 類型:direct
  • 特點:Direct模式是fanout模式上的一種疊加,增加了路由RoutingKey的模式。

RabbitMQ入門案例 - Topic模式

  • 類型:topic
  • 特點:Topic模式是direct模式上的一種疊加,增加了模糊路由RoutingKey的模式。

RabbitMQ入門案例 - Work模式 - 輪詢模式(Round-Robin)

當有多個消費者時,我們的消息會被哪個消費者消費呢,我們又該如何均衡消費者消費信息的多少呢?

主要有兩種模式:

1、輪詢模式的分發:一個消費者一條,按均分配;

2、公平分發:根據消費者的消費能力進行公平分發,處理快的處理的多,處理慢的處理的少;按勞分配;

  • 特點:該模式接收消息是當有多個消費者接入時,消息的分配模式是一個消費者分配一條,直至消息消費完成;

RabbitMQ入門案例 - Work模式 - 公平分發(Fair Dispatch)

RabbitMQ使用場景

解耦、削峰、異步

同步異步的問題(串行)

串行方式:將訂單信息寫入數據庫成功后,發送注冊郵件,再發送注冊短信。以上三個任務全部完成后,返回給客戶端

public void makeOrder(){
    // 1 :保存訂單 
    orderService.saveOrder();
    // 2: 發送短信服務
    messageService.sendSMS("order");//1-2 s
    // 3: 發送email服務
    emailService.sendEmail("order");//1-2 s
    // 4: 發送APP服務
    appService.sendApp("order");    
}

並行方式 異步線程池

並行方式:將訂單信息寫入數據庫成功后,發送注冊郵件的同時,發送注冊短信。以上三個任務完成后,返回給客戶端。與串行的差別是,並行的方式可以提高處理的時間 。

public void makeOrder(){
    // 1 :保存訂單 
    orderService.saveOrder();
   // 相關發送
   relationMessage();
}
public void relationMessage(){
    // 異步
     theadpool.submit(new Callable<Object>{
         public Object call(){
             // 2: 發送短信服務  
             messageService.sendSMS("order");
         }
     })
    // 異步
     theadpool.submit(new Callable<Object>{
         public Object call(){
              // 3: 發送email服務
            emailService.sendEmail("order");
         }
     })
      // 異步
     theadpool.submit(new Callable<Object>{
         public Object call(){
             // 4: 發送短信服務
             appService.sendApp("order");
         }
     })
      // 異步
         theadpool.submit(new Callable<Object>{
         public Object call(){
             // 4: 發送短信服務
             appService.sendApp("order");
         }
     })
}

存在問題:

1:耦合度高

2:需要自己寫線程池自己維護成本太高

3:出現了消息可能會丟失,需要你自己做消息補償

4:如何保證消息的可靠性你自己寫

5:如果服務器承載不了,你需要自己去寫高可用

異步消息隊列的方式

好處

1:完全解耦,用MQ建立橋接

2:有獨立的線程池和運行模型

3:出現了消息可能會丟失,MQ有持久化功能

4:如何保證消息的可靠性,死信隊列和消息轉移的等

5:如果服務器承載不了,你需要自己去寫高可用,HA鏡像模型高可用。

按照以上約定,用戶的響應時間相當於是訂單信息寫入數據庫的時間,也就是50毫秒。

注冊郵件,發送短信寫入消息隊列后,直接返回,因此寫入消息隊列的速度很快,基本可以忽略,因此用戶的響應時間可能是50毫秒。

因此架構改變后,系統的吞吐量提高到每秒20 QPS。比串行提高了3倍,比並行提高了兩倍

public void makeOrder(){
    // 1 :保存訂單 
    orderService.saveOrder();   
    rabbitTemplate.convertSend("ex","2","消息內容");
}

高內聚,低耦合

流量削峰

要對流量進行削峰,最容易想到的解決方案就是用消息隊列來緩沖瞬時流量,

把同步的直接調用轉換成異步的間接推送,中間通過一個隊列在一端承接瞬時的流量洪峰,

在另一端平滑地將消息推送出去。

消息隊列中間件主要解決應用耦合,異步消息, 流量削鋒等問題。

常用消息隊列系統:目前在生產環境,使用較多的消息隊列有 ActiveMQ、RabbitMQ、 ZeroMQ、Kafka、MetaMQ、RocketMQ 等。

在這里,消息隊列就像“水庫”一樣,攔蓄上游的洪水,削減進入下游河道的洪峰流量,從而達到減免洪水災害的目的。

針對秒殺場景還有一種方法,就是對請求進行分層過濾,從而過濾掉一些無效的請求。 分層過濾其實就是采用“漏斗”式設計來處理請求的。

還有一些場景(后面有文章補充介紹):

分布式事務的可靠消費和可靠生產
索引、緩存、靜態化處理的數據同步
流量監控
日志監控(ELK)
下單、訂單分發、搶票

RabbitMQ-SpringBoot案例 -fanout模式

在pom.xml中引入依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

在application.yml進行配置

# 服務端口
server:
  port: 8080
# 配置rabbitmq服務
spring:
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /
    host: 47.104.141.27
    port: 5672

定義訂單的生產者

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
 * @author: ML
 * @description: OrderService
 * @Date : 2021/3/4
 */
@Component
public class OrderService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // 1: 定義交換機
    private String exchangeName = "fanout_order_exchange";
    // 2: 路由key
    private String routeKey = "";
    public void makeOrder(Long userId, Long productId, int num) {
        // 1: 模擬用戶下單
        String orderNumer = UUID.randomUUID().toString();
        // 2: 根據商品id productId 去查詢商品的庫存
        // int numstore = productSerivce.getProductNum(productId);
        // 3:判斷庫存是否充足
        // if(num >  numstore ){ return  "商品庫存不足..."; }
        // 4: 下單邏輯
        // orderService.saveOrder(order);
        // 5: 下單成功要扣減庫存
        // 6: 下單完成以后
        System.out.println("用戶 " + userId + ",訂單編號是:" + orderNumer);
        // 發送訂單信息給RabbitMQ fanout
        rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);
    }
}

綁定關系

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @Author : ML
 * @CreateTime : 2021/9/3
 * @Description :
 **/
@Configuration
public class DirectRabbitConfig {
    //隊列 起名:TestDirectQueue
    @Bean
    public Queue emailQueue() {
        // durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效
        // exclusive:默認也是false,只能被當前創建的連接使用,而且當連接關閉后隊列即被刪除。此參考優先級高於durable
        // autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。
        //   return new Queue("TestDirectQueue",true,true,false);
        //一般設置一下隊列的持久化就好,其余兩個就是默認false
        return new Queue("email.fanout.queue", true);
    }
    @Bean
    public Queue smsQueue() {
        // durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效
        // exclusive:默認也是false,只能被當前創建的連接使用,而且當連接關閉后隊列即被刪除。此參考優先級高於durable
        // autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。
        //   return new Queue("TestDirectQueue",true,true,false);
        //一般設置一下隊列的持久化就好,其余兩個就是默認false
        return new Queue("sms.fanout.queue", true);
    }
    @Bean
    public Queue weixinQueue() {
        // durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效
        // exclusive:默認也是false,只能被當前創建的連接使用,而且當連接關閉后隊列即被刪除。此參考優先級高於durable
        // autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。
        //   return new Queue("TestDirectQueue",true,true,false);
        //一般設置一下隊列的持久化就好,其余兩個就是默認false
        return new Queue("weixin.fanout.queue", true);
    }
    //Direct交換機 起名:TestDirectExchange
    @Bean
    public DirectExchange fanoutOrderExchange() {
        //  return new DirectExchange("TestDirectExchange",true,true);
        return new DirectExchange("fanout_order_exchange", true, false);
    }
    //綁定  將隊列和交換機綁定, 並設置用於匹配鍵:TestDirectRouting
    @Bean
    public Binding bindingDirect1() {
        return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange()).with("");
    }
    @Bean
    public Binding bindingDirect2() {
        return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange()).with("");
    }
    @Bean
    public Binding bindingDirect3() {
        return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange()).with("");
    }
}

測試

import com.xuexiangban.rabbitmq.springbootrabbitmqfanoutproducer.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class SpringbootRabbitmqFanoutProducerApplicationTests {
    @Autowired
    OrderService orderService;
    @Test
    public void contextLoads() throws Exception {
        for (int i = 0; i < 10; i++) {
            Thread.sleep(1000);
            Long userId = 100L + i;
            Long productId = 10001L + i;
            int num = 10;
            orderService.makeOrder(userId, productId, num);
        }
    }
}

定義消費者

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// bindings其實就是用來確定隊列和交換機綁定關系
@RabbitListener(bindings =@QueueBinding(
        // email.fanout.queue 是隊列名字,這個名字你可以自定隨便定義。
        value = @Queue(value = "email.fanout.queue",autoDelete = "false"),
        // order.fanout 交換機的名字 必須和生產者保持一致
        exchange = @Exchange(value = "fanout_order_exchange",
                // 這里是確定的rabbitmq模式是:fanout 是以廣播模式 、 發布訂閱模式
                type = ExchangeTypes.FANOUT)
))
@Component
public class EmailService {
    // @RabbitHandler 代表此方法是一個消息接收的方法。該不要有返回值
    @RabbitHandler
    public void messagerevice(String message){
        // 此處省略發郵件的邏輯
        System.out.println("email-------------->" + message);
    }
}

消費者 - 短信服務

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// bindings其實就是用來確定隊列和交換機綁定關系
@RabbitListener(bindings =@QueueBinding(
        // email.fanout.queue 是隊列名字,這個名字你可以自定隨便定義。
        value = @Queue(value = "sms.fanout.queue",autoDelete = "false"),
        // order.fanout 交換機的名字 必須和生產者保持一致
        exchange = @Exchange(value = "fanout_order_exchange",
                // 這里是確定的rabbitmq模式是:fanout 是以廣播模式 、 發布訂閱模式
                type = ExchangeTypes.FANOUT)
))
@Component
public class SMSService {
    // @RabbitHandler 代表此方法是一個消息接收的方法。該不要有返回值
    @RabbitHandler
    public void messagerevice(String message){
        // 此處省略發郵件的邏輯
        System.out.println("sms-------------->" + message);
    }
}

消費者 - XX服務

啟動服務SpringbootRabbitmqFanoutConsumerApplication,查看效果

代碼 + @EnableRabbit

RabbitMQ-SpringBoot案例 -direct模式

//消費者 - XX服務
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// bindings其實就是用來確定隊列和交換機綁定關系
@RabbitListener(bindings =@QueueBinding(
        // email.fanout.queue 是隊列名字,這個名字你可以自定隨便定義。
        value = @Queue(value = "email.fanout.queue",autoDelete = "false"),
        // order.fanout 交換機的名字 必須和生產者保持一致
        exchange = @Exchange(value = "fanout_order_exchange",
                // 這里是確定的rabbitmq模式是:fanout 是以廣播模式 、 發布訂閱模式
                type = ExchangeTypes.FANOUT)
))
@Component
public class EmailService {
    // @RabbitHandler 代表此方法是一個消息接收的方法。該不要有返回值
    @RabbitHandler
    public void messagerevice(String message){
        // 此處省略發郵件的邏輯
        System.out.println("email-------------->" + message);
    }
}

RabbitMQ-SpringBoot案例 -topic模式

代碼其實都是類似的,主要是理解原理。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM