RabbitMQ(pika模塊)


RabbitMQ

基礎

關於MQ:

MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程序通過隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。

RabbitMQ安裝

1
2
3
4
5
6
7
8
安裝配置epel源
    $ rpm - ivh http: / / dl.fedoraproject.org / pub / epel / 6 / i386 / epel - release - 6 - 8.noarch .rpm
   
安裝erlang
    $ yum - y install erlang
   
安裝RabbitMQ
    $ yum - y install rabbitmq - server

啟動/停止:

1
systemctl start / stop rabbitmq

安裝python-API:

1
2
3
4
5
6
7
pip install pika
or
easy_install pika
or
源碼
   
https: / / pypi.python.org / pypi / pika


API基礎操作


先來看看使用RabbitMQ之前,怎么實現消息隊列:利用Queue和Thread,每線程往內存里的隊列里put一個數,另一個程序再去內存隊列里取數。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import Queue
import threading
 
 
message = Queue.Queue( 10 )
 
 
def producer(i):
     while True :
         message.put(i)
 
 
def consumer(i):
     while True :
         msg = message.get()
 
 
for i in range ( 12 ):
     t = threading.Thread(target = producer, args = (i,))
     t.start()
 
for i in range ( 10 ):
     t = threading.Thread(target = consumer, args = (i,))
     t.start()

對於RabbitMQ來說,生產和消費不再針對內存里的一個Queue對象,而是某台服務器上的RabbitMQ Server實現的消息隊列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import pika
 
# ######################### 生產者 #########################
 
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '192.168.136.8' ))
channel = connection.channel()    #創建通道
 
channel.queue_declare(queue = 'hello' )    #隊列名稱
 
channel.basic_publish(exchange = '',
                       routing_key = 'hello' ,  #路由名稱
                       body = 'Hello World!' #發送內容
print ( " [x] Sent 'Hello World!'" )
connection.close()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import pika
 
# ########################## 消費者 ##########################
 
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '192.168.136.8' ))
channel = connection.channel()
 
channel.queue_declare(queue = 'hello' )    #聲明,隊列名稱,和producer創建的重復沒有關系
 
def callback(ch, method, properties, body):
     print ( " [x] Received %r" % body)
 
channel.basic_consume(callback,         #獲取body后執行回調函數
                       queue = 'hello' ,
                       no_ack = True )                #自動應答開啟,會給MQ服務器發送一個ack:‘已經收到了’。
 
print ( ' [*] Waiting for messages. To exit press CTRL+C' )
channel.start_consuming()

消費者運行起來后會和RabbitMQ建立長連接,一旦生產者放數據到隊列里,消費者就能獲取到該值,並進行處理。

1
2
[root@localhost ~]# netstat -ntp |grep beam
tcp6       0       0 192.168 . 136.8 : 5672       192.168 . 136.1 : 52587      ESTABLISHED 1146 /beam


消息安全

1、no-ack = False(自動應答關閉)

如果生產者遇到情況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,那么,RabbitMQ會重新將該任務添加到隊列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import pika
#no-ack
########################### 消費者 ##########################
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '192.168.136.8' ))
channel = connection.channel()
 
channel.queue_declare(queue = 'hello' )
 
def callback(ch, method, properties, body):
     print ( " [x] Received %r" % body)
     import time
     time.sleep( 10 )
     print 'ok'
     ch.basic_ack(delivery_tag = method.delivery_tag)    #主動發送ack
     #打印‘ok’后才告訴MQ,這個消息已經處理完了。
 
channel.basic_consume(callback,
                       queue = 'hello' ,
                       no_ack = False )     #自動應答關閉,與channel.basic_ack共同使用
 
print ( ' [*] Waiting for messages. To exit press CTRL+C' )
channel.start_consuming()

2、durable  

make message persistent 使消息持久化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import pika
 
#durable
########################## 生產者 #########################
 
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '192.168.136.8' ))
channel = connection.channel()
 
channel.queue_declare(queue = 'hello' , durable = True #開啟持久化
 
channel.basic_publish(exchange = '',
                       routing_key = 'hello' ,
                       body = 'Hello World!' ,
                       properties = pika.BasicProperties(
                           delivery_mode = 2 , # make message persistent
                       ))
print ( " [x] Sent 'Hello World!'" )
connection.close()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import pika
#durable
########################## 消費者 #########################
 
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '192.168.136.8' ))
channel = connection.channel()
 
# make message persistent
channel.queue_declare(queue = 'hello' , durable = True )
 
 
def callback(ch, method, properties, body):
     print ( " [x] Received %r" % body)
     import time
     time.sleep( 10 )
     print 'ok'
     ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_consume(callback,
                       queue = 'hello' ,
                       no_ack = False )
 
print ( ' [*] Waiting for messages. To exit press CTRL+C' )
channel.start_consuming()


消息獲取順序

默認消息隊列里的數據是按照奇偶順序被消費者拿走,例如:消費者1 去隊列中獲取 奇數 序列的任務,消費者2去隊列中獲取 偶數 序列的任務。

channel.basic_qos(prefetch_count=1) 表示誰來誰取,不再按照奇偶數排列.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '10.211.55.4' ))
channel = connection.channel()
 
# make message persistent
channel.queue_declare(queue = 'hello' )
 
 
def callback(ch, method, properties, body):
     print ( " [x] Received %r" % body)
     import time
     time.sleep( 10 )
     print 'ok'
     ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_qos(prefetch_count = 1 )    #增加這行
 
channel.basic_consume(callback,
                       queue = 'hello' ,
                       no_ack = False )
 
print ( ' [*] Waiting for messages. To exit press CTRL+C' )
channel.start_consuming()


發布&訂閱

與消息隊列區別:

消息隊列中的數據只要被消費一次便消失。

創建隊列的數量:

同一份消息,有多少訂閱者,就要創建多少個隊列。(RabbitMQ實現發布和訂閱時,會為每一個訂閱者創建一個隊列,而發布者發布消息時,會將消息放置在所有相關隊列中。)

語法:

exchange type = fanout        #fanout==>輸出到很多

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# ######################### 發布者 #########################
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '192.168.136.8' ))
channel = connection.channel()
 
channel.exchange_declare(exchange = 'fanout_name' , type = 'fanout' )
 
message = ' ' .join(sys.argv[ 1 :]) or "info: Hello World!"
channel.basic_publish(exchange = 'fanout_name' #自命名exchange
                       routing_key = '',
                       body = message)
print ( " [x] Sent %r" % message)
connection.close()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# ########################## 訂閱者1 ##########################
 
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '192.168.136.8' ))
channel = connection.channel()
 
channel.exchange_declare(exchange = 'fanout_name' , type = 'fanout' )    #創建exchange(if not exist)
 
result = channel.queue_declare(exclusive = True )
queue_name = result.method.queue        #獲取隊列名稱
 
channel.queue_bind(exchange = 'fanout_name' ,queue = queue_name)    #通過上面兩個值綁定隊列
 
print ( ' [*] Waiting for fanout_name. To exit press CTRL+C' )
 
def callback(ch, method, properties, body):
     print ( " [x] %r" % body)
 
channel.basic_consume(callback,
                       queue = queue_name,
                       no_ack = True )
 
channel.start_consuming()

創建多個訂閱者,能更好的體現它的效果。

運行結果總結:

每個訂閱者創建一個exchange隊列,名稱自定,發布者會把數據發送給所有叫這個名字的隊列。因為數據只能被消費一次,所以有多少個訂閱者,就有多少個隊列。


發送到指定(not 固定)隊列

之前事例,發送消息時明確指定某個隊列並向其中發送消息,RabbitMQ還支持根據關鍵字發送

1、按關鍵字尋找隊列發送

exchange type = direct

隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該將數據發送至指定隊列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# ######################### 生產者 #########################
#關鍵字發送
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
         host = '192.168.136.8' ))
channel = connection.channel()
 
channel.exchange_declare(exchange = 'direct_logs' ,
                          type = 'direct' )
 
 
message = 'Hello World!'
channel.basic_publish(exchange = 'direct_logs' ,
                       routing_key = "yes" , #"yes","no","db"
                       body = message)
print ( " [x] Sent %r" % (message))
connection.close()

模擬兩個消費者,一個消費者的隊列是("yes","db"),另一個消費者隊列("no","db")。如果生產者發送的隊列關鍵字是"yes"or"no",其一匹配;如果生產者發送的隊列關鍵字是"db",則兩個消費者都能接收到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
########################### 消費者1 ##########################
import pika
import sys
   
connection = pika.BlockingConnection(pika.ConnectionParameters(
         host = '192.168.136.8' ))
channel = connection.channel()
   
channel.exchange_declare(exchange = 'direct_logs' ,
                          type = 'direct' )
   
result = channel.queue_declare(exclusive = True )
queue_name = result.method.queue
 
channel.queue_bind(exchange = 'direct_logs' ,
                        queue = queue_name,
                        routing_key = 'yes' )
channel.queue_bind(exchange = 'direct_logs' ,
                        queue = queue_name,
                        routing_key = 'db' )
   
print ( ' [*] Waiting for logs. To exit press CTRL+C' )
   
def callback(ch, method, properties, body):
     print ( " [x] %r:%r" % (method.routing_key, body))
   
channel.basic_consume(callback,
                       queue = queue_name,
                       no_ack = True )
   
channel.start_consuming()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
########################### 消費者2 ##########################
import pika
import sys
   
connection = pika.BlockingConnection(pika.ConnectionParameters(
         host = '192.168.136.8' ))
channel = connection.channel()
   
channel.exchange_declare(exchange = 'direct_logs' ,
                          type = 'direct' )
   
result = channel.queue_declare(exclusive = True )
queue_name = result.method.queue
 
channel.queue_bind(exchange = 'direct_logs' ,
                        queue = queue_name,
                        routing_key = 'no' )
channel.queue_bind(exchange = 'direct_logs' ,
                        queue = queue_name,
                        routing_key = 'db' )
   
print ( ' [*] Waiting for logs. To exit press CTRL+C' )
   
def callback(ch, method, properties, body):
     print ( " [x] %r:%r" % (method.routing_key, body))
   
channel.basic_consume(callback,
                       queue = queue_name,
                       no_ack = True )
   
channel.start_consuming()

2、模糊匹配

 exchange type = topic

在topic類型下,可以讓隊列綁定幾個模糊的關鍵字,之后發送者將數據發送到exchange,exchange將傳入”路由值“和 ”關鍵字“進行匹配,匹配成功,則將數據發送到指定隊列。

  • # 表示可以匹配 0 個 或 多個 單詞

  • *  表示只能匹配 一個 單詞

1
2
3
發送者路由值              隊列中
python.topic.cn          python. *   - - 不匹配
python.topic.cn          python. #  -- 匹配
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# ######################### 生產者 #########################
#模糊匹配
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
         host = '192.168.136.8' ))
channel = connection.channel()
 
channel.exchange_declare(exchange = 'topic_logs' ,
                          type = 'topic' )
 
message = 'Hello World!'
channel.basic_publish(exchange = 'topic_logs' ,
                       routing_key = "python.topic" ,
                       body = message)
print ( " [x] Sent %r" % (message))
connection.close()

消費者1是‘*’匹配,消費者2是‘#’匹配:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
########################### 消費者1 ##########################
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
         host = '192.168.136.8' ))
channel = connection.channel()
 
channel.exchange_declare(exchange = 'topic_logs' ,
                          type = 'topic' )
 
result = channel.queue_declare(exclusive = True )
queue_name = result.method.queue
 
channel.queue_bind(exchange = 'topic_logs' ,
                        queue = queue_name,
                        routing_key = 'python.*' )     #只匹配python.后有一個單詞的
 
print ( ' [*] Waiting for topic_logs. To exit press CTRL+C' )
 
def callback(ch, method, properties, body):
     print ( " [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
                       queue = queue_name,
                       no_ack = True )
 
channel.start_consuming()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
########################### 消費者2 ##########################
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
         host = '192.168.136.8' ))
channel = connection.channel()
 
channel.exchange_declare(exchange = 'topic_logs' ,
                          type = 'topic' )
 
result = channel.queue_declare(exclusive = True )
queue_name = result.method.queue
 
channel.queue_bind(exchange = 'topic_logs' ,
                        queue = queue_name,
                        routing_key = 'python.#' )    #匹配python.后所有單詞
 
print ( ' [*] Waiting for topic_logs. To exit press CTRL+C' )
 
def callback(ch, method, properties, body):
     print ( " [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
                       queue = queue_name,
                       no_ack = True )
 
channel.start_consuming()

從結果得出結論,如果生產者發送的routing_key是:

  • python.topic.cn    -->    只有消費者2能接收到

  • python.cn            -->    消費者1和消費者2都能接收到

  • python.                -->    消費者1和消費者2都能接收到

  • python                 -->    只有消費者2能接收到


網絡搜索的概念:

Topic Exchange – 主題式交換器,通過消息的路由關鍵字和綁定關鍵字的模式匹配,將消息路由到被綁定的隊列中。

這種路由器類型可以被用來支持經典的發布/訂閱消息傳輸模型——使用主題名字空間作為消息尋址模式,將消息傳遞給那些部分或者全部匹配主題模式的多個消費者。

主題交換器類型的工作方式如下: 綁定關鍵字用零個或多個標記構成,每一個標記之間用“.”字符分隔

綁定關鍵字必須用這種形式明確說明,並支持通配符:“*”匹配一個詞組,“#”零個或多個詞組。

因此綁定關鍵字“*.stock.#”匹配路由關鍵字“usd.stock”和“eur.stock.db”,但是不匹配“stock.nasdaq”


參考來源:http://www.cnblogs.com/wupeiqi/















免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM