aio-pika的使用


aio-pika的使用

簡單使用

生產者

  1. 創建連接
  2. 生成channel
  3. 聲明隊列

無回調函數

import asyncio
import aio_pika


async def main():
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/"
    )
    # 建立連接

    queue_name = "test_queue"
    async with connection:
        # 上下文管理,退出時自動關閉connection
        channel = await connection.channel()
        # 創建channel
        # Declaring queue
        queue = await channel.declare_queue(queue_name, auto_delete=True)
        # auto_delete 通道關閉時是否刪除隊列
        # 聲明隊列
        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                async with message.process():
                    print(message.body)
                    # 獲取消息

if __name__ == "__main__":
    asyncio.run(main())

有回調函數

import asyncio
from aio_pika import connect, IncomingMessage


async def on_message(message: IncomingMessage):
    """
    on_message doesn't necessarily have to be defined as async.
    Here it is to show that it's possible.
    """

    print("Message body is: %r" % message.body)
    await asyncio.sleep(5)  # Represents async I/O operations


async def main(loop):
    # Perform connection
    connection = await connect(
        "amqp://guest:guest@localhost/", loop=loop
    )

    # Creating a channel
    channel = await connection.channel()

    # Declaring queue
    queue = await channel.declare_queue("hello")

    # Start listening the queue with name 'hello'
    await queue.consume(on_message, no_ack=True)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.create_task(main(loop))
    loop.run_forever()

消費者

  1. 創建連接
  2. 生成channel
  3. 聲明隊列,避免隊列不存在
  4. 通過隊列向channel發送消息
import asyncio
import aio_pika

async def main():
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/"
    )
    # 生成連接

    async with connection:
        routing_key = "test_queue"
        channel = await connection.channel()
        # 生成通道

        queue = await channel.declare_queue(routing_key, auto_delete=True)
        # 聲明隊列信息,避免隊列不存在

        await channel.default_exchange.publish(
            aio_pika.Message(body="Hello {}".format(routing_key).encode()),
            routing_key=routing_key,
        )
        # 向指定隊列發布消息

if __name__ == "__main__":
    asyncio.run(main())

交換機模式

FANOUT(分發訂閱模式)

生產者

  1. 創建連接
  2. 生成channel
  3. 聲明交換機,類型為FANOUT
  4. 交換機直接發布消息,routing_key為空
import asyncio
from aio_pika import connect, Message, DeliveryMode, ExchangeType


async def main():
    # Perform connection
    connection = await connect(
        "amqp://guest:guest@localhost/"
    )
    # 生成連接

    # Creating a channel
    channel = await connection.channel()
    # 創建通道

    logs_exchange = await channel.declare_exchange(
        "logs", ExchangeType.FANOUT
    )
    # 綁定交換機,發布訂閱模式

    message_body = b"Hello World!"

    message = Message(
        message_body,
        delivery_mode=DeliveryMode.PERSISTENT
    )
    # delivery_mode消息是否持久化

    await logs_exchange.publish(message, routing_key="")
    # 交換機綁定隊列發布消息,不用聲明隊列

    await connection.close()
    # 確保信息到達

if __name__ == "__main__":
    asyncio.run(main())

消費者

  1. 創建連接
  2. 生成channel
  3. 聲明交換機
  4. 聲明隊列參數為exclusive=True
  5. 隊列綁定交換機,routing_key為空等待接收消息
import asyncio
from aio_pika import connect, IncomingMessage, ExchangeType


async def main():
    # Perform connection
    connection = await connect(
        "amqp://guest:guest@localhost/"
    )

    # Creating a channel
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)

    logs_exchange = await channel.declare_exchange(
        "logs", ExchangeType.FANOUT
    )

    # Declaring queue
    queue = await channel.declare_queue(exclusive=True)

    # Binding the queue to the exchange
    await queue.bind(logs_exchange)

    async with queue.iterator() as queue_iter:
        async for message in queue_iter:
            async with message.process():
                print(message.body)


if __name__ == "__main__":
    asyncio.run(main())

direct(路由模式)

生產者

  1. 創建連接
  2. 生成channel
  3. 聲明交換機,類型為direct
  4. 交換機直接發布消息,設置routing_key
import asyncio
from aio_pika import connect, Message, DeliveryMode, ExchangeType


async def main():
    # Perform connection
    connection = await connect(
        "amqp://guest:guest@localhost/"
    )

    # Creating a channel
    channel = await connection.channel()

    logs_exchange = await channel.declare_exchange(
        "logss", ExchangeType.DIRECT
    )

    message_body = b"Hello World!"

    message = Message(
        message_body,
        delivery_mode=DeliveryMode.PERSISTENT
    )

    # Sending the message
    routing_key = "info"
    await logs_exchange.publish(message, routing_key=routing_key)

    await connection.close()


if __name__ == "__main__":
    asyncio.run(main())

消費者

  1. 創建連接
  2. 生成channel
  3. 聲明交換機,類型為direct
  4. 隊列綁定交換機,,設置routing_key
  5. 等待接收消息
import asyncio
from aio_pika import connect, IncomingMessage, ExchangeType

async def main():
    # Perform connection
    connection = await connect(
        "amqp://guest:guest@localhost/"
    )

    # Creating a channel
    channel = await connection.channel()

    direct_logs_exchange = await channel.declare_exchange(
        "logss", ExchangeType.DIRECT
    )

    # Declaring random queue
    queue = await channel.declare_queue(durable=True)

    severity = "info"

    await queue.bind(direct_logs_exchange, routing_key=severity)

    async with queue.iterator() as queue_iter:
        async for message in queue_iter:
            async with message.process():
                print(message.body)


if __name__ == "__main__":
    asyncio.run(main())

交換機會向routing_key相同的隊列中發送數據

topic(主題模式)

  1. 創建連接
  2. 生成channel
  3. 聲明交換機,類型為topic
  4. 交換機直接發布消息,設置routing_key

import asyncio
from aio_pika import connect, Message, DeliveryMode, ExchangeType


async def main():
    # Perform connection
    connection = await connect(
        "amqp://guest:guest@localhost/"
    )

    # Creating a channel
    channel = await connection.channel()

    logs_exchange = await channel.declare_exchange(
        "log2", ExchangeType.TOPIC
    )

    message_body = b"Hello World!"

    message = Message(
        message_body,
        delivery_mode=DeliveryMode.PERSISTENT
    )

    # Sending the message
    routing_key = "xxx.info"
    await logs_exchange.publish(message, routing_key=routing_key)

    await connection.close()


if __name__ == "__main__":
    asyncio.run(main())

消費者

  1. 創建連接
  2. 生成channel
  3. 聲明交換機,類型為direct
  4. 隊列綁定交換機,,設置routing_key
  5. 等待接收消息
import sys
import asyncio
from aio_pika import connect, IncomingMessage, ExchangeType




async def main():
    # Perform connection
    connection = await connect(
        "amqp://guest:guest@localhost/"
    )

    # Creating a channel
    channel = await connection.channel()

    direct_logs_exchange = await channel.declare_exchange(
        "log2", ExchangeType.TOPIC
    )

    # Declaring random queue
    queue = await channel.declare_queue(durable=True)

    severity = "#.info"

    await queue.bind(direct_logs_exchange, routing_key=severity)

    async with queue.iterator() as queue_iter:
        async for message in queue_iter:
            async with message.process():
                print(message.body)


if __name__ == "__main__":
    asyncio.run(main())

topics模式中要注意,三種符號

符號 作用
. 用來分割單詞
* 匹配一個單詞
# 匹配一個或多個單詞

生產者用完整的routing_key,消費者用*或#進行模糊匹配


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM