ActiveMQ的發布者/訂閱者模型入門示例
(1)下載安裝activemq,啟動activeMQ。
詳細步驟參考博客:http://www.cnblogs.com/DFX339/p/9050878.html
(2)創建maven項目,java項目或者web項目都可以。
項目源碼下載地址:https://github.com/DFX339/activeMQ_demo.git
目錄結構如下:(queue包下的是activemq隊列模型的入門示例,需要的可以參考 http://www.cnblogs.com/DFX339/p/9050950.html)
需要編寫的文件:MQProducer.java Listener01.java MQConsumer01.java Listener02.java MQConsumer02.java pom.xml
消息發布者的定義: MQProducer.java
主要步驟:
/**
* 1.創建連接工廠
* 2.創建連接實例
* 3、啟動連接
* 4、創建session創建接收或發送的線程實例(創建session的時候定義是否要啟用事務,
* 且事務類型是Auto_ACKNOWLEDGE也就是消費者成功在Listern中獲得消息返回時,會話自動確定用戶收到消息)
* 5、創建隊列(消息發送的目的地)
* 6、創建消息發送者
* 7、創建消息
* 8、發送消息
* 9、session.commit();提交千萬不要忘記了
*/
代碼如下:
package cn.dfx.activeMQ_demo.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * 訂閱消息的發送消息方 * @author Administrator * */ public class MQProducer { public static void main(String[] args){ /** * 1.創建連接工廠 * 2.創建連接實例 * 3、啟動連接 * 4、創建session創建接收或發送的線程實例(創建session的時候定義是否要啟用事務, * 且事務類型是Auto_ACKNOWLEDGE也就是消費者成功在Listern中獲得消息返回時,會話自動確定用戶收到消息) * 5、創建隊列(消息發送的目的地) * 6、創建消息發送者 * 7、創建消息 * 8、發送消息 * 9、session.commit();提交千萬不要忘記了 */ ConnectionFactory connFactory = null; Connection conn = null; Session session = null; Destination destination = null; //連接參數定義: 用戶名 密碼 url String name = "system"; String password = "manager"; String url = "failover://tcp://localhost:61616"; System.out.println("消息發布者開始發布消息了……"); try{ //創建連接工廠 //這里的連接參數可以使用常量:Constants.MQ_NAME, Constants.MQ_PASSWORD, Constants.MQ_BROKETURL connFactory = new ActiveMQConnectionFactory(name,password,url); //通過連接工廠創建連接實例 conn = connFactory.createConnection(); //啟動連接 conn.start(); //4、創建session創建接收或發送的線程實例(創建session的時候定義是否要啟用事務,且事務類型是Auto_ACKNOWLEDGE也就是消費者成功在Listern中獲得消息返回時,會話自動確定用戶收到消息) //也可以使用:session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); //創建隊列,也就是消息發送的目的地 destination = session.createTopic("FirstTopic"); //創建消息發布者 MessageProducer messageProducer = session.createProducer(destination); //創建需要發送的消息 TextMessage textMessage = session.createTextMessage(); textMessage.setText("Hello,broadcast message NO.1!"); //發送消息 messageProducer.send(textMessage); //一定要記得這個,提交呀, session.commit(); System.out.println("消息發布者:"+textMessage.getText()); }catch(JMSException e){ e.printStackTrace(); } finally { //關閉連接 if (conn != null) { try { conn.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
消息訂閱者01的定義: MQConsumer01.java
主要步驟:
/**
* 1.創建連接工廠
* 2.創建連接實例
* 3、啟動連接
* 4、 創建接收或發送的線程實例(消費者就不需要開啟事務了)
* 5、創建隊列(消息發送的目的地)
* 6、創建消息接收者
* 7、注冊消息監聽
*/
代碼示例:
package cn.dfx.activeMQ_demo.topic; import java.io.IOException; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * 訂閱消息的消息接收方01 * @author Administrator * */ public class MQConsumer01 { public static void main(String[] args) throws IOException{ /** * 1.創建連接工廠 * 2.創建連接實例 * 3、啟動連接 * 4、 創建接收或發送的線程實例(消費者就不需要開啟事務了) * 5、創建隊列(消息發送的目的地) * 6、創建消息接收者 * 7、注冊消息監聽 */ ConnectionFactory connFactory = null; Connection conn = null; Session session = null; Destination destination = null; //創建連接工廠需要的參數 String name = "system"; String password = "manager"; String url = "failover://tcp://localhost:61616"; try{ //創建連接工廠 connFactory = new ActiveMQConnectionFactory(name,password,url); //創建連接實例 conn = connFactory.createConnection(); //啟動連接 conn.start(); //創建session(創建接收或發送的線程實例(消費者就不需要開啟事務了)) session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); //創建消息目的地(消費者從這里讀取消息) destination = session.createTopic("FirstTopic"); //創建消費者 MessageConsumer messageConsumer = session.createConsumer(destination); //消費者讀取消息,監聽消息 messageConsumer.setMessageListener(new Listener01()); System.out.println("訂閱者01已經准備好接收消息!"); // //8、程序等待接收用戶消息 // System.in.read(); // //9、關閉資源 // messageConsumer.close(); // session.close(); // conn.close(); }catch(JMSException e){ e.printStackTrace(); } } }
消息訂閱者01的監聽器的編寫: Listener01.java
package cn.dfx.activeMQ_demo.topic; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * 消息訂閱 消息接收方的訂閱者01的監聽器 * @author Administrator * */ public class Listener01 implements MessageListener { @Override public void onMessage(Message message) { try{ System.out.println("訂閱者01接收到的消息為:"+((TextMessage) message).getText()); }catch(JMSException e){ e.printStackTrace(); } } }
消息訂閱者02的定義: MQConsumer02.java
主要步驟:
/**
* 1.創建連接工廠
* 2.創建連接實例
* 3、啟動連接
* 4、創建接收或發送的線程實例(消費者就不需要開啟事務了)
* 5、創建隊列(消息發送的目的地)
* 6、創建消息接收者
* 7、注冊消息監聽
*/
代碼示例:
package cn.dfx.activeMQ_demo.topic; import java.io.IOException; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * 訂閱消息的消息接收方02 * @author Administrator * */ public class MQConsumer02 { public static void main(String[] args) throws IOException{ /** * 1.創建連接工廠 * 2.創建連接實例 * 3、啟動連接 * 4、 創建接收或發送的線程實例(消費者就不需要開啟事務了) * 5、創建隊列(消息發送的目的地) * 6、創建消息接收者 * 7、注冊消息監聽 */ ConnectionFactory connFactory = null; Connection conn = null; Session session = null; Destination destination = null; //創建連接工廠需要的參數 String name = "system"; String password = "manager"; String url = "failover://tcp://localhost:61616"; try{ //創建連接工廠 connFactory = new ActiveMQConnectionFactory(name,password,url); //創建連接實例 conn = connFactory.createConnection(); //啟動連接 conn.start(); //創建session(創建接收或發送的線程實例(消費者就不需要開啟事務了)) session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); //創建消息目的地(消費者從這里讀取消息) destination = session.createTopic("FirstTopic"); //創建消費者 MessageConsumer messageConsumer = session.createConsumer(destination); //消費者讀取消息,監聽消息 messageConsumer.setMessageListener(new Listener01()); System.out.println("訂閱者02已經准備好接收消息!"); }catch(JMSException e){ e.printStackTrace(); } } }
消息訂閱者02的監聽器的編寫: Listener02.java
package cn.dfx.activeMQ_demo.topic; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * 消息訂閱 消息接收方的訂閱者02的監聽器 * @author Administrator * */ public class Listener01 implements MessageListener { @Override public void onMessage(Message message) { try{ System.out.println("訂閱者02接收到的消息為:"+((TextMessage) message).getText()); }catch(JMSException e){ e.printStackTrace(); } } }
(3)測試步驟
1、先啟動activemq
啟動方法:找到activemq安裝目錄,進入bin目錄,雙擊activemq.bat
啟動結果:
2、啟動消息訂閱者01
啟動方法: 打開MQConsumer01.java --> 右擊 --> Run as --> java Application
啟動結果:
3、啟動消息訂閱者02
啟動方法: 打開MQConsumer02.java --> 右擊 --> Run as --> java Application
啟動結果:
4、啟動消息發布者
啟動方法: 打開MQProducer.java --> 右擊 --> Run as --> java Application
啟動結果:(訂閱者會自動讀取消息,可以切換控制台查詢讀取情況)
訂閱者01讀取消息后:
訂閱者02讀取消息后: