一 什么是消息隊列(MQ)
MQ全稱為Message Queue 消息隊列(MQ)是一種應用程序對應用程序的通信方法。MQ是消費-生產者模型的一個典型的代表,一端往消息隊列中不斷寫入消息,而另一端則可以讀取隊列中的消息。這樣發布者和使用者都不用知道對方的存在。

''' 生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。 '''
我們先不管消息(Message)這個詞,來看看隊列(Queue)。這一看,隊列大家應該都熟悉吧。
隊列是一種先進先出的數據結構。
消息隊列可以簡單理解為:把要傳輸的數據放在隊列中。
二、為什么要用消息隊列?
消息隊列中間件是分布式系統中重要的組件,主要解決應用解耦,異步消息,流量削鋒等問題,實現高性能,高可用,可伸縮和最終一致性架構。目前使用較多的消息隊列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ。
接下來利用一個外賣系統的消息推送給大家解釋下MQ的意義。
三 RabbitMQ
rabbitMQ是一款基於AMQP協議的消息中間件,它能夠在應用之間提供可靠的消息傳輸。在易用性,擴展性,高可用性上表現優秀。使用消息中間件利於應用之間的解耦,生產者(客戶端)無需知道消費者(服務端)的存在。而且兩端可以使用不同的語言編寫,大大提供了靈活性。
3.1 rabbitmq的安裝
略
3.2 rabbitMQ工作模型
3.2.1 簡單模式
### 生產者
import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'") ### 消費者 import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(queue='hello', auto_ack=True, on_message_callback=callback) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
3.2.2 參數
應答參數
auto_ack=False ch.basic_ack(delivery_tag=method.delivery_tag)
持久化參數
#聲明queue channel.queue_declare(queue='hello2', durable=True) # 若聲明過,則換一個名字 channel.basic_publish(exchange='', routing_key='hello2', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2, # make message persistent ) )
分發參數
有兩個消費者同時監聽一個的隊列。其中一個線程sleep2秒,另一個消費者線程sleep1秒,但是處理的消息是一樣多。這種方式叫輪詢分發(round-robin)不管誰忙,都不會多給消息,總是你一個我一個。想要做到公平分發(fair dispatch),必須關閉自動應答ack,改成手動應答。使用basicQos(perfetch=1)限制每次只發送不超過1條消息到同一個消費者,消費者必須手動反饋告知隊列,才會發送下一個。
channel.basic_qos(prefetch_count=1)
3.2.3 交換機模式(exchange)
交換機之發布訂閱
發布訂閱和簡單的消息隊列區別在於,發布訂閱會將消息發送給所有的訂閱者,而消息隊列中的數據被消費一次便消失。所以,RabbitMQ實現發布和訂閱時,會為每一個訂閱者創建一個隊列,而發布者發布消息時,會將消息放置在所有相關隊列中。
# 生產者 import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') message = "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close() # 消費者 import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') result = channel.queue_declare("",exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=callback) channel.start_consuming()
交換機之關鍵字
# 生產者 import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs2', exchange_type='direct') message = "info: Hello Yuan!" channel.basic_publish(exchange='logs2', routing_key='info', body=message) print(" [x] Sent %r" % message) connection.close() # 消費者 import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs2', exchange_type='direct') result = channel.queue_declare("",exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='logs2', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=callback) channel.start_consuming()
交換機之通配符
通配符交換機”與之前的路由模式相比,它將信息的傳輸類型的key更加細化,以“key1.key2.keyN....”的模式來指定信息傳輸的key的大類型和大類型下面的小類型,讓消費者可以更加精細的確認自己想要獲取的信息類型。而在消費者一段,不用精確的指定具體到哪一個大類型下的小類型的key,而是可以使用類似正則表達式(但與正則表達式規則完全不同)的通配符在指定一定范圍或符合某一個字符串匹配規則的key,來獲取想要的信息。
“通配符交換機”(Topic Exchange)將路由鍵和某模式進行匹配。此時隊列需要綁定在一個模式上。符號“#”匹配一個或多個詞,符號“*”僅匹配一個詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*”只會匹配到“audit.irs”。(這里與我們一般的正則表達式的“*”和“#”剛好相反,這里我們需要注意一下。)
下面是一個解釋通配符模式交換機工作的一個樣例
上面的交換機制類似於一個國際新聞訊息網站的機制。
# 生產者 import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs3', exchange_type='topic') message = "info: Hello ERU!" channel.basic_publish(exchange='logs3', routing_key='europe.weather', body=message) print(" [x] Sent %r" % message) connection.close() # 消費者 import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs3', exchange_type='topic') result = channel.queue_declare("",exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs3', queue=queue_name, routing_key="#.news") print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=callback) channel.start_consuming()
四 基於rabbitmq的RPC實現
4.1、關於RPC
4.2、rpc的實現
- 1.生產端 生成rpc_queue隊列,這個隊列負責幫消費者 接收數據並把消息發給消費端。
- 2.生產端 生成另外一個隨機隊列,這個隊列是發給消費端,消費這個用這個隊列把處理好的數據發送給生產端。
- 3.生產端 生成一組唯一字符串UUID,發送給消費者,消費者會把這串字符作為驗證在發給生產者。
- 4.當消費端處理完數據,發給生產端,時會把處理數據與UUID一起通過隨機生產的隊列發回給生產端。
- 5.生產端,會使用while循環 不斷檢測是否有數據,並以這種形式來實現阻塞等待數據,來監聽消費端。
- 6.生產端獲取數據調用回調函數,回調函數判斷本機的UUID與消費端發回UID是否匹配,由於消費端,可能有多個,且處理時間不等所以需要判斷,判斷成功賦值數據,while循環就會捕獲到,完成交互。
client
import pika import uuid import time # 斐波那契數列 前兩個數相加依次排列 class FibonacciRpcClient(object): def __init__(self): # 賦值變量,一個循環值 self.response = None # 鏈接遠程 self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() # 生成隨機queue result = self.channel.queue_declare("",exclusive=True) # 隨機取queue名字,發給消費端 self.callback_queue = result.method.queue # self.on_response 回調函數:只要收到消息就調用這個函數。 # 聲明收到消息后就 收queue=self.callback_queue內的消息 self.channel.basic_consume(queue=self.callback_queue, auto_ack=True, on_message_callback=self.on_response) # 收到消息就調用 # ch 管道內存對象地址 # method 消息發給哪個queue # body數據對象 def on_response(self, ch, method, props, body): # 判斷本機生成的ID 與 生產端發過來的ID是否相等 if self.corr_id == props.correlation_id: # 將body值 賦值給self.response self.response = body def call(self, n): # 隨機一次唯一的字符串 self.corr_id = str(uuid.uuid4()) # routing_key='rpc_queue' 發一個消息到rpc_queue內 self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( # 執行命令之后結果返回給self.callaback_queue這個隊列中 reply_to = self.callback_queue, # 生成UUID 發送給消費端 correlation_id = self.corr_id, ), # 發的消息,必須傳入字符串,不能傳數字 body=str(n)) # 沒有數據就循環收 while self.response is None: # 非阻塞版的start_consuming() # 沒有消息不阻塞 self.connection.process_data_events() print("no msg...") time.sleep(0.5) return int(self.response) # 實例化 fibonacci_rpc = FibonacciRpcClient() response = fibonacci_rpc.call(50) print(" [.] Got %r" % response)
server
#_*_coding:utf-8_*_ import pika import time # 鏈接socket connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() # 生成rpc queue channel.queue_declare(queue='rpc_queue') # 斐波那契數列 def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) # 收到消息就調用 # ch 管道內存對象地址 # method 消息發給哪個queue # props 返回給消費的返回參數 # body數據對象 def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) # 調用斐波那契函數 傳入結果 response = fib(n) ch.basic_publish(exchange='', # 生產端隨機生成的queue routing_key=props.reply_to, # 獲取UUID唯一 字符串數值 properties=pika.BasicProperties(correlation_id = \ props.correlation_id), # 消息返回給生產端 body=str(response)) # 確保任務完成 # ch.basic_ack(delivery_tag = method.delivery_tag) # rpc_queue收到消息:調用on_request回調函數 # queue='rpc_queue'從rpc內收 channel.basic_consume(queue="rpc_queue", auto_ack=True, on_message_callback=on_request) print(" [x] Awaiting RPC requests") channel.start_consuming()