1.配置文件
application-rabbit.yml
####################################################
####### Rabbit MQ Exchange Queue Config ##########
####################################################
rabbit:
  # 交換機
  exchanges:
    #    # 自定義-延遲
    #    - name: delay.mode
    #      type: CUSTOM
    #      custom-type: x-delayed-message
    #      arguments:
    #        x-delayed-type: direct
    #    # 通配符訂閱
    #    - name: topic.mode
    #      type: TOPIC
    #    # 廣播
    #    - name: fanout.mode
    #      type: FANOUT
    #    # 消息頭
    #    - name: headers.mode
    #      type: HEADERS
    # 直連交換機
    - name: centerDeliverExchange
      type: DIRECT
  # 隊列
  queues:
    # 直連隊列
    - name: queue-PLUS2-9002
      routing-key: route-PLUS2-9002
      exchange-name: centerDeliverExchange
    - name: queue-PLUS2-9003
      routing-key: route-PLUS2-9003
      exchange-name: centerDeliverExchange
    #    # 隊列2
    #    - name: queue.2
    #      routing-key: queue.*
    #      exchange-name: fanout.mode,topic.mode
    #    # 延遲隊列
    #    - name: delay.queue
    #      routing-key: delay.queue
    #      exchange-name: delay.mode
將以上配置文件引入application.yml
spring:
  profiles:
    include: rabbit
注入配置文件 並定義交換機 隊列
SpringBeanUtils.java
package com.mybatis.plus.utils; import org.springframework.beans.BeansException; import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.beans.factory.config.BeanFactoryPostProcessor; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.stereotype.Component; /** * spring工具類 方便在非spring管理環境中獲取bean * * @author gch */ @Component public final class SpringBeanUtils implements BeanFactoryPostProcessor { /** * Spring應用上下文環境 */ private static ConfigurableListableBeanFactory beanFactory; @Override public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { SpringBeanUtils.beanFactory = beanFactory; } /** * 獲取對象 * * @param name * @return Object 一個以所給名字注冊的bean的實例 * @throws BeansException */ @SuppressWarnings("unchecked") public static <T> T getBean(String name) throws BeansException { return (T) beanFactory.getBean(name); } /** * 獲取類型為requiredType的對象 * * @param clz * @return * @throws BeansException */ public static <T> T getBean(Class<T> clz) throws BeansException { T result = (T) beanFactory.getBean(clz); return result; } public static <T> T getBean(String name, Class<T> clz) throws BeansException { T result = (T) beanFactory.getBean(name, clz); return result; } /** * 如果BeanFactory包含一個與所給名稱匹配的bean定義,則返回true * * @param name * @return boolean */ public static boolean containsBean(String name) { return beanFactory.containsBean(name); } /** * 判斷以給定名字注冊的bean定義是一個singleton還是一個prototype。 如果與給定名字相應的bean定義沒有被找到,將會拋出一個異常(NoSuchBeanDefinitionException) * * @param name * @return boolean * @throws NoSuchBeanDefinitionException */ public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException { return beanFactory.isSingleton(name); } /** * @param name * @return Class 注冊對象的類型 * @throws NoSuchBeanDefinitionException */ public static Class<?> getType(String name) throws NoSuchBeanDefinitionException { return 
beanFactory.getType(name); } /** * 如果給定的bean名字在bean定義中有別名,則返回這些別名 * * @param name * @return * @throws NoSuchBeanDefinitionException */ public static String[] getAliases(String name) throws NoSuchBeanDefinitionException { return beanFactory.getAliases(name); } /** * 將bean對象注冊到bean工廠 * * @param beanName * @param bean * @param <T> * @return */ public static <T> boolean registerBean(String beanName, T bean) { // 將bean對象注冊到bean工廠 beanFactory.registerSingleton(beanName, bean); return true; } }
RabbitMqProperties.java
package com.mybatis.plus.config.mq; import com.baomidou.mybatisplus.core.toolkit.StringPool; import com.mybatis.plus.utils.SpringBeanUtils; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.springframework.amqp.core.*; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.stream.Collectors; /** * rabbitmq 消息隊列和交換機 配置文件 * * @author gch */ @Slf4j @Data @ConfigurationProperties( prefix = "rabbit" ) public class RabbitMqProperties { /** * 裝載自定義配置交換機 */ private List<ExchangeConfig> exchanges = new ArrayList<>(); /** * 裝載自定義配置隊列 */ private List<QueueConfig> queues = new ArrayList<>(); @Data public static class QueueConfig { /** * 隊列名(每個隊列的名稱應該唯一) * 必須* */ private String name; /** * 指定綁定交互機,可綁定多個(逗號分隔) * 必須* */ private String exchangeName; /** * 隊列路由鍵(隊列綁定交換機的匹配鍵,例如:“user” 將會匹配到指定路由器下路由鍵為:【*.user、#.user】的隊列) */ private String routingKey; /** * 是否為持久隊列(該隊列將在服務器重啟后保留下來) */ private Boolean durable = Boolean.TRUE; /** * 是否為排它隊列 */ private Boolean exclusive = Boolean.FALSE; /** * 如果隊列為空是否刪除(如果服務器在不使用隊列時是否刪除隊列) */ private Boolean autoDelete = Boolean.FALSE; /** * 頭隊列是否全部匹配 * 默認:是 */ private Boolean whereAll = Boolean.TRUE; /** * 參數 */ private Map<String, Object> args; /** * 消息頭 */ private Map<String, Object> headers; } @Data public static class ExchangeConfig { /** * 交換機名 */ private String name; /** * 交換機類型 */ private ExchangeType type; /** * 自定義交換機類型 */ private String customType; /** * 交換機參數(自定義交換機) */ private Map<String, Object> arguments; } public enum ExchangeType { /** * 自定義交換機 */ CUSTOM, /** * 直連交換機(全文匹配) */ DIRECT, /** * 通配符交換機(兩種通配符:*只能匹配一個單詞,#可以匹配零個或多個) */ TOPIC, /** * 頭交換機(自定義鍵值對匹配,根據發送消息內容中的headers屬性進行匹配) */ HEADERS, /** * 扇形(廣播)交換機 (將消息轉發到所有與該交互機綁定的隊列上) */ FANOUT; } public ExchangeConfig getExchangeConfig(String 
name) { Map<String, ExchangeConfig> collect = exchanges.stream().collect(Collectors.toMap(e -> e.getName(), e -> e)); return collect.get(name); } /** * 動態創建交換機 * * @return */ @Bean public Object createExchange() { List<ExchangeConfig> exchanges = getExchanges(); if (!CollectionUtils.isEmpty(exchanges)) { exchanges.forEach(e -> { // 聲明交換機 Exchange exchange = null; switch (e.getType()) { case DIRECT: exchange = new DirectExchange(e.getName()); break; case TOPIC: exchange = new TopicExchange(e.getName()); break; case HEADERS: exchange = new HeadersExchange(e.getName()); break; case FANOUT: exchange = new FanoutExchange(e.getName()); break; case CUSTOM: exchange = new CustomExchange(e.getName(), e.getCustomType(), true, false, e.getArguments()); break; default: break; } // 將交換機注冊到spring bean工廠 讓spring實現交換機的管理 if (exchange != null) { SpringBeanUtils.registerBean(e.getName(), exchange); } }); } return null; } /** * 動態綁定隊列和交換機 * * @return */ @Bean public Object bindingQueueToExchange() { List<QueueConfig> queues = getQueues(); if (!CollectionUtils.isEmpty(queues)) { queues.forEach(q -> { // 創建隊列 Queue queue = new Queue(q.getName(), q.getDurable(), q.getExclusive(), q.getAutoDelete(), q.getArgs()); // 注入隊列bean SpringBeanUtils.registerBean(q.getName(), queue); // 獲取隊列綁定交換機名 List<String> exchangeNameList; if (q.getExchangeName().indexOf(StringPool.COMMA) != -1) { String[] split = q.getExchangeName().split(StringPool.COMMA); exchangeNameList = Arrays.asList(split); } else { exchangeNameList = Arrays.asList(q.getExchangeName()); } exchangeNameList.forEach(name -> { // 獲取交換機配置參數 ExchangeConfig exchangeConfig = getExchangeConfig(name); Binding binding = bindingBuilder(queue, q, exchangeConfig); // 將綁定關系注冊到spring bean工廠 讓spring實現綁定關系的管理 if (binding != null) { log.debug("queue [{}] binding exchange [{}] success!", q.getName(), exchangeConfig.getName()); SpringBeanUtils.registerBean(q.getName() + StringPool.DASH + name, binding); } }); }); } return null; } public Binding 
bindingBuilder(Queue queue, QueueConfig q, ExchangeConfig exchangeConfig) { // 聲明綁定關系 Binding binding = null; // 根據不同的交換機模式 獲取不同的交換機對象(注意:剛才注冊時使用的是父類Exchange,這里獲取的時候將類型獲取成相應的子類)生成不同的綁定規則 switch (exchangeConfig.getType()) { case TOPIC: binding = BindingBuilder.bind(queue) .to(SpringBeanUtils.getBean(exchangeConfig.getName(), TopicExchange.class)) .with(q.getRoutingKey()); break; case DIRECT: binding = BindingBuilder.bind(queue) .to(SpringBeanUtils.getBean(exchangeConfig.getName(), DirectExchange.class)) .with(q.getRoutingKey()); break; case HEADERS: if (q.getWhereAll()) { binding = BindingBuilder.bind(queue) .to(SpringBeanUtils.getBean(exchangeConfig.getName(), HeadersExchange.class)) .whereAll(q.getHeaders()).match(); } else { binding = BindingBuilder.bind(queue) .to(SpringBeanUtils.getBean(exchangeConfig.getName(), HeadersExchange.class)) .whereAny(q.getHeaders()).match(); } break; case FANOUT: binding = BindingBuilder.bind(queue) .to(SpringBeanUtils.getBean(exchangeConfig.getName(), FanoutExchange.class)); break; case CUSTOM: binding = BindingBuilder.bind(queue) .to(SpringBeanUtils.getBean(exchangeConfig.getName(), CustomExchange.class)) .with(q.getRoutingKey()).noargs(); break; default: log.warn("queue [{}] config unspecified exchange!", q.getName()); break; } return binding; } }
添加rabbitMQ配置
package com.mybatis.plus.config.mq; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; /** * RabbitMQ配置 * * @author gch */ @Slf4j @Configuration @EnableConfigurationProperties(RabbitMqProperties.class) //@PropertySource(value = "classpath:application-rabbit.yml") public class RabbitConfiguration { @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); //設置開啟Mandatory,才能觸發回調函數,無論消息推送結果怎么樣都強制調用回調函數 rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("ConfirmCallback: "+"相關數據:"+correlationData); System.out.println("ConfirmCallback: "+"確認情況:"+ack); System.out.println("ConfirmCallback: "+"原因:"+cause); } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("ReturnCallback: "+"消息:"+message); System.out.println("ReturnCallback: "+"回應碼:"+replyCode); System.out.println("ReturnCallback: "+"回應信息:"+replyText); System.out.println("ReturnCallback: "+"交換機:"+exchange); System.out.println("ReturnCallback: "+"路由鍵:"+routingKey); } }); return rabbitTemplate; } }
配置文件注意兩點
1.
@EnableConfigurationProperties(RabbitMqProperties.class)
2.就是監聽需要配置文件配置:
spring:
  #配置rabbitMq 服務器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: root
    password: root
    #確認消息已發送到交換機(Exchange)
    #publisher-confirms: true
    publisher-confirm-type: correlated
    #確認消息已發送到隊列(Queue)
    publisher-returns: true
經調試:Spring Boot 2.2.0 及以上版本使用 publisher-confirm-type: correlated ;2.1.6.RELEASE(2.1.x 及更早版本)使用 publisher-confirms: true
接下來就可以愉快的生產消息了
// Publishes logEntity, serialised to a JSON string, to exchange "testDirectExchange" with routing key "testDirectRouting".
// NOTE(review): neither name appears in the application-rabbit.yml shown above — confirm they are declared elsewhere.
rabbitTemplate.convertAndSend("testDirectExchange", "testDirectRouting", JSONObject.toJSONString(logEntity));
貼個消費的方式
MessageListenerConfig
package com.mybatis.plus.config.mq; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Author : JCccc * @CreateTime : 2019/9/4 * @Description : **/ @Configuration public class MessageListenerConfig { @Autowired private CachingConnectionFactory connectionFactory; @Autowired private EurekaReceiver eurekaReceiver;//消息接收處理類 @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setConcurrentConsumers(1); container.setMaxConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默認是自動確認,這里改為手動確認消息 //設置一個隊列 container.setQueueNames("testDirectQueue"); //如果同時設置多個如下: 前提是隊列都是必須已經創建存在的 // container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3"); //另一種設置隊列的方法,如果使用這種情況,那么要設置多個,就使用addQueues //container.setQueues(new Queue("TestDirectQueue",true)); //container.addQueues(new Queue("TestDirectQueue2",true)); //container.addQueues(new Queue("TestDirectQueue3",true)); container.setMessageListener(eurekaReceiver); return container; } }
package com.mybatis.plus.config.mq; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.bean.copier.CopyOptions; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.mybatis.plus.entity.Log; import com.mybatis.plus.service.ILogService; import com.mybatis.plus.utils.EurekaUtils; import com.mybatis.plus.utils.hash.ConsistentHash; import com.mybatis.plus.utils.hash.pojo.ConsistentHashNode; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.TransactionDefinition; import javax.annotation.Resource; import java.util.HashMap; import java.util.List; import java.util.Map; @Component @Slf4j public class EurekaReceiver implements ChannelAwareMessageListener { @Value("${server.port}") private String port; @Autowired RabbitTemplate rabbitTemplate; @Autowired ILogService logService; @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { if ("testDirectQueue".equals(message.getMessageProperties().getConsumerQueue())) { String msg = new String(message.getBody(), "UTF-8"); Log parseObject = JSONObject.parseObject(msg, Log.class); EurekaReceiver.log.info("消費的消息來自的隊列名為:" + message.getMessageProperties().getConsumerQueue()); log.info("消息成功消費到 messageId:" + parseObject.getLogUuid() + " messageData:" + parseObject.getLogTitle() + " createTime:" + parseObject.getCreateTime()); log.info("================================"); // 收到來自主機的消息 
進行一致性hash分配 發往不同的服務 // 獲取服務節點 創建一致hash環 ConsistentHash consistentHash = InitConfig.consistentHash; List<Map<String, String>> allServiceAddr = EurekaUtils.getAllServiceInfo("127.0.0.1", port, "PLUS2"); if (!allServiceAddr.isEmpty()) { for (Map<String, String> stringMap : allServiceAddr) { String instanceId = stringMap.get("routeKey"); // 新增1個物理節點和150個對應的虛擬節點 // String instanceId = stringMap.get("queueKey"); // 如果hash環中沒有該節點 才新增 ConsistentHashNode node = consistentHash.getAccurateNode(instanceId); if (null == node) { consistentHash.putNode(new ConsistentHashNode(consistentHash.getPoint(instanceId),instanceId),150); } } } else { //沒有服務提供者 將消息返回隊列 channel.basicReject(deliveryTag, true); return; } // 提取消息中的某個代表來源主機的標識 然后在hash環上分配目標節點 String logUuid = parseObject.getLogUuid(); ConsistentHashNode node = consistentHash.getNode(logUuid); log.info("主機標識:{},分配節點:{}", logUuid, node.getTarget()); //向指定路由發送消息 // todo 問題 這里怎么保證隊列預先創建初始化好 解決方案 先從配置文件獲取隊列名稱 新增服務時 需要重啟服務 rabbitTemplate.convertAndSend("centerDeliverExchange", node.getTarget().toString(), msg); // planTwo(parseObject); channel.basicAck(deliveryTag, false); //第二個參數,手動確認可以被批處理,當該參數為 true 時,則可以一次性確認 delivery_tag 小於等於傳入值的所有消息 log.info(">>>>>>>>>>>>消費消息成功!"); } } catch (Exception e) { log.info(">>>>>>>>>>>>消費消息失敗!失敗消息ID:{}, 失敗原因:{}", deliveryTag, e.getMessage()); channel.basicReject(deliveryTag, true); } } private void planTwo(Log parseObject) { //先查詢出數據庫存在的數據(如果有) String uuid = parseObject.getLogUuid(); //結合數據庫索引 防止並發插入多條重復數據 Log byId = logService.getOne((new QueryWrapper<Log>()).eq("log_uuid", uuid)); if (byId == null) { logService.save(parseObject); } else { String logOrder = byId.getLogOrder(); String logOrder1 = parseObject.getLogOrder(); //編號大於數據庫編號才更新 if (Integer.valueOf(logOrder1) > Integer.valueOf(logOrder)) { log.info("============= 新序號:{},舊序號:{}; 執行更新操作", logOrder1, logOrder); BeanUtil.copyProperties(parseObject, byId, CopyOptions.create().ignoreNullValue()); logService.updateById(byId); } } } }
EurekaReceiver 這個類有些包我就不貼了 業務那一塊替換成自己的
