RabbitMQ上手記錄–part 6-Shovel


上一part《RabbitMQ上手記錄–part 5-節點集群高可用(多服務器)》講到了通過多個服務器來搭建RabbitMQ的節點集群,示例當中提到的服務器都是在同一個局域網中的(實際上是一個機器上的多個不同虛擬機而已),這種使用方式適用於在同一個數據中心的情況。互聯網里常常提到異地多活、多數據中心來實現更高級別的高可用。我的理解是當數據或者訪問量超過當個數據中心規模時,通過更多的數據中心來提供更多的訪問量支持,同時當某地數據中心出問題時,也不會讓數據因為都放在同一個數據中心而導致整個系統宕機

 

RabbitMQ通過Shovel插件實現節點集群跨多數據中心的需求。下面來簡單了解一下Shovel的一些基本概念。

 

Shovel基本概念

Shovel是RabbitMQ的一個插件,這個插件的功能就是將源節點的消息發布到目標節點,這個過程中Shovel就是一個客戶端,它負責連接源節點,讀取某個隊列的消息,然后將消息寫入到目標節點的exchange中。根據這么一個概念,其實也可以自己開發一個簡單的程序,負責從一個節點讀取數據然后發送到目標節點。

 

使用Shovel的好處

1.Shovel能在不同數據中心之間傳遞消息,源節點和目標節點可以使用不同的用戶和vhosts,不同的RabbitMQ版本,並且不需要使用相同的cookie token(在上一part實現多服務器節點集群是,我們特意將每個主機的cookie token都設置成一樣)

2.客戶端的連接允許連接斷開的同時不丟失消息

3.支持多個版本的AMQP協議

 

具體工作方式

Shovel插件通過定義一個或多個shovel來實現消息的傳遞。

shovel實現了以下功能

1.連接源節點和目標節點

2.讀取(或者說是consume)隊列里的消息

3.發布消息到目標節點(通過將消息發布到目標節點的exchange,並通過routing_key的方式發布)

 

image(Shovel的工作過程簡要描述)

 

使用Shovel

梳理完理論之后,接下來將使用兩個不同地域雲主機來實踐一下(有點小成本,需要自行租用雲主機)。

 

1.准備雲主機

1.需要有兩台雲主機,我這里兩個雲主機分別來自vulrtr和阿里雲(來源不重要,只要是雲主機並且分布在不同的地區),練習用最低配的就夠了。

雲主機信息

名稱 角色 主機提供方 操作系統 IP 端口 備注
主機1 來源節點 vultr ubuntu1604 45.32.250.47 5672  
主機2 目標節點 aliyun ubuntu1604 47.106.179.208 5672 阿里雲需要在安全策略組中單獨開放5672端口

 

然后在各個主機安裝好RabbitMQ,並且確認5672端口號可被外部訪問到。

注意這里我們並沒有同步兩個機器的cookie token,是為了證明在使用shovel時不需要依賴於cookie token。

 

2.安裝Shovel

Shovel是RabbitMQ的一個插件,在已經安裝好RabbitMQ的基礎上,把相關的插件啟用即可。

我們只需要在主機1,也就是來源節點啟用shovel插件即可。

 

執行如下命令啟用Shovel插件

rabbitmq-plugins enable rabbitmq_shovel

看到如下輸出即表明啟用成功

The following plugins have been enabled:
   amqp_client
   rabbitmq_shovel

Applying plugin configuration to rabbit@vultr... started 2 plugins.

 

3.配置和運行Shovel

shovel分成兩種

靜態shovel:在配置文件中定了源節點和目標節點信息,修改配置后需要重啟

動態shovel:通過運行時參數指定,可在運行時創建或刪除

這里我使用靜態shovel,在配置里定義shovel配置。

 

首先要找到RabbitMQ使用的配置,默認情況下是沒有創建的,我們可以通過啟動日志查看目前是否有指定的配置文件。

 

通常log文件在/var/log/rabbitmq下的rabbit@{hostname}.log文件中,打開文件發現

config file(s) : /etc/rabbitmq/rabbitmq.config (not found)

說明配置文件沒有創建。

 

重新創建一個rabbitmq.config文件太麻煩了,我們從基於官方提供的example配置文件來修改會簡單一些。

 

打開目錄/usr/share/doc/rabbitmq-server/

cd /usr/share/doc/rabbitmq-server/

然后找到rabbitmq.config.example.gz,解壓后復制到/etc/rabbitmq目錄下

gzip -dk rabbitmq.config.example.gz

mv rabbitmq.config.example /etc/rabbitmq/rabbitmq.config

 

這時候的rabbitmq.config文件里所有配置都是注釋的,這里我們現在只關注shovel部分的配置。

 

a.准備配置信息

在主機1和主機2上分別創建一個RabbitMQ用戶,用戶名是shovel_user,密碼是123456,並設置授權

sudo rabbitmqctl add_user shovel_user 123456

sudo rabbitmqctl set_user_tags shovel_user administrator

sudo rabbitmqctl set_permissions -p / shovel_user ".*" ".*" ".*"

 

b.配置shovel

在主機1上打開rabbitmq.config文件,修改shovel部分的配置為如下內容

{rabbitmq_shovel,
   [{shovels,
     [%% A named shovel worker.
      {my_test_shovel,
       [

    %  List the source broker(s) from which to consume.
     
        {sources,
         [%% URI(s) and pre-declarations for all source broker(s).
         {brokers, ["amqp://shovel_user:123456@45.32.250.47:5672"]},
          {declarations, [
              {'exchange.declare',
                 [ {exchange, <<"shovel_exchange">>},
                 {type, <<"direct">>},
                 durable
                 ]},
             {'queue.declare',
                 [{queue,    <<"shovel_outcome_queue">>},durable]},
             {'queue.bind',
                 [ {exchange, <<"shovel_exchange">>},
                 {queue,    <<"shovel_outcome_queue">>},
                 {routing_key, <<"shovel_key">>}
                 ]}
          ]}

         ]},

    %  List the destination broker(s) to publish to.
        {destinations,
         [%% A singular version of the 'brokers' element.
         {broker, "amqp://shovel_user:123456@47.106.179.208:5672"},
          {declarations, [{'exchange.declare',
                             [ {exchange, <<"shovel_exchange">>},
                             {type, <<"direct">>},
                             durable
                             ]},
                         {'queue.declare',
                             [{queue,    <<"shovel_income_queue">>},durable]},
                         {'queue.bind',
                             [ {exchange, <<"shovel_exchange">>},
                             {queue,    <<"shovel_income_queue">>},
                             {routing_key, <<"shovel_key">>}
                             ]}]}

         ]},

    %  Name of the queue to shovel messages from.
     
      {queue, <<"shovel_outcome_queue">>},

    %  Optional prefetch count.
     
      {prefetch_count, 10},

    %  when to acknowledge messages:
     %  - no_ack: never (auto)
     %  - on_publish: after each message is republished
     %  - on_confirm: when the destination broker confirms receipt
     
      {ack_mode, no_ack},

    %  Overwrite fields of the outbound basic.publish.
     
      {publish_fields, [{exchange,    <<"shovel_exchange">>},
                        {routing_key, <<"shovel_key">>}]},

    %  Static list of basic.properties to set on re-publication.
     
      {publish_properties, [{delivery_mode, 2}]},

    %  The number of seconds to wait before attempting to
     %  reconnect in the event of a connection failure.
     
      {reconnect_delay, 2.5}

     ]} %% End of my_first_shovel
     ]}
    %% Rather than specifying some values per-shovel, you can specify
    %% them for all shovels here.
    %%
    %% {defaults, [{prefetch_count,     0},
    %%             {ack_mode,           on_confirm},
    %%             {publish_fields,     []},
    %%             {publish_properties, [{delivery_mode, 2}]},
    %%             {reconnect_delay,    2.5}]}
   ]}

 

RabbitMQ的官網的shovel配置示例不可用,這里使用配置的是RabbitMQ在github提供的配置示例基礎上修改的(https://github.com/rabbitmq/rabbitmq-server/blob/master/docs/rabbitmq.config.example),然后結合官網文檔的說明自己摸索配置出來。

 

配置說明

簡單介紹一下上述配置中的關鍵部分

shovels之后接着可定義多個shovel,這里只定義了一個shovel,名稱是my_test_shovel。

sources:定義了消息的來源

            brokers

             需要給出來源服務的地址,通常格式為amqp://用戶名:密碼@主機名(IP):端口號/vhost名稱。

             之前定義的shovel_user這時候就可以用上了,配置中我們使用的是默認的vhost,所以沒有設置vhost名稱

            amqp://shovel_user:123456@45.32.250.47:5672

 

            declarations:

            declarations里面的內容就是執行一些amqp的命令,這些命令跟使用API調用的過程類似,

            比如聲明隊列,Exchange和綁定信息等。

 

destinations:定義了消息的去向,里面的內容跟sources類似,實際上就是定義接收的exchange和隊列

 

queue:這里單獨配置了一個queue,是表示從哪個隊列讀取消息,這里跟sources里聲明的隊列一致。

其他一些可選的配置就不詳細介紹了,具體可以查看官網文檔http://www.rabbitmq.com/configure.html

 

我們這里的配置表示從主機1的shovel_outcome_queue隊列獲取消息,然后轉發到主機2的shovel_income_queue隊列,兩邊使用的exchange名稱都是shovel_exchange,並且routing_key的值都是shovel_key。

完整的配置請參考

https://github.com/shenba2014/RabbitMQ/blob/master/shovel/rabbitmq.config

 

c.驗證配置

配置完成之后,重啟主機1的RabbitMQ服務

service rabbitmq-server restart

 

然后在主機1查看shovel的狀態

rabbitmqctl eval 'rabbit_shovel_status:status().'
可以看到類似如下輸出信息,看到running字樣表明shovel服務已正常運行

[{my_test_shovel,static,
                  {running,[{src_uri,<<"amqp://45.32.250.47:5672">>},
                            {dest_uri,<<"amqp://47.106.179.208:5672">>}]},
                  {{2018,5,13},{12,4,48}}}]


輸出信息中列出了源節點和目標節點信息,最后一行是時間戳。

 

4.使用Shovel

接下來我們使用代碼來測試Shovel是否可用,我們的代碼跟連接普通的RabbitMQ服務類似,只是具體連接的服務器地址不同。

測試的代碼包含consumer和producer兩部分,consumer將連接主機2,producer將連接到主機1。

分別運行consumer和producer的代碼,producer在運行之后會發送消息到隊列shovel_outcome_queue,然后consumer會接收到消息。

 

具體consumer和producer的代碼不貼出來,完整代碼請參考

https://github.com/shenba2014/RabbitMQ/tree/master/shovel

 

我們來直接看看運行代碼的效果

 

運行消費者的代碼,參數依次是:主機IP,端口,RabbitMQ用戶名和密碼,這里的我們連接的是主機2(目標節點)。

運行

python shovel_consumer.py 47.106.179.208 5672 shovel_user 123456

輸出

Ready for orders!

 

然后新開一個控制台運行生產者的代碼,參數一樣,但是生產者連接的是主機1(源節點)。

運行

python shovel_producer.py 45.32.250.47 5672 shovel_user 123456

輸出

Sent order message.

 

然后在消費者的那個控制台應該會看到類似如下輸出

Received order 92 for test type.

數字是隨機生成的,所以最終會結果可能不同。

 

好了,到目前為止,整個演練就完成了,前面的准備工作較多,測試的代碼很簡單,主要是演示shovel的跨數據中心的消息傳遞功能。

結合實際的IP地址,exchange和queue,最后用一個圖來說明使用shovel的消息流向。

 

image

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM