先說下背景:上周開始給項目添加曾經沒有過的消息中間件。雖然說,一路到頭非常容易,直接google,萬事不愁可是生活遠不僅是眼前的“苟且”。首先是想使用其他項目使用過的一套對mq封裝的框架,融合進來。雖然折騰了上周六周日兩天,總算吧老框架融進項目中了,可是周一來公司和大數據哥們兒一聯調發現,收不到數據!所以沒辦法,當場使用原生那一套擼了個版本出來可是,可是,可是,俗話說得好:生命在於折騰!在上周末融合老框架的時候,我把源碼讀了遍,發現了很多很好的封裝思想,Ok,這周末總算閑了下來,我就運用這個思想,封裝一個輕量級的唄,說干就干!
主要思想
說到封裝,我想,應該主要是要盡可能減小用戶使用的復雜度,盡量少的進行配置,書寫,甚至能盡量少的引入第三發或是原生類庫。所以在這種想法之下,這套框架的精髓主要在以下幾點:
- 使用注解,減少用戶配置
- 將不同的生產者消費者的初始化方式統一
- 初次注冊生產者或者消費者的時候,進行隊列的自動注冊
- 再統一的初始化方式中,使用動態代理的方式,代理到具體的生產者或是消費者的發送接收方法
在這種模式下,我們不用過多的配置,直接建立一個接口,接口上面使用注解聲明隊列的名稱,然后使用同一的Bean進行初始化,就齊活了!
統一初始化Bean的實現
不說啥,直接上代碼:
public class RabbitMQProducerFactoryBean<T> extends RabbitMQProducerInterceptor implements FactoryBean<T> {
private Logger logger = LoggerFactory.getLogger(getClass());
private Class<?> serviceInterface;
@Autowired
private ConnectionFactory rabbitConnectionFactory;
@Value("${mq.queue.durable}")
private String durable;
@Value("${mq.queue.exclusive}")
private String exclusive;
@Value("${mq.queue.autoDelete}")
private String autoDelete;
@SuppressWarnings("unchecked")
/**
這個方法很特殊,繼承自FactoryBean,就是說管理權歸屬IoC容器。每次注冊一個隊列的時候,並且注入到具體的service中使用的時候,就會調用這個getObject方法。所以,對於使用本類初始化的bean,其類型並非本類,而是本類的屬性serviceInterface類型,因為最終getObject的結果是返回了一個動態代理,代理到了serviceInterface。
**/
@Override
public T getObject() throws Exception {
//初始化
if (getQueueName() != null) {
logger.info("指定的目標列隊名[{}],覆蓋接口定義。", getQueueName());
} else {
RPCQueueName name = serviceInterface.getAnnotation(RPCQueueName.class);
if (name == null)
throw new IllegalArgumentException("接口" + serviceInterface.getCanonicalName() + "沒有指定@RPCQueueName");
setQueueName(name.value());
}
//創建隊列
declareQueue();
logger.info("建立MQ客戶端代理接口[{}],目標隊列[{}]。", serviceInterface.getCanonicalName(), getQueueName());
return (T) Proxy.newProxyInstance(getClass().getClassLoader(), new Class<?>[]{serviceInterface}, this);//動態代理到目標接口
}
private void declareQueue() {
Connection connection = rabbitConnectionFactory.createConnection();
Channel channel = connection.createChannel(true);
try {
channel.queueDeclare(getQueueName(), Boolean.valueOf(durable), Boolean.valueOf(exclusive)
, Boolean.valueOf(autoDelete), null);
logger.info("注冊隊列成功!");
} catch (IOException e) {
logger.warn("隊列注冊失敗", e);
}
}
......
}
public class RabbitMQProducerInterceptor implements InvocationHandler {
private Logger logger = LoggerFactory.getLogger(getClass());
private String queueName;
@Autowired
private AmqpTemplate amqpTemplate;
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Object sendObj;
Class<?>[] parameterTypes = method.getParameterTypes();
String methodName = method.getName();
boolean isSendOneJson = Objects.nonNull(args) && args.length == 1 && (args[0] instanceof String);
if (isSendOneJson) {
sendObj = args[0];
logger.info("發送單一json字符串消息:{}", (String) sendObj);
} else {
sendObj = new RemoteInvocation(methodName, parameterTypes, args);
logger.info("發送封裝消息體:{}", JSONSerializeUtil.jsonSerializerNoType(sendObj));
}
logger.info("發送異步消息到[{}],方法名為[{}]", queueName, method.getName());
//異步方式使用,同時要告知服務端不要發送響應
amqpTemplate.convertAndSend(queueName, sendObj);
return null;
}
......
}
下面是核心的配置文件
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<beans default-lazy-init="false"
xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:p="http://www.springframework.org/schema/p" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.1.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">
<rabbit:connection-factory id="rabbitConnectionFactory"
host="${mq.host}" port="${mq.port}" virtual-host="${mq.vhost}"
username="${mq.username}" password="${mq.password}" />
<!-- 供自動創建隊列 -->
<rabbit:admin connection-factory="rabbitConnectionFactory" />
<rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory"/>
<!-- 創建生產者 -->
<bean id="sendMsg" class="com.example.demo.RabbitMQProducerFactoryBean">
<property name="serviceInterface" value="com.example.demo.ISendMsg" />
</bean>
</beans>
說明:每次要使用mq,直接導入這個基本配置,和基礎jar包即可。對於配置文件中的生產者聲明,已經直接簡化到三行,這一部分可以單獨創建一個類似於producer-config.xml專門的配置文件。
附屬類
這里主要就是涉及一個注解類:
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RPCQueueName {
String value();
}
說明:主要用於隊列名稱的聲明。可以拓展的再建立其他的注解類,並在RabbitMQProducerFactoryBean中進行具體的邏輯實現。對於未來功能添加,起到了非常好的解耦效果。
具體的接口:
@RPCQueueName("test.demo.ISendMsg")
public interface ISendMsg {
void sendMsg(String msg);
}
說明:這樣,就聲明了個隊列名叫test.demo.ISendMsg的生產者,每次講IsendMsg注入到要發送消息的Service里面,直接調用sendMsg即可向注解聲明的隊列發送消息了。
恩,開源
寫了個springboot的小demo:
github地址
接下來我會更新消費者的封裝,今天先放一放,出去動動。。哈哈