公眾號首發、歡迎關注
什么是AMQP 和 JMS?
AMQP:即Advanced Message Queuing Protocol,是一個應用層標准高級消息隊列協議,提供統一消息服務。是應用層協議的一個開放標准,為面向消息的中間件設計。基於此協議的客戶端與消息中間件可傳遞消息,並不受客戶端/中間件不同產品,不同的開發語言等條件的限制。Erlang中的實現有RabbitMQ等。
JMS:即Java消息服務(Java Message Service)應用程序接口,由sun公司提出,並且sun公司定義好了接口。包括create、send、recieve。只要想使用它,就得實現它定義的接口。 消息服務是一個與具體平台無關的API,絕大多數MOM提供商都對JMS提供支持。不好的地方是語言層面的限制,只能為JAVA,這其實稍微有點和微服務的觀點相違背。要求語言只能是JAVA,而不能是py等。
常見的MQ產品
ActiveMQ:基於JMS,Apache
RocketMQ:(Rocket,火箭)阿里巴巴的產品,基於JMS,目前由Apache基於會維護
Kafka:分布式消息系統,亮點:吞吐量超級高,沒秒中數十萬的並發。
RabbitMQ:(Rabbit,兔子)由erlang語言開發,基於AMQP協議,在erlang語言特性的加持下,RabbitMQ穩定性要比其他的MQ產品好一些,而且erlang語言本身是面向高並發的編程的語言,所以RabbitMQ速度也非常快。且它基於AMQP協議,對分布式、微服務更友好。
安裝RabbitMQ
安裝使用的rpm包我提前准備好了,如下:
操作系統:Centos7.3
推薦:這三個包我提前下載好了,關注白日夢的公眾號(文末有二維碼),后台回復:rbmq 可以直接領取。
如果你不怕麻煩也想自己參照文檔自行下載,可參考文末的鏈接。
科普一下:
比如你安裝軟件A,結果這個軟件可能依賴了軟件B,於是你直接安裝A就會接到報錯,說當前操作系統環境中缺少軟件B,讓你先安裝軟件B后,再嘗試安裝軟件A。
如果你看過Linux私房菜類似書,其實你應該也知道,rmp其實已經處理好各種依賴關系的軟件包,所以安裝起來相對來說是比較省心的。
# 安裝erlang
yum install esl-erlang_23.0-1_centos_7_amd64.rpm -y
yum install esl-erlang-compat-18.1-1.noarch.rpm -y
# 安裝rabbitmq
rpm -ivh rabbitmq-server-3.8.9-1.el7.noarch.rpm
比如遇到如下的安裝包錯,按提示解決就好了
下載依賴后重試即可完成安裝。
啟動RabbitMQ
通過如下命令可啟動:
service rabbitmq-server start
你可以像上面這樣,安裝之后立刻啟動。
這時rabbitmq使用的是默認的配置參數。但是一般都來說我們都希望rabbitmq能使用我們可修改的配置文件啟動,這樣也方便我們后續對mq的控制,下面就一起看一rabbitmq的認證、授權、訪問控制、配置文件。
你還可以像下面這樣開啟web插件。
開啟web管理模塊插件之后訪問:http://服務器的ip:15672/ 可以找到登陸入口。
設置rabbitmq開機啟動:
chkconfig rabbitmq-server on
什么是Authentication(認證)
RabbitMQ啟動之后,我們想使用它的前提是用username、password連接上它。這里所說的username和passowrd其實就是一個被授予一定權限的用戶。
用戶連接上RabbitMQ即可創建virtual host使用MQ。在說什么是virtual host之前,先說下RabbitMQ默認有的被授權的用戶:username=guest、password=guest、virtualhost=/。
但是這個用戶被限制了只能在RabbitMQ所在機器的本地才能登陸MQ(不允許你使用該用戶通過ip+port遠程登錄RabbitMQ),就像下面這樣:
你使用特定的用戶去連接MQ的過程即為Authentication
指定RabbitMQ的啟動配置文件
rabbitmq提供給我們一個配置文件模版,默認在:/usr/share/doc/rabbitmq-server-xxx/rabitmq.conf.example
如果你沒有找到的話也沒關系,去github上拷貝一份模版配置,手動創建 /etc/rabbitmq/rabbitmq.conf
配置文件,然后將你拷貝的配置放進去也是ok的。
rabbitmq github addr:https://github.com/rabbitmq/rabbitmq-server/blob/v3.8.9/docs/
涉及到的都是基礎的shell命令,不再贅述。
注意文件名為:rabbitmq.config,且要放在/etc/rabbitmq目錄下。
如何讓guest用戶遠程登陸RabbitMQ
可以像下面這樣修改你的MQ的配置文件:
然后通過service命令重啟MQ,在web頁面嘗試登陸,接着你會成功登陸:
官方:強烈不建議允許默認的用戶可遠程登陸MQ,用過RabbitMQ的程序員都知道默認用戶名和密碼是啥,這會讓你的系統的安全性大大降低!
推薦的做法是:刪除默認用戶、使用新的安全憑證創建新的用戶
管理用戶和權限
其實文章看到這里,什么是用戶?什么是權限?你肯定已經非常清楚了。
那什么是管理用戶和權限?很簡單,就比如:添加/刪除 User,這個User可能屬於某個業務線,有了User可以使用RabbitMQ這款中間件軟件。以及為User分配他能讀寫的virtual host。
下一小節我們會細說什么是 virtual host
本小節主要是通過實驗的方式展開,實戰Rabbit的用戶和權限管理!
主要有兩種方式:
**1、通過web控制台管理 **
2、通過cli命令行管理
因為我們剛才允許guest這個超級管理員可以遠程登陸MQ,於是你可以像下圖這樣在web頁面上管理用戶,比如我可以為業務線A,新添加一個用戶changwu01,並且給他administrator的權限,然后這個業務線通過該用戶使用MQ。
你也可以像下面這樣使用cli,通過命令行的方式添加用戶:
然后使用該用戶嘗試登陸,你會發現:報錯了,說白日夢01不是管理員。不能登陸控制台。
如果你實戰一下,現將bairimeng01的權限tags改成management,再嘗試登陸,它會提示你說:
所以,這時你可以直接使用guest用戶登陸,然后將bairimeng01的權限改成:administrator
然后修改bairiemeng01的權限,並點擊update user
修改之后重新使用bairimeng01登陸:
你會發現bairimeng01可以成功登陸!
查看當前RabbitMQ有哪些用戶:
通過命令行創建用戶airimeng03、並通過命令行讓白日夢03有對virtualhost=/有讀寫權
可以通過控制台確認一下,我們的配置確實生效了。
RabbitMQ中的概念
什么是virtual host
可以通過MySQL和MySQL中的數據庫來理解RabbitMQ和virtual host的關系。
MySQL大家都不陌生,經常會出現多個業務線混用一個MySQL數據庫的情況,就像下圖這樣,每個業務線都在MySQL中創建自己的數據庫,使用時各自往各自的數據庫中存儲數據,彼此相互不干涉。
RabbitMQ和virtual host的關系也差不多,可以讓多個業務線同時使用一個RabbitMQ,只要為業務線各個業務線綁定上不同的virtual host即可:
創建virtual host 並指定用戶可以使用它
Step1:
Step2:
Step3:
Step4: 校驗
RabbitMQ的五種消息模型
RabbitMQ支持以下五種消息模型,第六種RPC本質上是服務調用,所以不算做服務通信消息模型。
Hello World
P(producer/ publisher):生產者,發送消息的服務
C(consumer):消費者,接收消息的服務
紅色區域就是MQ中的Queue,可以把它理解成一個郵箱
- 首先信件來了不強求必須馬上馬去拿
- 其次,它是有最大容量的(受主機和磁盤的限制,是一個緩存區)
- 允許多個消費者監聽同一個隊列,爭搶消息
Worker模型
Worker模型中也只有一個工作隊列。但它是一種競爭消費模式。可以看到同一個隊列我們綁定上了多個消費者,消費者爭搶着消費消息,這可以有效的避免消息堆積。
比如對於短信微服務集群來說就可以使用這種消息模型,來了請求大家搶着消費掉。
如何實現這種架構:對於上面的HelloWorld這其實就是相同的服務我們啟動了多次罷了,自然就是這種架構。
訂閱模型
訂閱模型借助一個新的概念:Exchange(交換機)實現,不同的訂閱模型本質上是根據交換機(Exchange)的類型划分的。
訂閱模型有三種
- Fanout(廣播模型): 將消息發送給綁定給交換機的所有隊列(因為他們使用的是同一個RoutingKey)。
- Direct(定向): 把消息發送給擁有指定Routing Key (路由鍵)的隊列。
- Topic(通配符): 把消息傳遞給擁有 符合Routing Patten(路由模式)的隊列。
訂閱之Fanout模型
這個模型的特點就是它在發送消息的時候,並沒有指明Rounting Key , 或者說他指定了Routing Key,但是所有的消費者都知道,大家都能接收到消息,就像聽廣播。
訂閱之Direct模型
P:生產者,向Exchange發送消息,發送消息時,會指定一個routing key。
X:Exchange(交換機),接收生產者的消息,然后把消息遞交給 與routing key完全匹配的隊列
C1:消費者,其所在隊列指定了需要routing key 為 error 的消息
C2:消費者,其所在隊列指定了需要routing key 為 info、error、warning 的消息
擁有不同的RoutingKey的消費者,會收到來自交換機的不同信息,而不是大家都使用同一個Routing Key 和廣播模型區分開來。
訂閱之Topic模型
類似於Direct模型。區別是Topic的Routing Key支持通配符。
### JAVA客戶端
后台回復:rbmq 即可獲取如下資料:
本文中涉及到的:Golang Case、Java Case以及erlang虛擬機rpm包、rabbitmq-server的rpm包等軟件,直接通過yum安裝即可。
Hello World
在本小節中你可以重點看一下當你通過代碼建立連接、創建channel、發送消息、接受消息的同時,在web view中,都有何變化。
Send.java:
查看新創建的連接:
查看新創建的通道:
查看RabbitMQ中消息的傳送狀態:
Recv.java:
執行如下的消息接受者,可以收到發送過來的消息。
再去web view中觀察RabbitMQ中消息的消費狀態:
查看系統中連接的狀態,由於我沒有顯示的關閉連接和channl,所以你能看到系統中有兩個連接:
channel也還存在:
Worker模型
本質上是相同的服務我們啟動了多次罷了,自然就是這種架構。
補充點1:可以給隊列添加一條屬性,不再是隊列把任務平均分配開給消費者。而是讓消費者消費完了后,問隊列要新的任務,這樣能者多勞。
// 設置每個消費者同時只能處理一條消息
channel.basicQos(1);
補充點2:接受者接受消息時,可以像下圖這樣配置手動ACK
訂閱模型
訂閱模型借助一個新的概念:Exchange(交換機)實現,不同的訂閱模型本質上是根據交換機(Exchange)的類型划分的。
訂閱模型有三種
- Fanout(廣播模型): 將消息發送給綁定給交換機的所有隊列(因為他們使用的是同一個RoutingKey)。
- Direct(定向): 把消息發送給擁有指定Routing Key (路由鍵)的隊列。
- Topic(通配符): 把消息傳遞給擁有 符合Routing Patten(路由模式)的隊列。
訂閱之Fanout模型
這個模型的特點就是它在發送消息的時候,並沒有指明Rounting Key ,或者說他指定了Routing Key,但是所有的消費者都知道,大家都能接收到消息,就像聽廣播。
發送者:
去web view中查看狀態:
運行接受者消費消息
訂閱之Direct模型
和Fanout模型相似,發送方發送時:指定了routingkey如下
接收方接受時,也指定了routingkey如下:
訂閱之Topic模型
topic模型和direc模型相似。
區別:交換機的類型:topic、routingkey:支持正則表達式
發送者:
接收者:
消息確認機制
ACK機制
所謂的ACK確認機制:
自動ACK:消費者接收到消息后自動發送ACK給RabbitMQ。
手動ACK:我們手動控制消費者接收到並成功消息后發送ACK給RabbitMQ。
你可以看上圖:如果使用自動ACK,當消息者將消息從channel中取出后,RabbitMQ隨即將消息給刪除。接着不幸的是,消費者沒來得及處理消息就掛了。那也就意味着消息其實丟失了。
你可能會說:會不會存在重復消費的情況呢?這其實就不是MQ的問題了。你完全可以在你代碼的邏輯層面上進行諸如去重、插入前先檢查是否已存在等邏輯規避重復消費問題。
具體的實現方式可以參考上面的:JAVA客戶端/Worker模型
持久化交換機
持久化隊列
持久化消息
SpringAMQP
SpringAMQP幫我們實現了--生產者確認機制,對於不可路由的消息交換機會告訴生產者,使其重新發送
環境搭建
配置文件:生產者
生產者使用AmqpTemplate模板發送消息
消費端不需要AmqpTemplate模板發送消息,因此不配置
virtual-host,和當前用戶綁定的虛擬主機名, 這就Oralce里面,不同限權的用戶可以看到的界面,擁有的能力是不用的,在RabbitMQ中,用戶只能看到和它相關的虛擬主機下面的信息。
Golang客戶端
關注白日夢,后台回復:rbmq 即可獲取如下資料:
本文中涉及到的:Golang Case、Java Case以及erlang虛擬機rpm包、rabbitmq-server的rpm包等軟件,直接通過yum安裝即可。
文末有二維碼
下載依賴包:
go get github.com/streadway/amqp
Hello World
發送端:
Step1: 獲取連接: Dial最后面的//test
比較迷惑,其實/test
是我的virtualhost,如果只寫成/host會把錯說:"no access to this vhost"
Step2: 創建channel
Step3: 聲明queue,后續往這個隊列中發送消息
Step5: 發送消息
接受端:
消費者同樣需要建立連接和channel、然后聲明我們想消費的channel,和上面的生產者代碼相同,就不粘出來了。
消費者從channel中接受消息:
處理消息:
Worker 模型
同樣的Worker模型和Simple模型也是相似的。無外乎是simple模型的消費者啟動了多個實例。
消息分發策略:默認情況下RabbitMQ后將P生產的消息以round-robin的策略分發給C1、C2。
你也可以像下圖這樣設置一個相對公平的分發策略: 當消費者把消息處理完后MQ才會給他新的消息,這樣可以實現能者多勞。
消息確認機制:
什么是ACK機制,你可以往下翻看 Golang客戶端/消息確認機制/ACK機制部分的描述。
如果手動ACK如下:
當我們像上面這樣設置手動ACK之后,可以確保如果消費者沒處理完消息就掛了,MQ中的消息不會丟失。
但是如果這時MQ掛了,消息同樣會丟失。
為了避免這種情況,可以將設置將MQ中的消息也持久化
訂閱模型
訂閱模型借助一個新的概念:Exchange(交換機)實現,不同的訂閱模型本質上是根據交換機(Exchange)的類型划分的。
訂閱模型有三種
- Fanout(廣播模型): 將消息發送給綁定給交換機的所有隊列(因為他們使用的是同一個RoutingKey)。
- Direct(定向): 把消息發送給擁有指定Routing Key (路由鍵)的隊列。
- Topic(通配符): 把消息傳遞給擁有 符合Routing Patten(路由模式)的隊列。
訂閱模型之Fanout模型
這個模型的特點就是它在發送消息的時候,並沒有指明Rounting Key , 或者說他指定了Routing Key,但是所有的消費者都知道,大家都能接收到消息,就像聽廣播。
生產者:在獲取channel之后緊接着創建一個交換機,交換機的類型為 fanout 扇出。
注意,fanout對應的routingkey(路由key為空)
消費者:需要消費者獲取到channel后也要聲明交換機。消費者的queue無名稱,queue沒有routingkey。注意交換機的名字別寫錯。
訂閱模型之Direct模型
生產者:和Fanout類似,注意交換機的名稱為direct 以及添加 特定的routingkey
消費者:
訂閱模型之Topic模型
和Direct模型相似,不同點:type為topic、並別routingkey支持正則表達式。
詳細代碼不再重復貼了。可以自行領取源碼學習。
消息確認機制
ACK機制
所謂的ACK確認機制:
自動ACK:消費者接收到消息后自動發送ACK給RabbitMQ。
手動ACK:我們手動控制消費者接收到並成功消息后發送ACK給RabbitMQ。
你可以看上圖:如果使用自動ACK,當消息者將消息從channel中取出后,RabbitMQ隨即將消息給刪除。接着不幸的是,消費者沒來得及處理消息就掛了。那也就意味着消息其實丟失了。
你可能會說:會不會存在重復消費的情況呢?這其實就不是MQ的問題了。你完全可以在你代碼的邏輯層面上進行諸如去重、插入前先檢查是否已存在等邏輯規避重復消費問題。
具體的實現方式可以參考上面的Golang或JAVA客戶端的Worker模型部分。
持久化交換機
持久化隊列
持久化消息
資料獲取
本文中涉及到的:Golang Case、Java Case以及erlang虛擬機rpm包、rabbitmq-server的rpm包等軟件,直接通過yum安裝即可。
關注后台回復:rbmq 即可獲取如下資料:
參考:
get start:https://www.rabbitmq.com/getstarted.html
download rabbitmq:https://www.rabbitmq.com/download.html
rabbitmq和erlang版本對應關系:https://www.rabbitmq.com/which-erlang.html
download erlang:https://www.erlang-solutions.com/resources/download.html
rabbitmq推薦的erlang:https://www.rabbitmq.com/releases/erlang/
了解更多rabbitmq3.8.X配置:https://www.cnblogs.com/masy-lucifer/p/13551067.html
rabbitmq3.8.9 GitHub:https://github.com/rabbitmq/rabbitmq-server/tree/v3.8.9/docs
認證、授權、訪問控制:https://www.rabbitmq.com/access-control.html