AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。 RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端。本文主要介紹RabbitMQ的概念、安裝及簡單使用,文中使用到的軟件版本:erlang 23.1、RabbitMQ 3.8.9、Centos 7.
1、簡介
1.1、RabbitMQ組件
Broker:標識消息隊列服務器實體。
Virtual Host:虛擬主機,標識一批交換機、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每個vhost本質上就是一個mini版的RabbitMQ服務器,擁有自己的隊列、交換器、綁定和權限機制。vhost是AMQP概念的基礎,必須在鏈接時指定,RabbitMQ默認的vhost是"/"。
Exchange:交換機,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。
Queue:消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。
Binding:綁定,用於消息隊列和交換機之間的關聯。一個綁定就是基於綁定鍵(bing-Key)將交換機和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。
Channel:信道,多路復用連接中的一條獨立的雙向數據流通道。信道是建立在真實的TCP連接內的虛擬連接,AMQP命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對於操作系統來說,建立和銷毀TCP都是非常昂貴的開銷,所以引入了信道的概念,以復用一條TCP連接。
Connection:網絡連接,比如一個TCP連接。
Publisher:消息的生產者,表示一個向交換器發布消息的客戶端應用程序。
Consumer:消息的消費者,表示一個從一個消息隊列中取得消息的客戶端應用程序。
Message:消息,消息包括消息內容及一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(優先級)、headers(消息頭)等。
1.2、RabbitMQ中的Exchange類型
1.2.1、fanout
fanout類型的exchange會忽略路由鍵的設置,直接將Message廣播到所有綁定的Queue中。
1.2.2、direct
交換機通過消息上的路由鍵匹配具有相同值的綁定鍵來路由消息到對應的隊列上。
1.2.3、topic
與direct基本相同,唯一區別在於綁定鍵;topic的綁定鍵可以設置表達式,用來模糊匹配;表達式里的通配符可以是"*"或"#"。
* 表示一個單詞。例如,綁定鍵是 *.orange.*,則路由鍵 lazy.orange.elephant、quick.orange.fox 匹配該綁定鍵。
# 表示0或多個單詞。例如,綁定鍵是 lazy.#,則路由鍵 lazy.brown.fox、lazy.pink.rabbit 匹配該綁定鍵。
1.2.4、headers
headers與fanout、direct、topic不同,它時通過匹配AMQP消息的header而非路由鍵。headers與direct類似,性能方面比后者查很多,所以在實際項目中用的很少。
1.3、RabbitMQ集群
1.3.1、集群原理
上面圖中采用三個節點組成了一個RabbitMQ的集群,Exchange A的元數據信息在所有節點上是一致的,而Queue只會存在於它所創建的那個節點上,其他節點只知道這個queue的metadata信息和一個指向該queue的owner node的指針。生產者或消費者通過tcp代理(HAProxy或nginx)來訪問RabbitMQ。
1.3.2、Mirrored Queues(鏡像隊列)
默認集群中的Queue只存在於它所創建的那個節點上,如果該節點掛了將會造成數據的丟失;使用鏡像隊列將會在所有其他節點上創建同樣的隊列,發送數據時所有的隊列都會有消息。可以通過設置策略來設置鏡像隊列。設置策略時有兩個關鍵參數:ha-mode和ha-params
ha-mode | ha-params | Result |
---|---|---|
all | 在集群中的每個節點都有鏡像。當一個節點添加到集群中時,這個節點同樣會有相應的鏡像 | |
exactly | count | 指定在集群中鏡像的個數。如果集群中節點的個數小於count的值,那么所有的節點都會配置鏡像。 如果其中一個鏡像掛掉,那么會在另一個節點生成新的鏡像。 |
nodes | node names | 在指定的節點列表中配置鏡像。如果這些指定的節點都處於不可用狀態(宕機或者關閉服務等), 那么客戶端程序會在自己所連接的那么節點上創建queue。 |
設置策略例子:
a、ha.開頭的隊列在所有節點上設置為鏡像隊列
rabbitmqctl set_policy ha-all "^ha\." '{"ha-mode":"all"}'
b、two.開頭的隊列設置鏡像數為2,數據自動同步(默認為手動)
rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
c、nodes.開頭的隊列鏡像設置在節點rabbit@pxc1,rabbit@pxc2上
rabbitmqctl set_policy ha-nodes "^nodes\." '{"ha-mode":"nodes","ha-params":["rabbit@pxc1", "rabbit@pxc2"]}'
在控制台可以看到鏡像隊列的詳細信息:
1.3.3、Quorum Queues(仲裁隊列)
仲裁隊列是鏡像隊列(又稱為HA隊列)的替代方案,該隊列把數據安全作為首要目標,在3.8.0版本可以使用。聲明仲裁隊列和聲明普通隊列方法一樣,只需要把x-queue-type設置為quorum即可。
適用仲裁隊列的情況:如銷售系統中訂單信息及選舉系統的投票,消息的丟失將對系統的正確性和功能產生重大影響。
不適用仲裁隊列的情況:臨時性質的隊列、低延遲隊列、數據安全性不那么高的情況、隊列積壓很長。
仲裁隊列有一個leader、多個成員;管理成員:
rabbitmq-queues add_member [-p <vhost>] <queue-name> <node> #增加成員
rabbitmq-queues delete_member [-p <vhost>] <queue-name> <node> #刪除成員
如:
./rabbitmq-queues add_member quorum.1 rabbit@pxc2 ./rabbitmq-queues delete_member quorum.1 rabbit@pxc2
可以在控制台查看仲裁隊列的詳細信息:
2、安裝
2.1、安裝erlang
下載erlang安裝包http://www.erlang.org/downloads,erlang和rabbitmq的版本的對應關系參見:https://www.rabbitmq.com/which-erlang.html
2.1.1、編譯安裝
shell> tar zxvf otp_src_23.1.tar.gz shell> cd otp_src_23.1 shell> ./configure --prefix=/home/hadoop/app/otp_23.1 shell> make && make install
2.1.2、設置環境變量
PATH=$PATH: /home/hadoop/soft/otp_23.1/bin export PATH
2.1.3、安裝常見錯誤處理
a、configure: error: No curses library functions found
需安裝ncurses-devel:
yum install ncurses-devel
a、wx-config: command not found
需安裝wxWidgets-devel:
yum install epel-release
yum install wxWidgets-devel cd /usr/bin ln -s wx-config-3.0 wx-config
2.2、單機版安裝RabbitMQ
2.2.1、下載並解壓通用包
下載通用包https://www.rabbitmq.com/install-generic-unix.html並解壓:
shell> xz -d rabbitmq-server-generic-unix-3.7.17.tar.xz shell> tar -xvf rabbitmq-server-generic-unix-3.7.17.tar
2.2.2、配置
新建配置文件:${RABBITMQ_HOME}etc/rabbitmq/rabbitmq.conf
loopback_users = none #讓guest用戶可以從其他機器訪問
2.2.3、管理
a、啟動服務
sbin/rabbitmq-server –detached #后台運行
b、停止服務
sbin/rabbitmqctl stop
c、查看狀態
sbin/rabbitmqctl status
d、查看交換機
sbin/rabbitmqctl list_exchanges
e、查看隊列
sbin/rabbitmqctl list_queues
f、查看綁定
sbin/rabbitmqctl list_bindings
f、用戶
sbin/rabbitmqctl add_user <username> <password> #增加用戶 sbin/rabbitmqctl set_user_tags <username> <tag> ... #設置角色 sbin/rabbitmqctl set_permissions [-p <vhost>] <user> <conf> <write> <read> #設置權限
如:
sbin/rabbitmqctl add_user admin admin sbin/rabbitmqctl set_user_tags admin administrator sbin/rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
2.2.4、控制台
開啟管理插件:
sbin/rabbitmq-plugins enable rabbitmq_management
訪問控制台:
http://10.49.196.10:15672
控制台可以直觀的查看RabbitMQ的運行情況,並可進行創建隊列、創建交換機、設置綁定、增加用戶等管理工作,很方便。
2.3、集群安裝RabbitMQ
2.3.1、先按單機版安裝多台RabbitMQ
假設在10.49.196.10、10.49.196.11、10.49.196.12上都安裝了單機版的RabbitMQ。
2.3.2、設置hosts文件
使三台機器之間可以通過主機名相互訪問
10.49.196.10 pxc1 10.49.196.11 pxc2 10.49.196.12 pxc3
2.3.3、修改.erlang.cookie
修改/home/$user.erlang.cookie,使三台機器上.erlang.cookie文件里的值一樣
2.3.4、以一個節點為主節點,添加其他的節點
假設以10.49.196.10為主機節點,在10.49.196.11上執行:
./rabbitmqctl stop_app ./rabbitmqctl reset ./rabbitmqctl join_cluster rabbit@pxc1 ./rabbitmqctl start_app
添加之后可以在控制台看到節點信息:
10.49.196.12節點類似添加。
2.3.5、安裝HAProxy或nginx
使用HAProxy或nginx來做代理,這里就不詳細介紹這兩個工具的安裝了。程序訪問代理地址,達到高可用的目的。
2.3.6、重置節點
重置節點就會刪除該節點所有的資源和數據
./rabbitmqctl stop_app ./rabbitmqctl reset ./rabbitmqctl start_app
2.3.6、移除集群中一個節點
假設移除10.49.196.11節點
2.3.6.1、正常移除
在10.49.196.11上重置節點:
./rabbitmqctl stop_app ./rabbitmqctl reset ./rabbitmqctl start_app
2.3.6.1、遠程移除(適用於節點沒有響應的情況)
a、主節點(10.49.196.11)上執行:
./rabbitmqctl stop_app
b、10.49.196.10上執行:
./rabbitmqctl forget_cluster_node rabbit@pxc2
c、重置10.49.196.11節點:
rabbitmqctl reset
rabbitmqctl start_app
2.4、 鏡像隊列
在默認集群的基礎上,可以設置鏡像隊列來保證數據的可靠性。如設置以ha.開頭的隊列在所有節點上為鏡像隊列
rabbitmqctl set_policy ha-all "^ha\." '{"ha-mode":"all"}'
2.5、 仲裁隊列
如果還需進一步保證數據的安全性,可以使用仲裁隊列。
編碼方式聲明仲裁隊列:
Map<String, Object> arguments = new HashMap<>(); //隊列類型設為quorum,默認為classic arguments.put("x-queue-type", "quorum"); channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
也可以在控制台創建仲裁隊列: