RabbitMQ持久化編碼注意事項


以Java語言,MQ客戶端為amqp-client作為示例

1、基本原則

  direct模式,由生產者聲明隊列名,消費者也聲明隊列名

  topic模式,由生產者聲明交換器名,由消費者聲明隊列名+交換器名+綁定關系

  即生產者只負責生產消息,至於消息要投遞到哪里由消費者指定

2、隊列、交換器、消息的持久化配置

    隊列聲明持久化  

public void queueDeclare(String queue) {
        try {
            if (conn == null) {
                conn = connectionFactory.newConnection();
            }
            Channel channel = conn.createChannel();

            // 聲明隊列,如果隊列不存在則創建之
            boolean durable = true;
            boolean exclusive = false;
            boolean autoDelete = false;
            Map<String, Object> arguments = null;
            channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments);

            channel.close();
        } catch (IOException e) {
            logger.error("IOException:", e);
        } catch (TimeoutException e) {
            logger.error("TimeoutException:", e);
        }
    }

 

    交換器聲明持久化

         // 聲明topic交換器
    public void topicExchangeDeclare(String exchange) {
        String type = "topic";
        boolean durable = true;
        exchangeDeclare(exchange, type, durable);
    }

    private void exchangeDeclare(String exchange, String type, boolean durable) {
        try {
            if (conn == null) {
                conn = connectionFactory.newConnection();
            }
            Channel channel = conn.createChannel();

            // 聲明交換器
            channel.exchangeDeclare(exchange, type, durable);

            channel.close();
        } catch (IOException e) {
            logger.error("IOException:", e);
        } catch (TimeoutException e) {
            logger.error("TimeoutException:", e);
        }
    }    

   

 消息發送時指定持久化

   

       // 發送消息
    public void send(String exchange, String routingKey, JSONObject json) {
        try {
            if (conn == null) {
                conn = connectionFactory.newConnection();
            }
            Channel channel = conn.createChannel();

            String msg = json.toJSONString();
            channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes("utf-8"));
            channel.close();
        } catch (IOException e) {
            logger.error("IOException:", e);
        } catch (TimeoutException e) {
            logger.error("TimeoutException:", e);
        } 
 }  

 

3、網絡閃斷、RabbitMQ重啟時App的自恢復編碼

  首先,必須已經指定了隊列和交換器的持久化,否則在自恢復時,由於無法找到隊列及交換器和綁定關系會報錯

需要注意的是,RabbitMQ推薦盡量共用Connection,多個線程之間用不同的Channel 

<bean id="connectionFactory" class="com.rabbitmq.client.ConnectionFactory">
        <property name="automaticRecoveryEnabled" value="true"></property>
        <property name="host" value="${RABBITMQ.SERVER_IP}"></property>
        <property name="port" value="${RABBITMQ.SERVER_PORT}"></property>
        <property name="username" value="${RABBITMQ.USERNAME}"></property>
        <property name="password" value="${RABBITMQ.PASSWORD}"></property>
        <property name="virtualHost" value="${RABBITMQ.VIRTUAL_HOST}"></property> 
        </bean> 

設置automaticRecoveryEnabled為true

 
        
public class MQConsumer implements Runnable, Consumer {

    static Logger logger = LoggerFactory.getLogger(MQConsumer.class);

    protected Connection connection;
    protected Channel channel;

    protected String queue;
    protected ConsumerExecutor executor;// 執行器
    private MQConfig config;

    public MQConsumer(MQConfig config, String queue, ConsumerExecutor executor) {
        this.config = config;
        this.queue = queue;
        this.executor = executor;
    }

    @Override
    public void run() {
        try {
            init();
            try {
                channel.basicConsume(queue, true, this);
            } catch (IOException e) {
                logger.error("MQ消費處理失敗:", e);
            }
        } catch (Exception e) {
            logger.error("mq init() error", e);
        }
    }

    protected void init() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(config.getIp());
        factory.setPort(config.getPort());
        factory.setUsername(config.getUserName());
        factory.setPassword(config.getPassword());
        factory.setVirtualHost(config.getvHost());
        factory.setAutomaticRecoveryEnabled(true);
        
        connection = factory.newConnection();
        channel = connection.createChannel();
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope env, BasicProperties props, byte[] body) throws IOException {
        String msg = new String(body, "utf-8");
        logger.debug("從隊列[" + queue + "] 接收消息: " + msg);
        try {
            executor.consume(msg);
        } catch (Exception e) {
            logger.error("handleDelivery error:", e);
        }
    }

    @Override
    public void handleCancel(String consumerTag) {
        logger.info("handleCancel:" + consumerTag);
    }

    @Override
    public void handleCancelOk(String consumerTag) {
        logger.info("handleCancelOk:" + consumerTag);
    }

    @Override
    public void handleConsumeOk(String consumerTag) {
        logger.info("handleConsumeOk:" + consumerTag);
    }

    @Override
    public void handleRecoverOk(String consumerTag) {
        logger.info("handleRecoverOk:" + consumerTag);
    }

    @Override
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException e) {
        logger.info("handleShutdownSignal:" + consumerTag);
    }
}

消費者代碼示例,只要automaticRecoveryEnabled為true,而且queue和exchange都是持久化的,能夠自動恢復,不用手工處理。

4、auto_ack問題

在auto_ack為true時,數據流是這樣的:

App從MQ取消息->刪除消息->App業務邏輯處理(包括讀寫數據庫等)->發送處理結果(如果有需要)

可以看出當App業務邏輯處理失敗時,消息已經被刪除了,很多情況下,這是不安全的,所以改為:

App從MQ取消息->App業務邏輯處理(包括讀寫數據庫等)->發送ACK刪除消息 ->發送處理結果(如果有需要)

但是由於性能問題一般出現在業務邏輯部分,如果這部分處理慢又會造成擁塞,所以要自已權衡

      try {
                channel.basicConsume(queue, true, this);
                boolean autoAck = false;
                channel.basicConsume(queue, autoAck, this);
             } catch (IOException e) {
                 logger.error("MQ消費處理失敗:", e);
             }

 

         try{
                channel.basicAck(env.getDeliveryTag(), true);
            }catch(Exception e){
                logger.error("basicAck error:", e);
            }
		 	

5、超時處理

采用MQ解耦后系統之間雖然是異步處理,但正常情況下響應速度跟同步處理接近。特殊情況下響應慢時很可能消息從發送到被處理已經過去了很長一段時間,前端極可能已經重復提交並完成了業務,所以需要加個快速失敗機制。即消息生產者將消息的創建時間帶到消息體里,消費者拿到消息后,判斷如果是已經過去了指定間隔的消息,則直接失敗返回。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM