RabbitMQ是一個消息代理,它接受和轉發消息,是一個由 Erlang 語言開發的遵循AMQP協議的開源實現。在RabbitMQ中生產者不會將消息直接發送到隊列當中,而是將消息直接發送到交換機(exchange),交換機用來接受生產者發送的消息並將這些消息發送給綁定的隊列,即:生產者-->交換機-->隊列。
在RabbitMQ中最主要的三種交換機:1. fanout(廣播交換機) 2. direct(直連交換機) 3. topic(話題交換機)
1. fanout(廣播交換機)
fanout會將接受到的所有消息廣播到它所綁定的所有隊列當中(每個消費者都會收到所有的消息),對於廣播交換機,消息路由鍵routing_key和隊列綁定鍵routing_key的作用都會被忽略。
fanout生產者:

1 import pika 2 3 4 class RabbitProducer(object): 5 """ 6 與RabbitMq服務器建立連接 7 """ 8 9 def __init__(self): 10 self.conn = pika.BlockingConnection( 11 pika.ConnectionParameters(host='localhost', port=5672) 12 ) 13 self.channel = self.conn.channel() 14 15 # 聲明一個exchange交換機,交換機的類型為fanout廣播. 16 self.channel.exchange_declare( 17 exchange='fanout_exchange', exchange_type='fanout', durable=True 18 ) 19 20 def send_msg(self, message): 21 """ 22 routing_key:綁定的key 23 :param message: 24 :return: 25 """ 26 self.channel.basic_publish( 27 exchange='fanout_exchange', 28 routing_key='', # 因為exchange的類型為fanout,所以routing_key的數值在這里將被忽略 29 body=message, 30 properties=pika.BasicProperties( 31 delivery_mode=2, 32 # 消息進行持久化(防止服務器掛掉.)===> 如果沒有queue綁定到這個exchange交換機,這個參數是沒有的. 33 )) 34 35 def close(self): 36 self.conn.close() 37 38 39 if __name__ == "__main__": 40 rabbit_producer = RabbitProducer() 41 for i in range(10): 42 message = 'hello world {}!'.format(i) 43 rabbit_producer.send_msg(message)
消費者consumer1:

1 import pika 2 import uuid 3 4 5 class RabbitConsumer(object): 6 """ 7 fanout 消費者1 8 """ 9 10 def __init__(self): 11 self.conn = pika.BlockingConnection( 12 pika.ConnectionParameters(host='localhost', port=5672) 13 ) 14 self.channel = self.conn.channel() 15 16 # 聲明一個隊列queue_consumer1,並進行持久化(防止服務器掛掉),exclusive設置為false 17 self.channel.queue_declare( 18 exclusive=False, durable=True, queue='queue_consumer1' 19 ) 20 21 # 聲明一個exhange交換機,其類型為fanout廣播類型 與生產者的交換機一致 22 self.channel.exchange_declare( 23 exchange='fanout_exchange', exchange_type='fanout', durable=True 24 ) 25 26 # 將隊列queue_consumer1與該exchange交換機進行綁定 27 self.channel.queue_bind(exchange='fanout_exchange', queue='queue_consumer1') 28 29 def call_back(self, method, body): 30 """ 31 消費者對消息進行確認,防止消費者掛掉. 32 :param method: 33 :param body: 34 :return: 35 """ 36 self.channel.basic_ack(delivery_tag=method.delivery_tag) 37 print('接收到的消息為:{}'.format(str(body))) 38 39 def receive_msg(self): 40 print('consumer1開始接受消息...') 41 # 當上一條消息未確認時,會告知RabbitMQ不要再發送消息給這個消費者了 可以控制流量 42 self.channel.basic_qos(prefetch_count=1) 43 self.channel.basic_consume( 44 consumer_callback=self.call_back, 45 queue='queue_consumer1', 46 no_ack=False, # 消費者對消息進行確認,防止消費者掛掉 47 consumer_tag=str(uuid.uuid4()) 48 ) 49 50 def consume(self): 51 self.receive_msg() 52 self.channel.start_consuming() 53 54 55 if __name__ == '__main__': 56 rabbit_consumer = RabbitConsumer() 57 rabbit_consumer.consume()
消費者consumer2:

