【RabbitMQ】CentOS安裝RabbitMQ,及簡單的Java客戶端連接


在CentOS安裝

Erlang的安裝

因Rabbit MQ使用Erlang,所以需要先安裝Erlang,安裝過程中可能會遇到種種問題,可參考CentOS 6.5安裝Erlang/OTP 17.0
Erlang可在Erlang Solutions下載,我安裝的是esl-erlang_19.0~centos~6_amd64.rpm,個人是通過yum安裝的:yum install esl-erlang_19.0~centos~6_amd64.rpm

安裝完測試是否成功:

[root@blog third_package]# erl
Erlang/OTP 19 [erts-8.0] [source-790c521] [64-bit] [async-threads:10] [hipe] [kernel-poll:false]

Eshell V8.0  (abort with ^G)

RabbitMQ的安裝

然后就可以安裝MQ了。

[root@blog third_package]# rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

我這里遇到一個異常情況,系統中安裝了esl-erlang_19.0~centos~6_amd64.rpm,並通過erl測試成功,但RabbitMQ仍然關聯不上,報一下錯誤:

error: Failed dependencies:
        erlang >= R16B-03 is needed by rabbitmq-server-3.6.5-1.noarch

我這里選擇忽略依賴的安裝:

[root@blog third_package]# rpm -ivh --nodeps rabbitmq-server-3.6.5-1.noarch.rpm
Preparing...                ########################################### [100%]
   1:rabbitmq-server        ########################################### [100%]

后續開啟控制台后,在控制台頁面看到RabbitMQ實際關聯上了Erlang 19.0

啟動與停止

啟動:

[root@blog ~]# rabbitmq-server start &
[1] 2349
[root@blog ~]# 
              RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.
  ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
  ##  ##
  ##########  Logs: /var/log/rabbitmq/rabbit@blog.log
  ######  ##        /var/log/rabbitmq/rabbit@blog-sasl.log
  ##########
              Starting broker...
 completed with 0 plugins.

查看進程:

[root@blog ~]# ps -ef | grep rabbitmq
root      3955 32602  0 13:46 pts/1    00:00:00 /bin/sh /usr/sbin/rabbitmq-server start
root      3965  3955  0 13:46 pts/1    00:00:00 su rabbitmq -s /bin/sh -c /usr/lib/rabbitmq/bin/rabbitmq-server  'start'
rabbitmq  3966  3965  0 13:46 ?        00:00:00 /bin/sh -e /usr/lib/rabbitmq/bin/rabbitmq-server start
rabbitmq  4061     1  0 13:46 ?        00:00:00 /usr/lib/erlang/erts-8.0/bin/epmd -daemon
rabbitmq  4077  3966  6 13:46 ?        00:00:00 /usr/lib/erlang/erts-8.0/bin/beam -W w -A 64 -P 1048576 -t 5000000 -stbt db -K true -B i -- -root /usr/lib/erlang -progname erl -- -home /var/lib/rabbitmq -- -pa /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin -noshell -noinput -s rabbit boot -sname rabbit@blog -boot start_sasl -kernel inet_default_connect_options [{nodelay,true}] -sasl errlog_type error -sasl sasl_error_logger false -rabbit error_logger {file,"/var/log/rabbitmq/rabbit@blog.log"} -rabbit sasl_error_logger {file,"/var/log/rabbitmq/rabbit@blog-sasl.log"} -rabbit enabled_plugins_file "/etc/rabbitmq/enabled_plugins" -rabbit plugins_dir "/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/plugins" -rabbit plugins_expand_dir "/var/lib/rabbitmq/mnesia/rabbit@blog-plugins-expand" -os_mon start_cpu_sup false -os_mon start_disksup false -os_mon start_memsup false -mnesia dir "/var/lib/rabbitmq/mnesia/rabbit@blog" -kernel inet_dist_listen_min 25672 -kernel inet_dist_listen_max 25672 start
rabbitmq  4166  4077  0 13:46 ?        00:00:00 erl_child_setup 65535
rabbitmq  4173  4166  0 13:46 ?        00:00:00 inet_gethost 4
rabbitmq  4174  4173  0 13:46 ?        00:00:00 inet_gethost 4
root      4180 32602  0 13:47 pts/1    00:00:00 grep rabbitmq

查看狀態:

