RabbitMQ基礎總結


  MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消 息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程序通過 隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。

  RabbitMQ和其他AMPQ程序的最大區別在於消息持久化和集群模式,使消息更加可靠。

  本博客使用的例子均為官方文檔中的例子,英文好的可以直接查看原版文檔,寫此文檔的目的作為自己學習過程的一個記錄和總結。

本博客測試環境:CentOS6.8VM  python:2.7.11 操作系統:win7 64位  rabbitMQ:3.6.5

一、安裝

1、yum方式安裝

安裝配置epel源
   $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
 
安裝erlang
   $ yum -y install erlang
 
安裝RabbitMQ
   $ yum -y install rabbitmq-server

service rabbitmq-server start/stop

 

2、本地安裝

 

3、安裝RabbitMQ的python API

#本次測試使用pika
pip install pika

 

 

二、Rabbit簡要架構介紹

1、架構圖

(圖片百度找的,侵刪)

 

2、主要術語介紹

Exchange:交換機,決定了消息路由規則;
Queue:消息隊列;
Channel:進行消息讀寫的通道;
Bind:綁定了Queue和Exchange,意即為符合什么樣路由規則的消息,將會放置入哪一個消息隊列;

詳細的使用方法和具體的概念,相信看完下面的實例會有一個了解。
 

3、RabbitMQ的主要工作模式

1、簡單隊列模式,此模Exchange不工作。
2、fanout模式(廣播模式),此模式消息在exchange范圍內廣播。
2、direct模式(精確路由模式),由routing-key精確匹配。
3、topic模式(模糊路由模式),由routing-key模糊匹配
 

 三、簡單隊列模式

該模式不需要生命Exchange僅使用queue隊列來直接交換消息。

生產者

#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pika

# ######################### 生產者 #########################
#建立連接
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.79.131'))
#基於連接建立通道
channel = connection.channel()
#創建隊列
channel.queue_declare(queue='hello')
#

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!' )
print(" [x] Sent 'Hello World!'")

connection.close()

消費者

#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"
# ######################### 消費者 #########################
import pika
#創建連接
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.79.131'))
#基於連接建立通道
channel = connection.channel()
#創建隊列
channel.queue_declare(queue='hello')

#定義消息回調函數
def callback(ch, method, properties, body):
    print(" [x] Received %r" %body)
    time.sleep(5)
    #
channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

print(' [c1] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

1、消費者消息應答no-ack

channel.basic_consume(callback,queue='hello',no_ack=True),默認情況no_ack=False表示consumer完成消息處理后需手動應答。向生隊列服務器說明消息已處理完成,可以刪除。

如果消費者遇到情況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,那么,RabbitMQ會重新將該任務添加到隊列中,並由其他consumer獲取該消息,確保消息不會丟失。

消費者回調函數中ch.basic_ack(delivery_tag=method.delivery_tag),表示手動應答了服務器。

 
        
#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"
# ######################### 消費者 #########################
import pika
import time
import datetime
#創建連接
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.79.131'))

channel = connection.channel()
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r,%s" %body)
    time.sleep(5)
    ch.basic_ack(delivery_tag=method.delivery_tag)#手動應答

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)

print(' [c1] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

4、服務端持久化

RabbitMQ支持消息的持久化,也就是數據寫在磁盤上,當我們需要可靠的消息處理的時候應該設置消息持久化。消息隊列持久化包括3個部分:

(1)exchange持久化,在聲明時指定durable => 1

(2)queue持久化,在聲明時指定durable => 1

(3)消息持久化,在投遞時指定delivery_mode => 2(1是非持久化)

如果exchange和queue都是持久化的,那么它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立綁定。

 

#生產者
#
! /usr/bin/env python # -*- coding:utf-8 -*- # __author__ = "TKQ" import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.79.131')) channel = connection.channel() # 持久化隊列 channel.queue_declare(queue='hello', durable=True) channel.basic_publish(exchange='', routing_key='hello', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2, # 消息傳遞持久化 )) print(" [x] Sent 'Hello World!'") connection.close()
#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.79.131'))
channel = connection.channel()

#隊列持久化
channel.queue_declare(queue='hello', durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print 'ok'
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
消費者

 5、消息分發順序

默認狀態下,RabbitMQ將第n個Message分發給第n個Consumer。當然n是取余后的。它不管Consumer是否還有unacked Message,只是按照這個默認機制進行分發。那么如果有個Consumer工作比較重,那么就會導致有的Consumer基本沒事可做,有的Consumer卻是毫無休息的機會。那么,RabbitMQ是如何處理這種問題呢?

通過 basic.qos 方法設置prefetch_count=1 。這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message。換句話說,在接收到該Consumer的ack前,他它不會將新的Message分發給它。

6、最終版本

設置消息確認,服務器的持久化,和消息分發順序后,最終版本:

#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.79.131'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print 'ok'
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
消費者
#生產者
#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.79.131'))
channel = connection.channel()

# 持久化隊列
channel.queue_declare(queue='hello', durable=True)

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2, # 消息傳遞持久化
                      ))
print(" [x] Sent 'Hello World!'")
connection.close()
生產者

 

四、fanout-廣播模式

廣播模式也稱為發布訂閱和簡單的消息隊列區別在於,發布訂閱會將消息發送給所有的訂閱者,而消息隊列中的數據被消費一次便消失。所以,RabbitMQ實現發布和訂閱時,會為每一個訂閱者創建一個隊列,而發布者發布消息時,會將消息放置在所有相關隊列中。

關鍵點:

  1、生產者:設置exchange type = fanout

  2、消費者:result = channel.queue_declare(exclusive=True)  #生成臨時隊列,當Consumer關閉連接時,這個queue要被deleted。

  3、消費者:queue_name = result.method.queue  #result.method.queue 可以取得queue的名字。基本上都是這個樣子:amq.gen-JzTY20BRgKO-HjmUJxsgf。

下面我們模擬一個簡單的日志系統,將所有日志簡單的廣播給所有訂閱者。

生產者:

#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

消費者:

#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)  #
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

 

 運行消費者兩個消費者和一個生產者,多個消費者均收到消息

消費者開啟:queue和binding狀態

[root@localhost ~]# rabbitmqctl list_bindings
Listing bindings ...
      exchange    amq.gen-5A-Amc-4DZgaE5_x52Q-Tw    queue    amq.gen-5A-Amc-4DZgaE5_x52Q-Tw    []
      exchange    amq.gen-wGFWTvv42rUR6_tki5eeOg    queue    amq.gen-wGFWTvv42rUR6_tki5eeOg    []
logs    exchange    amq.gen-5A-Amc-4DZgaE5_x52Q-Tw    queue    amq.gen-5A-Amc-4DZgaE5_x52Q-Tw    []
logs    exchange    amq.gen-wGFWTvv42rUR6_tki5eeOg    queue    amq.gen-wGFWTvv42rUR6_tki5eeOg    []

 

[root@localhost ~]# rabbitmqctl list_queues
Listing queues ...
amq.gen-5A-Amc-4DZgaE5_x52Q-Tw    0
amq.gen-wGFWTvv42rUR6_tki5eeOg    0  

 關閉消費者:queue和bindings均為空。

[root@localhost ~]# rabbitmqctl list_bindings
Listing bindings ...
...done.
[root@localhost ~]# rabbitmqctl list_queues
Listing queues ...
...done.

 

 

 五、direct模式-精確路由模式

Direct exchange的路由,通過routing_key(我將其稱為路由關鍵字)的精確匹配。即時一個queue的兩條routing_key同時匹配到同一條消息,也僅接收一條。

上個主題我們實現的簡單的日志廣播系統。這個小節我們實現更復雜點的需求,針對不同的日志等級發送給不同的訂閱者。

基本原理就是:

  1、生產者:在fanout模式的基礎上,配置routing_key。

  2、消費者:在fanout模式的基礎上,配置routing_key。

下面我們模擬一個簡單的日志系統,將所有日志簡單的廣播給所有訂閱者。

#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.79.131'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)
#使用for循環來同時綁定多個binding-key
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
消費者
#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.79.131'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
生產者

 

關於exchange和queue的一對多或者多對一匹配:

  1、消費者的一個queue可以綁定多個bindding_key來同時匹配多個路由關鍵字。使用循環把多個routing_key綁定到一個queue上。

  

 

   2、多個queue可以匹配同一個routing_key來匹配同一個路由關鍵字。當所有queue的routing_key相同時,模式就和fanout一樣啦。

 

六、topic模式-模糊路由匹配

topic模式基本與Direct模式相同,唯一的不同點就是routing-key是模糊匹配。routing_key的格式以點號分隔,最長255bytes。

匹配元字符:

  # 表示可以匹配 0 個 或 任意個單詞(包含1個)

  *  表示只能匹配 一個 單詞

Topic exchange和其他exchange:

    如果binding_key 是#   ,那么它會接收所有的Message,不管routing_key是什么,就像是fanout exchange。
    如果 #和* 沒有被使用,那么topic exchange就變成了direct exchange。

 

發送者路由值           roiting_key
i.love.python          i.*  -- 不匹配
i.love.python          i.#  -- 匹配
i.love.python      #  --匹配,單個#匹配所有
#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.79.131'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
消費者
#! /usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = "TKQ"
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.79.131'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
生產者

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM