ActiveMQ學習教程/2.簡單示例


ActiveMQ學習教程(二)——簡單示例

一。應用IDEA構建Maven項目

File-》New-》Module...-》Maven-》勾選-》選擇-》Next -》

GroupId:com.jd.myMaven   |    ArtifactId:activeMQ    |    version:默認   -》Finish

項目構建成功!項目結構如下所示:

二。創建生產者類,模擬生產者發消息

Step1:java/activemq/JMSProducer.java

package activemq;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMapMessage;


import javax.jms.*;
import java.util.Map;

/**
 * 消息生產者
 */
public class JMSProducer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默認的連接用戶名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默認的連接密碼
    private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默認的連接地址
    private static final int SENDNUM = 2;//發送的消息數量

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = null;//連接工廠
        Connection connection = null;//連接
        Session session = null;//會話,接受或者發送消息的線程
        Destination destination = null;//消息的目的地
        MessageProducer messageProducer = null;//消息的生產者
        //實例化連接工廠(指定連接用戶名|密碼|連接地址)
        connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKERURL);
        try {
            connection = connectionFactory.createConnection();//通過連接工廠獲取連接
            connection.start();//啟動連接
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//創建session
            destination = session.createQueue("TestQueue");//創建消息隊列
            messageProducer = session.createProducer(destination);//創建消息生產者
            sendMessage(session, messageProducer);//發送消息
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    //發送消息
    private static void sendMessage(Session session, MessageProducer messageProducer) {
        try {
            //創建消息Map<key,value>
            MapMessage message = session.createMapMessage();
            message.setString("userName", "syf");
            message.setInt("age", 30);
            message.setDouble("salary", 1000);
            message.setBoolean("isGirl", true);
            System.out.println("Sending:" + ((ActiveMQMapMessage)message).getContentMap());
            //發送消息
            messageProducer.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

Step2:啟動ActiveMQ,運行生產者類,模擬生產消息

控制台顯示:

Sending:{isGirl=true, userName=syf, salary=1000.0, age=30}

打開瀏覽器,訪問activeMQ監控畫面 http://127.0.0.1:8161/admin

【1】Queues

【2】Topics

三。模擬消費者消耗數據

package activemq;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMapMessage;

import javax.jms.*;

/**
 * 消息消費者
 */
public class JMSConsumer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默認的連接用戶名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默認的連接密碼
    private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默認的連接地址

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = null;//連接工廠
        Connection connection = null;//連接
        Session session = null;//會話,接受或者發送消息的線程
        Destination destination = null;//消息的目的地
        MessageConsumer messageConsumer = null;//消息的消費者
        //實例化連接工廠(指定連接用戶名|密碼|連接地址)
        connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKERURL);
        try {
            connection = connectionFactory.createConnection();//通過連接工廠獲取連接
            connection.start();//啟動連接
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//創建session
            destination = session.createQueue("TestQueue");//創建連接的消息隊列(TestQueue:生產者隊列名)
            messageConsumer = session.createConsumer(destination);//創建消息消費者
            while (true) {
                MapMessage mapMessage= (MapMessage) messageConsumer.receive(10000);
                //TextMessage textMessage= (TextMessage) messageConsumer.receive(10000);//10秒接收
                if (mapMessage != null) {
                    System.out.println("收到的消息:" + ((ActiveMQMapMessage)mapMessage).getContentMap());
                } else {
                    System.out.println("沒有消息:");
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

運行代碼:

控制台顯示:收到的消息:{userName=syf, salary=1000.0, isGirl=true, age=30}

打開瀏覽器,查看activeMQ監控畫面 http://127.0.0.1:8161/admin

【1】Queues(1條消息被消費)

【2】Topics

 

示例-----企業應用最常見方式之監聽器監聽方式

增加監聽類JMSListener

package activemq;

import org.apache.activemq.command.ActiveMQMapMessage;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
/**
 * 消息監聽
 * */
public class JMSListener implements MessageListener{
    public void onMessage(Message message) {
        try {
            System.out.println("receive Message:"+((ActiveMQMapMessage)message).getContentMap());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

重新模擬一個消費者,應用監聽器監聽消息

package activemq;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMapMessage;

import javax.jms.*;

public class JMSConsumerByListener {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默認的連接用戶名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默認的連接密碼
    private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默認的連接地址

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = null;//連接工廠
        Connection connection = null;//連接
        Session session = null;//會話,接受或者發送消息的線程
        Destination destination = null;//消息的目的地
        MessageConsumer messageConsumer = null;//消息的消費者
        //實例化連接工廠(指定連接用戶名|密碼|連接地址)
        connectionFactory = new ActiveMQConnectionFactory(JMSConsumerByListener.USERNAME, JMSConsumerByListener.PASSWORD, JMSConsumerByListener.BROKERURL);
        try {
            connection = connectionFactory.createConnection();//通過連接工廠獲取連接
            connection.start();//啟動連接
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//創建session
            destination = session.createQueue("TestQueue2");//創建連接的消息隊列(TestQueue:生產者隊列名)
            messageConsumer = session.createConsumer(destination);//創建消息消費者
            messageConsumer.setMessageListener(new JMSListener());//注冊消息監聽
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

=》先執行生成者,生產消息!

   控制台打印:Sending:{isGirl=true, userName=kaixin, salary=1000.0, age=30}

   ActiveMQ監控畫面顯示:

=》再執行消費者,消費消息!

  控制台打印:receive Message:{userName=kaixin, salary=1000.0, isGirl=true, age=30}

  ActiveMQ監控畫面顯示:

OK!!!大功告成!

 

參考文章:

https://www.toutiao.com/a6345805464718409986/?tt_from=weixin&utm_campaign=client_share&app=news_article&utm_source=weixin&iid=18292470304&utm_medium=toutiao_android&wxshare_count=1

http://blog.csdn.net/xh16319/article/details/12142249

理論博客:http://www.cnblogs.com/Survivalist/p/8094069.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM