生產者發送消息:producer ---------> broker
broker返回確認:broker ---------> producer
生產者發送同步消息,broker會返回Response;發送異步消息,broker不會返回確認;滿足一定條件時,broker會返回ProducerAck:
final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 && !context.isInRecoveryMode();
broker 分發消息:broker ---------> consumer
消費者返回確認: consumer ---------> broker
如果消息被正常處理掉,consumer返回 STANDARD_ACK_TYPE 的 MessageAck,如果消息沒有被正常處理,且超過了客戶端重新投遞次數,consumer則返回 POSION_ACK_TYPE 的 MessageAck。在收到 MessageAck 后,broker 才會刪除消息。
通常我們使用 ActiveMQ,會這樣創建 Session,設置為自動確認:
ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
假定存在隊列 TEST.FOO,它有1個消費者consumer1,1 個生產者producer1,當producer1向隊列發送1條消息,broker 把這條消息分發給consumer1,如果配置自動確認,consumer進程會自動發送確認,broker收到確認后會刪除消息。
反之如果配置為CLIENT_ACKNOWLEDGE,則需要手動確認,即顯式調用代碼:
consumer.acknowledge();
如果consumer1收到消息后,並不調用acknowledge(),即不發送消息確認,broker 也一直會保存消息。
client 和 broker 之間所有消息都繼承自 BaseCommand:例如 ActiveMQTextMessage,ConnectionInfo,KeepAliveInfo,BrokerInfo 等。