ActiveMQ Topic使用示例


一、非持久的Topic

Topic 發送

public class NoPersistenceSender {
    
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.174.104:61616");
        Connection connection = connectionFactory.createConnection();
        
        connection.start();
        
        Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        Destination topic=session.createTopic("myTopic");
        
    
        MessageProducer producer=session.createProducer(topic);
        
        
        for(int i=0 ; i<3 ; i++){
             TextMessage message=session.createTextMessage("message"+i);
             //message.setStringProperty("queue", "queue"+i);
             //message.setJMSType("1");
             producer.send(message);
        }
        session.commit();
        session.close();
        
        connection.close();
        
    }

}

Topic 接收

public class NoPersistenceRecever {
    
public static void main(String[] args) throws JMSException {
        
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.174.104:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        
        Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        Destination topic=session.createTopic("myTopic");
        
        MessageConsumer  consumer = session.createConsumer(topic);
        
        Message message=consumer.receive();
        while (message !=null){
            TextMessage textMessage=(TextMessage) message;
            //System.out.println(message.getStringProperty("queue"));
            System.out.println(textMessage.getText());
            session.commit();
            message = consumer.receive(1000L);
        }        
                
        session.close();
        connection.close();
        
    }

}

 

二、持久化得Topic

Topic 發送

public class PersistenceSender {
    
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.174.104:61616");
        Connection connection = connectionFactory.createConnection();
        
        Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        Destination topic=session.createTopic("myTopic1");
        
        MessageProducer producer=session.createProducer(topic);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        connection.start();
        
        for(int i=0 ; i<3 ; i++){
             TextMessage message=session.createTextMessage("message"+i);
             //message.setStringProperty("queue", "queue"+i);
             //message.setJMSType("1");
             producer.send(message);
        }
        session.commit();
        session.close();
        
        connection.close();
        
    }

}
  • 要用持久化訂閱,發送消息者要用 DeliveryMode.PERSISTENT 模式發現,在連接之前設定
  • 一定要設置完成后,再start 這個 connection

Topic 接收

public class PersistenceRecever {
    
public static void main(String[] args) throws JMSException {
        
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.174.104:61616");
        Connection connection = connectionFactory.createConnection();
        
        connection.setClientID("cc1");
        Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        Topic topic=session.createTopic("myTopic1");
        
        TopicSubscriber   ts = session.createDurableSubscriber(topic, "t1");
        
        connection.start();
        
        
        Message message=ts.receive();
        while (message !=null){
            TextMessage textMessage=(TextMessage) message;
            //System.out.println(message.getStringProperty("queue"));
            System.out.println(textMessage.getText());
            session.commit();
            message = ts.receive(1000L);
        }        
                
        session.close();
        connection.close();
        
    }

}
  • 需要在連接上設置消費者id,用來識別消費者
  • 需要創建TopicSubscriber來訂閱
  • 要設置好了過后再start 這個 connection
  • 一定要先運行一次,等於向消息服務中間件注冊這個消費者,然后再運行客戶端發送信息,這個時候,無論消費者是否在線,都會接收到,不在線的話,下次連接的時候,會把沒有收過的消息都接收下來。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM