前言
先來了解RabbitMQ一個重要的概念:Exchange交換機
1. Exchange概念
-
Exchange:接收消息,並根據路由鍵轉發消息所綁定的隊列。
藍色框:客戶端發送消息至交換機,通過路由鍵路由至指定的隊列。
黃色框:交換機和隊列通過路由鍵有一個綁定的關系。
綠色框:消費端通過監聽隊列來接收消息。
2. 交換機屬性
Name:
交換機名稱 Type:
交換機類型——direct、topic、fanout、headers、sharding(此篇不講) Durability:
是否需要持久化,true為持久化 Auto Delete:
當最后一個綁定到Exchange上的隊列刪除后,自動刪除該Exchange Internal:
當前Exchange是否用於RabbitMQ內部使用,默認為false Arguments:
擴展參數,用於擴展AMQP協議自定制化使用
一、簡單模式(Hello Word)
P代表生產者,C代表消費者,紅色代碼消息隊列。P將消息發送到消息隊列,C對消息進行處理。
生產者:
@Controller public class ProducerDemo { @Autowired private AmqpTemplate rabbitTemplate; @RequestMapping("/send") @ResponseBody public String send() { String context = "hello==========" + new Date(); log.info("Sender : " + context); //生產者,正在往hello這個路由規則中發送,由於沒有交換機,所以路由規則就是隊列名稱 this.rabbitTemplate.convertAndSend("hello", context); return "success"; } }
消費者:
@Component //監聽hello這個隊列 @RabbitListener(queues = "hello") public class ConsumerDemo { @RabbitHandler public void process(String hello) { System.out.println("Receiver ===================: " + hello); } }
二、工作模式
一個隊列有兩個消費者。一個隊列中一條消息,只能被一個消費者消費。
在上面的基礎中,添加一個消費者就OK了。
消費者:
//第1個消費者
@Component @RabbitListener(queues = "hello")//監聽hello這個隊列 public class ConsumerDemo1{ @RabbitHandler public void process(String hello) { System.out.println("Receiver ===================: " + hello); } }
//第2個消費者 @Component @RabbitListener(queues = "hello")//監聽hello這個隊列 public class ConsumerDemo2{ @RabbitHandler public void process(String hello) { System.out.println("Receiver ===================: " + hello); } }
當兩個消費者同時監聽一個隊列時,他們並不能同時消費一條消息,而是隨機消費消息。1,2,3,4,5,6消息來了,consumer1消費了1,3,5;consumer2消費了2,4,6。
這個數據是隨機的哦,別理解為奇偶數。可以自己測試一下。
三、訂閱與發布模式(Fanout)
-
不處理路由鍵,只需要簡單的將隊里綁定到交換機上
-
生產者將消息不是直接發送到隊列,而是發送到X交換機,發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上
-
Fanout交換機轉發消息是最快的
定義一個訂閱模式的交換機:FanoutExchange交換機。然后創建2個隊列helloA,helloB,然后將這兩個隊列綁定到交換機上面。
@Configuration public class RabbitMQConfig { @Bean public Queue queueA() { return new Queue("helloA", true); } @Bean public Queue queueB() { return new Queue("helloB", true); } //創建一個fanoutExchange交換機 @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } //將queueA隊列綁定到fanoutExchange交換機上面 @Bean Binding bindingExchangeMessageFanoutA(Queue queueA, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queueA).to(fanoutExchange); } //將queueB隊列綁定到fanoutExchange交換機上面 @Bean Binding bindingExchangeMessageFanoutB(Queue queueB, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queueB).to(fanoutExchange); } }
注意一個細節:bindingExchangeMessageFanoutA這種參數中的queueA與創建隊列的方法queueA()名字要相同哦。這樣才知道queueA綁定了該交換機哦。
交換機的名稱也同樣。fanoutExchange參數的名字和fanoutExchange()名字要一樣哦。
生產者:
this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
消費者:
//第1個消費者 @Component @RabbitListener(queues = "hello")//監聽hello這個隊列 public class ConsumerDemo1{ @RabbitHandler public void process(String hello) { System.out.println("Receiver ===================: " + hello); } } //第2個消費者 @Component @RabbitListener(queues = "hello")//監聽hello這個隊列 public class ConsumerDemo2{ @RabbitHandler public void process(String hello) { System.out.println("Receiver ===================: " + hello); } }
現在生產者發送了一條消息,會發現consumer1,2都會收到。之前不是說過一個隊列里面的一條消息,只能被一個消費者消費嗎?怎么現在一條消息被兩個消費者消費了。
要知道這里對於生產者來說是只生產了一條消息,但是它發送給了交換機,交換機會根據綁定的隊列來發送。
現在綁定了queueA,queueB隊列,所以兩個隊列里面都有消息了。而消費者關注的也是兩個隊列,就看到了一條消息被兩個消費者消費的情況了。
四、路由模式(Direct)
- 所有發送到Direct Exchange的消息被轉發到RouteKey中指定的Queue。
- 消息傳遞時,RouteKey必須完全匹配才會被隊列接收,否則該消息會被拋棄。
@Configuration public class RabbitMQConfig { public static final String DIRECT_EXCHANGE = "directExchange"; public static final String QUEUE_DIRECT_A = "direct.A"; public static final String QUEUE_DIRECT_B = "direct.B"; //創建一個direct交換機 @Bean DirectExchange directExchange() { return new DirectExchange(DIRECT_EXCHANGE); } //創建隊列A @Bean Queue queueDirectNameA() { return new Queue(QUEUE_DIRECT_A); } //創建隊列B @Bean Queue queueDirectNameB() { return new Queue(QUEUE_DIRECT_B); } //將direct.A隊列綁定到directExchange交換機中,使用direct.a.key作為路由規則 @Bean Binding bindingExchangeMessageDirectA(Queue queueDirectNameA, DirectExchange directExchange) { return BindingBuilder.bind(queueDirectNameA).to(directExchange).with("direct.a.key"); } //將direct.B隊列綁定到directExchange交換機中,使用direct.b.key作為路由規則 @Bean Binding bindingExchangeMessageDirectB(Queue queueDirectNameB, DirectExchange directExchange) { return BindingBuilder.bind(queueDirectNameB).to(directExchange).with("direct.b.key"); } }
消費者:
@Component public class ConsumerDemo { @RabbitListener(queues = "direct.A") @RabbitHandler public void processtopicA(String hello) { System.out.println("Receiver Exchanges direct.A ===================: " + hello); } @RabbitListener(queues = "direct.B") @RabbitHandler public void processtopicB(String hello) { System.out.println("Receiver Exchanges direct.B ===================: " + hello); } }
生產者:
// 往directExchange交換機中發送消息,使用direct.a.key作為路由規則。 rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE, "direct.a.key", context);
direct.A,direct.B 兩個隊列都綁定了交換機directExchange,但他們的路由規則不同,a隊列用了direct.a.key,b隊列用了direct.b.key,
這種情況下,生產者使用direct.a.key作為路由規則,就只有a隊列能收到消息,b隊列則收不到消息。
五、主題模式(Topic)
-
所有發送到Topic Exchange的消息被轉發到所有管線RouteKey中指定Topic的Queue上
-
Exchange將RouteKey和某Topic進行模糊匹配,此時隊列需要綁定一個Topic
@Configuration public class RabbitMQConfig { public static final String TOPIC_EXCHANGE = "topicExchange"; public static final String TOPIC_KEY_A = "topic.#"; public static final String TOPIC_KEY_B = "topic.b.key"; public static final String QUEUE_TOPIC_A = "topic.A"; public static final String QUEUE_TOPIC_B = "topic.B"; //創建一個topic交換機 @Bean TopicExchange topicExchange() { return new TopicExchange(TOPIC_EXCHANGE); } //創建隊列A @Bean Queue queueTopicNameA() { return new Queue(QUEUE_TOPIC_A); } //創建隊列B @Bean Queue queueTopicNameB() { return new Queue(QUEUE_TOPIC_B); } //隊列topic.A綁定交換機並且關聯了topic.#正則路由規則。就是說只要topic.開頭的,topic.A隊列都將收到消息 @Bean Binding bindingExchangeMessageTopicA(Queue queueTopicNameA, TopicExchange topicExchange) { return BindingBuilder.bind(queueTopicNameA).to(topicExchange).with(TOPIC_KEY_A); } //隊列topic.B綁定交換機並且關聯了topic.b.key正則路由規則。就是說必須是topic.b.key,topic.B隊列才能收到消息,和directExchange類型一樣了。 @Bean Binding bindingExchangeMessageTopicB(Queue queueTopicNameB, TopicExchange topicExchange) { return BindingBuilder.bind(queueTopicNameB).to(topicExchange).with(TOPIC_KEY_B); } }
消費者:
@Component public class ConsumerDemo { @RabbitListener(queues = "topic.A") @RabbitHandler public void processtopicA(String hello) { System.out.println("Receiver Exchanges topic.A ===================: " + hello); } @RabbitListener(queues = "topic.B") @RabbitHandler public void processtopicB(String hello) { System.out.println("Receiver Exchanges topic.B ===================: " + hello); } }
生產者:
@RequestMapping("/topic/send") @ResponseBody public String sendTopicExchange() { String context = "Exchange==topic-->a====== " + new Date(); // 往topicExchange交換機中發送消息,使用topic.b.key作為路由規則。 this.rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE, "topic.b.key", context); return "success"; }
這里發送消息時,往topicExchange這個交換機中發送,並且路由規則為topic.b.key。由於b隊列綁定了交換機和路由規則就是它,所以隊列b能收到消息。
但是由於A隊列的過濾規則為topic.#,就是說只要topic開頭的就的路由規則,交換機就會往這個隊列里面發送消息。所以a隊列也能收到消息,topic.b.key是topic開頭的。
對於a隊列來說,路由規則為topic.adsf,topic.b.key,topic.a等等,a隊列都將收到消息,因為它的路由規則就是topic開頭就可以。
六、關系轉換
訂閱模式,路由模式,主題模式,他們三種都非常類似。而且主題模式可以隨時變成兩外兩種模式。
在主題模式下:
- 當路由規則不為正則表達式的時候,他就和路由模式一樣。
- 當路由規則不為正則表達式,且路由規則一樣時,就變成了訂閱模式。
在路由模式下:
- 當路由規則一樣時,就變成了訂閱模式。
簡單總結五種模式:
- 簡單模式:生產者,一個消費者,一個隊列
- 工作模式:生產者,多個消費者,一個隊列
- 訂閱與發布模式(fanout):生產者,一個交換機(fanoutExchange),沒有路由規則,多個消費者,多個隊列
- 路由模式(direct):生產者,一個交換機(directExchange),路由規則,多個消費者,多個隊列
- 主題模式(topic):生產者,一個交換機(topicExchange),模糊匹配路由規則,多個消費者,多個隊列
七、總結
- 一個隊列,一條消息只會被一個消費者消費(有多個消費者的情況也是一樣的)。
- 訂閱模式,路由模式,主題模式,他們的相同點就是都使用了交換機,只不過在發送消息給隊列時,添加了不同的路由規則。訂閱模式沒有路由規則,路由模式為完全匹配規則,主題模式有正則表達式,完全匹配規則。
- 在訂閱模式中可以看到一條消息被多個消費者消費了,不違背第一條總結,因為一條消息被發送到了多個隊列中去了。
- 在交換機模式下:隊列和路由規則有很大關系
- 在有交換機的模式下:3,4,5模式下,生產者只用關心交換機與路由規則即可,無需關心隊列
- 消費者不管在什么模式下:永遠不用關心交換機和路由規則,消費者永遠只關心隊列,消費者直接和隊列交互