RabbitMQ隊列


RabbitMQ是什么?

RabbitMQ是一個在AMQP基礎上完整的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。

MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消 息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程序通過 隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。

RabbitMQ的安裝

首先說明,RabbitMQ在win上安裝是一件頗為麻煩的事情。試了很長時間都沒有成功,后來就轉戰linux了。在linux的安裝中也可能會出現一點問題,下面會貼出一個網址有安裝中出現問題的解決辦法。

linux上都是直接install rabbitmq-server

當然可能會在安裝中和后來的使用上出現這樣或者是那樣的問題,解決辦法參見這篇博客http://www.cnblogs.com/kaituorensheng/p/4985767.html

RabbitMQ的語法以及實例

1.基本實例

基於Queue實現生產者消費者模型

 1 import Queue
 2 import threading
 3 
 4 
 5 message = Queue.Queue(10)
 6 
 7 
 8 def producer(i):
 9     while True:
10         message.put(i)
11 
12 
13 def consumer(i):
14     while True:
15         msg = message.get()
16 
17 
18 for i in range(12):
19     t = threading.Thread(target=producer, args=(i,))
20     t.start()
21 
22 for i in range(10):
23     t = threading.Thread(target=consumer, args=(i,))
24     t.start()
View Code

對於RabbitMQ來說,生產和消費不再針對內存里的一個Queue對象,而是某台服務器上的RabbitMQ Server實現的消息隊列。

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()#開通一個管道

#聲明queue
channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello',#queue名字
                      body='Hello World!')#消息內容
print(" [x] Sent 'Hello World!'")
connection.close()
import pika
#建立連接
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)


channel.basic_consume(#消費消息
                      callback,#如果收到消息就調用callback函數處理消息
                      queue='hello',
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

2.消息發布輪詢

  • 上面的只是一個生產者、一個消費者,能不能一個生產者多個消費者呢? 
    可以上面的例子,多啟動幾個消費者consumer,看一下消息的接收情況。 
    采用輪詢機制;把消息依次分發

 

  • 假如消費者處理消息需要15秒,如果當機了,那這個消息處理明顯還沒處理完,怎么處理? 
    (可以模擬消費端斷了,分別注釋和不注釋 no_ack=True 看一下) 
    你沒給我回復確認,就代表消息沒處理完。

 

  • 上面的效果消費端斷了就轉到另外一個消費端去了,但是生產者怎么知道消費端斷了呢? 
    因為生產者和消費者是通過socket連接的,socket斷了,就說明消費端斷開了。

 

  • 上面的模式只是依次分發,實際情況是機器配置不一樣。怎么設置類似權重的操作?
    RabbitMQ怎么辦呢,RabbitMQ做了簡單的處理就能實現公平的分發。 
    就是RabbitMQ給消費者發消息的時候檢測下消費者里的消息數量,如果超過指定值(比如1條),就不給你發了。 
    只需要在消費者端,channel.basic_consume前加上就可以了。
channel.basic_qos(prefetch_count=1)  # 類似權重,按能力分發,如果有一個消息,就不在給你發

3. acknowledgment 消息持久化

 no-ack = False

如果消費者遇到情況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,那么,RabbitMQ會重新將該任務添加到隊列中。

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print ('ok')
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
生產者

durable  

如果隊列里還有消息,RabbitMQ 服務端宕機了呢?消息還在不在? 
把RabbitMQ服務重啟,看一下消息在不在。 
上面的情況下,宕機了,消息就久了,下面看看如何把消息持久化。 
每次聲明隊列的時候,都加上durable,注意每個隊列都得寫,客戶端、服務端聲明的時候都得寫。

# 在管道里聲明queue
channel.queue_declare(queue='hello2', durable=True)

durable的作用只是把隊列持久化。離消息持久話還差一步: 
發送端發送消息時,加上properties

properties=pika.BasicProperties(
    delivery_mode=2,  # 消息持久化
    )
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue='hello', durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print 'ok'
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

消費者
生產者
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue='hello', durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print( 'ok')
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
消費者

4.消息獲取順序

默認消息隊列里的數據是按照順序被消費者拿走,例如:消費者1 去隊列中獲取 奇數 序列的任務,消費者2去隊列中獲取 偶數 序列的任務。

channel.basic_qos(prefetch_count=1) 表示誰來誰取,不再按照奇偶數排列

 1 #Auther: Xiaoliuer Li
 2 
 3 import pika
 4 
 5 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 6 channel = connection.channel()
 7 
 8 # make message persistent
 9 channel.queue_declare(queue='hello')
10 
11 
12 def callback(ch, method, properties, body):
13     print(" [x] Received %r" % body)
14     import time
15     time.sleep(10)
16     print ('ok')
17     ch.basic_ack(delivery_tag = method.delivery_tag)
18 
19 channel.basic_qos(prefetch_count=1)
20 
21 channel.basic_consume(callback,
22                       queue='hello',
23                       no_ack=False)
24 
25 print(' [*] Waiting for messages. To exit press CTRL+C')
26 channel.start_consuming()
消費者

5.發布訂閱(廣播模式)

前面的效果都是一對一發,如果做一個廣播效果可不可以,這時候就要用到exchange了 
exchange必須精確的知道收到的消息要發給誰。exchange的類型決定了怎么處理, 
類型有以下幾種:

  • fanout: 所有綁定到此exchange的queue都可以接收消息
  • direct: 通過routingKey和exchange決定的那個唯一的queue可以接收消息
  • topic: 所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息

fanout 純廣播、all

發布訂閱和簡單的消息隊列區別在於,發布訂閱會將消息發送給所有的訂閱者,而消息隊列中的數據被消費一次便消失。所以,RabbitMQ實現發布和訂閱時,會為每一個訂閱者創建一個隊列,而發布者發布消息時,會將消息放置在所有相關隊列中。

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()
發布者
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
訂閱者

注意:廣播,是實時的,收不到就沒了,消息不會存下來,類似收音機。

direct 有選擇的接收消息

 

之前事例,發送消息時明確指定某個隊列並向其中發送消息,RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該將數據發送至指定隊列。

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
發送者
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 獲取運行腳本所有的參數
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)
# 循環列表去綁定
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
接收者

運行接收端,指定接收級別的參數,例:

python direct_sonsumer.py info warning 
python direct_sonsumer.py warning error

topic 更細致的過濾

在topic類型下,可以讓隊列綁定幾個模糊的關鍵字,之后發送者將數據發送到exchange,exchange將傳入”路由值“和 ”關鍵字“進行匹配,匹配成功,則將數據發送到指定隊列。

  • # 表示可以匹配 0 個 或 多個 單詞
  • *  表示只能匹配 一個 單詞
發送者路由值              隊列中
old.boy.python          old.*  -- 不匹配
old.boy.python          old.#  -- 匹配
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
生產者
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
消費者

注意:

sudo rabbitmqctl add_user alex 123
# 設置用戶為administrator角色
sudo rabbitmqctl set_user_tags alex administrator
# 設置權限
sudo rabbitmqctl set_permissions -p "/" alex '.''.''.'

# 然后重啟rabbiMQ服務
sudo /etc/init.d/rabbitmq-server restart
 
# 然后可以使用剛才的用戶遠程連接rabbitmq server了。


------------------------------
credentials = pika.PlainCredentials("alex","123")

connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.14.47',credentials=credentials))
View Code

6.RabbitMQ RPC 實現(Remote procedure call)

不知道你有沒有發現,上面的流都是單向的,如果遠程的機器執行完返回結果,就實現不了了。 
如果返回,這種模式叫什么呢,RPC(遠程過程調用),snmp就是典型的RPC 
RabbitMQ能不能返回呢,怎么返回呢?既是發送端又是接收端。 
但是接收端返回消息怎么返回?可以發送到發過來的queue里么?不可以。 
返回時,再建立一個queue,把結果發送新的queue里 
為了服務端返回的queue不寫死,在客戶端給服務端發指令的的時候,同時帶一條消息說,你結果返回給哪個queue

import pika
import uuid
import time

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response,  # 只要一收到消息就調用on_response
                                   no_ack=True,
                                   queue=self.callback_queue)  # 收這個queue的消息

    def on_response(self, ch, method, props, body):  # 必須四個參數
        # 如果收到的ID和本機生成的相同,則返回的結果就是我想要的指令返回的結果
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None  # 初始self.response為None
        self.corr_id = str(uuid.uuid4())  # 隨機唯一字符串
        self.channel.basic_publish(
                exchange='',
                routing_key='rpc_queue',  # 發消息到rpc_queue
                properties=pika.BasicProperties(  # 消息持久化
                    reply_to = self.callback_queue,  # 讓服務端命令結果返回到callback_queue
                    correlation_id = self.corr_id,  # 把隨機uuid同時發給服務器
                ),
                body=str(n)
        )
        while self.response is None:  # 當沒有數據,就一直循環
            # 啟動后,on_response函數接到消息,self.response 值就不為空了
            self.connection.process_data_events()  # 非阻塞版的start_consuming()
            # print("no msg……")
            # time.sleep(0.5)
        # 收到消息就調用on_response
        return int(self.response)

if __name__ == '__main__':
    fibonacci_rpc = FibonacciRpcClient()
    print(" [x] Requesting fib(7)")
    response = fibonacci_rpc.call(7)
    print(" [.] Got %r" % response)
RPC client
import pika
import time

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
    n = int(body)
    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(
            exchange='',  # 把執行結果發回給客戶端
            routing_key=props.reply_to,  # 客戶端要求返回想用的queue
            # 返回客戶端發過來的correction_id 為了讓客戶端驗證消息一致性
            properties=pika.BasicProperties(correlation_id = props.correlation_id),
            body=str(response)
    )
    ch.basic_ack(delivery_tag = method.delivery_tag)  # 任務完成,告訴客戶端

if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='rpc_queue')  # 聲明一個rpc_queue ,

    channel.basic_qos(prefetch_count=1)
    # 在rpc_queue里收消息,收到消息就調用on_request
    channel.basic_consume(on_request, queue='rpc_queue')
    print(" [x] Awaiting RPC requests")
    channel.start_consuming()
RPC server

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM