前言
前面博文已經將安裝配置和站點管理介紹了,現在開始正式學習rabbitmq的使用了;
rabbitMQ的構架
rabbitmq作為消息隊列,一條消息從發布到訂閱消費的完整流程為:
消息 --> 交換機exchange ---> 隊列queue ---> 消費者
rabbitmq的核心就在交換機和隊列
使用流程
發布者(推送消息的一端):
-
創建一個tcp長連接connection,連接rabbitmq的監聽端口5672;
-
在TCP長連接下創建一個信道channel,信道可以理解為connection的一個分支;
-
通過信道向rabbitmq聲明一個交換機exchange,設置交換機的類型,名稱,是否持久化等屬性;
-
通過信道向rabbitmq聲明一個隊列queue,設置隊列的名稱,是否持久化等參數;
-
通過信道向rabbitmq聲明一個綁定binding,設置綁定的交換機名稱,隊列名稱,綁定的路由鍵;
-
通過信道向rabbitmq推送一條消息,指定交換機和路由;
消費者(接收消息的一端):
-
從第一步到第四步和發布者做的事情是一模一樣的,如果已經存在相應的exchange、queue等則跳過;
-
通過信道向rabbitmq聲明一個訂閱,訂閱特定的queue,並且設置回調函數及是否確認等;
-
通過信道監聽rabbitmq推送過來的消息;
深入理解
創建連接connection
-
rabbitmq實現的是AMQP通信協議,其所允許的最大AMQP連接數,本質上是TCP連接數;
-
采用TCP長連接,自帶SSL認證機制,類似https,保證數據的傳輸安全;
-
其TCP連接的上限可以通過調整操作系統的限制來變更,詳情linux修改TCP最大連接數
創建信道channel
-
信道channel可以理解成是connection下的分支,也就是說多個channel共享一個connection,為什么要這么做呢?
-
創建TCP連接是一個非常耗時的操作,如果一個應用需要多個connection的話,每次的創建和關閉會性能降低,而創建多個channel共享一個connection提高性能,節約系統資源;
-
多個channel是相互獨立隔離的,這導致了一個問題就是,由於主機只能識別不同的TCP連接,但不能識別不同的信道發送過來的消息,因此rabbitmq為每個創建的信道分配了一個信道ID用來識別不同信道發送的消息;
-
每個信道都是為獨立的進程或線程准備的,因此多線程或多進程共享信道是不被允許的;
-
在一個TCP下可創建的chnnel的數量理論上是沒有上限的,只取決於系統資源,但可以通過配置channel_max參數設置上限;
聲明交換機exchange
-
作用:交換機使用來管理分發消息的,一邊接受發布者提交的消息,根據消息提供的參數選擇相應的處理辦法,交換機是不會存儲消息的,只有轉發或丟棄功能;
-
如果消息攜帶的路由鍵沒有對應的路由隊列,交換機會將消息丟棄;
聲明交換機時的重要屬性:
name:名稱,必須唯一;
type:類型,共有四種,默認direct;
Durability:是否持久化,默認否;
Auto delete:是否自動刪除;
Internal:是否是內部交換機,默認否;
Arguments:設置額外的參數;
交換機的類型
直連交換機direct
-
顧名思義直連交換機根據消息攜帶的路由鍵將該消息投遞到綁定的隊列;
-
如果有多個隊列使用相同的路由鍵和直連交換機綁定,那么直連交換機會將消息同時轉發到多個隊列;
-
rabbitmq本身存在默認交換機,默認交換機的本質就是名稱為空的直連交換機,當任何一個新的隊列被聲明的時候,rabbitmq會使用這個隊列的名字作為路由鍵自動綁定默認交換機;
-
通過rabbitmqctl list_exchanges命令查看所有的交換機,可以看到有一個名稱為空的direct;
扇形交換機funout
-
作用:扇形交換機會將消息推送到綁定的所有的隊列中,不會理會路由鍵是什么;
-
在聲明funout和隊列綁定的時候,也有一個路由鍵參數,但是寫什么都無所謂,funout會將其忽略的;
主題交換機topic
- 作用:主題交換機使用比較復雜的路由鍵匹配規則實現一路或多路路由;
路由鍵規則:
-
使用.分割的詞語列表,也可以是單個詞語,長度不能超過255個字節;
-
*號用來匹配一個單詞,這個單詞不能為空,#號用來匹配0個或多個單詞;
user.* 用來匹配所有綁定了以user開頭並且使用了兩個單詞的路由鍵;
user.# 用來匹配所有綁定了以user開頭並且使用了一個以上單詞的所有路由鍵;
頭交換機headers
-
通過判斷消息中攜帶的額外的屬性來分發消息給對應的隊列,性質和直連交換機很相似;
-
其區別在於使用消息的屬性替換路由鍵作為路由的規則;
注意
-
每個交換機的名字是唯一的,如果重復聲明交換機並且聲明的參數完全一樣,那么mq會忽略該聲明;
-
如果聲明交換機已經存在,但修改了它的屬性,比如類型或持久化等,那么會報錯;所以修改的方法是先刪除原來的交換機再創建一個新的交換機;
隊列queue
-
queue用來緩存消息,並向消費者推送消息;
-
聲明隊列的名稱最多255個字節,可以是任意字符串;
-
如果聲明時名字為空,mq會隨機生成一個名字;
-
聲明隊列不可以用amq.開頭,否則報錯, amq.是rabbitmq內部使用的默認隊列;
-
如果多個消費者訂閱同一個隊列的消息,那么隊列會用輪詢的方式推送消息,這種方式可以實現負載均衡;
注意:
-
每個隊列的名字是唯一的,如果重復聲明交換機並且聲明的參數完全一樣,那么mq會忽略該聲明;
-
如果聲明的該隊列已經存在,但修改了它的屬性,比如類型或持久化等,那么會報錯;所以修改的方法是先刪除原來的隊列再創建一個新的隊列;
消息訂閱
-
消費者可以向隊列訂閱消息,每個消費者都會被分配一個標志符;
-
訂閱時可以設置消費回執,告訴mq什么時候可以將消息刪除了,默認是自動刪除的;
-
消費者也可以向mq發送 拒絕消息 ,這時消息會被放回隊列等待投遞到其他的消費者;
-
消費者訂閱的時候可以設置預取計數,當存在多個消費者共同訂閱一個隊列的時候,由於是輪詢機制,但每個消費者的消費能力可能是不一樣的,這可能造成有的消費者閑的要死,有的忙的要死,設置預取數量,在沒有收到回執確認前,該隊列推送的消息是有限的,可以提高整體的吞吐量;
消息,交換機,隊列的持久化
-
消息持久化是在消息投遞前定義的,設置了持久化后,消息會被保存在磁盤,當被消費后會從磁盤中刪除,但是也不能保證100%持久化成功,因為消息是先放到內存中的,如果此時主機崩潰,還是會有一部分來不及寫入磁盤;
-
交換機持久化是在聲明該交換機的時候設置的,當主機崩潰或重啟后,mq一重新上線會自動重新聲明一個和原來完全一樣的交換機;
-
隊列持久化是在隊列被聲明的時候設置的,mq重新上線會自動重新聲明一個和原來完全一樣的queue,但是隊列里的消息會全部丟失;