學習ActiveMQ(六):JMS消息的確認與重發機制


  當我們發送消息的時候,會出現發送失敗的情況,此時我們需要用到activemq為我們提供了消息重發機制,進行消息的重新發送。那么我們怎么知道消息有沒有發送失敗呢?activemq還有消息確認機制,消費者在接收到消息的時候可以進行確認。本節將確認機制和重發機制一起在原有的代碼中學習。

消息確認機制有四種:定義於在session對象中

AUTO_ACKNOWLEDGE= 1 :自動確認

CLIENT_ACKNOWLEDGE= 2:客戶端手動確認 

UPS_OK_ACKNOWLEDGE= 3: 自動批量確認

SESSION_TRANSACTED= 0:事務提交並確認

但是在activemq補充了一個自定義的ACK模式:

INDIVIDUAL_ACKNOWLEDGE= 4:單條消息確認

首先在配置文件中定義重發機制ReDelivery:設置重發兩次

 <!-- 定義ReDelivery(重發機制)機制 ,重發時間間隔是100毫秒,最大重發次數是3次 -->
    <bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
        <!--是否在每次嘗試重新發送失敗后,增長這個等待時間 -->
        <property name="useExponentialBackOff" value="true"/>
        <!--重發次數,默認為6次-->
        <property name="maximumRedeliveries" value="2"/>
        <!--重發時間間隔,默認為1秒 -->
        <property name="initialRedeliveryDelay" value="1000"/>
        <!--第一次失敗后重新發送之前等待500毫秒,第二次失敗再等待500 * 2毫秒,這里的2就是value -->
        <property name="backOffMultiplier" value="2"/>
        <!--最大傳送延遲,只在useExponentialBackOff為true時有效(V5.5),假設首次重連間隔為10ms,倍數為2,那么第二次重連時間間隔為 20ms,
        第三次重連時間間隔為40ms,當重連時間間隔大的最大重連時間間隔時,以后每次重連時間間隔都為最大重連時間間隔。 -->
        <property name="maximumRedeliveryDelay" value="1000"/>
    </bean>

在工廠中引用重發機制:

 <!--PooledConnectionFactory對session和消息producer的緩存機制而帶來的性能提升-->
    <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <property name="connectionFactory">
            <!--連接mq的連接工廠-->
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL">
                    <value>tcp://127.0.0.1:61616</value>
                </property>
                <!-- 引用重發機制 -->
                <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" />
            </bean>
        </property>
        <property name="maxConnections" value="100"></property>
    </bean>

在監聽容器中新增配置消息的確認機制:

 <!--配置 消息監聽容器-->
    <bean id="jmsContainer" class=" org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="queueDestination"/>
        <property name="messageListener" ref="consumerMessageListener"/>
        <!--應答模式是 INDIVIDUAL_ACKNOWLEDGE-->
        <!--AUTO_ACKNOWLEDGE = 1    自動確認
        CLIENT_ACKNOWLEDGE = 2    客戶端手動確認
        DUPS_OK_ACKNOWLEDGE = 3    自動批量確認
        SESSION_TRANSACTED = 0    事務提交並確認
        INDIVIDUAL_ACKNOWLEDGE = 4    單條消息確認-->
        <property name="sessionAcknowledgeMode" value="4"/>
    </bean>

發送者無需改動,正常發送即可,我們在消費者中進行改動,在消息接收到的時候調用確認方法(上面配置了4單挑消息確認),然后再接收消息做點手腳讓其報錯不能確認使其觸發重發機制:

public class ConsumerMessageListener implements SessionAwareMessageListener<Message> {//消息確認需要session,需要實現SessionAwareMessageListener

    @Override
    public void onMessage(Message message, Session session)  throws JMSException {
        if (message instanceof TextMessage){
            String msg = ((TextMessage) message).getText();

            System.out.println("------------------------------------------");
            System.out.println("消費者收到的消息:" + msg);
            System.out.println("------------------------------------------");

            try {
                if ("test2".equals(msg)) {
                    throw new RuntimeException("故意拋出的異常");
                }
                // 確認消息。只要被確認后  就會出隊,接受失敗沒有確認成功,會在原隊列里面
                message.acknowledge();
            } catch (Exception e) {
                // 此不可省略 重發信息使用,如果不寫此方法,將不會實現重發操作。失敗的消息將會一直在隊列中,因為沒有進行消息確認。
                // 下次還會監聽到這條消息。效果將會是:第一次接受一個消息2。第二次接受2個,依次累加
                session.recover();
            }
        }
}

發送4條消息:

  //發送字符串
        for (int i = 0; i < 4; i++) {

            service.sendMessage("test" + i);
        }

結果

由結果可見,test2由於拋出異常,未能進行消息確認,所有重發了兩次,共三次。

 代碼地址:https://github.com/MrLiu1227/ActiveMQ


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM