【rabbitmq】rabbitmq概念解析--消息確認--示例程序


概述

本示例程序全部來自rabbitmq官方示例程序,rabbitmq-demo
官方共有6個demo,針對不同的語言(如 C#,Java,Spring-AMQP等),都有不同的示例程序;
本示例程序主要是Spring-AMQP的參考示例,如果需要其他語言的參考示例,可以參考官網;

rabbitmq模擬器

模擬器

rabbitmq簡介

核心架構圖

核心架構圖
數據流轉圖
架構圖

AMQP 0-9-1 Model Explained

重要語法說明

  • producer或publisher: 消息生產者/發布者,即:產生消息的;
  • Exchange:producer或publisher只會將message發送到Exchange,目前有4種不同的Exchange類型;
  • Queue:消息隊列,所有的消費者都是直接從Queue獲取Message並消費;
  • Binging:連接Exchange和Queue的紐帶,決定Exchange如何路由消息到不同的Queue;
  • routingKey:生產者-->message-->Exchange,需要指定一個key,叫做routingKey;
  • routingKey:Exchange-->Binging-->Queue,Binging有一個Key值,叫routingKey或bingingKey;
  • bingingKey:Exchange-->Binging-->Queue,Binging有一個Key值,bingingKey;

核心理解

4種不同的Exchange,對routingKey的解釋都不相同;
對routingKey的不同解釋,決定了Exchange路由Message到Queue的不同方案;

  1. direct exchange: 匹配2個routingKey(即routingKey和bingingKey)是否相等,相等時才進行消息路由;
  2. fanout exchange: 忽略routingKey,會將Message路由到所有綁定的Queue;
  3. topic exchange: routingKey格式形如aaa.bbb.xxx*.ccc.dd.#,類似正則表達式匹配;
  4. headers exchange:

jar包說明

  • Java版本:
    Java版本使用如下jar(說明:若是使用):
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.0.2</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

demo1: 單生產者-單消費者

單生產者-單消費者官方示例

spring.profiles.active=hello-world, sender, receiver

demo2: 單生產者-多消費者

Work queues官方示例
workQueue
application.properties配置

spring.profiles.active=work-queues, sender, receiver
#spring.profiles.active=work-queues, sender
#spring.profiles.active=work-queues, receiver

詳細描述參見:單生產者-多消費者詳細


demo3: 發布/訂閱

Publish/Subscribe官方示例
發布/訂閱

  • 消費廣播到多個消費者進行消費;
  • 使用fanout pattern;

application.properties配置

spring.profiles.active=pub-sub, receiver , sender 

詳細描述參見:發布/訂閱詳細


demo4: Routing

Routing官方示例

Direct exchange 模式進行route結構圖

direct-exchange
a message goes to the queues whose binding key exactly matches the routing key of the message;(相等時才路由)

Multiple bindings

Multiple bindings
兩個Queue使用相同的BingingKey(black) ==> 效果類似於:發布/訂閱模式(demo3);

完整的結構圖

together

application.properties配置

pring.profiles.active=routing, receiver , sender  

詳細描述參見:發布/訂閱詳細


demo5: Topics

Topics官方示例
結構示例圖

  • 使用 Topic exchange實現;
  • 發送到Topic exchange的routingKey必須滿足一定要求:用"."分割的words列表,如:*.aaa.bbb.#
  • BingingKey和routingKey有相同的格式要求;
  • * : 可以匹配一個word;
  • #: 可以匹配0個或多個words;

application.properties配置

pring.profiles.active=topics, receiver , sender 

詳細描述參見:Topics


demo6: RPC over RabbitMQ

RPC官方示例

結構圖

架構圖

application.properties配置

spring.profiles.active=rpc,server
#spring.profiles.active=rpc,client

詳細描述參見:RPC


消費端確認

Delivery Identifiers: Delivery Tags

消費者注冊后,rabbitmq將消息交付給消費者時,都會帶有一個“Delivery Tags”,這個是唯一的ID標識,id以整數的遞增的方式實現。

Acknowledgement Modes(消費端)

自動確認模式

  • 發送之后,就認為是發送成功(fire-and-forget)
  • 消息不停的發送到消費端消費,無需等待消費端任何確認;

缺點:

  • 可能造成消費端不堪重負;

手動模式

  1. basic.ack: 肯定的確認;
  2. basic.nack: 否定的確認(RabbitMQ對AMQP 0-9-1的擴展),支持消息批量確認;
  3. basic.reject:否定的確認,消息消費失敗后,直接從broker中將消息delete不支持批量確認

Acknowledging Multiple Deliveries at Once(消息批量確認)

  • 一次確認多個消息發送,而不是每一個消息單獨確認;
  • basic.reject:不具備該功能;
  • basic.nack: 具備該功能;

實現方式

  • multiple field: 設置為true;

示例

假設:在Channel(ch)上有5,6,7,8這4個delivery tags未確認;

  • 情況1,delivery_tag=8 & multiple=true: 則5,6,7,8這4個tags都將被確認;
  • 情況2,delivery_tag=8 & multiple=false:則只有8被確認,而5,6,7將不會被被確認;

Channel Prefetch Count (QoS)[可以設置消費端消費的速率]

  • 消息消費是異步完成的,手動確認也是異步的;
  • 有一部分消息是被消費了,但是還未來得及確認:希望控制未被確認消息的size,防止無界的緩存
  • prefetch count:使用basic.qos方法設置該值可以控制未被確認消息的max size;
  • 當達到該最大值時,rabbitmq將停止交付消息進行消費;
  • 僅對basic.qos方法有效,對basic.get方法無效;

示例

假設:在Channel(Ch)上有5,6,7,8共4個未被確認的消息,且ch的prefetch count=4
結果:rabbitmq將不會再交付任何消息到該Channel上,除非有消息被確認;

消費確認選擇,prefetch設置以及吞吐量

  • 情況1:增大prefetch:提高向消費者傳遞消息的速度;
  • 情況2:自動確認模式可以產生最佳的傳送速率;

應避免:

  1. 自動確認模式
  2. 手動確認模式 + 無限制的prefetch

結論:

  • 情況1情況2都可能導致交付但未來得及處理的Message增加,增大RAM的消耗;

推薦值:

  • prefetch: 100~300,可以有效提高吞吐量,並避免RAM消耗過多的風險;

消費失敗或連接中斷: 自動重新reQueue

當消息發送給消費端后,如果出現如下情況,則消息會重新reQueue,會被再次發送;

  1. TCP連接中斷;
  2. 消費端掛掉:無法進行消息確認;

Client Errors: Double Acking and Unknown Tags

消費端無法對同一個消息確認超過一次,當超過一次之后,將拋出Channel error: PRECONDITION_FAILED - unknown delivery tag XXXX

總結

  • 每個交付給消費端的消息,都有一個唯一的標識delivery tag
  • 自動消息確認;
  • 手動消息確認:每個消息單獨確認批量消息確認;
  • prefetchCount:可以控制消息端的吞吐量,避免消費端消費過慢,產生RAM大量消耗;
  • 失敗重傳:TCP連接中斷消費端掛掉,都會引起消息重新入隊列,重新消費(手動消息確認時);
  • 無法對同一個消息進行2次或2次以上的確認,否則會拋出異常;

發送端確認

Channel事務

  • 不推薦使用: 會嚴重降低吞吐量;

在 AMQP 0-9-1中,保證消息不丟失的唯一方法,就是使用事務;

  1. 開啟Channel事務;
  2. 發送消息,提交事務;

類似消費端的應答確認機制

  • confirm.select: 應用於Channel時,表示使用確認模式
  • 事務確認模式無法共存:二者只能選擇其一;

確認模式 (confirm.select)

  • 發送端使用confirm.select;
  • broker發送basic.ack來確認Message已被處理;
  • delivery-tag: 消息序列,具有唯一性;
  • multiple=true: 用於設置批量消息確認
  • 無法保證消息何時被確認;
  • 確認模式:消息要么被confirmed(OK),要么被nack(fail),且only once;

Java示例:(發送端發送大量messages,使用確認模式)
程序-確認模式

否定確認

異常情況時,服務端無法處理消息,則broker發送basic.nack來進行否定確認

應答延時和持久化消息

  • 僅當消息被持久化到disk之后,才會發送basic.ack應答;
  • 吞吐量提高建議:異步處理應答批量發送消息;

應答順序

當使用異步發送和持久化消息時,broker對消息的確認順序可能和發送者的消息發送順序不一致;

發送確認 + 保證交付

  • 消息持久化: 並不能保證消息不丟失(在寫入disk前broker就掛掉);

限制

Delivery tag is a 64 bit long value, and thus its maximum value is 9223372036854775807.Since delivery tags are scoped per channel, it is very unlikely that a publisher or consumer will run over this value in practice.

參考

Consumer Acknowledgements and Publisher Confirms


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM