Spring Boot 实现发布与订阅(fanout)模式


支持的消息模式

 

 

消息发送者

application.yml(RabbitMQ 配置文件)

# Server port for the producer application
server:
 port: 8080
# RabbitMQ connection settings (host, port, credentials, virtual host)
# NOTE(review): host and credentials are written inline here; presumably fine
# for a local demo — confirm before reusing outside a test environment.
spring:
 rabbitmq:
   host: 192.168.43.213
   port: 5672
   username: admin
   password: admin
   virtual-host: /

config交换机与队列绑定类

package com.zhang.rabbitmq.springbootrabbitmqproducer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* 绑定交换机与队列
*/
@Configuration
public class ConfigurationProducer {

   // 1:声明MQ交换机
   @Bean
   public FanoutExchange fanoutExchange(){
       return new FanoutExchange("fanout-order-exchange", true,false);
  }
   // 2:声明队列 sms.fanout.queue email.fanout.queue duanxin.fanout.queue
   @Bean
   public Queue smsQueue(){
       return new Queue("sms.fanout.queue",true);
  }
   @Bean
   public Queue emailQueue(){
       return new Queue("email.fanout.queue", true);
  }
   @Bean
   public Queue duanxinQueue(){
       return new Queue("duanxin.fanout.queue",true);
  }
   // 3:绑定队列与交换机
   @Bean
   public Binding smsBinding(){
       return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
  }
   @Bean
   public Binding emailBinding(){
       return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
  }
   @Bean
   public Binding duanxinBinding(){
       return BindingBuilder.bind(duanxinQueue()).to(fanoutExchange());
  }
}

service消息发送类(消息生产者)

package com.zhang.rabbitmq.springbootrabbitmqproducer.service;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

/**
* 消息生产者
*/
@Service
public class OrderService {

   //引入mq模板
   @Autowired
   private RabbitTemplate rabbitTemplate;

   /**
    * 模拟下单
    */
   public void morikOrder(String userId, String producerId, int number){
       // 1:查询商品是否充足
       String orderId = UUID.randomUUID().toString();
       System.out.println("生成订单号:" + orderId);
       // 2:创建交换机与路由key
       String exchangeName = "fanout-order-exchange";
       String routingKey = "";
       // 3:发送消息
       rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId);
  }
}

test 测试类

package com.zhang.rabbitmq.springbootrabbitmqproducer;

import com.zhang.rabbitmq.springbootrabbitmqproducer.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class SpringbootRabbitmqProducerApplicationTests {

   @Autowired
   private OrderService orderService;

   @Test
   void contextLoads() {
       orderService.morikOrder("1","2",20);
  }

}

 

消息接收者

application.yml(RabbitMQ 配置文件)

# Server port for the consumer application
server:
 port: 8081
# RabbitMQ connection settings (host, port, credentials, virtual host)
# NOTE(review): host and credentials are written inline here; presumably fine
# for a local demo — confirm before reusing outside a test environment.
spring:
 rabbitmq:
   host: 192.168.43.213
   port: 5672
   username: admin
   password: admin
   virtual-host: /

消息接收者一

package com.zhang.rabbitmq.springbootrabbitmqconsumer.service.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
* sms 消费者
*/
@RabbitListener(queues = {"sms.fanout.queue"})
@Service
public class SMSFanoutConsumer {

   @RabbitHandler
   public void revierMessage(String message) {
       System.out.println("sms fanout--接收到的订单信息是-->" + message);
  }
}

消息接收者二

package com.zhang.rabbitmq.springbootrabbitmqconsumer.service.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
* email 消费者
*/
@RabbitListener(queues = {"email.fanout.queue"})
@Service
public class EmailFanoutConsumer {

   @RabbitHandler
   public void reviceMessage(String message){
       System.out.println("email fanout--接收到了订单信息-->" + message);
  }
}

消息接收者三

package com.zhang.rabbitmq.springbootrabbitmqconsumer.service.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
* duanxin 消费者
*/
@RabbitListener(queues = {"duanxin.fanout.queue"})
@Service
public class DuanxinFanoutConsumer {

   @RabbitHandler
   public void reviceMessage(String message) {
       System.out.println("duanxin fanout--接收到得订单信息是-->" + message);
  }
}


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM