了解一下RabbitMQ


RabbitMQ概述

MQ的應用場景:異步,削峰,解耦

RabbitMQ是遵從AMQP協議的 通信協議都設計到報文交互,換句話說RabbitMQ就是AMQP協議的Erlang的實現。

AMQP說到底還是一個通信協議從low-level層面舉例來說,AMQP本身是應用層的協議,其填充於TCP協議的數據部分。

從high-level層面來說,AMQP是通過協議命令進行交互的。命令類似HTTP中的方法(GET PUT POST DELETE等)。

信道(Channel)在AMQP是一個很重要的概念,大多數操作都是在信道這個層面展開的

我們完全可以用Connection就能完成信道的工作,為什么還要引入信道?

試想:一個程序中有很多個線程需要從RabbitMQ中消費消息,或者生產消息,那么必然需要建立很多個Connection,也就是多個TCP連接。

建立和銷毀TCP連接開銷很昂貴。所以RabbitMQ采用類似NIO的做法,選擇TCP連接復用不僅可以減少性能開銷,同時也便於管理。

我們知道無論是生產者還是消費者,都需要和 RabbitMQ Broker 建立連接,這個連接就是一條 TCP 連接,也就是 Connection。

一旦 TCP 連接建立起來,客戶端緊接着可以創建一個 AMQP 信道(Channel),每個信道都會被指派一個唯一的 ID。

信道是建立在 Connection 之上的虛擬連接,RabbitMQ 處理的每條 AMQP 指令都是通過信道完成的。

發布訂閱模式

廣播模式 fanout

  所謂廣播指的是一條消息將被所有的消費者進行處理。

直連模式 director

  直連模式的特點主要就是routingkey的使用,如果現在該消息就要求指定一個具備有指定Routingkey的操作者進行處理,那么只需要兩個的Routingkey匹配即可。

  可以將Routingkey比喻一個唯一標記,這樣就可以將消息准確的推送到消費者手中了。

主題模式 topic

  主題模式類似於廣播模式與直連模式的整合操作,所有的消費者都可以接收到主題信息,但是如果要想進行正確的處理,則一定需要有一個匹配的Routingkey完成操作。

  可以使用通配符模糊匹配("#"匹配一個或多個詞,"*"匹配不多不少一個詞)

交換器相當於投遞包裹的郵箱(一方面接收生產者發送的消息,另外一方面負責向隊列進行消息的推送),Routingkey相當於包裹的地址,BindingKey相當於包裹的目的地。

當填寫在包裹上的地址和要投遞的地址相匹配時,那么這個包裹就會正確投遞到目的地,最后這個目的地的主人(隊列)可以保留這個包裹。

如果填寫地址出錯,郵遞員不能正確的投遞到目的地,包裹可能被退回給寄件人,也有可能被丟棄。

RabbitMQ官方文檔和API都把Routingkey和BingdingKey都看做Routingkey下面代碼中紅色部分 就都當Routingkey使用

消息生產者

public class MessageProducer {
    private static final String EXCHANGE_NAME ="com.sunkun.topic";//消息隊列名稱
    private static final String HOST="192.168.1.105";
    private static final int PORT=5672;
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();//建立一個連接工廠
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername("sunkun");
        factory.setPassword("123456");
        //factory.setVirtualHost(virtualHost) 使用虛擬主機的最大好處 可以區分不同用戶的操作空間  每一個虛擬主機有一個自己的空間管理
        Connection conn = factory.newConnection();//定義一個新的RabbitMQ的連接
        Channel channel = conn.createChannel();//創建一個通訊的通道
        //定義該通道要使用的隊列名稱 此時隊列已經創建過了
        //第一個參數 隊列名稱(這個隊列可能存在也可能不存在)
        //第二個參數 是否持久保存
        //第三個參數 此隊列是否為專用的隊列信息
        //第四個參數 是否允許自動刪除
        //channel.queueDeclare(QUENE_NAME, true, false, true,null);
        channel.exchangeDeclare(EXCHANGE_NAME, "topic",true);
        long start = System.currentTimeMillis();
        System.out.println("消息開始"+start);
        for(int i=0;i<1000;i++){
            String message = "sk - "+i;
            if(i%2==0){
                //MessageProperties.PERSISTENT_TEXT_PLAIN 消息持久化
 channel.basicPublish(EXCHANGE_NAME, "sk1", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());//進行消息發送             }else{
 channel.basicPublish(EXCHANGE_NAME, "sk2", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());//進行消息發送             }

        }
        long end = System.currentTimeMillis();
        System.out.println("消息花費時間"+(end-start));
        channel.close();
    }
}

消息消費者

public class MessageConsumer {
    private static final String EXCHANGE_NAME ="com.sunkun.topic";//消息隊列名稱
    private static final String HOST="192.168.1.105";
    private static final int PORT=15672;
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();//建立一個連接工廠
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername("sunkun");
        factory.setPassword("123456");
        Connection conn = factory.newConnection();//定義一個新的RabbitMQ的連接
        Channel channel = conn.createChannel();//創建一個通訊的通道
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();//通過通道獲取一個隊列名稱
       channel.queueBind(queueName, EXCHANGE_NAME, "sk2");//進行綁定處理 //在RabbitMQ里面,所有的消費者信息是通過一個回調方法完成的
        Consumer consumer = new DefaultConsumer(channel){//需要復寫指定的方法實現消息處理
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body);
                System.out.println("消費者sk2:"+message);//可以啟動多個消費者
                super.handleDelivery(consumerTag, envelope, properties, body);
            }
        };
        channel.basicConsume(queueName,consumer);
    }
}

RabbitMQ可靠性投遞—保證消息成功發出

持久化可以提高RabbitMQ的可靠性,防止在異常情況(重啟,關閉,宕機)下的數據丟失。

持久化可分為三個部分:交換器的持久化,隊列的持久化和消息的持久化。從上面的代碼可以看到,生產者如果往交換器發消息,然后和消費者和隊列綁定,是不需要我們顯示的聲明隊列的(也就沒必要設置隊列的持久化)。

交換器的持久化:是通過聲明交換器時將druable參數設置為true來實現的。如果交換器不設置持久化,那么在RabbitMQ重啟之后相關的交換器元數據會丟失,不過消息不會丟失,只是不能將消息發送到這個交換器中了。

對於一個長期使用的交換器來說,建議其置為持久化。(消息不直接往隊列發,先往exchange發送 可以實現廣播模式)

隊列的持久化:是通過聲明隊列時將durable參數置為true實現的(和交換器的持久化類似)。如果隊列不設置持久化,那么在RabbitMQ服務重啟之后,相關隊列的元數據會丟失,此時數據也會丟失。

消息的持久化:因為隊列的持久化能保證其本身的元數據不會因為異常情況而丟失,但是不能保證內部存儲的消息不會丟失。要確保消息不會丟失,需求將其設置為持久化。

消息的持久化是指當消息從交換機發送到隊列之后,被消費者消費之前,服務器突然宕機重啟,消息仍然存在。消息持久化的前提是隊列持久化,假如隊列不是持久化,那么消息的持久化毫無意義。

消息的持久化是設置Properties為MessageProperties.PERSITANT_TEXT_PLAIN,

RabbitMQ集群—保證mqbroker節點的成功接收

在持久化的消息正確存入到RabbitMQ之后 還需要一段時間(雖然時間很短,但不可忽視)才能存入磁盤中,如果這段時間發生了宕機,消息保存還沒來得及落盤,那么這些消息將會丟失。

可靠性投遞,是為了保證消息能夠100%到達mqbroker,而鏡像隊列是為了保證mqbroker出現意外情況,斷電,宕機,磁盤損耗而不丟失數據。保障MQ節點成功接收

本文主要講鏡像隊列

鏡像隊列只是進行數據的副本拷貝 (主從集群僅僅是數據備份,做不到故障轉移),當外部發送過的消息首先落到我們的主服務器上,然后主服務器把數據同步到另外的兩個節點上,

這就是鏡像隊列,可以保證數據百分之百的不丟失(主節點掛了還有兩個從節點)

如果想要安全的使用RabbitMQ就要繼續追加負載均衡組件,列如HAProxy LVS等等,如果要保證負載均衡組件的高可用,還應該繼續追加KeepAlive或者ZooKeeper組件。

參考Redis集群架構

生產者確認—發送端收到MQ節點(Broker)確認應答

除上面兩個問題外 我們還遇到一個新問題:當消息的生產者將消息發送出去之后,消息到底有沒有正確的到達服務器呢?

如果消息到達服務器之前就丟失,那么持久化也解決不了問題,因為消息就沒有到達服務器,何談持久化呢。

通常會有兩種方法解決此問題一時事物機制,只有消息被成功接收,事物才能提交成功,否則便可在捕獲異常之后進行事物回滾,於此同時可以進行消息重發。

但使用事物機制會大大降低RabbitMQ的性能,我們一般采取發送方確認機制。

發送方確認機制:生產者將信道設置成confirm模式(channel.confirmSelect()),一旦信道進入confirm模式,所有在該信道上面發布的消息都會被指派一個唯一的ID(從1開始),

一旦消息被投遞到所有的匹配隊列之后,RabbitMQ就會發送一個確認給生產者(包含消息的唯一ID),這就使得生產者知曉消息已經到達目的地了(生產者會添加監聽addConfirmListener)。

如果消息和隊列是持久化的,那么消息確認會在消息寫入磁盤后發出。

可以通過com.rabbitmq.client.Envelope實現應答處理

./rabbitmq-server -detached 后台啟動rabbitMQ

推薦文章:RabbitMQ教程

RabbitMQ消息可靠性投遞

消息信息落庫,對消息狀態進行打標—防止回ACK的時候失敗(網絡抖動)

RabbitMQ 消息的持久化,集群,生產者確認,這些RabbitMQ內部的功能應該已經可以保證消息不會被丟失而且正常接收了吧。為什么還要做消息落庫這么麻煩呢

因為回ack的時候可能會出現失敗,網絡抖動問題。

所以采用消息落地到數據庫 發送消息的時候 同時insert一條消息記錄狀態設置為0,如果ack成功了,把消息狀態設置為1

如果ack失敗了, 落地到數據庫的消息沒有更新,可以采用定時任務(消息存在了xx min狀態還是0,就重新發送)

  1. 進行數據的入庫
    比如我們要發送一條訂單消息,首先把業務數據也就是訂單信息進行入庫,然后生成一條消息,把消息也進行入庫,這條消息應該包含消息狀態屬性,並設置初始值比如為0,表示消息創建成功正在發送中,這種方式缺陷在於我們要對數據庫進行持久化兩次。

  2. 首先要保證第一步消息都存儲成功了,沒有出現任何異常情況,然后生產端再進行消息發送。如果失敗了就進行快速失敗機制。

  3. MQ把消息收到的結果應答(confirm)給生產端

  4. 生產端有一個Confirm Listener,去異步的監聽Broker回送的響應,從而判斷消息是否投遞成功,如果成功,去數據庫查詢該消息,並將消息狀態更新為1,表示消息投遞成功。

假設第二步OK了,在第三步回送響應時,網絡突然出現了閃斷,導致生產端的Listener就永遠收不到這條消息的confirm應答了,也就是說這條消息的狀態就一直為0了。

  1. 此時我們需要設置一個規則,比如說消息在入庫時候設置一個臨界值timeout,5分鍾之后如果還是0的狀態那就需要把消息抽取出來。這里我們使用的是分布式定時任務,去定時抓取DB中距離消息創建時間超過5分鍾的且狀態為0的消息。

  2. 把抓取出來的消息進行重新投遞(Retry Send),也就是從第二步開始繼續往下走(此時消息可能出現重復投遞的情況,需要消費者那邊冪等性防止重復消費)

  3. 當然有些消息可能就是由於一些實際的問題無法路由到Broker,比如routingKey設置不對,對應的隊列被誤刪除了,那么這種消息即使重試多次也仍然無法投遞成功,所以需要對重試次數做限制,比如限制3次,如果投遞次數大於三次,那么就將消息狀態更新為2,表示這個消息最終投遞失敗。

如何保證消息不會被重復消費(消息的冪等性)

在海量訂單產生的消息高峰期(高並發情況下),如何避免消息的重復消費問題,消息端實現冪等性就意味着,我們的消息永遠不會消費多次,即時我們收到了多條一樣的消息。

解決辦法

1)唯一ID加指紋嗎(業務規則或者時間戳等) 機制,利用數據庫主鍵去重

好處:實現簡單

壞處:高並發下有數據庫寫入的性能瓶頸

解決方案:根據ID進行分庫分表 算法路由

2)利用redis原子性(setnx命令)

消費端的限流策略(削峰)

當我們的RabbitMQ有數萬條未處理的消息時,我們隨便打開一個消費者客戶端,巨量的消息全部推送過來,但是我們的客戶端無法同時處理這么多數據。

解決辦法

RabbitMQ提供了一種qos(服務質量保證)功能,即在非自動確認消息的前提下,如果一定數目的消息未被確認前(通過consumer或者channel設置qos的值),不進行新的消息

autoask=false(不讓它自動簽收)的情況下才生效,即在自動應答的情況下 這兩個值是不生效的。

死信隊列

Dead-Letter-Exchange:當消息在一個隊列中變成死信(dead message)之后,它能被重新publish到另一個Exchange,

這個Exchange就是DLX,它會把消息路由到它鎖綁定的隊列上(消費端可能有一個監聽去監聽這個隊列),這樣我們可以對一些死信的消息進行后續的處理。

消息變為死信的情況:

1)消息被拒絕並且requeue=false

2)消息TTL過期

3)隊列達到最大長度

工作中死信隊列非常重要:死信隊列就是用來處理一些 消息沒有消費者消費 但已經處於死信的情況,我們可以做一些更完善的補償機制

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM