Spring配置activemq異步消息監聽器


<!-- 創建工廠連接 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL"
            value="failover:(tcp://127.0.0.1:61616)?initialReconnectDelay=100" />
        <property name="useAsyncSend" value="false" />
        <property name="dispatchAsync" value="true" />
    </bean>
    <bean id="pooledJmsConnectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="targetConnectionFactory" />
    </bean>
    
    <bean id="queueDestinationIng" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>order.ing</value>
        </constructor-arg>
    </bean>
    
    <!-- 消息監聽器 -->
    <bean id="ingMessageListener" class="com.bypay.listener.IngListener" />

    <!-- 消息監聽容器 -->
    <bean id="jmsContainerIng"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="pooledJmsConnectionFactory" />
        <property name="destination" ref="queueDestinationIng" />
        <property name="messageListener" ref="ingMessageListener" />
        <property name="sessionTransacted" value="false"></property>
        <property name="concurrency" value="10-15"></property>
    </bean>

在上述配置中DefaultMessageListenerContainer主要的屬性列表如下:

    messageListener: 消息偵聽器,必選屬性。

 

    taskExecutor: 任務調度器,可以使用線程池開並發的消費消息。如果開發者不指定,spring將會采用默認的TaskExecutor(SimpleAsyncTaskExecutor,類似於CachedThreadPool)。

 

    concurrentConsumers: 消費者的最大個數,因為在spring中messageListener實例是單例的,比如上文中的orderListener(備注:它實現了MessageListener接口),所以spring-jms不能自作主張的創建多個messageListener實例來並發消費。所以spring在內部,創建了多個MessageConsumer實例,並使用consumer.receive()方法以阻塞的方式來獲取消息,當獲取消息后,在執行messageListener.onMessage()方法;concurrentConsumers屬性就是為了指定spring內部可以創建MessageConsumer的最大個數;當messageConsumer實例被創建后,將會封裝在一個Runner接口並交給taskExecutor來調度;如果consumer在一直沒有收到消息,則會被置為“idle”並從consumer列表中移除;如果所有的consumer都處於active狀態,則會創建新的consumer實例直到達到maxConcurrentConsumers個數上限。通常taskExecutor的線程池容量稍大於concurrentConsumer。

 

    maxMessagesPerTask: 每個consumer所消費的消息個數,因為每個consumer都會獨占一個Thread[consumer.receive()是阻塞的],當consumer消費maxMessagesPerTask個消息后,它就會退出線程,由taskExecutor重新調度。

 

    receiveTimeout: 內部的consumer在receive方法中阻塞的時間。默認為1秒。

 

    recoveryInterval:  當消息消費時,底層connection異常而無法繼續,listener需要等待恢復的時間間隔。默認為5000ms。

 

    concurrency: “concurrentConsumers”與“maxConcurrentConsumers”兩個參數的簡寫方式,格式為“5-10”,則表示concurrentConsumers為5,maxConcurrentConsumers為10。

 

    sessionTransacted: Session是否為事務類型。默認為false。

 

    messageSelector: 消息選擇器。如果你希望此listener只接受某種特性的消息,可以通過指定selector的方式來過濾消息。

 

    pubSubDomain: 此消費通道是否為Topic,默認為“false”。所有與Topic有關的屬性,只有在pubSubDomain為true的情況下生效。

 

    pubSubNoLocal: 對於Topic而言,此消費者是否消費本地消息。所謂本地消息,就是當Consumer與Producer公用底層一個Connection時,那么Producer發送的消息,相對於此Consumer就是本地消息。在pubSubDomain為true時有效。

 

    subscriptionDurable: 是否為“耐久性”訂閱者。在pubSubDomain為true時有效。默認為false。

 

    durableSubscriptionName: 耐久訂閱者名稱,每個clientId下可以有多個耐久訂閱者,但是他們必須有不同的名字。默認為className。

 

    errorHandler: 當listener.onMessage方法拋出異常時,異常該如何處理。

 

    autoStartup: 消費者是否自動啟動,默認為true,那么在messageContainer實例化后,將會啟動consumer(即調用Connection.start());如果為false,那些開發者需要在合適的時機手動啟動。

    clientId: 對於Topic訂閱者而言,此參數必備。

 

    sessionAcknowledgeMode: ACK MODE,默認為AUTO。spring-jms做了一件非常遺憾的事情,如果指定了sessionTransacted為true,那么在調用listener.onMessage()方法之后,則會立即提交事務(session.commit()),即使開發者使用了sessionAwareMessageListener,所以開發者無法實現基於事務的“批量”確認機制。如果開發者指定為CLIENT_ACK,那么spring-JMS將會在onMessage方法返回后立即調用message.acknowlege()方法,所以開發者自己是否確認以及何時確認,將沒有意義,如果不希望spring來確認消息,只能在onMessage方法中通過拋出異常的方式。 其中“1”表示AUTO_ACKNOWLEDGE,“2”為CLIENT_ACKNOWLEDGE = 2,“3”為 DUPS_OK_ACKNOWLEDGE = 3。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM