RabbitMQ 即一個消息隊列,主要是用來實現應用程序的異步和解耦,同時也能起到消息緩沖,消息分發的作用。可維護多個隊列,可實現消息的一對一和廣播等方式發送
RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。
CentOs安裝:
安裝socat
yum -y install socat
安裝erlang
wget http://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm
Rpm -ivh erlang-19.0.4-1.el7.centos.x86_64.rpm
安裝rabbitmq
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.10/rabbitmq-server-3.6.10-1.el7.noarch.rpm
rpm -ivh rabbitmq-server-3.6.10-1.el7.noarch.rpm
啟動:
systemctl start rabbitmq-server
查看狀態:
rabbitmqctl status
配置網頁管理端:
mkdir /etc/rabbitmq
啟用插件:
rabbitmq-plugins enable rabbitmq_management
配置開放端口:
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent
重啟防火牆:
systemctl restart firewalld.service
創建用戶:
rabbitmqctl add_user ruroot rproot
修改角色為管理員:
rabbitmqctl set_user_tags ruroot administrator
設置權限:
rabbitmqctl set_permissions -p / ruroot2 ".*" ".*" ".*"
測試結果:
命令行消息管理:
得到所有隊列及存在的數據條數
rabbitmqctl list_queues
Python簡單操控
安裝
pip3 install pika
發送數據:
如果生成多個的話,實現效果是輪詢發送,一個一個循環發送數據,如同“皇帝輪流做…”
1 import pika 2 3 #建立連接 4 userx=pika.PlainCredentials("ruroot2","rproot2") 5 conn=pika.BlockingConnection(pika.ConnectionParameters("192.168.43.10",5672,'/',credentials=userx)) 6 7 #開辟管道 8 channelx=conn.channel() 9 10 #聲明隊列,參數為隊列名 11 channelx.queue_declare(queue="dongchannel11") 12 13 #發送數據,發送一條,如果要發送多條則復制此段 14 channelx.basic_publish(exchange="", 15 routing_key="dongchannel11",# 隊列名 16 body="dongxiaodongtodata3" # 發送的數據 17 ) 18 print("--------發送數據完成-----------") 19 20 #關閉連接 21 conn.close()
取出數據:
1 import pika 2 3 #建立連接 4 userx=pika.PlainCredentials("ruroot2","rproot2") 5 conn=pika.BlockingConnection(pika.ConnectionParameters("192.168.43.10",5672,'/',credentials=userx)) 6 7 #開辟管道 8 channelx=conn.channel() 9 10 #聲明隊列,參數為隊列名 11 channelx.queue_declare(queue="dongchannel11") 12 13 #消息處理函數,執行完成才說明接收完成,此時才可以接收下一條,串行 14 def dongcallbackfun(v1,v2,v3,bodyx): 15 print("得到的數據為:",bodyx) 16 17 #接收准備 18 channelx.basic_consume(dongcallbackfun, #收到消息的回調函數 19 queue="dongchannel11", #隊列名 20 no_ack=True #是否發送消息確認 21 ) 22 print("-------- 開始接收數據 -----------") 23 24 #開始接收消息 25 channelx.start_consuming()
發送端是否設置數據保存時間:
默認服務器(rabbitmq-server)重啟后消息隊列和消息數據均會全部消失
消息隊列的永久保存,開啟后將僅僅實現服務器重啟后消息隊列依然在,但數據還是會丟失,如果要保存數據,請參考接下來
#聲明隊列,參數為隊列名 #實現隊列永久保存,durable=True channelx.queue_declare(queue="dongch1",durable=True)
數據的永久保存(一直等待被取,即使服務器重啟),將要永久保存的發送數據添加屬性properties
#發送數據 channelx.basic_publish(exchange="", routing_key="dongch1",# 隊列名 body="dongxiaodongtodata333335", # 發送的數據 properties=pika.BasicProperties( delivery_mode=2, #實現消息永久保存 ) )
發送端實現能者多勞
在發送端發送數據前,添加下面一句,此句添加一次即可,可以實現自動判斷多接收端的處理速度,實現接收端處理快則多派發任務,處理慢則少派發任務
channelx.basic_qos(prefetch_count=1)
接收端是否接收確認:
接收端開啟消息確認(值為False),接收端則會在接收回調函數結束時手動發送確認消息到數據發送者,如果接收端在回調函數處理未完成時就掛掉了,那么發送端將會立即把當前數據轉交到下一個接收端進行數據處理
1 #消息處理函數,執行完成才說明接收完成,此時才可以接收下一條,串行 2 def dongcallbackfun(channlx,methodx,v3,bodyx): 3 print("得到的數據為:",bodyx) 4 channelx.basic_ack(delivery_tag=methodx.delivery_tag) #發送數據完成確認消息,手動確認 5 6 #接收准備 7 channelx.basic_consume(dongcallbackfun, #收到消息的回調函數 8 queue="dongchannel11", #隊列名 9 no_ack=False #是否在消息回調函數結束后發送確認信息到發消息者,true表示不發送
非阻塞版數據接收:
啟用會立即返回結果,如果有數據則進入回調函數,無數據則進行下一條,可以配合while使用
conn.process_data_events() #使用連接對象進行數據接收判斷 print("無數據")
實現消息的訂閱和發布:
發布:
1 import pika 2 3 #建立連接 4 userx=pika.PlainCredentials("ruroot2","rproot2") 5 conn=pika.BlockingConnection(pika.ConnectionParameters("192.168.1.175",5672,'/',credentials=userx)) 6 7 #開辟管道 8 channelx=conn.channel() 9 10 #聲明發布和訂閱通道,如果可以確認通道存在則可以去掉該句 11 channelx.exchange_declare(exchange="dongee",exchange_type="fanout") 12 13 #發送數據 14 channelx.basic_publish(exchange="dongee",#確定發布主題為:dongee 15 routing_key="", 16 body="dongxiaodongeeedata11", # 發送的數據 17 ) 18 19 print("--------發送數據完成-----------") 20 21 #關閉連接 22 conn.close()
訂閱:
1 import pika 2 #建立連接 3 userx=pika.PlainCredentials("ruroot2","rproot2") 4 conn=pika.BlockingConnection(pika.ConnectionParameters("192.168.1.175",5672,'/',credentials=userx)) 5 6 #開辟管道 7 channelx=conn.channel() 8 9 #聲明發布和訂閱通道,如果可以確認通道存在則可以去掉該句 10 channelx.exchange_declare(exchange="dongee",exchange_type="fanout") 11 12 #聲明隊列,生成一個隨機的且不存在的隊列,該隊列會在連接斷開后自動銷毀 13 resqueue=channelx.queue_declare(exclusive=True) 14 #得到隨機生成的隊列名 15 queuenamex=resqueue.method.queue 16 17 #將隊列和發布數據綁定,確定訂閱主題為:dongee 18 channelx.queue_bind(exchange="dongee",queue=queuenamex) 19 20 #消息處理函數,執行完成才說明接收完成,此時才可以接收下一條,串行 21 def dongcallbackfun(channlx,methodx,v3,bodyx): 22 print("得到的數據為:",bodyx) 23 24 #接收准備 25 channelx.basic_consume(dongcallbackfun, #收到消息的回調函數 26 queue=queuenamex, #隊列名 27 no_ack=True 28 ) 29 30 print("-------- 開始接收數據 -----------") 31 32 #開始接收消息 33 channelx.start_consuming()
通過管道實現進一步的消息訂閱和發布:
發布:
1 import pika 2 3 #建立連接 4 userx=pika.PlainCredentials("ruroot2","rproot2") 5 conn=pika.BlockingConnection(pika.ConnectionParameters("192.168.1.175",5672,'/',credentials=userx)) 6 7 #開辟管道 8 channelx=conn.channel() 9 10 #聲明發布和訂閱通道,如果可以確認通道存在則可以去掉該句 11 channelx.exchange_declare(exchange="dongee2",exchange_type="direct") 12 13 #發送數據 14 channelx.basic_publish(exchange="dongee2",#確定發布主題為:dongee2 15 routing_key="dongqu33", #確定發布的隊列(發布的主題):dongqu33 16 body="dongxiaodong333", # 確定發送的數據 17 ) 18 19 #發送數據 20 channelx.basic_publish(exchange="dongee2",#確定發布主題為:dongee2 21 routing_key="dongqu22", #確定發布的隊列(發布的主題):dongqu22 22 body="dongxiaodong222", # 確定發送的數據 23 ) 24 25 print("--------發送數據完成-----------") 26 27 #關閉連接 28 conn.close()
訂閱:
1 import pika 2 #建立連接 3 userx=pika.PlainCredentials("ruroot2","rproot2") 4 conn=pika.BlockingConnection(pika.ConnectionParameters("192.168.1.175",5672,'/',credentials=userx)) 5 6 #開辟管道 7 channelx=conn.channel() 8 9 #聲明發布和訂閱通道,如果可以確認通道存在則可以去掉該句 10 channelx.exchange_declare(exchange="dongee2",exchange_type="direct") 11 12 #聲明隊列,生成一個隨機的且不存在的隊列,該隊列會在連接斷開后自動銷毀 13 resqueue=channelx.queue_declare(exclusive=True) 14 #得到隨機生成的隊列名 15 queuenamex=resqueue.method.queue 16 17 #將隊列和發布數據綁定,確定訂閱主題為:dongqu11 和 dongqu22 18 channelx.queue_bind(exchange="dongee2",queue=queuenamex,routing_key="dongqu11") 19 channelx.queue_bind(exchange="dongee2",queue=queuenamex,routing_key="dongqu22") 20 21 #消息處理函數,執行完成才說明接收完成,此時才可以接收下一條,串行 22 def dongcallbackfun(channlx,methodx,v3,bodyx): 23 print("隊列名(訂閱的主題名)為:%r 得到的數據為:%r "%(methodx.routing_key,bodyx)) 24 25 #接收准備 26 channelx.basic_consume(dongcallbackfun, #收到消息的回調函數 27 queue=queuenamex, #隊列名 28 no_ack=True 29 ) 30 31 print("-------- 開始接收數據 -----------") 32 33 #開始接收消息 34 channelx.start_consuming()