前言:
因為項目需要用到RabbitMQ,前幾天就看了看RabbitMQ的知識,記錄下SpringBoot整合RabbitMQ的過程。
給出兩個網址:
RabbitMQ官方教程:http://www.rabbitmq.com/getstarted.html
SpringBoot整個RabbitMQ教程:https://docs.spring.io/spring-boot/docs/2.0.1.RELEASE/reference/htmlsingle/#boot-features-amqp
正文:
1:最簡單的HelloWorld
首先加上SpringBoot的依賴
1 <!-- rabbitmq --> 2 <dependency> 3 <groupId>org.springframework.boot</groupId> 4 <artifactId>spring-boot-starter-amqp</artifactId> 5 </dependency>
其實我們要申明一個隊列,這里隊列存放生產者生產的消息,然后消費者來這里面取出來
1 @Configuration 2 public class RabbitBeanConf { 3 4 public static final String QUEUE = "queue"; 5 6 @Bean 7 public Queue queue() { 8 return new Queue(QUEUE, true); 9 } 10 }
生產者:
1 @Service 2 public class RabbitSender { 3 4 @Autowired 5 AmqpTemplate amqpTemplate; 6 7 public void send(String msg) { 8 amqpTemplate.convertAndSend(RabbitBeanConf.QUEUE, msg); 9 System.out.println("生產者生產了一個消息: " + msg + " " + new Date().getTime()); 10 } 11 }
消費者:
1 @Service 2 public class RabbitReceiver { 3 4 @RabbitListener(queues = RabbitBeanConf.QUEUE) 5 public void receive(String msg) { 6 7 System.out.println("消費者收到了一個消息: " + msg + " " + new Date().getTime()); 8 } 9 }
2:廣播模式(Fanout模式)
這個模式不同於上面的模式,這個模式只要一個生產者生產出一個消息,那么所有消費者全部可以接收到。
最簡單一個例子就是你微信群發消息的時候,你發了一條消息,但你的好友全部都能收到這條消息
RabbitMQ配置:
1 @Configuration 2 public class FanoutRabbitMQConfig { 3 4 //默認持久化durable為true 5 //根據方法名來進行綁定的 firstFanoutQueue() 6 @Bean 7 public Queue firstFanoutQueue() { 8 return new Queue("firstFanoutQueue"); 9 } 10 11 @Bean 12 public Queue secondFanoutQueue() { 13 return new Queue("secondFanoutQueue"); 14 } 15 16 //默認持久化durbale為true 17 //exchange交換機 18 @Bean 19 public FanoutExchange fanoutExchange() { 20 return new FanoutExchange("fanoutExchange"); 21 } 22 23 /** 24 * 以下兩種方法都可以成功 25 * 26 * @return 27 * 2018年6月11日 28 */ 29 @Bean 30 public Binding bindingFirst() { 31 return BindingBuilder.bind(firstFanoutQueue()).to(fanoutExchange()); 32 } 33 34 @Bean 35 public Binding bindingSecond(FanoutExchange fanoutExchange, Queue secondFanoutQueue) { 36 return BindingBuilder.bind(secondFanoutQueue).to(fanoutExchange); 37 } 38 }
這里我解釋一下,官方引入了一個叫做 “exchange”交換機的東西。
我是這樣理解的:這個交換機主要是為了解耦生產了和消費者直接的強聯系。生產者不直接往隊列里面送東西了,而且往exchange里面送東西,然后exchange綁定想要送消息進去的隊列,然后消費者監聽即可。主要解耦。
生產者:
1 @Service 2 public class FanoutSender { 3 4 @Autowired 5 AmqpTemplate amqpTemplate; 6 7 public void send(String msg) { 8 9 amqpTemplate.convertAndSend("fanoutExchange", "", msg); 10 System.out.println("生產者生產了一個消息: " + msg + " " + new Date().getTime()); 11 } 12 }
消費者:
1 @Service 2 public class FanoutRecevi { 3 4 @RabbitListener(queues = {"firstFanoutQueue", "secondFanoutQueue"}) 5 public void receive(String msg) { 6 7 System.out.println("消費者收到了一個消息: " + msg + " " + new Date().getTime()); 8 } 9 }
3:Topic模式
感覺和直接模式(direct模式)有點相像。只不過這里用了類似正則的東西,一個exchange匹配符合規則的隊列。
topic 和 direct 類似, 只是匹配上支持了"模式", 在"點分"的 routing_key 形式中, 可以使用兩個通配符:
*
表示一個詞.#
表示零個或多個詞.
4:Header模式
headers 也是根據規則匹配, 相較於 direct 和 topic 固定地使用 routing_key , headers 則是一個自定義匹配規則的類型.
在隊列與交換器綁定時, 會設定一組鍵值對規則, 消息中也包括一組鍵值對( headers 屬性), 當這些鍵值對有一對, 或全部匹配時, 消息被投送到對應隊列.