Python消息隊列(RabbitMQ)


RabbitMQ 即一個消息隊列,主要是用來實現應用程序的異步和解耦,同時也能起到消息緩沖,消息分發的作用。可維護多個隊列,可實現消息的一對一和廣播等方式發送

RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。

CentOs安裝:

安裝socat

yum -y install socat

安裝erlang

wget http://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm
Rpm -ivh erlang-19.0.4-1.el7.centos.x86_64.rpm

安裝rabbitmq

wget  http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.10/rabbitmq-server-3.6.10-1.el7.noarch.rpm
rpm -ivh rabbitmq-server-3.6.10-1.el7.noarch.rpm

啟動:

systemctl start rabbitmq-server

查看狀態:

rabbitmqctl status

配置網頁管理端:

mkdir /etc/rabbitmq

啟用插件:

rabbitmq-plugins enable rabbitmq_management

配置開放端口:

firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent

重啟防火牆:

systemctl restart firewalld.service

創建用戶:

rabbitmqctl add_user ruroot rproot

修改角色為管理員:

rabbitmqctl set_user_tags ruroot administrator

設置權限:

rabbitmqctl set_permissions -p / ruroot2 ".*" ".*" ".*"

測試結果:

命令行消息管理:

得到所有隊列及存在的數據條數

rabbitmqctl list_queues

Python簡單操控

安裝

pip3 install pika

發送數據:

如果生成多個的話,實現效果是輪詢發送,一個一個循環發送數據,如同“皇帝輪流做…”

 1 import pika
 2 
 3 #建立連接
 4 userx=pika.PlainCredentials("ruroot2","rproot2")
 5 conn=pika.BlockingConnection(pika.ConnectionParameters("192.168.43.10",5672,'/',credentials=userx))
 6 
 7 #開辟管道
 8 channelx=conn.channel()
 9 
10 #聲明隊列,參數為隊列名
11 channelx.queue_declare(queue="dongchannel11")
12 
13 #發送數據,發送一條,如果要發送多條則復制此段
14 channelx.basic_publish(exchange="",
15                        routing_key="dongchannel11",# 隊列名
16                        body="dongxiaodongtodata3" # 發送的數據
17                        )
18 print("--------發送數據完成-----------")
19 
20 #關閉連接
21 conn.close()

取出數據:

 1 import pika
 2 
 3 #建立連接
 4 userx=pika.PlainCredentials("ruroot2","rproot2")
 5 conn=pika.BlockingConnection(pika.ConnectionParameters("192.168.43.10",5672,'/',credentials=userx))
 6 
 7 #開辟管道
 8 channelx=conn.channel()
 9 
10 #聲明隊列,參數為隊列名
11 channelx.queue_declare(queue="dongchannel11")
12 
13 #消息處理函數,執行完成才說明接收完成,此時才可以接收下一條,串行
14 def dongcallbackfun(v1,v2,v3,bodyx):
15     print("得到的數據為:",bodyx)
16 
17 #接收准備
18 channelx.basic_consume(dongcallbackfun, #收到消息的回調函數
19                        queue="dongchannel11", #隊列名
20                        no_ack=True #是否發送消息確認
21                        )
22 print("-------- 開始接收數據 -----------")
23 
24 #開始接收消息
25 channelx.start_consuming()

發送端是否設置數據保存時間:

默認服務器(rabbitmq-server)重啟后消息隊列和消息數據均會全部消失

消息隊列的永久保存,開啟后將僅僅實現服務器重啟后消息隊列依然在,但數據還是會丟失,如果要保存數據,請參考接下來

#聲明隊列,參數為隊列名
#實現隊列永久保存,durable=True
channelx.queue_declare(queue="dongch1",durable=True)

數據的永久保存(一直等待被取,即使服務器重啟),將要永久保存的發送數據添加屬性properties

#發送數據
channelx.basic_publish(exchange="",
                       routing_key="dongch1",# 隊列名
                       body="dongxiaodongtodata333335", # 發送的數據
                       properties=pika.BasicProperties(
                           delivery_mode=2, #實現消息永久保存
                       )
                       )

發送端實現能者多勞

在發送端發送數據前,添加下面一句,此句添加一次即可,可以實現自動判斷多接收端的處理速度,實現接收端處理快則多派發任務,處理慢則少派發任務

channelx.basic_qos(prefetch_count=1)

接收端是否接收確認:

接收端開啟消息確認(值為False),接收端則會在接收回調函數結束時手動發送確認消息到數據發送者,如果接收端在回調函數處理未完成時就掛掉了,那么發送端將會立即把當前數據轉交到下一個接收端進行數據處理

