RabbitMQ封裝實戰


先說下背景:上周開始給項目添加曾經沒有過的消息中間件。雖然說,一路到頭非常容易,直接google,萬事不愁可是生活遠不僅是眼前的“苟且”。首先是想使用其他項目使用過的一套對mq封裝的框架,融合進來。雖然折騰了上周六周日兩天,總算吧老框架融進項目中了,可是周一來公司和大數據哥們兒一聯調發現,收不到數據!所以沒辦法,當場使用原生那一套擼了個版本出來可是,可是,可是,俗話說得好:生命在於折騰!在上周末融合老框架的時候,我把源碼讀了遍,發現了很多很好的封裝思想,Ok,這周末總算閑了下來,我就運用這個思想,封裝一個輕量級的唄,說干就干!

主要思想

說到封裝,我想,應該主要是要盡可能減小用戶使用的復雜度,盡量少的進行配置,書寫,甚至能盡量少的引入第三發或是原生類庫。所以在這種想法之下,這套框架的精髓主要在以下幾點:

  • 使用注解,減少用戶配置
  • 將不同的生產者消費者的初始化方式統一
  • 初次注冊生產者或者消費者的時候,進行隊列的自動注冊
  • 再統一的初始化方式中,使用動態代理的方式,代理到具體的生產者或是消費者的發送接收方法

在這種模式下,我們不用過多的配置,直接建立一個接口,接口上面使用注解聲明隊列的名稱,然后使用同一的Bean進行初始化,就齊活了!

統一初始化Bean的實現

不說啥,直接上代碼:


public class RabbitMQProducerFactoryBean<T> extends RabbitMQProducerInterceptor implements FactoryBean<T> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private Class<?> serviceInterface;

    @Autowired
    private ConnectionFactory rabbitConnectionFactory;

    @Value("${mq.queue.durable}")
    private String durable;

    @Value("${mq.queue.exclusive}")
    private String exclusive;

    @Value("${mq.queue.autoDelete}")
    private String autoDelete;

    @SuppressWarnings("unchecked")

    /**
    這個方法很特殊,繼承自FactoryBean,就是說管理權歸屬IoC容器。每次注冊一個隊列的時候,並且注入到具體的service中使用的時候,就會調用這個getObject方法。所以,對於使用本類初始化的bean,其類型並非本類,而是本類的屬性serviceInterface類型,因為最終getObject的結果是返回了一個動態代理,代理到了serviceInterface。
    **/
    @Override
    public T getObject() throws Exception {

        //初始化
        if (getQueueName() != null) {
            logger.info("指定的目標列隊名[{}],覆蓋接口定義。", getQueueName());
        } else {
            RPCQueueName name = serviceInterface.getAnnotation(RPCQueueName.class);
            if (name == null)
                throw new IllegalArgumentException("接口" + serviceInterface.getCanonicalName() + "沒有指定@RPCQueueName");
            setQueueName(name.value());
        }
        //創建隊列
        declareQueue();
        logger.info("建立MQ客戶端代理接口[{}],目標隊列[{}]。", serviceInterface.getCanonicalName(), getQueueName());

        return (T) Proxy.newProxyInstance(getClass().getClassLoader(), new Class<?>[]{serviceInterface}, this);//動態代理到目標接口
    }

    private void declareQueue() {
        Connection connection = rabbitConnectionFactory.createConnection();
        Channel channel = connection.createChannel(true);
        try {
            channel.queueDeclare(getQueueName(), Boolean.valueOf(durable), Boolean.valueOf(exclusive)
                    , Boolean.valueOf(autoDelete), null);
            logger.info("注冊隊列成功!");
        } catch (IOException e) {
            logger.warn("隊列注冊失敗", e);
        }
    }
......

}


public class RabbitMQProducerInterceptor implements InvocationHandler {



    private Logger logger = LoggerFactory.getLogger(getClass());


    private String queueName;

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Object sendObj;
        Class<?>[] parameterTypes = method.getParameterTypes();
        String methodName = method.getName();
        boolean isSendOneJson = Objects.nonNull(args) && args.length == 1 && (args[0] instanceof String);
        if (isSendOneJson) {
            sendObj = args[0];
            logger.info("發送單一json字符串消息:{}", (String) sendObj);
        } else {
            sendObj = new RemoteInvocation(methodName, parameterTypes, args);
            logger.info("發送封裝消息體:{}", JSONSerializeUtil.jsonSerializerNoType(sendObj));
        }


        logger.info("發送異步消息到[{}],方法名為[{}]", queueName, method.getName());
        //異步方式使用,同時要告知服務端不要發送響應
        amqpTemplate.convertAndSend(queueName, sendObj);
        return null;

    }

    ......
}

下面是核心的配置文件


<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<beans default-lazy-init="false"
    xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:p="http://www.springframework.org/schema/p" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd
        http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.1.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">

    <rabbit:connection-factory id="rabbitConnectionFactory"
        host="${mq.host}" port="${mq.port}" virtual-host="${mq.vhost}"
        username="${mq.username}" password="${mq.password}" />

    <!-- 供自動創建隊列 -->
    <rabbit:admin connection-factory="rabbitConnectionFactory" />

    <rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory"/>

    <!-- 創建生產者 -->
    <bean id="sendMsg" class="com.example.demo.RabbitMQProducerFactoryBean">
        <property name="serviceInterface" value="com.example.demo.ISendMsg" />
    </bean>


</beans>

說明:每次要使用mq,直接導入這個基本配置,和基礎jar包即可。對於配置文件中的生產者聲明,已經直接簡化到三行,這一部分可以單獨創建一個類似於producer-config.xml專門的配置文件。

附屬類

這里主要就是涉及一個注解類:

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RPCQueueName {

    String value();
}

說明:主要用於隊列名稱的聲明。可以拓展的再建立其他的注解類,並在RabbitMQProducerFactoryBean中進行具體的邏輯實現。對於未來功能添加,起到了非常好的解耦效果。

具體的接口:

@RPCQueueName("test.demo.ISendMsg")
public interface ISendMsg {

    void sendMsg(String msg);
}

說明:這樣,就聲明了個隊列名叫test.demo.ISendMsg的生產者,每次講IsendMsg注入到要發送消息的Service里面,直接調用sendMsg即可向注解聲明的隊列發送消息了。

恩,開源

寫了個springboot的小demo:
github地址

接下來我會更新消費者的封裝,今天先放一放,出去動動。。哈哈


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM