aio-pika的使用
簡單使用
生產者
- 創建連接
- 生成channel
- 聲明隊列
無回調函數
import asyncio
import aio_pika
async def main():
connection = await aio_pika.connect_robust(
"amqp://guest:guest@127.0.0.1/"
)
# 建立連接
queue_name = "test_queue"
async with connection:
# 上下文管理,退出時自動關閉connection
channel = await connection.channel()
# 創建channel
# Declaring queue
queue = await channel.declare_queue(queue_name, auto_delete=True)
# auto_delete 通道關閉時是否刪除隊列
# 聲明隊列
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
print(message.body)
# 獲取消息
if __name__ == "__main__":
asyncio.run(main())
有回調函數
import asyncio
from aio_pika import connect, IncomingMessage
async def on_message(message: IncomingMessage):
"""
on_message doesn't necessarily have to be defined as async.
Here it is to show that it's possible.
"""
print("Message body is: %r" % message.body)
await asyncio.sleep(5) # Represents async I/O operations
async def main(loop):
# Perform connection
connection = await connect(
"amqp://guest:guest@localhost/", loop=loop
)
# Creating a channel
channel = await connection.channel()
# Declaring queue
queue = await channel.declare_queue("hello")
# Start listening the queue with name 'hello'
await queue.consume(on_message, no_ack=True)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.create_task(main(loop))
loop.run_forever()
消費者
- 創建連接
- 生成channel
- 聲明隊列,避免隊列不存在
- 通過隊列向channel發送消息
import asyncio
import aio_pika
async def main():
connection = await aio_pika.connect_robust(
"amqp://guest:guest@127.0.0.1/"
)
# 生成連接
async with connection:
routing_key = "test_queue"
channel = await connection.channel()
# 生成通道
queue = await channel.declare_queue(routing_key, auto_delete=True)
# 聲明隊列信息,避免隊列不存在
await channel.default_exchange.publish(
aio_pika.Message(body="Hello {}".format(routing_key).encode()),
routing_key=routing_key,
)
# 向指定隊列發布消息
if __name__ == "__main__":
asyncio.run(main())
交換機模式
FANOUT(分發訂閱模式)
生產者
- 創建連接
- 生成channel
- 聲明交換機,類型為FANOUT
- 交換機直接發布消息,routing_key為空
import asyncio
from aio_pika import connect, Message, DeliveryMode, ExchangeType
async def main():
# Perform connection
connection = await connect(
"amqp://guest:guest@localhost/"
)
# 生成連接
# Creating a channel
channel = await connection.channel()
# 創建通道
logs_exchange = await channel.declare_exchange(
"logs", ExchangeType.FANOUT
)
# 綁定交換機,發布訂閱模式
message_body = b"Hello World!"
message = Message(
message_body,
delivery_mode=DeliveryMode.PERSISTENT
)
# delivery_mode消息是否持久化
await logs_exchange.publish(message, routing_key="")
# 交換機綁定隊列發布消息,不用聲明隊列
await connection.close()
# 確保信息到達
if __name__ == "__main__":
asyncio.run(main())
消費者
- 創建連接
- 生成channel
- 聲明交換機
- 聲明隊列參數為exclusive=True
- 隊列綁定交換機,routing_key為空等待接收消息
import asyncio
from aio_pika import connect, IncomingMessage, ExchangeType
async def main():
# Perform connection
connection = await connect(
"amqp://guest:guest@localhost/"
)
# Creating a channel
channel = await connection.channel()
await channel.set_qos(prefetch_count=1)
logs_exchange = await channel.declare_exchange(
"logs", ExchangeType.FANOUT
)
# Declaring queue
queue = await channel.declare_queue(exclusive=True)
# Binding the queue to the exchange
await queue.bind(logs_exchange)
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
print(message.body)
if __name__ == "__main__":
asyncio.run(main())
direct(路由模式)
生產者
- 創建連接
- 生成channel
- 聲明交換機,類型為direct
- 交換機直接發布消息,設置routing_key
import asyncio
from aio_pika import connect, Message, DeliveryMode, ExchangeType
async def main():
# Perform connection
connection = await connect(
"amqp://guest:guest@localhost/"
)
# Creating a channel
channel = await connection.channel()
logs_exchange = await channel.declare_exchange(
"logss", ExchangeType.DIRECT
)
message_body = b"Hello World!"
message = Message(
message_body,
delivery_mode=DeliveryMode.PERSISTENT
)
# Sending the message
routing_key = "info"
await logs_exchange.publish(message, routing_key=routing_key)
await connection.close()
if __name__ == "__main__":
asyncio.run(main())
消費者
- 創建連接
- 生成channel
- 聲明交換機,類型為direct
- 隊列綁定交換機,,設置routing_key
- 等待接收消息
import asyncio
from aio_pika import connect, IncomingMessage, ExchangeType
async def main():
# Perform connection
connection = await connect(
"amqp://guest:guest@localhost/"
)
# Creating a channel
channel = await connection.channel()
direct_logs_exchange = await channel.declare_exchange(
"logss", ExchangeType.DIRECT
)
# Declaring random queue
queue = await channel.declare_queue(durable=True)
severity = "info"
await queue.bind(direct_logs_exchange, routing_key=severity)
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
print(message.body)
if __name__ == "__main__":
asyncio.run(main())
交換機會向routing_key相同的隊列中發送數據
topic(主題模式)
- 創建連接
- 生成channel
- 聲明交換機,類型為topic
- 交換機直接發布消息,設置routing_key
import asyncio
from aio_pika import connect, Message, DeliveryMode, ExchangeType
async def main():
# Perform connection
connection = await connect(
"amqp://guest:guest@localhost/"
)
# Creating a channel
channel = await connection.channel()
logs_exchange = await channel.declare_exchange(
"log2", ExchangeType.TOPIC
)
message_body = b"Hello World!"
message = Message(
message_body,
delivery_mode=DeliveryMode.PERSISTENT
)
# Sending the message
routing_key = "xxx.info"
await logs_exchange.publish(message, routing_key=routing_key)
await connection.close()
if __name__ == "__main__":
asyncio.run(main())
消費者
- 創建連接
- 生成channel
- 聲明交換機,類型為direct
- 隊列綁定交換機,,設置routing_key
- 等待接收消息
import sys
import asyncio
from aio_pika import connect, IncomingMessage, ExchangeType
async def main():
# Perform connection
connection = await connect(
"amqp://guest:guest@localhost/"
)
# Creating a channel
channel = await connection.channel()
direct_logs_exchange = await channel.declare_exchange(
"log2", ExchangeType.TOPIC
)
# Declaring random queue
queue = await channel.declare_queue(durable=True)
severity = "#.info"
await queue.bind(direct_logs_exchange, routing_key=severity)
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
print(message.body)
if __name__ == "__main__":
asyncio.run(main())
topics模式中要注意,三種符號
符號 | 作用 |
---|---|
. | 用來分割單詞 |
* | 匹配一個單詞 |
# | 匹配一個或多個單詞 |
生產者用完整的routing_key,消費者用*或#進行模糊匹配