import threading
import time
import pika
class SingletonClass(object):
"""單例模式用來少創建連接"""
# 加鎖,防止並發較高時,同時創建對象,導致創建多個對象
_singleton_lock = threading.Lock()
def __init__(self, username='baibing', password='123456', ip='47.111.87.61', port=5672, data={}):
"""__init__在new出來對象后實例化對象"""
self.credentials = pika.PlainCredentials(username, password)
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=ip, port=port, credentials=self.credentials))
self.channel = self.connection.channel()
print('連接成功')
def __new__(cls):
"""__new__用來創建對象"""
if not hasattr(SingletonClass, "_instance"):
with SingletonClass._singleton_lock:
if not hasattr(SingletonClass, "_instance"):
SingletonClass._instance = super().__new__(cls)
return SingletonClass._instance
def callback(self, ch, method, properties, body):
"""訂閱者的回調函數,可以在這里面做操作,比如釋放庫存等"""
print("郵箱", body.decode())
# 在秒殺活動中,這里來對數據進行平滑的處理
time.sleep(0.8)
ch.basic_ack(delivery_tag=method.delivery_tag) # 手動ack機制,
def connection_close(self):
"""關閉連接"""
self.connection.close()
def consuming_start(self):
"""等待消息"""
self.channel.start_consuming()
def this_publisher(self, email, queue_name='HELLOP'):
"""發布者
email:消息
queue_name:隊列名稱
"""
# 1、創建一個名為python-test的交換機 durable=True 代表exchange持久化存儲
self.channel.exchange_declare(exchange='python1', durable=True, exchange_type='topic')
# self.channel.queue_declare(queue=queue_name)
# 2、訂閱發布模式,向名為python-test的交換機中插入用戶郵箱地址email,delivery_mode = 2 聲明消息在隊列中持久化,delivery_mod = 1 消息非持久化
self.channel.basic_publish(exchange='python1',
routing_key='#user#',
body=email,
properties=pika.BasicProperties(delivery_mode=2)
)
print("隊列{}發送用戶郵箱{}到MQ成功".format(queue_name, email))
# 3. 關閉連接
self.connection_close()
def this_subscriber(self, queue_name='HELLOP', prefetch_count=10):
"""訂閱者
queue_name:隊列名稱
prefetch_count:限制未處理消息的最大值,ack未開啟時生效
"""
# 創建臨時隊列,隊列名傳空字符,consumer關閉后,隊列自動刪除
result = self.channel.queue_declare('', durable=True, exclusive=True)
# 限制未處理消息的最大值 這個值就是你數據庫承受的並發量
self.channel.basic_qos(prefetch_count=5)
# 聲明exchange,由exchange指定消息在哪個隊列傳遞,如不存在,則創建。durable = True 代表exchange持久化存儲,False 非持久化存儲
self.channel.exchange_declare(exchange='python1', durable=True, exchange_type='topic')
# 綁定exchange和隊列 exchange 使我們能夠確切地指定消息應該到哪個隊列去
self.channel.queue_bind(exchange='python1', queue=result.method.queue, routing_key='#.anonymous.#')
self.channel.basic_consume(
result.method.queue,
self.callback, # 回調地址(函數)
auto_ack=False # 流量削峰 auto_ack必須為false 手動來ack
)
# 等待消息
self.consuming_start()
if __name__ == '__main__':
obj1 = SingletonClass()
print(id(obj1))
obj1.this_subscriber()
原文鏈接:https://blog.csdn.net/qq_42874635/article/details/116268306
先復制過來稍后整理