1、安裝RabbitMQ
1)下載和安裝erlang
下載erlang
wget http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el6.x86_64.rpm
安裝erlang,root用戶使用rpm安裝
rpm -ihv erlang-18.1-1.el6.x86_64.rpm
2)下載和安裝RabbitMQ
下載RabbitMQ
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/rabbitmq_v3_6_12/rabbitmq-server-3.6.12-1.el6.noarch.rpm
安裝RabbitMQ,root用戶使用rpm安裝
rpm -ihv rabbitmq-server-3.6.12-1.el6.noarch.rpm
一般來說不會有什么問題,如果安裝RabbitMQ過程中遇到如下錯誤,清空rpmdb然后重試。
我遇到的問題如下是
[root@bigdata-arch-client11 yangfan]# rpm -ihv erlang-18.1-1.el6.x86_64.rpm rpmdb: Thread/process 72423/139858093815712 failed: Thread died in Berkeley DB library error: db3 error(-30974) from dbenv->failchk: DB_RUNRECOVERY: Fatal error, run database recovery error: cannot open Packages index using db3 - (-30974) error: cannot open Packages database in /var/lib/rpm rpmdb: Thread/process 72423/139858093815712 failed: Thread died in Berkeley DB library error: db3 error(-30974) from dbenv->failchk: DB_RUNRECOVERY: Fatal error, run database recovery error: cannot open Packages database in /var/lib/rpm
百度了一下,請按順序執行,然后重試安裝。
rm -f /var/lib/rpm/__db* rpm --rebuilddb yum clean all
當安裝完成之后,可以使用缺省配置啟動一下,如果打印如下,那么安裝就成功了。
[root@bigdata-arch-client11 yangfan]# rabbitmq-server
RabbitMQ 3.6.12. Copyright (C) 2007-2017 Pivotal Software, Inc. ## ## Licensed under the MPL. See http://www.rabbitmq.com/ ## ## ########## Logs: /var/log/rabbitmq/rabbit@bigdata-arch-client11.log ###### ## /var/log/rabbitmq/rabbit@bigdata-arch-client11-sasl.log ########## Starting broker... completed with 0 plugins.
2、配置RabbitMQ
1)創建RabbitMQ賬號
rabbitmqctl add_user admin bigdata123
[root@bigdata-arch-client11 yangfan]# rabbitmqctl add_user admin bigdata123
Creating user "admin"
2)將admin賬號賦予管理員權限
rabbitmqctl set_user_tags admin administrator
[root@bigdata-arch-client11 yangfan]# rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator]
3)設置權限
rabbitmqctl set_permissions -p '/' admin '.' '.' '.'
[root@bigdata-arch-client09 ~]# rabbitmqctl set_permissions -p '/' admin '.' '.' '.'
Setting permissions for user "admin" in vhost "/"
4)啟用web管理插件
rabbitmq-plugins enable rabbitmq_management
[root@bigdata-arch-client11 yangfan]# rabbitmq-plugins enable rabbitmq_management
The following plugins have been enabled:
amqp_client
cowlib
cowboy
rabbitmq_web_dispatch
rabbitmq_management_agent
rabbitmq_management
Applying plugin configuration to rabbit@bigdata-arch-client11... started 6 plugins.
這樣你可以通過web頁面觀察rabbitmq的status,端口號是15672,例如http://ip:15672
3、配置RabbitMQ集群
我們這里會展示如何配置一個RabbitMQ集群,集群由以下節點組成。
要保證集群在同一個局域網,IP能通。
1)安裝好RabbitMQ
安裝方法同上文。
2)保證相同的Erlang Cookie
我這里是把client09上的.erlang.cookie以scp的方式拷貝到另外兩台機器。
[root@bigdata-arch-client09 ~]# scp /var/lib/rabbitmq/.erlang.cookie root@10.93.18.34:/var/lib/rabbitmq [root@bigdata-arch-client09 ~]# scp /var/lib/rabbitmq/.erlang.cookie rootr@10.93.21.21:/var/lib/rabbitmq
3)運行各個RabbitMQ節點
rabbitmqctl stop
rabbitmq-server -detached
運行成功后可以查看一下節點當前的集群狀態,當然這個時候還沒有組成集群。
[root@bigdata-arch-client09 ~]# rabbitmqctl cluster_status [root@bigdata-arch-client10 ~]# rabbitmqctl cluster_status [root@bigdata-arch-client11 ~]# rabbitmqctl cluster_status
4)將節點連接成集群
client10:加入到集群rabbit@bigdata-arch-client09
[root@bigdata-arch-client10 ~]#rabbitmqctl stop_app
[root@bigdata-arch-client10 ~]#rabbitmqctl join_cluster rabbit@bigdata-arch-client09
[root@bigdata-arch-client10 ~]#rabbitmqctl start_app
client11:加入到集群rabbit@bigdata-arch-client09
[root@bigdata-arch-client11 ~]#rabbitmqctl stop_app [root@bigdata-arch-client11 ~]#rabbitmqctl join_cluster rabbit@bigdata-arch-client09 [root@bigdata-arch-client11 ~]#rabbitmqctl start_app
client09:不用加入自己
查看集群狀態,我們可以在任意一台機器上查看,我們選擇在client09上看。
[root@bigdata-arch-client09 ~]# rabbitmqctl cluster_status Cluster status of node 'rabbit@bigdata-arch-client09' [{nodes,[{disc,['rabbit@bigdata-arch-client09', 'rabbit@bigdata-arch-client11', 'rabbit@bigdata-arch-client10']}]}, {running_nodes,['rabbit@bigdata-arch-client10', 'rabbit@bigdata-arch-client11', 'rabbit@bigdata-arch-client09']}, {cluster_name,<<"rabbit@bigdata-arch-client09.xg01">>}, {partitions,[]}, {alarms,[{'rabbit@bigdata-arch-client10',[]}, {'rabbit@bigdata-arch-client09',[]}, {'rabbit@bigdata-arch-client11', []}]}]
可以看到,3個實例已經組成了集群。
5)試一下容錯
我們關掉client10上的實例
[root@bigdata-arch-client10 ~]# rabbitmqctl stop
然后我們再看集群情況
[root@bigdata-arch-client09 ~]# rabbitmqctl cluster_status Cluster status of node 'rabbit@bigdata-arch-client09' [{nodes,[{disc,['rabbit@bigdata-arch-client09', 'rabbit@bigdata-arch-client11']}]}, {running_nodes,['rabbit@bigdata-arch-client11', 'rabbit@bigdata-arch-client09']}, {cluster_name,<<"rabbit@bigdata-arch-client11.xg01">>}, {partitions,[]}, {alarms,[{'rabbit@bigdata-arch-client11',[]}, {'rabbit@bigdata-arch-client09',[]}]}]
可以發現client10已經成功摘除。
4、HA配置
我們使用haproxy來代理配置高可用。
haproxy可以用來做代理,進行負載均衡和backend探活。支持TCP和HTTP模式。
關於haproxy的內容就不展開說了。
這里僅僅給出配置。
########tcp配置################# listen rabbitmq bind 10.93.21.21:5077 mode tcp option tcplog #日志類別,采用tcplog maxconn 4086 #log 127.0.0.1 local0 debug server rabbit1 10.93.18.34:5672 maxconn 1024 weight 1 check inter 2000 rise 2 fall 3 server rabbit2 10.93.18.35:5672 maxconn 1024 weight 1 check inter 2000 rise 2 fall 3 server rabbit3 10.93.21.21:5672 maxconn 1024 weight 1 check inter 2000 rise 2 fall 3
實驗一下,下面是實驗驗證的程序,你可以掛掉一個實例試試。
send.py
# -*- coding:utf-8 -*- import pika
credentials = pika.PlainCredentials('admin','bigdata123') connection = pika.BlockingConnection(pika.ConnectionParameters( '10.93.21.21',5077, '/', credentials)) channel = connection.channel() # 聲明queue channel.queue_declare(queue='balance') # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. channel.basic_publish(exchange='', routing_key='balance', body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close()
receive.py
# _*_coding:utf-8_*_ import pika credentials = pika.PlainCredentials('admin','bigdata123') connection = pika.BlockingConnection(pika.ConnectionParameters( '10.93.21.21',5077,'/',credentials)) channel = connection.channel() # You may ask why we declare the queue again ‒ we have already declared it in our previous code. # We could avoid that if we were sure that the queue already exists. For example if send.py program # was run before. But we're not yet sure which program to run first. In such cases it's a good # practice to repeat declaring the queue in both programs. channel.queue_declare(queue='balance') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(callback, queue='balance', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()