消息的 destination 分為 queue 和 topic,而消費者稱為 subscriber(訂閱者)。queue 中的消息只會發送給一個訂閱者,而 topic 的消息,會發送給每一個訂閱者。在 broker 中,處理 queue 消息和 topic 消息的邏輯是不同的。queue 先存儲消息,然后把消息分發給消費者,topic 收到消息的同時,就會分發。
Queue 中有 doMessageSend 和 iterate 方法,doMessageSend 負責接收生產者的消息,iterate 負責分發消息給消費者。Topic 中也有 doMessageSend 和 iterate 方法,doMessageSend 負責接收生產者的消息,並且分發給消費者。
queue 有持久和臨時2種類型(topic相同):
隊列默認為持久隊列,一旦創建,一直存在於broker中。而臨時隊列被創建后,在connection關閉后,broker就會刪除它。
topic 訂閱有持久和非持久2種類型:
broker 會把消息全部推送給持久訂閱,即便該訂閱者中途offline了,如果是非持久訂閱,一旦它下線,broker 不會為它保留消息,直到它上線后,開始繼續發送消息。
需要注意:假定有一個 topic,生產者向該 topic 發送一條消息,但此時該 topic 沒有任何訂閱者,則該消息不會保存,它會被刪除。
(創建持久訂閱)代碼示例:
public static void main(String[] args) { //該連接上會創建durable subscriber,需要指定唯一clientID ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.clientID=10086"); ActiveMQConnection connection = (ActiveMQConnection)connectionFactory.createConnection(); connection.start(); ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //創建topic ActiveMQTopic destination = (ActiveMQTopic) session.createTopic("topic_zhang"); //創建持久訂閱者 TopicSubscriber consumer = session.createDurableSubscriber(destination, "subscriber_zhang"); //普通消費者,即非持久訂閱者 ActiveMQMessageConsumer consumer2 = (ActiveMQMessageConsumer) session.createConsumer(destination); }