一、前言
前面講到direct類型的Exchange路由規則是完全匹配binding key與routing key,但這種嚴格的匹配方式在很多情況下不能滿足實際業務需求。topic類型的Exchange在匹配規則上進行了擴展,它與direct類型的Exchage相似,也是將消息路由到binding key與routing key相匹配的Queue中,但這里的匹配規則有些不同,它約定:
- routing key為一個句點號“. ”分隔的字符串(我們將被句點號“. ”分隔開的每一段獨立的字符串稱為一個單詞),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
- binding key與routing key一樣也是句點號“. ”分隔的字符串
- binding key中可以存在兩種特殊字符“*”與“#”,用於做模糊匹配,其中“*”用於匹配一個單詞,“#”用於匹配多個單詞(可以是零個)

以上圖中的配置為例,routingKey=”quick.orange.rabbit”的消息會同時路由到Q1與Q2,routingKey=”lazy.orange.fox”的消息會路由到Q1與Q2,routingKey=”lazy.brown.fox”的消息會路由到Q2,routingKey=”lazy.pink.rabbit”的消息會路由到Q2(只會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息將會被丟棄,因為它們沒有匹配任何bindingKey。
二、Exchange topic
topic 和 direct 改動不多,就是routing key 和bind key 需要改一下
生產端:
# -*- coding: UTF-8 -*-
import pika
# 創建一個連接
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
# 創建一個管道
channel = connection.channel()
# 聲明exchange 及類型
channel.exchange_declare(exchange='topic_log',
exchange_type='topic')
# 輸入信息,格式為 *.info from *.info test 類似
input_data = input('>>:').strip()
# 將輸入的信息以空格為分割,轉換為列表
data_list = input_data.split(' ')
# 三元運算,如果輸入信息存在,就使用輸入的信息data_list[0],否則用 'anonymous.info'
severity = data_list[0] if len(data_list) > 1 else 'anonymous.info'
message = ' '.join(data_list[2:]) or 'hello,world!'
# 這里的routing_key就是 data_list[0] 或 'info'
channel.basic_publish(exchange='topic_log',
routing_key=severity,
body=message)
print('[x] Sent %r:%r' % (severity, message))
connection.close()
消費端:
# -*- coding: UTF-8 -*-
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
# 聲明exchange 及類型
channel.exchange_declare(exchange='topic_log',
exchange_type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 在此我們定義一些列表,列表內容如下
# 這2個列表分別用來測試和routing_key匹配情況
# 第一種只允許接收info的信息
# 第二種允許接收error 和 mysql的信息
# severities = ['*.info']
severities = ['*.error', 'mysql.*']
for severity in severities:
channel.queue_bind(exchange='topic_log',
queue=queue_name,
routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
我們測試時,分別啟動兩個consumer。
第一個consumer1 中使用 severities = ['*.info']
第二個consumer2中使用 severities = ['*.error', 'mysql.*']
生產者分別輸入:
appache.info from appache info test nginx.error from nginx error test mysql.info from mysql info test
可以看到日志信息分別會匯總到兩個consumer中, 其中 consumer1 會收到 appache.info 和 mysql.info的信息, 而 consumer2 會收到 nginx.error 和 mysql.info 的信息。
