springboot整合activemq加入會簽,自動重發機制,持久化


消費者客戶端成功接收一條消息的標志是:這條消息被簽收。
消費者客戶端成功接收一條消息一般包括三個階段:

         1、消費者接收消息,也即從MessageConsumer的receive方法返回

         2、消費者處理消息

         3、消息被簽收

        其中,第三階段的簽收可以有ActiveMQ發起,也可以由消費者客戶端發起,取決於Session是否開啟事務以及簽收模式的設置。

        在帶事務的Session中,消費者客戶端事務提交之時,消息自動完成簽收。

        在不帶事務的Session中,消息何時以及如何被簽收取決於Session的簽收模式設置
 
activemq的消息確認機制就是文檔中說的ack機制有:
    AUTO_ACKNOWLEDGE = 1    自動確認
    CLIENT_ACKNOWLEDGE = 2    客戶端手動確認   
    DUPS_OK_ACKNOWLEDGE = 3    自動批量確認
    SESSION_TRANSACTED = 0    事務提交並確認
    INDIVIDUAL_ACKNOWLEDGE = 4    單條消息確認 activemq 獨有
  ACK模式描述了Consumer與broker確認消息的方式(時機),比如當消息被Consumer接收之后,Consumer將在何時確認消息。
  對於broker而言,只有接收到ACK指令,才會認為消息被正確的接收或者處理成功了,通過ACK,可以在consumer(/producer)
  與Broker之間建立一種簡單的“擔保”機制.
  手動確認和單條消息確認需要手動的在客戶端調用message.acknowledge()
  消息重發機制RedeliveryPolicy 有幾個屬性如下:
RedeliveryPolicy redeliveryPolicy= new RedeliveryPolicy();
            //是否在每次嘗試重新發送失敗后,增長這個等待時間
            redeliveryPolicy.setUseExponentialBackOff(true);
            //重發次數,默認為6次   這里設置為10次
            redeliveryPolicy.setMaximumRedeliveries(10);
            //重發時間間隔,默認為1秒
            redeliveryPolicy.setInitialRedeliveryDelay(1);
            //第一次失敗后重新發送之前等待500毫秒,第二次失敗再等待500 * 2毫秒,這里的2就是value
            redeliveryPolicy.setBackOffMultiplier(2);
            //是否避免消息碰撞
            redeliveryPolicy.setUseCollisionAvoidance(false);
            //設置重發最大拖延時間-1 表示沒有拖延只有UseExponentialBackOff(true)為true時生效
            redeliveryPolicy.setMaximumRedeliveryDelay(-1);

那么在整合activemq時候就只需要修改配置文件和客戶端就可以了,activemq就是這種機制,例如支付寶支付回調的時候,只有我們返回一個success,支付那邊才不會給我重發消息

配置文件:

import javax.jms.Queue;
 
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
 
@EnableJms  
@Configuration  
public class ActiveMQ4Config {  
 
    @Bean
    public Queue queue(){
        return new ActiveMQQueue("queue1");
    }
 
    @Bean
    public RedeliveryPolicy redeliveryPolicy(){
            RedeliveryPolicy  redeliveryPolicy=   new RedeliveryPolicy();
            //是否在每次嘗試重新發送失敗后,增長這個等待時間
            redeliveryPolicy.setUseExponentialBackOff(true);
            //重發次數,默認為6次   這里設置為10次
            redeliveryPolicy.setMaximumRedeliveries(10);
            //重發時間間隔,默認為1秒
            redeliveryPolicy.setInitialRedeliveryDelay(1);
            //第一次失敗后重新發送之前等待500毫秒,第二次失敗再等待500 * 2毫秒,這里的2就是value
            redeliveryPolicy.setBackOffMultiplier(2);
            //是否避免消息碰撞
            redeliveryPolicy.setUseCollisionAvoidance(false);
            //設置重發最大拖延時間-1 表示沒有拖延只有UseExponentialBackOff(true)為true時生效
            redeliveryPolicy.setMaximumRedeliveryDelay(-1);
            return redeliveryPolicy;
    }
    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory (@Value("${activemq.url}")String url,RedeliveryPolicy redeliveryPolicy){  
        ActiveMQConnectionFactory activeMQConnectionFactory =  
                new ActiveMQConnectionFactory(
                       "admin",
                        "admin",
                        url);
        activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
        return activeMQConnectionFactory;
    }
    
    @Bean
    public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory,Queue queue){
        JmsTemplate jmsTemplate=new JmsTemplate();
        jmsTemplate.setDeliveryMode(2);//進行持久化配置 1表示非持久化,2表示持久化
        jmsTemplate.setConnectionFactory(activeMQConnectionFactory);
        jmsTemplate.setDefaultDestination(queue); //此處可不設置默認,在發送消息時也可設置隊列
        jmsTemplate.setSessionAcknowledgeMode(4);//客戶端簽收模式
        return jmsTemplate;
    }
    
    //定義一個消息監聽器連接工廠,這里定義的是點對點模式的監聽器連接工廠
    @Bean(name = "jmsQueueListener")
    public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(activeMQConnectionFactory);
        //設置連接數
        factory.setConcurrency("1-10");
        //重連間隔時間
        factory.setRecoveryInterval(1000L);
        factory.setSessionAcknowledgeMode(4);
        return factory;
    }
 
}

消費者:

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
 
@Component
public class Consumer {
 
    private final static Logger logger = LoggerFactory
            .getLogger(Consumer.class);
    
    @JmsListener(destination = "queue1", containerFactory = "jmsQueueListener")
    public void receiveQueue(final TextMessage text, Session session)
            throws JMSException {
        try {
            logger.debug("Consumer收到的報文為:" + text.getText());
            text.acknowledge();// 使用手動簽收模式,需要手動的調用,如果不在catch中調用session.recover()消息只會在重啟服務后重發
        } catch (Exception e) {    
            session.recover();// 此不可省略 重發信息使用
        }
    }
}

由此可以知道activemq的queue消息是可以保證消息不丟失,不會被重復消費的,因為會給每個消息設置一個唯一的id,當消息發送失敗之后可以根據這個機制來進行消費,當然也是一種處理分布式事物的方法

消息中間件的模式是可以保證消息不會丟失的,持久化和自動重發,消息回簽,都可以很好的避免那種機制。消費端代碼發生異常了,可以自動重發,自動消息重發。由於之前在測試的時候足夠看官方文檔,所以理解說客戶端發生異常了,是不可以進行重發的,但是今天了解之后,發覺還是自動重發的機制,利用回簽機制進行的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM