A Simple RabbitMQ Tutorial

Installation

Install the RabbitMQ server

http://www.rabbitmq.com/install-standalone-mac.html

Install the Python RabbitMQ module

# Option 1
pip install pika

# Option 2
easy_install pika

# Option 3
# install from source

https://pypi.python.org/pypi/pika
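
To confirm the module is importable, a quick check (this command is not part of the original tutorial):

python -c "import pika; print(pika.__version__)"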

Implementing simple queue communication

The send side

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# declare the queue
channel.queue_declare(queue='hello')

# In RabbitMQ a message can never be sent directly to the queue; it always needs to go through an exchange.
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')

print(" [x] Sent 'Hello World!'")
connection.close()

The receive side

# -*- coding: utf-8 -*-

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# You may ask why we declare the queue again, since we already declared it in our previous code.
# We could avoid that if we were sure the queue already exists, for example if the send.py program
# was run before. But we're not yet sure which program will be run first. In such cases it's good
# practice to repeat the queue declaration in both programs.

channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

# note: pika >= 1.0 renames these arguments; there this call is written as
# channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
channel.basic_consume(callback, queue='hello', no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
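
Assuming the two scripts are saved as send.py and receive.py (send.py is the name the tutorial itself refers to; receive.py is an assumed name), a quick test looks like this:

python receive.py
# then, in another terminal
python send.py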

To connect to the RabbitMQ server remotely, you need to configure permissions

  • First, create a user on the RabbitMQ server
sudo rabbitmqctl add_user username password

You also need to set permissions to allow access from outside:

sudo rabbitmqctl set_permissions -p / username ".*" ".*" ".*"

set_permissions [-p vhost] {user} {conf} {write} {read}
vhost -- The name of the virtual host to grant the user access to, defaulting to /.
user  -- The name of the user to grant access to the specified virtual host.
conf  -- A regular expression matching the names of resources for which the user is granted configure permissions.
write -- A regular expression matching the names of resources for which the user is granted write permissions.
read  -- A regular expression matching the names of resources for which the user is granted read permissions.
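
For example, to limit the user to resources whose names start with task_ (an illustrative naming scheme, not from the original), the three regular expressions can be narrowed:

sudo rabbitmqctl set_permissions -p / username "^task_.*" "^task_.*" "^task_.*"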

Authentication

The client needs to supply credentials when connecting:

credentials = pika.PlainCredentials('alex', 'alex3714')

connection = pika.BlockingConnection(pika.ConnectionParameters('10.211.55.5', 5672, '/', credentials))
channel = connection.channel()

Work Queues

  In this mode, RabbitMQ by default dispatches the messages sent by the producer (P) to the consumers (C) in turn, much like load balancing.
Producer

import pika
import time
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# declare the queue
channel.queue_declare(queue='task_queue')

# In RabbitMQ a message can never be sent directly to the queue; it always needs to go through an exchange.

message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time()
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2  # make message persistent
    )
)

print(" [x] Sent %r" % message)
connection.close()

Consumer

# -*- coding: utf-8 -*-

import pika, time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(20)
    print(" [x] Done")
    print("method.delivery_tag", method.delivery_tag)
    ch.basic_ack(delivery_tag=method.delivery_tag)

# no_ack removed: the callback acknowledges each message explicitly
channel.basic_consume(callback, queue='task_queue')

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

  Now start the producer first, then start three consumers, and send a few messages from the producer. You will see the messages distributed to the consumers in turn.
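
For example (assuming the producer script is saved as new_task.py; the file name is illustrative):

python new_task.py First message.
python new_task.py Second message..
python new_task.py Third message...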

Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately removes it from memory. In this case, if you kill a worker we lose the message it was just processing. We also lose all the messages that were dispatched to this particular worker but not yet handled.


But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.


In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back by the consumer to tell RabbitMQ that a particular message has been received and processed, and that RabbitMQ is free to delete it.


If a consumer dies (its channel is closed, its connection is closed, or the TCP connection is lost) without sending an ack, RabbitMQ will understand that the message wasn't processed fully and will re-queue it. If there are other consumers online at the same time, it will quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.


There aren't any message timeouts; RabbitMQ will redeliver the message when the consumer dies. It's fine even if processing a message takes a very, very long time.


Message acknowledgments are turned on by default. In previous examples we explicitly turned them off via the no_ack=True flag. It's time to remove this flag and send a proper acknowledgment from the worker once we're done with a task.

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))  # simulate work: one second per dot in the message body
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback, queue='hello')

Using this code we can be sure that even if you kill a worker using CTRL+C while it was processing a message, nothing will be lost. Soon after the worker dies, all unacknowledged messages will be redelivered.


Message persistence

We have learned how to make sure that even if the consumer dies, the task isn't lost (this is the default; pass no_ack=True to disable it). But our tasks will still be lost if the RabbitMQ server stops.


When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and the messages as durable.


First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable:

channel.queue_declare(queue='hello', durable=True)

Although this command is correct by itself, it won't work in our setup. That's because we've already defined a queue called hello which is not durable. RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error to any program that tries to do that. But there is a quick workaround: let's declare a queue with a different name, for example task_queue:

channel.queue_declare(queue='task_queue', durable=True)

This queue_declare change needs to be applied to both the producer and consumer code.


At that point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent, by supplying a delivery_mode property with the value 2. (Note that marking messages as persistent doesn't fully guarantee durability: there is still a short window in which RabbitMQ has accepted a message but hasn't saved it to disk yet.)

channel.basic_publish(
    exchange='',
    routing_key="task_queue",
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # make message persistent
    )
)

Fair dispatch

  If RabbitMQ simply dispatched messages to the consumers in order, without considering their load, a consumer on a low-spec machine could easily build up a backlog of messages it can't finish processing while a consumer on a high-spec machine stays mostly idle. To solve this, set prefetch_count=1 on each consumer. It tells RabbitMQ: don't send me a new message until I've finished processing the current one.

channel.basic_qos(prefetch_count=1)

Complete code with message persistence + fair dispatch

Producer side

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode = 2, # make message persistent
    )
)

print(" [x] Sent %r" % message)
connection.close()

Consumer side

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='task_queue')

channel.start_consuming()

Publish/Subscribe

  The previous examples were basically 1-to-1 message sending and receiving: a message could only be sent to a specified queue. But sometimes you want a message to be received by all the queues, like a broadcast. That is what exchanges are for.

An exchange is a very simple thing. On one side it receives messages from producers and on the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it be discarded? The rules for that are defined by the exchange type.


An exchange is given a type when it is declared, which determines which queues qualify to receive the message:

  • fanout: every queue bound to this exchange receives the message
  • direct: only the unique queue determined by the routingKey and the exchange receives the message
  • topic: every queue whose binding key (which may be a pattern) matches the message's routingKey receives the message

    Pattern syntax: # matches zero or more words, * matches exactly one word
    e.g. #.a matches a.a, aa.a, aaa.a, etc.
       *.a matches a.a, b.a, c.a, etc.
    Note: binding with a routingKey of # on a topic exchange behaves the same as fanout

  • headers: the message's headers determine which queues it is sent to (see the sketch below)
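
The headers exchange isn't demonstrated elsewhere in this tutorial, so here is a minimal sketch; the exchange name and header names are made up for illustration. A queue is bound with an arguments table, where x-match=all requires every listed header to match and x-match=any requires at least one:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='headers_logs', exchange_type='headers')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# x-match=all: deliver only if every listed header matches
channel.queue_bind(exchange='headers_logs', queue=queue_name,
                   arguments={'x-match': 'all', 'source': 'kern', 'level': 'critical'})

# a headers exchange ignores the routing key; routing is based on the headers property
channel.basic_publish(exchange='headers_logs', routing_key='',
                      body='A critical kernel error',
                      properties=pika.BasicProperties(headers={'source': 'kern', 'level': 'critical'}))

connection.close()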

Message publisher
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)

print(" [x] Sent %r" % message)
connection.close()

Message subscriber

# -*- coding: utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

# If no queue name is given, RabbitMQ assigns a random one; exclusive=True deletes the queue automatically once the consumer using it disconnects
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()

Selective message receiving (exchange type=direct)

  RabbitMQ also supports routing by keyword: a queue is bound to the exchange with a keyword, the sender publishes messages to the exchange with a keyword, and the exchange uses that keyword to decide which queue each message should be delivered to.
Publish

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)

print(" [x] Sent %r:%r" % (severity, message))

connection.close()

Subscribe

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()
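
For the direct example, runs analogous to the topic commands below would look like this (receive_logs_direct.py and emit_log_direct.py are assumed file names):

python receive_logs_direct.py info warning error
python emit_log_direct.py error "Run. Run. Or it will explode."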

Finer-grained message filtering (exchange type=topic)

Although using the direct exchange improved our system, it still has limitations: it can't do routing based on multiple criteria.


In our logging system we might want to subscribe not only to logs based on severity, but also based on the source which emitted the log. You might know this concept from the syslog unix tool, which routes logs based on both severity (info/warn/crit...) and facility (auth/cron/kern...).


That would give us a lot of flexibility: we may want to listen to just critical errors coming from 'cron' but also all logs from 'kern'.


Publish

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message)

print(" [x] Sent %r:%r" % (routing_key, message))

connection.close()

Subscribe

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()

To receive all the logs, run:

python receive_logs_topic.py "#"

To receive all logs from the facility "kern":

python receive_logs_topic.py "kern.*"

Or if you want to hear only about "critical" logs:

python receive_logs_topic.py "*.critical"

You can create multiple bindings:

python receive_logs_topic.py "kern.*" "*.critical"

And to emit a log with the routing key "kern.critical", type:

python emit_log_topic.py "kern.critical" "A critical kernel error"

