正常情況下:consumer 消費完消息后,會發送"標准確認"給 broker,這個確認對象以 MessageAck 類表征:
// 省略其他代碼。類中定義了各種確認的類型 public class MessageAck extends BaseCommand { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_ACK; public static final byte DELIVERED_ACK_TYPE = 0; public static final byte STANDARD_ACK_TYPE = 2; public static final byte POSION_ACK_TYPE = 1; public static final byte REDELIVERED_ACK_TYPE = 3; public static final byte INDIVIDUAL_ACK_TYPE = 4; public static final byte UNMATCHED_ACK_TYPE = 5; public static final byte EXPIRED_ACK_TYPE = 6; protected byte ackType; // messageCount 表示確認的消息的數量,即 consumer 可以對消息進行批量確認 public MessageAck(Message message, byte ackType, int messageCount) { this.ackType = ackType; this.destination = message.getDestination(); this.lastMessageId = message.getMessageId(); this.messageCount = messageCount; } }
但是當 consumer 處理消息失敗時,會怎樣呢?例如:發生了除數為 0,拋出異常
consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { logger.info("Received: " + message); float f = 1 / 0; } });
consumer 會進行重新投遞,重新把消息給 listener 處理。具體流程是:consumer 消費消息失敗,拋出異常,回滾,然后重新投遞。
// void org.apache.activemq.ActiveMQMessageConsumer.rollback() throws JMSException if (messageListener.get() != null) { session.redispatch(this, unconsumedMessages); }
下面的代碼設置 RedeliveryPolicy:
RedeliveryPolicy queuePolicy = new RedeliveryPolicy(); queuePolicy.setInitialRedeliveryDelay(0); queuePolicy.setRedeliveryDelay(1000); queuePolicy.setUseExponentialBackOff(false); queuePolicy.setMaximumRedeliveries(1); RedeliveryPolicyMap map = connection.getRedeliveryPolicyMap(); map.put(new ActiveMQQueue(">"), queuePolicy);
這個重新投遞策略是 consumer 端的(consumer 重新投遞給自己的消息 listener),而不是 broker 重新投遞給 consumer 的,理解這一點特別重要。超過了最大投遞次數后,consumer 會發送給 broker 一個 POSION_ACK_TYPE 類型的 MessageAck 響應,正常情況是 STANDARD_ACK_TYPE 類型的。
consumer發送正常消息確認的調用棧:
主要邏輯在:
// void org.apache.activemq.ActiveMQMessageConsumer private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException { if (unconsumedMessages.isClosed()) { return; } if (messageExpired) { // 過期消息 acknowledge(md, MessageAck.EXPIRED_ACK_TYPE); stats.getExpiredMessageCount().increment(); } else { stats.onMessage(); if (session.getTransacted()) { // Do nothing. } else if (isAutoAcknowledgeEach()) { // 設置為 Session.AUTO_ACKNOWLEDGE if (deliveryingAcknowledgements.compareAndSet(false, true)) { synchronized (deliveredMessages) { if (!deliveredMessages.isEmpty()) { if (optimizeAcknowledge) { ackCounter++; // AMQ-3956 evaluate both expired and normal msgs as // otherwise consumer may get stalled if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) { MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); if (ack != null) { deliveredMessages.clear(); ackCounter = 0; session.sendAck(ack); optimizeAckTimestamp = System.currentTimeMillis(); } // AMQ-3956 - as further optimization send // ack for expired msgs when there are any. // This resets the deliveredCounter to 0 so that // we won't sent standard acks with every msg just // because the deliveredCounter just below // 0.5 * prefetch as used in ackLater() if (pendingAck != null && deliveredCounter > 0) { session.sendAck(pendingAck); pendingAck = null; deliveredCounter = 0; } } } else { MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); if (ack!=null) { deliveredMessages.clear(); session.sendAck(ack); } } } } deliveryingAcknowledgements.set(false); } } else if (isAutoAcknowledgeBatch()) { ackLater(md, MessageAck.STANDARD_ACK_TYPE); } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) { boolean messageUnackedByConsumer = false; synchronized (deliveredMessages) { messageUnackedByConsumer = deliveredMessages.contains(md); } if (messageUnackedByConsumer) { ackLater(md, MessageAck.DELIVERED_ACK_TYPE); } } else { throw new IllegalStateException("Invalid session state."); } } }
consumer 發送有毒消息確認的調用棧:
broker 接收消息確認的調用棧:
那么,在什么情況下,broker 會重新發送消息給 cosumer 呢?答案是:
broker 把一個消息推送給 consumer 后,但是還沒收到任何確認,如果這時消費者斷開連接,broker 會把這個消息加入到重新投遞隊列中,推送給其他的消費者。