springboot實現發布與訂閱(fanout)模式


支持的消息模式

 

 

消息發送者

application.yml rabbitmq配置文件

# 服務端口號
server:
 port: 8080
# 創建MQ服務連接
spring:
 rabbitmq:
   host: 192.168.43.213
   port: 5672
   username: admin
   password: admin
   virtual-host: /

config交換機與隊列綁定類

package com.zhang.rabbitmq.springbootrabbitmqproducer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* 綁定交換機與隊列
*/
@Configuration
public class ConfigurationProducer {

   // 1:聲明MQ交換機
   @Bean
   public FanoutExchange fanoutExchange(){
       return new FanoutExchange("fanout-order-exchange", true,false);
  }
   // 2:聲明隊列 sms.fanout.queue email.fanout.queue duanxin.fanout.queue
   @Bean
   public Queue smsQueue(){
       return new Queue("sms.fanout.queue",true);
  }
   @Bean
   public Queue emailQueue(){
       return new Queue("email.fanout.queue", true);
  }
   @Bean
   public Queue duanxinQueue(){
       return new Queue("duanxin.fanout.queue",true);
  }
   // 3:綁定隊列與交換機
   @Bean
   public Binding smsBinding(){
       return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
  }
   @Bean
   public Binding emailBinding(){
       return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
  }
   @Bean
   public Binding duanxinBinding(){
       return BindingBuilder.bind(duanxinQueue()).to(fanoutExchange());
  }
}

service消息發送類(消息生產者)

package com.zhang.rabbitmq.springbootrabbitmqproducer.service;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

/**
* 消息生產者
*/
@Service
public class OrderService {

   //引入mq模板
   @Autowired
   private RabbitTemplate rabbitTemplate;

   /**
    * 模擬下單
    */
   public void morikOrder(String userId, String producerId, int number){
       // 1:查詢商品是否充足
       String orderId = UUID.randomUUID().toString();
       System.out.println("生成訂單號:" + orderId);
       // 2:創建交換機與路由key
       String exchangeName = "fanout-order-exchange";
       String routingKey = "";
       // 3:發送消息
       rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId);
  }
}

test 測試類

package com.zhang.rabbitmq.springbootrabbitmqproducer;

import com.zhang.rabbitmq.springbootrabbitmqproducer.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class SpringbootRabbitmqProducerApplicationTests {

   @Autowired
   private OrderService orderService;

   @Test
   void contextLoads() {
       orderService.morikOrder("1","2",20);
  }

}

 

消息接收者

application.yml rabbitmq配置文件

# 服務端口號
server:
 port: 8081
# 創建MQ服務連接
spring:
 rabbitmq:
   host: 192.168.43.213
   port: 5672
   username: admin
   password: admin
   virtual-host: /

消息接收者一

package com.zhang.rabbitmq.springbootrabbitmqconsumer.service.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
* sms 消費者
*/
@RabbitListener(queues = {"sms.fanout.queue"})
@Service
public class SMSFanoutConsumer {

   @RabbitHandler
   public void revierMessage(String message) {
       System.out.println("sms fanout--接收到的訂單信息是-->" + message);
  }
}

消息接收者二

package com.zhang.rabbitmq.springbootrabbitmqconsumer.service.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
* email 消費者
*/
@RabbitListener(queues = {"email.fanout.queue"})
@Service
public class EmailFanoutConsumer {

   @RabbitHandler
   public void reviceMessage(String message){
       System.out.println("email fanout--接收到了訂單信息-->" + message);
  }
}

消息接收者三

package com.zhang.rabbitmq.springbootrabbitmqconsumer.service.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
* duanxin 消費者
*/
@RabbitListener(queues = {"duanxin.fanout.queue"})
@Service
public class DuanxinFanoutConsumer {

   @RabbitHandler
   public void reviceMessage(String message) {
       System.out.println("duanxin fanout--接收到得訂單信息是-->" + message);
  }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM