rabbitmq消息隊列,消息發送失敗,消息持久化,消費者處理失敗相關


轉:https://blog.csdn.net/u014373554/article/details/92686063

項目是使用springboot項目開發的,前是代碼實現,后面有分析發送消息失敗、消息持久化、消費者失敗處理方法和發送消息解決方法及手動確認的模式

先引入pom.xml

<!--rabbitmq-->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application 配置文件

spring:
rabbitmq:
  host: IP地址
  port: 5672
  username: 用戶名
  password: 密碼

RabbitConfig配置文件
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;


/**
 Broker:它提供一種傳輸服務,它的角色就是維護一條從生產者到消費者的路線,保證數據能按照指定的方式進行傳輸,
 Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列。
 Queue:消息的載體,每個消息都會被投到一個或多個隊列。
 Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來.
 Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
 vhost:虛擬主機,一個broker里可以有多個vhost,用作不同用戶的權限分離。
 Producer:消息生產者,就是投遞消息的程序.
 Consumer:消息消費者,就是接受消息的程序.
 Channel:消息通道,在客戶端的每個連接里,可建立多個channel.
*/
@Configuration
@Slf4j
public class RabbitConfig {

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    public static final String EXCHANGE_A = "my_mq_exchange_A";
    public static final String EXCHANGE_B = "my_mq_exchange_B";
    public static final String EXCHANGE_C = "my_mq_exchange_C";

    public static final String QUEUE_A="QUEUE_A";
    public static final String QUEUE_B="QUEUE_B";
    public static final String QUEUE_C="QUEUE_C";


    public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";
    public static final String ROUTINGKEY_B = "spring-boot-routingKey_B";
    public static final String ROUTINGKEY_C = "spring-boot-routingKey_C";

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true); //設置發送消息失敗重試
        connectionFactory.setChannelCacheSize(100);//解決多線程發送消息

        return connectionFactory;
    }
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate(){
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMandatory(true); //設置發送消息失敗重試
        return template;

    }
    //配置使用json轉遞數據
    @Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    /*public SimpleMessageListenerContainer messageListenerContainer(){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());

        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageHandler());
        adapter.setDefaultListenerMethod(new Jackson2JsonMessageConverter());
        return container;
    }*/

    /**
     * 針對消費者配置
     * 1. 設置交換機類型
     * 2. 將隊列綁定到交換機
     * FanoutExchange: 將消息分發到所有的綁定隊列,無 routingkey的概念
     * HeadersExchange: 通過添加屬性key - value匹配
     * DirectExchange: 按照routingkey分發到指定隊列
     * TopicExchange : 多關鍵字匹配
     * @return
     */
    @Bean
    public DirectExchange defaultExchange(){
        return new DirectExchange(EXCHANGE_A,true,false);
    }

    @Bean
    public Queue queueA(){
        return  new Queue(QUEUE_A,true);// 隊列持久化
    }

    @Bean
    public Queue queueB(){
        return  new Queue(QUEUE_B,true);// 隊列持久化
    }

    /**
     * 一個交換機可以綁定多個消息隊列,也就是消息通過一個交換機,可以分發到不同的隊列當中去。
     * @return
     */
    @Bean
    public Binding binding(){
        return BindingBuilder.bind( queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
    }

    @Bean
    public Binding bindingB(){
        return BindingBuilder.bind( queueB()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
    }

}

生成者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;

/**
 * 生產者
 */
@Component
@Slf4j
public class ProducerMessage implements  RabbitTemplate.ConfirmCallback , RabbitTemplate.ReturnCallback{

    private RabbitTemplate rabbitTemplate;

    @Autowired
    public ProducerMessage(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this::confirm); //rabbitTemplate如果為單例的話,那回調就是最后設置的內容
        rabbitTemplate.setReturnCallback(this::returnedMessage);
        rabbitTemplate.setMandatory(true);
    }

    public void  sendMsg (Object content){
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A,RabbitConfig.ROUTINGKEY_A,content,correlationId);

    }

    /**
     * 消息發送到隊列中,進行消息確認
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info(" 消息確認的id: " + correlationData);
        if(ack){
            log.info("消息發送成功");
            //發送成功 刪除本地數據庫存的消息
        }else{
            log.info("消息發送失敗:id "+ correlationData +"消息發送失敗的原因"+ cause);
            // 根據本地消息的狀態為失敗,可以用定時任務去處理數據

        }
    }

    /**
     * 消息發送失敗返回監控
     * @param message
     * @param i
     * @param s
     * @param s1
     * @param s2
     */
    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        log.info("returnedMessage [消息從交換機到隊列失敗]  message:"+message);

    }
}