[root@blog ~]# rabbitmqctl status
Status of node rabbit@blog ...
[{pid,4077},
 {running_applications,[{rabbit,"RabbitMQ","3.6.5"},
                        {os_mon,"CPO  CXC 138 46","2.4.1"},
                        {rabbit_common,[],"3.6.5"},
                        {ranch,"Socket acceptor pool for TCP protocols.",
                               "1.2.1"},
                        {xmerl,"XML parser","1.3.11"},
                        {mnesia,"MNESIA  CXC 138 12","4.14"},
                        {sasl,"SASL  CXC 138 11","3.0"},
                        {stdlib,"ERTS  CXC 138 10","3.0"},
                        {kernel,"ERTS  CXC 138 10","5.0"}]},
 {os,{unix,linux}},
 {erlang_version,"Erlang/OTP 19 [erts-8.0] [source-790c521] [64-bit] [async-threads:64] [hipe] [kernel-poll:true]\n"},
 {memory,[{total,43961024},
          {connection_readers,0},
          {connection_writers,0},
          {connection_channels,0},
          {connection_other,0},
          {queue_procs,2688},
          {queue_slave_procs,0},
          {plugins,0},
          {other_proc,18589480},
          {mnesia,57432},
          {mgmt_db,0},
          {msg_index,38392},
          {other_ets,926384},
          {binary,19600},
          {code,17725357},
          {atom,752561},
          {other_system,5849130}]},
 {alarms,[]},
 {listeners,[{clustering,25672,"::"},{amqp,5672,"0.0.0.0"}]},
 {vm_memory_high_watermark,0.4},
 {vm_memory_limit,787311820},
 {disk_free_limit,50000000},
 {disk_free,36167917568},
 {file_descriptors,[{total_limit,65435},
                    {total_used,2},
                    {sockets_limit,58889},
                    {sockets_used,0}]},
 {processes,[{limit,1048576},{used,137}]},
 {run_queue,0},
 {uptime,58},
 {kernel,{net_ticktime,60}}]

停止:

[root@blog ~]# rabbitmqctl stop
Stopping and halting node rabbit@blog ...
Gracefully halting Erlang VM

Java客戶端連接

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.0.0</version>
</dependency>
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

    private final static String QUEUE_NAME = "hello world";

    public static void main(String[] argv) throws Exception {
        
        Connection connection = null;
        Channel channel = null;
        try {
            /* 創建連接工廠 */
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.1.101");
            factory.setPort(5672);
            factory.setUsername("nicchagil");
            factory.setPassword("123456");
            /* 創建連接 */
            connection = factory.newConnection();
            /* 創建信道 */
            channel = connection.createChannel();

            // 聲明一個隊列:名稱、持久性的(重啟仍存在此隊列)、非私有的、非自動刪除的
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            
            String message = "hello world..."; // 需發送的信息
            
            /* 發送消息,使用默認的direct交換器 */
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("Send message -> " + message);
            
        } catch (Exception e) {
            e.printStackTrace();
            throw e;
        } finally {
            /* 關閉連接、通道 */
            channel.close();
            connection.close();
            System.out.println("Closed the channel and conn.");
        }

    }

}
import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Customer {

    private final static String QUEUE_NAME = "hello world";

    public static void main(String[] argv) throws java.io.IOException,
            java.lang.InterruptedException, TimeoutException {

        /* 創建連接工廠 */
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.1.101");
        factory.setPort(5672);
        factory.setUsername("nicchagil");
        factory.setPassword("123456");
        /* 創建連接 */
        Connection connection = factory.newConnection();
        /* 創建信道 */
        Channel channel = connection.createChannel();

        // 聲明一個隊列:名稱、持久性的(重啟仍存在此隊列)、非私有的、非自動刪除的
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        System.out.println("Waiting for messages.");

        /* 定義消費者 */
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received the message -> " + message);
            }
        };
        
        // 將消費者綁定到隊列,並設置自動確認消息(即無需顯示確認,如何設置請慎重考慮)
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

消息持久化

為了避免RabbitMQ重啟或宕機導致消息丟失,我們需要設置消息持久性。

而設置消息持久性,主要分兩步:

  • 設置隊列是持久的
  • 發送消息時,設置消息是持久的

設置隊列是持久的:

// 聲明一個隊列:名稱、持久性的(重啟仍存在此隊列)、非私有的、非自動刪除的
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

發送消息時,設置消息是持久的:

channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

RabbitMQ、Erlang之間的版本關系說明

安裝RabbitMQ之前,要安裝Erlang,而Erlang與RabbitMQ之間是有版本關系的,見 Erlang Versions Required and Supported by RabbitMQ
所以,安裝rabbitmq-server-3.5.7,使用yum install rabbitmq-server-3.5.7-1.noarch.rpm即可。

RabbitMQ的集群

RabbitMQ配置集群的文章可見Clustering Guide。這里只簡單介紹一下,因環境受限,只部署兩個實例組成集群。

這種集群方式,默認同步節點的元數據,比如隊列元數據交換器元數據綁定元數據vhost元數據,但默認不同步隊列的內容

  • 現以單節點方式啟動各節點:rabbitmq-server -detached
  • 啟動后可查看集群狀態:rabbitmqctl cluster_status,不出意外,這里只能看到單一節點的集群
  • 現關閉節點2的應用,以准備加入節點1的集群:rabbitmqctl stop_app
  • 加入節點1的集群:rabbitmqctl join_cluster rabbit@節點機器,當然節點2可能並不認識節點1,可在/etc/hosts加入IP與HOST的映射
  • 啟動節點2的應用:rabbitmqctl start_app,如無意外,2個節點已形成集群,你在任意節點操作元數據,在另外的節點可體現
  • 可再次查看確認集群狀態:rabbitmqctl cluster_status

可以用rabbitmqctl stop_apprabbitmqctl stop模擬節點故障而宕掉,當故障節點重新啟動后可見故障期間改動的元數據體現在重新啟動的故障節點上。

插件與Web控制台

[root@blog ~]# rabbitmq-plugins enable rabbitmq_management
The following plugins have been enabled:
  mochiweb
  webmachine
  rabbitmq_web_dispatch
  amqp_client
  rabbitmq_management_agent
  rabbitmq_management

Applying plugin configuration to rabbit@ebadb-pdwy... started 6 plugins.

開啟后,可以在http://xx.xx.xx.xx:15672訪問控制台。如果你在其他機器訪問控制台,有可能出現訪問不了的情況,試試將Linus防火牆設置一下。

以前的版本通過賬號/密碼:guest/guest就可以登錄訪問,安裝新版本后就不行了,所以自己新建用戶,並授予角色和權限,具體可參考這篇文章:rabbitmq的web管理界面無法使用guest用戶登錄

rabbitmqctl add_user nicchagil 123456 # 常見用戶
rabbitmqctl set_user_tags nicchagil administrator # 授予administrator角色給nicchagil
rabbitmqctl set_permissions -p / nicchagil '.*' '.*' '.*' # 授予權限

可查看當前插件:

[root@blog ~]# rabbitmq-plugins list
 Configured: E = explicitly enabled; e = implicitly enabled
 | Status:   * = running on rabbit@blog
 |/
[e*] amqp_client                       3.6.5
[  ] cowboy                            1.0.3
[  ] cowlib                            1.0.1
[e*] mochiweb                          2.13.1
[  ] rabbitmq_amqp1_0                  3.6.5
[  ] rabbitmq_auth_backend_ldap        3.6.5
[  ] rabbitmq_auth_mechanism_ssl       3.6.5
[  ] rabbitmq_consistent_hash_exchange 3.6.5
[  ] rabbitmq_event_exchange           3.6.5
[  ] rabbitmq_federation               3.6.5
[  ] rabbitmq_federation_management    3.6.5
[  ] rabbitmq_jms_topic_exchange       3.6.5
[E*] rabbitmq_management               3.6.5
[e*] rabbitmq_management_agent         3.6.5
[  ] rabbitmq_management_visualiser    3.6.5
[  ] rabbitmq_mqtt                     3.6.5
[  ] rabbitmq_recent_history_exchange  1.2.1
[  ] rabbitmq_sharding                 0.1.0
[  ] rabbitmq_shovel                   3.6.5
[  ] rabbitmq_shovel_management        3.6.5
[  ] rabbitmq_stomp                    3.6.5
[  ] rabbitmq_top                      3.6.5
[  ] rabbitmq_tracing                  3.6.5
[  ] rabbitmq_trust_store              3.6.5
[e*] rabbitmq_web_dispatch             3.6.5
[  ] rabbitmq_web_stomp                3.6.5
[  ] rabbitmq_web_stomp_examples       3.6.5
[  ] sockjs                            0.3.4
[e*] webmachine                        1.10.3

當然也可禁用插件:

[root@blog ~]# rabbitmq-plugins disable rabbitmq_management
The following plugins have been disabled:
  mochiweb
  webmachine
  rabbitmq_web_dispatch
  amqp_client
  rabbitmq_management_agent
  rabbitmq_management

Applying plugin configuration to rabbit@blog... stopped 6 plugins.

消息跟蹤,Message Tracing

在開發和測試階段,有時候需跟蹤消息的生產與消費,我們可開啟Message Tracing
開啟Message Tracing插件:rabbitmq-plugins enable rabbitmq_tracing,然后Web控制台的Admin -> Tracing可進入消息跟蹤頁面,在此頁面可以新增一個trace,然后通過命令rabbitmqctl trace_on開啟消息跟蹤,如果有消息的生產和消費,就會記錄在一個日志文件中。
需注意,RabbitMQ重啟后,你新建的trace會消失。
更多說明見Firehose Tracer

修改MQ默認配置

生成並編輯/etc/rabbitmq/rabbitmq.config即可:

mkdir -p /etc/rabbitmq
cp /usr/share/doc/rabbitmq-server-3.6.11/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config

修改MQ監聽端口

編輯tcp_listeners塊,具體配置見Networking and RabbitMQ

   {tcp_listeners, [{"xx.xx.xx.xx", xxxx}]}

修改Web控制台端口

編輯rabbitmq_management塊:

 {rabbitmq_management,
  [
   {listener, [{port,     xxxx},
               {ip,       "xx.xx.xx.xx"},
               {ssl,      false}]}
  ]},

參考的文章


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM