一、項目結構
二、pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Coordinates of this demo project. -->
    <groupId>com.wuxi</groupId>
    <artifactId>A01mq</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <!-- Dependency and plugin versions are inherited from the Spring Boot parent. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.4.RELEASE</version>
    </parent>

    <dependencies>
        <!-- Web starter: used by the REST controller that publishes messages. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- AMQP starter: provides AmqpTemplate and @RabbitListener. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compiler source/target level for this project. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.9</source>
                    <target>1.9</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
三、application.yml
# HTTP port the embedded web server listens on.
server:
  port: 8080

spring:
  application:
    name: rabbitmqServer
  # RabbitMQ broker location; connection properties not listed here are left unset.
  rabbitmq:
    host: 127.0.0.1
四、啟動類
package com.wuxi;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Entry point of the RabbitMQ demo application.
 */
@SpringBootApplication
public class MyApplication {

    /**
     * Boots the Spring application context using this class as the primary source.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(MyApplication.class);
        application.run(args);
    }
}
五、service
package com.wuxi.services; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; import java.text.SimpleDateFormat; import java.util.Date; @Service //@RabbitListener(queues = "work") //注解在類上時,只能有一個方法需要加@RabbitHandler注解 public class MqService { @RabbitListener(queues = "work") public void work1(String text) { try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("消費者1:+++++" + text); } @RabbitListener(queues = "work") public void work2(String text) { System.out.println("消費者2:-----" + text); } //**************************************************************** @RabbitListener(queues = "publish_queue1") public void publish1(String text) { try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("消費者1:+++++" + text); } @RabbitListener(queues = "publish_queue2") public void publish2(String text) { System.out.println("消費者2:-----" + text); } //**************************************************************** @RabbitListener(queues = "routing_queue1") public void routing1(String text) { System.out.println("消費者1:+++++" + text); } @RabbitListener(queues = "routing_queue2") public void routing2(String text) { System.out.println("消費者2:-----" + text); } //**************************************************************** @RabbitListener(queues = "topic_queue1") public void topic1(String text) { System.out.println("消費者1:+++++" + text); } @RabbitListener(queues = "topic_queue2") public void topic2(String text) { System.out.println("消費者2:-----" + text); } //**************************************************************** @RabbitListener(queues = "delayed_queue") public void delayed(String text) { System.out.println("接收時間:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); System.out.println("消費者1:+++++" + text); } }
六、controller
package com.wuxi.controllers; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.text.SimpleDateFormat; import java.util.Date; /** * 安裝完erlang再安裝rabbitmq,然后開啟界面管理。如果用戶無法登錄,命令行添加用戶再配置權限然后登錄 * * @author LL */ @RestController public class MqController { @Autowired private AmqpTemplate amqpTemplate; /** * work模式:只有一個消費者能收到消息,當一個消費者較忙時,消息將被另一個不忙的消費者接收 * 在rabbitmq中要創建一個隊列 * * @return */ @RequestMapping("/work") public String work() { for (int i = 0; i < 10; i++) { amqpTemplate.convertAndSend("work", "發送消息" + i); } return "ok"; } // **************************************************************** /** * publish模式:多個消費者同時接收到消息 * 在rabbitmq中要創建一個交換機(fanout)和兩個隊列,兩個隊列要綁定到交換機 * * @return */ @RequestMapping("/publish") public String publish() { for (int i = 0; i < 10; i++) { amqpTemplate.convertAndSend("publish_exchange", "", "發送消息" + i); } return "ok"; } // **************************************************************** /** * 路由模式:具有routing_key的將同時接收到消息(完全匹配) * 在rabbitmq創建1個交換機(direct)和兩個隊列,兩個隊列綁定到交換機,並且配置routing_key,根據routing_key發送消息到隊列 * * @return */ @RequestMapping("/routing1") public String routing1() { amqpTemplate.convertAndSend("routing_exchange", "routing_key1", "發送消息+routing_key1"); return "ok"; } @RequestMapping("/routing2") public String routing2() { amqpTemplate.convertAndSend("routing_exchange", "routing_key2", "發送消息+routing_key2"); return "ok"; } // **************************************************************** /** * topic模式:根據routing_key匹配的隊列將同時接收到消息(通配符匹配) * rabbitmq配置一個交換機(topic)和兩個隊列,兩個隊列綁定交換機,並配置通配routing_key * * @return */ @RequestMapping("/topic1") public String topic1() { 
amqpTemplate.convertAndSend("topic_exchange", "key1", "發送消息+key1"); return "ok"; } @RequestMapping("/topic2") public String topic2() { amqpTemplate.convertAndSend("topic_exchange", "topic", "發送消息+topic"); return "ok"; } // **************************************************************** /** * 需要開啟延時插件功能,arguments配置{"x-delayed-type"="topic(公共模式publish、路由模式direct、通配符模式topic)"} * rabbitmq創建一個交換機(x-delayed-message)和一個隊列,隊列綁定到交換機,routing_key根據arguments配置的模式進行配置 * * @return */ @RequestMapping("/delayed") public String delayed() { amqpTemplate.convertAndSend("delayed_exchange", "delayed_key", "發送消息+delayed", new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { System.out.println("發送時間:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); message.getMessageProperties().setDelay(5000); return message; } }); return "ok"; } }
七、管理界面截圖
1、隊列
2、交換機
3、Routing Key
4、用戶
5、虛擬主機