1 import pika 2 import uuid 3 4 5 class RabbitConsumer(object): 6 def __init__(self): 7 self.conn = pika.BlockingConnection( 8 pika.ConnectionParameters(host='localhost', port=5672) 9 ) 10 self.channel = self.conn.channel() 11 12 # 聲明一個隊列queue_consumer2,並進行持久化(防止服務器掛掉),exclusive設置為false 13 self.channel.queue_declare( 14 exclusive=False, durable=True, queue='queue_consumer2' 15 ) 16 17 # T聲明一個exhange交換機,其類型為fanout廣播類型 18 self.channel.exchange_declare( 19 exchange='fanout_exchange', exchange_type='fanout', durable=True 20 ) 21 22 # 將隊列queue_consumer2與該exchange交換機進行綁定 23 self.channel.queue_bind(exchange='fanout_exchange', queue='queue_consumer2') 24 25 def call_back(self, method, body): 26 """ 27 消費者對消息進行確認,防止消費者掛掉. 28 :param method: 29 :param body: 30 :return: 31 """ 32 self.channel.basic_ack(delivery_tag=method.delivery_tag) 33 print('接收到的消息為:{}'.format(str(body))) 34 35 def receive_msg(self): 36 print('consumer2開始接受消息...') 37 self.channel.basic_consume( 38 consumer_callback=self.call_back, 39 queue='queue_consumer2', 40 no_ack=False, 41 consumer_tag=str(uuid.uuid4()) 42 ) 43 44 def consume(self): 45 self.receive_msg() 46 self.channel.start_consuming() 47 48 49 if __name__ == '__main__': 50 rabbit_consumer = RabbitConsumer() 51 rabbit_consumer.consume()
fanout會將接受到的所有消息廣播到消費者consumer1和消費者consumer2,交換機的缺陷:它只能無意識的播放,不夠靈活地控制消息廣播給指定的消費者
2. direct(直連交換機)
對於direct,根據綁定鍵判定應該將數據發送至哪個隊列,消息進入隊列,其綁定秘鑰(routing_key)與消息的路由秘鑰要完全匹配,當exchange使用相同的綁定秘鑰(routing_key)去綁定多個隊列也是合法的,在這種情況下direct exchange的效果等同於fanout exchange,交換機會將消息廣播到所有匹配的隊列當中。
direct生產者:

1 import pika 2 3 4 class RabbitProducer(object): 5 """ 6 與RabbitMq服務器建立連接 7 """ 8 9 def __init__(self): 10 self.conn = pika.BlockingConnection( 11 pika.ConnectionParameters(host='localhost', port=5672) 12 ) 13 self.channel = self.conn.channel() 14 15 # 聲明一個exchange交換機,交換機的類型為direct 16 self.channel.exchange_declare( 17 exchange='direct_exchange', exchange_type='direct', durable=True 18 ) 19 20 def send_msg(self, routing_key, message): 21 """ 22 :param routing_key: 消息的路由鍵 本例中為routing_info 23 :param message: 生成者發送的消息 24 :return: 25 """ 26 self.channel.basic_publish( 27 exchange='direct_exchange', 28 routing_key=routing_key, 29 body=message, 30 properties=pika.BasicProperties( 31 delivery_mode=2, 32 # 消息進行持久化(防止服務器掛掉.)===> 如果沒有queue綁定到這個exchange交換機,這個參數是沒有的. 33 )) 34 35 def close(self): 36 self.conn.close() 37 38 39 if __name__ == "__main__": 40 rabbit_producer = RabbitProducer() 41 routing_key = 'routing_info' 42 for i in range(10): 43 message = 'hello world {}!'.format(i) 44 print('生產者發送的消息為:{}'.format(message)) 45 rabbit_producer.send_msg(routing_key, message)
direct消費者:

1 import pika 2 import uuid 3 4 5 class RabbitConsumer(object): 6 """ 7 消費者(訂閱者) 8 """ 9 10 def __init__(self): 11 self.conn = pika.BlockingConnection( 12 pika.ConnectionParameters(host='localhost', port=5672) 13 ) 14 self.channel = self.conn.channel() 15 16 # 消息持久化 17 self.channel.queue_declare( 18 exclusive=False, durable=True, queue='task_queue' 19 ) 20 21 # 交換機類型為direct. 22 self.channel.exchange_declare( 23 exchange='direct_exchange', exchange_type='direct', durable=True 24 ) 25 26 # 將隊列與該exchange交換機進行綁定 27 routing_keys = ['routing_info', 'aaa'] 28 for routing_key in routing_keys: 29 self.channel.queue_bind( 30 exchange='direct_exchange', queue='task_queue', routing_key=routing_key 31 ) # 如果生產者發生消息的routing_key與消費者綁定隊列的routing_key相同則成功發送 32 33 def call_back(self, channel, method, properties, body): 34 """ 35 消費者對消息進行確認,防止消費者掛掉 36 :param channel: 37 :param method: 38 :param properties: 39 :param body: 40 :return: 41 """ 42 self.channel.basic_ack(delivery_tag=method.delivery_tag) 43 print('接收到的消息為:{}'.format(str(body))) 44 45 def receive_msg(self): 46 print('開始接受消息...') 47 self.channel.basic_qos(prefetch_count=1) # TODO 告訴RabbitMQ,不要向我發送新的消息. 48 self.channel.basic_consume( 49 consumer_callback=self.call_back, 50 queue='task_queue', 51 no_ack=False, 52 consumer_tag=str(uuid.uuid4()) 53 ) 54 55 def consume(self): 56 self.receive_msg() 57 self.channel.start_consuming() 58 59 60 if __name__ == '__main__': 61 rabbit_consumer = RabbitConsumer() 62 rabbit_consumer.consume()
direct直連交換機相當於是fanout的升級版,當消費者的隊列綁定的秘鑰routing_key與生產者的routing_key相同時,消費者就會收到消息;當所有消費者的隊列所綁定的routing_key都一樣且與生產者相同時,就相當於fanout交換機
3. topic(話題交換機)
direct(直連交換機)雖然相當於fanout的升級版,但它仍然有局限性,它不能根據多個標准進行路由;topic(話題交換機)可以很好地解決這一問題:
(1) 如果消息的路由秘鑰與隊列的綁定秘鑰符合匹配規則,topic就會將消息發送到相應的隊列當中
(2) 對於綁定鍵(routing_key)有兩個特殊的情況: * (星號)可以代替一個單詞,#(散列)可以替代零個或多個單詞
(3) 對於發送到topic交換機消息的routing_key如果包含特殊字符,只能是由"."分割的單詞表,如("zhangsan.lisi")
topic 生產者:

1 import pika 2 3 4 class RabbitProducer(object): 5 def __init__(self): 6 self.conn = pika.BlockingConnection( 7 pika.ConnectionParameters(host='localhost', port=5672) 8 ) 9 self.channel = self.conn.channel() 10 11 # 聲明交換機,交換機的類型為topic 12 self.channel.exchange_declare( 13 exchange='logs_topic', exchange_type='topic', durable=True 14 ) 15 16 def send_msg(self, routing_key, message): 17 """ 18 :param routing_key: 消息的路由鍵 19 :param message: 生成者發送的消息 20 :return: 21 """ 22 self.channel.basic_publish( 23 exchange='logs_topic', 24 routing_key=routing_key, 25 body=message, 26 properties=pika.BasicProperties( 27 delivery_mode=2, 28 # 消息進行持久化 29 )) 30 31 def close(self): 32 self.conn.close() 33 34 35 if __name__ == "__main__": 36 rabbit_producer = RabbitProducer() 37 routing_keys = ['info', "debug", "a.debug.b", "a.info.b"] 38 for routing_key in routing_keys: 39 message = 'hello world! {}'.format(routing_key) 40 print('生產者發送的消息為:{}'.format(message)) 41 rabbit_producer.send_msg(routing_key, message)
topic 消費者1 --> 實現fanout交換機:

1 """ 2 當topic交換機使用#綁定鍵綁定隊列時,此時topic交換機就會將消息廣播到所有的隊列當中, 3 不管消息的路由秘鑰如何,此時topic交換機的效果等同於fanout:發送所有消息都會接受到 4 """ 5 import pika 6 import uuid 7 8 9 class RabbitConsumer(object): 10 def __init__(self): 11 self.conn = pika.BlockingConnection( 12 pika.ConnectionParameters(host='localhost', port=5672) 13 ) 14 self.channel = self.conn.channel() 15 16 # 消息持久化 17 self.channel.queue_declare( 18 exclusive=False, durable=True, queue='task_queue' 19 ) 20 21 # 聲明交換機,其類型為topic 22 self.channel.exchange_declare( 23 exchange='logs_topic', exchange_type='topic', durable=True 24 ) 25 26 # 將隊列與該交換機進行綁定 27 routing_keys = ['#'] # 使用#綁定鍵時,它將接受所有的消息,同fanout效果一樣. 28 for routing_key in routing_keys: 29 self.channel.queue_bind( 30 exchange='logs_topic', queue='task_queue', routing_key=routing_key 31 ) 32 33 def call_back(self, channel, method, properties, body): 34 """ 35 消費者對消息進行確認,防止消費者掛掉 36 :param channel: 37 :param method: 38 :param properties: 39 :param body: 40 :return: 41 """ 42 self.channel.basic_ack(delivery_tag=method.delivery_tag) 43 print('接收到的消息為:{}'.format(str(body))) 44 45 def receive_msg(self): 46 print('開始接受消息...') 47 self.channel.basic_qos(prefetch_count=1) 48 self.channel.basic_consume( 49 consumer_callback=self.call_back, 50 queue='task_queue', 51 no_ack=False, # 消費者對消息進行確認 52 consumer_tag=str(uuid.uuid4()) 53 ) 54 55 def consume(self): 56 self.receive_msg() 57 self.channel.start_consuming() 58 59 60 if __name__ == '__main__': 61 rabbit_consumer = RabbitConsumer() 62 rabbit_consumer.consume()
topic 消費者2 --> 實現direct交換機:

1 """ 2 當topic交換機沒有使用*和#匹配符綁定鍵綁定隊列時,此時topic交換機的效果等同於direct, 3 會收到key相匹配的消息 如:info debug 4 """ 5 import pika 6 import uuid 7 8 9 class RabbitConsumer(object): 10 def __init__(self): 11 self.conn = pika.BlockingConnection( 12 pika.ConnectionParameters(host='localhost', port=5672) 13 ) 14 self.channel = self.conn.channel() 15 16 # 消息持久化 17 self.channel.queue_declare( 18 exclusive=False, durable=True, queue='work_queue' 19 ) 20 21 # 22 # 聲明交換機,其類型為topic 23 self.channel.exchange_declare( 24 exchange='logs_topic', exchange_type='topic', durable=True 25 ) 26 27 # 將隊列與交換機進行綁定 28 routing_keys = ['info', 'debug'] 29 for routing_key in routing_keys: 30 self.channel.queue_bind( 31 exchange='logs_topic', queue='work_queue', routing_key=routing_key 32 ) 33 34 def call_back(self, channel, method, properties, body): 35 """ 36 消費者對消息進行確認,防止消費者掛掉 37 :param channel: 38 :param method: 39 :param properties: 40 :param body: 41 :return: 42 """ 43 self.channel.basic_ack(delivery_tag=method.delivery_tag) 44 print('接收到的消息為:{}'.format(str(body))) 45 46 def receive_msg(self): 47 print('開始接受消息...') 48 self.channel.basic_qos(prefetch_count=1) 49 self.channel.basic_consume( 50 consumer_callback=self.call_back, 51 queue='work_queue', 52 no_ack=False, # 消費者對消息進行確認 53 consumer_tag=str(uuid.uuid4()) 54 ) 55 56 def consume(self): 57 self.receive_msg() 58 self.channel.start_consuming() 59 60 61 if __name__ == '__main__': 62 rabbit_consumer = RabbitConsumer() 63 rabbit_consumer.consume()
topic 消費者3 --> 實現*.x.* 消息匹配:

1 """ 2 匹配任意點分割的單詞 生產者發送的:a.debug.b 則匹配了'*.debug.*' 3 生產者發送的:a.info.b 則匹配了'*.info.*' 4 """ 5 import pika 6 import uuid 7 8 9 class RabbitConsumer(object): 10 def __init__(self): 11 self.conn = pika.BlockingConnection( 12 pika.ConnectionParameters(host='localhost', port=5672) 13 ) 14 self.channel = self.conn.channel() 15 16 # 消息持久化 17 self.channel.queue_declare( 18 exclusive=False, durable=True, queue='other_queue' 19 ) 20 21 # 聲明交換機,其類型為topic 22 self.channel.exchange_declare( 23 exchange='logs_topic', exchange_type='topic', durable=True 24 ) 25 26 # 將隊列與交換機進行綁定 27 routing_keys = ['*.info.*', '*.debug.*', 'dfdf*'] 28 for routing_key in routing_keys: 29 self.channel.queue_bind( 30 exchange='logs_topic', queue='other_queue', routing_key=routing_key 31 ) 32 33 def call_back(self, channel, method, properties, body): 34 """ 35 消費者對消息進行確認,防止消費者掛掉 36 :param channel: 37 :param method: 38 :param properties: 39 :param body: 40 :return: 41 """ 42 self.channel.basic_ack(delivery_tag=method.delivery_tag) 43 print('接收到的消息為:{}'.format(str(body))) 44 45 def receive_msg(self): 46 print('開始接受消息...') 47 self.channel.basic_qos(prefetch_count=1) 48 self.channel.basic_consume( 49 consumer_callback=self.call_back, 50 queue='other_queue', 51 no_ack=False, # 消費者對消息進行確認 52 consumer_tag=str(uuid.uuid4()) 53 ) 54 55 def consume(self): 56 self.receive_msg() 57 self.channel.start_consuming() 58 59 60 if __name__ == '__main__': 61 rabbit_consumer = RabbitConsumer() 62 rabbit_consumer.consume()
topic消費者執行結果:
消費者1:
消費者2:
消費者3: