RabbitMQ中交換機的消息分發機制


RabbitMQ是一個消息代理,它接受和轉發消息,是一個由 Erlang 語言開發的遵循AMQP協議的開源實現。在RabbitMQ中生產者不會將消息直接發送到隊列當中,而是將消息直接發送到交換機(exchange),交換機用來接受生產者發送的消息並將這些消息發送給綁定的隊列,即:生產者-->交換機-->隊列。

在RabbitMQ中最主要的三種交換機:1. fanout(廣播交換機)  2. direct(直連交換機)  3. topic(話題交換機)

1. fanout(廣播交換機)

 fanout會將接受到的所有消息廣播到它所綁定的所有隊列當中(每個消費者都會收到所有的消息),對於廣播交換機,消息路由鍵routing_key和隊列綁定鍵routing_key的作用都會被忽略。

fanout生產者:

 

 1 import pika
 2 
 3 
 4 class RabbitProducer(object):
 5     """
 6     與RabbitMq服務器建立連接
 7     """
 8 
 9     def __init__(self):
10         self.conn = pika.BlockingConnection(
11             pika.ConnectionParameters(host='localhost', port=5672)
12         )
13         self.channel = self.conn.channel()
14 
15         # 聲明一個exchange交換機,交換機的類型為fanout廣播.
16         self.channel.exchange_declare(
17             exchange='fanout_exchange', exchange_type='fanout', durable=True
18         )
19 
20     def send_msg(self, message):
21         """
22         routing_key:綁定的key
23         :param message:
24         :return:
25         """
26         self.channel.basic_publish(
27             exchange='fanout_exchange',
28             routing_key='',  # 因為exchange的類型為fanout,所以routing_key的數值在這里將被忽略
29             body=message,
30             properties=pika.BasicProperties(
31                 delivery_mode=2,
32                 # 消息進行持久化(防止服務器掛掉.)===> 如果沒有queue綁定到這個exchange交換機,這個參數是沒有的.
33             ))
34 
35     def close(self):
36         self.conn.close()
37 
38 
39 if __name__ == "__main__":
40     rabbit_producer = RabbitProducer()
41     for i in range(10):
42         message = 'hello world {}!'.format(i)
43         rabbit_producer.send_msg(message)
View Code

 

消費者consumer1:

 

 1 import pika
 2 import uuid
 3 
 4 
 5 class RabbitConsumer(object):
 6     """
 7     fanout 消費者1
 8     """
 9 
10     def __init__(self):
11         self.conn = pika.BlockingConnection(
12             pika.ConnectionParameters(host='localhost', port=5672)
13         )
14         self.channel = self.conn.channel()
15 
16         # 聲明一個隊列queue_consumer1,並進行持久化(防止服務器掛掉),exclusive設置為false
17         self.channel.queue_declare(
18             exclusive=False, durable=True, queue='queue_consumer1'
19         )
20 
21         # 聲明一個exhange交換機,其類型為fanout廣播類型  與生產者的交換機一致
22         self.channel.exchange_declare(
23             exchange='fanout_exchange', exchange_type='fanout', durable=True
24         )
25 
26         # 將隊列queue_consumer1與該exchange交換機進行綁定
27         self.channel.queue_bind(exchange='fanout_exchange', queue='queue_consumer1')
28 
29     def call_back(self, method, body):
30         """
31         消費者對消息進行確認,防止消費者掛掉.
32         :param method:
33         :param body:
34         :return:
35         """
36         self.channel.basic_ack(delivery_tag=method.delivery_tag)
37         print('接收到的消息為:{}'.format(str(body)))
38 
39     def receive_msg(self):
40         print('consumer1開始接受消息...')
41         # 當上一條消息未確認時,會告知RabbitMQ不要再發送消息給這個消費者了 可以控制流量
42         self.channel.basic_qos(prefetch_count=1)
43         self.channel.basic_consume(
44             consumer_callback=self.call_back,
45             queue='queue_consumer1',
46             no_ack=False,  # 消費者對消息進行確認,防止消費者掛掉
47             consumer_tag=str(uuid.uuid4())
48         )
49 
50     def consume(self):
51         self.receive_msg()
52         self.channel.start_consuming()
53 
54 
55 if __name__ == '__main__':
56     rabbit_consumer = RabbitConsumer()
57     rabbit_consumer.consume()
View Code

 

消費者consumer2:

 1 import pika
 2 import uuid
 3 
 4 
 5 class RabbitConsumer(object):
 6     def __init__(self):
 7         self.conn = pika.BlockingConnection(
 8             pika.ConnectionParameters(host='localhost', port=5672)
 9         )
10         self.channel = self.conn.channel()
11 
12         # 聲明一個隊列queue_consumer2,並進行持久化(防止服務器掛掉),exclusive設置為false
13         self.channel.queue_declare(
14             exclusive=False, durable=True, queue='queue_consumer2'
15         )
16 
17         # T聲明一個exhange交換機,其類型為fanout廣播類型
18         self.channel.exchange_declare(
19             exchange='fanout_exchange', exchange_type='fanout', durable=True
20         )
21 
22         # 將隊列queue_consumer2與該exchange交換機進行綁定
23         self.channel.queue_bind(exchange='fanout_exchange', queue='queue_consumer2')
24 
25     def call_back(self, method, body):
26         """
27         消費者對消息進行確認,防止消費者掛掉.
28         :param method:
29         :param body:
30         :return:
31         """
32         self.channel.basic_ack(delivery_tag=method.delivery_tag)
33         print('接收到的消息為:{}'.format(str(body)))
34 
35     def receive_msg(self):
36         print('consumer2開始接受消息...')
37         self.channel.basic_consume(
38             consumer_callback=self.call_back,
39             queue='queue_consumer2',
40             no_ack=False,
41             consumer_tag=str(uuid.uuid4())
42         )
43 
44     def consume(self):
45         self.receive_msg()
46         self.channel.start_consuming()
47 
48 
49 if __name__ == '__main__':
50     rabbit_consumer = RabbitConsumer()
51     rabbit_consumer.consume()
View Code

fanout會將接受到的所有消息廣播到消費者consumer1和消費者consumer2,交換機的缺陷:它只能無意識的播放,不夠靈活地控制消息廣播給指定的消費者
 

 2. direct(直連交換機)

對於direct,根據綁定鍵判定應該將數據發送至哪個隊列,消息進入隊列,其綁定秘鑰(routing_key)與消息的路由秘鑰要完全匹配,當exchange使用相同的綁定秘鑰(routing_key)去綁定多個隊列也是合法的,在這種情況下direct exchange的效果等同於fanout exchange,交換機會將消息廣播到所有匹配的隊列當中。

direct生產者:
 1 import pika
 2 
 3 
 4 class RabbitProducer(object):
 5     """
 6     與RabbitMq服務器建立連接
 7     """
 8 
 9     def __init__(self):
10         self.conn = pika.BlockingConnection(
11             pika.ConnectionParameters(host='localhost', port=5672)
12         )
13         self.channel = self.conn.channel()
14 
15         # 聲明一個exchange交換機,交換機的類型為direct
16         self.channel.exchange_declare(
17             exchange='direct_exchange', exchange_type='direct', durable=True
18         )
19 
20     def send_msg(self, routing_key, message):
21         """
22         :param routing_key: 消息的路由鍵 本例中為routing_info
23         :param message: 生成者發送的消息
24         :return:
25         """
26         self.channel.basic_publish(
27             exchange='direct_exchange',
28             routing_key=routing_key,
29             body=message,
30             properties=pika.BasicProperties(
31                 delivery_mode=2,
32                 # 消息進行持久化(防止服務器掛掉.)===> 如果沒有queue綁定到這個exchange交換機,這個參數是沒有的.
33             ))
34 
35     def close(self):
36         self.conn.close()
37 
38 
39 if __name__ == "__main__":
40     rabbit_producer = RabbitProducer()
41     routing_key = 'routing_info'
42     for i in range(10):
43         message = 'hello world {}!'.format(i)
44         print('生產者發送的消息為:{}'.format(message))
45         rabbit_producer.send_msg(routing_key, message)
View Code
direct消費者:
 1 import pika
 2 import uuid
 3 
 4 
 5 class RabbitConsumer(object):
 6     """
 7     消費者(訂閱者)
 8     """
 9 
10     def __init__(self):
11         self.conn = pika.BlockingConnection(
12             pika.ConnectionParameters(host='localhost', port=5672)
13         )
14         self.channel = self.conn.channel()
15 
16         # 消息持久化
17         self.channel.queue_declare(
18             exclusive=False, durable=True, queue='task_queue'
19         )
20 
21         # 交換機類型為direct.
22         self.channel.exchange_declare(
23             exchange='direct_exchange', exchange_type='direct', durable=True
24         )
25 
26         # 將隊列與該exchange交換機進行綁定
27         routing_keys = ['routing_info', 'aaa']
28         for routing_key in routing_keys:
29             self.channel.queue_bind(
30                 exchange='direct_exchange', queue='task_queue', routing_key=routing_key
31             )  # 如果生產者發生消息的routing_key與消費者綁定隊列的routing_key相同則成功發送
32 
33     def call_back(self, channel, method, properties, body):
34         """
35         消費者對消息進行確認,防止消費者掛掉
36         :param channel:
37         :param method:
38         :param properties:
39         :param body:
40         :return:
41         """
42         self.channel.basic_ack(delivery_tag=method.delivery_tag)
43         print('接收到的消息為:{}'.format(str(body)))
44 
45     def receive_msg(self):
46         print('開始接受消息...')
47         self.channel.basic_qos(prefetch_count=1)  # TODO 告訴RabbitMQ,不要向我發送新的消息.
48         self.channel.basic_consume(
49             consumer_callback=self.call_back,
50             queue='task_queue',
51             no_ack=False,
52             consumer_tag=str(uuid.uuid4())
53         )
54 
55     def consume(self):
56         self.receive_msg()
57         self.channel.start_consuming()
58 
59 
60 if __name__ == '__main__':
61     rabbit_consumer = RabbitConsumer()
62     rabbit_consumer.consume()
View Code

direct直連交換機相當於是fanout的升級版,當消費者的隊列
綁定的秘鑰routing_key與生產者的routing_key相同時,消費者就會收到消息;當所有消費者的隊列所綁定的routing_key都一樣且與生產者相同時,就相當於fanout交換機
 
        

  3. topic(話題交換機)

direct(直連交換機)雖然相當於fanout的升級版,但它仍然有局限性,它不能根據多個標准進行路由;topic(話題交換機)可以很好地解決這一問題:
(1) 如果消息的路由秘鑰與隊列的綁定秘鑰符合匹配規則,topic就會將消息發送到相應的隊列當中
(2) 對於綁定鍵(routing_key)有兩個特殊的情況: * (星號)可以代替一個單詞,#(散列)可以替代零個或多個單詞
(3) 對於發送到topic交換機消息的routing_key如果包含特殊字符,只能是由"."分割的單詞表,如("zhangsan.lisi")

topic 生產者:
 1 import pika
 2 
 3 
 4 class RabbitProducer(object):
 5     def __init__(self):
 6         self.conn = pika.BlockingConnection(
 7             pika.ConnectionParameters(host='localhost', port=5672)
 8         )
 9         self.channel = self.conn.channel()
10 
11         # 聲明交換機,交換機的類型為topic
12         self.channel.exchange_declare(
13             exchange='logs_topic', exchange_type='topic', durable=True
14         )
15 
16     def send_msg(self, routing_key, message):
17         """
18         :param routing_key: 消息的路由鍵
19         :param message: 生成者發送的消息
20         :return:
21         """
22         self.channel.basic_publish(
23             exchange='logs_topic',
24             routing_key=routing_key,
25             body=message,
26             properties=pika.BasicProperties(
27                 delivery_mode=2,
28                 # 消息進行持久化
29             ))
30 
31     def close(self):
32         self.conn.close()
33 
34 
35 if __name__ == "__main__":
36     rabbit_producer = RabbitProducer()
37     routing_keys = ['info', "debug", "a.debug.b", "a.info.b"]
38     for routing_key in routing_keys:
39         message = 'hello world! {}'.format(routing_key)
40         print('生產者發送的消息為:{}'.format(message))
41         rabbit_producer.send_msg(routing_key, message)
View Code
topic 消費者1 --> 實現fanout交換機:
 1 """
 2 當topic交換機使用#綁定鍵綁定隊列時,此時topic交換機就會將消息廣播到所有的隊列當中,
 3 不管消息的路由秘鑰如何,此時topic交換機的效果等同於fanout:發送所有消息都會接受到
 4 """
 5 import pika
 6 import uuid
 7 
 8 
 9 class RabbitConsumer(object):
10     def __init__(self):
11         self.conn = pika.BlockingConnection(
12             pika.ConnectionParameters(host='localhost', port=5672)
13         )
14         self.channel = self.conn.channel()
15 
16         # 消息持久化
17         self.channel.queue_declare(
18             exclusive=False, durable=True, queue='task_queue'
19         )
20 
21         # 聲明交換機,其類型為topic
22         self.channel.exchange_declare(
23             exchange='logs_topic', exchange_type='topic', durable=True
24         )
25 
26         # 將隊列與該交換機進行綁定
27         routing_keys = ['#']  # 使用#綁定鍵時,它將接受所有的消息,同fanout效果一樣.
28         for routing_key in routing_keys:
29             self.channel.queue_bind(
30                 exchange='logs_topic', queue='task_queue', routing_key=routing_key
31             )
32 
33     def call_back(self, channel, method, properties, body):
34         """
35         消費者對消息進行確認,防止消費者掛掉
36         :param channel:
37         :param method:
38         :param properties:
39         :param body:
40         :return:
41         """
42         self.channel.basic_ack(delivery_tag=method.delivery_tag)
43         print('接收到的消息為:{}'.format(str(body)))
44 
45     def receive_msg(self):
46         print('開始接受消息...')
47         self.channel.basic_qos(prefetch_count=1)
48         self.channel.basic_consume(
49             consumer_callback=self.call_back,
50             queue='task_queue',
51             no_ack=False,  # 消費者對消息進行確認
52             consumer_tag=str(uuid.uuid4())
53         )
54 
55     def consume(self):
56         self.receive_msg()
57         self.channel.start_consuming()
58 
59 
60 if __name__ == '__main__':
61     rabbit_consumer = RabbitConsumer()
62     rabbit_consumer.consume()
View Code
topic 消費者2 --> 實現direct交換機:
 1 """
 2 當topic交換機沒有使用*和#匹配符綁定鍵綁定隊列時,此時topic交換機的效果等同於direct,
 3 會收到key相匹配的消息  如:info debug
 4 """
 5 import pika
 6 import uuid
 7 
 8 
 9 class RabbitConsumer(object):
10     def __init__(self):
11         self.conn = pika.BlockingConnection(
12             pika.ConnectionParameters(host='localhost', port=5672)
13         )
14         self.channel = self.conn.channel()
15 
16         # 消息持久化
17         self.channel.queue_declare(
18             exclusive=False, durable=True, queue='work_queue'
19         )
20 
21         #
22         # 聲明交換機,其類型為topic
23         self.channel.exchange_declare(
24             exchange='logs_topic', exchange_type='topic', durable=True
25         )
26 
27         # 將隊列與交換機進行綁定
28         routing_keys = ['info', 'debug']
29         for routing_key in routing_keys:
30             self.channel.queue_bind(
31                 exchange='logs_topic', queue='work_queue', routing_key=routing_key
32             )
33 
34     def call_back(self, channel, method, properties, body):
35         """
36         消費者對消息進行確認,防止消費者掛掉
37         :param channel:
38         :param method:
39         :param properties:
40         :param body:
41         :return:
42         """
43         self.channel.basic_ack(delivery_tag=method.delivery_tag)
44         print('接收到的消息為:{}'.format(str(body)))
45 
46     def receive_msg(self):
47         print('開始接受消息...')
48         self.channel.basic_qos(prefetch_count=1)
49         self.channel.basic_consume(
50             consumer_callback=self.call_back,
51             queue='work_queue',
52             no_ack=False,  # 消費者對消息進行確認
53             consumer_tag=str(uuid.uuid4())
54         )
55 
56     def consume(self):
57         self.receive_msg()
58         self.channel.start_consuming()
59 
60 
61 if __name__ == '__main__':
62     rabbit_consumer = RabbitConsumer()
63     rabbit_consumer.consume()
View Code
topic 消費者3 --> 實現*.x.* 消息匹配:
 1 """
 2 匹配任意點分割的單詞 生產者發送的:a.debug.b 則匹配了'*.debug.*'
 3 生產者發送的:a.info.b 則匹配了'*.info.*'
 4 """
 5 import pika
 6 import uuid
 7 
 8 
 9 class RabbitConsumer(object):
10     def __init__(self):
11         self.conn = pika.BlockingConnection(
12             pika.ConnectionParameters(host='localhost', port=5672)
13         )
14         self.channel = self.conn.channel()
15 
16         # 消息持久化
17         self.channel.queue_declare(
18             exclusive=False, durable=True, queue='other_queue'
19         )
20 
21         # 聲明交換機,其類型為topic
22         self.channel.exchange_declare(
23             exchange='logs_topic', exchange_type='topic', durable=True
24         )
25 
26         # 將隊列與交換機進行綁定
27         routing_keys = ['*.info.*', '*.debug.*', 'dfdf*']
28         for routing_key in routing_keys:
29             self.channel.queue_bind(
30                 exchange='logs_topic', queue='other_queue', routing_key=routing_key
31             )
32 
33     def call_back(self, channel, method, properties, body):
34         """
35         消費者對消息進行確認,防止消費者掛掉
36         :param channel:
37         :param method:
38         :param properties:
39         :param body:
40         :return:
41         """
42         self.channel.basic_ack(delivery_tag=method.delivery_tag)
43         print('接收到的消息為:{}'.format(str(body)))
44 
45     def receive_msg(self):
46         print('開始接受消息...')
47         self.channel.basic_qos(prefetch_count=1)
48         self.channel.basic_consume(
49             consumer_callback=self.call_back,
50             queue='other_queue',
51             no_ack=False,  # 消費者對消息進行確認
52             consumer_tag=str(uuid.uuid4())
53         )
54 
55     def consume(self):
56         self.receive_msg()
57         self.channel.start_consuming()
58 
59 
60 if __name__ == '__main__':
61     rabbit_consumer = RabbitConsumer()
62     rabbit_consumer.consume()
View Code

topic消費者執行結果:

消費者1:

消費者2:

消費者3:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM