一、Exchange 类型
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。
direct:一对一。消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。
topic:一对多。通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“*”。#匹配0个或多个单词,“*”匹配不多不少一个单词。
fanout:广播。每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去,连routing key都不需要。
headers: 匹配 AMQP 消息的 header 而不是路由键,headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了。
过程:
项目结构:
二、搭建父工程rabbitMQ-parent
依赖包pom.xml
<packaging>pom</packaging> <parent> <artifactId>spring-boot-parent</artifactId> <groupId>org.springframework.boot</groupId> <version>2.1.16.RELEASE</version> </parent> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.1.1</version> </dependency> </dependencies>
三、搭建消息发送者send
1.结构:
2.依赖pom.xml
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
3.配置文件application.yml
server: port: 7070 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest
4.启动类SendApp
@SpringBootApplication public class SendApp { public static void main(String[] args) { SpringApplication.run(SendApp.class,args); } }
四、搭建消息消费者receive
1.结构
2.依赖pom.xml
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
3.配置文件application.yml
server:
port: 7070 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest
4.启动类ReceiveApp
@SpringBootApplication public class ReceiveApp { public static void main(String[] args) { SpringApplication.run(ReceiveApp.class,args); } }
五、发送消息
1.send中创建配置类AmqpSendConfig
@Configuration public class AmqpSendConfig { @Bean //模拟<bean>标签 public DirectExchange getDirectExchange(){ return new DirectExchange("myDirectExchange"); } @Bean public FanoutExchange getFanoutExchange(){ return new FanoutExchange("myFanoutExchange"); } @Bean public TopicExchange getTopicExchange(){ return new TopicExchange("myTopicExchange"); } }
2.SendService
@Service public class SendService { //自动注入amqpTemplate模板对象 @Resource private AmqpTemplate amqpTemplate; //direct方式 public String driectSend(){ /** *参数一:交换机名称,需要事先在config创建 *参数二:routing Key 根据这个Key发送消息到指定地方 *参数三:消息的具体内容 **/ amqpTemplate.convertAndSend("myDirectExchange","directKey","你吃饭了吗?"); return "direct发送成功了"; } //fanout方式 public String fanoutSend(){ int []arr={1,2,3,4}; //fanout为广播,不需要rountkey this.amqpTemplate.convertAndSend("myFanoutExchange","",arr); return "fanout发送成功了"; } //topic方式 public String topicSend() { this.amqpTemplate.convertAndSend("myTopicExchange","abc.def","1111115555555"); return "topic发送成功了"; } }
3.SendController
@RestController public class SendController { @Autowired private SendService sendService; @RequestMapping("/direct") public String directExchange(){ return this.sendService.driectSend(); } @RequestMapping("/fanout") public String fanoutExchange(){ return this.sendService.fanoutSend(); } @RequestMapping("/topic") public String topicExchange() { return this.sendService.topicSend(); } }
六、接收消息
1.receive中添加配置类
direct方式配置类DirectReceiveConfig
@Configuration public class DirectReceiveConfig { //1.创建一个名字为myDirectQueue的队列 @Bean public Queue getQueue() { return new Queue("myDirectQueue"); } //2.创建名字为myDirectExchange的交换机 @Bean public Exchange getExchange() { return new DirectExchange("myDirectExchange"); } //3.将队列绑定到交换机 @Bean("binding") public Binding binding(Queue getQueue,Exchange getExchange){ Binding myBinding = BindingBuilder.bind(getQueue) .to(getExchange) .with("directKey") .noargs(); return myBinding; } }
fanout方式配置类FanoutReceiveConfig
@Configuration public class FanoutReceiveConfig { @Bean public Queue fanoutQueue(){ return new Queue("myFanoutQueue"); } @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("myFanoutExchange"); } @Bean public Binding fanoutBinding(Queue fanoutQueue,FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue).to(fanoutExchange); } }
topic方式配置类TopicReceiveConfig
@Configuration public class TopicReceiveConfig { //队列一 @Bean public Queue topicQueue(){ return new Queue("myTopicQueue"); } //队列二 @Bean public Queue topicQueue2(){ return new Queue("myTopicQueue2"); } //交换机 @Bean public TopicExchange topicExchange(){ return new TopicExchange("myTopicExchange"); } //队列一绑定到交换机 @Bean public Binding binding1(Queue topicQueue,TopicExchange topicExchange){ return BindingBuilder.bind(topicQueue) .to(topicExchange) .with("abc.#");//routingKey以abc开头的都可以接受到消息 } //队列一绑定到交换机 @Bean public Binding binding2(Queue topicQueue2,TopicExchange topicExchange){ return BindingBuilder.bind(topicQueue2) .to(topicExchange) .with("#.def");//routingKey以def结尾的都可以接受到消息 } }
2.ReceiveService
@Service public class ReceiveService { //direct方式 //@RabbitListener注解用于标记当前方法为消息监听方法,可以监听某个队列,当队列中有新消息则自动完成接收, @RabbitListener(queues = "myDirectQueue") public void receive(String message){ System.out.println("接收:"+message); } //fanout方式 @RabbitListener(queues = "myFanoutQueue") public void fanoutReceive(String o){ System.out.println("fanout接收:"+ o); } //topic方式 @RabbitListener(queues = "myTopicQueue") public void topicReceive1(String oi) { System.out.println("topic1接收:"+ oi); } @RabbitListener(queues = "myTopicQueue2") public void topicReceive2(String oi) { System.out.println("topic2接收:"+ oi); } }
七、访问测试
1.启动RabbitMQ服务器
2.启动发送方
3.启动接收方
4.浏览器访问查看是否发送成功
5.控制器查看是否接收