在CentOS安裝
Erlang的安裝
因Rabbit MQ使用Erlang,所以需要先安裝Erlang,安裝過程中可能會遇到種種問題,可參考CentOS 6.5安裝Erlang/OTP 17.0。
Erlang可在Erlang Solutions下載,我安裝的是esl-erlang_19.0~centos~6_amd64.rpm
,個人是通過yum
安裝的:yum install esl-erlang_19.0~centos~6_amd64.rpm
。
安裝完測試是否成功:
[root@blog third_package]# erl
Erlang/OTP 19 [erts-8.0] [source-790c521] [64-bit] [async-threads:10] [hipe] [kernel-poll:false]
Eshell V8.0 (abort with ^G)
RabbitMQ的安裝
然后就可以安裝MQ了。
[root@blog third_package]# rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
我這里遇到一個異常情況,系統中安裝了esl-erlang_19.0~centos~6_amd64.rpm
,並通過erl
測試成功,但RabbitMQ仍然關聯不上,報一下錯誤:
error: Failed dependencies:
erlang >= R16B-03 is needed by rabbitmq-server-3.6.5-1.noarch
我這里選擇忽略依賴的安裝:
[root@blog third_package]# rpm -ivh --nodeps rabbitmq-server-3.6.5-1.noarch.rpm
Preparing... ########################################### [100%]
1:rabbitmq-server ########################################### [100%]
后續開啟控制台后,在控制台頁面看到RabbitMQ實際關聯上了Erlang 19.0
:
啟動與停止
啟動:
[root@blog ~]# rabbitmq-server start &
[1] 2349
[root@blog ~]#
RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.
## ## Licensed under the MPL. See http://www.rabbitmq.com/
## ##
########## Logs: /var/log/rabbitmq/rabbit@blog.log
###### ## /var/log/rabbitmq/rabbit@blog-sasl.log
##########
Starting broker...
completed with 0 plugins.
查看進程:
[root@blog ~]# ps -ef | grep rabbitmq
root 3955 32602 0 13:46 pts/1 00:00:00 /bin/sh /usr/sbin/rabbitmq-server start
root 3965 3955 0 13:46 pts/1 00:00:00 su rabbitmq -s /bin/sh -c /usr/lib/rabbitmq/bin/rabbitmq-server 'start'
rabbitmq 3966 3965 0 13:46 ? 00:00:00 /bin/sh -e /usr/lib/rabbitmq/bin/rabbitmq-server start
rabbitmq 4061 1 0 13:46 ? 00:00:00 /usr/lib/erlang/erts-8.0/bin/epmd -daemon
rabbitmq 4077 3966 6 13:46 ? 00:00:00 /usr/lib/erlang/erts-8.0/bin/beam -W w -A 64 -P 1048576 -t 5000000 -stbt db -K true -B i -- -root /usr/lib/erlang -progname erl -- -home /var/lib/rabbitmq -- -pa /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin -noshell -noinput -s rabbit boot -sname rabbit@blog -boot start_sasl -kernel inet_default_connect_options [{nodelay,true}] -sasl errlog_type error -sasl sasl_error_logger false -rabbit error_logger {file,"/var/log/rabbitmq/rabbit@blog.log"} -rabbit sasl_error_logger {file,"/var/log/rabbitmq/rabbit@blog-sasl.log"} -rabbit enabled_plugins_file "/etc/rabbitmq/enabled_plugins" -rabbit plugins_dir "/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/plugins" -rabbit plugins_expand_dir "/var/lib/rabbitmq/mnesia/rabbit@blog-plugins-expand" -os_mon start_cpu_sup false -os_mon start_disksup false -os_mon start_memsup false -mnesia dir "/var/lib/rabbitmq/mnesia/rabbit@blog" -kernel inet_dist_listen_min 25672 -kernel inet_dist_listen_max 25672 start
rabbitmq 4166 4077 0 13:46 ? 00:00:00 erl_child_setup 65535
rabbitmq 4173 4166 0 13:46 ? 00:00:00 inet_gethost 4
rabbitmq 4174 4173 0 13:46 ? 00:00:00 inet_gethost 4
root 4180 32602 0 13:47 pts/1 00:00:00 grep rabbitmq
查看狀態:
[root@blog ~]# rabbitmqctl status
Status of node rabbit@blog ...
[{pid,4077},
{running_applications,[{rabbit,"RabbitMQ","3.6.5"},
{os_mon,"CPO CXC 138 46","2.4.1"},
{rabbit_common,[],"3.6.5"},
{ranch,"Socket acceptor pool for TCP protocols.",
"1.2.1"},
{xmerl,"XML parser","1.3.11"},
{mnesia,"MNESIA CXC 138 12","4.14"},
{sasl,"SASL CXC 138 11","3.0"},
{stdlib,"ERTS CXC 138 10","3.0"},
{kernel,"ERTS CXC 138 10","5.0"}]},
{os,{unix,linux}},
{erlang_version,"Erlang/OTP 19 [erts-8.0] [source-790c521] [64-bit] [async-threads:64] [hipe] [kernel-poll:true]\n"},
{memory,[{total,43961024},
{connection_readers,0},
{connection_writers,0},
{connection_channels,0},
{connection_other,0},
{queue_procs,2688},
{queue_slave_procs,0},
{plugins,0},
{other_proc,18589480},
{mnesia,57432},
{mgmt_db,0},
{msg_index,38392},
{other_ets,926384},
{binary,19600},
{code,17725357},
{atom,752561},
{other_system,5849130}]},
{alarms,[]},
{listeners,[{clustering,25672,"::"},{amqp,5672,"0.0.0.0"}]},
{vm_memory_high_watermark,0.4},
{vm_memory_limit,787311820},
{disk_free_limit,50000000},
{disk_free,36167917568},
{file_descriptors,[{total_limit,65435},
{total_used,2},
{sockets_limit,58889},
{sockets_used,0}]},
{processes,[{limit,1048576},{used,137}]},
{run_queue,0},
{uptime,58},
{kernel,{net_ticktime,60}}]
停止:
[root@blog ~]# rabbitmqctl stop
Stopping and halting node rabbit@blog ...
Gracefully halting Erlang VM
Java客戶端連接
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.0</version>
</dependency>
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
private final static String QUEUE_NAME = "hello world";
public static void main(String[] argv) throws Exception {
Connection connection = null;
Channel channel = null;
try {
/* 創建連接工廠 */
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.101");
factory.setPort(5672);
factory.setUsername("nicchagil");
factory.setPassword("123456");
/* 創建連接 */
connection = factory.newConnection();
/* 創建信道 */
channel = connection.createChannel();
// 聲明一個隊列:名稱、持久性的(重啟仍存在此隊列)、非私有的、非自動刪除的
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "hello world..."; // 需發送的信息
/* 發送消息,使用默認的direct交換器 */
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("Send message -> " + message);
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally {
/* 關閉連接、通道 */
channel.close();
connection.close();
System.out.println("Closed the channel and conn.");
}
}
}
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Customer {
private final static String QUEUE_NAME = "hello world";
public static void main(String[] argv) throws java.io.IOException,
java.lang.InterruptedException, TimeoutException {
/* 創建連接工廠 */
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.101");
factory.setPort(5672);
factory.setUsername("nicchagil");
factory.setPassword("123456");
/* 創建連接 */
Connection connection = factory.newConnection();
/* 創建信道 */
Channel channel = connection.createChannel();
// 聲明一個隊列:名稱、持久性的(重啟仍存在此隊列)、非私有的、非自動刪除的
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println("Waiting for messages.");
/* 定義消費者 */
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received the message -> " + message);
}
};
// 將消費者綁定到隊列,並設置自動確認消息(即無需顯示確認,如何設置請慎重考慮)
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
消息持久化
為了避免RabbitMQ重啟或宕機導致消息丟失,我們需要設置消息持久性。
而設置消息持久性,主要分兩步:
- 設置隊列是持久的
- 發送消息時,設置消息是持久的
設置隊列是持久的:
// 聲明一個隊列:名稱、持久性的(重啟仍存在此隊列)、非私有的、非自動刪除的
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
發送消息時,設置消息是持久的:
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
RabbitMQ、Erlang之間的版本關系說明
安裝RabbitMQ之前,要安裝Erlang,而Erlang與RabbitMQ之間是有版本關系的,見 Erlang Versions Required and Supported by RabbitMQ
所以,安裝rabbitmq-server-3.5.7
,使用yum install rabbitmq-server-3.5.7-1.noarch.rpm
即可。
RabbitMQ的集群
RabbitMQ配置集群的文章可見Clustering Guide。這里只簡單介紹一下,因環境受限,只部署兩個實例組成集群。
這種集群方式,默認同步節點的元數據,比如隊列元數據
、交換器元數據
、綁定元數據
、vhost元數據
,但默認不同步隊列的內容
。
- 現以單節點方式啟動各節點:
rabbitmq-server -detached
- 啟動后可查看集群狀態:
rabbitmqctl cluster_status
,不出意外,這里只能看到單一節點的集群 - 現關閉節點2的應用,以准備加入節點1的集群:
rabbitmqctl stop_app
- 加入節點1的集群:
rabbitmqctl join_cluster rabbit@節點機器
,當然節點2可能並不認識節點1,可在/etc/hosts
加入IP與HOST的映射 - 啟動節點2的應用:
rabbitmqctl start_app
,如無意外,2個節點已形成集群,你在任意節點操作元數據,在另外的節點可體現 - 可再次查看確認集群狀態:
rabbitmqctl cluster_status
可以用rabbitmqctl stop_app
或rabbitmqctl stop
模擬節點故障而宕掉,當故障節點重新啟動后可見故障期間改動的元數據體現在重新啟動的故障節點上。
插件與Web控制台
[root@blog ~]# rabbitmq-plugins enable rabbitmq_management
The following plugins have been enabled:
mochiweb
webmachine
rabbitmq_web_dispatch
amqp_client
rabbitmq_management_agent
rabbitmq_management
Applying plugin configuration to rabbit@ebadb-pdwy... started 6 plugins.
開啟后,可以在http://xx.xx.xx.xx:15672訪問控制台。如果你在其他機器訪問控制台,有可能出現訪問不了的情況,試試將Linus防火牆設置一下。
以前的版本通過賬號/密碼:guest/guest就可以登錄訪問,安裝新版本后就不行了,所以自己新建用戶,並授予角色和權限,具體可參考這篇文章:rabbitmq的web管理界面無法使用guest用戶登錄
rabbitmqctl add_user nicchagil 123456 # 常見用戶
rabbitmqctl set_user_tags nicchagil administrator # 授予administrator角色給nicchagil
rabbitmqctl set_permissions -p / nicchagil '.*' '.*' '.*' # 授予權限
可查看當前插件:
[root@blog ~]# rabbitmq-plugins list
Configured: E = explicitly enabled; e = implicitly enabled
| Status: * = running on rabbit@blog
|/
[e*] amqp_client 3.6.5
[ ] cowboy 1.0.3
[ ] cowlib 1.0.1
[e*] mochiweb 2.13.1
[ ] rabbitmq_amqp1_0 3.6.5
[ ] rabbitmq_auth_backend_ldap 3.6.5
[ ] rabbitmq_auth_mechanism_ssl 3.6.5
[ ] rabbitmq_consistent_hash_exchange 3.6.5
[ ] rabbitmq_event_exchange 3.6.5
[ ] rabbitmq_federation 3.6.5
[ ] rabbitmq_federation_management 3.6.5
[ ] rabbitmq_jms_topic_exchange 3.6.5
[E*] rabbitmq_management 3.6.5
[e*] rabbitmq_management_agent 3.6.5
[ ] rabbitmq_management_visualiser 3.6.5
[ ] rabbitmq_mqtt 3.6.5
[ ] rabbitmq_recent_history_exchange 1.2.1
[ ] rabbitmq_sharding 0.1.0
[ ] rabbitmq_shovel 3.6.5
[ ] rabbitmq_shovel_management 3.6.5
[ ] rabbitmq_stomp 3.6.5
[ ] rabbitmq_top 3.6.5
[ ] rabbitmq_tracing 3.6.5
[ ] rabbitmq_trust_store 3.6.5
[e*] rabbitmq_web_dispatch 3.6.5
[ ] rabbitmq_web_stomp 3.6.5
[ ] rabbitmq_web_stomp_examples 3.6.5
[ ] sockjs 0.3.4
[e*] webmachine 1.10.3
當然也可禁用插件:
[root@blog ~]# rabbitmq-plugins disable rabbitmq_management
The following plugins have been disabled:
mochiweb
webmachine
rabbitmq_web_dispatch
amqp_client
rabbitmq_management_agent
rabbitmq_management
Applying plugin configuration to rabbit@blog... stopped 6 plugins.
消息跟蹤,Message Tracing
在開發和測試階段,有時候需跟蹤消息的生產與消費,我們可開啟Message Tracing
。
開啟Message Tracing
插件:rabbitmq-plugins enable rabbitmq_tracing
,然后Web控制台的Admin
-> Tracing
可進入消息跟蹤
頁面,在此頁面可以新增一個trace,然后通過命令rabbitmqctl trace_on
開啟消息跟蹤,如果有消息的生產和消費,就會記錄在一個日志文件中。
需注意,RabbitMQ重啟后,你新建的trace會消失。
更多說明見Firehose Tracer。
修改MQ默認配置
生成並編輯/etc/rabbitmq/rabbitmq.config即可:
mkdir -p /etc/rabbitmq
cp /usr/share/doc/rabbitmq-server-3.6.11/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
修改MQ監聽端口
編輯tcp_listeners塊,具體配置見Networking and RabbitMQ:
{tcp_listeners, [{"xx.xx.xx.xx", xxxx}]}
修改Web控制台端口
編輯rabbitmq_management塊:
{rabbitmq_management,
[
{listener, [{port, xxxx},
{ip, "xx.xx.xx.xx"},
{ssl, false}]}
]},