Best Practices for Ubuntu 14.04 + RabbitMQ 3.6.3 + Golang



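1. Introduction to RabbitMQ

1.1. What is RabbitMQ?

RabbitMQ is an open-source implementation of the Advanced Message Queuing Protocol (AMQP), originally provided by LShift. It is written in Erlang, a language known for high performance, robustness, and scalability, and it inherits those strengths.

1.2. What is AMQP?

AMQP, the Advanced Message Queuing Protocol, is an open standard application-layer protocol designed for message-oriented middleware. An AMQP broker receives messages from producers and delivers them to consumers; along the way it routes, buffers, and persists them according to rules.

The main characteristics of AMQP are message orientation, queuing, routing (both point-to-point and publish/subscribe), reliability, and security.
RabbitMQ is an open-source AMQP implementation. The server is written in Erlang and supports many clients, such as Python, Ruby, .NET, Java, JMS, C, PHP, ActionScript, XMPP, and STOMP, as well as AJAX. It is used to store and forward messages in distributed systems and performs well in terms of ease of use, extensibility, and high availability.

AMQP has two main components: the Exchange and the Queue (this changes in AMQP 1.0). In the figure below, the green X is the Exchange and the red element is the Queue. Both live on the server side, also called the Broker, which is the part that RabbitMQ implements. The blue elements are clients, which usually come in two kinds: Producer and Consumer.

[Figure: AMQP components (Producer, Exchange, Queue, Consumer); image from idoall.org]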

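1.3. Basic RabbitMQ concepts

  • Broker: put simply, the message queue server itself
  • Exchange: the message exchange; it decides by which rules a message is routed and to which queue
  • Queue: the message queue carrier; every message is placed into one or more queues
  • Binding: binds an exchange to a queue according to a routing rule
  • Routing Key: the routing keyword that the exchange uses to deliver a message
  • vhost: virtual host; one broker can host several vhosts, which separate the permissions of different users
  • producer: the message producer, that is, the program that publishes messages
  • consumer: the message consumer, that is, the program that receives messages
  • channel: the message channel; each client connection can open several channels, and each channel represents one session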
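
To make the relationship between exchange, binding, routing key, and queue more concrete, here is a minimal in-memory sketch in Go. It is only a toy model of a direct-style exchange, not RabbitMQ code; the type Broker and its methods Bind and Publish are hypothetical names chosen for illustration.

```go
package main

import "fmt"

// Broker is a toy, in-memory stand-in for a message broker.
// It models a single direct-style exchange: bindings map a routing
// key to the names of the queues bound with that key.
type Broker struct {
	bindings map[string][]string // routing key -> queue names
	queues   map[string][]string // queue name -> buffered messages
}

// NewBroker returns an empty toy broker.
func NewBroker() *Broker {
	return &Broker{
		bindings: make(map[string][]string),
		queues:   make(map[string][]string),
	}
}

// Bind attaches a queue to the exchange under the given routing key.
func (b *Broker) Bind(queue, routingKey string) {
	if _, ok := b.queues[queue]; !ok {
		b.queues[queue] = nil
	}
	b.bindings[routingKey] = append(b.bindings[routingKey], queue)
}

// Publish copies msg into every queue bound with routingKey and
// reports how many queues received it.
func (b *Broker) Publish(routingKey, msg string) int {
	targets := b.bindings[routingKey]
	for _, q := range targets {
		b.queues[q] = append(b.queues[q], msg)
	}
	return len(targets)
}

func main() {
	b := NewBroker()
	b.Bind("orders", "order.created")
	b.Bind("audit", "order.created")
	b.Bind("audit", "order.deleted")

	fmt.Println(b.Publish("order.created", "order 1")) // two bound queues
	fmt.Println(b.Publish("order.deleted", "order 1")) // one bound queue
	fmt.Println(b.Publish("unknown", "dropped"))       // no binding
	fmt.Println(len(b.queues["audit"]))                // messages buffered in audit
}
```

The producer only ever names a routing key; which queues receive the message is decided entirely by the bindings, which is the point of separating exchanges from queues.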

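1.4. RabbitMQ features

  • Reliability: message persistence plus acknowledgements for consumers and confirmations for producers
  • Flexible routing: follows the AMQP protocol and supports several exchange types that implement different routing strategies
  • Distribution: clustering support, across both local and remote networks
  • High availability: supports master/slave replication and mirrored queues
  • Multi-language support: clients are available for many languages
  • Web management UI: manage user permissions, exchanges, queues, and bindings, with real-time monitoring
  • Access control: access control based on vhosts
  • Debug tracing: tracing support that makes debugging easier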

2. Where is the official RabbitMQ website?

http://www.rabbitmq.com/

3. Where can RabbitMQ be downloaded?

http://www.rabbitmq.com/download.html

4. How to install RabbitMQ

4.1. Installing from the RabbitMQ apt repository

Installing RabbitMQ on Ubuntu is straightforward:

lion@ubuntu1404:~$ echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list
lion@ubuntu1404:~$ wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
lion@ubuntu1404:~$ sudo apt-get update
lion@ubuntu1404:~$ sudo apt-get install rabbitmq-server

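Installation instructions for other systems: http://www.rabbitmq.com/download.html

4.2. Installing from the generic Unix archive

The examples in this article use this installation method. The generic-unix archive is a prebuilt distribution, so nothing needs to be compiled.

4.2.1. Installing Erlang

Installation documentation: http://erlang.org/erldoc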

lion@node1:~$ sudo apt-get install -y erlang-nox erlang-dev erlang-src

4.2.2. Installing RabbitMQ 3.6.3

Installation documentation: http://www.rabbitmq.com/install-generic-unix.html

First, download and unpack the archive:

lion@node1:~$ mkdir -p _app && cd _app
lion@node1:~/_app$ wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.3/rabbitmq-server-generic-unix-3.6.3.tar.xz
lion@node1:~/_app$ xz -d rabbitmq-server-generic-unix-3.6.3.tar.xz
lion@node1:~/_app$ tar -xvf rabbitmq-server-generic-unix-3.6.3.tar
lion@node1:~/_app$ cd rabbitmq_server-3.6.3

Set the $RABBITMQ_HOME environment variable:

lion@node1:~$ vi .bashrc

Add the following lines to .bashrc:

export RABBITMQ_HOME="/home/lion/_app/rabbitmq_server-3.6.3"
export PATH="$RABBITMQ_HOME/sbin:$PATH"

Apply the environment variables:

lion@node1:~$ source .bashrc

Start RabbitMQ:

lion@node1:~$ rabbitmq-server

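After installation, the node can be stopped and started again with the following commands. rabbitmqctl has no start command, so the node is started in the background with rabbitmq-server -detached:

lion@node1:~$ rabbitmqctl stop
lion@node1:~$ rabbitmq-server -detached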

4.3. Enabling the web management plugin

Create a user named lion with the password 123456:

lion@node1:~$ rabbitmqctl add_user lion 123456

The following command lists the existing users:

lion@node1:~$ rabbitmqctl list_users
Listing users ...
guest   [administrator]
lion    []

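At this point the user lion cannot access the web management plugin; a role (user tag) must be assigned first. Roles fall into five categories: administrator, monitoring, policymaker, management, and other.

  • Administrator (administrator)

Can log in to the management console (when the management plugin is enabled), view all information, and manage users and policies.

  • Monitor (monitoring)

Can log in to the management console (when the management plugin is enabled) and view information about the RabbitMQ node (number of processes, memory usage, disk usage, and so on).

  • Policy maker (policymaker)

Can log in to the management console (when the management plugin is enabled) and manage policies, but cannot view node information.

  • Ordinary manager (management)

Can only log in to the management console (when the management plugin is enabled); cannot see node information and cannot manage policies.

  • Other

Cannot log in to the management console; these are usually ordinary producers and consumers.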
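
The role descriptions above can be summarized as a small lookup table. The Go sketch below simply encodes the list as written in this article; the type Capabilities and the function capabilitiesFor are hypothetical names, and the real permission checks are performed by RabbitMQ itself, not by client code like this.

```go
package main

import "fmt"

// Capabilities summarizes what a user tag allows, following the
// role list in this article.
type Capabilities struct {
	ConsoleLogin   bool // may log in to the management console
	ViewNodeInfo   bool // may view node information (processes, memory, disk)
	ManagePolicies bool // may manage policies
	ManageUsers    bool // may manage users
}

// capabilitiesFor maps a tag name to its capabilities.
// Any unknown or empty tag is treated as "other".
func capabilitiesFor(tag string) Capabilities {
	switch tag {
	case "administrator":
		return Capabilities{ConsoleLogin: true, ViewNodeInfo: true, ManagePolicies: true, ManageUsers: true}
	case "monitoring":
		return Capabilities{ConsoleLogin: true, ViewNodeInfo: true}
	case "policymaker":
		return Capabilities{ConsoleLogin: true, ManagePolicies: true}
	case "management":
		return Capabilities{ConsoleLogin: true}
	default:
		return Capabilities{}
	}
}

func main() {
	tags := []string{"administrator", "monitoring", "policymaker", "management", ""}
	for _, tag := range tags {
		fmt.Printf("%-16q %+v\n", tag, capabilitiesFor(tag))
	}
}
```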

The following command gives lion the administrator tag:

lion@node1:~$ rabbitmqctl set_user_tags lion administrator

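The management plugin can then be enabled or disabled with the following commands:

lion@node1:~$ rabbitmq-plugins enable rabbitmq_management   # enable the plugin
lion@node1:~$ rabbitmq-plugins disable rabbitmq_management  # disable the plugin

Open http://127.0.0.1:15672/ in a browser and log in with the user name lion and the password 123456 to reach the management console.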
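
The management plugin serves an HTTP API on the same port as the web UI. As a rough sketch, the Go program below requests /api/overview with HTTP basic authentication, using the lion/123456 account created above. The helper name newOverviewRequest is hypothetical, and the program assumes the plugin is enabled and listening on 127.0.0.1:15672; if it is not, it only prints the connection error.

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
	"time"
)

// newOverviewRequest builds a GET request for the management plugin's
// /api/overview endpoint, authenticated with HTTP basic auth.
func newOverviewRequest(baseURL, user, pass string) (*http.Request, error) {
	endpoint := strings.TrimRight(baseURL, "/") + "/api/overview"
	req, err := http.NewRequest(http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, err
	}
	req.SetBasicAuth(user, pass)
	return req, nil
}

func main() {
	req, err := newOverviewRequest("http://127.0.0.1:15672/", "lion", "123456")
	if err != nil {
		fmt.Println("building request failed:", err)
		return
	}

	// A short timeout keeps the program from hanging when no broker is running.
	client := &http.Client{Timeout: 5 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		fmt.Println("management API not reachable:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("HTTP status:", resp.Status)
}
```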

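For more rabbitmqctl commands, see: http://www.rabbitmq.com/man/rabbitmqctl.1.man.html

4.4. The RabbitMQ configuration file

By default the RabbitMQ configuration file is $RABBITMQ_HOME/etc/rabbitmq/rabbitmq.config; environment variables are set separately in rabbitmq-env.conf in the same directory. If the file does not exist, you can create it yourself.

Full documentation of the configuration file: http://www.rabbitmq.com/configure.html#configuration-file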

%% -*- mode: erlang -*-
%% ----------------------------------------------------------------------------
%% RabbitMQ Sample Configuration File.
%%
%% See http://www.rabbitmq.com/configure.html for details.
%% ----------------------------------------------------------------------------
[
 {rabbit,
  [%%
   %% Network Connectivity
   %% ====================
   %%

   %% By default, RabbitMQ will listen on all interfaces, using
   %% the standard (reserved) AMQP port.
   %% Default listener port
   %% {tcp_listeners, [5672]},

   %% To listen on a specific interface, provide a tuple of {IpAddress, Port}.
   %% For example, to listen only on localhost for both IPv4 and IPv6:
   %% The following format can also be used to listen on a specific IP and port
   %% {tcp_listeners, [{"127.0.0.1", 5672},
   %%                  {"::1",       5672}]},

   %% SSL listeners are configured in the same fashion as TCP listeners,
   %% including the option to control the choice of interface.
   %% Port configuration for SSL connections
   %% {ssl_listeners, [5671]},

   %% Number of Erlang processes that will accept connections for the TCP
   %% and SSL listeners.
   %% Number of acceptor processes for TCP and SSL connections
   %% {num_tcp_acceptors, 10},
   %% {num_ssl_acceptors, 1},

   %% Maximum time for AMQP 0-8/0-9/0-9-1 handshake (after socket connection
   %% and SSL handshake), in milliseconds.
   %% Handshake timeout, in milliseconds
   %% {handshake_timeout, 10000},

   %% Log levels (currently just used for connection logging).
   %% One of 'debug', 'info', 'warning', 'error' or 'none', in decreasing
   %% order of verbosity. Defaults to 'info'.
   %% Log level; the default is info
   %% {log_levels, [{connection, info}, {channel, info}]},

   %% Set to 'true' to perform reverse DNS lookups when accepting a
   %% connection. Hostnames will then be shown instead of IP addresses
   %% in rabbitmqctl and the management plugin.
   %%
   %% {reverse_dns_lookups, true},

   %%
   %% Security / AAA
   %% ==============
   %% Security configuration

   %% The default "guest" user is only permitted to access the server
   %% via a loopback interface (e.g. localhost).
   %% {loopback_users, [<<"guest">>]},
   %% 
   %% Uncomment the following line if you want to allow access to the
   %% guest user from anywhere on the network.
   %% {loopback_users, []},

   %% Configuring SSL.
   %% See http://www.rabbitmq.com/ssl.html for full documentation.
   %%
   %% {ssl_options, [{cacertfile,           "/path/to/testca/cacert.pem"},
   %%                {certfile,             "/path/to/server/cert.pem"},
   %%                {keyfile,              "/path/to/server/key.pem"},
   %%                {verify,               verify_peer},
   %%                {fail_if_no_peer_cert, false}]},

   %% Choose the available SASL mechanism(s) to expose.
   %% The two default (built in) mechanisms are 'PLAIN' and
   %% 'AMQPLAIN'. Additional mechanisms can be added via
   %% plugins.
   %%
   %% See http://www.rabbitmq.com/authentication.html for more details.
   %%
   %% {auth_mechanisms, ['PLAIN', 'AMQPLAIN']},

   %% Select an authentication database to use. RabbitMQ comes bundled
   %% with a built-in auth-database, based on mnesia.
   %%
   %% {auth_backends, [rabbit_auth_backend_internal]},

   %% Configurations supporting the rabbitmq_auth_mechanism_ssl and
   %% rabbitmq_auth_backend_ldap plugins.
   %%
   %% NB: These options require that the relevant plugin is enabled.
   %% See http://www.rabbitmq.com/plugins.html for further details.

   %% The RabbitMQ-auth-mechanism-ssl plugin makes it possible to
   %% authenticate a user based on the client's SSL certificate.
   %%
   %% To use auth-mechanism-ssl, add to or replace the auth_mechanisms
   %% list with the entry 'EXTERNAL'.
   %%
   %% {auth_mechanisms, ['EXTERNAL']},

   %% The rabbitmq_auth_backend_ldap plugin allows the broker to
   %% perform authentication and authorisation by deferring to an
   %% external LDAP server.
   %%
   %% For more information about configuring the LDAP backend, see
   %% http://www.rabbitmq.com/ldap.html.
   %%
   %% Enable the LDAP auth backend by adding to or replacing the
   %% auth_backends entry:
   %%
   %% {auth_backends, [rabbit_auth_backend_ldap]},

   %% This pertains to both the rabbitmq_auth_mechanism_ssl plugin and
   %% STOMP ssl_cert_login configurations. See the rabbitmq_stomp
   %% configuration section later in this file and the README in
   %% https://github.com/rabbitmq/rabbitmq-auth-mechanism-ssl for further
   %% details.
   %%
   %% To use the SSL cert's CN instead of its DN as the username
   %%
   %% {ssl_cert_login_from, common_name},

   %% SSL handshake timeout, in milliseconds.
   %%
   %% {ssl_handshake_timeout, 5000},

   %% Password hashing implementation. Will only affect newly
   %% created users. To recalculate hash for an existing user
   %% it's necessary to update her password.
   %%
   %% {password_hashing_module, rabbit_password_hashing_sha256},

   %%
   %% Default User / VHost
   %% ====================
   %% User access settings

   %% On first start RabbitMQ will create a vhost and a user. These
   %% config items control what gets created. See
   %% http://www.rabbitmq.com/access-control.html for further
   %% information about vhosts and access control.
   %%
   %% {default_vhost,       <<"/">>},
   %% {default_user,        <<"guest">>},
   %% {default_pass,        <<"guest">>},
   %% {default_permissions, [<<".*">>, <<".*">>, <<".*">>]},

   %% Tags for default user
   %%
   %% For more details about tags, see the documentation for the
   %% Management Plugin at http://www.rabbitmq.com/management.html.
   %%
   %% {default_user_tags, [administrator]},

   %%
   %% Additional network and protocol related configuration
   %% =====================================================
   %%

   %% Set the default AMQP heartbeat delay (in seconds).
   %% Default AMQP heartbeat delay (seconds)
   %% {heartbeat, 600},

   %% Set the max permissible size of an AMQP frame (in bytes).
   %%
   %% {frame_max, 131072},

   %% Set the max frame size the server will accept before connection
   %% tuning occurs
   %%
   %% {initial_frame_max, 4096},

   %% Set the max permissible number of channels per connection.
   %% 0 means "no limit".
   %%
   %% {channel_max, 128},

   %% Customising Socket Options.
   %%
   %% See (http://www.erlang.org/doc/man/inet.html#setopts-2) for
   %% further documentation.
   %%
   %% {tcp_listen_options, [{backlog,       128},
   %%                       {nodelay,       true},
   %%                       {exit_on_close, false}]},

   %%
   %% Resource Limits & Flow Control
   %% ==============================
   %%
   %% See http://www.rabbitmq.com/memory.html for full details.

   %% Memory-based Flow Control threshold.
   %%
   %% {vm_memory_high_watermark, 0.4},

   %% Alternatively, we can set a limit (in bytes) of RAM used by the node.
   %%
   %% {vm_memory_high_watermark, {absolute, 1073741824}},
   %%
   %% Or you can set absolute value using memory units.
   %%
   %% {vm_memory_high_watermark, {absolute, "1024M"}},
   %%
   %% Supported units suffixes:
   %%
   %% k, kiB: kibibytes (2^10 bytes)
   %% M, MiB: mebibytes (2^20)
   %% G, GiB: gibibytes (2^30)
   %% kB: kilobytes (10^3)
   %% MB: megabytes (10^6)
   %% GB: gigabytes (10^9)

   %% Fraction of the high watermark limit at which queues start to
   %% page message out to disc in order to free up memory.
   %%
   %% Values greater than 0.9 can be dangerous and should be used carefully.
   %% Fraction of the memory high watermark at which paging to disc starts
   %% {vm_memory_high_watermark_paging_ratio, 0.5},

   %% Interval (in milliseconds) at which we perform the check of the memory
   %% levels against the watermarks.
   %% Interval between memory checks (milliseconds)
   %% {memory_monitor_interval, 2500},

   %% Set disk free limit (in bytes). Once free disk space reaches this
   %% lower bound, a disk alarm will be set - see the documentation
   %% listed above for more details.
   %%
   %% {disk_free_limit, 50000000},
   %%
   %% Or you can set it using memory units (same as in vm_memory_high_watermark)
   %% {disk_free_limit, "50MB"},
   %% {disk_free_limit, "50000kB"},
   %% {disk_free_limit, "2GB"},

   %% Alternatively, we can set a limit relative to total available RAM.
   %%
   %% Values lower than 1.0 can be dangerous and should be used carefully.
   %% {disk_free_limit, {mem_relative, 2.0}},

   %%
   %% Misc/Advanced Options
   %% =====================
   %%
   %% NB: Change these only if you understand what you are doing!
   %%

   %% To announce custom properties to clients on connection:
   %%
   %% {server_properties, []},

   %% How to respond to cluster partitions.
   %% See http://www.rabbitmq.com/partitions.html for further details.
   %%
   %% {cluster_partition_handling, ignore},

   %% Make clustering happen *automatically* at startup - only applied
   %% to nodes that have just been reset or started for the first time.
   %% See http://www.rabbitmq.com/clustering.html#auto-config for
   %% further details.
   %% Nodes to cluster with at startup
   %% {cluster_nodes, {['rabbit@my.host.com'], disc}},

   %% Interval (in milliseconds) at which we send keepalive messages
   %% to other cluster members. Note that this is not the same thing
   %% as net_ticktime; missed keepalive messages will not cause nodes
   %% to be considered down.
   %% Interval between cluster keepalive messages (milliseconds)
   %% {cluster_keepalive_interval, 10000},

   %% Set (internal) statistics collection granularity.
   %%
   %% {collect_statistics, none},

   %% Statistics collection interval (in milliseconds).
   %%
   %% {collect_statistics_interval, 5000},

   %% Explicitly enable/disable hipe compilation.
   %%
   %% {hipe_compile, true},

   %% Timeout used when waiting for Mnesia tables in a cluster to
   %% become available.
   %%
   %% {mnesia_table_loading_timeout, 30000},

   %% Size in bytes below which to embed messages in the queue index. See
   %% http://www.rabbitmq.com/persistence-conf.html
   %%
   %% {queue_index_embed_msgs_below, 4096}

  ]},

 %% ----------------------------------------------------------------------------
 %% Advanced Erlang Networking/Clustering Options.
 %%
 %% See http://www.rabbitmq.com/clustering.html for details
 %% ----------------------------------------------------------------------------
 {kernel,
  [%% Sets the net_kernel tick time.
   %% Please see http://erlang.org/doc/man/kernel_app.html and
   %% http://www.rabbitmq.com/nettick.html for further details.
   %%
   %% {net_ticktime, 60}
  ]},

 %% ----------------------------------------------------------------------------
 %% RabbitMQ Management Plugin
 %%
 %% See http://www.rabbitmq.com/management.html for details
 %% ----------------------------------------------------------------------------

 {rabbitmq_management,
  [%% Pre-Load schema definitions from the following JSON file. See
   %% http://www.rabbitmq.com/management.html#load-definitions
   %%
   %% {load_definitions, "/path/to/schema.json"},

   %% Log all requests to the management HTTP API to a file.
   %% Path of the log file for all HTTP API requests
   %% {http_log_dir, "/path/to/access.log"},

   %% Change the port on which the HTTP listener listens,
   %% specifying an interface for the web server to bind to.
   %% Also set the listener to use SSL and provide SSL options.
   %% Address and port of the web management UI
   %% {listener, [{port,     12345},
   %%             {ip,       "127.0.0.1"},
   %%             {ssl,      true},
   %%             {ssl_opts, [{cacertfile, "/path/to/cacert.pem"},
   %%                         {certfile,   "/path/to/cert.pem"},
   %%                         {keyfile,    "/path/to/key.pem"}]}]},

   %% One of 'basic', 'detailed' or 'none'. See
   %% http://www.rabbitmq.com/management.html#fine-stats for more details.
   %% {rates_mode, basic},

   %% Configure how long aggregated data (such as message rates and queue
   %% lengths) is retained. Please read the plugin's documentation in
   %% http://www.rabbitmq.com/management.html#configuration for more
   %% details.
   %%
   %% {sample_retention_policies,
   %%  [{global,   [{60, 5}, {3600, 60}, {86400, 1200}]},
   %%   {basic,    [{60, 5}, {3600, 60}]},
   %%   {detailed, [{10, 5}]}]}
  ]},

 %% ----------------------------------------------------------------------------
 %% RabbitMQ Shovel Plugin
 %%
 %% See http://www.rabbitmq.com/shovel.html for details
 %% ----------------------------------------------------------------------------

 {rabbitmq_shovel,
  [{shovels,
    [%% A named shovel worker.
     %% {my_first_shovel,
     %%  [

     %% List the source broker(s) from which to consume.
     %%
     %%   {sources,
     %%    [%% URI(s) and pre-declarations for all source broker(s).
     %%     {brokers, ["amqp://user:password@host.domain/my_vhost"]},
     %%     {declarations, []}
     %%    ]},

     %% List the destination broker(s) to publish to.
     %%   {destinations,
     %%    [%% A singular version of the 'brokers' element.
     %%     {broker, "amqp://"},
     %%     {declarations, []}
     %%    ]},

     %% Name of the queue to shovel messages from.
     %%
     %% {queue, <<"your-queue-name-goes-here">>},

     %% Optional prefetch count.
     %%
     %% {prefetch_count, 10},

     %% when to acknowledge messages:
     %% - no_ack: never (auto)
     %% - on_publish: after each message is republished
     %% - on_confirm: when the destination broker confirms receipt
     %%
     %% {ack_mode, on_confirm},

     %% Overwrite fields of the outbound basic.publish.
     %%
     %% {publish_fields, [{exchange,    <<"my_exchange">>},
     %%                   {routing_key, <<"from_shovel">>}]},

     %% Static list of basic.properties to set on re-publication.
     %%
     %% {publish_properties, [{delivery_mode, 2}]},

     %% The number of seconds to wait before attempting to
     %% reconnect in the event of a connection failure.
     %%
     %% {reconnect_delay, 2.5}

     %% ]} %% End of my_first_shovel
    ]}
   %% Rather than specifying some values per-shovel, you can specify
   %% them for all shovels here.
   %%
   %% {defaults, [{prefetch_count,     0},
   %%             {ack_mode,           on_confirm},
   %%             {publish_fields,     []},
   %%             {publish_properties, [{delivery_mode, 2}]},
   %%             {reconnect_delay,    2.5}]}
  ]},

 %% ----------------------------------------------------------------------------
 %% RabbitMQ Stomp Adapter
 %%
 %% See http://www.rabbitmq.com/stomp.html for details
 %% ----------------------------------------------------------------------------

 {rabbitmq_stomp,
  [%% Network Configuration - the format is generally the same as for the broker

   %% Listen only on localhost (ipv4 & ipv6) on a specific port.
   %% {tcp_listeners, [{"127.0.0.1", 61613},
   %%                  {"::1",       61613}]},

   %% Listen for SSL connections on a specific port.
   %% {ssl_listeners, [61614]},

   %% Number of Erlang processes that will accept connections for the TCP
   %% and SSL listeners.
   %%
   %% {num_tcp_acceptors, 10},
   %% {num_ssl_acceptors, 1},

   %% Additional SSL options

   %% Extract a name from the client's certificate when using SSL.
   %%
   %% {ssl_cert_login, true},

   %% Set a default user name and password. This is used as the default login
   %% whenever a CONNECT frame omits the login and passcode headers.
   %%
   %% Please note that setting this will allow clients to connect without
   %% authenticating!
   %%
   %% {default_user, [{login,    "guest"},
   %%                 {passcode, "guest"}]},

   %% If a default user is configured, or you have configured use SSL client
   %% certificate based authentication, you can choose to allow clients to
   %% omit the CONNECT frame entirely. If set to true, the client is
   %% automatically connected as the default user or user supplied in the
   %% SSL certificate whenever the first frame sent on a session is not a
   %% CONNECT frame.
   %%
   %% {implicit_connect, true}
  ]},

 %% ----------------------------------------------------------------------------
 %% RabbitMQ MQTT Adapter
 %%
 %% See https://github.com/rabbitmq/rabbitmq-mqtt/blob/stable/README.md
 %% for details
 %% ----------------------------------------------------------------------------

 {rabbitmq_mqtt,
  [%% Set the default user name and password. Will be used as the default login
   %% if a connecting client provides no other login details.
   %%
   %% Please note that setting this will allow clients to connect without
   %% authenticating!
   %%
   %% {default_user, <<"guest">>},
   %% {default_pass, <<"guest">>},

   %% Enable anonymous access. If this is set to false, clients MUST provide
   %% login information in order to connect. See the default_user/default_pass
   %% configuration elements for managing logins without authentication.
   %%
   %% {allow_anonymous, true},

   %% If you have multiple vhosts, specify the one to which the
   %% adapter connects.
   %%
   %% {vhost, <<"/">>},

   %% Specify the exchange to which messages from MQTT clients are published.
   %%
   %% {exchange, <<"amq.topic">>},

   %% Specify TTL (time to live) to control the lifetime of non-clean sessions.
   %%
   %% {subscription_ttl, 1800000},

   %% Set the prefetch count (governing the maximum number of unacknowledged
   %% messages that will be delivered).
   %%
   %% {prefetch, 10},

   %% TCP/SSL Configuration (as per the broker configuration).
   %%
   %% {tcp_listeners, [1883]},
   %% {ssl_listeners, []},

   %% Number of Erlang processes that will accept connections for the TCP
   %% and SSL listeners.
   %%
   %% {num_tcp_acceptors, 10},
   %% {num_ssl_acceptors, 1},

   %% TCP/Socket options (as per the broker configuration).
   %%
   %% {tcp_listen_options, [{backlog,   128},
   %%                       {nodelay,   true}]}
  ]},

 %% ----------------------------------------------------------------------------
 %% RabbitMQ AMQP 1.0 Support
 %%
 %% See https://github.com/rabbitmq/rabbitmq-amqp1.0/blob/stable/README.md
 %% for details
 %% ----------------------------------------------------------------------------

 {rabbitmq_amqp1_0,
  [%% Connections that are not authenticated with SASL will connect as this
   %% account. See the README for more information.
   %%
   %% Please note that setting this will allow clients to connect without
   %% authenticating!
   %%
   %% {default_user, "guest"},

   %% Enable protocol strict mode. See the README for more information.
   %%
   %% {protocol_strict_mode, false}
  ]},

 %% ----------------------------------------------------------------------------
 %% RabbitMQ LDAP Plugin
 %%
 %% See http://www.rabbitmq.com/ldap.html for details.
 %%
 %% ----------------------------------------------------------------------------

 {rabbitmq_auth_backend_ldap,
  [%%
   %% Connecting to the LDAP server(s)
   %% ================================
   %%

   %% Specify servers to bind to. You *must* set this in order for the plugin
   %% to work properly.
   %%
   %% {servers, ["your-server-name-goes-here"]},

   %% Connect to the LDAP server using SSL
   %%
   %% {use_ssl, false},

   %% Specify the LDAP port to connect to
   %%
   %% {port, 389},

   %% LDAP connection timeout, in milliseconds or 'infinity'
   %%
   %% {timeout, infinity},

   %% Enable logging of LDAP queries.
   %% One of
   %%   - false (no logging is performed)
   %%   - true (verbose logging of the logic used by the plugin)
   %%   - network (as true, but additionally logs LDAP network traffic)
   %%
   %% Defaults to false.
   %%
   %% {log, false},

   %%
   %% Authentication
   %% ==============
   %%

   %% Pattern to convert the username given through AMQP to a DN before
   %% binding
   %%
   %% {user_dn_pattern, "cn=${username},ou=People,dc=example,dc=com"},

   %% Alternatively, you can convert a username to a Distinguished
   %% Name via an LDAP lookup after binding. See the documentation for
   %% full details.

   %% When converting a username to a dn via a lookup, set these to
   %% the name of the attribute that represents the user name, and the
   %% base DN for the lookup query.
   %%
   %% {dn_lookup_attribute,   "userPrincipalName"},
   %% {dn_lookup_base,        "DC=gopivotal,DC=com"},

   %% Controls how to bind for authorisation queries and also to
   %% retrieve the details of users logging in without presenting a
   %% password (e.g., SASL EXTERNAL).
   %% One of
   %%  - as_user (to bind as the authenticated user - requires a password)
   %%  - anon    (to bind anonymously)
   %%  - {UserDN, Password} (to bind with a specified user name and password)
   %%
   %% Defaults to 'as_user'.
   %%
   %% {other_bind, as_user},

   %%
   %% Authorisation
   %% =============
   %%

   %% The LDAP plugin can perform a variety of queries against your
   %% LDAP server to determine questions of authorisation. See
   %% http://www.rabbitmq.com/ldap.html#authorisation for more
   %% information.

   %% Set the query to use when determining vhost access
   %%
   %% {vhost_access_query, {in_group,
   %%                       "ou=${vhost}-users,ou=vhosts,dc=example,dc=com"}},

   %% Set the query to use when determining resource (e.g., queue) access
   %%
   %% {resource_access_query, {constant, true}},

   %% Set queries to determine which tags a user has
   %%
   %% {tag_queries, []}
  ]}
].

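5. Using RabbitMQ from Golang: examples

First we need a Go package that speaks the AMQP protocol. The RabbitMQ website points to an existing Go package for talking to RabbitMQ over AMQP.

Download the package locally; after that it can be used directly:

lion@node1:~$ go get github.com/streadway/amqp

5.1. Sending the first "hello idoall.org" with Golang

In this first tutorial we write programs that send and receive messages through a named queue (test-idoall-queues).

producer_hello.go (message producer):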

package main

import (
	"fmt"
	"log"

	"github.com/streadway/amqp"
)

const (
	// AMQP URI of the broker
	uri = "amqp://guest:guest@localhost:5672/"
	// Exchange name; the empty string selects the default exchange
	exchangeName = ""
	// Queue name
	queueName = "test-idoall-queues"
	// Message body
	bodyMsg = "hello idoall.org"
)

// failOnError logs msg together with err and exits when err is not nil.
// log.Fatalf already terminates the process, so the panic is only a safeguard.
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
		panic(fmt.Sprintf("%s: %s", msg, err))
	}
}

func main() {
	// Publish the message
	publish(uri, exchangeName, queueName, bodyMsg)
	log.Printf("published %dB OK", len(bodyMsg))
}

// publish sends body to the named queue through the named exchange.
//
// amqpURI:  address of the AMQP broker
// exchange: exchange name
// queue:    queue name
// body:     message body
func publish(amqpURI string, exchange string, queue string, body string) {
	// Open the connection
	log.Printf("dialing %q", amqpURI)
	connection, err := amqp.Dial(amqpURI)
	failOnError(err, "Failed to connect to RabbitMQ")
	defer connection.Close()

	// Open a channel
	log.Printf("got Connection, getting Channel")
	channel, err := connection.Channel()
	failOnError(err, "Failed to open a channel")
	defer channel.Close()

	log.Printf("got queue, declaring %q", queue)

	// Declare the queue
	q, err := channel.QueueDeclare(
		queue, // name
		false, // durable
		false, // delete when unused
		false, // exclusive
		false, // no-wait
		nil,   // arguments
	)
	failOnError(err, "Failed to declare a queue")

	log.Printf("declared queue, publishing %dB body (%q)", len(body), body)

	// A producer can only publish to an exchange, never directly to a queue.
	// Here we use the default exchange (its name is the empty string), which
	// delivers to the queue whose name equals the routing key.
	err = channel.Publish(
		exchange, // exchange
		q.Name,   // routing key
		false,    // mandatory
		false,    // immediate
		amqp.Publishing{
			Headers:         amqp.Table{},
			ContentType:     "text/plain",
			ContentEncoding: "",
			Body:            []byte(body),
		})
	failOnError(err, "Failed to publish a message")
}
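
The uri constant used by the producer packs the user name, password, host, port, and vhost into a single string. The sketch below takes such a URI apart with the standard net/url package so that each part is visible. The type brokerAddr and the function parseAMQPURI are hypothetical names; the sketch also assumes that a bare trailing "/" selects the default vhost "/", which is how the examples in this article behave.

```go
package main

import (
	"fmt"
	"net/url"
)

// brokerAddr holds the individual parts of an AMQP URI.
type brokerAddr struct {
	User, Password, Host, Port, VHost string
}

// parseAMQPURI splits an amqp:// or amqps:// URI into its parts.
func parseAMQPURI(raw string) (brokerAddr, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return brokerAddr{}, err
	}
	if u.Scheme != "amqp" && u.Scheme != "amqps" {
		return brokerAddr{}, fmt.Errorf("unexpected scheme %q", u.Scheme)
	}

	// Assumption: no path, or a bare "/", means the default vhost "/".
	addr := brokerAddr{Host: u.Hostname(), Port: u.Port(), VHost: "/"}
	if u.User != nil {
		addr.User = u.User.Username()
		addr.Password, _ = u.User.Password()
	}
	// A path longer than "/" names a non-default vhost.
	if len(u.Path) > 1 {
		addr.VHost = u.Path[1:]
	}
	return addr, nil
}

func main() {
	addr, err := parseAMQPURI("amqp://guest:guest@localhost:5672/")
	if err != nil {
		fmt.Println("invalid URI:", err)
		return
	}
	fmt.Printf("%+v\n", addr)
}
```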

consumer_hello.go (message consumer):

package main

import (
	"fmt"
	"log"

	"github.com/streadway/amqp"
)

const (
	// AMQP URI of the broker
	uri = "amqp://guest:guest@localhost:5672/"
	// Exchange name; the empty string selects the default exchange
	exchangeName = ""
	// Queue name
	queueName = "test-idoall-queues"
)

// failOnError logs msg together with err and exits when err is not nil.
// log.Fatalf already terminates the process, so the panic is only a safeguard.
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
		panic(fmt.Sprintf("%s: %s", msg, err))
	}
}

func main() {
	// Start the message consumer
	consumer(uri, exchangeName, queueName)
}

// consumer receives messages from the named queue and logs them.
//
// amqpURI:  address of the AMQP broker
// exchange: exchange name
// queue:    queue name
func consumer(amqpURI string, exchange string, queue string) {
	// Open the connection
	log.Printf("dialing %q", amqpURI)
	connection, err := amqp.Dial(amqpURI)
	failOnError(err, "Failed to connect to RabbitMQ")
	defer connection.Close()

	// Open a channel
	log.Printf("got Connection, getting Channel")
	channel, err := connection.Channel()
	failOnError(err, "Failed to open a channel")
	defer channel.Close()

	log.Printf("got queue, declaring %q", queue)

	// Declare the queue
	q, err := channel.QueueDeclare(
		queue, // name
		false, // durable
		false, // delete when unused
		false, // exclusive
		false, // no-wait
		nil,   // arguments
	)
	failOnError(err, "Failed to declare a queue")

	log.Printf("Queue bound to Exchange, starting Consume")
	// Subscribe to messages
	msgs, err := channel.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	// Create a Go channel that is used only to block
	forever := make(chan bool)

	// Handle deliveries in a goroutine
	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
		}
	}()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

	// Nothing is ever written to this channel, so the receive blocks
	// forever and keeps the program from exiting.
	<-forever
}
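
The consumer above blocks on a channel that is never written to, so it only stops when the process is interrupted. As a variation, not the approach used in this article, the sketch below shows the same goroutine-plus-range pattern with a done channel, so the caller can return once the delivery channel has been closed and drained. It uses plain Go channels of []byte in place of AMQP deliveries, and the function name consume is hypothetical.

```go
package main

import "fmt"

// consume reads every body from msgs in a goroutine and closes the
// returned channel once msgs has been closed and fully drained.
func consume(msgs <-chan []byte, handle func([]byte)) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for body := range msgs {
			handle(body)
		}
	}()
	return done
}

func main() {
	// A buffered Go channel stands in for the delivery channel.
	msgs := make(chan []byte, 2)
	msgs <- []byte("hello idoall.org")
	msgs <- []byte("second message")
	close(msgs) // stands in for the delivery channel being closed

	done := consume(msgs, func(b []byte) {
		fmt.Printf("Received a message: %s\n", b)
	})

	// Unlike <-forever, this receive returns when the goroutine finishes.
	<-done
}
```

Whether a clean exit is useful depends on the program: a long-running worker may prefer to block forever, while a batch job or a test usually wants to know when the stream has ended.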

Console 1 (run the producer):

lion@node1:~/_code/_rabbitmq/_golang$ go run producer_hello.go
2016/07/23 02:29:51 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 02:29:51 got Connection, getting Channel
2016/07/23 02:29:51 got queue, declaring "test-idoall-queues"
2016/07/23 02:29:51 declared queue, publishing 16B body ("hello idoall.org")
2016/07/23 02:29:51 published 16B OK

Then run the following command; the queue we just created appears in the list:

lion@node1:~/_code/_rabbitmq/_golang$ rabbitmqctl list_queues
Listing queues ...
test-idoall-queues	1
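
If this listing needs to be checked from a program, its text can be parsed line by line. The sketch below does this for output shaped like the listing above: a "Listing queues ..." header followed by one "name count" pair per line. The function parseListQueues is a hypothetical helper, and it assumes queue names contain no whitespace and that only the default two columns are printed.

```go
package main

import (
	"bufio"
	"fmt"
	"strconv"
	"strings"
)

// parseListQueues turns the text printed by `rabbitmqctl list_queues`
// into a map from queue name to message count.
func parseListQueues(output string) (map[string]int, error) {
	queues := make(map[string]int)
	sc := bufio.NewScanner(strings.NewReader(output))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		// Skip blank lines and the "Listing queues ..." header.
		if line == "" || strings.HasPrefix(line, "Listing queues") {
			continue
		}
		fields := strings.Fields(line)
		if len(fields) != 2 {
			return nil, fmt.Errorf("unexpected line %q", line)
		}
		n, err := strconv.Atoi(fields[1])
		if err != nil {
			return nil, fmt.Errorf("bad message count in %q: %v", line, err)
		}
		queues[fields[0]] = n
	}
	return queues, sc.Err()
}

func main() {
	sample := "Listing queues ...\ntest-idoall-queues\t1\n"
	queues, err := parseListQueues(sample)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	for name, count := range queues {
		fmt.Printf("%s has %d message(s)\n", name, count)
	}
}
```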

Console 2 (run the consumer) prints the message to the screen; it shows the hello idoall.org message that the producer just sent:

lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_hello.go
2016/07/23 03:33:14 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 03:33:14 got Connection, getting Channel
2016/07/23 03:33:14 got queue, declaring "test-idoall-queues"
2016/07/23 03:33:14 Queue bound to Exchange, starting Consume
2016/07/23 03:33:14  [*] Waiting for messages. To exit press CTRL+C
2016/07/23 03:33:14 Received a message: hello idoall.org

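5.2. RabbitMQ's task distribution mechanism

In section 5.1 we wrote programs that send and receive messages through a named queue. In this section we create a work queue that distributes time-consuming tasks among multiple workers.

RabbitMQ's dispatch mechanism scales well and is designed for concurrent programs. If too many tasks pile up in the queue, you only need to start more consumers to process them.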
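
To see why adding consumers spreads the load, the sketch below simulates handing tasks to workers in turn (round-robin). It assumes the broker's default behavior of dispatching messages to the consumers of a queue one after another, with no prefetch limit configured. The function dispatchRoundRobin is a hypothetical helper and involves no broker at all.

```go
package main

import "fmt"

// dispatchRoundRobin assigns task i to worker i % workers and returns
// the list of tasks that each worker received.
func dispatchRoundRobin(tasks []string, workers int) [][]string {
	if workers <= 0 {
		return nil
	}
	assigned := make([][]string, workers)
	for i, t := range tasks {
		w := i % workers
		assigned[w] = append(assigned[w], t)
	}
	return assigned
}

func main() {
	tasks := []string{"task1", "task2", "task3", "task4", "task5"}
	// The same backlog is split over one, two, and three workers.
	for n := 1; n <= 3; n++ {
		fmt.Printf("%d worker(s): %v\n", n, dispatchRoundRobin(tasks, n))
	}
}
```

With one worker all five tasks land on the same consumer; with three, no worker receives more than two. Round-robin ignores how long each task takes, so a worker can still fall behind if it happens to receive the slow tasks.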

producer_task.go (message producer):

package main

import (
  "fmt"
  "log"
  "os"
  "strings"
  "github.com/streadway/amqp"
)

const (
  //AMQP URI
  uri          =  "amqp://guest:guest@localhost:5672/"
  //Durable AMQP exchange name
  exchangeName =  ""
  //Durable AMQP queue name
  queueName    = "test-idoall-queues-task"
)

// failOnError logs msg with err and then panics if err is non-nil.
// Panicking (rather than log.Fatalf) lets the deferred Close calls run.
func failOnError(err error, msg string) {
  if err != nil {
    log.Printf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))
  }
}

func main() {
  bodyMsg := bodyFrom(os.Args)
  // Publish the message
  publish(uri, exchangeName, queueName, bodyMsg)
  log.Printf("published %dB OK", len(bodyMsg))
}

// bodyFrom builds the message body from the command-line arguments
// and falls back to a default text when none are given.
func bodyFrom(args []string) string {
  var s string
  if (len(args) < 2) || args[1] == "" {
    s = "hello idoall.org"
  } else {
    s = strings.Join(args[1:], " ")
  }
  return s
}

// publish sends a single message.
//
// amqpURI:  address of the AMQP broker
// exchange: name of the exchange
// queue:    name of the queue
// body:     message body
func publish(amqpURI string, exchange string, queue string, body string) {
  // Open the connection
  log.Printf("dialing %q", amqpURI)
  connection, err := amqp.Dial(amqpURI)
  failOnError(err, "Failed to connect to RabbitMQ")
  defer connection.Close()

  // Open a channel
  log.Printf("got Connection, getting Channel")
  channel, err := connection.Channel()
  failOnError(err, "Failed to open a channel")
  defer channel.Close()

  log.Printf("got queue, declaring %q", queue)

  // Declare the queue
  q, err := channel.QueueDeclare(
    queue,   // name
    false,   // durable
    false,   // delete when unused
    false,   // exclusive
    false,   // no-wait
    nil,     // arguments
  )
  failOnError(err, "Failed to declare a queue")

  log.Printf("declared queue, publishing %dB body (%q)", len(body), body)

  // A producer can only publish to an exchange, never directly to a queue.
  // Here we use the default exchange (its name is the empty string), which
  // routes each message to the queue whose name equals the routing key.
  err = channel.Publish(
    exchange,     // exchange
    q.Name, // routing key
    false,  // mandatory
    false,  // immediate
    amqp.Publishing {
      Headers:         amqp.Table{},
      ContentType: "text/plain",
      ContentEncoding: "",
      Body:        []byte(body),
    })
  failOnError(err, "Failed to publish a message")
}

consumer_task.go (message consumer):

package main

import (
  "fmt"
  "log"
  "bytes"
  "time"
  "github.com/streadway/amqp"
)

const (
  //AMQP URI
  uri           =  "amqp://guest:guest@localhost:5672/"
  //Durable AMQP exchange name
  exchangeName  = ""
  //Durable AMQP queue name
  queueName     = "test-idoall-queues-task"
)

// failOnError logs msg with err and then panics if err is non-nil.
// Panicking (rather than log.Fatalf) lets the deferred Close calls run.
func failOnError(err error, msg string) {
  if err != nil {
    log.Printf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))
  }
}

func main() {
  // Start the consumer
  consumer(uri, exchangeName, queueName)
}

// consumer receives messages from a queue.
//
// amqpURI:  address of the AMQP broker
// exchange: name of the exchange (not used here; the default exchange needs no binding)
// queue:    name of the queue
func consumer(amqpURI string, exchange string, queue string) {
  // Open the connection
  log.Printf("dialing %q", amqpURI)
  connection, err := amqp.Dial(amqpURI)
  failOnError(err, "Failed to connect to RabbitMQ")
  defer connection.Close()

  // Open a channel
  log.Printf("got Connection, getting Channel")
  channel, err := connection.Channel()
  failOnError(err, "Failed to open a channel")
  defer channel.Close()

  log.Printf("got queue, declaring %q", queue)

  // Declare the queue
  q, err := channel.QueueDeclare(
      queue,   // name
      false,   // durable
      false,   // delete when unused
      false,   // exclusive
      false,   // no-wait
      nil,     // arguments
  )
  failOnError(err, "Failed to declare a queue")

  log.Printf("Queue bound to Exchange, starting Consume")
  // Subscribe to the queue
  msgs, err := channel.Consume(
      q.Name, // queue
      "",     // consumer
      true,   // auto-ack: this example never calls d.Ack, so RabbitMQ acknowledges on delivery
      false,  // exclusive
      false,  // no-local
      false,  // no-wait
      nil,    // args
  )
  failOnError(err, "Failed to register a consumer")

  // Channel that is never written to; used to block forever
  forever := make(chan bool)

  // Process deliveries in a goroutine
  go func() {
      for d := range msgs {
        log.Printf("Received a message: %s", d.Body)
        // Simulate work: sleep one second per '.' in the body
        dotCount := bytes.Count(d.Body, []byte("."))
        t := time.Duration(dotCount)
        time.Sleep(t * time.Second)
        log.Printf("Done")
      }
  }()

  log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

  // Nothing is ever sent on forever, so this receive blocks and keeps the program running
  <-forever
}
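
The consumer above fakes a time-consuming task by sleeping one second for every "." in the message body. As a minimal sketch, that rule can be isolated in a helper; the name workDuration is hypothetical and does not appear in the original code:

```go
package main

import (
	"bytes"
	"fmt"
	"time"
)

// workDuration returns one second of simulated work per '.' in the body.
// (Hypothetical helper, introduced here only for illustration.)
func workDuration(body []byte) time.Duration {
	return time.Duration(bytes.Count(body, []byte("."))) * time.Second
}

func main() {
	for _, body := range []string{"First message.", "Third message...", "Fifth message....."} {
		fmt.Printf("%q -> %v\n", body, workDuration([]byte(body)))
	}
}
```

Note that the default body "hello idoall.org" also contains one dot, so it counts as a one-second task.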

Results


Console1(consumer):

lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_task.go
2016/07/23 10:11:40 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 10:11:40 got Connection, getting Channel
2016/07/23 10:11:40 got queue, declaring "test-idoall-queues-task"
2016/07/23 10:11:40 Queue bound to Exchange, starting Consume
2016/07/23 10:11:40  [*] Waiting for messages. To exit press CTRL+C

Console2(consumer):

lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_task.go
2016/07/23 10:11:40 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 10:11:40 got Connection, getting Channel
2016/07/23 10:11:40 got queue, declaring "test-idoall-queues-task"
2016/07/23 10:11:40 Queue bound to Exchange, starting Consume
2016/07/23 10:11:40  [*] Waiting for messages. To exit press CTRL+C

Now we use the producer to publish messages:

lion@node1:~/_code/_rabbitmq/_golang$ go run producer_task.go First message. && go run producer_task.go Second message.. && go run producer_task.go Third message... && go run producer_task.go Fourth message.... && go run producer_task.go Fifth message.....
2016/07/23 10:17:13 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 10:17:13 got Connection, getting Channel
2016/07/23 10:17:13 got queue, declaring "test-idoall-queues-task"
2016/07/23 10:17:13 declared queue, publishing 14B body ("First message.")
2016/07/23 10:17:13 published 14B OK
2016/07/23 10:17:14 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 10:17:14 got Connection, getting Channel
2016/07/23 10:17:14 got queue, declaring "test-idoall-queues-task"
2016/07/23 10:17:14 declared queue, publishing 16B body ("Second message..")
2016/07/23 10:17:14 published 16B OK
2016/07/23 10:17:15 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 10:17:15 got Connection, getting Channel
2016/07/23 10:17:15 got queue, declaring "test-idoall-queues-task"
2016/07/23 10:17:15 declared queue, publishing 16B body ("Third message...")
2016/07/23 10:17:15 published 16B OK
2016/07/23 10:17:16 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 10:17:16 got Connection, getting Channel
2016/07/23 10:17:16 got queue, declaring "test-idoall-queues-task"
2016/07/23 10:17:16 declared queue, publishing 18B body ("Fourth message....")
2016/07/23 10:17:16 published 18B OK
2016/07/23 10:17:16 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 10:17:16 got Connection, getting Channel
2016/07/23 10:17:16 got queue, declaring "test-idoall-queues-task"
2016/07/23 10:17:16 declared queue, publishing 18B body ("Fifth message.....")
2016/07/23 10:17:16 published 18B OK

Now look again at the output of the two consumers we started earlier:

Console1(consumer):

lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_task.go
2016/07/23 10:11:21 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 10:11:21 got Connection, getting Channel
2016/07/23 10:11:21 got queue, declaring "test-idoall-queues-task"
2016/07/23 10:11:21 Queue bound to Exchange, starting Consume
2016/07/23 10:11:21  [*] Waiting for messages. To exit press CTRL+C
2016/07/23 10:17:13 Received a message: First message.
2016/07/23 10:17:14 Done
2016/07/23 10:17:15 Received a message: Third message...
2016/07/23 10:17:18 Done
2016/07/23 10:17:18 Received a message: Fifth message.....
2016/07/23 10:17:23 Done

Console2(consumer):

lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_task.go
2016/07/23 10:11:40 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 10:11:40 got Connection, getting Channel
2016/07/23 10:11:40 got queue, declaring "test-idoall-queues-task"
2016/07/23 10:11:40 Queue bound to Exchange, starting Consume
2016/07/23 10:11:40  [*] Waiting for messages. To exit press CTRL+C
2016/07/23 10:17:14 Received a message: Second message..
2016/07/23 10:17:16 Done
2016/07/23 10:17:16 Received a message: Fourth message....
2016/07/23 10:17:20 Done

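By default, RabbitMQ dispatches the messages in order, handing each one to the next consumer in turn, so every consumer receives about the same number of messages. Once a message has been acknowledged, RabbitMQ deletes it. This way of distributing messages is called round-robin.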
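
The round-robin rule can be illustrated with a small stand-alone simulation. This is only a sketch of the idea, not RabbitMQ's implementation; the function name roundRobin is hypothetical, and it assumes at least one consumer and that all consumers are connected before the first message arrives:

```go
package main

import "fmt"

// roundRobin assigns message i to consumer i % n (n must be greater than 0).
func roundRobin(messages []string, n int) [][]string {
	assigned := make([][]string, n)
	for i, m := range messages {
		assigned[i%n] = append(assigned[i%n], m)
	}
	return assigned
}

func main() {
	msgs := []string{
		"First message.", "Second message..", "Third message...",
		"Fourth message....", "Fifth message.....",
	}
	for i, got := range roundRobin(msgs, 2) {
		fmt.Printf("Console%d: %q\n", i+1, got)
	}
}
```

With two consumers this gives the first, third and fifth message to Console1 and the second and fourth to Console2, which is the split seen in the output above.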

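5.3、Message acknowledgment

A consumer may need some time to process the data it receives. If the consumer fails and exits abnormally before it has finished, that data is lost: with automatic acknowledgment (auto-ack set to true), RabbitMQ marks a message as done and removes it from the queue as soon as it has been sent to the consumer, so we can no longer get at the unfinished message.

In practice, if a consumer exits abnormally, we want the data it was handling to be processed by another consumer, so that nothing is lost when a channel is closed, a connection is closed, or a TCP connection drops.

To make sure no data is lost, RabbitMQ supports message acknowledgments. An ack(nowledgment) is sent back by the consumer to tell RabbitMQ that a particular message has been received and processed, and that RabbitMQ is free to delete it.

If a consumer exits without sending an ack, RabbitMQ requeues the message and delivers it to another consumer. This way no data is lost even when a consumer exits abnormally.

No timeout is involved here. RabbitMQ decides that a message was not processed properly only when the consumer's connection is lost, so a consumer may take as long as it needs to process a message.

Manual acknowledgment is selected by passing false as the auto-ack argument of channel.Consume. The consumer then has to call d.Ack(false) to tell RabbitMQ that it has finished the task.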
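
The bookkeeping behind acknowledgments can be sketched with a toy model. The queue type and its methods below are hypothetical and only imitate the behaviour described above (ready messages, delivered-but-unacknowledged messages, requeue on disconnect); they are not part of RabbitMQ or of the amqp package:

```go
package main

import "fmt"

// queue is a toy model of a broker queue.
type queue struct {
	ready   []string // waiting to be delivered
	unacked []string // delivered, acknowledgment still pending
}

// deliver hands every ready message to the consumer (no prefetch limit).
func (q *queue) deliver() {
	q.unacked = append(q.unacked, q.ready...)
	q.ready = nil
}

// ack removes the oldest delivered message, as one d.Ack(false) call would.
func (q *queue) ack() {
	if len(q.unacked) > 0 {
		q.unacked = q.unacked[1:]
	}
}

// disconnect requeues everything the lost consumer never acknowledged.
func (q *queue) disconnect() {
	q.ready = append(q.unacked, q.ready...)
	q.unacked = nil
}

func main() {
	q := &queue{ready: []string{"First", "Second", "Third", "Fourth", "Fifth"}}
	q.deliver()    // the consumer receives all five messages
	q.ack()        // First is done
	q.ack()        // Second is done
	q.disconnect() // CTRL+C while working on Third
	fmt.Printf("ready=%d unacked=%d\n", len(q.ready), len(q.unacked)) // prints "ready=3 unacked=0"
}
```

In this model, stopping the consumer after two acknowledgments leaves three messages ready for the next consumer.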

producer_acknowledgments.go (message producer):

package main

import (
  "fmt"
  "log"
  "os"
  "strings"
  "github.com/streadway/amqp"
)

const (
  //AMQP URI
  uri          =  "amqp://guest:guest@localhost:5672/"
  //Durable AMQP exchange name
  exchangeName =  ""
  //Durable AMQP queue name
  queueName    = "test-idoall-queues-acknowledgments"
)

// failOnError logs msg with err and then panics if err is non-nil.
// Panicking (rather than log.Fatalf) lets the deferred Close calls run.
func failOnError(err error, msg string) {
  if err != nil {
    log.Printf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))
  }
}

func main() {
  bodyMsg := bodyFrom(os.Args)
  // Publish the message
  publish(uri, exchangeName, queueName, bodyMsg)
  log.Printf("published %dB OK", len(bodyMsg))
}

// bodyFrom builds the message body from the command-line arguments
// and falls back to a default text when none are given.
func bodyFrom(args []string) string {
  var s string
  if (len(args) < 2) || args[1] == "" {
    s = "hello idoall.org"
  } else {
    s = strings.Join(args[1:], " ")
  }
  return s
}

// publish sends a single message.
//
// amqpURI:  address of the AMQP broker
// exchange: name of the exchange
// queue:    name of the queue
// body:     message body
func publish(amqpURI string, exchange string, queue string, body string) {
  // Open the connection
  log.Printf("dialing %q", amqpURI)
  connection, err := amqp.Dial(amqpURI)
  failOnError(err, "Failed to connect to RabbitMQ")
  defer connection.Close()

  // Open a channel
  log.Printf("got Connection, getting Channel")
  channel, err := connection.Channel()
  failOnError(err, "Failed to open a channel")
  defer channel.Close()

  log.Printf("got queue, declaring %q", queue)

  // Declare the queue
  q, err := channel.QueueDeclare(
    queue,   // name
    false,   // durable
    false,   // delete when unused
    false,   // exclusive
    false,   // no-wait
    nil,     // arguments
  )
  failOnError(err, "Failed to declare a queue")

  log.Printf("declared queue, publishing %dB body (%q)", len(body), body)

  // A producer can only publish to an exchange, never directly to a queue.
  // Here we use the default exchange (its name is the empty string), which
  // routes each message to the queue whose name equals the routing key.
  err = channel.Publish(
    exchange,     // exchange
    q.Name, // routing key
    false,  // mandatory
    false,  // immediate
    amqp.Publishing {
      Headers:         amqp.Table{},
      ContentType: "text/plain",
      ContentEncoding: "",
      Body:        []byte(body),
    })
  failOnError(err, "Failed to publish a message")
}

consumer_acknowledgments.go (message consumer):

package main

import (
  "fmt"
  "log"
  "bytes"
  "time"
  "github.com/streadway/amqp"
)

const (
  //AMQP URI
  uri           =  "amqp://guest:guest@localhost:5672/"
  //Durable AMQP exchange name
  exchangeName  = ""
  //Durable AMQP queue name
  queueName     = "test-idoall-queues-acknowledgments"
)

// failOnError logs msg with err and then panics if err is non-nil.
// Panicking (rather than log.Fatalf) lets the deferred Close calls run.
func failOnError(err error, msg string) {
  if err != nil {
    log.Printf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))
  }
}

func main() {
  // Start the consumer
  consumer(uri, exchangeName, queueName)
}

// consumer receives messages from a queue.
//
// amqpURI:  address of the AMQP broker
// exchange: name of the exchange (not used here; the default exchange needs no binding)
// queue:    name of the queue
func consumer(amqpURI string, exchange string, queue string) {
  // Open the connection
  log.Printf("dialing %q", amqpURI)
  connection, err := amqp.Dial(amqpURI)
  failOnError(err, "Failed to connect to RabbitMQ")
  defer connection.Close()

  // Open a channel
  log.Printf("got Connection, getting Channel")
  channel, err := connection.Channel()
  failOnError(err, "Failed to open a channel")
  defer channel.Close()

  log.Printf("got queue, declaring %q", queue)

  // Declare the queue
  q, err := channel.QueueDeclare(
      queue,   // name
      false,   // durable
      false,   // delete when unused
      false,   // exclusive
      false,   // no-wait
      nil,     // arguments
  )
  failOnError(err, "Failed to declare a queue")

  log.Printf("Queue bound to Exchange, starting Consume")
  // Subscribe to the queue (auto-ack is false, so every delivery must be acknowledged manually)
  msgs, err := channel.Consume(
      q.Name, // queue
      "",     // consumer
      false,   // auto-ack
      false,  // exclusive
      false,  // no-local
      false,  // no-wait
      nil,    // args
  )
  failOnError(err, "Failed to register a consumer")

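  // Channel that is never written to; used to block forever
  forever := make(chan bool)

  // Process deliveries in a goroutine
  go func() {
      for d := range msgs {
        log.Printf("Received a message: %s", d.Body)
        // Simulate work: sleep one second per '.' in the body
        dotCount := bytes.Count(d.Body, []byte("."))
        t := time.Duration(dotCount)
        time.Sleep(t * time.Second)
        log.Printf("Done")
        // Acknowledge only this delivery (multiple = false)
        d.Ack(false)
      }
  }()

  log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

  // Nothing is ever sent on forever, so this receive blocks and keeps the program running
  <-forever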
}

Results


First we use the producer to send a series of messages:

lion@node1:~/_code/_rabbitmq/_golang$ go run producer_acknowledgments.go First message. && go run producer_acknowledgments.go Second message.. && go run producer_acknowledgments.go Third message... && go run producer_acknowledgments.go Fourth message.... && go run producer_acknowledgments.go Fifth message.....
2016/07/23 21:41:40 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 21:41:40 got Connection, getting Channel
2016/07/23 21:41:40 got queue, declaring "test-idoall-queues-acknowledgments"
2016/07/23 21:41:40 declared queue, publishing 14B body ("First message.")
2016/07/23 21:41:40 published 14B OK
2016/07/23 21:41:41 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 21:41:41 got Connection, getting Channel
2016/07/23 21:41:41 got queue, declaring "test-idoall-queues-acknowledgments"
2016/07/23 21:41:41 declared queue, publishing 16B body ("Second message..")
2016/07/23 21:41:41 published 16B OK
2016/07/23 21:41:41 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 21:41:41 got Connection, getting Channel
2016/07/23 21:41:41 got queue, declaring "test-idoall-queues-acknowledgments"
2016/07/23 21:41:41 declared queue, publishing 16B body ("Third message...")
2016/07/23 21:41:41 published 16B OK
2016/07/23 21:41:42 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 21:41:42 got Connection, getting Channel
2016/07/23 21:41:42 got queue, declaring "test-idoall-queues-acknowledgments"
2016/07/23 21:41:42 declared queue, publishing 18B body ("Fourth message....")
2016/07/23 21:41:42 published 18B OK
2016/07/23 21:41:43 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 21:41:43 got Connection, getting Channel
2016/07/23 21:41:43 got queue, declaring "test-idoall-queues-acknowledgments"
2016/07/23 21:41:43 declared queue, publishing 18B body ("Fifth message.....")
2016/07/23 21:41:43 published 18B OK

Use rabbitmqctl to look at messages_ready and messages_unacknowledged:

lion@node1:~/_code/_rabbitmq/_golang$ rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
test-idoall-queues-task	0	0
test-idoall-queues	0	0
test-idoall-queues-acknowledgments	5	0

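Now start the consumer and press CTRL+C while it is working on the third message. At that point the third, fourth and fifth messages have been delivered, but d.Ack(false) has not been called for them: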

lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_acknowledgments.go
2016/07/23 21:56:35 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 21:56:35 got Connection, getting Channel
2016/07/23 21:56:35 got queue, declaring "test-idoall-queues-acknowledgments"
2016/07/23 21:56:35 Queue bound to Exchange, starting Consume
2016/07/23 21:56:35  [*] Waiting for messages. To exit press CTRL+C
2016/07/23 21:56:35 Received a message: First message.
2016/07/23 21:56:36 Done
2016/07/23 21:56:36 Received a message: Second message..
2016/07/23 21:56:38 Done
2016/07/23 21:56:38 Received a message: Third message...
^Csignal: interrupt

Running rabbitmqctl again shows that 3 messages are still unprocessed and back in the queue:

lion@node1:~/_code/_rabbitmq/_golang$ rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
test-idoall-queues-task	0	0
test-idoall-queues	0	0
test-idoall-queues-acknowledgments	3	0

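5.4、Message durability

If the RabbitMQ server goes down or crashes, the data is still lost. To make sure messages are not lost, both the queue and the messages have to be made persistent.

Setting durable to true when declaring the queue makes the queue persistent (this must be set in both the producer and the consumer code). A queue that already exists as non-durable cannot be changed this way, because RabbitMQ rejects a redeclaration with different parameters, so we have to give the queue a new name.

Finally, the producer has to set DeliveryMode to amqp.Persistent when it publishes a message. That completes the persistence setup; the code follows.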
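
Why both settings are needed can be shown with a simplified model of a broker restart. This is a sketch under stated assumptions, not RabbitMQ's storage logic: the types message and queue and the function restart are hypothetical, and real persistence has further caveats (for example, a message may be lost if the broker stops before it has been written to disk).

```go
package main

import "fmt"

type message struct {
	body       string
	persistent bool // corresponds to DeliveryMode: amqp.Persistent
}

type queue struct {
	durable  bool // corresponds to the durable flag of QueueDeclare
	messages []message
}

// restart models a broker restart: non-durable queues disappear, and
// durable queues keep only their persistent messages.
func restart(queues map[string]queue) map[string]queue {
	survivors := make(map[string]queue)
	for name, q := range queues {
		if !q.durable {
			continue
		}
		kept := queue{durable: true}
		for _, m := range q.messages {
			if m.persistent {
				kept.messages = append(kept.messages, m)
			}
		}
		survivors[name] = kept
	}
	return survivors
}

func main() {
	before := map[string]queue{
		"test-idoall-queues-durability": {durable: true, messages: []message{{"First message.", true}}},
		"test-idoall-queues-task":       {durable: false, messages: []message{{"First message.", true}}},
	}
	after := restart(before)
	fmt.Println("queues left:", len(after))
	fmt.Println("messages left:", len(after["test-idoall-queues-durability"].messages))
}
```

In this model a persistent message in a non-durable queue is lost together with the queue, and a non-persistent message in a durable queue is dropped while the queue itself survives.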

producer_durability.go (message producer):

package main

import (
  "fmt"
  "log"
  "os"
  "strings"
  "github.com/streadway/amqp"
)


const (
  //AMQP URI
  uri          =  "amqp://guest:guest@localhost:5672/"
  //Durable AMQP exchange name
  exchangeName =  ""
  //Durable AMQP queue name
  queueName    = "test-idoall-queues-durability"
)

// failOnError logs msg with err and then panics if err is non-nil.
// Panicking (rather than log.Fatalf) lets the deferred Close calls run.
func failOnError(err error, msg string) {
  if err != nil {
    log.Printf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))
  }
}

func main() {
  bodyMsg := bodyFrom(os.Args)
  // Publish the message
  publish(uri, exchangeName, queueName, bodyMsg)
  log.Printf("published %dB OK", len(bodyMsg))
}

// bodyFrom builds the message body from the command-line arguments
// and falls back to a default text when none are given.
func bodyFrom(args []string) string {
  var s string
  if (len(args) < 2) || args[1] == "" {
    s = "hello idoall.org"
  } else {
    s = strings.Join(args[1:], " ")
  }
  return s
}

// publish sends a single persistent message.
//
// amqpURI:  address of the AMQP broker
// exchange: name of the exchange
// queue:    name of the queue
// body:     message body
func publish(amqpURI string, exchange string, queue string, body string) {
  // Open the connection
  log.Printf("dialing %q", amqpURI)
  connection, err := amqp.Dial(amqpURI)
  failOnError(err, "Failed to connect to RabbitMQ")
  defer connection.Close()

  // Open a channel
  log.Printf("got Connection, getting Channel")
  channel, err := connection.Channel()
  failOnError(err, "Failed to open a channel")
  defer channel.Close()

  log.Printf("got queue, declaring %q", queue)

  // Declare the queue
  q, err := channel.QueueDeclare(
    queue,   // name
    true,   // durable
    false,   // delete when unused
    false,   // exclusive
    false,   // no-wait
    nil,     // arguments
  )
  failOnError(err, "Failed to declare a queue")

  log.Printf("declared queue, publishing %dB body (%q)", len(body), body)

  // A producer can only publish to an exchange, never directly to a queue.
  // Here we use the default exchange (its name is the empty string), which
  // routes each message to the queue whose name equals the routing key.
  err = channel.Publish(
    exchange,     // exchange
    q.Name, // routing key
    false,  // mandatory
    false,  // immediate
    amqp.Publishing {
      Headers:         amqp.Table{},
      DeliveryMode: amqp.Persistent,
      ContentType: "text/plain",
      ContentEncoding: "",
      Body:        []byte(body),
    })
  failOnError(err, "Failed to publish a message")
}

consumer_durability.go (message consumer):

package main

import (
  "fmt"
  "log"
  "bytes"
  "time"
  "github.com/streadway/amqp"
)

const (
  //AMQP URI
  uri           =  "amqp://guest:guest@localhost:5672/"
  //Durable AMQP exchange name
  exchangeName  = ""
  //Durable AMQP queue name
  queueName     = "test-idoall-queues-durability"
)

// failOnError logs msg with err and then panics if err is non-nil.
// Panicking (rather than log.Fatalf) lets the deferred Close calls run.
func failOnError(err error, msg string) {
  if err != nil {
    log.Printf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))
  }
}

func main() {
  // Start the consumer
  consumer(uri, exchangeName, queueName)
}

// consumer receives messages from a queue.
//
// amqpURI:  address of the AMQP broker
// exchange: name of the exchange (not used here; the default exchange needs no binding)
// queue:    name of the queue
func consumer(amqpURI string, exchange string, queue string) {
  // Open the connection
  log.Printf("dialing %q", amqpURI)
  connection, err := amqp.Dial(amqpURI)
  failOnError(err, "Failed to connect to RabbitMQ")
  defer connection.Close()

  // Open a channel
  log.Printf("got Connection, getting Channel")
  channel, err := connection.Channel()
  failOnError(err, "Failed to open a channel")
  defer channel.Close()

  log.Printf("got queue, declaring %q", queue)

  // Declare the queue
  q, err := channel.QueueDeclare(
      queue,   // name
      true,   // durable
      false,   // delete when unused
      false,   // exclusive
      false,   // no-wait
      nil,     // arguments
  )
  failOnError(err, "Failed to declare a queue")

  log.Printf("Queue bound to Exchange, starting Consume")
  // Subscribe to the queue (auto-ack is false, so every delivery must be acknowledged manually)
  msgs, err := channel.Consume(
      q.Name, // queue
      "",     // consumer
      false,   // auto-ack
      false,  // exclusive
      false,  // no-local
      false,  // no-wait
      nil,    // args
  )
  failOnError(err, "Failed to register a consumer")

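  // Channel that is never written to; used to block forever
  forever := make(chan bool)

  // Process deliveries in a goroutine
  go func() {
      for d := range msgs {
        log.Printf("Received a message: %s", d.Body)
        // Simulate work: sleep one second per '.' in the body
        dotCount := bytes.Count(d.Body, []byte("."))
        t := time.Duration(dotCount)
        time.Sleep(t * time.Second)
        log.Printf("Done")
        // Acknowledge only this delivery (multiple = false)
        d.Ack(false)
      }
  }()

  log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

  // Nothing is ever sent on forever, so this receive blocks and keeps the program running
  <-forever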
}

Results


First we use the producer to send a series of messages:

lion@node1:~/_code/_rabbitmq/_golang$ go run producer_durability.go First message. && go run producer_durability.go Second message.. && go run producer_durability.go Third message... && go run producer_durability.go Fourth message.... && go run producer_durability.go Fifth message.....
2016/07/23 22:35:03 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 22:35:03 got Connection, getting Channel
2016/07/23 22:35:03 got queue, declaring "test-idoall-queues-durability"
2016/07/23 22:35:04 declared queue, publishing 14B body ("First message.")
2016/07/23 22:35:04 published 14B OK
2016/07/23 22:35:04 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 22:35:04 got Connection, getting Channel
2016/07/23 22:35:04 got queue, declaring "test-idoall-queues-durability"
2016/07/23 22:35:04 declared queue, publishing 16B body ("Second message..")
2016/07/23 22:35:04 published 16B OK
2016/07/23 22:35:05 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 22:35:05 got Connection, getting Channel
2016/07/23 22:35:05 got queue, declaring "test-idoall-queues-durability"
2016/07/23 22:35:05 declared queue, publishing 16B body ("Third message...")
2016/07/23 22:35:05 published 16B OK
2016/07/23 22:35:06 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 22:35:06 got Connection, getting Channel
2016/07/23 22:35:06 got queue, declaring "test-idoall-queues-durability"
2016/07/23 22:35:06 declared queue, publishing 18B body ("Fourth message....")
2016/07/23 22:35:06 published 18B OK
2016/07/23 22:35:06 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 22:35:06 got Connection, getting Channel
2016/07/23 22:35:06 got queue, declaring "test-idoall-queues-durability"
2016/07/23 22:35:06 declared queue, publishing 18B body ("Fifth message.....")
2016/07/23 22:35:06 published 18B OK

Use the rabbitmqctl list_queues command to see how many messages each queue holds:

lion@node1:~/_code/_rabbitmq/_golang$ rabbitmqctl list_queues
Listing queues ...
test-idoall-queues-task	0
test-idoall-queues	0
test-idoall-queues-durability	5
test-idoall-queues-acknowledgments	0

Restart the RabbitMQ server:

lion@node1:~/_code/_rabbitmq/_golang$ rabbitmqctl stop
lion@node1:~/_code/_rabbitmq/_golang$ rabbitmq-server

              RabbitMQ 3.6.3. Copyright (C) 2007-2016 Pivotal Software, Inc.
  ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
  ##  ##
  ##########  Logs: /home/lion/_app/rabbitmq_server-3.6.3/var/log/rabbitmq/rabbit@node1.log
  ######  ##        /home/lion/_app/rabbitmq_server-3.6.3/var/log/rabbitmq/rabbit@node1-sasl.log
  ##########
              Starting broker...
 completed with 6 plugins.

Running rabbitmqctl list_queues again shows that the messages are still there, which means persistence works:

lion@node1:~/_code/_rabbitmq/_golang$ rabbitmqctl list_queues
Listing queues ...
test-idoall-queues-durability	5

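5.5、Fair dispatch

The dispatch mechanism shown so far is not ideal. By default, RabbitMQ sends the n-th message to the n-th consumer (n taken modulo the number of consumers). It does not look at how many unacknowledged messages a consumer still has; it simply follows this default scheme.

If some messages take much longer to process than others, one consumer may end up with no chance to rest while another has almost nothing to do.

Calling channel.Qos with a prefetch count of 1 changes this: RabbitMQ then lets each consumer work on at most one message at a time. In other words, it does not dispatch a new message to a consumer until it has received that consumer's ack for the previous one.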
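
The effect of a prefetch count of 1 can be approximated with a short simulation. This is a sketch of the idea only, not RabbitMQ's scheduler; the function name fairDispatch is hypothetical, and it assumes at least one consumer and that all messages are already queued before the first consumer starts:

```go
package main

import "fmt"

// fairDispatch gives each queued message to the consumer that becomes free
// first. durations[i] is the processing time of message i in seconds and
// startAt[c] is the time at which consumer c starts listening.
// It returns the message indexes handled by each consumer.
func fairDispatch(durations []int, startAt []int) [][]int {
	free := append([]int(nil), startAt...) // copy; the caller's slice stays untouched
	assigned := make([][]int, len(free))
	for i, d := range durations {
		c := 0
		for j := range free {
			if free[j] < free[c] {
				c = j
			}
		}
		assigned[c] = append(assigned[c], i)
		free[c] += d // consumer c is busy until it acks this message
	}
	return assigned
}

func main() {
	// Five messages that take 1..5 seconds; the second consumer starts 2 seconds later.
	fmt.Println(fairDispatch([]int{1, 2, 3, 4, 5}, []int{0, 2})) // prints "[[0 1 3] [2 4]]"
}
```

Here the first consumer handles messages 1, 2 and 4 and the second one handles messages 3 and 5, because each message goes to whichever consumer has already acknowledged its previous one.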

producer_fair_dispatch.go (message producer):

package main

import (
  "fmt"
  "log"
  "os"
  "strings"
  "github.com/streadway/amqp"
)

const (
  //AMQP URI
  uri          =  "amqp://guest:guest@localhost:5672/"
  //Durable AMQP exchange name
  exchangeName =  ""
  //Durable AMQP queue name
  queueName    = "test-idoall-queues-fair_dispatch"
)

// failOnError logs msg with err and then panics if err is non-nil.
// Panicking (rather than log.Fatalf) lets the deferred Close calls run.
func failOnError(err error, msg string) {
  if err != nil {
    log.Printf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))
  }
}

func main() {
  bodyMsg := bodyFrom(os.Args)
  // Publish the message
  publish(uri, exchangeName, queueName, bodyMsg)
  log.Printf("published %dB OK", len(bodyMsg))
}

// bodyFrom builds the message body from the command-line arguments
// and falls back to a default text when none are given.
func bodyFrom(args []string) string {
  var s string
  if (len(args) < 2) || args[1] == "" {
    s = "hello idoall.org"
  } else {
    s = strings.Join(args[1:], " ")
  }
  return s
}

// publish sends a single persistent message.
//
// amqpURI:  address of the AMQP broker
// exchange: name of the exchange
// queue:    name of the queue
// body:     message body
func publish(amqpURI string, exchange string, queue string, body string) {
  // Open the connection
  log.Printf("dialing %q", amqpURI)
  connection, err := amqp.Dial(amqpURI)
  failOnError(err, "Failed to connect to RabbitMQ")
  defer connection.Close()

  // Open a channel
  log.Printf("got Connection, getting Channel")
  channel, err := connection.Channel()
  failOnError(err, "Failed to open a channel")
  defer channel.Close()

  log.Printf("got queue, declaring %q", queue)

  // Declare the queue
  q, err := channel.QueueDeclare(
    queue,   // name
    true,   // durable
    false,   // delete when unused
    false,   // exclusive
    false,   // no-wait
    nil,     // arguments
  )
  failOnError(err, "Failed to declare a queue")

  log.Printf("declared queue, publishing %dB body (%q)", len(body), body)

  // A producer can only publish to an exchange, never directly to a queue.
  // Here we use the default exchange (its name is the empty string), which
  // routes each message to the queue whose name equals the routing key.
  err = channel.Publish(
    exchange,     // exchange
    q.Name, // routing key
    false,  // mandatory
    false,  // immediate
    amqp.Publishing {
      Headers:         amqp.Table{},
      DeliveryMode: amqp.Persistent,
      ContentType: "text/plain",
      ContentEncoding: "",
      Body:        []byte(body),
    })
  failOnError(err, "Failed to publish a message")
}

consumer_fair_dispatch.go (message consumer):

package main

import (
  "fmt"
  "log"
  "bytes"
  "time"
  "github.com/streadway/amqp"
)

const (
  //AMQP URI
  uri           =  "amqp://guest:guest@localhost:5672/"
  //Durable AMQP exchange name
  exchangeName  = ""
  //Durable AMQP queue name
  queueName     = "test-idoall-queues-fair_dispatch"
)

// failOnError logs msg with err and then panics if err is non-nil.
// Panicking (rather than log.Fatalf) lets the deferred Close calls run.
func failOnError(err error, msg string) {
  if err != nil {
    log.Printf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))
  }
}

func main() {
  // Start the consumer
  consumer(uri, exchangeName, queueName)
}

// consumer receives messages from a queue.
//
// amqpURI:  address of the AMQP broker
// exchange: name of the exchange (not used here; the default exchange needs no binding)
// queue:    name of the queue
func consumer(amqpURI string, exchange string, queue string) {
  // Open the connection
  log.Printf("dialing %q", amqpURI)
  connection, err := amqp.Dial(amqpURI)
  failOnError(err, "Failed to connect to RabbitMQ")
  defer connection.Close()

  // Open a channel
  log.Printf("got Connection, getting Channel")
  channel, err := connection.Channel()
  failOnError(err, "Failed to open a channel")
  defer channel.Close()

  log.Printf("got queue, declaring %q", queue)

  // Declare the queue
  q, err := channel.QueueDeclare(
      queue,   // name
      true,   // durable
      false,   // delete when unused
      false,   // exclusive
      false,   // no-wait
      nil,     // arguments
  )
  failOnError(err, "Failed to declare a queue")

  // Fetch only one message at a time: no new delivery until the previous one is acked
  err = channel.Qos(
          1,     // prefetch count
          0,     // prefetch size
          false, // global
  )
  failOnError(err, "Failed to set QoS")

  log.Printf("Queue bound to Exchange, starting Consume")
  // Subscribe to the queue (auto-ack is false, so every delivery must be acknowledged manually)
  msgs, err := channel.Consume(
      q.Name, // queue
      "",     // consumer
      false,   // auto-ack
      false,  // exclusive
      false,  // no-local
      false,  // no-wait
      nil,    // args
  )
  failOnError(err, "Failed to register a consumer")

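  // Channel that is never written to; used to block forever
  forever := make(chan bool)

  // Process deliveries in a goroutine
  go func() {
      for d := range msgs {
        log.Printf("Received a message: %s", d.Body)
        // Simulate work: sleep one second per '.' in the body
        dotCount := bytes.Count(d.Body, []byte("."))
        t := time.Duration(dotCount)
        time.Sleep(t * time.Second)
        log.Printf("Done")
        // Acknowledge only this delivery (multiple = false)
        d.Ack(false)
      }
  }()

  log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

  // Nothing is ever sent on forever, so this receive blocks and keeps the program running
  <-forever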
}

Results


First we use the producer to send a series of messages:

lion@node1:~/_code/_rabbitmq/_golang$ go run producer_fair_dispatch.go First message. && go run producer_fair_dispatch.go Second message.. && go run producer_fair_dispatch.go Third message... && go run producer_fair_dispatch.go Fourth message.... && go run producer_fair_dispatch.go Fifth message.....
2016/07/23 23:09:24 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 23:09:24 got Connection, getting Channel
2016/07/23 23:09:24 got queue, declaring "test-idoall-queues-fair_dispatch"
2016/07/23 23:09:24 declared queue, publishing 14B body ("First message.")
2016/07/23 23:09:24 published 14B OK
2016/07/23 23:09:24 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 23:09:24 got Connection, getting Channel
2016/07/23 23:09:24 got queue, declaring "test-idoall-queues-fair_dispatch"
2016/07/23 23:09:24 declared queue, publishing 16B body ("Second message..")
2016/07/23 23:09:24 published 16B OK
2016/07/23 23:09:25 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 23:09:25 got Connection, getting Channel
2016/07/23 23:09:25 got queue, declaring "test-idoall-queues-fair_dispatch"
2016/07/23 23:09:25 declared queue, publishing 16B body ("Third message...")
2016/07/23 23:09:25 published 16B OK
2016/07/23 23:09:26 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 23:09:26 got Connection, getting Channel
2016/07/23 23:09:26 got queue, declaring "test-idoall-queues-fair_dispatch"
2016/07/23 23:09:26 declared queue, publishing 18B body ("Fourth message....")
2016/07/23 23:09:26 published 18B OK
2016/07/23 23:09:27 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 23:09:27 got Connection, getting Channel
2016/07/23 23:09:27 got queue, declaring "test-idoall-queues-fair_dispatch"
2016/07/23 23:09:27 declared queue, publishing 18B body ("Fifth message.....")
2016/07/23 23:09:27 published 18B OK

Then run the following command in two consoles, one after the other. Each message goes to whichever consumer is free:

Console1(consumer):

lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_fair_dispatch.go
2016/07/23 23:10:47 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 23:10:47 got Connection, getting Channel
2016/07/23 23:10:47 got queue, declaring "test-idoall-queues-fair_dispatch"
2016/07/23 23:10:47 Queue bound to Exchange, starting Consume
2016/07/23 23:10:47  [*] Waiting for messages. To exit press CTRL+C
2016/07/23 23:10:47 Received a message: First message.
2016/07/23 23:10:48 Done
2016/07/23 23:10:48 Received a message: Second message..
2016/07/23 23:10:50 Done
2016/07/23 23:10:50 Received a message: Fourth message....
2016/07/23 23:10:54 Done

Console2(consumer):

lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_fair_dispatch.go
2016/07/23 23:10:49 dialing "amqp://guest:guest@localhost:5672/"
2016/07/23 23:10:49 got Connection, getting Channel
2016/07/23 23:10:49 got queue, declaring "test-idoall-queues-fair_dispatch"
2016/07/23 23:10:49 Queue bound to Exchange, starting Consume
2016/07/23 23:10:49  [*] Waiting for messages. To exit press CTRL+C
2016/07/23 23:10:49 Received a message: Third message...
2016/07/23 23:10:52 Done
2016/07/23 23:10:52 Received a message: Fifth message.....
2016/07/23 23:10:57 Done

For more about the channel methods and message properties of AMQP, see the AMQP API reference.

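5.6、Exchanges & Bindings

In RabbitMQ's messaging model the producer never sends a message directly to a queue. In fact, the producer does not even know whether its message has reached a queue.

The producer actually sends its messages to an exchange. An exchange does one simple thing: it receives messages from producers and delivers them to queues. The exchange has to know what to do with a message: put it into one queue, or into several? That rule is defined by the exchange type.

There are four exchange types: direct, topic, headers and fanout. fanout is broadcast mode: it puts every message into all the queues it knows about.

Suppose we have created a fanout exchange and a queue without a name (RabbitMQ actually generates a name for it). How does the exchange know which queues to send its messages to? The answer is bindings.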
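
How bindings decide where a message goes can be sketched with a toy routing model. The types binding and exchange, the method route and the queue names are hypothetical and cover only the fanout and direct cases; this is not RabbitMQ's routing code:

```go
package main

import "fmt"

// binding connects a queue to an exchange under a binding key.
type binding struct {
	queue string
	key   string
}

// exchange is a toy model that supports the "fanout" and "direct" types.
type exchange struct {
	kind     string
	bindings []binding
}

// route returns the queues that receive a message published with routingKey.
// A fanout exchange ignores the key; a direct exchange requires an exact match.
func (e exchange) route(routingKey string) []string {
	var targets []string
	for _, b := range e.bindings {
		if e.kind == "fanout" || b.key == routingKey {
			targets = append(targets, b.queue)
		}
	}
	return targets
}

func main() {
	logs := exchange{kind: "fanout", bindings: []binding{
		{queue: "disk-writer"},    // hypothetical queue names
		{queue: "screen-printer"},
	}}
	fmt.Println(logs.route("")) // both bound queues get a copy of the message
}
```

A queue that is not bound to the exchange never appears in the result, which is why a consumer's queue has to be bound before it can receive anything from a named exchange.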


rabbitmqctl can list all the exchanges that currently exist:

lion@node1:~/_code/_rabbitmq/_golang$ rabbitmqctl list_exchanges
Listing exchanges ...
amq.direct	direct
amq.fanout	fanout
amq.match	headers
amq.headers	headers
	direct
amq.rabbitmq.trace	topic
amq.topic	topic
amq.rabbitmq.log	topic

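Note: the amq.* exchanges are created by RabbitMQ by default. The entry with an empty name is the default exchange used in the earlier sections.

Suppose we are building a logging system: one running consumer writes the messages it receives to disk, while another consumer prints the logs it receives to the screen.

producer_exchange_logs.go (message producer):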

package main

import (
  "log"
  "os"
  "strings"

  "github.com/streadway/amqp"
)


const (
  // AMQP URI
  uri          = "amqp://guest:guest@localhost:5672/"
  // Durable AMQP exchange name
  exchangeName = "test-idoall-exchange-logs"
  // Exchange type - direct|fanout|topic|x-custom
  exchangeType = "fanout"
  // AMQP routing key (ignored by fanout exchanges)
  routingKey   = ""
)

// failOnError logs the error and exits if err is not nil.
// log.Fatalf already calls os.Exit(1), so no extra panic is needed.
func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
  }
}

func main(){
  bodyMsg := bodyFrom(os.Args)
  // Publish the message
  publish(uri, exchangeName, exchangeType, routingKey, bodyMsg)
  log.Printf("published %dB OK", len(bodyMsg))
}


// bodyFrom builds the message body from the command-line arguments,
// falling back to a default when none are given.
func bodyFrom(args []string) string {
        var s string
        if (len(args) < 2) || args[1] == "" {
                s = "hello idoall.org"
        } else {
                s = strings.Join(args[1:], " ")
        }
        return s
}

// publish declares the exchange and publishes a single message to it.
//
// @amqpURI, the AMQP address
// @exchange, the exchange name
// @exchangeType, the exchange type: direct|fanout|topic
// @routingKey, the routing key
// @body, the message body
func publish(amqpURI string, exchange string, exchangeType string, routingKey string, body string){
  // Open the connection
  log.Printf("dialing %q", amqpURI)
  connection, err := amqp.Dial(amqpURI)
  failOnError(err, "Failed to connect to RabbitMQ")
  defer connection.Close()

  // Open a channel
  log.Printf("got Connection, getting Channel")
  channel, err := connection.Channel()
  failOnError(err, "Failed to open a channel")
  defer channel.Close()

  // Declare the exchange
  log.Printf("got Channel, declaring %q Exchange (%q)", exchangeType, exchange)
  err = channel.ExchangeDeclare(
    exchange,     // name
    exchangeType, // type
    true,         // durable
    false,        // auto-deleted
    false,        // internal
    false,        // noWait
    nil,          // arguments
  )
  failOnError(err, "Failed to declare the exchange")

  // Publish the message
  log.Printf("declared queue, publishing %dB body (%q)", len(body), body)
  err = channel.Publish(
    exchange,     // exchange
    routingKey, // routing key
    false,  // mandatory
    false,  // immediate
    amqp.Publishing {
      Headers:         amqp.Table{},
      ContentType: "text/plain",
      ContentEncoding: "",
      Body:        []byte(body),
    })
  failOnError(err, "Failed to publish a message")
}

consumer_exchange_logs.go (message consumer):

package main

import (
  "log"

  "github.com/streadway/amqp"
)

const (
  // AMQP URI
  uri          = "amqp://guest:guest@localhost:5672/"
  // Durable AMQP exchange name
  exchangeName = "test-idoall-exchange-logs"
  // Exchange type - direct|fanout|topic|x-custom
  exchangeType = "fanout"
  // AMQP binding key (ignored by fanout exchanges)
  bindingKey   = ""
  // AMQP queue name; an empty name lets the broker generate one
  queueName    = ""
)

// failOnError logs the error and exits if err is not nil.
// log.Fatalf already calls os.Exit(1), so no extra panic is needed.
func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
  }
}

func main(){
    // Start the consumer
    consumer(uri, exchangeName, exchangeType, queueName, bindingKey)
}

// consumer declares the exchange and a queue, binds them together,
// and prints every message it receives.
//
// @amqpURI, the AMQP address
// @exchange, the exchange name
// @exchangeType, the exchange type: direct|fanout|topic
// @queue, the queue name
// @key, the binding key
func consumer(amqpURI string, exchange string, exchangeType string, queue string, key string){
  // Open the connection
  log.Printf("dialing %q", amqpURI)
  connection, err := amqp.Dial(amqpURI)
  failOnError(err, "Failed to connect to RabbitMQ")
  defer connection.Close()

  // Open a channel
  log.Printf("got Connection, getting Channel")
  channel, err := connection.Channel()
  failOnError(err, "Failed to open a channel")
  defer channel.Close()

  // Declare the exchange
  log.Printf("got Channel, declaring Exchange (%q)", exchange)
  err = channel.ExchangeDeclare(
    exchange,     // name of the exchange
    exchangeType, // type
    true,         // durable
    false,        // delete when complete
    false,        // internal
    false,        // noWait
    nil,          // arguments
  )
  failOnError(err, "Exchange Declare:")

  // Declare the queue
  q, err := channel.QueueDeclare(
      queue,   // name; an empty name lets the broker generate one
      false,   // durable
      false,   // delete when unused
      true,    // exclusive: the queue is deleted when this consumer's connection closes
      false,   // no-wait
      nil,     // arguments
  )
  failOnError(err, "Failed to declare a queue")

  // Bind the queue to the exchange
  err = channel.QueueBind(
    q.Name,     // name of the queue
    key,        // bindingKey
    exchange,   // sourceExchange
    false,      // noWait
    nil,        // arguments
  )
  failOnError(err, "Failed to bind a queue")

  log.Printf("Queue bound to Exchange, starting Consume")
  // Subscribe to messages
  msgs, err := channel.Consume(
      q.Name, // queue
      "",     // consumer
      true,   // auto-ack: the loop below never acknowledges manually
      false,  // exclusive
      false,  // no-local
      false,  // no-wait
      nil,    // args
  )
  failOnError(err, "Failed to register a consumer")

  // Create a Go channel that is never written to
  forever := make(chan bool)

  // Print incoming messages in a goroutine
  go func() {
      for d := range msgs {
        log.Printf(" [x] %s", d.Body)
      }
  }()

  log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

  // Nothing is ever sent on forever, so this receive blocks and keeps the program running
  <-forever
}

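In the AMQP client, declaring a queue with an empty name makes the broker generate a random queue name. Because exclusive is also set to true, the queue is deleted when the consumer closes its connection.

A fanout exchange does not learn about a newly created unnamed queue by itself, so the consumer calls QueueBind to bind that queue to the exchange.

While the programs are running, you can list the bindings with the rabbitmqctl list_bindings command.

Results


Console1 (consumer), output redirected to a file: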

lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_exchange_logs.go &> consumer_exchange_logs.log

Console2 (consumer), printing to the console:

lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_exchange_logs.go

Send a message with the producer:

lion@node1:~/_code/_rabbitmq/_golang$ go run producer_exchange_logs.go
2016/07/24 02:21:49 dialing "amqp://guest:guest@localhost:5672/"
2016/07/24 02:21:49 got Connection, getting Channel
2016/07/24 02:21:49 got Channel, declaring "fanout" Exchange ("test-idoall-exchange-logs")
2016/07/24 02:21:49 declared queue, publishing 16B body ("hello idoall.org")
2016/07/24 02:21:49 published 16B OK

At this point rabbitmqctl list_bindings shows our bindings. Note that the queue names are random:

lion@node1:~/_code/_rabbitmq/_golang$ rabbitmqctl list_bindings
Listing bindings ...
	exchange	amq.gen-D2AnzGsLUMhJCPk7YxgUUw	queue	amq.gen-D2AnzGsLUMhJCPk7YxgUUw	[]
	exchange	amq.gen-GC4VDS3mxsAOTEqii_WsWw	queue	amq.gen-GC4VDS3mxsAOTEqii_WsWw	[]
test-idoall-exchange-logs	exchange	amq.gen-D2AnzGsLUMhJCPk7YxgUUw	queue		[]
test-idoall-exchange-logs	exchange	amq.gen-GC4VDS3mxsAOTEqii_WsWw	queue		[]
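To make the columns of this listing easier to read, here is a small parsing sketch. It assumes the default column order of `rabbitmqctl list_bindings` (source name, source kind, destination name, destination kind, routing key, arguments) and tab-separated fields, which matches the output above; the type `bindingRow`, the function `parseBindings`, and the sample queue name are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// bindingRow holds the first five columns of one list_bindings line.
type bindingRow struct {
	source, sourceKind, destination, destinationKind, routingKey string
}

// parseBindings splits tab-separated rows. Lines with fewer than five
// fields, such as the "Listing bindings ..." header, are skipped.
func parseBindings(output string) []bindingRow {
	var rows []bindingRow
	for _, line := range strings.Split(output, "\n") {
		fields := strings.Split(line, "\t")
		if len(fields) < 5 {
			continue
		}
		rows = append(rows, bindingRow{fields[0], fields[1], fields[2], fields[3], fields[4]})
	}
	return rows
}

func main() {
	// Sample text shaped like the listing above; the queue name is made up.
	sample := "Listing bindings ...\n" +
		"\texchange\tamq.gen-EXAMPLE\tqueue\tamq.gen-EXAMPLE\t[]\n" +
		"test-idoall-exchange-logs\texchange\tamq.gen-EXAMPLE\tqueue\t\t[]\n"

	for _, r := range parseBindings(sample) {
		fmt.Printf("source=%q destination=%q routingKey=%q\n", r.source, r.destination, r.routingKey)
	}
}
```

A row with an empty source is a binding on the default exchange, whose routing key equals the queue name. A row with the fanout exchange as its source has an empty routing key.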

Use cat to view consumer_exchange_logs.log. The consumer's output was written to the file:

lion@node1:~/_code/_rabbitmq/_golang$ cat consumer_exchange_logs.log
2016/07/24 02:25:17 dialing "amqp://guest:guest@localhost:5672/"
2016/07/24 02:25:17 got Connection, getting Channel
2016/07/24 02:25:17 got Channel, declaring Exchange ("test-idoall-exchange-logs")
2016/07/24 02:25:17 Queue bound to Exchange, starting Consume
2016/07/24 02:25:17  [*] Waiting for messages. To exit press CTRL+C
signal: interrupt

5.7、Direct exchange

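RabbitMQ allows several queues to be bound with the same binding key. A direct exchange routes by exact match: a message goes to every queue whose binding key equals the message's routing key.

With a fanout exchange, by contrast, the routing_key parameter is ignored.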
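The exact-match rule of a direct exchange can be sketched in a few lines of plain Go. This is an in-memory illustration only, not client code: the type `binding`, the function `routeDirect`, and the queue names are hypothetical.

```go
package main

import "fmt"

// binding ties one queue to one binding key on a single exchange.
type binding struct {
	queue string
	key   string
}

// routeDirect returns the queues whose binding key equals the routing
// key exactly. Each queue is listed at most once.
func routeDirect(bindings []binding, routingKey string) []string {
	var out []string
	seen := make(map[string]bool)
	for _, b := range bindings {
		if b.key == routingKey && !seen[b.queue] {
			seen[b.queue] = true
			out = append(out, b.queue)
		}
	}
	return out
}

func main() {
	// Two queues: one interested in warnings and errors, one in everything.
	bindings := []binding{
		{"file-queue", "warning"},
		{"file-queue", "error"},
		{"console-queue", "info"},
		{"console-queue", "warning"},
		{"console-queue", "error"},
	}
	for _, key := range []string{"info", "warning", "error", "debug"} {
		fmt.Printf("%-7s -> %v\n", key, routeDirect(bindings, key))
	}
}
```

A routing key that no binding matches, such as "debug" here, selects no queue, so the message is dropped.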


producer_exchange_direct_logs.go (message producer):

package main

import (
  "log"
  "os"
  "strings"

  "github.com/streadway/amqp"
)


const (
  // AMQP URI
  uri          = "amqp://guest:guest@localhost:5672/"
  // Durable AMQP exchange name
  exchangeName = "test-idoall-exchange-direct-logs"
  // Exchange type - direct|fanout|topic|x-custom
  exchangeType = "direct"
)

// failOnError logs the error and exits if err is not nil.
// log.Fatalf already calls os.Exit(1), so no extra panic is needed.
func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
  }
}

func main(){
  bodyMsg := bodyFrom(os.Args)
  // Publish the message
  publish(uri, exchangeName, exchangeType, bodyMsg)
  log.Printf("published %dB OK", len(bodyMsg))
}


// bodyFrom joins every argument after the severity into the message body.
func bodyFrom(args []string) string {
        var s string
        if (len(args) < 3) || args[2] == "" {
                s = "hello idoall.org"
        } else {
                s = strings.Join(args[2:], " ")
        }
        return s
}

// severityFrom returns the first argument as the severity, defaulting to "info".
func severityFrom(args []string) string {
        var s string
        if (len(args) < 2) || args[1] == "" {
                s = "info"
        } else {
                s = args[1]
        }
        return s
}

// publish declares the exchange and publishes a single message,
// using the severity from the command line as the routing key.
//
// @amqpURI, the AMQP address
// @exchange, the exchange name
// @exchangeType, the exchange type: direct|fanout|topic
// @body, the message body
func publish(amqpURI string, exchange string, exchangeType string, body string){
  // Open the connection
  log.Printf("dialing %q", amqpURI)
  connection, err := amqp.Dial(amqpURI)
  failOnError(err, "Failed to connect to RabbitMQ")
  defer connection.Close()

  // Open a channel
  log.Printf("got Connection, getting Channel")
  channel, err := connection.Channel()
  failOnError(err, "Failed to open a channel")
  defer channel.Close()

  // Declare the exchange
  log.Printf("got Channel, declaring %q Exchange (%q)", exchangeType, exchange)
  err = channel.ExchangeDeclare(
    exchange,     // name
    exchangeType, // type
    true,         // durable
    false,        // auto-deleted
    false,        // internal
    false,        // noWait
    nil,          // arguments
  )
  failOnError(err, "Failed to declare the exchange")


  // Publish the message
  log.Printf("declared queue, publishing %dB body (%q)", len(body), body)
  err = channel.Publish(
    exchange,     // exchange
    severityFrom(os.Args), // routing key
    false,  // mandatory
    false,  // immediate
    amqp.Publishing {
      Headers:         amqp.Table{},
      ContentType: "text/plain",
      ContentEncoding: "",
      Body:        []byte(body),
    })
  failOnError(err, "Failed to publish a message")
}

consumer_exchange_direct_logs.go (message consumer):

package main

import (
  "log"
  "os"

  "github.com/streadway/amqp"
)

const (
  // AMQP URI
  uri          = "amqp://guest:guest@localhost:5672/"
  // Durable AMQP exchange name
  exchangeName = "test-idoall-exchange-direct-logs"
  // Exchange type - direct|fanout|topic|x-custom
  exchangeType = "direct"
  // AMQP binding key (unused; binding keys come from the command line)
  bindingKey   = ""
  // AMQP queue name; an empty name lets the broker generate one
  queueName    = ""
)

// failOnError logs the error and exits if err is not nil.
// log.Fatalf already calls os.Exit(1), so no extra panic is needed.
func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
  }
}

func main(){
    // Start the consumer
    consumer(uri, exchangeName, exchangeType, queueName, bindingKey)
}

// consumer declares the exchange and a queue, binds the queue once per
// command-line argument, and prints every message it receives.
//
// @amqpURI, the AMQP address
// @exchange, the exchange name
// @exchangeType, the exchange type: direct|fanout|topic
// @queue, the queue name
// @key, the binding key (unused here; binding keys come from os.Args)
func consumer(amqpURI string, exchange string, exchangeType string, queue string, key string){
  // Open the connection
  log.Printf("dialing %q", amqpURI)
  connection, err := amqp.Dial(amqpURI)
  failOnError(err, "Failed to connect to RabbitMQ")
  defer connection.Close()

  // Open a channel
  log.Printf("got Connection, getting Channel")
  channel, err := connection.Channel()
  failOnError(err, "Failed to open a channel")
  defer channel.Close()

  // Declare the exchange
  log.Printf("got Channel, declaring Exchange (%q)", exchange)
  err = channel.ExchangeDeclare(
    exchange,     // name of the exchange
    exchangeType, // type
    true,         // durable
    false,        // delete when complete
    false,        // internal
    false,        // noWait
    nil,          // arguments
  )
  failOnError(err, "Exchange Declare:")

  // Declare the queue
  q, err := channel.QueueDeclare(
      queue,   // name; an empty name lets the broker generate one
      false,   // durable
      false,   // delete when unused
      true,    // exclusive: the queue is deleted when this consumer's connection closes
      false,   // no-wait
      nil,     // arguments
  )
  failOnError(err, "Failed to declare a queue")

  if len(os.Args) < 2 {
          log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])
          os.Exit(0)
  }
  for _, s := range os.Args[1:] {
          log.Printf("Binding queue %s to exchange %s with routing key %s",
                  q.Name, exchange, s)
          // Bind the queue to the exchange with this key
          err = channel.QueueBind(
            q.Name, // name of the queue
            s,        // bindingKey
            exchange,   // sourceExchange
            false,      // noWait
            nil,        // arguments
          );
          failOnError(err, "Failed to bind a queue")
  }

  log.Printf("Queue bound to Exchange, starting Consume")
  // Subscribe to messages
  msgs, err := channel.Consume(
      q.Name, // queue
      "",     // consumer
      true,   // auto-ack: the loop below never acknowledges manually
      false,  // exclusive
      false,  // no-local
      false,  // no-wait
      nil,    // args
  )
  failOnError(err, "Failed to register a consumer")

  // Create a Go channel that is never written to
  forever := make(chan bool)

  // Print incoming messages in a goroutine
  go func() {
      for d := range msgs {
        log.Printf(" [x] %s", d.Body)
      }
  }()

  log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

  // Nothing is ever sent on forever, so this receive blocks and keeps the program running
  <-forever
}

Results


Console1 (consumer), output redirected to a file:

lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_exchange_direct_logs.go warning error &> consumer_exchange_direct_logs.log

Console2 (consumer), printing to the console:

lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_exchange_direct_logs.go info warning error
2016/07/24 08:48:17 dialing "amqp://guest:guest@localhost:5672/"
2016/07/24 08:48:17 got Connection, getting Channel
2016/07/24 08:48:17 got Channel, declaring Exchange ("test-idoall-exchange-direct-logs")
2016/07/24 08:48:17 Binding queue amq.gen-vE-62-Lwt4VQYjlBbMLTjQ to exchange test-idoall-exchange-direct-logs with routing key info
2016/07/24 08:48:17 Binding queue amq.gen-vE-62-Lwt4VQYjlBbMLTjQ to exchange test-idoall-exchange-direct-logs with routing key warning
2016/07/24 08:48:17 Binding queue amq.gen-vE-62-Lwt4VQYjlBbMLTjQ to exchange test-idoall-exchange-direct-logs with routing key error
2016/07/24 08:48:17 Queue bound to Exchange, starting Consume
2016/07/24 08:48:17  [*] Waiting for messages. To exit press CTRL+C

Send messages with the producer:

lion@node1:~/_code/_rabbitmq/_golang$ go run producer_exchange_direct_logs.go error "Error. Error" && go run producer_exchange_direct_logs.go info "Info. Info" && go run producer_exchange_direct_logs.go warning "warning. warning"

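As you can see, the Console2 terminal shows all of the error, info, and warning messages, while the file contains only the warning and error messages, because that consumer was bound with only those two keys.

5.8、Topic exchange

With a topic exchange, a message's routing_key cannot be arbitrary: it must be a list of words separated by dots ("."), for example "stock.usd.nyse", "nyse.vmw", or "quick.orange.rabbit". The routing_key may contain any number of words, as long as its total length does not exceed 255 bytes.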
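The format rule can be illustrated with a small helper. This is only a sketch: `splitRoutingKey` and `maxRoutingKeyBytes` are hypothetical names, and the 255-byte limit is taken from the paragraph above rather than queried from the broker.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// maxRoutingKeyBytes is the 255-byte limit mentioned in the text.
const maxRoutingKeyBytes = 255

// splitRoutingKey rejects keys that are too long and otherwise returns
// the dot-separated words of the key.
func splitRoutingKey(key string) ([]string, error) {
	if len(key) > maxRoutingKeyBytes { // len counts bytes, not characters
		return nil, errors.New("routing key longer than 255 bytes")
	}
	return strings.Split(key, "."), nil
}

func main() {
	words, err := splitRoutingKey("stock.usd.nyse")
	fmt.Println(words, err)

	_, err = splitRoutingKey(strings.Repeat("a", 256))
	fmt.Println(err)
}
```

Note that `len` on a Go string counts bytes, which is the unit the limit is expressed in.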


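A binding key may contain two special characters (wildcards, comparable to metacharacters in regular expressions):

  • * (star) matches exactly one word
  • # (hash) matches zero or more words

Because of "*" and "#", a topic exchange is very powerful and can behave like the other exchange types:

  • If the binding_key is "#", the queue receives every message regardless of its routing_key, just like a fanout exchange.
  • If neither "*" nor "#" is used, the topic exchange behaves like a direct exchange.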
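The two wildcard rules above can be sketched as a small recursive matcher. This is an illustration of the rules as stated, not RabbitMQ's actual implementation; `topicMatch` and `matchWords` are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// topicMatch reports whether a routing key matches a binding key,
// where "*" stands for exactly one word and "#" for zero or more words.
func topicMatch(bindingKey, routingKey string) bool {
	return matchWords(strings.Split(bindingKey, "."), strings.Split(routingKey, "."))
}

// matchWords compares the pattern words p with the key words k.
func matchWords(p, k []string) bool {
	if len(p) == 0 {
		return len(k) == 0
	}
	switch p[0] {
	case "#":
		// "#" either absorbs nothing, or absorbs one word and tries again.
		if matchWords(p[1:], k) {
			return true
		}
		return len(k) > 0 && matchWords(p, k[1:])
	case "*":
		return len(k) > 0 && matchWords(p[1:], k[1:])
	default:
		return len(k) > 0 && p[0] == k[0] && matchWords(p[1:], k[1:])
	}
}

func main() {
	key := "kern.critical"
	for _, pattern := range []string{"#", "kern.*", "*.critical", "kern.critical", "*.info"} {
		fmt.Printf("%-14s matches %s: %v\n", pattern, key, topicMatch(pattern, key))
	}
}
```

With a binding key that contains no wildcard, the matcher falls through to plain word-by-word equality, which is the direct-exchange behavior described in the second bullet.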

The code below demonstrates a topic exchange that uses "#" and "*" in binding keys.

producer_exchange_topic_logs.go (message producer):

package main

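import (
  "log"
  "os"
  "strings"

  "github.com/streadway/amqp"
)

const (
  // AMQP URI
  uri          = "amqp://guest:guest@localhost:5672/"
  // Durable AMQP exchange name
  exchangeName = "test-idoall-exchange-topic-logs"
  // Exchange type - direct|fanout|topic|x-custom
  exchangeType = "topic"
)

// failOnError logs the error and exits if err is not nil.
// log.Fatalf already calls os.Exit(1), so no extra panic is needed.
func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
  }
}

func main(){
  bodyMsg := bodyFrom(os.Args)
  // Publish the message; the first argument is the routing key
  publish(uri, exchangeName, exchangeType, routingKeyFrom(os.Args), bodyMsg)
  log.Printf(" [x] Sent %s", bodyMsg)
  log.Printf("published %dB OK", len(bodyMsg))
}

// bodyFrom joins every argument after the routing key into the message body.
func bodyFrom(args []string) string {
        var s string
        if (len(args) < 3) || args[2] == "" {
                s = "hello idoall.org"
        } else {
                s = strings.Join(args[2:], " ")
        }
        return s
}

// routingKeyFrom returns the first argument as the routing key.
// The "anonymous.info" fallback is an assumption; the run shown later always passes a key.
func routingKeyFrom(args []string) string {
        var s string
        if (len(args) < 2) || args[1] == "" {
                s = "anonymous.info"
        } else {
                s = args[1]
        }
        return s
}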

// publish declares the exchange and publishes a single message to it.
//
// @amqpURI, the AMQP address
// @exchange, the exchange name
// @exchangeType, the exchange type: direct|fanout|topic
// @routingKey, the routing key
// @body, the message body
func publish(amqpURI string, exchange string, exchangeType string, routingKey string, body string){
  // Open the connection
  log.Printf("dialing %q", amqpURI)
  connection, err := amqp.Dial(amqpURI)
  failOnError(err, "Failed to connect to RabbitMQ")
  defer connection.Close()

  // Open a channel
  log.Printf("got Connection, getting Channel")
  channel, err := connection.Channel()
  failOnError(err, "Failed to open a channel")
  defer channel.Close()

  // Declare the exchange
  log.Printf("got Channel, declaring %q Exchange (%q)", exchangeType, exchange)
  err = channel.ExchangeDeclare(
    exchange,     // name
    exchangeType, // type
    true,         // durable
    false,        // auto-deleted
    false,        // internal
    false,        // noWait
    nil,          // arguments
  )
  failOnError(err, "Failed to declare the exchange")


  // Publish the message
  log.Printf("declared queue, publishing %dB body (%q)", len(body), body)
  err = channel.Publish(
    exchange,     // exchange
    routingKey, // routing key
    false,  // mandatory
    false,  // immediate
    amqp.Publishing {
      Headers:         amqp.Table{},
      ContentType: "text/plain",
      ContentEncoding: "",
      Body:        []byte(body),
    })
  failOnError(err, "Failed to publish a message")
}

consumer_exchange_topic_logs.go (message consumer):

package main

import (
  "log"
  "os"

  "github.com/streadway/amqp"
)

const (
  // AMQP URI
  uri          = "amqp://guest:guest@localhost:5672/"
  // Durable AMQP exchange name
  exchangeName = "test-idoall-exchange-topic-logs"
  // Exchange type - direct|fanout|topic|x-custom
  exchangeType = "topic"
  // AMQP queue name; an empty name lets the broker generate one
  queueName    = ""
)

// failOnError logs the error and exits if err is not nil.
// log.Fatalf already calls os.Exit(1), so no extra panic is needed.
func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
  }
}

func main(){
    if len(os.Args) < 2 {
            log.Printf("Usage: %s [binding_key]...", os.Args[0])
            os.Exit(0)
    }
    // Start the consumer; every command-line argument is a binding key
    consumer(uri, exchangeName, exchangeType, queueName, os.Args[1:])
}

// consumer declares the exchange and a queue, binds the queue once per
// binding key, and prints every message it receives.
//
// @amqpURI, the AMQP address
// @exchange, the exchange name
// @exchangeType, the exchange type: direct|fanout|topic
// @queue, the queue name
// @keys, the binding keys
func consumer(amqpURI string, exchange string, exchangeType string, queue string, keys []string){
  // Open the connection
  log.Printf("dialing %q", amqpURI)
  connection, err := amqp.Dial(amqpURI)
  failOnError(err, "Failed to connect to RabbitMQ")
  defer connection.Close()

  // Open a channel
  log.Printf("got Connection, getting Channel")
  channel, err := connection.Channel()
  failOnError(err, "Failed to open a channel")
  defer channel.Close()

  // Declare the exchange
  log.Printf("got Channel, declaring Exchange (%q)", exchange)
  err = channel.ExchangeDeclare(
    exchange,     // name of the exchange
    exchangeType, // type
    true,         // durable
    false,        // delete when complete
    false,        // internal
    false,        // noWait
    nil,          // arguments
  )
  failOnError(err, "Exchange Declare:")

  // Declare the queue
  q, err := channel.QueueDeclare(
      queue,   // name; an empty name lets the broker generate one
      false,   // durable
      false,   // delete when unused
      true,    // exclusive: the queue is deleted when this consumer's connection closes
      false,   // no-wait
      nil,     // arguments
  )
  failOnError(err, "Failed to declare a queue")

  for _, key := range keys {
          log.Printf("Binding queue %s to exchange %s with routing key %s",
                  q.Name, exchange, key)
          // Bind the queue to the exchange with this key
          err = channel.QueueBind(
            q.Name,     // name of the queue
            key,        // bindingKey
            exchange,   // sourceExchange
            false,      // noWait
            nil,        // arguments
          )
          failOnError(err, "Failed to bind a queue")
  }

  log.Printf("Queue bound to Exchange, starting Consume")
  // Subscribe to messages
  msgs, err := channel.Consume(
      q.Name, // queue
      "",     // consumer
      true,   // auto-ack: the loop below never acknowledges manually
      false,  // exclusive
      false,  // no-local
      false,  // no-wait
      nil,    // args
  )
  failOnError(err, "Failed to register a consumer")

  // Create a Go channel that is never written to
  forever := make(chan bool)

  // Print incoming messages in a goroutine
  go func() {
      for d := range msgs {
        log.Printf(" [x] %s", d.Body)
      }
  }()

  log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

  // Nothing is ever sent on forever, so this receive blocks and keeps the program running
  <-forever
}

Results


Console1 (consumer), receives all logs:

lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_exchange_topic_logs.go "#"
2016/07/24 09:28:29 dialing "amqp://guest:guest@localhost:5672/"
2016/07/24 09:28:29 got Connection, getting Channel
2016/07/24 09:28:29 got Channel, declaring Exchange ("test-idoall-exchange-topic-logs")
2016/07/24 09:28:29 Binding queue amq.gen-jW2-PIBg4izXpt96CynyFw to exchange test-idoall-exchange-topic-logs with routing key #
2016/07/24 09:28:29 Queue bound to Exchange, starting Consume
2016/07/24 09:28:29  [*] Waiting for messages. To exit press CTRL+C

Console2 (consumer), receives two-word logs whose first word is "kern":

lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_exchange_topic_logs.go "kern.*"
2016/07/24 09:34:00 dialing "amqp://guest:guest@localhost:5672/"
2016/07/24 09:34:00 got Connection, getting Channel
2016/07/24 09:34:00 got Channel, declaring Exchange ("test-idoall-exchange-topic-logs")
2016/07/24 09:34:00 Binding queue amq.gen-8zYBz2uXYbWXcItJMZ3AQA to exchange test-idoall-exchange-topic-logs with routing key kern.*
2016/07/24 09:34:00 Queue bound to Exchange, starting Consume
2016/07/24 09:34:00  [*] Waiting for messages. To exit press CTRL+C

Console3 (consumer), receives two-word logs whose second word is "critical":

lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_exchange_topic_logs.go "*.critical"
2016/07/24 09:37:21 dialing "amqp://guest:guest@localhost:5672/"
2016/07/24 09:37:21 got Connection, getting Channel
2016/07/24 09:37:21 got Channel, declaring Exchange ("test-idoall-exchange-topic-logs")
2016/07/24 09:37:21 Binding queue amq.gen-tq9QsD1i1mCps-jrqDtTTA to exchange test-idoall-exchange-topic-logs with routing key *.critical
2016/07/24 09:37:21 Queue bound to Exchange, starting Consume
2016/07/24 09:37:21  [*] Waiting for messages. To exit press CTRL+C

Console4 (consumer), creating several bindings at once:

lion@node1:~/_code/_rabbitmq/_golang$ go run consumer_exchange_topic_logs.go "kern.critical" "A critical kernel error"
2016/07/24 09:39:35 dialing "amqp://guest:guest@localhost:5672/"
2016/07/24 09:39:35 got Connection, getting Channel
2016/07/24 09:39:35 got Channel, declaring Exchange ("test-idoall-exchange-topic-logs")
2016/07/24 09:39:35 Binding queue amq.gen-vcaHyCor5bbB2NX7YQhmzA to exchange test-idoall-exchange-topic-logs with routing key kern.critical
2016/07/24 09:39:35 Binding queue amq.gen-vcaHyCor5bbB2NX7YQhmzA to exchange test-idoall-exchange-topic-logs with routing key A critical kernel error
2016/07/24 09:39:35 Queue bound to Exchange, starting Consume
2016/07/24 09:39:35  [*] Waiting for messages. To exit press CTRL+C

Send a message with the producer:

lion@node1:~/_code/_rabbitmq/_golang$ go run producer_exchange_topic_logs.go "kern.critical" "A critical kernel error"
2016/07/24 09:56:33 dialing "amqp://guest:guest@localhost:5672/"
2016/07/24 09:56:33 got Connection, getting Channel
2016/07/24 09:56:33 got Channel, declaring "topic" Exchange ("test-idoall-exchange-topic-logs")
2016/07/24 09:56:33 declared queue, publishing 23B body ("A critical kernel error")
2016/07/24 09:56:33  [x] Sent A critical kernel error
2016/07/24 09:56:33 published 23B OK

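5.9、Remote procedure call (RPC)

The earlier examples all had one or more consumers subscribing to messages. What if we need to run a function on a remote machine and wait for its result? That is a different scenario, found in cloud computing, for example.

The AMQP protocol predefines 14 message properties. Most are rarely used; the following are the common ones:

  • persistent: message persistence
  • content_type: the MIME type that describes the encoding
  • reply_to: the name of the callback queue
  • correlation_id: a unique identifier that correlates an RPC response with its request

correlation_id

Creating a callback queue for every RPC request would be very inefficient. With a single callback queue per client, however, a message arriving on that queue does not say which request it answers. The correlation_id property solves this: the client sets it to a unique value for each request, and the reply carries the same value back so the two can be matched. A reply whose correlation_id does not match is not processed.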
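A correlation ID only needs to be unique per outstanding request. As one possible way to produce such a value, the sketch below uses crypto/rand from the Go standard library; the helper name `newCorrelationID` is hypothetical, and this is an alternative to, not a description of, the client listing in this section, which builds its ID from math/rand.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newCorrelationID returns 32 hexadecimal characters built from
// 16 random bytes.
func newCorrelationID() (string, error) {
	buf := make([]byte, 16)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return hex.EncodeToString(buf), nil
}

func main() {
	id, err := newCorrelationID()
	if err != nil {
		fmt.Println("could not generate an ID:", err)
		return
	}
	fmt.Println("CorrelationId:", id)
}
```

The returned string can be assigned to the CorrelationId field of amqp.Publishing in the same way as the client listing does with its own random string.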

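Next we use RabbitMQ to build an RPC system: one client and a scalable RPC server. The RPC workflow is as follows:

  • When the client starts, it creates an anonymous exclusive callback queue.
  • When sending a request, the client sets two properties: reply_to (the callback queue) and correlation_id (a unique identifier).
  • The request is sent to an RPC queue.
  • The RPC server waits for requests on that queue. When a message arrives, it does the work and sends the result to the queue named in reply_to.
  • The client waits on the callback queue. When a reply arrives, it checks the correlation_id, and if the value matches the one sent with the request, it hands the response back to the application.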
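The steps above can be sketched with plain Go channels standing in for the queues. This is a broker-free illustration under stated assumptions: the `message` type and the `serve` and `call` functions are hypothetical, a channel value plays the role of the reply_to queue name, and the server doubles the number it receives, as the server listing in this section does.

```go
package main

import (
	"fmt"
	"strconv"
)

// message mimics the few AMQP properties the RPC pattern relies on.
type message struct {
	correlationID string
	replyTo       chan message // stands in for the callback queue name
	body          string
}

// serve reads requests from the RPC queue, doubles the number in the
// body, and replies on replyTo, echoing the correlation ID.
func serve(rpcQueue <-chan message) {
	for req := range rpcQueue {
		n, err := strconv.Atoi(req.body)
		if err != nil {
			continue // this sketch silently ignores malformed requests
		}
		req.replyTo <- message{correlationID: req.correlationID, body: strconv.Itoa(n * 2)}
	}
}

// call sends one request and waits for the reply whose correlation ID
// matches; replies with any other ID are skipped.
func call(rpcQueue chan<- message, callback chan message, corrID string, n int) (int, error) {
	rpcQueue <- message{correlationID: corrID, replyTo: callback, body: strconv.Itoa(n)}
	for reply := range callback {
		if reply.correlationID != corrID {
			continue
		}
		return strconv.Atoi(reply.body)
	}
	return 0, fmt.Errorf("callback queue closed before a reply arrived")
}

func main() {
	rpcQueue := make(chan message)
	callback := make(chan message, 2) // buffered so a stale reply can wait in front
	go serve(rpcQueue)

	// Simulate an unrelated reply that is already sitting in the callback queue.
	callback <- message{correlationID: "stale", body: "0"}

	res, err := call(rpcQueue, callback, "req-1", 69)
	fmt.Println(res, err)
}
```

The stale message placed in the callback queue shows why the client compares correlation IDs before accepting a reply.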


rpc_server.go (server code):

package main

import (
  "log"
  "strconv"

  "github.com/streadway/amqp"
)


const (
  // AMQP URI
  uri       = "amqp://guest:guest@localhost:5672/"
  // AMQP queue name for RPC requests
  queueName = "rpc-queue"
)

// failOnError logs the error and exits if err is not nil.
// log.Fatalf already calls os.Exit(1), so no extra panic is needed.
func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
  }
}


func main(){
  // Start serving RPC requests
  serve(uri, queueName)
}

// serve consumes requests from the RPC queue and replies to each one.
//
// @amqpURI, the AMQP address
// @queue, the queue name
func serve(amqpURI string, queue string){
  // Open the connection
  log.Printf("dialing %q", amqpURI)
  connection, err := amqp.Dial(amqpURI)
  failOnError(err, "Failed to connect to RabbitMQ")
  defer connection.Close()

  // Open a channel
  log.Printf("got Connection, getting Channel")
  channel, err := connection.Channel()
  failOnError(err, "Failed to open a channel")
  defer channel.Close()


  // Declare the queue that receives RPC requests
  log.Printf("got queue, declaring %q", queue)
  q,err := channel.QueueDeclare(
          queue, // name
          false,       // durable
          false,       // delete when unused
          false,       // exclusive
          false,       // no-wait
          nil,         // arguments
  )
  failOnError(err, "Failed to declare a queue")

  // Fair dispatch: deliver at most one unacknowledged message at a time
  err = channel.Qos(
          1,     // prefetch count
          0,     // prefetch size
          false, // global
  )
  failOnError(err, "Failed to set QoS")

  // Subscribe to the request queue
  msgs, err := channel.Consume(
          q.Name, // queue
          "",     // consumer
          false,  // auto-ack
          false,  // exclusive
          false,  // no-local
          false,  // no-wait
          nil,    // args
  )
  failOnError(err, "Failed to register a consumer")

  forever := make(chan bool)

  // Handle each request and publish the reply
  go func() {
          for d := range msgs {
                  n, err := strconv.Atoi(string(d.Body))
                  failOnError(err, "Failed to convert body to integer")

                  log.Printf(" [.] server received (%d)", n)
                  response := n*2

                  err = channel.Publish(
                          "",        // exchange
                          d.ReplyTo, // routing key
                          false,     // mandatory
                          false,     // immediate
                          amqp.Publishing{
                                  ContentType:   "text/plain",
                                  CorrelationId: d.CorrelationId,
                                  Body:          []byte(strconv.Itoa(response)),
                          })
                  failOnError(err, "Failed to publish a message")

                  d.Ack(false)
          }
  }()

  log.Printf(" [*] Awaiting RPC requests")

  // Nothing is ever sent on forever, so this receive blocks and keeps the program running
  <-forever
}

rpc_client.go (client code):

package main

import (
  "log"
  "math/rand"
  "os"
  "strconv"
  "strings"
  "time"

  "github.com/streadway/amqp"
)

const (
  // AMQP URI
  uri          = "amqp://guest:guest@localhost:5672/"
  // AMQP exchange name; empty means the default exchange
  exchangeName = ""
  // AMQP queue name for RPC requests
  queueName    = "rpc-queue"
)

// failOnError logs the error and exits if err is not nil.
// log.Fatalf already calls os.Exit(1), so no extra panic is needed.
func failOnError(err error, msg string) {
  if err != nil {
    log.Fatalf("%s: %s", msg, err)
  }
}

// randomString returns l random uppercase letters.
func randomString(l int) string {
        bytes := make([]byte, l)
        for i := 0; i < l; i++ {
                bytes[i] = byte(randInt('A', 'Z'+1))
        }
        return string(bytes)
}

// randInt returns a random integer in [min, max); max itself is excluded.
func randInt(min int, max int) int {
        return min + rand.Intn(max-min)
}

// bodyFrom parses the request value from the command line, defaulting to 30.
func bodyFrom(args []string) int {
        var s string
        if (len(args) < 2) || args[1] == "" {
                s = "30"
        } else {
                s = strings.Join(args[1:], " ")
        }
        n, err := strconv.Atoi(s)
        failOnError(err, "Failed to convert arg to integer")
        return n
}

func main(){

    rand.Seed(time.Now().UTC().UnixNano())

    n := bodyFrom(os.Args)

    // Logs "the requested value is (n)"
    log.Printf(" [x] 請求的數據是(%d)", n)
    res, err := fibonacciRPC(n, uri, exchangeName, queueName)
    failOnError(err, "Failed to handle RPC request")

    // Logs "the computed result is res"
    log.Printf(" [.] 計算結果為 %d", res)
}

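// fibonacciRPC sends n to the RPC queue and waits for the matching reply.
// Despite its name, the result is whatever rpc_server.go computes,
// which is n*2 in this example.
//
// @n, the request value
// @amqpURI, the AMQP address
// @exchange, the exchange name (unused; requests go through the default exchange)
// @queue, the name of the RPC request queue
func fibonacciRPC(n int, amqpURI string, exchange string, queue string) (res int, err error){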
  // Open the connection
  log.Printf("dialing %q", amqpURI)
  connection, err := amqp.Dial(amqpURI)
  failOnError(err, "Failed to connect to RabbitMQ")
  defer connection.Close()

  // Open a channel
  log.Printf("got Connection, getting Channel")
  channel, err := connection.Channel()
  failOnError(err, "Failed to open a channel")
  defer channel.Close()


  // Declare the anonymous, exclusive callback queue
  log.Printf("got queue, declaring %q", queue)
  q,err := channel.QueueDeclare(
          "", // name; an empty name lets the broker generate one
          false,       // durable
          false,       // delete when unused
          true,       // exclusive
          false,       // no-wait
          nil,         // arguments
  )
  failOnError(err, "Failed to declare a queue")


  log.Printf("Queue bound to Exchange, starting Consume")
  // Subscribe to the callback queue
  msgs, err := channel.Consume(
      q.Name, // queue
      "",     // consumer
      true,   // auto-ack
      false,  // exclusive
      false,  // no-local
      false,  // no-wait
      nil,    // args
  )
  failOnError(err, "Failed to register a consumer")

  corrId := randomString(32)


  err = channel.Publish(
          "",          // exchange
          queue, // routing key
          false,       // mandatory
          false,       // immediate
          amqp.Publishing{
                  ContentType:   "text/plain",
                  CorrelationId: corrId,
                  ReplyTo:       q.Name,
                  Body:          []byte(strconv.Itoa(n)),
          })
  failOnError(err, "Failed to publish a message")

  for d := range msgs {
          if corrId == d.CorrelationId {
                  res, err = strconv.Atoi(string(d.Body))
                  failOnError(err, "Failed to convert body to integer")
                  break
          }
  }

  return
}

Results


Console1(rpc server):

lion@node1:~/_code/_rabbitmq/_golang$ go run rpc_server.go
2016/07/24 11:20:32 dialing "amqp://guest:guest@localhost:5672/"
2016/07/24 11:20:32 got Connection, getting Channel
2016/07/24 11:20:32 got queue, declaring "rpc-queue"
2016/07/24 11:20:32  [*] Awaiting RPC requests

Console2(rpc client):

lion@node1:~/_code/_rabbitmq/_golang$ go run rpc_client.go 69
2016/07/24 11:24:37  [x] 請求的數據是(69)
2016/07/24 11:24:37 dialing "amqp://guest:guest@localhost:5672/"
2016/07/24 11:24:37 got Connection, getting Channel
2016/07/24 11:24:37 got queue, declaring "rpc-queue"
2016/07/24 11:24:37 Queue bound to Exchange, starting Consume
2016/07/24 11:24:37  [.] 計算結果為 138

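The above is only a simple RPC implementation. For more complex requirements, adjust the server and client accordingly.

6、Closing notes

The industry offers many message-transport solutions. We have previously covered Kafka, a publish-subscribe messaging system that LinkedIn open-sourced in December 2010. It mainly targets active streaming data and large-volume data processing. RabbitMQ's throughput is somewhat lower than Kafka's because the two start from different goals: RabbitMQ supports reliable message delivery and transactions, but does not support batch operations.

RabbitMQ messages should be as small as possible, and RabbitMQ should be used only for real-time messages that need high reliability. Keep consumer and producer capacity roughly balanced; otherwise a message backlog will seriously degrade RabbitMQ's performance.

7、References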

http://www.rabbitmq.com/getstarted.html

https://github.com/streadway/amqp

8、FAQ

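While installing Erlang, the following error appears: configure: error: No curses library functions found

This is caused by the missing ncurses development package. Run the following command to fix it: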

lion@node1:~/$ sudo apt-get install libncurses5-dev

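Author: 迦壹. Blog post: [Best practices for Ubuntu 14.04 + RabbitMQ 3.6.3 + Golang](http://idoall.org/blog/post/lion/14). Reprint notice: reprinting is permitted, provided the original source, the author, and the copyright notice are credited with a hyperlink. Thank you for your cooperation!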


