1、配置rabbitmq
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd" > <description>rabbitmq 連接服務配置</description> <!-- 連接配置 publisherConfirms 發布的消息是否確定--> <bean id="rabbitMqConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> <property name="addresses" value="${rabbitmq.addresses}"/> <property name="username" value="${rabbitmq.username}"/> <property name="password" value="${rabbitmq.password}"/> <property name="virtualHost" value="${rabbitmq.vhost}"/> <property name="channelCacheSize" value="${rabbitmq.channelCacheSize}"/> </bean> <!-- spring template聲明 --> <rabbit:template id="amqpTemplate" connection-factory="rabbitMqConnectionFactory" /> </beans>
2、工具類實現
/** * 監控rabbitmq的工具類 */ @Slf4j public class RabbitMqUtil{ private static RabbitTemplate amqpTemplate; /** * 初始化amqpTemplate */ static { amqpTemplate = SpringContextUtil.getBean(RabbitTemplate.class); Assert.notNull(amqpTemplate,"獲取不到amqpTemplate"); } /** * 查詢隊列中的消息數量 * @param exchange 交換機名字 * @param exchangeType 交換機類型 fanout 或 direct * @param quene 隊列名字 * @return */ public static BaseResultDTO getMessageCount(String exchange, String exchangeType, String quene) { Assert.hasText(exchange,"exchange不能為空"); Assert.hasText(quene,"隊列名不能為空"); Assert.hasText(exchangeType,"exchangeType不能為空"); checkInitSuccess(); BaseResultDTO baseResultDTO = new BaseResultDTO(); ConnectionFactory connectionFactory = amqpTemplate.getConnectionFactory(); // 創建連接 Connection connection = connectionFactory.createConnection(); // 創建通道 Channel channel = connection.createChannel(false); // 設置消息交換機 try { channel.exchangeDeclare(exchange, exchangeType, true, false, null); AMQP.Queue.DeclareOk declareOk = channel.queueDeclarePassive(quene); //獲取隊列中的消息個數 Integer queueCount = declareOk.getMessageCount(); // 關閉通道和連接 channel.close(); connection.close(); baseResultDTO.setSuccess(true); baseResultDTO.setMessage(queueCount.toString()); } catch (IOException e) { LoggerFormatUtil.warn(e,log,"連接rabbitmq異常"); } return baseResultDTO; } /** * 校驗amqpTemplate是否已經實例化 * @return */ private static void checkInitSuccess(){ if (null == amqpTemplate){ amqpTemplate = SpringContextUtil.getBean(RabbitTemplate.class); } Assert.notNull(amqpTemplate, "amqpTemplate 實例化異常"); } }
3、spring輔助工具類獲取bean
@Component public class SpringContextUtil implements ApplicationContextAware { public static ApplicationContext applicationContext = null; public SpringContextUtil() { } @SuppressWarnings("unchecked") public static <T> T getBean(String beanName, Class<T> beanType) { Assert.isTrue(applicationContext != null, "應用上下文不能為空"); Object bean = applicationContext.getBean(beanName); return bean == null ? null : (T)bean; } /** * 按類型獲取bean * @param beanType * @param <T> * @return */ @SuppressWarnings("unchecked") public static <T> T getBean(Class<T> beanType){ Assert.isTrue(applicationContext != null, "應用上下文不能為空"); T bean = applicationContext.getBean(beanType); return bean == null ? null : bean; } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { SpringContextUtil.applicationContext = applicationContext; } }