RabbitMQ
基礎
關於MQ:
MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程序通過隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。
RabbitMQ安裝
1
2
3
4
5
6
7
8
|
安裝配置epel源
$ rpm
-
ivh http:
/
/
dl.fedoraproject.org
/
pub
/
epel
/
6
/
i386
/
epel
-
release
-
6
-
8.noarch
.rpm
安裝erlang
$ yum
-
y install erlang
安裝RabbitMQ
$ yum
-
y install rabbitmq
-
server
|
啟動/停止:
1
|
systemctl start
/
stop rabbitmq
|
安裝python-API:
1
2
3
4
5
6
7
|
pip install pika
or
easy_install pika
or
源碼
https:
/
/
pypi.python.org
/
pypi
/
pika
|
API基礎操作
先來看看使用RabbitMQ之前,怎么實現消息隊列:利用Queue和Thread,每線程往內存里的隊列里put一個數,另一個程序再去內存隊列里取數。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
import
Queue
import
threading
message
=
Queue.Queue(
10
)
def
producer(i):
while
True
:
message.put(i)
def
consumer(i):
while
True
:
msg
=
message.get()
for
i
in
range
(
12
):
t
=
threading.Thread(target
=
producer, args
=
(i,))
t.start()
for
i
in
range
(
10
):
t
=
threading.Thread(target
=
consumer, args
=
(i,))
t.start()
|
對於RabbitMQ來說,生產和消費不再針對內存里的一個Queue對象,而是某台服務器上的RabbitMQ Server實現的消息隊列。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
import
pika
# ######################### 生產者 #########################
connection
=
pika.BlockingConnection(pika.ConnectionParameters(host
=
'192.168.136.8'
))
channel
=
connection.channel() #創建通道
channel.queue_declare(queue
=
'hello'
)
#隊列名稱
channel.basic_publish(exchange
=
'',
routing_key
=
'hello'
, #路由名稱
body
=
'Hello World!'
)
#發送內容
print
(
" [x] Sent 'Hello World!'"
)
connection.close()
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
import
pika
# ########################## 消費者 ##########################
connection
=
pika.BlockingConnection(pika.ConnectionParameters(host
=
'192.168.136.8'
))
channel
=
connection.channel()
channel.queue_declare(queue
=
'hello'
)
#聲明,隊列名稱,和producer創建的重復沒有關系
def
callback(ch, method, properties, body):
print
(
" [x] Received %r"
%
body)
channel.basic_consume(callback, #獲取body后執行回調函數
queue
=
'hello'
,
no_ack
=
True
) #自動應答開啟,會給MQ服務器發送一個ack:‘已經收到了’。
print
(
' [*] Waiting for messages. To exit press CTRL+C'
)
channel.start_consuming()
|
消費者運行起來后會和RabbitMQ建立長連接,一旦生產者放數據到隊列里,消費者就能獲取到該值,並進行處理。
1
2
|
[root@localhost ~]# netstat -ntp |grep beam
tcp6
0
0
192.168
.
136.8
:
5672
192.168
.
136.1
:
52587
ESTABLISHED
1146
/beam
|
消息安全
1、no-ack = False(自動應答關閉)
如果生產者遇到情況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,那么,RabbitMQ會重新將該任務添加到隊列中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
import
pika
#no-ack
########################### 消費者 ##########################
connection
=
pika.BlockingConnection(pika.ConnectionParameters(host
=
'192.168.136.8'
))
channel
=
connection.channel()
channel.queue_declare(queue
=
'hello'
)
def
callback(ch, method, properties, body):
print
(
" [x] Received %r"
%
body)
import
time
time.sleep(
10
)
print
'ok'
ch.basic_ack(delivery_tag
=
method.delivery_tag)
#主動發送ack
#打印‘ok’后才告訴MQ,這個消息已經處理完了。
channel.basic_consume(callback,
queue
=
'hello'
,
no_ack
=
False
)
#自動應答關閉,與channel.basic_ack共同使用
print
(
' [*] Waiting for messages. To exit press CTRL+C'
)
channel.start_consuming()
|
2、durable
make message persistent 使消息持久化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
import
pika
#durable
########################## 生產者 #########################
connection
=
pika.BlockingConnection(pika.ConnectionParameters(host
=
'192.168.136.8'
))
channel
=
connection.channel()
channel.queue_declare(queue
=
'hello'
, durable
=
True
)
#開啟持久化
channel.basic_publish(exchange
=
'',
routing_key
=
'hello'
,
body
=
'Hello World!'
,
properties
=
pika.BasicProperties(
delivery_mode
=
2
,
# make message persistent
))
print
(
" [x] Sent 'Hello World!'"
)
connection.close()
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
import
pika
#durable
########################## 消費者 #########################
connection
=
pika.BlockingConnection(pika.ConnectionParameters(host
=
'192.168.136.8'
))
channel
=
connection.channel()
# make message persistent
channel.queue_declare(queue
=
'hello'
, durable
=
True
)
def
callback(ch, method, properties, body):
print
(
" [x] Received %r"
%
body)
import
time
time.sleep(
10
)
print
'ok'
ch.basic_ack(delivery_tag
=
method.delivery_tag)
channel.basic_consume(callback,
queue
=
'hello'
,
no_ack
=
False
)
print
(
' [*] Waiting for messages. To exit press CTRL+C'
)
channel.start_consuming()
|
消息獲取順序
默認消息隊列里的數據是按照奇偶順序被消費者拿走,例如:消費者1 去隊列中獲取 奇數 序列的任務,消費者2去隊列中獲取 偶數 序列的任務。
channel.basic_qos(prefetch_count=1) 表示誰來誰取,不再按照奇偶數排列.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
import
pika
connection
=
pika.BlockingConnection(pika.ConnectionParameters(host
=
'10.211.55.4'
))
channel
=
connection.channel()
# make message persistent
channel.queue_declare(queue
=
'hello'
)
def
callback(ch, method, properties, body):
print
(
" [x] Received %r"
%
body)
import
time
time.sleep(
10
)
print
'ok'
ch.basic_ack(delivery_tag
=
method.delivery_tag)
channel.basic_qos(prefetch_count
=
1
) #增加這行
channel.basic_consume(callback,
queue
=
'hello'
,
no_ack
=
False
)
print
(
' [*] Waiting for messages. To exit press CTRL+C'
)
channel.start_consuming()
|
發布&訂閱
與消息隊列區別:
消息隊列中的數據只要被消費一次便消失。
創建隊列的數量:
同一份消息,有多少訂閱者,就要創建多少個隊列。(RabbitMQ實現發布和訂閱時,會為每一個訂閱者創建一個隊列,而發布者發布消息時,會將消息放置在所有相關隊列中。)
語法:
exchange type = fanout #fanout==>輸出到很多
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
# ######################### 發布者 #########################
import
pika
import
sys
connection
=
pika.BlockingConnection(pika.ConnectionParameters(host
=
'192.168.136.8'
))
channel
=
connection.channel()
channel.exchange_declare(exchange
=
'fanout_name'
,
type
=
'fanout'
)
message
=
' '
.join(sys.argv[
1
:])
or
"info: Hello World!"
channel.basic_publish(exchange
=
'fanout_name'
,
#自命名exchange
routing_key
=
'',
body
=
message)
print
(
" [x] Sent %r"
%
message)
connection.close()
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
# ########################## 訂閱者1 ##########################
import
pika
connection
=
pika.BlockingConnection(pika.ConnectionParameters(host
=
'192.168.136.8'
))
channel
=
connection.channel()
channel.exchange_declare(exchange
=
'fanout_name'
,
type
=
'fanout'
) #創建exchange(if not exist)
result
=
channel.queue_declare(exclusive
=
True
)
queue_name
=
result.method.queue #獲取隊列名稱
channel.queue_bind(exchange
=
'fanout_name'
,queue
=
queue_name) #通過上面兩個值綁定隊列
print
(
' [*] Waiting for fanout_name. To exit press CTRL+C'
)
def
callback(ch, method, properties, body):
print
(
" [x] %r"
%
body)
channel.basic_consume(callback,
queue
=
queue_name,
no_ack
=
True
)
channel.start_consuming()
|
創建多個訂閱者,能更好的體現它的效果。
運行結果總結:
每個訂閱者創建一個exchange隊列,名稱自定,發布者會把數據發送給所有叫這個名字的隊列。因為數據只能被消費一次,所以有多少個訂閱者,就有多少個隊列。
發送到指定(not 固定)隊列
之前事例,發送消息時明確指定某個隊列並向其中發送消息,RabbitMQ還支持根據關鍵字發送
1、按關鍵字尋找隊列發送
exchange type = direct
隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該將數據發送至指定隊列。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
# ######################### 生產者 #########################
#關鍵字發送
import
pika
import
sys
connection
=
pika.BlockingConnection(pika.ConnectionParameters(
host
=
'192.168.136.8'
))
channel
=
connection.channel()
channel.exchange_declare(exchange
=
'direct_logs'
,
type
=
'direct'
)
message
=
'Hello World!'
channel.basic_publish(exchange
=
'direct_logs'
,
routing_key
=
"yes"
,
#"yes","no","db"
body
=
message)
print
(
" [x] Sent %r"
%
(message))
connection.close()
|
模擬兩個消費者,一個消費者的隊列是("yes","db"),另一個消費者隊列("no","db")。如果生產者發送的隊列關鍵字是"yes"or"no",其一匹配;如果生產者發送的隊列關鍵字是"db",則兩個消費者都能接收到。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
########################### 消費者1 ##########################
import
pika
import
sys
connection
=
pika.BlockingConnection(pika.ConnectionParameters(
host
=
'192.168.136.8'
))
channel
=
connection.channel()
channel.exchange_declare(exchange
=
'direct_logs'
,
type
=
'direct'
)
result
=
channel.queue_declare(exclusive
=
True
)
queue_name
=
result.method.queue
channel.queue_bind(exchange
=
'direct_logs'
,
queue
=
queue_name,
routing_key
=
'yes'
)
channel.queue_bind(exchange
=
'direct_logs'
,
queue
=
queue_name,
routing_key
=
'db'
)
print
(
' [*] Waiting for logs. To exit press CTRL+C'
)
def
callback(ch, method, properties, body):
print
(
" [x] %r:%r"
%
(method.routing_key, body))
channel.basic_consume(callback,
queue
=
queue_name,
no_ack
=
True
)
channel.start_consuming()
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
########################### 消費者2 ##########################
import
pika
import
sys
connection
=
pika.BlockingConnection(pika.ConnectionParameters(
host
=
'192.168.136.8'
))
channel
=
connection.channel()
channel.exchange_declare(exchange
=
'direct_logs'
,
type
=
'direct'
)
result
=
channel.queue_declare(exclusive
=
True
)
queue_name
=
result.method.queue
channel.queue_bind(exchange
=
'direct_logs'
,
queue
=
queue_name,
routing_key
=
'no'
)
channel.queue_bind(exchange
=
'direct_logs'
,
queue
=
queue_name,
routing_key
=
'db'
)
print
(
' [*] Waiting for logs. To exit press CTRL+C'
)
def
callback(ch, method, properties, body):
print
(
" [x] %r:%r"
%
(method.routing_key, body))
channel.basic_consume(callback,
queue
=
queue_name,
no_ack
=
True
)
channel.start_consuming()
|
2、模糊匹配
exchange type = topic
在topic類型下,可以讓隊列綁定幾個模糊的關鍵字,之后發送者將數據發送到exchange,exchange將傳入”路由值“和 ”關鍵字“進行匹配,匹配成功,則將數據發送到指定隊列。
# 表示可以匹配 0 個 或 多個 單詞
* 表示只能匹配 一個 單詞
1
2
3
|
發送者路由值 隊列中
python.topic.cn python.
*
-
-
不匹配
python.topic.cn python.
# -- 匹配
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
# ######################### 生產者 #########################
#模糊匹配
import
pika
import
sys
connection
=
pika.BlockingConnection(pika.ConnectionParameters(
host
=
'192.168.136.8'
))
channel
=
connection.channel()
channel.exchange_declare(exchange
=
'topic_logs'
,
type
=
'topic'
)
message
=
'Hello World!'
channel.basic_publish(exchange
=
'topic_logs'
,
routing_key
=
"python.topic"
,
body
=
message)
print
(
" [x] Sent %r"
%
(message))
connection.close()
|
消費者1是‘*’匹配,消費者2是‘#’匹配:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
########################### 消費者1 ##########################
import
pika
import
sys
connection
=
pika.BlockingConnection(pika.ConnectionParameters(
host
=
'192.168.136.8'
))
channel
=
connection.channel()
channel.exchange_declare(exchange
=
'topic_logs'
,
type
=
'topic'
)
result
=
channel.queue_declare(exclusive
=
True
)
queue_name
=
result.method.queue
channel.queue_bind(exchange
=
'topic_logs'
,
queue
=
queue_name,
routing_key
=
'python.*'
) #只匹配python.后有一個單詞的
print
(
' [*] Waiting for topic_logs. To exit press CTRL+C'
)
def
callback(ch, method, properties, body):
print
(
" [x] %r:%r"
%
(method.routing_key, body))
channel.basic_consume(callback,
queue
=
queue_name,
no_ack
=
True
)
channel.start_consuming()
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
########################### 消費者2 ##########################
import
pika
import
sys
connection
=
pika.BlockingConnection(pika.ConnectionParameters(
host
=
'192.168.136.8'
))
channel
=
connection.channel()
channel.exchange_declare(exchange
=
'topic_logs'
,
type
=
'topic'
)
result
=
channel.queue_declare(exclusive
=
True
)
queue_name
=
result.method.queue
channel.queue_bind(exchange
=
'topic_logs'
,
queue
=
queue_name,
routing_key
=
'python.#'
) #匹配python.后所有單詞
print
(
' [*] Waiting for topic_logs. To exit press CTRL+C'
)
def
callback(ch, method, properties, body):
print
(
" [x] %r:%r"
%
(method.routing_key, body))
channel.basic_consume(callback,
queue
=
queue_name,
no_ack
=
True
)
channel.start_consuming()
|
從結果得出結論,如果生產者發送的routing_key是:
python.topic.cn --> 只有消費者2能接收到
python.cn --> 消費者1和消費者2都能接收到
python. --> 消費者1和消費者2都能接收到
python --> 只有消費者2能接收到
網絡搜索的概念:
Topic Exchange – 主題式交換器,通過消息的路由關鍵字和綁定關鍵字的模式匹配,將消息路由到被綁定的隊列中。
這種路由器類型可以被用來支持經典的發布/訂閱消息傳輸模型——使用主題名字空間作為消息尋址模式,將消息傳遞給那些部分或者全部匹配主題模式的多個消費者。
主題交換器類型的工作方式如下: 綁定關鍵字用零個或多個標記構成,每一個標記之間用“.”字符分隔。
綁定關鍵字必須用這種形式明確說明,並支持通配符:“*”匹配一個詞組,“#”零個或多個詞組。
因此綁定關鍵字“*.stock.#”匹配路由關鍵字“usd.stock”和“eur.stock.db”,但是不匹配“stock.nasdaq”
參考來源:http://www.cnblogs.com/wupeiqi/