非持久的Topic消息示例
對於非持久的Topic消息的發送基本跟前面發送隊列信息是一樣的,只是把創建Destination的地方,由創建隊列替換成創建Topic,例如:
Destination destination = session.createTopic("MyTopic");
對於非持久的Topic消息的接收
1:必須要接收方在線,然后客戶端再發送信息,接收方才能接收到消息
2:同樣把創建Destination的地方,由創建隊列替換成創建Topic,例如:
Destination destination = session.createTopic("MyTopic");
3:由於不知道客戶端發送多少信息,因此改成while循環的方式了,例如:
Message message = consumer.receive(); while(message!=null) { TextMessage txtMsg = (TextMessage)message; System.out.println("收到消 息:" + txtMsg.getText()); message = consumer.receive(1000L); }
生產者代碼:
public class NoPersistenceSender { //默認連接用戶名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; //默認連接密碼 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //默認連接地址 private static final String BROKEURL = "tcp://192.168.0.101:61616"; //發送的消息數量 private static final int SENDNUM = 10; public static void main(String[] args) { //連接工廠 ConnectionFactory connectionFactory; //連接 Connection connection = null; //會話 接受或者發送消息的線程 Session session; //消息的目的地 Destination destination; //消息生產者 MessageProducer messageProducer; //實例化連接工廠(連接到ActiveMQ服務器) connectionFactory = new ActiveMQConnectionFactory(NoPersistenceSender.USERNAME, NoPersistenceSender.PASSWORD, NoPersistenceSender.BROKEURL); try { //通過連接工廠獲取連接 connection = connectionFactory.createConnection(); //啟動連接 connection.start(); //創建session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //創建一個名稱為MyTopic的消息隊列(生產者生成的消息放在哪) destination = session.createTopic("MyTopic"); //創建消息生產者 messageProducer = session.createProducer(destination); //發送消息 sendMessage(session, messageProducer); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } /** * 發送消息 * * @param session * @param messageProducer 消息生產者 * @throws Exception */ public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception { for (int i = 0; i < NoPersistenceSender.SENDNUM; i++) { //創建一條文本消息 TextMessage message = session.createTextMessage("ActiveMQ 發送消息" + i); System.out.println("發送消息:Activemq 發送消息" + i); //通過消息生產者發出消息 messageProducer.send(message); } } }
消費者代碼:
public class NoPersistenceReceiver { //默認連接用戶名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; //默認連接密碼 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //默認連接地址 private static final String BROKEURL = "tcp://192.168.0.101:61616"; public static void main(String[] args) { ConnectionFactory connectionFactory;//連接工廠 Connection connection = null;//連接 Session session;//會話 接受或者發送消息的線程 Destination destination;//消息的目的地 MessageConsumer messageConsumer;//消息的消費者 //實例化連接工廠(連接到ActiveMQ服務器) connectionFactory = new ActiveMQConnectionFactory(NoPersistenceReceiver.USERNAME, NoPersistenceReceiver.PASSWORD, NoPersistenceReceiver.BROKEURL); try { //通過連接工廠獲取連接 connection = connectionFactory.createConnection(); //啟動連接 connection.start(); //創建session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //生產者將消息發送到MyTopic,所以消費者要到MyTopic去取 destination = session.createTopic("MyTopic"); //創建消息消費者 messageConsumer = session.createConsumer(destination); Message message = messageConsumer.receive(); while (message != null) { TextMessage txtMsg = (TextMessage) message; System.out.println("收到消息:" + txtMsg.getText()); message = messageConsumer.receive(1000L); } } catch (JMSException e) { e.printStackTrace(); } } }
如果運行生產者的時候沒有啟動消費者,也就是先運行生產者后運行消費者,那么運行效果是這樣的
消費者阻塞
查看一下控制台
隊列中有消息,但是無法消費~
在消費者運行的情況下再運行生產者
看下控制台
持久的Topic消息示例
生產者:
1:要用持久化訂閱,發送消息者要用 DeliveryMode.PERSISTENT 模式發現,在連接之前設定
2:一定要設置完成后,再start 這個 connection
public class PersistenceSender { //默認連接用戶名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; //默認連接密碼 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //默認連接地址 private static final String BROKEURL = "tcp://192.168.0.101:61616"; //發送的消息數量 private static final int SENDNUM = 10; public static void main(String[] args) { //連接工廠 ConnectionFactory connectionFactory; //連接 Connection connection = null; //會話 接受或者發送消息的線程 Session session; //消息的目的地 Destination destination; //消息生產者 MessageProducer messageProducer; //實例化連接工廠(連接到ActiveMQ服務器) connectionFactory = new ActiveMQConnectionFactory(PersistenceSender.USERNAME, PersistenceSender.PASSWORD, PersistenceSender.BROKEURL); try { //通過連接工廠獲取連接 connection = connectionFactory.createConnection(); //創建session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //創建一個名稱為MyTopic的消息隊列(生產者生成的消息放在哪) destination = session.createTopic("MyTopic"); //創建消息生產者 messageProducer = session.createProducer(destination); messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); //啟動連接 connection.start(); //發送消息 sendMessage(session, messageProducer); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } /** * 發送消息 * * @param session * @param messageProducer 消息生產者 * @throws Exception */ public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception { for (int i = 0; i < PersistenceSender.SENDNUM; i++) { //創建一條文本消息 TextMessage message = session.createTextMessage("ActiveMQ 發送消息" + i); System.out.println("發送消息:Activemq 發送消息" + i); //通過消息生產者發出消息 messageProducer.send(message); } } }
消費者:
1:需要在連接上設置消費者id,用來識別消費者
2:需要創建TopicSubscriber來訂閱
3:要設置好了過后再start 這個connection
4:一定要先運行一次,等於向消息服務中間件注冊這個消費者,然后再運行客戶端發送信息,這個時候,無論消費者是否在線,都會接收到,不在線的話,下次連接的時候,會把沒有收過的消息都接收下來。
public class PersistenceReceiver { //默認連接用戶名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; //默認連接密碼 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //默認連接地址 private static final String BROKEURL = "tcp://192.168.0.101:61616"; public static void main(String[] args) { ConnectionFactory connectionFactory;//連接工廠 Connection connection = null;//連接 Session session;//會話 接受或者發送消息的線程 Topic topic;//消息的目的地 //實例化連接工廠(連接到ActiveMQ服務器) connectionFactory = new ActiveMQConnectionFactory(PersistenceReceiver.USERNAME, PersistenceReceiver.PASSWORD, PersistenceReceiver.BROKEURL); try { //通過連接工廠獲取連接 connection = connectionFactory.createConnection(); connection.setClientID("winner_0715"); //創建session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //生產者將消息發送到MyTopic,所以消費者要到MyTopic去取 topic = session.createTopic("MyTopic"); //創建消息消費者 TopicSubscriber consumer = session.createDurableSubscriber(topic, "t1"); //啟動連接 connection.start(); Message message = consumer.receive(); while (message != null) { TextMessage txtMsg = (TextMessage) message; System.out.println("收到消 息:" + txtMsg.getText()); //沒這句有錯 message = consumer.receive(1000L); } session.commit(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } }
控制台:
關於持久化和非持久化消息
持久化消息
這是 ActiveMQ 的默認傳送模式,此模式保證這些消息只被傳送一次和成 功使用一次。對於這些消息,可靠性是優先考慮的因素。可靠性的另一個重要方面是確保持久性消息傳送至目標后,消息服務在向消費者傳送它們之前不會丟失這些消息。這意味着在持久性消息傳送至目標時,消息服務將其放入持久性數據存儲。如果消息服務由於某種原因導致失敗,它可以恢復此消息並將此消息傳送至相應的消費者。雖然這樣增加了消息傳送的開銷,但卻增加了可靠性。
非持久化消息
保證這些消息最多被傳送一次。對於這些消息,可靠性並非主要的考慮因素。 此模式並不要求持久性的數據存儲,也不保證消息服務由於某種原因導致失敗后消息不會丟失。 有兩種方法指定傳送模式:
1.使用setDeliveryMode 方法,這樣所有的消息都采用此傳送模式; 如:
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
2.使用send 方法為每一條消息設置傳送模式