Spring AMQP + Rabbit 配置多數據源消息隊列


一般在稍微大一點的項目中,需要配置多個數據庫數據源,最簡單的方式是用 Spring 來實現,只需要繼承 AbstractRoutingDataSource 類,實現 determineCurrentLookupKey 方法,再配合使用 ThreadLocal 就可以實現。

但是如何實現 MQ 的多數據源呢?假設有部署在不同服務器上的兩個消息隊列,或者是同一服務器,不同 vhost 的消息隊列,在一個項目中,我如何自由地選擇從哪個隊列收發消息呢?下面說說用 Spring AMQP + Rabbit 的實現過程及踩過的坑。

最開始的單數據源的實現很簡單,網上有好多博文可以參考,官網也有介紹。主要就是創建一個 xml 的配置文件,添加各種必要的配置,聲明 connection-factory、rabbitListenerContainerFactory、rabbitTemplate、queue、exchange、binding 等等。然后用 RabbitTemplate 來發消息,用 @RabbitListener 注解來監聽,用 queue 指定隊列來收消息,這里就不贅述了。主要說一下,在現有的基礎上實現多數據源的收發。

先說配置方面,為了對比,下面先給出單數據源配置:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    
    <rabbit:connection-factory id="rabbitConnectionFactory" host="${rabbit.host}" port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}" 
       requested-heartbeat="30" virtual-host="${rabbit.vhost}" channel-cache-size="50"/>

    <bean id="rabbitListenerContainerFactory"
          class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
        <property name="connectionFactory" ref="rabbitConnectionFactory"/>
        <property name="concurrentConsumers" value="16"/>
        <property name="maxConcurrentConsumers" value="50"/>
    </bean>
    
    <rabbit:admin id="rabbitAdmin" connection-factory="rabbitConnectionFactory"/>
    
    <!-- queue declare -->
    <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="queue.test"/>
    
    <!-- bind queue to exchange -->
    <rabbit:direct-exchange name="exchange" auto-delete="false" durable="true">
        <rabbit:bindings>
            <rabbit:binding queue="queue.test" key="rkey.test"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <rabbit:template id="rabbitTemplate" connection-factory="rabbitConnectionFactory" retry-template="retryTemplate" reply-timeout="60000"/>

    <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
        <property name="backOffPolicy">
            <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                <property name="initialInterval" value="500"/>
                <property name="multiplier" value="10.0"/>
                <property name="maxInterval" value="10000"/>
            </bean>
        </property>
    </bean>
</beans>

為了實現雙數據源,查閱了很多資料,最初實現的配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
        
    
    <rabbit:connection-factory id="rabbitConnectionFactory" host="${rabbit.host}" port="${rabbit.port}" username="${rabbit.username}"
            password="${rabbit.password}" requested-heartbeat="30" virtual-host="${rabbit.vhost}" channel-cache-size="50"/>
    
    <!-- 添加了一個連接工廠,參數從 properties 文件中取 -->
    <rabbit:connection-factory id="rabbitConnectionFactory1" host="${rabbit.host1}" port="${rabbit.port1}" username="${rabbit.username1}" 
       password="${rabbit.password1}" requested-heartbeat="30" virtual-host="${rabbit.vhost1}" channel-cache-size="50"/>

    
    <!-- 添加 SimpleRoutingConnectionFactory 配置,將兩個 Connection factory 配置好-->
    <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
        <property name="targetConnectionFactories">
            <map>
                <entry key="rabbitConnectionFactory" value-ref="rabbitConnectionFactory"/>
                <entry key="rabbitConnectionFactory1" value-ref="rabbitConnectionFactory1"/>
            </map>
        </property>
    </bean>
    
    <rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory"/>

    
    <!-- 由於增加了一個連接工廠,ContainerFactory 的連接工廠改為新增的 ConnectionFactory  -->
    <bean id="rabbitListenerContainerFactory"
          class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
        <!-- <property name="connectionFactory" ref="rabbitConnectionFactory"/> -->
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="concurrentConsumers" value="16"/>
        <property name="maxConcurrentConsumers" value="50"/>
    </bean>

    
    <!-- queue declare,增加一個消息隊列 -->
    <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="queue.test"/>
    <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="queue.test1"/>
    
    <!-- bind queue to exchange -->
    <rabbit:direct-exchange name="exchange" auto-delete="false" durable="true">
        <rabbit:bindings>
            <rabbit:binding queue="queue.test" key="rkey.test"></rabbit:binding>
            <rabbit:binding queue="queue.test1" key="rkey.test1"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    
    <!-- connection-factory 改為新增的 ConnectionFactory -->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" retry-template="retryTemplate" reply-timeout="60000"/>

    <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
        <property name="backOffPolicy">
            <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                <property name="initialInterval" value="500"/>
                <property name="multiplier" value="10.0"/>
                <property name="maxInterval" value="10000"/>
            </bean>
        </property>
    </bean>
</beans>

改動都寫在注釋里了,主要就是增加了一個連接工廠的配置,其他配置做了一些相應的適配。

發消息的時候,需要指定連接工廠,也就是說,你要往哪個消息服務器發:

    @Test
    public void testSendMsg() {
        SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), "rabbitConnectionFactory");
        rabbitTemplate.convertAndSend("exchange", "rkey.test", "test");
        SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());

        SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), "rabbitConnectionFactory1");
        rabbitTemplate.convertAndSend("exchange", "rkey.test1", "test1");
        SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
    }

在發消息之前調用 SimpleResourceHolder.bind 綁定要使用的工廠,發完之后,調用 unbind 解除綁定。將上述代碼封裝為兩個工具類,更好。

然后,有一個大坑在前面。。。如何收消息?

發消息要綁定連接工廠,指明往哪個消息服務器上發,收的時候,同樣得指定要從哪個消息服務器上收。最開始沒想到這點,以為只要指定隊列名稱就可以,如下:

    @RabbitListener(queues = "queue.test")
    public void receiveMsg(Message message) {
        String msg = new String(message.getBody());
        System.out.println(msg);
    }

然並卵,報了異常:

java.lang.IllegalStateException: Cannot determine target ConnectionFactory for lookup key [null]
    at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.determineTargetConnectionFactory(AbstractRoutingConnectionFactory.java:116) ~[spring-rabbit-1.5.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.createConnection(AbstractRoutingConnectionFactory.java:94) ~[spring-rabbit-1.5.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createConnection(ConnectionFactoryUtils.java:80) ~[spring-rabbit-1.5.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:130) ~[spring-rabbit-1.5.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:67) ~[spring-rabbit-1.5.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:456) ~[spring-rabbit-1.5.1.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1158) ~[spring-rabbit-1.5.1.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]

這個問題不好解決,查了很多資料都沒用,比如這種方式:https://stackoverflow.com/questions/42784471/spring-amqp-mix-simpleroutingconnectionfactory-with-rabbitlistener  。

無奈之下,只能試着看看 Spring 的 AMQP 怎么實現,看看有沒有解決的辦法,最開始想的是繼承 Spring 的某個類來實現。然而,看來看去,很是頭大,沒有結果。

最后無意間點到了 @RabbitListener 這個注解中,發現了有一個屬性,瞬間感覺很興奮,如下圖:

看了下注釋,這里可以指定一個 containerFactory,感覺可以試試。首先只有一個 containerFactory,那就加一個吧。為了看的比較清晰,我把第一次添加的注釋去掉了,於是配置成了這樣:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
        
    
    <rabbit:connection-factory id="rabbitConnectionFactory" host="${rabbit.host}" port="${rabbit.port}" username="${rabbit.username}"
            password="${rabbit.password}" requested-heartbeat="30" virtual-host="${rabbit.vhost}" channel-cache-size="50"/>
    
    <rabbit:connection-factory id="rabbitConnectionFactory1" host="${rabbit.host1}" port="${rabbit.port1}" username="${rabbit.username1}" 
       password="${rabbit.password1}" requested-heartbeat="30" virtual-host="${rabbit.vhost1}" channel-cache-size="50"/>

    <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
        <property name="targetConnectionFactories">
            <map>
                <entry key="rabbitConnectionFactory" value-ref="rabbitConnectionFactory"/>
                <entry key="rabbitConnectionFactory1" value-ref="rabbitConnectionFactory1"/>
            </map>
        </property>
    </bean>
    
    <rabbit:admin id="rabbitAdmin" connection-factory="rabbitConnectionFactory"/>
    
    <!-- 添加一個 rabbitAdmin-->
    <rabbit:admin id="rabbitAdmin1" connection-factory="rabbitConnectionFactory1"/>

    
    <!-- 把原有的 ContainerFactory 的連接工廠改為 rabbitConnectionFactory-->
    <bean id="rabbitListenerContainerFactory"
          class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
        <property name="connectionFactory" ref="rabbitConnectionFactory"/>
        <property name="concurrentConsumers" value="16"/>
        <property name="maxConcurrentConsumers" value="50"/>
    </bean>
    
    <!-- 添加一個 ContainerFactory, 連接工廠為 rabbitConnectionFactory1-->
    <bean id="rabbitListenerContainerFactory1"
          class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
        <property name="connectionFactory" ref="rabbitConnectionFactory1"/>
        <property name="concurrentConsumers" value="16"/>
        <property name="maxConcurrentConsumers" value="50"/>
    </bean>

    <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="queue.test"/>
    <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="queue.test1"/>
    
    <rabbit:direct-exchange name="exchange" auto-delete="false" durable="true">
        <rabbit:bindings>
            <rabbit:binding queue="queue.test" key="rkey.test"></rabbit:binding>
            <rabbit:binding queue="queue.test1" key="rkey.test1"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" retry-template="retryTemplate" reply-timeout="60000"/>

    <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
        <property name="backOffPolicy">
            <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                <property name="initialInterval" value="500"/>
                <property name="multiplier" value="10.0"/>
                <property name="maxInterval" value="10000"/>
            </bean>
        </property>
    </bean>
</beans>

收消息的時候指定 container factory 即可:

    @RabbitListener(queues = "queue.test", containerFactory = "rabbitListenerContainerFactory")
    public void receiveMsg(Message message) {
        String msg = new String(message.getBody());
        System.out.println(msg);
    }

測試通過!

以上配置、解決辦法是嘗試過多次以后得出的,所以還是要有耐心,多嘗試。

由於在網上沒有找到解決辦法,只有自己摸索着解決,如果大家有其他解決方案,歡迎留言討論!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM