消費者客戶端成功接收一條消息的標志是:這條消息被簽收。
消費者客戶端成功接收一條消息一般包括三個階段:
消費者客戶端成功接收一條消息一般包括三個階段:
1、消費者接收消息,也即從MessageConsumer的receive方法返回
2、消費者處理消息
3、消息被簽收
其中,第三階段的簽收可以有ActiveMQ發起,也可以由消費者客戶端發起,取決於Session是否開啟事務以及簽收模式的設置。
在帶事務的Session中,消費者客戶端事務提交之時,消息自動完成簽收。
在不帶事務的Session中,消息何時以及如何被簽收取決於Session的簽收模式設置
activemq的消息確認機制就是文檔中說的ack機制有:
AUTO_ACKNOWLEDGE = 1 自動確認
CLIENT_ACKNOWLEDGE = 2 客戶端手動確認
DUPS_OK_ACKNOWLEDGE = 3 自動批量確認
SESSION_TRANSACTED = 0 事務提交並確認
INDIVIDUAL_ACKNOWLEDGE = 4 單條消息確認 activemq 獨有
ACK模式描述了Consumer與broker確認消息的方式(時機),比如當消息被Consumer接收之后,Consumer將在何時確認消息。
對於broker而言,只有接收到ACK指令,才會認為消息被正確的接收或者處理成功了,通過ACK,可以在consumer(/producer)
與Broker之間建立一種簡單的“擔保”機制.
手動確認和單條消息確認需要手動的在客戶端調用message.acknowledge()
消息重發機制RedeliveryPolicy 有幾個屬性如下:
RedeliveryPolicy redeliveryPolicy= new RedeliveryPolicy(); //是否在每次嘗試重新發送失敗后,增長這個等待時間 redeliveryPolicy.setUseExponentialBackOff(true); //重發次數,默認為6次 這里設置為10次 redeliveryPolicy.setMaximumRedeliveries(10); //重發時間間隔,默認為1秒 redeliveryPolicy.setInitialRedeliveryDelay(1); //第一次失敗后重新發送之前等待500毫秒,第二次失敗再等待500 * 2毫秒,這里的2就是value redeliveryPolicy.setBackOffMultiplier(2); //是否避免消息碰撞 redeliveryPolicy.setUseCollisionAvoidance(false); //設置重發最大拖延時間-1 表示沒有拖延只有UseExponentialBackOff(true)為true時生效 redeliveryPolicy.setMaximumRedeliveryDelay(-1);
那么在整合activemq時候就只需要修改配置文件和客戶端就可以了,activemq就是這種機制,例如支付寶支付回調的時候,只有我們返回一個success,支付那邊才不會給我重發消息
配置文件:
import javax.jms.Queue; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.RedeliveryPolicy; import org.apache.activemq.command.ActiveMQQueue; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.core.JmsTemplate; @EnableJms @Configuration public class ActiveMQ4Config { @Bean public Queue queue(){ return new ActiveMQQueue("queue1"); } @Bean public RedeliveryPolicy redeliveryPolicy(){ RedeliveryPolicy redeliveryPolicy= new RedeliveryPolicy(); //是否在每次嘗試重新發送失敗后,增長這個等待時間 redeliveryPolicy.setUseExponentialBackOff(true); //重發次數,默認為6次 這里設置為10次 redeliveryPolicy.setMaximumRedeliveries(10); //重發時間間隔,默認為1秒 redeliveryPolicy.setInitialRedeliveryDelay(1); //第一次失敗后重新發送之前等待500毫秒,第二次失敗再等待500 * 2毫秒,這里的2就是value redeliveryPolicy.setBackOffMultiplier(2); //是否避免消息碰撞 redeliveryPolicy.setUseCollisionAvoidance(false); //設置重發最大拖延時間-1 表示沒有拖延只有UseExponentialBackOff(true)為true時生效 redeliveryPolicy.setMaximumRedeliveryDelay(-1); return redeliveryPolicy; } @Bean public ActiveMQConnectionFactory activeMQConnectionFactory (@Value("${activemq.url}")String url,RedeliveryPolicy redeliveryPolicy){ ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory( "admin", "admin", url); activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy); return activeMQConnectionFactory; } @Bean public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory,Queue queue){ JmsTemplate jmsTemplate=new JmsTemplate(); jmsTemplate.setDeliveryMode(2);//進行持久化配置 1表示非持久化,2表示持久化 jmsTemplate.setConnectionFactory(activeMQConnectionFactory); jmsTemplate.setDefaultDestination(queue); //此處可不設置默認,在發送消息時也可設置隊列 jmsTemplate.setSessionAcknowledgeMode(4);//客戶端簽收模式 return jmsTemplate; } //定義一個消息監聽器連接工廠,這里定義的是點對點模式的監聽器連接工廠 @Bean(name = "jmsQueueListener") public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(activeMQConnectionFactory); //設置連接數 factory.setConcurrency("1-10"); //重連間隔時間 factory.setRecoveryInterval(1000L); factory.setSessionAcknowledgeMode(4); return factory; } }
消費者:
import javax.jms.JMSException; import javax.jms.Session; import javax.jms.TextMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; @Component public class Consumer { private final static Logger logger = LoggerFactory .getLogger(Consumer.class); @JmsListener(destination = "queue1", containerFactory = "jmsQueueListener") public void receiveQueue(final TextMessage text, Session session) throws JMSException { try { logger.debug("Consumer收到的報文為:" + text.getText()); text.acknowledge();// 使用手動簽收模式,需要手動的調用,如果不在catch中調用session.recover()消息只會在重啟服務后重發 } catch (Exception e) { session.recover();// 此不可省略 重發信息使用 } } }
由此可以知道activemq的queue消息是可以保證消息不丟失,不會被重復消費的,因為會給每個消息設置一個唯一的id,當消息發送失敗之后可以根據這個機制來進行消費,當然也是一種處理分布式事物的方法
消息中間件的模式是可以保證消息不會丟失的,持久化和自動重發,消息回簽,都可以很好的避免那種機制。消費端代碼發生異常了,可以自動重發,自動消息重發。由於之前在測試的時候足夠看官方文檔,所以理解說客戶端發生異常了,是不可以進行重發的,但是今天了解之后,發覺還是自動重發的機制,利用回簽機制進行的。