上一part《RabbitMQ上手記錄–part 5-節點集群高可用(多服務器)》講到了通過多個服務器來搭建RabbitMQ的節點集群,示例當中提到的服務器都是在同一個局域網中的(實際上是一個機器上的多個不同虛擬機而已),這種使用方式適用於在同一個數據中心的情況。互聯網里常常提到異地多活、多數據中心來實現更高級別的高可用。我的理解是當數據或者訪問量超過當個數據中心規模時,通過更多的數據中心來提供更多的訪問量支持,同時當某地數據中心出問題時,也不會讓數據因為都放在同一個數據中心而導致整個系統宕機。
RabbitMQ通過Shovel插件實現節點集群跨多數據中心的需求。下面來簡單了解一下Shovel的一些基本概念。
Shovel基本概念
Shovel是RabbitMQ的一個插件,這個插件的功能就是將源節點的消息發布到目標節點,這個過程中Shovel就是一個客戶端,它負責連接源節點,讀取某個隊列的消息,然后將消息寫入到目標節點的exchange中。根據這么一個概念,其實也可以自己開發一個簡單的程序,負責從一個節點讀取數據然后發送到目標節點。
使用Shovel的好處
1.Shovel能在不同數據中心之間傳遞消息,源節點和目標節點可以使用不同的用戶和vhosts,不同的RabbitMQ版本,並且不需要使用相同的cookie token(在上一part實現多服務器節點集群是,我們特意將每個主機的cookie token都設置成一樣)
2.客戶端的連接允許連接斷開的同時不丟失消息
3.支持多個版本的AMQP協議
具體工作方式
Shovel插件通過定義一個或多個shovel來實現消息的傳遞。
shovel實現了以下功能
1.連接源節點和目標節點
2.讀取(或者說是consume)隊列里的消息
3.發布消息到目標節點(通過將消息發布到目標節點的exchange,並通過routing_key的方式發布)
使用Shovel
梳理完理論之后,接下來將使用兩個不同地域雲主機來實踐一下(有點小成本,需要自行租用雲主機)。
1.准備雲主機
1.需要有兩台雲主機,我這里兩個雲主機分別來自vulrtr和阿里雲(來源不重要,只要是雲主機並且分布在不同的地區),練習用最低配的就夠了。
雲主機信息
名稱 | 角色 | 主機提供方 | 操作系統 | IP | 端口 | 備注 |
主機1 | 來源節點 | vultr | ubuntu1604 | 45.32.250.47 | 5672 | |
主機2 | 目標節點 | aliyun | ubuntu1604 | 47.106.179.208 | 5672 | 阿里雲需要在安全策略組中單獨開放5672端口 |
然后在各個主機安裝好RabbitMQ,並且確認5672端口號可被外部訪問到。
注意這里我們並沒有同步兩個機器的cookie token,是為了證明在使用shovel時不需要依賴於cookie token。
2.安裝Shovel
Shovel是RabbitMQ的一個插件,在已經安裝好RabbitMQ的基礎上,把相關的插件啟用即可。
我們只需要在主機1,也就是來源節點啟用shovel插件即可。
執行如下命令啟用Shovel插件
rabbitmq-plugins enable rabbitmq_shovel
看到如下輸出即表明啟用成功
The following plugins have been enabled:
amqp_client
rabbitmq_shovel
Applying plugin configuration to rabbit@vultr... started 2 plugins.
3.配置和運行Shovel
shovel分成兩種
靜態shovel:在配置文件中定了源節點和目標節點信息,修改配置后需要重啟
動態shovel:通過運行時參數指定,可在運行時創建或刪除
這里我使用靜態shovel,在配置里定義shovel配置。
首先要找到RabbitMQ使用的配置,默認情況下是沒有創建的,我們可以通過啟動日志查看目前是否有指定的配置文件。
通常log文件在/var/log/rabbitmq下的rabbit@{hostname}.log文件中,打開文件發現
config file(s) : /etc/rabbitmq/rabbitmq.config (not found)
說明配置文件沒有創建。
重新創建一個rabbitmq.config文件太麻煩了,我們從基於官方提供的example配置文件來修改會簡單一些。
打開目錄/usr/share/doc/rabbitmq-server/
cd /usr/share/doc/rabbitmq-server/
然后找到rabbitmq.config.example.gz,解壓后復制到/etc/rabbitmq目錄下
gzip -dk rabbitmq.config.example.gz
mv rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
這時候的rabbitmq.config文件里所有配置都是注釋的,這里我們現在只關注shovel部分的配置。
a.准備配置信息
在主機1和主機2上分別創建一個RabbitMQ用戶,用戶名是shovel_user,密碼是123456,並設置授權
sudo rabbitmqctl add_user shovel_user 123456
sudo rabbitmqctl set_user_tags shovel_user administrator
sudo rabbitmqctl set_permissions -p / shovel_user ".*" ".*" ".*"
b.配置shovel
在主機1上打開rabbitmq.config文件,修改shovel部分的配置為如下內容
{rabbitmq_shovel,
[{shovels,
[%% A named shovel worker.
{my_test_shovel,
[
% List the source broker(s) from which to consume.
{sources,
[%% URI(s) and pre-declarations for all source broker(s).
{brokers, ["amqp://shovel_user:123456@45.32.250.47:5672"]},
{declarations, [
{'exchange.declare',
[ {exchange, <<"shovel_exchange">>},
{type, <<"direct">>},
durable
]},
{'queue.declare',
[{queue, <<"shovel_outcome_queue">>},durable]},
{'queue.bind',
[ {exchange, <<"shovel_exchange">>},
{queue, <<"shovel_outcome_queue">>},
{routing_key, <<"shovel_key">>}
]}
]}
]},
% List the destination broker(s) to publish to.
{destinations,
[%% A singular version of the 'brokers' element.
{broker, "amqp://shovel_user:123456@47.106.179.208:5672"},
{declarations, [{'exchange.declare',
[ {exchange, <<"shovel_exchange">>},
{type, <<"direct">>},
durable
]},
{'queue.declare',
[{queue, <<"shovel_income_queue">>},durable]},
{'queue.bind',
[ {exchange, <<"shovel_exchange">>},
{queue, <<"shovel_income_queue">>},
{routing_key, <<"shovel_key">>}
]}]}
]},
% Name of the queue to shovel messages from.
{queue, <<"shovel_outcome_queue">>},
% Optional prefetch count.
{prefetch_count, 10},
% when to acknowledge messages:
% - no_ack: never (auto)
% - on_publish: after each message is republished
% - on_confirm: when the destination broker confirms receipt
{ack_mode, no_ack},
% Overwrite fields of the outbound basic.publish.
{publish_fields, [{exchange, <<"shovel_exchange">>},
{routing_key, <<"shovel_key">>}]},
% Static list of basic.properties to set on re-publication.
{publish_properties, [{delivery_mode, 2}]},
% The number of seconds to wait before attempting to
% reconnect in the event of a connection failure.
{reconnect_delay, 2.5}
]} %% End of my_first_shovel
]}
%% Rather than specifying some values per-shovel, you can specify
%% them for all shovels here.
%%
%% {defaults, [{prefetch_count, 0},
%% {ack_mode, on_confirm},
%% {publish_fields, []},
%% {publish_properties, [{delivery_mode, 2}]},
%% {reconnect_delay, 2.5}]}
]}
RabbitMQ的官網的shovel配置示例不可用,這里使用配置的是RabbitMQ在github提供的配置示例基礎上修改的(https://github.com/rabbitmq/rabbitmq-server/blob/master/docs/rabbitmq.config.example),然后結合官網文檔的說明自己摸索配置出來。
配置說明
簡單介紹一下上述配置中的關鍵部分
shovels之后接着可定義多個shovel,這里只定義了一個shovel,名稱是my_test_shovel。
sources:定義了消息的來源
brokers
需要給出來源服務的地址,通常格式為amqp://用戶名:密碼@主機名(IP):端口號/vhost名稱。
之前定義的shovel_user這時候就可以用上了,配置中我們使用的是默認的vhost,所以沒有設置vhost名稱
amqp://shovel_user:123456@45.32.250.47:5672
declarations:
declarations里面的內容就是執行一些amqp的命令,這些命令跟使用API調用的過程類似,
比如聲明隊列,Exchange和綁定信息等。
destinations:定義了消息的去向,里面的內容跟sources類似,實際上就是定義接收的exchange和隊列
queue:這里單獨配置了一個queue,是表示從哪個隊列讀取消息,這里跟sources里聲明的隊列一致。
其他一些可選的配置就不詳細介紹了,具體可以查看官網文檔http://www.rabbitmq.com/configure.html。
我們這里的配置表示從主機1的shovel_outcome_queue隊列獲取消息,然后轉發到主機2的shovel_income_queue隊列,兩邊使用的exchange名稱都是shovel_exchange,並且routing_key的值都是shovel_key。
完整的配置請參考
https://github.com/shenba2014/RabbitMQ/blob/master/shovel/rabbitmq.config。
c.驗證配置
配置完成之后,重啟主機1的RabbitMQ服務
service rabbitmq-server restart
然后在主機1查看shovel的狀態
rabbitmqctl eval 'rabbit_shovel_status:status().'
可以看到類似如下輸出信息,看到running字樣表明shovel服務已正常運行
[{my_test_shovel,static,
{running,[{src_uri,<<"amqp://45.32.250.47:5672">>},
{dest_uri,<<"amqp://47.106.179.208:5672">>}]},
{{2018,5,13},{12,4,48}}}]
輸出信息中列出了源節點和目標節點信息,最后一行是時間戳。
4.使用Shovel
接下來我們使用代碼來測試Shovel是否可用,我們的代碼跟連接普通的RabbitMQ服務類似,只是具體連接的服務器地址不同。
測試的代碼包含consumer和producer兩部分,consumer將連接主機2,producer將連接到主機1。
分別運行consumer和producer的代碼,producer在運行之后會發送消息到隊列shovel_outcome_queue,然后consumer會接收到消息。
具體consumer和producer的代碼不貼出來,完整代碼請參考
https://github.com/shenba2014/RabbitMQ/tree/master/shovel
我們來直接看看運行代碼的效果
運行消費者的代碼,參數依次是:主機IP,端口,RabbitMQ用戶名和密碼,這里的我們連接的是主機2(目標節點)。
運行
python shovel_consumer.py 47.106.179.208 5672 shovel_user 123456
輸出
Ready for orders!
然后新開一個控制台運行生產者的代碼,參數一樣,但是生產者連接的是主機1(源節點)。
運行
python shovel_producer.py 45.32.250.47 5672 shovel_user 123456
輸出
Sent order message.
然后在消費者的那個控制台應該會看到類似如下輸出
Received order 92 for test type.
數字是隨機生成的,所以最終會結果可能不同。
好了,到目前為止,整個演練就完成了,前面的准備工作較多,測試的代碼很簡單,主要是演示shovel的跨數據中心的消息傳遞功能。
結合實際的IP地址,exchange和queue,最后用一個圖來說明使用shovel的消息流向。