rabbitMQ隊列使用及常用命令


一、RabbitMQ常用命令

啟動監控管理器:rabbitmq-plugins enable rabbitmq_management
關閉監控管理器:rabbitmq-plugins disable rabbitmq_management
啟動rabbitmq:rabbitmq-service start
關閉rabbitmq:rabbitmq-service stop
查看所有的隊列:rabbitmqctl list_queues
清除所有的隊列:rabbitmqctl reset
關閉應用:rabbitmqctl stop_app
啟動應用:rabbitmqctl start_app

用戶和權限設置(后面用處)

添加用戶:rabbitmqctl add_user username password
分配角色:rabbitmqctl set_user_tags username administrator
新增虛擬主機:rabbitmqctl add_vhost  vhost_name
將新虛擬主機授權給新用戶:rabbitmqctl set_permissions -p vhost_name username '.*' '.*' '.*'

角色說明

none  最小權限角色
management 管理員角色
policymaker   決策者
monitoring  監控
administrator  超級管理員 

二、RabbitMQ使用

(1)介紹

①什么叫消息隊列

消息(Message)是指在應用間傳送的數據。消息可以非常簡單,比如只包含文本字符串,也可以更復雜,可能包含嵌入對象。

消息隊列(Message Queue)是一種應用間的通信方式,消息發送后可以立即返回,由消息系統來確保消息的可靠傳遞。消息發布者只管把消息發布到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而不管是誰發布的。這樣發布者和使用者都不用知道對方的存在。

②為何用消息隊列

從上面的描述中可以看出消息隊列是一種應用間的異步協作機制,那什么時候需要使用 MQ 呢?

以常見的訂單系統為例,用戶點擊【下單】按鈕之后的業務邏輯可能包括:扣減庫存、生成相應單據、發紅包、發短信通知。在業務發展初期這些邏輯可能放在一起同步執行,隨着業務的發展訂單量增長,需要提升系統服務的性能,這時可以將一些不需要立即生效的操作拆分出來異步執行,比如發放紅包、發短信通知等。這種場景下就可以用 MQ ,在下單的主流程(比如扣減庫存、生成相應單據)完成之后發送一條消息到 MQ 讓主流程快速完結,而由另外的單獨線程拉取MQ的消息(或者由 MQ 推送消息),當發現 MQ 中有發紅包或發短信之類的消息時,執行相應的業務邏輯。

以上是用於業務解耦的情況,其它常見場景包括最終一致性、廣播、錯峰流控等等。

③RabbitMQ 特點

RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。

AMQP :Advanced Message Queue,高級消息隊列協議。它是應用層協議的一個開放標准,為面向消息的中間件設計,基於此協議的客戶端與消息中間件可傳遞消息,並不受產品、開發語言等條件的限制。

RabbitMQ 最初起源於金融系統,用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。具體特點包括:

  1. 可靠性(Reliability)
    RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發布確認。

  2. 靈活的路由(Flexible Routing)
    在消息進入隊列之前,通過 Exchange 來路由消息的。對於典型的路由功能,RabbitMQ 已經提供了一些內置的 Exchange 來實現。針對更復雜的路由功能,可以將多個 Exchange 綁定在一起,也通過插件機制實現自己的 Exchange 。

  3. 消息集群(Clustering)
    多個 RabbitMQ 服務器可以組成一個集群,形成一個邏輯 Broker 。

  4. 高可用(Highly Available Queues)
    隊列可以在集群中的機器上進行鏡像,使得在部分節點出問題的情況下隊列仍然可用。

  5. 多種協議(Multi-protocol)
    RabbitMQ 支持多種消息隊列協議,比如 STOMP、MQTT 等等。

  6. 多語言客戶端(Many Clients)
    RabbitMQ 幾乎支持所有常用語言,比如 Java、.NET、Ruby 等等。

  7. 管理界面(Management UI)
    RabbitMQ 提供了一個易用的用戶界面,使得用戶可以監控和管理消息 Broker 的許多方面。

  8. 跟蹤機制(Tracing)
    如果消息異常,RabbitMQ 提供了消息跟蹤機制,使用者可以找出發生了什么。

  9. 插件機制(Plugin System)
    RabbitMQ 提供了許多插件,來從多方面進行擴展,也可以編寫自己的插件。

④RabbitMQ 基本概念

  • Message
    消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其他消息的優先權)、delivery-mode(指出該消息可能需要持久性存儲)等。
  • Publisher
    消息的生產者,也是一個向交換器發布消息的客戶端應用程序。
  • Exchange
    交換器,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。
  • Binding
    綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。
  • Queue
    消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。
  • Connection
    網絡連接,比如一個TCP連接。
  • Channel
    信道,多路復用連接中的一條獨立的雙向數據流通道。信道是建立在真實的TCP連接內地虛擬連接,AMQP 命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對於操作系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接。
  • Consumer
    消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。
  • Virtual Host
    虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是 / 。
  • Broker
    表示消息隊列服務器實體。

⑤Exchange 類型

Exchange分發消息時根據類型的不同分發策略有區別,目前共四種類型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由鍵,此外 headers 交換器和 direct 交換器完全一致,但性能差很多,目前幾乎用不到了,所以直接看另外三種類型:
1)fanout(廣播)
2)direct(組播)
3)topic(規則博)
4)headers(通過headers 來決定把消息發給哪些queue)

(2)安裝

安裝 http://www.rabbitmq.com/install-standalone-mac.html

安裝python rabbitMQ module 

pip install pika
or
easy_install pika
or
源碼
https://pypi.python.org/pypi/pika

(3)消息持久化+消息公平分發(完整代碼)

帶消息持久化+消息公平分發的完整代碼

send端

 1 #!/usr/bin/env python
 2 
 3 import pika
 4 
 5 credentials = pika.PlainCredentials('wys', '123456')
 6 
 7 parameters = pika.ConnectionParameters(host='192.168.10.223',credentials=credentials)
 8 connection = pika.BlockingConnection(parameters)
 9 
10 channel = connection.channel() #隊列連接通道
11 
12 #聲明queue
13 channel.queue_declare(queue='task1',durable=True)  # durable=Ture 保證隊列持久化
14 
15 message = ' '.join(sys.argv[1:]) or "Hello World!"
16 channel.basic_publish(exchange='',
17                       routing_key='task1', #路由
18                       properties=pika.BasicProperties(
19                           delivery_mode=2,  # 使消息持久化
20                       ),
21                       body=message)
22 
23 print("[x] Sent %r" % message)
24 
25 connection.close()

receive端

 1 #!/usr/bin/env python
 2 
 3 import pika
 4 import time
 5 
 6 credentials = pika.PlainCredentials('wys', '123456')
 7 
 8 parameters = pika.ConnectionParameters(host='192.168.10.223',credentials=credentials)
 9 connection = pika.BlockingConnection(parameters)
10 
11 channel = connection.channel() #隊列連接通道
12 
13 
14 def callback(ch, method, properties, body):
15     print(" [x] Received %r" % body)
16     # time.sleep(15)
17     print('msg handle done...',body)
18     ch.basic_ack(delivery_tag=method.delivery_tag)  # 手動向服務器端確認這個消息已經被處理完畢
19 
20 channel.basic_qos(prefetch_count=1)  # 消息公平分發
21 channel.basic_consume(callback, #取到消息后,調用callback 函數
22                       queue='task1',)
23                       #no_ack=True) #消息處理后,不向rabbit-server確認消息已消費完畢
24 
25 
26 print(' [*] Waiting for messages. To exit press CTRL+C')
27 
28 channel.start_consuming() #阻塞模式

注意:遠程連接rabbitmq server的話,需要配置權限

首先在rabbitmq server上創建一個用戶

sudo rabbitmqctl  add_user alex alex3714 

同時還要配置權限,允許從外面訪問

sudo rabbitmqctl set_permissions -p / alex ".*" ".*" ".*"

set_permissions [-p vhost] {user} {conf} {write} {read}

vhost

The name of the virtual host to which to grant the user access, defaulting to /. (授予用戶訪問的虛擬主機的名稱,默認為/)

user
The name of the user to grant access to the specified virtual host.(授予對指定虛擬主機的訪問權限的用戶的名稱)
conf
A regular expression matching resource names for which the user is granted configure permissions.(與用戶授予配置權限的匹配資源名稱的正則表達式)
write
A regular expression matching resource names for which the user is granted write permissions.(與用戶匹配的資源名稱的正則表達式授予寫入權限)
read
A regular expression matching resource names for which the user is granted read permissions.(與用戶匹配的資源名稱的正則表達式授予讀取權限)
客戶端連接的時候需要配置認證參數
1 credentials = pika.PlainCredentials('wys', '123456')
2  
3  
4 connection = pika.BlockingConnection(pika.ConnectionParameters(
5     '192.168.10.134',5672,'/',credentials))
6 channel = connection.channel()

注意:消息公平分發

如果Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,很可能出現,一個機器配置不高的消費者那里堆積了很多消息處理不完,同時配置高的消費者卻一直很輕松。為解決此問題,可以在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。

channel.basic_qos(prefetch_count=1)

(4)fanout(廣播)所有bind到此exchange的queue都可以接收消息

消息publisher

 1 import pika
 2 import sys
 3  
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.exchange_declare(exchange='logs',
 9                          type='fanout')
10  
11 message = ' '.join(sys.argv[1:]) or "info: Hello World!"
12 channel.basic_publish(exchange='logs',
13                       routing_key='',
14                       body=message)
15 print(" [x] Sent %r" % message)
16 connection.close()

消息subscriber

 1 #_*_coding:utf-8_*_
 2 __author__ = 'Alex Li'
 3 import pika
 4  
 5 connection = pika.BlockingConnection(pika.ConnectionParameters(
 6         host='localhost'))
 7 channel = connection.channel()
 8  
 9 channel.exchange_declare(exchange='logs',
10                          type='fanout')
11  
12 result = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開后,自動將queue刪除
13 queue_name = result.method.queue
14  
15 channel.queue_bind(exchange='logs',
16                    queue=queue_name)
17  
18 print(' [*] Waiting for logs. To exit press CTRL+C')
19  
20 def callback(ch, method, properties, body):
21     print(" [x] %r" % body)
22  
23 channel.basic_consume(callback,
24                       queue=queue_name,
25                       no_ack=True)
26  
27 channel.start_consuming()

(5)direct(組播)通過routingKey和exchange決定的那個唯一的queue可以接收消息

有選擇的接收消息(exchange type=direct) 

RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該將數據發送至指定隊列。

消息publisher

 1 import pika
 2 import sys
 3  
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.exchange_declare(exchange='direct_logs',
 9                          type='direct')
10  
11 severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
12 message = ' '.join(sys.argv[2:]) or 'Hello World!'
13 channel.basic_publish(exchange='direct_logs',
14                       routing_key=severity,
15                       body=message)
16 print(" [x] Sent %r:%r" % (severity, message))
17 connection.close()

消息subscriber

 1 import pika
 2 import sys
 3  
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.exchange_declare(exchange='direct_logs',
 9                          type='direct')
10  
11 result = channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13  
14 severities = sys.argv[1:]
15 if not severities:
16     sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
17     sys.exit(1)
18  
19 for severity in severities:
20     channel.queue_bind(exchange='direct_logs',
21                        queue=queue_name,
22                        routing_key=severity)
23  
24 print(' [*] Waiting for logs. To exit press CTRL+C')
25  
26 def callback(ch, method, properties, body):
27     print(" [x] %r:%r" % (method.routing_key, body))
28  
29 channel.basic_consume(callback,
30                       queue=queue_name,
31                       no_ack=True)
32  
33 channel.start_consuming()

(6)topic(規則播)所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息

更細致的消息過濾

 表達式符號說明:#代表一個或多個字符,*代表任何字符
      例:#.a會匹配a.a,aa.a,aaa.a等
          *.a會匹配a.a,b.a,c.a等
     注:使用RoutingKey為#,Exchange Type為topic的時候相當於使用fanout

消息publisher

 1 import pika
 2 import sys
 3  
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.exchange_declare(exchange='topic_logs',
 9                          type='topic')
10  
11 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
12 message = ' '.join(sys.argv[2:]) or 'Hello World!'
13 channel.basic_publish(exchange='topic_logs',
14                       routing_key=routing_key,
15                       body=message)
16 print(" [x] Sent %r:%r" % (routing_key, message))
17 connection.close()

消息subscriber

 1 import pika
 2 import sys
 3  
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.exchange_declare(exchange='topic_logs',
 9                          type='topic')
10  
11 result = channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13  
14 binding_keys = sys.argv[1:]
15 if not binding_keys:
16     sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
17     sys.exit(1)
18  
19 for binding_key in binding_keys:
20     channel.queue_bind(exchange='topic_logs',
21                        queue=queue_name,
22                        routing_key=binding_key)
23  
24 print(' [*] Waiting for logs. To exit press CTRL+C')
25  
26 def callback(ch, method, properties, body):
27     print(" [x] %r:%r" % (method.routing_key, body))
28  
29 channel.basic_consume(callback,
30                       queue=queue_name,
31                       no_ack=True)
32  
33 channel.start_consuming()

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM