springboot 整合RabbitMQ yml配置文件配置交換機 隊列信息


1.配置文件

application-rabbit.yml

####################################################
#######  Rabbit MQ Exchange Queue Config  ##########
####################################################
rabbit:
  # 交換機
  exchanges:
#    # 自定義-延遲
#    - name: delay.mode
#      type: CUSTOM
#      custom-type: x-delayed-message
#      arguments:
#        x-delayed-type: direct
#    # 通配符訂閱
#    - name: topic.mode
#      type: TOPIC
#    # 廣播
#    - name: fanout.mode
#      type: FANOUT
#    # 消息頭
#    - name: headers.mode
#      type: HEADERS
    # 直連交換機
    - name: centerDeliverExchange
      type: DIRECT

  # 隊列
  queues:
    # 直連隊列
    - name: queue-PLUS2-9002
      routing-key: route-PLUS2-9002
      exchange-name: centerDeliverExchange

    - name: queue-PLUS2-9003
      routing-key: route-PLUS2-9003
      exchange-name: centerDeliverExchange
#    # 隊列2
#    - name: queue.2
#      routing-key: queue.*
#      exchange-name: fanout.mode,topic.mode
#    # 延遲隊列
#    - name: delay.queue
#      routing-key: delay.queue
#      exchange-name: delay.mode

將以上配置文件引入application.yml

spring:
  profiles:
    include: rabbit

注入配置文件 並定義交換機 隊列

SpringBeanUtils.java
 
        
package com.mybatis.plus.utils;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.stereotype.Component;

/**
 * spring工具類 方便在非spring管理環境中獲取bean
 *
 * @author gch
 */
@Component
public final class SpringBeanUtils implements BeanFactoryPostProcessor {
    /**
     * Spring應用上下文環境
     */
    private static ConfigurableListableBeanFactory beanFactory;

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        SpringBeanUtils.beanFactory = beanFactory;
    }

    /**
     * 獲取對象
     *
     * @param name
     * @return Object 一個以所給名字注冊的bean的實例
     * @throws BeansException
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) throws BeansException {
        return (T) beanFactory.getBean(name);
    }

    /**
     * 獲取類型為requiredType的對象
     *
     * @param clz
     * @return
     * @throws BeansException
     */
    public static <T> T getBean(Class<T> clz) throws BeansException {
        T result = (T) beanFactory.getBean(clz);
        return result;
    }

    public static <T> T getBean(String name, Class<T> clz) throws BeansException {
        T result = (T) beanFactory.getBean(name, clz);
        return result;
    }

    /**
     * 如果BeanFactory包含一個與所給名稱匹配的bean定義,則返回true
     *
     * @param name
     * @return boolean
     */
    public static boolean containsBean(String name) {
        return beanFactory.containsBean(name);
    }

    /**
     * 判斷以給定名字注冊的bean定義是一個singleton還是一個prototype。 如果與給定名字相應的bean定義沒有被找到,將會拋出一個異常(NoSuchBeanDefinitionException)
     *
     * @param name
     * @return boolean
     * @throws NoSuchBeanDefinitionException
     */
    public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {
        return beanFactory.isSingleton(name);
    }

    /**
     * @param name
     * @return Class 注冊對象的類型
     * @throws NoSuchBeanDefinitionException
     */
    public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {
        return beanFactory.getType(name);
    }

    /**
     * 如果給定的bean名字在bean定義中有別名,則返回這些別名
     *
     * @param name
     * @return
     * @throws NoSuchBeanDefinitionException
     */
    public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {
        return beanFactory.getAliases(name);
    }

    /**
     * 將bean對象注冊到bean工廠
     *
     * @param beanName
     * @param bean
     * @param <T>
     * @return
     */
    public static <T> boolean registerBean(String beanName, T bean) {
        // 將bean對象注冊到bean工廠
        beanFactory.registerSingleton(beanName, bean);
        return true;
    }
}

 

RabbitMqProperties.java
package com.mybatis.plus.config.mq;

import com.baomidou.mybatisplus.core.toolkit.StringPool;
import com.mybatis.plus.utils.SpringBeanUtils;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.amqp.core.*;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * rabbitmq 消息隊列和交換機 配置文件
 *
 * @author gch
 */
@Slf4j
@Data
@ConfigurationProperties(
        prefix = "rabbit"
)
public class RabbitMqProperties {

    /**
     * 裝載自定義配置交換機
     */
    private List<ExchangeConfig> exchanges = new ArrayList<>();

    /**
     * 裝載自定義配置隊列
     */
    private List<QueueConfig> queues = new ArrayList<>();

    @Data
    public static class QueueConfig {

        /**
         * 隊列名(每個隊列的名稱應該唯一)
         * 必須*
         */
        private String name;

        /**
         * 指定綁定交互機,可綁定多個(逗號分隔)
         * 必須*
         */
        private String exchangeName;

        /**
         * 隊列路由鍵(隊列綁定交換機的匹配鍵,例如:“user” 將會匹配到指定路由器下路由鍵為:【*.user、#.user】的隊列)
         */
        private String routingKey;

        /**
         * 是否為持久隊列(該隊列將在服務器重啟后保留下來)
         */
        private Boolean durable = Boolean.TRUE;

        /**
         * 是否為排它隊列
         */
        private Boolean exclusive = Boolean.FALSE;

        /**
         * 如果隊列為空是否刪除(如果服務器在不使用隊列時是否刪除隊列)
         */
        private Boolean autoDelete = Boolean.FALSE;

        /**
         * 頭隊列是否全部匹配
         * 默認:是
         */
        private Boolean whereAll = Boolean.TRUE;

        /**
         * 參數
         */
        private Map<String, Object> args;

        /**
         * 消息頭
         */
        private Map<String, Object> headers;

    }

    @Data
    public static class ExchangeConfig {

        /**
         * 交換機名
         */
        private String name;

        /**
         * 交換機類型
         */
        private ExchangeType type;

        /**
         * 自定義交換機類型
         */
        private String customType;

        /**
         * 交換機參數(自定義交換機)
         */
        private Map<String, Object> arguments;

    }

    public enum ExchangeType {
        /**
         * 自定義交換機
         */
        CUSTOM,
        /**
         * 直連交換機(全文匹配)
         */
        DIRECT,
        /**
         * 通配符交換機(兩種通配符:*只能匹配一個單詞,#可以匹配零個或多個)
         */
        TOPIC,
        /**
         * 頭交換機(自定義鍵值對匹配,根據發送消息內容中的headers屬性進行匹配)
         */
        HEADERS,
        /**
         * 扇形(廣播)交換機 (將消息轉發到所有與該交互機綁定的隊列上)
         */
        FANOUT;
    }

    public ExchangeConfig getExchangeConfig(String name) {
        Map<String, ExchangeConfig> collect = exchanges.stream().collect(Collectors.toMap(e -> e.getName(), e -> e));
        return collect.get(name);
    }



    /**
     * 動態創建交換機
     *
     * @return
     */
    @Bean
    public Object createExchange() {
        List<ExchangeConfig> exchanges = getExchanges();
        if (!CollectionUtils.isEmpty(exchanges)) {
            exchanges.forEach(e -> {
                // 聲明交換機
                Exchange exchange = null;
                switch (e.getType()) {
                    case DIRECT:
                        exchange = new DirectExchange(e.getName());
                        break;
                    case TOPIC:
                        exchange = new TopicExchange(e.getName());
                        break;
                    case HEADERS:
                        exchange = new HeadersExchange(e.getName());
                        break;
                    case FANOUT:
                        exchange = new FanoutExchange(e.getName());
                        break;
                    case CUSTOM:
                        exchange = new CustomExchange(e.getName(), e.getCustomType(), true, false, e.getArguments());
                        break;
                    default:
                        break;
                }

                // 將交換機注冊到spring bean工廠 讓spring實現交換機的管理
                if (exchange != null) {
                    SpringBeanUtils.registerBean(e.getName(), exchange);
                }
            });
        }

        return null;
    }

    /**
     * 動態綁定隊列和交換機
     *
     * @return
     */
    @Bean
    public Object bindingQueueToExchange() {
        List<QueueConfig> queues = getQueues();
        if (!CollectionUtils.isEmpty(queues)) {
            queues.forEach(q -> {

                // 創建隊列
                Queue queue = new Queue(q.getName(), q.getDurable(),
                        q.getExclusive(), q.getAutoDelete(), q.getArgs());

                // 注入隊列bean
                SpringBeanUtils.registerBean(q.getName(), queue);

                // 獲取隊列綁定交換機名
                List<String> exchangeNameList;
                if (q.getExchangeName().indexOf(StringPool.COMMA) != -1) {
                    String[] split = q.getExchangeName().split(StringPool.COMMA);
                    exchangeNameList = Arrays.asList(split);
                } else {
                    exchangeNameList = Arrays.asList(q.getExchangeName());
                }

                exchangeNameList.forEach(name -> {
                    // 獲取交換機配置參數
                    ExchangeConfig exchangeConfig = getExchangeConfig(name);
                    Binding binding = bindingBuilder(queue, q, exchangeConfig);

                    // 將綁定關系注冊到spring bean工廠 讓spring實現綁定關系的管理
                    if (binding != null) {
                        log.debug("queue [{}] binding exchange [{}] success!", q.getName(), exchangeConfig.getName());
                        SpringBeanUtils.registerBean(q.getName() + StringPool.DASH + name, binding);
                    }
                });

            });
        }

        return null;
    }

    public Binding bindingBuilder(Queue queue, QueueConfig q, ExchangeConfig exchangeConfig) {
        // 聲明綁定關系
        Binding binding = null;

        // 根據不同的交換機模式 獲取不同的交換機對象(注意:剛才注冊時使用的是父類Exchange,這里獲取的時候將類型獲取成相應的子類)生成不同的綁定規則
        switch (exchangeConfig.getType()) {
            case TOPIC:
                binding = BindingBuilder.bind(queue)
                        .to(SpringBeanUtils.getBean(exchangeConfig.getName(), TopicExchange.class))
                        .with(q.getRoutingKey());
                break;
            case DIRECT:
                binding = BindingBuilder.bind(queue)
                        .to(SpringBeanUtils.getBean(exchangeConfig.getName(), DirectExchange.class))
                        .with(q.getRoutingKey());
                break;
            case HEADERS:
                if (q.getWhereAll()) {
                    binding = BindingBuilder.bind(queue)
                            .to(SpringBeanUtils.getBean(exchangeConfig.getName(), HeadersExchange.class))
                            .whereAll(q.getHeaders()).match();
                } else {
                    binding = BindingBuilder.bind(queue)
                            .to(SpringBeanUtils.getBean(exchangeConfig.getName(), HeadersExchange.class))
                            .whereAny(q.getHeaders()).match();
                }
                break;
            case FANOUT:
                binding = BindingBuilder.bind(queue)
                        .to(SpringBeanUtils.getBean(exchangeConfig.getName(), FanoutExchange.class));
                break;
            case CUSTOM:
                binding = BindingBuilder.bind(queue)
                        .to(SpringBeanUtils.getBean(exchangeConfig.getName(), CustomExchange.class))
                        .with(q.getRoutingKey()).noargs();
                break;
            default:
                log.warn("queue [{}] config unspecified exchange!", q.getName());
                break;
        }

        return binding;
    }
}

添加rabbitMQ配置

package com.mybatis.plus.config.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;

/**
 * RabbitMQ配置
 *
 * @author gch
 */
@Slf4j
@Configuration
@EnableConfigurationProperties(RabbitMqProperties.class)
//@PropertySource(value = "classpath:application-rabbit.yml")
public class RabbitConfiguration {

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //設置開啟Mandatory,才能觸發回調函數,無論消息推送結果怎么樣都強制調用回調函數
        rabbitTemplate.setMandatory(true);

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("ConfirmCallback:     "+"相關數據:"+correlationData);
                System.out.println("ConfirmCallback:     "+"確認情況:"+ack);
                System.out.println("ConfirmCallback:     "+"原因:"+cause);
            }
        });

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("ReturnCallback:     "+"消息:"+message);
                System.out.println("ReturnCallback:     "+"回應碼:"+replyCode);
                System.out.println("ReturnCallback:     "+"回應信息:"+replyText);
                System.out.println("ReturnCallback:     "+"交換機:"+exchange);
                System.out.println("ReturnCallback:     "+"路由鍵:"+routingKey);
            }
        });

        return rabbitTemplate;
    }

  
}

配置文件注意兩點

1.

@EnableConfigurationProperties(RabbitMqProperties.class)

2.就是監聽需要配置文件配置:

spring:
#配置rabbitMq 服務器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: root
    password: root
    #確認消息已發送到交換機(Exchange)
#publisher-confirms: true publisher
-confirm-type: correlated #確認消息已發送到隊列(Queue) publisher-returns: true

經調試:springBoot 版本  2.1.6.RELEASE 使用  publisher-confirm-type: correlated ;  2.1.6.RELEASE 版本使用  publisher-confirms: true

 

接下來就可以愉快的生產消息了

rabbitTemplate.convertAndSend("testDirectExchange", "testDirectRouting", JSONObject.toJSONString(logEntity));

 

貼個消費的方式

MessageListenerConfig
package com.mybatis.plus.config.mq;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author : JCccc
 * @CreateTime : 2019/9/4
 * @Description :
 **/
@Configuration
public class MessageListenerConfig {

    @Autowired
    private CachingConnectionFactory connectionFactory;
    @Autowired
    private EurekaReceiver eurekaReceiver;//消息接收處理類

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默認是自動確認,這里改為手動確認消息
        //設置一個隊列
        container.setQueueNames("testDirectQueue");
        //如果同時設置多個如下: 前提是隊列都是必須已經創建存在的
        //  container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3");


        //另一種設置隊列的方法,如果使用這種情況,那么要設置多個,就使用addQueues
        //container.setQueues(new Queue("TestDirectQueue",true));
        //container.addQueues(new Queue("TestDirectQueue2",true));
        //container.addQueues(new Queue("TestDirectQueue3",true));
        container.setMessageListener(eurekaReceiver);

        return container;
    }


}

 

package com.mybatis.plus.config.mq;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.bean.copier.CopyOptions;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.mybatis.plus.entity.Log;
import com.mybatis.plus.service.ILogService;
import com.mybatis.plus.utils.EurekaUtils;
import com.mybatis.plus.utils.hash.ConsistentHash;
import com.mybatis.plus.utils.hash.pojo.ConsistentHashNode;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Component
@Slf4j
public class EurekaReceiver implements ChannelAwareMessageListener {

    @Value("${server.port}")
    private String port;

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Autowired
    ILogService logService;

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            if ("testDirectQueue".equals(message.getMessageProperties().getConsumerQueue())) {
                String msg = new String(message.getBody(), "UTF-8");
                Log parseObject = JSONObject.parseObject(msg, Log.class);
                EurekaReceiver.log.info("消費的消息來自的隊列名為:" + message.getMessageProperties().getConsumerQueue());
                log.info("消息成功消費到  messageId:" + parseObject.getLogUuid() + "  messageData:" + parseObject.getLogTitle() + "  createTime:" + parseObject.getCreateTime());
                log.info("================================");
                // 收到來自主機的消息 進行一致性hash分配 發往不同的服務
                // 獲取服務節點 創建一致hash環
                ConsistentHash consistentHash = InitConfig.consistentHash;
                List<Map<String, String>> allServiceAddr = EurekaUtils.getAllServiceInfo("127.0.0.1", port, "PLUS2");
                if (!allServiceAddr.isEmpty()) {
                    for (Map<String, String> stringMap : allServiceAddr) {
                        String instanceId = stringMap.get("routeKey");
                        // 新增1個物理節點和150個對應的虛擬節點
//                        String instanceId = stringMap.get("queueKey");
                        // 如果hash環中沒有該節點 才新增
                        ConsistentHashNode node = consistentHash.getAccurateNode(instanceId);
                        if (null == node) {
                            consistentHash.putNode(new ConsistentHashNode(consistentHash.getPoint(instanceId),instanceId),150);
                        }
                    }
                } else {
                    //沒有服務提供者 將消息返回隊列
                    channel.basicReject(deliveryTag, true);
                    return;
                }

                // 提取消息中的某個代表來源主機的標識 然后在hash環上分配目標節點
                String logUuid = parseObject.getLogUuid();
                ConsistentHashNode node = consistentHash.getNode(logUuid);
                log.info("主機標識:{},分配節點:{}", logUuid, node.getTarget());
                //向指定路由發送消息
                // todo 問題 這里怎么保證隊列預先創建初始化好 解決方案 先從配置文件獲取隊列名稱 新增服務時 需要重啟服務
                rabbitTemplate.convertAndSend("centerDeliverExchange", node.getTarget().toString(), msg);
//                planTwo(parseObject);
                channel.basicAck(deliveryTag, false); //第二個參數,手動確認可以被批處理,當該參數為 true 時,則可以一次性確認 delivery_tag 小於等於傳入值的所有消息
                log.info(">>>>>>>>>>>>消費消息成功!");
            }
        } catch (Exception e) {
            log.info(">>>>>>>>>>>>消費消息失敗!失敗消息ID:{}, 失敗原因:{}", deliveryTag, e.getMessage());
            channel.basicReject(deliveryTag, true);
        }
    }




    private void planTwo(Log parseObject) {
        //先查詢出數據庫存在的數據(如果有)
        String uuid = parseObject.getLogUuid();
        //結合數據庫索引 防止並發插入多條重復數據
        Log byId = logService.getOne((new QueryWrapper<Log>()).eq("log_uuid", uuid));
        if (byId == null) {
            logService.save(parseObject);
        } else {
            String logOrder = byId.getLogOrder();
            String logOrder1 = parseObject.getLogOrder();
            //編號大於數據庫編號才更新
            if (Integer.valueOf(logOrder1) > Integer.valueOf(logOrder)) {
                log.info("============= 新序號:{},舊序號:{}; 執行更新操作", logOrder1, logOrder);
                BeanUtil.copyProperties(parseObject, byId, CopyOptions.create().ignoreNullValue());
                logService.updateById(byId);
            }
        }
    }

}
EurekaReceiver 這個類有些包我就不貼了 業務那一塊替換成自己的


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM