當我們發送消息的時候,會出現發送失敗的情況,此時我們需要用到activemq為我們提供了消息重發機制,進行消息的重新發送。那么我們怎么知道消息有沒有發送失敗呢?activemq還有消息確認機制,消費者在接收到消息的時候可以進行確認。本節將確認機制和重發機制一起在原有的代碼中學習。
消息確認機制有四種:定義於在session對象中
AUTO_ACKNOWLEDGE= 1 :自動確認
CLIENT_ACKNOWLEDGE= 2:客戶端手動確認
UPS_OK_ACKNOWLEDGE= 3: 自動批量確認
SESSION_TRANSACTED= 0:事務提交並確認
但是在activemq補充了一個自定義的ACK模式:
INDIVIDUAL_ACKNOWLEDGE= 4:單條消息確認
首先在配置文件中定義重發機制ReDelivery:設置重發兩次
<!-- 定義ReDelivery(重發機制)機制 ,重發時間間隔是100毫秒,最大重發次數是3次 --> <bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"> <!--是否在每次嘗試重新發送失敗后,增長這個等待時間 --> <property name="useExponentialBackOff" value="true"/> <!--重發次數,默認為6次--> <property name="maximumRedeliveries" value="2"/> <!--重發時間間隔,默認為1秒 --> <property name="initialRedeliveryDelay" value="1000"/> <!--第一次失敗后重新發送之前等待500毫秒,第二次失敗再等待500 * 2毫秒,這里的2就是value --> <property name="backOffMultiplier" value="2"/> <!--最大傳送延遲,只在useExponentialBackOff為true時有效(V5.5),假設首次重連間隔為10ms,倍數為2,那么第二次重連時間間隔為 20ms, 第三次重連時間間隔為40ms,當重連時間間隔大的最大重連時間間隔時,以后每次重連時間間隔都為最大重連時間間隔。 --> <property name="maximumRedeliveryDelay" value="1000"/> </bean>
在工廠中引用重發機制:
<!--PooledConnectionFactory對session和消息producer的緩存機制而帶來的性能提升--> <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <!--連接mq的連接工廠--> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL"> <value>tcp://127.0.0.1:61616</value> </property> <!-- 引用重發機制 --> <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" /> </bean> </property> <property name="maxConnections" value="100"></property> </bean>
在監聽容器中新增配置消息的確認機制:
<!--配置 消息監聽容器--> <bean id="jmsContainer" class=" org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="queueDestination"/> <property name="messageListener" ref="consumerMessageListener"/> <!--應答模式是 INDIVIDUAL_ACKNOWLEDGE--> <!--AUTO_ACKNOWLEDGE = 1 自動確認 CLIENT_ACKNOWLEDGE = 2 客戶端手動確認 DUPS_OK_ACKNOWLEDGE = 3 自動批量確認 SESSION_TRANSACTED = 0 事務提交並確認 INDIVIDUAL_ACKNOWLEDGE = 4 單條消息確認--> <property name="sessionAcknowledgeMode" value="4"/> </bean>
發送者無需改動,正常發送即可,我們在消費者中進行改動,在消息接收到的時候調用確認方法(上面配置了4單挑消息確認),然后再接收消息做點手腳讓其報錯不能確認使其觸發重發機制:
public class ConsumerMessageListener implements SessionAwareMessageListener<Message> {//消息確認需要session,需要實現SessionAwareMessageListener @Override public void onMessage(Message message, Session session) throws JMSException { if (message instanceof TextMessage){ String msg = ((TextMessage) message).getText(); System.out.println("------------------------------------------"); System.out.println("消費者收到的消息:" + msg); System.out.println("------------------------------------------"); try { if ("test2".equals(msg)) { throw new RuntimeException("故意拋出的異常"); } // 確認消息。只要被確認后 就會出隊,接受失敗沒有確認成功,會在原隊列里面 message.acknowledge(); } catch (Exception e) { // 此不可省略 重發信息使用,如果不寫此方法,將不會實現重發操作。失敗的消息將會一直在隊列中,因為沒有進行消息確認。 // 下次還會監聽到這條消息。效果將會是:第一次接受一個消息2。第二次接受2個,依次累加 session.recover(); } } }
發送4條消息:
//發送字符串 for (int i = 0; i < 4; i++) { service.sendMessage("test" + i); }
結果
由結果可見,test2由於拋出異常,未能進行消息確認,所有重發了兩次,共三次。