一 RabbitMQ的介紹
RabbitMQ是消息中間件的一種,消息中間件即分布式系統中完成消息的發送和接收的基礎軟件.這些軟件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,現已經轉讓給apache).
消息中間件的工作過程可以用生產者消費者模型來表示.即,生產者不斷的向消息隊列發送信息,而消費者從消息隊列中消費信息.具體過程如下:
從上圖可看出,對於消息隊列來說,生產者,消息隊列,消費者是最重要的三個概念,生產者發消息到消息隊列中去,消費者監聽指定的消息隊列,並且當消息隊列收到消息之后,接收消息隊列傳來的消息,並且給予相應的處理.消息隊列常用於分布式系統之間互相信息的傳遞.
對於RabbitMQ來說,除了這三個基本模塊以外,還添加了一個模塊,即交換機(Exchange).它使得生產者和消息隊列之間產生了隔離,生產者將消息發送給交換機,而交換機則根據調度策略把相應的消息轉發給對應的消息隊列.那么RabitMQ的工作流程如下所示:
緊接着說一下交換機.交換機的主要作用是接收相應的消息並且綁定到指定的隊列.交換機有四種類型,分別為Direct,topic,headers,Fanout.
Direct是RabbitMQ默認的交換機模式,也是最簡單的模式.即創建消息隊列的時候,指定一個BindingKey.當發送者發送消息的時候,指定對應的Key.當Key和消息隊列的BindingKey一致的時候,消息將會被發送到該消息隊列中.
topic轉發信息主要是依據通配符,隊列和交換機的綁定主要是依據一種模式(通配符+字符串),而當發送消息的時候,只有指定的Key和該模式相匹配的時候,消息才會被發送到該消息隊列中.
headers也是根據一個規則進行匹配,在消息隊列和交換機綁定的時候會指定一組鍵值對規則,而發送消息的時候也會指定一組鍵值對規則,當兩組鍵值對規則相匹配的時候,消息會被發送到匹配的消息隊列中.
Fanout是路由廣播的形式,將會把消息發給綁定它的全部隊列,即便設置了key,也會被忽略.
二.SpringBoot整合RabbitMQ(Direct模式)
SpringBoot整合RabbitMQ非常簡單!感覺SpringBoot真的極大簡化了開發的搭建環境的時間..這樣我們程序員就可以把更多的時間用在業務上了,下面開始搭建環境:
首先創建兩個maven工程,這是為了模擬分布式應用系統中,兩個應用之間互相交流的過程,一個發送者(Sender),一個接收者(Receiver)
緊接着,配置pom.xml文件,注意其中用到了springboot對於AMQP(高級消息隊列協議,即面向消息的中間件的設計)
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.4.0.RELEASE</version> </parent> <properties> <java.version>1.7</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <optional>true</optional> <scope>true</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!-- 添加springboot對amqp的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.tomcat.embed</groupId> <artifactId>tomcat-embed-jasper</artifactId> <scope>provided</scope> </dependency> </dependencies>
緊接着,我們編寫發送者相關的代碼.首先毫無疑問,要書寫啟動類:
@SpringBootApplication public class App{ public static void main(String[] args) { SpringApplication.run(App.class, args); } }
接着在application.properties中,去編輯和RabbitMQ相關的配置信息,配置信息的代表什么內容根據鍵就能很直觀的看出了.這里端口是5672,不是15672...15672是管理端的端口!
spring.application.name=spirng-boot-rabbitmq-sender spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
隨后,配置Queue(消息隊列).那注意由於采用的是Direct模式,需要在配置Queue的時候,指定一個鍵,使其和交換機綁定.
@Configuration public class SenderConf { @Bean public Queue queue() { return new Queue("queue"); } }
接着就可以發送消息啦!在SpringBoot中,我們使用AmqpTemplate去發送消息!代碼如下:
@Component public class HelloSender { @Autowired private AmqpTemplate template; public void send() { template.convertAndSend("queue","hello,rabbit~"); } }
編寫測試類!這樣我們的發送端代碼就編寫完了~
@SpringBootTest(classes=App.class) @RunWith(SpringJUnit4ClassRunner.class) public class TestRabbitMQ { @Autowired private HelloSender helloSender; @Test public void testRabbit() { helloSender.send(); } }
接着我們編寫接收端.接收端的pom文件,application.properties(修改spring.application.name),Queue配置類,App啟動類都是一致的!這里省略不計.主要在於我們需要配置監聽器去監聽綁定到的消息隊列,當消息隊列有消息的時候,予以接收,代碼如下:
@Component public class HelloReceive { @RabbitListener(queues="queue") //監聽器監聽指定的Queue public void processC(String str) { System.out.println("Receive:"+str); } }
接下來就可以測試啦,首先啟動接收端的應用,緊接着運行發送端的單元測試,接收端應用打印出來接收到的消息,測試即成功!
需要注意的地方,Direct模式相當於一對一模式,一個消息被發送者發送后,會被轉發到一個綁定的消息隊列中,然后被一個接收者接收!
實際上RabbitMQ還可以支持發送對象:當然由於涉及到序列化和反序列化,該對象要實現Serilizable接口.HelloSender做出如下改寫:
public void send() { User user=new User(); //實現Serializable接口 user.setUsername("hlhdidi"); user.setPassword("123"); template.convertAndSend("queue",user); }
HelloReceiver做出如下改寫:
@RabbitListener(queues="queue") //監聽器監聽指定的Queue public void process1(User user) { //用User作為參數 System.out.println("Receive1:"+user); }
三.SpringBoot整合RabbitMQ(Topic轉發模式)
首先我們看發送端,我們需要配置隊列Queue,再配置交換機(Exchange),再把隊列按照相應的規則綁定到交換機上:
@Configuration public class SenderConf { @Bean(name="message") public Queue queueMessage() { return new Queue("topic.message"); } @Bean(name="messages") public Queue queueMessages() { return new Queue("topic.messages"); } @Bean public TopicExchange exchange() { return new TopicExchange("exchange"); } @Bean Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } @Bean Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");//*表示一個詞,#表示零個或多個詞 } }
而在接收端,我們配置兩個監聽器,分別監聽不同的隊列:
@RabbitListener(queues="topic.message") //監聽器監聽指定的Queue public void process1(String str) { System.out.println("message:"+str); } @RabbitListener(queues="topic.messages") //監聽器監聽指定的Queue public void process2(String str) { System.out.println("messages:"+str); }
好啦!接着我們可以進行測試了!首先我們發送如下內容:
方法的第一個參數是交換機名稱,第二個參數是發送的key,第三個參數是內容,RabbitMQ將會根據第二個參數去尋找有沒有匹配此規則的隊列,如果有,則把消息給它,如果有不止一個,則把消息分發給匹配的隊列(每個隊列都有消息!),顯然在我們的測試中,參數2匹配了兩個隊列,因此消息將會被發放到這兩個隊列中,而監聽這兩個隊列的監聽器都將收到消息!那么如果把參數2改為topic.messages呢?顯然只會匹配到一個隊列,那么process2方法對應的監聽器收到消息!
四.SpringBoot整合RabbitMQ(Fanout Exchange形式)
那前面已經介紹過了,Fanout Exchange形式又叫廣播形式,因此我們發送到路由器的消息會使得綁定到該路由器的每一個Queue接收到消息,這個時候就算指定了Key,或者規則(即上文中convertAndSend方法的參數2),也會被忽略!那么直接上代碼,發送端配置如下:
@Configuration public class SenderConf { @Bean(name="Amessage") public Queue AMessage() { return new Queue("fanout.A"); } @Bean(name="Bmessage") public Queue BMessage() { return new Queue("fanout.B"); } @Bean(name="Cmessage") public Queue CMessage() { return new Queue("fanout.C"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange");//配置廣播路由器 } @Bean Binding bindingExchangeA(@Qualifier("Amessage") Queue AMessage,FanoutExchange fanoutExchange) { return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean Binding bindingExchangeB(@Qualifier("Bmessage") Queue BMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(BMessage).to(fanoutExchange); } @Bean Binding bindingExchangeC(@Qualifier("Cmessage") Queue CMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(CMessage).to(fanoutExchange); } }
發送端使用如下代碼發送:
接收端監聽器配置如下:
@Component public class HelloReceive { @RabbitListener(queues="fanout.A") public void processA(String str1) { System.out.println("ReceiveA:"+str1); } @RabbitListener(queues="fanout.B") public void processB(String str) { System.out.println("ReceiveB:"+str); } @RabbitListener(queues="fanout.C") public void processC(String str) { System.out.println("ReceiveC:"+str); } }
運行測試代碼,發現三個監聽器都接收到了數據,測試成功!