ActiveMQ入門實例Demo


  前面我們已經搭建和配置好了ActiveMQ,下面來看一個Demo,體驗一下MQ。

JMS 消息模型

  JMS消息服務應用程序結構支持兩種模型:點對點模型,發布者/訂閱者模型。  

  (1)點對點模型(Queue)

    一個生產者向一個特定的隊列發布消息,一個消費者從這個隊列中依次讀取消息。

    模型特點:只有一個消費者獲得消息。

  (2)發布者/訂閱者模型(Topic)

    0個或多個訂閱者可以接受特定主題的消息。

    模型特點:多個消費者可獲得消息。

    Topic和Queue的最大區別在於Topic是以廣播的形式,通知所有在線監聽的客戶端有新的消息,沒有監聽的客戶端將收不到消息;而Queue則是以點對點的形式通知多個處於監聽狀態的客戶端中的一個。

JMS消息格式

  • MapMessage -- key-value鍵值對
  • TextMessage -- 字符串對象
  • ObjcetMessage -- 一個序列化的Java對象
  • ByteMessage -- 一個未解釋字節的數據流
  • StreamMessage -- Java原始值的數據流

點對點模型Demo

public class Constants {

    public static final String MQ_NAME = "parry";
    
    public static final String MQ_PASSWORD = "parry123";
    
    public static final String MQ_BROKETURL = "tcp://192.168.56.129:61616";
}
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.parry.demo.constant.Constants;

/**
 * 
 * <p>
 * MSProduct 點對點模型-消息生產者
 * <p>
 */
public class MSProduct {

    public static void main(String[] args) {
        // 連接工廠
        ConnectionFactory factory;
        // 連接實例
        Connection connection = null;
        // 收發的線程實例
        Session session;
        // 消息發送目標地址
        Destination destination;
        // 消息創建者
        MessageProducer messageProducer;
        try {
            factory = new ActiveMQConnectionFactory(Constants.MQ_NAME, Constants.MQ_PASSWORD,
                    Constants.MQ_BROKETURL);
            // 獲取連接實例
            connection = factory.createConnection();
            // 啟動連接
            connection.start();
            // 創建接收或發送的線程實例(創建session的時候定義是否要啟用事務,且事務類型是Auto_ACKNOWLEDGE也就是消費者成功在Listern中獲得消息返回時,會話自動確定用戶收到消息)
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 創建隊列(返回一個消息目的地)
            destination = session.createQueue("parryQuene");
            // 創建消息生產者
            messageProducer = session.createProducer(destination);
            // 創建TextMessage消息實體
            TextMessage message = session.createTextMessage("我是parry,這是我的第一個消息!");
            messageProducer.send(message);
            session.commit();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.parry.demo.constant.Constants;
/**
 * <p>
 * MQConsumer 點對點--消息消費者
 * <p>
 */
public class MQConsumer {

    public static void main(String[] args) {
        // 連接工廠
        ConnectionFactory connectionFactory;
        // 連接實例
        Connection connection = null;
        // 收發的線程實例
        Session session;
        // 消息發送目標地址
        Destination destination;
        try {
            // 實例化連接工廠
            connectionFactory = new ActiveMQConnectionFactory(Constants.MQ_NAME, Constants.MQ_PASSWORD, Constants.MQ_BROKETURL);
            // 獲取連接實例
            connection = connectionFactory.createConnection();
            // 啟動連接
            connection.start();
            // 創建接收或發送的線程實例(消費者就不需要開啟事務了)
            session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
            // 創建隊列(返回一個消息目的地)
            destination = session.createQueue("parryQuene");
            // 創建消息消費者
            MessageConsumer consumer = session.createConsumer(destination);
            //注冊消息監聽
            consumer.setMessageListener(new MQListerner());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
 * <p>
 * MQListerner 生產者監聽器
 * <p>
 */
public class MQListerner implements MessageListener{

    @Override
    public void onMessage(Message message) {
        try {
            System.out.println(((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

 發布者/訂閱者模型Demo

  (1)發布者

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.parry.demo.constant.Constants;

/**
 * <p>
 * MQProducer 訂閱消息的發送者
 * <p>
 */
public class MQProducer {

    public static void main(String[] args) {
        // 連接工廠
        ConnectionFactory factory;
        // 連接實例
        Connection connection = null;
        // 收發的線程實例
        Session session;
        // 消息發送目標地址
        Destination destination;

        try {
            // 實例化連接工廠
            factory = new ActiveMQConnectionFactory(Constants.MQ_NAME, Constants.MQ_PASSWORD, Constants.MQ_BROKETURL);
            // 獲取連接實例
            connection = factory.createConnection();
            // 啟動連接
            connection.start();
            // 創建接收或發送的線程實例(創建session的時候定義是否要啟用事務,且事務類型是Auto_ACKNOWLEDGE也就是消費者成功在Listern中獲得消息返回時,會話自動確定用戶收到消息)
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 創建隊列(返回一個消息目的地)
            destination = session.createTopic("parryTopic");
            // 創建消息發布者
            MessageProducer producer = session.createProducer(destination);
            // 創建TextMessage消息
            TextMessage message = session.createTextMessage("你好,這是我發布的第一條消息!");
            // 發布消息
            producer.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

  (2)訂閱者01

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.parry.demo.constant.Constants;

/**
 * <p>
 * MQCousumer01 訂閱-發布模式 訂閱者01
 * <p>
 */
public class MQCousumer01 {

    public static void main(String[] args) {
        // 連接工廠
        ConnectionFactory factory;
        // 連接實例
        Connection connection = null;
        // 收發的線程實例
        Session session;
        // 消息發送目標地址
        Destination destination;
        try {
            // 實例化連接工廠
            factory = new ActiveMQConnectionFactory(Constants.MQ_NAME, Constants.MQ_PASSWORD, Constants.MQ_BROKETURL);
            // 獲取連接實例
            connection = factory.createConnection();
            // 啟動連接
            connection.start();
            // 創建接收或發送的線程實例
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            // 創建隊列(返回一個消息目的地)
            destination = session.createTopic("parryTopic");
            // 創建消息訂閱者
            MessageConsumer consumer = session.createConsumer(destination);
            // 消息發布者添加監聽器
            consumer.setMessageListener(new Listerner01());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
 * <p>
 * Listerner01 訂閱者01的監聽器
 * <p>
 */
public class Listerner01 implements MessageListener{

    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("訂閱者01接收到消息:" + ((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

  (3)訂閱者02

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.parry.demo.constant.Constants;

/**
 * <p>
 * MQCousumer02 訂閱-發布模式 訂閱者02
 * <p>
 */
public class MQCousumer02 {

    public static void main(String[] args) {
        // 連接工廠
        ConnectionFactory factory;
        // 連接實例
        Connection connection = null;
        // 收發的線程實例
        Session session;
        // 消息發送目標地址
        Destination destination;
        try {
            // 實例化連接工廠
            factory = new ActiveMQConnectionFactory(Constants.MQ_NAME, Constants.MQ_PASSWORD, Constants.MQ_BROKETURL);
            // 獲取連接實例
            connection = factory.createConnection();
            // 啟動連接
            connection.start();
            // 創建接收或發送的線程實例
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            // 創建隊列(返回一個消息目的地)
            destination = session.createTopic("parryTopic");
            // 創建消息訂閱者
            MessageConsumer consumer = session.createConsumer(destination);
            // 消息發布者添加監聽器
            consumer.setMessageListener(new Listerner02());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
 * <p>
 * Listerner02 訂閱者02的監聽器
 * <p>
 */
public class Listerner02 implements MessageListener{

    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("訂閱者02接收到消息:"+((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM