構建參考: [ Rabbitmq cluster setup with HAproxy ] [ python demo ]
RabbitMQ Cluster 遇到的問題
python pika 作為consumer 連接 rabbitmq cluster 的時候, 事實上連接的是 cluster 的一個 node, 當連接數過多的時候, 這個節點的處理性能會成為一個瓶頸, 可能會遇到這樣的報錯 [ connection reset by peer ].
對於 [ connection reset by peer ] 這個問題的處理, [ 這里 ] 提供了一個方案:
Client --> Load Balancer --> RabbitMQ Cluster Instances
RabbitMQ 集群構建
集群環境
系統: CentOS 7.x x64
haproxy-server : 172.16.0.100
rabbitmq-master : 172.16.0.10
rabbitmq-slave1 : 172.16.0.11
rabbitmq-slave2 : 172.16.0.12
配置 /etc/hosts
172.16.0.10 rabbitmq-master
172.16.0.11 rabbitmq-slave1
172.16.0.12 rabbitmq-slave2
安裝 RabbitMQ
在三台 rabbitmq 服務器上面分別執行:
yum install rabbitmq-server
配置 Cookie
RabbitMQ 集群通過 /var/lib/rabbitmq/.erlang.cookie 內的 cookie 值來確認各節點是否在同一個集群.
在 rabbitmq-master:
rabbitmq-server -detached
在 rabbitmq-master上查看 /var/lib/rabbitmq/.erlang.cookie 值:
cat /var/lib/rabbitmq/.erlang.cookie
比如獲取的值為: GBNXRROLXDWMMIFZQWHD
在 rabbitmq-slave1 & rabbitmq-slave2 :
echo GBNXRROLXDWMMIFZQWHD > /var/lib/rabbitmq/.erlang.cookie
添加 Slave 節點到 master
在 rabbitmq-slave1 & rabbitmq-slave2 上面啟動服務:
rabbitmq-server -detached
在rabbitmq-slave1 & rabbitmq-slave2 上分別執行:
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbitmq@rabbitmq-master
rabbitmqctl start_app
查看集群狀態
在 RabbitMQ Cluster 任意一個節點:
rabbitmctl cluster_status
啟用集群高可用
在 RabbitMQ Cluster 每個節點執行:
rabbitmqctl set_policy ha-all "" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
啟用 RabbitMQ 組件
在 RabbitMQ Cluster 每個節點執行:
rabbitmq-plugins enable rabbitmq_management
添加用戶
在 RabbitMQ Cluster 每個節點:
rabbitmqctl add_user <username> <password>
rabbitmqctl set_user_tags <username> <tag>
rabbitmqctl set_permissions -p / <username> ".*" ".*" ".*"
備注: 在 rabbitmqctl set_user_tags <username> <tag>
中, <tag> 可以是任何標記用戶的字符, 比如 admin, normal, guest, developer 等標記用戶身份的 tag.
這樣, RabbitMQ 集群配置完成.
Web 訪問
web 訪問的前提是我們啟用了 RabbitMQ 的組件: rabbitmq-plugins enable rabbitmq_management
可以在瀏覽器訪問: http://ip_address:15672
登錄用戶和密碼是 : rabbitmqctl add_user username password 設定的.
也可以使用默認添加的用戶和密碼: guest / guest.
HAProxy
以下所有的配置均在 haproxy-server 上面執行
安裝 haproxy
yum install haproxy
配置 haproxy
vim /etc/haproxy/haproxy.cfg
global
daemon
defaults
mode tcp
maxconn 10000
timeout connect 5s
timeout client 100s
timeout server 100s
listen rabbitmq 172.16.0.100:5672
mode tcp
balance roundrobin
server rabbit-master 172.16.0.10:5672 check inter 5s rise 2 fall 3
server rabbit-node1 172.16.0.11:5672 check inter 5s rise 2 fall 3
server rabbit-node2 172.16.0.12:5672 check inter 5s rise 2 fall 3
啟動 haproxy
systemctl start haproxy
至此, 用 HAproxy 做RabbitMQ 的 LB 的集群已經配置完成, 在連接 MQ 的時候, 只要將地址配置為 haproxy-server 的 IP 和對應 Port 即可. 接下來會用 python 程序做測試
RabbitMQ Cluster Product & Consume Test
生產者: send.py
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.16.0.100',port=5672))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
消費者: receive.py
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.16.0.100',port=5672))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(callback,
queue='hello',
no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
生產
在同一網絡中的機器 CLI 執行:
for i in {1..10};do python send.py;done
這可以寫10條數據到 RabbitMQ 的隊列中.
在 RabbitMQ Cluster 的任意一個節點執行:
rabbitmqctl list_queues
可以查看到隊列以及隊列內消息的數量.
消費
在同一網絡中的機器 CLI 執行:
python receive.py
即可讀取隊列中的數據.
btw, 我愛死博客園的 markdown 了!