1 #消息處理函數,執行完成才說明接收完成,此時才可以接收下一條,串行
2 def dongcallbackfun(channlx,methodx,v3,bodyx):
3     print("得到的數據為:",bodyx)
4     channelx.basic_ack(delivery_tag=methodx.delivery_tag) #發送數據完成確認消息,手動確認
5 
6 #接收准備
7 channelx.basic_consume(dongcallbackfun, #收到消息的回調函數
8                        queue="dongchannel11", #隊列名
9                        no_ack=False #是否在消息回調函數結束后發送確認信息到發消息者,true表示不發送

非阻塞版數據接收:

啟用會立即返回結果,如果有數據則進入回調函數,無數據則進行下一條,可以配合while使用

conn.process_data_events() #使用連接對象進行數據接收判斷
print("無數據")

實現消息的訂閱和發布:

發布:

 1 import pika
 2 
 3 #建立連接
 4 userx=pika.PlainCredentials("ruroot2","rproot2")
 5 conn=pika.BlockingConnection(pika.ConnectionParameters("192.168.1.175",5672,'/',credentials=userx))
 6 
 7 #開辟管道
 8 channelx=conn.channel()
 9 
10 #聲明發布和訂閱通道,如果可以確認通道存在則可以去掉該句
11 channelx.exchange_declare(exchange="dongee",exchange_type="fanout")
12 
13 #發送數據
14 channelx.basic_publish(exchange="dongee",#確定發布主題為:dongee
15                        routing_key="",
16                        body="dongxiaodongeeedata11", # 發送的數據
17                        )
18 
19 print("--------發送數據完成-----------")
20 
21 #關閉連接
22 conn.close()

訂閱:

 1 import pika
 2 #建立連接
 3 userx=pika.PlainCredentials("ruroot2","rproot2")
 4 conn=pika.BlockingConnection(pika.ConnectionParameters("192.168.1.175",5672,'/',credentials=userx))
 5 
 6 #開辟管道
 7 channelx=conn.channel()
 8 
 9 #聲明發布和訂閱通道,如果可以確認通道存在則可以去掉該句
10 channelx.exchange_declare(exchange="dongee",exchange_type="fanout")
11 
12 #聲明隊列,生成一個隨機的且不存在的隊列,該隊列會在連接斷開后自動銷毀
13 resqueue=channelx.queue_declare(exclusive=True)
14 #得到隨機生成的隊列名
15 queuenamex=resqueue.method.queue
16 
17 #將隊列和發布數據綁定,確定訂閱主題為:dongee
18 channelx.queue_bind(exchange="dongee",queue=queuenamex)
19 
20 #消息處理函數,執行完成才說明接收完成,此時才可以接收下一條,串行
21 def dongcallbackfun(channlx,methodx,v3,bodyx):
22     print("得到的數據為:",bodyx)
23 
24 #接收准備
25 channelx.basic_consume(dongcallbackfun, #收到消息的回調函數
26                        queue=queuenamex, #隊列名
27                        no_ack=True
28                        )
29 
30 print("-------- 開始接收數據 -----------")
31 
32 #開始接收消息
33 channelx.start_consuming()

通過管道實現進一步的消息訂閱和發布:

發布:

 1 import pika
 2 
 3 #建立連接
 4 userx=pika.PlainCredentials("ruroot2","rproot2")
 5 conn=pika.BlockingConnection(pika.ConnectionParameters("192.168.1.175",5672,'/',credentials=userx))
 6 
 7 #開辟管道
 8 channelx=conn.channel()
 9 
10 #聲明發布和訂閱通道,如果可以確認通道存在則可以去掉該句
11 channelx.exchange_declare(exchange="dongee2",exchange_type="direct")
12 
13 #發送數據
14 channelx.basic_publish(exchange="dongee2",#確定發布主題為:dongee2
15                        routing_key="dongqu33", #確定發布的隊列(發布的主題):dongqu33
16                        body="dongxiaodong333", # 確定發送的數據
17                        )
18 
19 #發送數據
20 channelx.basic_publish(exchange="dongee2",#確定發布主題為:dongee2
21                        routing_key="dongqu22", #確定發布的隊列(發布的主題):dongqu22
22                        body="dongxiaodong222", # 確定發送的數據
23                        )
24 
25 print("--------發送數據完成-----------")
26 
27 #關閉連接
28 conn.close()

訂閱:

 1 import pika
 2 #建立連接
 3 userx=pika.PlainCredentials("ruroot2","rproot2")
 4 conn=pika.BlockingConnection(pika.ConnectionParameters("192.168.1.175",5672,'/',credentials=userx))
 5 
 6 #開辟管道
 7 channelx=conn.channel()
 8 
 9 #聲明發布和訂閱通道,如果可以確認通道存在則可以去掉該句
10 channelx.exchange_declare(exchange="dongee2",exchange_type="direct")
11 
12 #聲明隊列,生成一個隨機的且不存在的隊列,該隊列會在連接斷開后自動銷毀
13 resqueue=channelx.queue_declare(exclusive=True)
14 #得到隨機生成的隊列名
15 queuenamex=resqueue.method.queue
16 
17 #將隊列和發布數據綁定,確定訂閱主題為:dongqu11 和 dongqu22
18 channelx.queue_bind(exchange="dongee2",queue=queuenamex,routing_key="dongqu11")
19 channelx.queue_bind(exchange="dongee2",queue=queuenamex,routing_key="dongqu22")
20 
21 #消息處理函數,執行完成才說明接收完成,此時才可以接收下一條,串行
22 def dongcallbackfun(channlx,methodx,v3,bodyx):
23     print("隊列名(訂閱的主題名)為:%r  得到的數據為:%r  "%(methodx.routing_key,bodyx))
24 
25 #接收准備
26 channelx.basic_consume(dongcallbackfun, #收到消息的回調函數
27                        queue=queuenamex, #隊列名
28                        no_ack=True
29                        )
30 
31 print("-------- 開始接收數據 -----------")
32 
33 #開始接收消息
34 channelx.start_consuming()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM