前面我們已經搭建和配置好了ActiveMQ,下面來看一個Demo,體驗一下MQ。
JMS 消息模型
JMS消息服務應用程序結構支持兩種模型:點對點模型,發布者/訂閱者模型。
(1)點對點模型(Queue)
一個生產者向一個特定的隊列發布消息,一個消費者從這個隊列中依次讀取消息。
模型特點:只有一個消費者獲得消息。
(2)發布者/訂閱者模型(Topic)
0個或多個訂閱者可以接受特定主題的消息。
模型特點:多個消費者可獲得消息。
Topic和Queue的最大區別在於Topic是以廣播的形式,通知所有在線監聽的客戶端有新的消息,沒有監聽的客戶端將收不到消息;而Queue則是以點對點的形式通知多個處於監聽狀態的客戶端中的一個。
JMS消息格式
- MapMessage -- key-value鍵值對
- TextMessage -- 字符串對象
- ObjcetMessage -- 一個序列化的Java對象
- ByteMessage -- 一個未解釋字節的數據流
- StreamMessage -- Java原始值的數據流
點對點模型Demo
public class Constants { public static final String MQ_NAME = "parry"; public static final String MQ_PASSWORD = "parry123"; public static final String MQ_BROKETURL = "tcp://192.168.56.129:61616"; }
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import com.parry.demo.constant.Constants; /** * * <p> * MSProduct 點對點模型-消息生產者 * <p> */ public class MSProduct { public static void main(String[] args) { // 連接工廠 ConnectionFactory factory; // 連接實例 Connection connection = null; // 收發的線程實例 Session session; // 消息發送目標地址 Destination destination; // 消息創建者 MessageProducer messageProducer; try { factory = new ActiveMQConnectionFactory(Constants.MQ_NAME, Constants.MQ_PASSWORD, Constants.MQ_BROKETURL); // 獲取連接實例 connection = factory.createConnection(); // 啟動連接 connection.start(); // 創建接收或發送的線程實例(創建session的時候定義是否要啟用事務,且事務類型是Auto_ACKNOWLEDGE也就是消費者成功在Listern中獲得消息返回時,會話自動確定用戶收到消息) session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 創建隊列(返回一個消息目的地) destination = session.createQueue("parryQuene"); // 創建消息生產者 messageProducer = session.createProducer(destination); // 創建TextMessage消息實體 TextMessage message = session.createTextMessage("我是parry,這是我的第一個消息!"); messageProducer.send(message); session.commit(); } catch (JMSException e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import com.parry.demo.constant.Constants; /** * <p> * MQConsumer 點對點--消息消費者 * <p> */ public class MQConsumer { public static void main(String[] args) { // 連接工廠 ConnectionFactory connectionFactory; // 連接實例 Connection connection = null; // 收發的線程實例 Session session; // 消息發送目標地址 Destination destination; try { // 實例化連接工廠 connectionFactory = new ActiveMQConnectionFactory(Constants.MQ_NAME, Constants.MQ_PASSWORD, Constants.MQ_BROKETURL); // 獲取連接實例 connection = connectionFactory.createConnection(); // 啟動連接 connection.start(); // 創建接收或發送的線程實例(消費者就不需要開啟事務了) session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE); // 創建隊列(返回一個消息目的地) destination = session.createQueue("parryQuene"); // 創建消息消費者 MessageConsumer consumer = session.createConsumer(destination); //注冊消息監聽 consumer.setMessageListener(new MQListerner()); } catch (JMSException e) { e.printStackTrace(); } } }
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * <p> * MQListerner 生產者監聽器 * <p> */ public class MQListerner implements MessageListener{ @Override public void onMessage(Message message) { try { System.out.println(((TextMessage)message).getText()); } catch (JMSException e) { e.printStackTrace(); } } }
發布者/訂閱者模型Demo
(1)發布者
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import com.parry.demo.constant.Constants; /** * <p> * MQProducer 訂閱消息的發送者 * <p> */ public class MQProducer { public static void main(String[] args) { // 連接工廠 ConnectionFactory factory; // 連接實例 Connection connection = null; // 收發的線程實例 Session session; // 消息發送目標地址 Destination destination; try { // 實例化連接工廠 factory = new ActiveMQConnectionFactory(Constants.MQ_NAME, Constants.MQ_PASSWORD, Constants.MQ_BROKETURL); // 獲取連接實例 connection = factory.createConnection(); // 啟動連接 connection.start(); // 創建接收或發送的線程實例(創建session的時候定義是否要啟用事務,且事務類型是Auto_ACKNOWLEDGE也就是消費者成功在Listern中獲得消息返回時,會話自動確定用戶收到消息) session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 創建隊列(返回一個消息目的地) destination = session.createTopic("parryTopic"); // 創建消息發布者 MessageProducer producer = session.createProducer(destination); // 創建TextMessage消息 TextMessage message = session.createTextMessage("你好,這是我發布的第一條消息!"); // 發布消息 producer.send(message); } catch (JMSException e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
(2)訂閱者01
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import com.parry.demo.constant.Constants; /** * <p> * MQCousumer01 訂閱-發布模式 訂閱者01 * <p> */ public class MQCousumer01 { public static void main(String[] args) { // 連接工廠 ConnectionFactory factory; // 連接實例 Connection connection = null; // 收發的線程實例 Session session; // 消息發送目標地址 Destination destination; try { // 實例化連接工廠 factory = new ActiveMQConnectionFactory(Constants.MQ_NAME, Constants.MQ_PASSWORD, Constants.MQ_BROKETURL); // 獲取連接實例 connection = factory.createConnection(); // 啟動連接 connection.start(); // 創建接收或發送的線程實例 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 創建隊列(返回一個消息目的地) destination = session.createTopic("parryTopic"); // 創建消息訂閱者 MessageConsumer consumer = session.createConsumer(destination); // 消息發布者添加監聽器 consumer.setMessageListener(new Listerner01()); } catch (JMSException e) { e.printStackTrace(); } } }
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * <p> * Listerner01 訂閱者01的監聽器 * <p> */ public class Listerner01 implements MessageListener{ @Override public void onMessage(Message message) { try { System.out.println("訂閱者01接收到消息:" + ((TextMessage)message).getText()); } catch (JMSException e) { e.printStackTrace(); } } }
(3)訂閱者02
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import com.parry.demo.constant.Constants; /** * <p> * MQCousumer02 訂閱-發布模式 訂閱者02 * <p> */ public class MQCousumer02 { public static void main(String[] args) { // 連接工廠 ConnectionFactory factory; // 連接實例 Connection connection = null; // 收發的線程實例 Session session; // 消息發送目標地址 Destination destination; try { // 實例化連接工廠 factory = new ActiveMQConnectionFactory(Constants.MQ_NAME, Constants.MQ_PASSWORD, Constants.MQ_BROKETURL); // 獲取連接實例 connection = factory.createConnection(); // 啟動連接 connection.start(); // 創建接收或發送的線程實例 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 創建隊列(返回一個消息目的地) destination = session.createTopic("parryTopic"); // 創建消息訂閱者 MessageConsumer consumer = session.createConsumer(destination); // 消息發布者添加監聽器 consumer.setMessageListener(new Listerner02()); } catch (JMSException e) { e.printStackTrace(); } } }
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * <p> * Listerner02 訂閱者02的監聽器 * <p> */ public class Listerner02 implements MessageListener{ @Override public void onMessage(Message message) { try { System.out.println("訂閱者02接收到消息:"+((TextMessage)message).getText()); } catch (JMSException e) { e.printStackTrace(); } } }