ActiveMQ學習教程(二)——簡單示例
一。應用IDEA構建Maven項目
File-》New-》Module...-》Maven-》勾選-》選擇
-》Next -》
GroupId:com.jd.myMaven | ArtifactId:activeMQ | version:默認 -》Finish
項目構建成功!項目結構如下所示:
二。創建生產者類,模擬生產者發消息
Step1:java/activemq/JMSProducer.java
package activemq; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQMapMessage; import javax.jms.*; import java.util.Map; /** * 消息生產者 */ public class JMSProducer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默認的連接用戶名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默認的連接密碼 private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默認的連接地址 private static final int SENDNUM = 2;//發送的消息數量 public static void main(String[] args) { ConnectionFactory connectionFactory = null;//連接工廠 Connection connection = null;//連接 Session session = null;//會話,接受或者發送消息的線程 Destination destination = null;//消息的目的地 MessageProducer messageProducer = null;//消息的生產者 //實例化連接工廠(指定連接用戶名|密碼|連接地址) connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKERURL); try { connection = connectionFactory.createConnection();//通過連接工廠獲取連接 connection.start();//啟動連接 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//創建session destination = session.createQueue("TestQueue");//創建消息隊列 messageProducer = session.createProducer(destination);//創建消息生產者 sendMessage(session, messageProducer);//發送消息 session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } //發送消息 private static void sendMessage(Session session, MessageProducer messageProducer) { try { //創建消息Map<key,value> MapMessage message = session.createMapMessage(); message.setString("userName", "syf"); message.setInt("age", 30); message.setDouble("salary", 1000); message.setBoolean("isGirl", true); System.out.println("Sending:" + ((ActiveMQMapMessage)message).getContentMap()); //發送消息 messageProducer.send(message); } catch (JMSException e) { e.printStackTrace(); } } }
Step2:啟動ActiveMQ,運行生產者類,模擬生產消息
控制台顯示:
Sending:{isGirl=true, userName=syf, salary=1000.0, age=30}
打開瀏覽器,訪問activeMQ監控畫面 http://127.0.0.1:8161/admin
【1】Queues
【2】Topics
三。模擬消費者消耗數據
package activemq; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQMapMessage; import javax.jms.*; /** * 消息消費者 */ public class JMSConsumer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默認的連接用戶名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默認的連接密碼 private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默認的連接地址 public static void main(String[] args) { ConnectionFactory connectionFactory = null;//連接工廠 Connection connection = null;//連接 Session session = null;//會話,接受或者發送消息的線程 Destination destination = null;//消息的目的地 MessageConsumer messageConsumer = null;//消息的消費者 //實例化連接工廠(指定連接用戶名|密碼|連接地址) connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKERURL); try { connection = connectionFactory.createConnection();//通過連接工廠獲取連接 connection.start();//啟動連接 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//創建session destination = session.createQueue("TestQueue");//創建連接的消息隊列(TestQueue:生產者隊列名) messageConsumer = session.createConsumer(destination);//創建消息消費者 while (true) { MapMessage mapMessage= (MapMessage) messageConsumer.receive(10000); //TextMessage textMessage= (TextMessage) messageConsumer.receive(10000);//10秒接收 if (mapMessage != null) { System.out.println("收到的消息:" + ((ActiveMQMapMessage)mapMessage).getContentMap()); } else { System.out.println("沒有消息:"); } } } catch (JMSException e) { e.printStackTrace(); } } }
運行代碼:
控制台顯示:收到的消息:{userName=syf, salary=1000.0, isGirl=true, age=30}
打開瀏覽器,查看activeMQ監控畫面 http://127.0.0.1:8161/admin
【1】Queues(1條消息被消費)
【2】Topics
示例-----企業應用最常見方式之監聽器監聽方式
增加監聽類JMSListener
package activemq; import org.apache.activemq.command.ActiveMQMapMessage; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; /** * 消息監聽 * */ public class JMSListener implements MessageListener{ public void onMessage(Message message) { try { System.out.println("receive Message:"+((ActiveMQMapMessage)message).getContentMap()); } catch (JMSException e) { e.printStackTrace(); } } }
重新模擬一個消費者,應用監聽器監聽消息
package activemq; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQMapMessage; import javax.jms.*; public class JMSConsumerByListener { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默認的連接用戶名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默認的連接密碼 private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默認的連接地址 public static void main(String[] args) { ConnectionFactory connectionFactory = null;//連接工廠 Connection connection = null;//連接 Session session = null;//會話,接受或者發送消息的線程 Destination destination = null;//消息的目的地 MessageConsumer messageConsumer = null;//消息的消費者 //實例化連接工廠(指定連接用戶名|密碼|連接地址) connectionFactory = new ActiveMQConnectionFactory(JMSConsumerByListener.USERNAME, JMSConsumerByListener.PASSWORD, JMSConsumerByListener.BROKERURL); try { connection = connectionFactory.createConnection();//通過連接工廠獲取連接 connection.start();//啟動連接 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//創建session destination = session.createQueue("TestQueue2");//創建連接的消息隊列(TestQueue:生產者隊列名) messageConsumer = session.createConsumer(destination);//創建消息消費者 messageConsumer.setMessageListener(new JMSListener());//注冊消息監聽 } catch (JMSException e) { e.printStackTrace(); } } }
=》先執行生成者,生產消息!
控制台打印:Sending:{isGirl=true, userName=kaixin, salary=1000.0, age=30}
ActiveMQ監控畫面顯示:
=》再執行消費者,消費消息!
控制台打印:receive Message:{userName=kaixin, salary=1000.0, isGirl=true, age=30}
ActiveMQ監控畫面顯示:
OK!!!大功告成!
參考文章:
https://www.toutiao.com/a6345805464718409986/?tt_from=weixin&utm_campaign=client_share&app=news_article&utm_source=weixin&iid=18292470304&utm_medium=toutiao_android&wxshare_count=1
http://blog.csdn.net/xh16319/article/details/12142249
理論博客:http://www.cnblogs.com/Survivalist/p/8094069.html