RabbitMQ核心組件及應用場景


一、適用場景

1.解耦
2.最終一致性
3.廣播
4.錯峰與流控(秒殺業務用於流量削峰場景)

秒殺場景

二、核心組件,關鍵點(交換器、隊列、綁定)

AMPQ消息路由必要三部分:交換器、隊列、綁定。

Java核心組件:ConnectionFactory、Connection、Channel、Delivery、DeliverCallback、CancelCallback

隊列

1. 建立連接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
2. 聲明隊列

如果在同一條信道上訂閱了另一個隊列,那就不能再聲明隊列,必須先取消訂閱。

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                             Map<String, Object> arguments) throws IOException;

queue:需要指定隊列名稱,如果不指定,MQ會隨機分配一個並在queue.declare命令中返回,

durable:隊列將在服務器重啟后存在。

exclusive:為true時,隊列變成私有的。

autoDelete: 為true時,當最后一個消費者取消訂閱時,隊列自動移除。

3. 消費者通過AMQP的basic.consume命令訂閱消息,將信道置為接收模式。

Java代碼Channel:

String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
4. 當有多個消費者存在時,隊列里的消息將以循環的方式發送給消費者。消費者接收到消息后必須進行確認,可通過basic.ack顯示確認:

Java代碼Channel:

void basicAck(long deliveryTag, boolean multiple) throws IOException;

上面的手動確認,第二個參數為true,批量確認;如果為false,會一次確認一條。當有耗時任務時,可以利用手動確認延遲確認消息,防止消息大量涌入應用導致過載。
也可以在訂閱隊列時就將basicConsume方法的autoAck參數設置為true,開啟自動確認。確認成功后rabbitmq會從隊列中刪除消息。

5. 如果在確認過程中和rabbitmq服務器斷鏈,那么這條消息就會發送給下一個消費者。可以使用basic.reject拒絕消息:

Java代碼Channel:

void basicReject(long deliveryTag, boolean requeue) throws IOException;

第二個參數requeue設置為true,消息會重新排隊並發送給下一個消費者;為false則會丟棄該條消息。可以利用此性質丟棄錯誤格式的消息。

6. 發布消息
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

exchange:第一個參數是交換的名稱。空字符串表示默認或無名交換,消息通過routingKey路由到指定隊列。

交換器和綁定

1. 交換器一共有四種類型:fanout、direct 、topic、headers。

Java中Channel申明交換器:

Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type) throws IOException;

BuiltinExchangeType對應有四種枚舉類型:

DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers");
2. 隊列通過路由鍵(routing key)綁定到交換器。
channel.queueBind(String queue, String exchange, String routingKey)

RabbitMQ中消息傳遞模型的核心思想是生產者永遠不會將任何消息直接發送到隊列。實際上,生產者甚至不知道消息是否會被傳遞到哪個隊列。

不指定隊列名時,通過服務器隨機生成隊列名稱:

String queueName = channel.queueDeclare().getQueue();

不傳參數時,queueDeclare生成一個非持久的,獨占的自動刪除隊列

在linux服務器上可以通過命令查看所有交換器:

rabbitmqctl list_exchanges
3. fanout廣播方式

廣播方式會將消息投遞給所有附加在此交換器的隊列。

4. direct模式

direct類型在綁定時設定一個routing_key,消息的routing_key匹配時, 才會被交換器投遞到綁定的隊列中去.

5. topic模式

按規則投遞,通過通配符#和*組合

*(星號)可以替代一個單詞。

(hash)可以替換零個或多個單詞。

持久化

將隊列和交換器的durable屬性設置為true

申明隊列時:

channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                             Map<String, Object> arguments) throws IOException;

申明交換器時:

channel.exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable) throws IOException;

投遞消息時將投遞模式(delivery mode)設置為2,Java代碼中MessageProperties.PERSISTENT_TEXT_PLAIN來設置:

channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));

在MessageProperties源碼中如下所示:

/** Content-type "text/plain", deliveryMode 2 (persistent), priority zero */
public static final BasicProperties PERSISTENT_TEXT_PLAIN =
    new BasicProperties("text/plain",
                        null,
                        null,
                        2,
                        0, null, null, null,
                        null, null, null, null,
                        null, null);

實現事務功能(發送方確認)

使用事務會使rabbitmq的性能大大降低,為了避免這個問題,rabbitmq支持:發送方確認模式。通過這個模式來保證消息的投遞。當生產者P投遞消息后會等待消費者C發送確認,P收到確認后可以調用回調函數處理相關業務。在生產者P等待確認的同時也可以繼續發送下一條消息。

服務器管理

1、虛擬主機vhost

rabbitmq支持創建虛擬主機,默認的虛擬主機為“/”,默認用戶guest;當在rabbitmq集群中創建虛擬主機時,整個集群都會創建。

2、錯誤日志查看

rabbitmq的日志文件在/var/log/rabbitmq/下的rabbit@[localhost].log

3、rabbitmq配置文件

配置文件在rpm安裝/usr/share/doc/rabbitmq-server-3.6.5/rabbitmq.config.example,復制一份:

cp /usr/share/doc/rabbitmq-server-3.5.3/rabbitmq.config.example /etc/rabbitmq.config

參考官網配置:https://www.rabbitmq.com/configure.html#configuration-file

三、底層原理,主要實現

應用程序和rabbitmq服務器之間建立一條tcp連接,tcp連接打開后,應用程序就可以和rabbitmq創建多條AMQP信道,信道是建立在tcp連接上的虛擬連接。

四、同類技術產品比較

1.ActiveMQ

優點

單機吞吐量:萬級
topic數量都吞吐量的影響:
時效性:ms級
可用性:高,基於主從架構實現高可用性
消息可靠性:有較低的概率丟失數據
功能支持:MQ領域的功能極其完備
缺點:

官方社區現在對ActiveMQ 5.x維護越來越少,較少在大規模吞吐的場景中使用。

2.Kafka

號稱大數據的殺手鐧,談到大數據領域內的消息傳輸,則繞不開Kafka,這款為大數據而生的消息中間件,以其百萬級TPS的吞吐量名聲大噪,迅速成為大數據領域的寵兒,在數據采集、傳輸、存儲的過程中發揮着舉足輕重的作用。

Apache Kafka它最初由LinkedIn公司基於獨特的設計實現為一個分布式的提交日志系統( a distributed commit log),之后成為Apache項目的一部分。

目前已經被LinkedIn,Uber, Twitter, Netflix等大公司所采納。

優點

性能卓越,單機寫入TPS約在百萬條/秒,最大的優點,就是吞吐量高。
時效性:ms級
可用性:非常高,kafka是分布式的,一個數據多個副本,少數機器宕機,不會丟失數據,不會導致不可用
消費者采用Pull方式獲取消息, 消息有序, 通過控制能夠保證所有消息被消費且僅被消費一次;
有優秀的第三方Kafka Web管理界面Kafka-Manager;
在日志領域比較成熟,被多家公司和多個開源項目使用;
功能支持:功能較為簡單,主要支持簡單的MQ功能,在大數據領域的實時計算以及日志采集被大規模使用
缺點:

Kafka單機超過64個隊列/分區,Load會發生明顯的飆高現象,隊列越多,load越高,發送消息響應時間變長
使用短輪詢方式,實時性取決於輪詢間隔時間;
消費失敗不支持重試;
支持消息順序,但是一台代理宕機后,就會產生消息亂序;
社區更新較慢;

3.RabbitMQ

RabbitMQ 2007年發布,是一個在AMQP(高級消息隊列協議)基礎上完成的,可復用的企業消息系統,是當前最主流的消息中間件之一。

RabbitMQ優點:

由於erlang語言的特性,mq 性能較好,高並發;
吞吐量到萬級,MQ功能比較完備
健壯、穩定、易用、跨平台、支持多種語言、文檔齊全;
開源提供的管理界面非常棒,用起來很好用
社區活躍度高;
RabbitMQ缺點:

erlang開發,很難去看懂源碼,基本職能依賴於開源社區的快速維護和修復bug,不利於做二次開發和維護。
RabbitMQ確實吞吐量會低一些,這是因為他做的實現機制比較重。
需要學習比較復雜的接口和協議,學習和維護成本較高。

4.RocketMQ

RocketMQ出自 阿里公司的開源產品,用 Java 語言實現,在設計時參考了 Kafka,並做出了自己的一些改進。

RocketMQ在阿里集團被廣泛應用在訂單,交易,充值,流計算,消息推送,日志流式處理,binglog分發等場景。

RocketMQ優點:

單機吞吐量:十萬級
可用性:非常高,分布式架構
消息可靠性:經過參數優化配置,消息可以做到0丟失
功能支持:MQ功能較為完善,還是分布式的,擴展性好
支持10億級別的消息堆積,不會因為堆積導致性能下降
源碼是java,我們可以自己閱讀源碼,定制自己公司的MQ,可以掌控
RocketMQ缺點:

支持的客戶端語言不多,目前是java及c++,其中c++不成熟;
社區活躍度一般
沒有在 mq 核心中去實現JMS等接口,有些系統要遷移需要修改大量代碼

github代碼

https://github.com/LighterTang/RabbitMQUtils/tree/master/rabbitmq_base/src/main/java/com/mq/mqutils/mqdoc

參考資料

https://www.rabbitmq.com

http://youzhixueyuan.com/comparison-of-kafka-rocketmq-rabbitmq.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM