官方說明:http://www.rabbitmq.com/getstarted.html
什么是MQ?
MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。MQ是消費-生產者模型的一個典型的代表,一端往消息隊列中不斷寫入消息,而另一端則可以讀取隊列中的消息。
RabbitMQ是MQ的一種。下面詳細介紹一下RabbitMQ的基本概念。
1、隊列、生產者、消費者
隊列是RabbitMQ的內部對象,用於存儲消息。生產者(下圖中的P)生產消息並投遞到隊列中,消費者(下圖中的C)可以從隊列中獲取消息並消費。
多個消費者可以訂閱同一個隊列,這時隊列中的消息會被平均分攤給多個消費者進行處理,而不是每個消費者都收到所有的消息並處理。
2、Publish/Subscribe,訂閱發布模式,每個通道都會收到消息
3、Exchange、Binding
剛才我們看到生產者將消息投遞到隊列中,實際上這在RabbitMQ中這種事情永遠都不會發生。實際的情況是,生產者將消息發送到Exchange(交換器,下圖中的X),再通過Binding將Exchange與Queue關聯起來。
a.Direct exchange,一個exchange和多個queue綁定,會根據綁定的不同routingKey,發送到不同的Queue中
b.Topic exchange,按模式匹配路由鍵。模式符號 "#" 表示一個或多個單詞,"*" 僅匹配一個單詞。
c.RPC
4、Exchange Type、Bingding key、routing key
在綁定(Binding)Exchange與Queue的同時,一般會指定一個binding key。在綁定多個Queue到同一個Exchange的時候,這些Binding允許使用相同的binding key。
生產者在將消息發送給Exchange的時候,一般會指定一個routing key,來指定這個消息的路由規則,生產者就可以在發送消息給Exchange時,通過指定routing key來決定消息流向哪里。
RabbitMQ常用的Exchange Type有三種:fanout、direct、topic。
fanout:把所有發送到該Exchange的消息投遞到所有與它綁定的隊列中。
direct:把消息投遞到那些binding key與routing key完全匹配的隊列中。
topic:將消息路由到binding key與routing key模式匹配的隊列中。
附上一張RabbitMQ的結構圖:
最后來具體解析一下幾個問題:
1、可以自動創建隊列,也可以手動創建隊列,如果自動創建隊列,那么是誰負責創建隊列呢?是生產者?還是消費者?
如果隊列不存在,當然消費者不會收到任何的消息。但是如果隊列不存在,那么生產者發送的消息就會丟失。所以,為了數據不丟失,消費者和生產者都可以創建隊列。那么如果創建一個已經存在的隊列呢?那么不會有任何的影響。需要注意的是沒有任何的影響,也就是說第二次創建如果參數和第一次不一樣,那么該操作雖然成功,但是隊列屬性並不會改變。
隊列對於負載均衡的處理是完美的。對於多個消費者來說,RabbitMQ使用輪詢的方式均衡的發送給不同的消費者。
2、RabbitMQ的消息確認機制
默認情況下,如果消息已經被某個消費者正確的接收到了,那么該消息就會被從隊列中移除。當然也可以讓同一個消息發送到很多的消費者。
如果一個隊列沒有消費者,那么,如果這個隊列有數據到達,那么這個數據會被緩存,不會被丟棄。當有消費者時,這個數據會被立即發送到這個消費者,這個數據被消費者正確收到時,這個數據就被從隊列中刪除。
那么什么是正確收到呢?通過ack。每個消息都要被acknowledged(確認,ack)。我們可以顯示的在程序中去ack,也可以自動的ack。如果有數據沒有被ack,那么:
RabbitMQ Server會把這個信息發送到下一個消費者。
如果這個app有bug,忘記了ack,那么RabbitMQServer不會再發送數據給它,因為Server認為這個消費者處理能力有限。
而且ack的機制可以起到限流的作用(Benefitto throttling):在消費者處理完成數據后發送ack,甚至在額外的延時后發送ack,將有效的均衡消費者的負載。
使用springboot調用RabbitMQ例子
環境:
apache-tomcat-8.5.15
jdk1.8.0_172
IDEA
搭建好RabbitMQ服務器環境,這點就不在敘述了。
使用RabbitTemplate的convertAndSend發送自定義的類消息的時候要統一類的包路徑,不然在序列化的時候要報錯,
通過IDEA創建springboot的WEB項目,引入了freemarker和和rabbitmq
創建完后的pom.xml文件為:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.xuan</groupId> <artifactId>springrabbitmq</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>springrabbitmq</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.4.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-freemarker</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
配置運行環境
修改application.yml文件,配置freemarker和rabbitmq的相關參數:
# Tomcat server: tomcat: uri-encoding: UTF-8 max-threads: 1000 min-spare-threads: 30 port: 8070 servlet: context-path: /rabbitmq spring: servlet: multipart: max-file-size: 100MB max-request-size: 100MB enabled: true freemarker: suffix: .html rabbitmq: host: localhost port: 5672 username: admin password: 123456 virtual-host: /testmq listener: simple: #acknowledge-mode: manual #設置確認模式手工確認 concurrency: 3 #消費者最小數量 max-concurrency: 10 # 消費者最大數量
如果配置acknowledge-mode為manual則需要在消費消費的地方調用channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);進行確認,不然消息會一直保存在通道中,指定concurrency最小的消費者數量和max-concurrency最大的消費者數量后,是多線程消費消息
這點的rabbitmq配置也可以通過RabbitConfig.java類來配置:
package com.xuan.springrabbitmq.config; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; //連接rabbitMQ的基本配置 @Configuration @EnableRabbit public class RabbitConfig { @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("123456"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/testmq"); return connectionFactory; } @Bean public AmqpAdmin amqpAdmin() { return new RabbitAdmin(connectionFactory()); } @Bean public RabbitTemplate rabbitTemplate() { return new RabbitTemplate(connectionFactory()); } //配置消費者監聽的容器 @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory()); factory.setConcurrentConsumers(3); factory.setMaxConcurrentConsumers(10); //factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//設置確認模式手工確認 return factory; } }
配置路由和通道
配置最簡單的生產者消費者模式ProducerConsumerConfig.java
package com.xuan.springrabbitmq.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; //生產者消費者模式的配置,包括一個隊列和兩個對應的消費者 @Configuration public class ProducerConsumerConfig { @Bean public Queue myQueue() { Queue queue = new Queue("myqueue"); return queue; } }
配置訂閱發布模式PublishSubscribeConfig.java
package com.xuan.springrabbitmq.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; //發布訂閱模式的配置,包括兩個隊列和對應的訂閱者,發布者的交換機類型使用fanout(子網廣播),兩根網線binding用來綁定隊列到交換機 @Configuration public class PublishSubscribeConfig { @Bean public Queue myQueue1() { Queue queue = new Queue("queue1"); return queue; } @Bean public Queue myQueue2() { Queue queue = new Queue("queue2"); return queue; } @Bean public FanoutExchange fanoutExchange() { FanoutExchange fanoutExchange = new FanoutExchange("fanout"); return fanoutExchange; } @Bean public Binding binding1() { Binding binding = BindingBuilder.bind(myQueue1()).to(fanoutExchange()); return binding; } @Bean public Binding binding2() { Binding binding = BindingBuilder.bind(myQueue2()).to(fanoutExchange()); return binding; } }
配置direct直連模式DirectExchangeConfig.java
package com.xuan.springrabbitmq.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; //direct直連模式的交換機配置,包括一個direct交換機,兩個隊列,三根網線binding @Configuration public class DirectExchangeConfig { @Bean public DirectExchange directExchange() { DirectExchange directExchange = new DirectExchange("direct"); return directExchange; } @Bean public Queue directQueue1() { Queue queue = new Queue("directqueue1"); return queue; } @Bean public Queue directQueue2() { Queue queue = new Queue("directqueue2"); return queue; } //3個binding將交換機和相應隊列連起來 @Bean public Binding bindingorange() { Binding binding = BindingBuilder.bind(directQueue1()).to(directExchange()).with("orange"); return binding; } @Bean public Binding bindingblack() { Binding binding = BindingBuilder.bind(directQueue2()).to(directExchange()).with("black"); return binding; } @Bean public Binding bindinggreen() { Binding binding = BindingBuilder.bind(directQueue2()).to(directExchange()).with("green"); return binding; } }
配置topic交換機模型TopicExchangeConfig.java
package com.xuan.springrabbitmq.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; //topic交換機模型,需要一個topic交換機,兩個隊列和三個binding @Configuration public class TopicExchangeConfig { @Bean public TopicExchange topicExchange(){ TopicExchange topicExchange=new TopicExchange("mytopic"); return topicExchange; } @Bean public Queue topicQueue1() { Queue queue=new Queue("topicqueue1"); return queue; } @Bean public Queue topicQueue2() { Queue queue=new Queue("topicqueue2"); return queue; } //3個binding將交換機和相應隊列連起來 @Bean public Binding bindingtopic1(){ Binding binding= BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("*.orange.*");//binding key return binding; } @Bean public Binding bindingtopic2(){ Binding binding= BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("*.*.rabbit"); return binding; } @Bean public Binding bindingtopic3(){ Binding binding= BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("lazy.#");//#表示0個或若干個關鍵字,*表示一個關鍵字 return binding; } }
定義發送的消息Mail.java
package po; import java.io.Serializable; public class Mail implements Serializable { private static final long serialVersionUID = -8140693840257585779L; private String mailId; private String country; private Double weight; public Mail() { } public Mail(String mailId, String country, double weight) { this.mailId = mailId; this.country = country; this.weight = weight; } public String getMailId() { return mailId; } public void setMailId(String mailId) { this.mailId = mailId; } public String getCountry() { return country; } public void setCountry(String country) { this.country = country; } public double getWeight() { return weight; } public void setWeight(double weight) { this.weight = weight; } @Override public String toString() { return "Mail [mailId=" + mailId + ", country=" + country + ", weight=" + weight + "]"; } }
繼承的消息TopicMail.java
package po; public class TopicMail extends Mail { String routingkey; public String getRoutingkey() { return routingkey; } public void setRoutingkey(String routingkey) { this.routingkey = routingkey; } @Override public String toString() { return "TopicMail [routingkey=" + routingkey + "]"; } }
定義發送接口的實現ProducerImpl.java
package com.xuan.springrabbitmq.service.impl; import com.xuan.springrabbitmq.service.Producer; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import po.Mail; @Transactional @Service("producer") public class ProducerImpl implements Producer { @Autowired RabbitTemplate rabbitTemplate; public void sendMail(String queue, Mail mail) { rabbitTemplate.setQueue(queue); rabbitTemplate.convertAndSend(queue,mail); } }
訂閱發布時的發送消息實現PublisherImpl.java
package com.xuan.springrabbitmq.service.impl; import po.Mail; import com.xuan.springrabbitmq.service.Publisher; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service("publisher") public class PublisherImpl implements Publisher { @Autowired RabbitTemplate rabbitTemplate; public void publishMail(Mail mail) { rabbitTemplate.convertAndSend("fanout", "", mail); } public void senddirectMail(Mail mail, String routingkey) { rabbitTemplate.convertAndSend("direct", routingkey, mail); } public void sendtopicMail(Mail mail, String routingkey) { rabbitTemplate.convertAndSend("mytopic", routingkey, mail); } }
消費者的實現代碼QueueListener1.java
package com.xuan.springrabbitmq.listener; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import po.Mail; @Component @RabbitListener(queues = "myqueue") public class QueueListener1 { @RabbitHandler public void displayMail(Mail mail, Channel channel, Message message) throws Exception { System.out.println("隊列監聽器1號收到消息" + mail.toString()); //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//如果需要確認的要調用 } }
或者QueueListener2.java
package com.xuan.springrabbitmq.listener; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import po.Mail; @Component public class QueueListener2 { @RabbitListener(queues = "myqueue") public void displayMail(Mail mail) throws Exception { System.out.println("隊列監聽器2號收到消息"+mail.toString()); } }
其它模式的消費者也是類似的,指定queues 的名稱就可以了。
源碼位置:https://gitee.com/xuantest/SpringBoot-RabbitMQ