MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消 息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程序通過 隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。
RabbitMQ和其他AMPQ程序的最大區別在於消息持久化和集群模式,使消息更加可靠。
本博客使用的例子均為官方文檔中的例子,英文好的可以直接查看原版文檔,寫此文檔的目的作為自己學習過程的一個記錄和總結。
本博客測試環境:CentOS6.8VM python:2.7.11 操作系統:win7 64位 rabbitMQ:3.6.5
一、安裝
1、yum方式安裝
安裝配置epel源 $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm 安裝erlang $ yum -y install erlang 安裝RabbitMQ $ yum -y install rabbitmq-server service rabbitmq-server start/stop
2、本地安裝
3、安裝RabbitMQ的python API
#本次測試使用pika pip install pika
二、Rabbit簡要架構介紹
1、架構圖
(圖片百度找的,侵刪)
2、主要術語介紹
Exchange:交換機,決定了消息路由規則;
Queue:消息隊列;
Channel:進行消息讀寫的通道;
Bind:綁定了Queue和Exchange,意即為符合什么樣路由規則的消息,將會放置入哪一個消息隊列;
3、RabbitMQ的主要工作模式
三、簡單隊列模式
該模式不需要生命Exchange僅使用queue隊列來直接交換消息。
生產者
#! /usr/bin/env python # -*- coding:utf-8 -*- # __author__ = "TKQ" import pika # ######################### 生產者 ######################### #建立連接 connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.79.131')) #基於連接建立通道 channel = connection.channel() #創建隊列 channel.queue_declare(queue='hello') # channel.basic_publish(exchange='', routing_key='hello', body='Hello World!' ) print(" [x] Sent 'Hello World!'") connection.close()
消費者
#! /usr/bin/env python # -*- coding:utf-8 -*- # __author__ = "TKQ" # ######################### 消費者 ######################### import pika #創建連接 connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.79.131')) #基於連接建立通道 channel = connection.channel() #創建隊列 channel.queue_declare(queue='hello') #定義消息回調函數 def callback(ch, method, properties, body): print(" [x] Received %r" %body) time.sleep(5) # channel.basic_consume(callback, queue='hello', no_ack=True) print(' [c1] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
1、消費者消息應答no-ack
channel.basic_consume(callback,queue='hello',no_ack=True),默認情況no_ack=False表示consumer完成消息處理后需手動應答。向生隊列服務器說明消息已處理完成,可以刪除。
如果消費者遇到情況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,那么,RabbitMQ會重新將該任務添加到隊列中,並由其他consumer獲取該消息,確保消息不會丟失。
消費者回調函數中ch.basic_ack(delivery_tag=method.delivery_tag),表示手動應答了服務器。
#! /usr/bin/env python # -*- coding:utf-8 -*- # __author__ = "TKQ" # ######################### 消費者 ######################### import pika import time import datetime #創建連接 connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.79.131')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r,%s" %body) time.sleep(5) ch.basic_ack(delivery_tag=method.delivery_tag)#手動應答 channel.basic_consume(callback, queue='hello', no_ack=False) print(' [c1] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
4、服務端持久化
RabbitMQ支持消息的持久化,也就是數據寫在磁盤上,當我們需要可靠的消息處理的時候應該設置消息持久化。消息隊列持久化包括3個部分:
(1)exchange持久化,在聲明時指定durable => 1
(2)queue持久化,在聲明時指定durable => 1
(3)消息持久化,在投遞時指定delivery_mode => 2(1是非持久化)
如果exchange和queue都是持久化的,那么它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立綁定。
#生產者
#! /usr/bin/env python # -*- coding:utf-8 -*- # __author__ = "TKQ" import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.79.131')) channel = connection.channel() # 持久化隊列 channel.queue_declare(queue='hello', durable=True) channel.basic_publish(exchange='', routing_key='hello', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2, # 消息傳遞持久化 )) print(" [x] Sent 'Hello World!'") connection.close()

#! /usr/bin/env python # -*- coding:utf-8 -*- # __author__ = "TKQ" import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.79.131')) channel = connection.channel() #隊列持久化 channel.queue_declare(queue='hello', durable=True) def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print 'ok' ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue='hello', no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
5、消息分發順序
默認狀態下,RabbitMQ將第n個Message分發給第n個Consumer。當然n是取余后的。它不管Consumer是否還有unacked Message,只是按照這個默認機制進行分發。那么如果有個Consumer工作比較重,那么就會導致有的Consumer基本沒事可做,有的Consumer卻是毫無休息的機會。那么,RabbitMQ是如何處理這種問題呢?
通過 basic.qos 方法設置prefetch_count=1 。這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message。換句話說,在接收到該Consumer的ack前,他它不會將新的Message分發給它。
6、最終版本
設置消息確認,服務器的持久化,和消息分發順序后,最終版本:

#! /usr/bin/env python # -*- coding:utf-8 -*- # __author__ = "TKQ" import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.79.131')) channel = connection.channel() # make message persistent channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print 'ok' ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='hello', no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()

#生產者 #! /usr/bin/env python # -*- coding:utf-8 -*- # __author__ = "TKQ" import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.79.131')) channel = connection.channel() # 持久化隊列 channel.queue_declare(queue='hello', durable=True) channel.basic_publish(exchange='', routing_key='hello', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2, # 消息傳遞持久化 )) print(" [x] Sent 'Hello World!'") connection.close()
四、fanout-廣播模式
廣播模式也稱為發布訂閱和簡單的消息隊列區別在於,發布訂閱會將消息發送給所有的訂閱者,而消息隊列中的數據被消費一次便消失。所以,RabbitMQ實現發布和訂閱時,會為每一個訂閱者創建一個隊列,而發布者發布消息時,會將消息放置在所有相關隊列中。
關鍵點:
1、生產者:設置exchange type = fanout
2、消費者:result = channel.queue_declare(exclusive=True) #生成臨時隊列,當Consumer關閉連接時,這個queue要被deleted。
3、消費者:queue_name = result.method.queue #result.method.queue 可以取得queue的名字。基本上都是這個樣子:amq.gen-JzTY20BRgKO-HjmUJxsgf。
下面我們模擬一個簡單的日志系統,將所有日志簡單的廣播給所有訂閱者。
生產者:
#! /usr/bin/env python # -*- coding:utf-8 -*- # __author__ = "TKQ" import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()
消費者:
#! /usr/bin/env python # -*- coding:utf-8 -*- # __author__ = "TKQ" import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') result = channel.queue_declare(exclusive=True) # queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
運行消費者兩個消費者和一個生產者,多個消費者均收到消息
消費者開啟:queue和binding狀態
[root@localhost ~]# rabbitmqctl list_bindings Listing bindings ... exchange amq.gen-5A-Amc-4DZgaE5_x52Q-Tw queue amq.gen-5A-Amc-4DZgaE5_x52Q-Tw [] exchange amq.gen-wGFWTvv42rUR6_tki5eeOg queue amq.gen-wGFWTvv42rUR6_tki5eeOg [] logs exchange amq.gen-5A-Amc-4DZgaE5_x52Q-Tw queue amq.gen-5A-Amc-4DZgaE5_x52Q-Tw [] logs exchange amq.gen-wGFWTvv42rUR6_tki5eeOg queue amq.gen-wGFWTvv42rUR6_tki5eeOg []
[root@localhost ~]# rabbitmqctl list_queues Listing queues ... amq.gen-5A-Amc-4DZgaE5_x52Q-Tw 0 amq.gen-wGFWTvv42rUR6_tki5eeOg 0
關閉消費者:queue和bindings均為空。
[root@localhost ~]# rabbitmqctl list_bindings Listing bindings ... ...done. [root@localhost ~]# rabbitmqctl list_queues Listing queues ... ...done.
五、direct模式-精確路由模式
Direct exchange的路由,通過routing_key(我將其稱為路由關鍵字)的精確匹配。即時一個queue的兩條routing_key同時匹配到同一條消息,也僅接收一條。
上個主題我們實現的簡單的日志廣播系統。這個小節我們實現更復雜點的需求,針對不同的日志等級發送給不同的訂閱者。
基本原理就是:
1、生產者:在fanout模式的基礎上,配置routing_key。
2、消費者:在fanout模式的基礎上,配置routing_key。
下面我們模擬一個簡單的日志系統,將所有日志簡單的廣播給所有訂閱者。

#! /usr/bin/env python # -*- coding:utf-8 -*- # __author__ = "TKQ" #!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.79.131')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) #使用for循環來同時綁定多個binding-key for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()

#! /usr/bin/env python # -*- coding:utf-8 -*- # __author__ = "TKQ" import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.79.131')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
關於exchange和queue的一對多或者多對一匹配:
1、消費者的一個queue可以綁定多個bindding_key來同時匹配多個路由關鍵字。使用循環把多個routing_key綁定到一個queue上。
2、多個queue可以匹配同一個routing_key來匹配同一個路由關鍵字。當所有queue的routing_key相同時,模式就和fanout一樣啦。
六、topic模式-模糊路由匹配
topic模式基本與Direct模式相同,唯一的不同點就是routing-key是模糊匹配。routing_key的格式以點號分隔,最長255bytes。
匹配元字符:
# 表示可以匹配 0 個 或 任意個單詞(包含1個)
* 表示只能匹配 一個 單詞
Topic exchange和其他exchange: 如果binding_key 是# ,那么它會接收所有的Message,不管routing_key是什么,就像是fanout exchange。 如果 #和* 沒有被使用,那么topic exchange就變成了direct exchange。
發送者路由值 roiting_key i.love.python i.* -- 不匹配 i.love.python i.# -- 匹配
i.love.python # --匹配,單個#匹配所有

#! /usr/bin/env python # -*- coding:utf-8 -*- # __author__ = "TKQ" import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.79.131')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()

#! /usr/bin/env python # -*- coding:utf-8 -*- # __author__ = "TKQ" import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.79.131')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()