當我們發送消息的時候,會出現發送失敗的情況,此時我們需要用到activemq為我們提供了消息重發機制,進行消息的重新發送。那么我們怎么知道消息有沒有發送失敗呢?activemq還有消息確認機制,消費者在接收到消息的時候可以進行確認。本節將確認機制和重發機制一起在原有的代碼中學習。
消息確認機制有四種:定義於在session對象中
AUTO_ACKNOWLEDGE= 1 :自動確認
CLIENT_ACKNOWLEDGE= 2:客戶端手動確認
UPS_OK_ACKNOWLEDGE= 3: 自動批量確認
SESSION_TRANSACTED= 0:事務提交並確認
但是在activemq補充了一個自定義的ACK模式:
INDIVIDUAL_ACKNOWLEDGE= 4:單條消息確認
首先在配置文件中定義重發機制ReDelivery:設置重發兩次
<!-- 定義ReDelivery(重發機制)機制 ,重發時間間隔是100毫秒,最大重發次數是3次 -->
<bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
<!--是否在每次嘗試重新發送失敗后,增長這個等待時間 -->
<property name="useExponentialBackOff" value="true"/>
<!--重發次數,默認為6次-->
<property name="maximumRedeliveries" value="2"/>
<!--重發時間間隔,默認為1秒 -->
<property name="initialRedeliveryDelay" value="1000"/>
<!--第一次失敗后重新發送之前等待500毫秒,第二次失敗再等待500 * 2毫秒,這里的2就是value -->
<property name="backOffMultiplier" value="2"/>
<!--最大傳送延遲,只在useExponentialBackOff為true時有效(V5.5),假設首次重連間隔為10ms,倍數為2,那么第二次重連時間間隔為 20ms,
第三次重連時間間隔為40ms,當重連時間間隔大的最大重連時間間隔時,以后每次重連時間間隔都為最大重連時間間隔。 -->
<property name="maximumRedeliveryDelay" value="1000"/>
</bean>
在工廠中引用重發機制:
<!--PooledConnectionFactory對session和消息producer的緩存機制而帶來的性能提升-->
<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory">
<!--連接mq的連接工廠-->
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://127.0.0.1:61616</value>
</property>
<!-- 引用重發機制 -->
<property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" />
</bean>
</property>
<property name="maxConnections" value="100"></property>
</bean>
在監聽容器中新增配置消息的確認機制:
<!--配置 消息監聽容器-->
<bean id="jmsContainer" class=" org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="queueDestination"/>
<property name="messageListener" ref="consumerMessageListener"/>
<!--應答模式是 INDIVIDUAL_ACKNOWLEDGE-->
<!--AUTO_ACKNOWLEDGE = 1 自動確認
CLIENT_ACKNOWLEDGE = 2 客戶端手動確認
DUPS_OK_ACKNOWLEDGE = 3 自動批量確認
SESSION_TRANSACTED = 0 事務提交並確認
INDIVIDUAL_ACKNOWLEDGE = 4 單條消息確認-->
<property name="sessionAcknowledgeMode" value="4"/>
</bean>
發送者無需改動,正常發送即可,我們在消費者中進行改動,在消息接收到的時候調用確認方法(上面配置了4單挑消息確認),然后再接收消息做點手腳讓其報錯不能確認使其觸發重發機制:
public class ConsumerMessageListener implements SessionAwareMessageListener<Message> {//消息確認需要session,需要實現SessionAwareMessageListener @Override public void onMessage(Message message, Session session) throws JMSException { if (message instanceof TextMessage){ String msg = ((TextMessage) message).getText(); System.out.println("------------------------------------------"); System.out.println("消費者收到的消息:" + msg); System.out.println("------------------------------------------"); try { if ("test2".equals(msg)) { throw new RuntimeException("故意拋出的異常"); } // 確認消息。只要被確認后 就會出隊,接受失敗沒有確認成功,會在原隊列里面 message.acknowledge(); } catch (Exception e) { // 此不可省略 重發信息使用,如果不寫此方法,將不會實現重發操作。失敗的消息將會一直在隊列中,因為沒有進行消息確認。 // 下次還會監聽到這條消息。效果將會是:第一次接受一個消息2。第二次接受2個,依次累加 session.recover(); } } }
發送4條消息:
//發送字符串 for (int i = 0; i < 4; i++) { service.sendMessage("test" + i); }
結果

由結果可見,test2由於拋出異常,未能進行消息確認,所有重發了兩次,共三次。
