一、Exchange 類型
Exchange分發消息時根據類型的不同分發策略有區別,目前共四種類型:direct、fanout、topic、headers 。
direct:一對一。消息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中。
topic:一對多。通過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列需要綁定到一個模式上。它將路由鍵和綁定鍵的字符串切分成單詞,這些單詞之間用點隔開。它同樣也會識別兩個通配符:符號“#”和符號“*”。#匹配0個或多個單詞,“*”匹配不多不少一個單詞。
fanout:廣播。每個發到 fanout 類型交換器的消息都會分到所有綁定的隊列上去,連routing key都不需要。
headers: 匹配 AMQP 消息的 header 而不是路由鍵,headers 交換器和 direct 交換器完全一致,但性能差很多,目前幾乎用不到了。
過程:
項目結構:
二、搭建父工程rabbitMQ-parent
依賴包pom.xml
<packaging>pom</packaging> <parent> <artifactId>spring-boot-parent</artifactId> <groupId>org.springframework.boot</groupId> <version>2.1.16.RELEASE</version> </parent> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.1.1</version> </dependency> </dependencies>
三、搭建消息發送者send
1.結構:
2.依賴pom.xml
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
3.配置文件application.yml
server: port: 7070 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest
4.啟動類SendApp
@SpringBootApplication public class SendApp { public static void main(String[] args) { SpringApplication.run(SendApp.class,args); } }
四、搭建消息消費者receive
1.結構
2.依賴pom.xml
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
3.配置文件application.yml
server:
port: 7070 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest
4.啟動類ReceiveApp
@SpringBootApplication public class ReceiveApp { public static void main(String[] args) { SpringApplication.run(ReceiveApp.class,args); } }
五、發送消息
1.send中創建配置類AmqpSendConfig
@Configuration public class AmqpSendConfig { @Bean //模擬<bean>標簽 public DirectExchange getDirectExchange(){ return new DirectExchange("myDirectExchange"); } @Bean public FanoutExchange getFanoutExchange(){ return new FanoutExchange("myFanoutExchange"); } @Bean public TopicExchange getTopicExchange(){ return new TopicExchange("myTopicExchange"); } }
2.SendService
@Service public class SendService { //自動注入amqpTemplate模板對象 @Resource private AmqpTemplate amqpTemplate; //direct方式 public String driectSend(){ /** *參數一:交換機名稱,需要事先在config創建 *參數二:routing Key 根據這個Key發送消息到指定地方 *參數三:消息的具體內容 **/ amqpTemplate.convertAndSend("myDirectExchange","directKey","你吃飯了嗎?"); return "direct發送成功了"; } //fanout方式 public String fanoutSend(){ int []arr={1,2,3,4}; //fanout為廣播,不需要rountkey this.amqpTemplate.convertAndSend("myFanoutExchange","",arr); return "fanout發送成功了"; } //topic方式 public String topicSend() { this.amqpTemplate.convertAndSend("myTopicExchange","abc.def","1111115555555"); return "topic發送成功了"; } }
3.SendController
@RestController public class SendController { @Autowired private SendService sendService; @RequestMapping("/direct") public String directExchange(){ return this.sendService.driectSend(); } @RequestMapping("/fanout") public String fanoutExchange(){ return this.sendService.fanoutSend(); } @RequestMapping("/topic") public String topicExchange() { return this.sendService.topicSend(); } }
六、接收消息
1.receive中添加配置類
direct方式配置類DirectReceiveConfig
@Configuration public class DirectReceiveConfig { //1.創建一個名字為myDirectQueue的隊列 @Bean public Queue getQueue() { return new Queue("myDirectQueue"); } //2.創建名字為myDirectExchange的交換機 @Bean public Exchange getExchange() { return new DirectExchange("myDirectExchange"); } //3.將隊列綁定到交換機 @Bean("binding") public Binding binding(Queue getQueue,Exchange getExchange){ Binding myBinding = BindingBuilder.bind(getQueue) .to(getExchange) .with("directKey") .noargs(); return myBinding; } }
fanout方式配置類FanoutReceiveConfig
@Configuration public class FanoutReceiveConfig { @Bean public Queue fanoutQueue(){ return new Queue("myFanoutQueue"); } @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("myFanoutExchange"); } @Bean public Binding fanoutBinding(Queue fanoutQueue,FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue).to(fanoutExchange); } }
topic方式配置類TopicReceiveConfig
@Configuration public class TopicReceiveConfig { //隊列一 @Bean public Queue topicQueue(){ return new Queue("myTopicQueue"); } //隊列二 @Bean public Queue topicQueue2(){ return new Queue("myTopicQueue2"); } //交換機 @Bean public TopicExchange topicExchange(){ return new TopicExchange("myTopicExchange"); } //隊列一綁定到交換機 @Bean public Binding binding1(Queue topicQueue,TopicExchange topicExchange){ return BindingBuilder.bind(topicQueue) .to(topicExchange) .with("abc.#");//routingKey以abc開頭的都可以接受到消息 } //隊列一綁定到交換機 @Bean public Binding binding2(Queue topicQueue2,TopicExchange topicExchange){ return BindingBuilder.bind(topicQueue2) .to(topicExchange) .with("#.def");//routingKey以def結尾的都可以接受到消息 } }
2.ReceiveService
@Service public class ReceiveService { //direct方式 //@RabbitListener注解用於標記當前方法為消息監聽方法,可以監聽某個隊列,當隊列中有新消息則自動完成接收, @RabbitListener(queues = "myDirectQueue") public void receive(String message){ System.out.println("接收:"+message); } //fanout方式 @RabbitListener(queues = "myFanoutQueue") public void fanoutReceive(String o){ System.out.println("fanout接收:"+ o); } //topic方式 @RabbitListener(queues = "myTopicQueue") public void topicReceive1(String oi) { System.out.println("topic1接收:"+ oi); } @RabbitListener(queues = "myTopicQueue2") public void topicReceive2(String oi) { System.out.println("topic2接收:"+ oi); } }
七、訪問測試
1.啟動RabbitMQ服務器
2.啟動發送方
3.啟動接收方
4.瀏覽器訪問查看是否發送成功
5.控制器查看是否接收