rabbitmq 詳解


mq

消息隊列 先進先出

1.為什么要使用mq?

異步 削峰 解耦

1.流量削峰

使用消息隊列做一個緩沖

2.應用解耦

可以解決系統之間的調用問題。如果物流系統出現故障,需要幾分鍾修復,通過消息隊列作為中間件,在這幾分鍾內,物流系統要處理的內存被緩存在消息隊列中,用戶可以正常下單。

缺點

3.異步處理

A調用B 只需要監聽b處理完成的消息,B處理完成之后,會發送一條消息給MQ ,MQ會將這條消息轉給A服務。

mq的種類

ActiveMQ

單機吞吐量高 時效性ms級,可用性高,消息可靠性高

官方社區對其維護越來越少,高吞吐量場景較少使用

Kafka

大數據領域內的消息傳輸 百萬級別吞吐量 

優點 吞吐量高 時效期ms級,分布式 少數機器宕機,不會導致不可用,消息有序,能保證所有消息被消費且只能消費一次 在日志領域比較成熟

主要用於大數據領域的實時計算以及日志采集

缺點:消息失敗不支持重試 單機超過64個分區,load(CPU)會發生明顯的飆高

采用短輪詢方式,實時性取決於輪詢間隔時間

一台代理宕機,會產生亂序 

 

Rocketmq

訂單 交易 充值 日志流式處理

優點:單機吞吐量十萬級 可用性高 分布式  消息可以做到0丟失 擴展性好 支持大量數據的數據堆積

缺點;支持語言少 支持java和c++

 

Rabbitmq

由於erlang的高並發性,吞吐量到萬級,支持多種語言,開源,提供了管理頁面,社區活躍性高

缺點;商業版需要收費

mq的選擇

Kafka 大量數據的互聯網公司

Rocketmq 金融互聯網

Rabbitmq 中小型公司

 

Rabbitmq

接收 存儲 轉發消息

Rabbitmq

接收 存儲 轉發消息

生產者 交換機  隊列  消費者

 

六大模式

簡單模式 工作模式  發布訂閱模式  路由模式  主題模式 發布確認模式

Broker 接收和分發消息的應用 mq的服務器 

-exchange

   -quenue

Channel 信道

連接里面多個信道 減少建立連接的開銷

Broker 里面有多個virtual host  每個用戶在自己的vhost創建exchange/queue 

 

簡單模式

一個消費者  mq  一個生產者 

工作模式

工作隊列的主要思想是避免立即執行資源密集型任務,而不得不等待它完成,相反我們安排任務在之后完成。我們把任務封裝為消息並將其發送到隊列。在后台運行的工作進程將彈出任務並最終執行作業。當有多個工作線程,這些工作線程將一起處理這些任務。

生產者大量發消息給隊列,造成很多消息停留在隊列中,無法進行及時處理。通過多個工作線程,采用輪詢的方式來處理。

消費者-》多個工作線程。輪詢 競爭關系

一個消息只能被處理一次 不可以處理多次

 

消息應答

問題:

某個消費者處理一個長的工作任務並且僅完成了部分就突然掛掉了。rabbitmq一旦向消費者發送了某條消息,就立即將消息設置為刪除。這種情況下,我們將會丟失正在處理的消息,以及后續發送給該消費者的消息,它將無法接收。

消息應答:消費者接收到消息並處理完消息之后,告訴rabbitmq消息已經處理了,rabbitmq可以把消息刪除了。

自動應答

高吞吐量和數據傳輸安全要有保證

手動應答 

手動應答的方法

basicAck 肯定確認(如果批量應答 是否批量 true)

basicNack 否定確認 比另一個多一個參數。是否批量

basicReject  否定確認

批量應答 最好別  multiple 

 

消息的自動重新入隊

消息未發送ACK確認 會重新入隊 rabbitmq會安排另一個消費者處理

 消息手動應答時是不丟失的 放回隊列中重新消費

 

//手動應答

channel.basicAck(message.getEnvelope().getDeliveryTag(),false);

 

rabbitmq持久化

確保消息不丟失 隊列和消息持久化 

1.隊列持久化 durable true 

 boolean durable=true;

channel.queueDeclare(normal_queue,durable,false,false,arguments);

隊列不是持久化的 需要把原來的隊列先刪除掉 或者重新創建一個持久化的隊列 不然會報錯

2.消息持久化

生產者發消息時通知mq消息持久化 MessageProperties.PERSISTENT_TEXT_PLAIN

channel.basicPublish("",task_queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(“UTF-8"));

 

不公平分發

channel.basicQos(1);

消費者接收消息之前設置不公平分發

 

預取值

指定消費者分多少條消息   

prefetchCount 5

prefetchCount 2

channel.basicQos(prefetchCount);

如果超過7條 按照不公平分發

 

發布確認原理

生產者

設置隊列必須持久化

設置要求隊列中的消息必須持久化

發布確認mq 把消息保存到磁盤上 ,保存成功后 通知生產者

 1) 單個確認發布

  發布速度特別慢 如果沒有確認發布的消息就會阻塞后續所有消息的發布

channel.confirmSelect();

channel.waitForConfirms() //

 2) 批量確認發布

當發生故障導致發布出現問題時,不知道是哪個消息出現問題了。

 3) 異步確認發布.  利用回調函數 保證是否投遞成功

如何處理異步確認中確認失敗的消息?把未確認的消息放到一個基於內存的能被發布線程訪問的隊列  

//異步確認
    public static void publishMessageAsync() throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,true,false,false,null);
        //發布確認
        //線程安全有序的一個哈希表 適用於高並發的情況下
        /**
         * 1.輕松將序號和消息關聯
         * 2。輕松批量刪除條目 只要給到序號
         * 3。支持高並發
         */
        ConcurrentSkipListMap<Long,String> outstandingConfirms =new ConcurrentSkipListMap<>();
        //消息確認成功
        ConfirmCallback ackCallback=(deliveryTag, multiple) -> {
            //刪除掉已經確認成功的消息
            if(multiple){
                ConcurrentNavigableMap<Long, String> confimed= outstandingConfirms.headMap(deliveryTag);
                confimed.clear();
                //批量
            }else{
                outstandingConfirms.remove(deliveryTag);
                //單個
            }

            System.out.println("確認成功的消息"+deliveryTag);
        };
        //消失確認失敗
        ConfirmCallback nackCallback=(deliveryTag, multiple) -> {
            String message = outstandingConfirms.get(deliveryTag);

            System.out.println("確認失敗的消息"+message);
        };

        channel.confirmSelect();

        //准備消息的監聽器
        channel.addConfirmListener(ackCallback,nackCallback);
        int batch =1000;
        long start = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message=i+"";
            channel.basicPublish("",queueName,null,message.getBytes());
            //記錄下所有的消息
            outstandingConfirms.put(channel.getNextPublishSeqNo(), message);

        }
        long end = System.currentTimeMillis();
        System.out.println("發布1000個單獨確認需要時間:"+(end-start));
    }

 

交換機

發布訂閱模式  一個消息想被多個消費者消費 

生產者-消息- 交換機 -routingkey-隊列- 消息只能被消費一次-消費者

                              -routingkey-隊列-消息只能被消費一次 -消費者

 

生產者只能將消息發送到交換機,交換機一方面接收來自生產者的消息,另一方面將它們推入隊列,

 

exchange

直接direct 主題topic 標題headers(不常用) 扇出fanout

“”表示無名或者默認交換機

routingkey 綁定key 指定交換機

 

臨時隊列

不帶有持久化 名字是隨機的 隊列 一旦斷開了消息者的隊列,隊列將被自動刪除。

String queueName=Channel.queueDeclare().getQueue();

 

綁定 (交換機 queue 之間的橋梁)

通過routingkey進行綁定

通過routing key 區分不同的隊列

 

1)Fanout(廣播) 發布訂閱模式。 扇出

將接收到的所有的消息廣播到所有隊列中

綁定交換機和隊列

 

Channel.queueBind(queueName,Exchange_NAME,“”);//第三個參數 routingKey

兩個隊列的Routingkey相同 將都接收到消息

 

2)direct交換機 路由模式  routingkey模式

聲明隊列的時候 指明交換機是direct類型

生產者-消息- 交換機 -routingkey-隊列- 消息只能被消費一次-消費者

                              -routingkey-隊列-消息只能被消費一次 -消費者

routingkey相同是扇出交換機 不同是直接交換機

direct_logs   交換機->console 隊列 ->nfo                routingkey

                          - >console 隊列->warming(多重綁定)

                               ->disk 隊列->error

一個隊列,擁有多個routing key 多重綁定

誰能接收到消息 完全取決於rouingkey

routingkey->info console接收

routingkey-> warming console接收

routingkey->error  disk接收

 

3)topic交換機 

Routingkey不同,直接交換機只能給一個隊列發消息

主題交換機的routing key 必須是一個單詞列表 以點號分割

 “quick,orange.rabbit” 不能超過255個字節

(*.orange.*)匹配三個單詞中間是orange

lazy.#)#匹配多個單詞

 

當一個隊列綁定鍵是#,那么這個隊列將接收所有的數據,類似於fanout

當隊列綁定鍵中沒有#和*出現,那這個隊列綁定類型類似於direct

 

死信隊列

消息無法被消費

某些時候由於特定的原因導致queue中的某些消息無法被消費,這樣的消息如果沒有后續的處理,就變成了死信

應用場景:為了保證訂單業務的消息數據不丟失,需要用到死信隊列機制。

當消息發生異常,將消息投入死信隊列。

支付超時未付款訂單會自動失效

 

死信的來源

消息ttl過期(存活時間)

隊列達到最大長度

消息被拒絕

 

生產者 -普通交換機 type=direct->普通隊列—-》c1

                                        |

                                 消息TTL過期

                                 隊列達到最大長度   成為死信

                                消息被拒絕

                                      

                          死信交換機 type=direct

                                        |

                             死信隊列———》c2

 

 

死信消息

//通過參數轉發消息到死信隊列

HashMap<String, Object> arguments = new HashMap<>();
//過期時間. ms
arguments.put("x-message-ttl",100000);
//正常隊列設置死信交換機
arguments.put("x-dead-letter-exchange",dead_exchange);
//死信routingkey
arguments.put("x-dead-letter-routing-key","lisi");
channel.queueDeclare(normal_queue,false,false,false,arguments);ggu p

設置過期時間:

一種在普通隊列 設置過期時間

另一種在生產方發消息時設置過期時間。(比較靈活 可以隨意修改過期時間)

 

發消息時設置過期時間:props 設置死信消息的過期時間

AMQP.BasicProperties properties =new AMQP.BasicProperties().builder().expiration("1000").build();

 

設置死信隊列的長度

arguments.put("x-max-length",6);

超過部分會成為死信消息

 

消息被拒絕 指定某條消息被拒絕

需要開啟手動應答

if(message.equals(“info5")){

channel.basicReject(message.getEnvelope().getDeliveryTag(),false);//(消息的標志,是否放回隊列)

}

 

延遲隊列(死信隊列中ttl過期)

隊列內部是有序的 

在某個事件發生之前或者之后的指定時間完成某一項任務

訂單十分鍾內未支付則關閉

 

整合springboot:跳過

 

延遲隊列:延遲指定時間消費消息

 

優化

每新增一個時間需求,就要新增一個隊列

QA QB指定了過期時間。【QC不指定過期時間 沒設置ddl時間】

解決方法:發送消息的時候設置過期時間

rabbitTemplate.convertAndSend(“X”,”XC”,message,msg->{
msg.getMessageProperties().setExpiration(ttlTime);
return msg;
});

基於死信的延遲存在問題:

發送多條信息會排隊,rabbitmq只會檢查第一個隊列,如果第一個消息的延時時長很長,第二個消息的延時時長很短,第二條消息並不會得到優待。

 

基於延遲消息插件的延遲隊列:延遲交換機  x-delayed-message

 

生產者 -》延遲交換機 -〉隊列 -》消費者 

 

聲明一個延遲交換機 基於插件的延時隊列

public CustomExchange delayedExchange(){
    Map<String,Object> arguments =new HashMap<>();
     arguments.put("x-delayed_type","direct");
     return new CustomExchange(delayed_exchange_name,"x-delayed-message",true,false,arguments);

}

 

延時隊列:使用rabbitmq實現延時隊列可以很好的利用rabbitmq的特性,消息的可靠發送,可靠投遞,利用死信隊列保證消息至少被消費一次 以及未被正確處理的消息不會被丟棄。

Rabbitmq集群的特性 可以解決單點故障的問題 不會因為單個節點掛掉導致延時隊列不可用或者消息丟失。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM