pika模塊操作rabbitmq-server


環境是centos6.9

rabbitmq依賴erlang語言,首先要安裝erlang,然后安裝rabbitmq-server

啟動rabbitmq-server:service rabbitmq-server start

python3安裝pika模塊:pip3 install pika

 

pika連接rabbitmq示例:

接收端:

import pika, time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='hello', durable=True)

def callback(ch, method, properties, body):
    print("-->正在接收數據...")
    print("[x] Received %r" % body)
    ch.basic_ack(delivery_tag = method.delivery_tag)

#no_ack字段告訴rabbitmq是否需要發送消息接收確認
#channel.basic_consume(callback, queue='hello', no_ack=True)

#rabbitmq如果沒有接收到ack就會繼續發送此條消息
channel.basic_consume(callback, queue='hello')

print("[* Waiting for message. To exit press CTRL-C]")
channel.start_consuming()

  發送端:

import pika

#建立管道 
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

#信息高速通道 
channel = connection.channel()

#聲明queue, durable隊列持久化
channel.queue_declare(queue='hello', durable=True)

#通過exchange發送消息到queue
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!',
                     properties = pika.BasicProperties(
                        delivery_mode = 2, #消息持久化,服務器斷開也會保存消息
                     )
                     )

print("[x] Send 'Hello World!'")

connection.close()

  fanout: 所有bind到此exchange的queue都可以接收消息

接收端:

import pika
'''
Procuder->exchange->queue->consumer
'''

#創建管道
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#創建信息高速
channel = connection.channel()
#聲明exchange的類型
channel.exchange_declare(exchange='logs', exchange_type='fanout')
#聲明queue,不聲明queue名字,rabbitmq會自動生成queue名字,斷開后自動刪除
result = channel.queue_declare(exclusive=True)
#獲取queue名字
queue_name = result.method.queue
#消費者綁定queue
channel.queue_bind(exchange='logs', queue=queue_name)

print(queue_name)
print('[*] Waiting fro logs.To exit press CTRL+C')
#回調函數
def callback(ch, method, properties, body):
    print('-->正在接收數據...')
    print('[x] %r' % body)
#接收句柄
channel.basic_consume(callback,
                      queue = queue_name,
                      no_ack = True
                     )
channel.start_consuming()

  發送端:

import pika
'''
rabbitmq廣播
'''

#建立連接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#建立信息高速
channel = connection.channel()
#聲明exchange為fanout類型
channel.exchange_declare(exchange='logs', exchange_type='fanout')
#創建消息內容
message = 'info:Hello World!'
#發送消息句柄
channel.basic_publish(
    exchange='logs',
    routing_key='',
    body=message
)
#打印發送的消息
print('[x] Send %r' % message)
#關閉連接
connection.close()

  direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息

接收端:

import pika, sys
'''
exchange的direct模式下的廣播
'''
#建立通道
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#建立信息高速
channel = connection.channel()
#聲明exchange類型
channel.exchange_declare(
    exchange='direct_logs',
    exchange_type='direct'
)
#獲取queue名字
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

#獲取指定接收類型
severities = sys.argv[1:]
#如果沒有指定接收類型,報錯
if not severities:
    sys.stderr.write('Usage: %s [info] [warning] [error]\n' % sys.argv[0])
    sys.exit(1)
#把要接收的所有類型,綁定到exchange的queue里
for severity in severities:
    channel.queue_bind(
        exchange='direct_logs',
        queue=queue_name,
        routing_key=severity
    )

print('[x] Waiting for logs.To exit press CTRL+C')
#接收成功后的回調函數
def callback(ch, method, properties, body):
    print('[x] %r:%r' % (method.routing_key, body))
#接收入口
channel.basic_consume(
    callback,
    queue=queue_name,
    no_ack=True
)
#開啟接收
channel.start_consuming()

  發送端:

import pika, sys
'''
exchange為direct的廣播模式
可以指定
'''

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'

message = ' '.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(
    exchange='direct_logs',
    routing_key=severity,
    body=message
)

print('[x] send %r:%r' % (severity, message))
connection.close()

  topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息

發送端:

import pika, sys
'''
exchange的topic模式下更精細的廣播過濾
'''
#建立通道
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#建立信息高速
channel = connection.channel()
#聲明exchange類型
channel.exchange_declare(
    exchange='topic_logs',
    exchange_type='topic'
)
#獲取queue名字
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

#獲取指定接收類型
severities = sys.argv[1:]
#如果沒有指定接收類型,報錯
if not severities:
    sys.stderr.write('Usage: %s [binding_key]......\n' % sys.argv[0])
    sys.exit(1)
#把要接收的所有類型,綁定到exchange的queue里
for severity in severities:
    channel.queue_bind(
        exchange='topic_logs',
        queue=queue_name,
        routing_key=severity
    )

print('[x] Waiting for logs.To exit press CTRL+C')
#接收成功后的回調函數
def callback(ch, method, properties, body):
    print('[x] %r:%r' % (method.routing_key, body))
#接收入口
channel.basic_consume(
    callback,
    queue=queue_name,
    no_ack=True
)
#開啟接收
channel.start_consuming()

  接受端:

import pika, sys
'''
exchange為direct的廣播模式
可以指定
'''

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anosy.info'

message = ' '.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(
    exchange='topic_logs',
    routing_key=routing_key,
    body=message
)

print('[x] send %r:%r' % (routing_key, message))
connection.close()

  rpc通過rabbitmq接收並返回消息:

客戶端:

import pika, uuid
'''
p端發送消息,並接收返回信息
'''

class RPCClient(object):
    def __init__(self):
        #回收消息
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(
            self.on_response,
            no_ack=True,
            queue=self.callback_queue
        )
    #接收功能的回調函數
    def on_response(self, ch, method, properties, body):
        if self.corr_id == properties.correlation_id:
            self.response = body
    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                #把指定回收的隊列發送給服務
                reply_to=self.callback_queue,
                #確認id發送到服務端
                correlation_id=self.corr_id
            ),
            body=str(n)
        )
        while self.response is None:
            #和start_consume的區別是此處不會阻塞
            self.connection.process_data_events()
        return int(self.response)
rpcclient = RPCClient()

print('[x] Requesting fib(10)')
response = rpcclient.call(10)
print('[.] Got %r' % response)

  服務端:

import pika
'''
rpc服務端
思路如下:
接收客戶端消息,
加工數據並打包
發送給客戶端
'''
def fibs(n):
    if n == 0:
        return 0
    if n == 1:
        return 1
    return fibs(n-1) + fibs(n-2)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')


def on_request(ch, method, properties, body):
    n = int(body)
    print('[.] fibs(%s)' % n)
    response = fibs(n)
    ch.basic_publish(
        exchange='',
        routing_key=properties.reply_to,
        properties=pika.BasicProperties(
            correlation_id=properties.correlation_id
        ),
        body=str(response)
    )
    #手動確認
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(
    on_request,
    queue='rpc_queue'
)
print('[x] Awaiting RPC requests...')
channel.start_consuming()

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM