ActiveMQ 的兩種消息模式實現方式


第一種:點對點

#生產者
public
class Producer { private static final String userName = ActiveMQXAConnectionFactory.DEFAULT_USER; private static final String password = ActiveMQXAConnectionFactory.DEFAULT_PASSWORD; private static final String brokerURL = "tcp://192.168.178.X:61616"; public static void main(String[] args) throws JMSException { //1.創建連接工廠類 ConnectionFactory factory = new ActiveMQXAConnectionFactory(userName, password, brokerURL); //2.創建連接 Connection connection = factory.createConnection(); //3.啟動連接 connection.start(); //4.創建會話對象session(事務transacted為true,參數2不生效) //acknowledgeMode: Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE); //5.目的地 Queue queue = session.createQueue("mq-test-01"); //7.創建發送者 MessageProducer producer = session.createProducer(queue); for(int i=1;i<=10;i++) { //6.消息對象 TextMessage message = session.createTextMessage(); message.setText("消息"+i); //8.發送消息 producer.send(message); } //9.會話提交 // session.commit(); //10.關閉連接 connection.close(); } }

 

#消費者
public
class Consumer1 { private static final String userName = ActiveMQXAConnectionFactory.DEFAULT_USER; private static final String password = ActiveMQXAConnectionFactory.DEFAULT_PASSWORD; private static final String brokerURL = "tcp://192.168.178.X:61616"; public static void main(String[] args) throws JMSException { //1.創建連接工廠類 ConnectionFactory factory = new ActiveMQXAConnectionFactory(userName, password, brokerURL); //2.創建連接 Connection connection = factory.createConnection(); //3.啟動連接 connection.start(); //4.創建會話對象session(事務transacted為true,參數2不生效) //acknowledgeMode: Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE); //5.目的地 Queue queue = session.createQueue("mq-test-01"); //6.接收消息對象 MessageConsumer consumer = session.createConsumer(queue); //7.通過監聽器接收消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; //獲取消息 try { String msg = textMessage.getText(); System.out.println(msg); } catch (JMSException e) { } } }); } }

 

 

第二種: 發布者/訂閱者

啟動順序:先訂閱、再發布(非持久訂閱者只能收到其訂閱之後發布的消息,若先發布,這些消息將不會被收到)

#訂閱者
public
class Subscriber1 { private static final String userName = ActiveMQXAConnectionFactory.DEFAULT_USER; private static final String password = ActiveMQXAConnectionFactory.DEFAULT_PASSWORD; private static final String brokerURL = "tcp://192.168.129.10:61616"; public static void main(String[] args) throws JMSException { //1.創建連接工廠類 ConnectionFactory factory = new ActiveMQXAConnectionFactory(userName, password, brokerURL); //2.創建連接 Connection connection = factory.createConnection(); //3.啟動連接 connection.start(); //4.創建會話對象session(事務transacted為true,參數2不生效) //acknowledgeMode: Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE); //5.目的地 Topic topic = session.createTopic("mq-test-02"); //6.接收消息對象 MessageConsumer consumer = session.createConsumer(topic); //7.通過監聽器接收消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; //獲取消息 try { String msg = textMessage.getText(); System.out.println(msg); } catch (JMSException e) { } } }); } }

 

#發布者
public
class Publisher { private static final String userName = ActiveMQXAConnectionFactory.DEFAULT_USER; private static final String password = ActiveMQXAConnectionFactory.DEFAULT_PASSWORD; private static final String brokerURL = "tcp://192.168.129.10:61616"; public static void main(String[] args) throws JMSException { //1.創建連接工廠類 ConnectionFactory factory = new ActiveMQXAConnectionFactory(userName, password, brokerURL); //2.創建連接 Connection connection = factory.createConnection(); //3.啟動連接 connection.start(); //4.創建會話對象session(事務transacted為true,參數2不生效) //acknowledgeMode: Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE); //5.目的地 Topic topic = session.createTopic("mq-test-02"); //7.創建發送者 MessageProducer producer = session.createProducer(topic); for(int i=1;i<=10;i++) { //6.消息對象 TextMessage message = session.createTextMessage(); message.setText("消息"+i); //8.發送消息 producer.send(message); //設置自動持久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT); } //9.會話提交 // session.commit(); //10.關閉連接 connection.close(); } }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM