【rabbitmq】之Exchange


RabbitMQ常用的Exchange有3個:Direct、Topic、Fanout

全局配置文件

spring.rabbitmq.host=dev-mq.ttsingops.com
spring.rabbitmq.port=5672
spring.rabbitmq.username=xxx
spring.rabbitmq.password=xxxxx
spring.rabbitmq.virtual-host=/cd

 

三個完整交換機配置

/**
 * Declares the three exchanges used by the examples: one direct, one topic and one fanout.
 * All three are created with the same flags (durable, not auto-deleted).
 */
@Configuration
public class RabbitMQExchangeConfig {

    /** Name of the direct exchange: the routing key must match the binding key exactly. */
    public static final String DIRECT_EXCHANGE = "direct_exchange";

    /** Name of the topic exchange: the routing key is matched against a binding pattern. */
    public static final String TOPIC_EXCHANGE = "topic_exchange";

    /** Name of the fanout exchange: messages are broadcast. */
    public static final String FANOUT_EXCHANGE = "fanout_exchange";

    /** Flag passed as the {@code durable} constructor argument of every exchange. */
    private static final boolean DURABLE = true;

    /** Flag passed as the {@code autoDelete} constructor argument of every exchange. */
    private static final boolean AUTO_DELETE = false;

    /**
     * Creates the direct exchange bean.
     *
     * @return the exchange named {@link #DIRECT_EXCHANGE}
     */
    @Bean("directExchange")
    public DirectExchange directExchange() {
        DirectExchange exchange = new DirectExchange(DIRECT_EXCHANGE, DURABLE, AUTO_DELETE);
        return exchange;
    }

    /**
     * Creates the topic exchange bean.
     *
     * @return the exchange named {@link #TOPIC_EXCHANGE}
     */
    @Bean("topicExchange")
    public TopicExchange topicExchange() {
        TopicExchange exchange = new TopicExchange(TOPIC_EXCHANGE, DURABLE, AUTO_DELETE);
        return exchange;
    }

    /**
     * Creates the fanout exchange bean.
     *
     * @return the exchange named {@link #FANOUT_EXCHANGE}
     */
    @Bean("fanoutExchange")
    public FanoutExchange fanoutExchange() {
        FanoutExchange exchange = new FanoutExchange(FANOUT_EXCHANGE, DURABLE, AUTO_DELETE);
        return exchange;
    }

}

 

RabbitTemplate配置

/**
 * Configures the {@link RabbitTemplate} used to publish messages, together with the JSON
 * message converter it is given.
 */
@Configuration
public class RabbitMQConfig {

    /**
     * Provides the message converter bean.
     *
     * @return a new {@link Jackson2JsonMessageConverter}
     */
    @Bean
    public MessageConverter jsonMessageConverter() {
        MessageConverter converter = new Jackson2JsonMessageConverter();
        return converter;
    }

    /**
     * Builds the template on top of the supplied connection factory.
     *
     * @param connectionFactory factory handed to the template
     * @return the configured template
     */
    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate template = new RabbitTemplate();
        template.setMessageConverter(jsonMessageConverter());
        // mandatory=true: a message the exchange cannot route to any queue is returned to the
        // producer; with false such a message would simply be discarded.
        // NOTE(review): no returns callback is registered in this method - confirm that
        // returned messages are handled somewhere.
        template.setMandatory(true);
        template.setConnectionFactory(connectionFactory);
        return template;
    }

}

 

 

DirectExchange

可以理解為發布/訂閱、點對點的一種交換機:A發消息,B消費消息。它是一種完全匹配的交換機——只有當消息的routing key與隊列綁定時指定的key完全相同時,消息才會被投遞到該隊列。

配置DirectExchange:將隊列direct_queue以路由鍵direct_queue_key綁定到direct_exchange

/********************direct************************/
    /**
     * Declares the queue {@code direct_queue}.
     *
     * @return the queue, created with the flags (true, false, false)
     */
    @Bean("directQueue")
    public Queue directQueue() {
        final String queueName = "direct_queue";
        return new Queue(queueName, true, false, false);
    }

    /**
     * Binds {@code direct_queue} to the direct exchange under the key {@code direct_queue_key}.
     *
     * @param directExchange the exchange bean qualified as {@code directExchange}
     * @return the binding between the queue and the exchange
     */
    @Bean("directBind")
    public Binding directBind(@Autowired @Qualifier("directExchange") DirectExchange directExchange) {
        final Queue queue = directQueue();
        final String bindingKey = "direct_queue_key";
        return BindingBuilder.bind(queue).to(directExchange).with(bindingKey);
    }

 

發送DirectMQ消息

/**
 * Publishes the request parameter to the direct exchange with the key {@code direct_queue_key}.
 *
 * @param direct payload to publish
 * @return the literal string {@code "ok"}
 */
@GetMapping("direct")
    public String direct(String direct) {
        final String exchange = RabbitMQExchangeConfig.DIRECT_EXCHANGE;
        final String routingKey = "direct_queue_key";
        rabbitTemplate.convertAndSend(exchange, routingKey, direct);
        return "ok";
    }

 

監聽direct_queue消息

/**
 * Listens on the queue whose name is resolved from the {@code directQueue} bean and logs
 * each message received.
 *
 * @param channel channel taken from the {@code AmqpHeaders.CHANNEL} header; not used here
 * @param msg     message payload as a string
 * @param message the raw message
 * @throws Exception declared by the signature; nothing is thrown explicitly here
 */
@RabbitListener(queues = {"#{directQueue.name}"})
    public void directQueue(@Header(AmqpHeaders.CHANNEL) Channel channel, String msg, Message message) throws Exception {
        final String rawMessage = message.toString();
        log.info("msg:{},mq.message:{}", msg, rawMessage);
    }

 

image

TopicExchange

 

TopicExchange配置

/********************topic1************************/
    /**
     * Declares the queue {@code topic_queue}.
     *
     * @return the queue, created with the flags (true, false, false)
     */
    @Bean("topicQueue")
    public Queue topicQueue() {
        final String queueName = "topic_queue";
        return new Queue(queueName, true, false, false);
    }
    /**
     * Binds {@code topic_queue} to the topic exchange with the pattern {@code *.queue.key}.
     * The {@code *} wildcard stands for a single word.
     *
     * @param topicExchange the exchange bean qualified as {@code topicExchange}
     * @return the binding between the queue and the exchange
     */
    @Bean("topicBind")
    public Binding topicBind(@Autowired @Qualifier("topicExchange") TopicExchange topicExchange) {
        final Queue queue = topicQueue();
        final String pattern = "*.queue.key";
        return BindingBuilder.bind(queue).to(topicExchange).with(pattern);
    }

    /********************topic2************************/
    /**
     * Declares the second topic queue, {@code topic2_queue}.
     *
     * <p>The name is deliberately different from {@code topic_queue}: two queue beans with the
     * same name refer to one and the same broker queue, so the second pattern would not get a
     * queue of its own.
     *
     * @return the queue, created with the flags (true, false, false)
     */
    @Bean("topic2Queue")
    public Queue topic2Queue() {
        return new Queue("topic2_queue", true, false, false);
    }

    /**
     * Binds the second topic queue to the topic exchange with the pattern {@code #.queue.key}.
     * The {@code #} wildcard can stand for several words, so keys such as
     * {@code order.1.queue.key} match this pattern.
     *
     * @param topicExchange the exchange bean qualified as {@code topicExchange}
     * @return the binding between the second queue and the exchange
     */
    @Bean("topic2Bind")
    public Binding topic2Bind(@Autowired @Qualifier("topicExchange") TopicExchange topicExchange) {
        // Bind the queue returned by topic2Queue(); binding topicQueue() here would attach
        // both patterns to the first queue and leave the second one without any binding.
        // NOTE(review): a broker that already carries the old "#.queue.key" binding on
        // topic_queue keeps it until it is removed manually.
        return BindingBuilder.bind(topic2Queue()).to(topicExchange).with("#.queue.key");
    }

 

發送TopicExchange消息

/**
 * Publishes the request parameter to the topic exchange four times, once per routing key.
 *
 * @param topic payload to publish
 * @return the literal string {@code "ok"}
 */
@GetMapping("topic")
    public String topic(String topic) {
        final String[] routingKeys = {
            // one leading word: the "*" (single-word) case
            "order.queue.key",
            "bill.queue.key",
            // two leading words: the "#" (multi-word) case
            "order.1.queue.key",
            "bill.1.queue.key",
        };
        for (String routingKey : routingKeys) {
            rabbitTemplate.convertAndSend(RabbitMQExchangeConfig.TOPIC_EXCHANGE, routingKey, topic);
        }
        return "ok";
    }

 

監聽TopicExchange消息

/**
 * Listens on the queue whose name is resolved from the {@code topicQueue} bean and logs
 * each message received.
 *
 * @param channel channel taken from the {@code AmqpHeaders.CHANNEL} header; not used here
 * @param msg     message payload as a string
 * @param message the raw message
 * @throws Exception declared by the signature; nothing is thrown explicitly here
 */
@RabbitListener(queues = {"#{topicQueue.name}"})
    public void topicQueue(@Header(AmqpHeaders.CHANNEL) Channel channel, String msg, Message message) throws Exception {
        final String rawMessage = message.toString();
        log.info("topicQueue,msg:{},mq.message:{}", msg, rawMessage);
    }


    /**
     * Listens on the queue whose name is resolved from the {@code topic2Queue} bean and logs
     * each message received.
     *
     * @param channel channel taken from the {@code AmqpHeaders.CHANNEL} header; not used here
     * @param msg     message payload as a string
     * @param message the raw message
     * @throws Exception declared by the signature; nothing is thrown explicitly here
     */
    @RabbitListener(queues = {"#{topic2Queue.name}"})
    public void topic2Queue(@Header(AmqpHeaders.CHANNEL) Channel channel, String msg, Message message) throws Exception {
        final String rawMessage = message.toString();
        log.info("topic2Queue,msg:{},mq.message:{}", msg, rawMessage);
    }

 

image

 

FanoutExchange

/********************fanout************************/
    /**
     * Declares the queue {@code fanout_queue}.
     *
     * @return the queue, created with the flags (true, false, false)
     */
    @Bean("fanoutQueue")
    public Queue fanoutQueue() {
        final String queueName = "fanout_queue";
        return new Queue(queueName, true, false, false);
    }

    /**
     * Binds {@code fanout_queue} to the fanout exchange; no routing key is supplied.
     *
     * @param fanoutExchange the exchange bean qualified as {@code fanoutExchange}
     * @return the binding between the queue and the exchange
     */
    @Bean("fanoutBind")
    public Binding fanoutBind(@Autowired @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
        final Queue queue = fanoutQueue();
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

 

/**
 * Publishes the request parameter to the fanout exchange with an empty routing key.
 *
 * @param fanout payload to publish
 * @return the literal string {@code "ok"}
 */
@GetMapping("fanout")
    public String fanout(String fanout) {
        final String exchange = RabbitMQExchangeConfig.FANOUT_EXCHANGE;
        final String emptyRoutingKey = "";
        rabbitTemplate.convertAndSend(exchange, emptyRoutingKey, fanout);
        return "ok";
    }

 

/**
 * Listens on the queue whose name is resolved from the {@code fanoutQueue} bean and logs
 * each message received.
 *
 * @param channel channel taken from the {@code AmqpHeaders.CHANNEL} header; not used here
 * @param msg     message payload as a string
 * @param message the raw message
 * @throws Exception declared by the signature; nothing is thrown explicitly here
 */
@RabbitListener(queues = {"#{fanoutQueue.name}"})
    public void fanoutQueue(@Header(AmqpHeaders.CHANNEL) Channel channel, String msg, Message message) throws Exception {
        final String rawMessage = message.toString();
        log.info("msg:{},mq.message:{}", msg, rawMessage);
    }

 

image

 

 

以上就是Rabbitmq三個常用Exchange的基本用法。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM