環境是centos6.9
rabbitmq依賴erlang語言,首先要安裝erlang,然后安裝rabbitmq-server
啟動rabbitmq-server:service rabbitmq-server start
python3安裝pika模塊:pip3 install pika
pika連接rabbitmq示例:
接收端:
import pika, time connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='hello', durable=True) def callback(ch, method, properties, body): print("-->正在接收數據...") print("[x] Received %r" % body) ch.basic_ack(delivery_tag = method.delivery_tag) #no_ack字段告訴rabbitmq是否需要發送消息接收確認 #channel.basic_consume(callback, queue='hello', no_ack=True) #rabbitmq如果沒有接收到ack就會繼續發送此條消息 channel.basic_consume(callback, queue='hello') print("[* Waiting for message. To exit press CTRL-C]") channel.start_consuming()
發送端:
import pika #建立管道 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #信息高速通道 channel = connection.channel() #聲明queue, durable隊列持久化 channel.queue_declare(queue='hello', durable=True) #通過exchange發送消息到queue channel.basic_publish(exchange='', routing_key='hello', body='Hello World!', properties = pika.BasicProperties( delivery_mode = 2, #消息持久化,服務器斷開也會保存消息 ) ) print("[x] Send 'Hello World!'") connection.close()
fanout: 所有bind到此exchange的queue都可以接收消息
接收端:
import pika ''' Procuder->exchange->queue->consumer ''' #創建管道 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #創建信息高速 channel = connection.channel() #聲明exchange的類型 channel.exchange_declare(exchange='logs', exchange_type='fanout') #聲明queue,不聲明queue名字,rabbitmq會自動生成queue名字,斷開后自動刪除 result = channel.queue_declare(exclusive=True) #獲取queue名字 queue_name = result.method.queue #消費者綁定queue channel.queue_bind(exchange='logs', queue=queue_name) print(queue_name) print('[*] Waiting fro logs.To exit press CTRL+C') #回調函數 def callback(ch, method, properties, body): print('-->正在接收數據...') print('[x] %r' % body) #接收句柄 channel.basic_consume(callback, queue = queue_name, no_ack = True ) channel.start_consuming()
發送端:
import pika ''' rabbitmq廣播 ''' #建立連接 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #建立信息高速 channel = connection.channel() #聲明exchange為fanout類型 channel.exchange_declare(exchange='logs', exchange_type='fanout') #創建消息內容 message = 'info:Hello World!' #發送消息句柄 channel.basic_publish( exchange='logs', routing_key='', body=message ) #打印發送的消息 print('[x] Send %r' % message) #關閉連接 connection.close()
direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息
接收端:
import pika, sys ''' exchange的direct模式下的廣播 ''' #建立通道 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #建立信息高速 channel = connection.channel() #聲明exchange類型 channel.exchange_declare( exchange='direct_logs', exchange_type='direct' ) #獲取queue名字 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue #獲取指定接收類型 severities = sys.argv[1:] #如果沒有指定接收類型,報錯 if not severities: sys.stderr.write('Usage: %s [info] [warning] [error]\n' % sys.argv[0]) sys.exit(1) #把要接收的所有類型,綁定到exchange的queue里 for severity in severities: channel.queue_bind( exchange='direct_logs', queue=queue_name, routing_key=severity ) print('[x] Waiting for logs.To exit press CTRL+C') #接收成功后的回調函數 def callback(ch, method, properties, body): print('[x] %r:%r' % (method.routing_key, body)) #接收入口 channel.basic_consume( callback, queue=queue_name, no_ack=True ) #開啟接收 channel.start_consuming()
發送端:
import pika, sys ''' exchange為direct的廣播模式 可以指定 ''' connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish( exchange='direct_logs', routing_key=severity, body=message ) print('[x] send %r:%r' % (severity, message)) connection.close()
topic:所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息
發送端:
import pika, sys ''' exchange的topic模式下更精細的廣播過濾 ''' #建立通道 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #建立信息高速 channel = connection.channel() #聲明exchange類型 channel.exchange_declare( exchange='topic_logs', exchange_type='topic' ) #獲取queue名字 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue #獲取指定接收類型 severities = sys.argv[1:] #如果沒有指定接收類型,報錯 if not severities: sys.stderr.write('Usage: %s [binding_key]......\n' % sys.argv[0]) sys.exit(1) #把要接收的所有類型,綁定到exchange的queue里 for severity in severities: channel.queue_bind( exchange='topic_logs', queue=queue_name, routing_key=severity ) print('[x] Waiting for logs.To exit press CTRL+C') #接收成功后的回調函數 def callback(ch, method, properties, body): print('[x] %r:%r' % (method.routing_key, body)) #接收入口 channel.basic_consume( callback, queue=queue_name, no_ack=True ) #開啟接收 channel.start_consuming()
接受端:
import pika, sys ''' exchange為direct的廣播模式 可以指定 ''' connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anosy.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish( exchange='topic_logs', routing_key=routing_key, body=message ) print('[x] send %r:%r' % (routing_key, message)) connection.close()
rpc通過rabbitmq接收並返回消息:
客戶端:
import pika, uuid ''' p端發送消息,並接收返回信息 ''' class RPCClient(object): def __init__(self): #回收消息 self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume( self.on_response, no_ack=True, queue=self.callback_queue ) #接收功能的回調函數 def on_response(self, ch, method, properties, body): if self.corr_id == properties.correlation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish( exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( #把指定回收的隊列發送給服務 reply_to=self.callback_queue, #確認id發送到服務端 correlation_id=self.corr_id ), body=str(n) ) while self.response is None: #和start_consume的區別是此處不會阻塞 self.connection.process_data_events() return int(self.response) rpcclient = RPCClient() print('[x] Requesting fib(10)') response = rpcclient.call(10) print('[.] Got %r' % response)
服務端:
import pika ''' rpc服務端 思路如下: 接收客戶端消息, 加工數據並打包 發送給客戶端 ''' def fibs(n): if n == 0: return 0 if n == 1: return 1 return fibs(n-1) + fibs(n-2) connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='rpc_queue') def on_request(ch, method, properties, body): n = int(body) print('[.] fibs(%s)' % n) response = fibs(n) ch.basic_publish( exchange='', routing_key=properties.reply_to, properties=pika.BasicProperties( correlation_id=properties.correlation_id ), body=str(response) ) #手動確認 ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume( on_request, queue='rpc_queue' ) print('[x] Awaiting RPC requests...') channel.start_consuming()