消費者

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

/**
 * 消費者
 */

@Slf4j
@Component

public class ComsumerMessage {

    @RabbitListener(queues = RabbitConfig.QUEUE_A)
    public void handleMessage(Message message,Channel channel) throws  IOException{
        try {
            String json = new String(message.getBody());
            JSONObject jsonObject = JSONObject.fromObject(json);
            log.info("消息了【】handleMessage" +  json);
            int i = 1/0;
            //業務處理。
            /**
             * 防止重復消費,可以根據傳過來的唯一ID先判斷緩存數據中是否有數據
             * 1、有數據則不消費,直接應答處理
             * 2、緩存沒有數據,則進行消費處理數據,處理完后手動應答
             * 3、如果消息 處理異常則,可以存入數據庫中,手動處理(可以增加短信和郵件提醒功能)
             */

            //手動應答
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }catch (Exception e){
            log.error("消費消息失敗了【】error:"+ message.getBody());
            log.error("OrderConsumer  handleMessage {} , error:",message,e);
            // 處理消息失敗,將消息重新放回隊列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
        }

    }

}

發送消息:調用生成的方法

import com.zz.blog.BlogApplicationTests;
import com.zz.blog.mq.ProducerMessage;
import net.sf.json.JSONObject;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.UUID;
public class Message extends BlogApplicationTests {
    @Autowired
    private ProducerMessage producerMessage;

    @Test
    public void sendMessage(){
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("id", UUID.randomUUID().toString());
        jsonObject.put("name","TEST");
        jsonObject.put("desc","訂單已生成");
        //防止發送消息失敗,將發送消息存入本地。

        producerMessage.sendMsg(jsonObject.toString());

    }
}

rabbitTemplate的發送消息流程是這樣的:
1 發送數據並返回(不確認rabbitmq服務器已成功接收)
2 異步的接收從rabbitmq返回的ack確認信息
3 收到ack后調用confirmCallback函數
注意:在confirmCallback中是沒有原message的,所以無法在這個函數中調用重發,confirmCallback只有一個通知的作用

在這種情況下,如果在2,3步中任何時候切斷連接,我們都無法確認數據是否真的已經成功發送出去,從而造成數據丟失的問題。

最完美的解決方案只有1種:
使用rabbitmq的事務機制。
但是在這種情況下,rabbitmq的效率極低,每秒鍾處理的message在幾百條左右。實在不可取。

基於上面的分析,我們使用一種新的方式來做到數據的不丟失。
在rabbitTemplate異步確認的基礎上
1 在本地緩存已發送的message
2 通過confirmCallback或者被確認的ack,將被確認的message從本地刪除
3 定時掃描本地的message,如果大於一定時間未被確認,則重發

當然了,這種解決方式也有一定的問題
想象這種場景,rabbitmq接收到了消息,在發送ack確認時,網絡斷了,造成客戶端沒有收到ack,重發消息。(相比於丟失消息,重發消息要好解決的多,我們可以在consumer端做到冪等)。

消息存入本地:在message 發消息的寫數據庫中。

消息應答成功,則刪除本地消息,失敗更改消息狀態,可以使用定時任務去處理。

消息持久化:

消費者: 

/**
 * 防止重復消費,可以根據傳過來的唯一ID先判斷緩存數據庫中是否有數據
 * 1、有數據則不消費,直接應答處理
 * 2、緩存沒有數據,則進行消費處理數據,處理完后手動應答
 * 3、如果消息 處理異常則,可以存入數據庫中,手動處理(可以增加短信和郵件提醒功能)
 */


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM