一、非持久的Topic
Topic 發送
public class NoPersistenceSender { public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.174.104:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination topic=session.createTopic("myTopic"); MessageProducer producer=session.createProducer(topic); for(int i=0 ; i<3 ; i++){ TextMessage message=session.createTextMessage("message"+i); //message.setStringProperty("queue", "queue"+i); //message.setJMSType("1"); producer.send(message); } session.commit(); session.close(); connection.close(); } }
Topic 接收
public class NoPersistenceRecever { public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.174.104:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination topic=session.createTopic("myTopic"); MessageConsumer consumer = session.createConsumer(topic); Message message=consumer.receive(); while (message !=null){ TextMessage textMessage=(TextMessage) message; //System.out.println(message.getStringProperty("queue")); System.out.println(textMessage.getText()); session.commit(); message = consumer.receive(1000L); } session.close(); connection.close(); } }
二、持久化得Topic
Topic 發送
public class PersistenceSender { public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.174.104:61616"); Connection connection = connectionFactory.createConnection(); Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination topic=session.createTopic("myTopic1"); MessageProducer producer=session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.PERSISTENT); connection.start(); for(int i=0 ; i<3 ; i++){ TextMessage message=session.createTextMessage("message"+i); //message.setStringProperty("queue", "queue"+i); //message.setJMSType("1"); producer.send(message); } session.commit(); session.close(); connection.close(); } }
- 要用持久化訂閱,發送消息者要用 DeliveryMode.PERSISTENT 模式發現,在連接之前設定
- 一定要設置完成后,再start 這個 connection
Topic 接收
public class PersistenceRecever { public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.174.104:61616"); Connection connection = connectionFactory.createConnection(); connection.setClientID("cc1"); Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Topic topic=session.createTopic("myTopic1"); TopicSubscriber ts = session.createDurableSubscriber(topic, "t1"); connection.start(); Message message=ts.receive(); while (message !=null){ TextMessage textMessage=(TextMessage) message; //System.out.println(message.getStringProperty("queue")); System.out.println(textMessage.getText()); session.commit(); message = ts.receive(1000L); } session.close(); connection.close(); } }
- 需要在連接上設置消費者id,用來識別消費者
- 需要創建TopicSubscriber來訂閱
- 要設置好了過后再start 這個 connection
- 一定要先運行一次,等於向消息服務中間件注冊這個消費者,然后再運行客戶端發送信息,這個時候,無論消費者是否在線,都會接收到,不在線的話,下次連接的時候,會把沒有收過的消息都接收下來。