ActiveMQ 消息的重新投遞


正常情況下:consumer 消費完消息后,會發送"標准確認"給 broker,這個確認對象以 MessageAck 類表征:

// 省略其他代碼。類中定義了各種確認的類型
public class MessageAck extends BaseCommand {
    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_ACK;
    public static final byte DELIVERED_ACK_TYPE = 0;
    public static final byte STANDARD_ACK_TYPE = 2;
    public static final byte POSION_ACK_TYPE = 1;
    public static final byte REDELIVERED_ACK_TYPE = 3;
    public static final byte INDIVIDUAL_ACK_TYPE = 4;
    public static final byte UNMATCHED_ACK_TYPE = 5;
    public static final byte EXPIRED_ACK_TYPE = 6;

    protected byte ackType;
    
    // messageCount 表示確認的消息的數量,即 consumer 可以對消息進行批量確認
    public MessageAck(Message message, byte ackType, int messageCount) {
        this.ackType = ackType;
        this.destination = message.getDestination();
        this.lastMessageId = message.getMessageId();
        this.messageCount = messageCount;
    }
}

但是當 consumer 處理消息失敗時,會怎樣呢?例如:發生了除數為 0,拋出異常

consumer.setMessageListener(new MessageListener() {
    public void onMessage(Message message) {
        logger.info("Received: " + message);
        float f = 1 / 0;
    }
});

consumer 會進行重新投遞,重新把消息給 listener 處理。具體流程是:consumer 消費消息失敗,拋出異常,回滾,然后重新投遞。

// void org.apache.activemq.ActiveMQMessageConsumer.rollback() throws JMSException
if (messageListener.get() != null) {
    session.redispatch(this, unconsumedMessages);
}

 

下面的代碼設置 RedeliveryPolicy:

RedeliveryPolicy queuePolicy = new RedeliveryPolicy();
queuePolicy.setInitialRedeliveryDelay(0);
queuePolicy.setRedeliveryDelay(1000);
queuePolicy.setUseExponentialBackOff(false);
queuePolicy.setMaximumRedeliveries(1);
RedeliveryPolicyMap map = connection.getRedeliveryPolicyMap();
map.put(new ActiveMQQueue(">"), queuePolicy);

這個重新投遞策略是 consumer 端的(consumer 重新投遞給自己的消息 listener),而不是 broker 重新投遞給 consumer 的,理解這一點特別重要。超過了最大投遞次數后,consumer 會發送給 broker 一個 POSION_ACK_TYPE 類型的 MessageAck 響應,正常情況是 STANDARD_ACK_TYPE 類型的。

 

consumer發送正常消息確認的調用棧:

主要邏輯在:

// void org.apache.activemq.ActiveMQMessageConsumer
private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
    if (unconsumedMessages.isClosed()) {
        return;
    }
    if (messageExpired) {
        // 過期消息
        acknowledge(md, MessageAck.EXPIRED_ACK_TYPE);
        stats.getExpiredMessageCount().increment();
    } else {
        stats.onMessage();
        if (session.getTransacted()) {
            // Do nothing.
        } else if (isAutoAcknowledgeEach()) {
            // 設置為 Session.AUTO_ACKNOWLEDGE
            if (deliveryingAcknowledgements.compareAndSet(false, true)) {
                synchronized (deliveredMessages) {
                    if (!deliveredMessages.isEmpty()) {
                        if (optimizeAcknowledge) {
                            ackCounter++;

                            // AMQ-3956 evaluate both expired and normal msgs as
                            // otherwise consumer may get stalled
                            if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
                                MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                if (ack != null) {
                                    deliveredMessages.clear();
                                    ackCounter = 0;
                                    session.sendAck(ack);
                                    optimizeAckTimestamp = System.currentTimeMillis();
                                }
                                // AMQ-3956 - as further optimization send
                                // ack for expired msgs when there are any.
                                // This resets the deliveredCounter to 0 so that
                                // we won't sent standard acks with every msg just
                                // because the deliveredCounter just below
                                // 0.5 * prefetch as used in ackLater()
                                if (pendingAck != null && deliveredCounter > 0) {
                                    session.sendAck(pendingAck);
                                    pendingAck = null;
                                    deliveredCounter = 0;
                                }
                            }
                        } else {
                            MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                            if (ack!=null) {
                                deliveredMessages.clear();
                                session.sendAck(ack);
                            }
                        }
                    }
                }
                deliveryingAcknowledgements.set(false);
            }
        } else if (isAutoAcknowledgeBatch()) {
            ackLater(md, MessageAck.STANDARD_ACK_TYPE);
        } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
            boolean messageUnackedByConsumer = false;
            synchronized (deliveredMessages) {
                messageUnackedByConsumer = deliveredMessages.contains(md);
            }
            if (messageUnackedByConsumer) {
                ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
            }
        }
        else {
            throw new IllegalStateException("Invalid session state.");
        }
    }
}

 

consumer 發送有毒消息確認的調用棧:

 

broker 接收消息確認的調用棧:

 

那么,在什么情況下,broker 會重新發送消息給 cosumer 呢?答案是:

broker 把一個消息推送給 consumer 后,但是還沒收到任何確認,如果這時消費者斷開連接,broker 會把這個消息加入到重新投遞隊列中,推送給其他的消費者。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM