activeMQ 本地測試


參考這位博主的文章搭建:https://www.cnblogs.com/jaycekon/p/6225058.html

ActiveMQ官網下載地址:http://activemq.apache.org/download.html

我下的是windows版本的

下載並解壓之後,進入 D:\config\apache-activemq-5.15.7\bin\win64 目錄

雙擊運行 activemq.bat,啟動本地 MQ 服務。

 

 

控制台輸出中看到 started 即說明啟動成功。

接下來是代碼部分:

生產者:Producer

package com.mqtest;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Demo JMS producer that publishes numbered text messages to an ActiveMQ queue.
 *
 * <p>{@link #init()} opens one connection; {@link #sendMessage(String)} may then be
 * called from several threads. A JMS {@code Session} is a single-threaded context,
 * so every calling thread lazily gets its own transacted session and its own
 * {@code MessageProducer} instead of sharing one session between threads.
 *
 * @author Maggie.Hao
 * @date 2018/11/5 14:31
 */
public class Producer{

    private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);

    // Default ActiveMQ user name.
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

    // Default ActiveMQ password.
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

    // Default ActiveMQ broker URL.
    private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    // Running message number, shared by every sending thread.
    AtomicInteger count = new AtomicInteger(0);

    // Connection factory.
    ConnectionFactory connectionFactory;

    // Connection shared by all sending threads.
    Connection connection;

    // Transacted session created by init(). sendMessage() does not use it (each
    // sending thread owns its own session); the field is kept so that existing
    // same-package code reading it keeps working.
    Session session;

    // Per-thread producer. It is created without a fixed destination so that it
    // can serve whatever queue name sendMessage() is given.
    ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();

    // Per-thread transacted session backing the per-thread producer.
    private final ThreadLocal<Session> sessionLocal = new ThreadLocal<>();

    /**
     * Creates the connection factory, opens and starts the connection and creates
     * a transacted session.
     *
     * <p>A {@link JMSException} is logged, not rethrown; in that case the
     * half-opened connection is closed and {@code connection} is left {@code null}.
     */
    public void init(){
        LOGGER.info("Product init");
        try{
            connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
            connection = connectionFactory.createConnection();
            connection.start();
            // First argument true = transacted: sends become visible on commit().
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
        }catch (JMSException e){
            LOGGER.error("Failed to initialise the ActiveMQ producer", e);
            closeConnectionQuietly();
        }
    }

    /**
     * Sends one text message per second to the given queue, committing after each
     * send, until a JMS error occurs or the calling thread is interrupted.
     *
     * <p>On a JMS error the calling thread's session is closed and dropped, so a
     * later call starts with a fresh one. On interruption the thread's interrupt
     * flag is restored before returning.
     *
     * @param disname name of the destination queue
     * @throws IllegalStateException if {@link #init()} has not completed successfully
     */
    public void sendMessage(String disname){
        if (connection == null){
            throw new IllegalStateException("init() must complete successfully before sendMessage() is called");
        }
        try{
            Session threadSession = sessionLocal.get();
            if (threadSession == null){
                threadSession = connection.createSession(true, Session.SESSION_TRANSACTED);
                sessionLocal.set(threadSession);
            }

            MessageProducer messageProducer = threadLocal.get();
            if (messageProducer == null){
                // null destination: the queue is supplied on every send() instead.
                messageProducer = threadSession.createProducer(null);
                threadLocal.set(messageProducer);
            }

            Queue queue = threadSession.createQueue(disname);

            while (true){
                Thread.sleep(1000);
                int num = count.getAndIncrement();
                TextMessage msg = threadSession.createTextMessage(Thread.currentThread().getName() + "==Productor:我現在正在生產東西!,count:" + num);
                LOGGER.info("msg:{} + {}", msg, num);
                messageProducer.send(queue, msg);
                // Commit the transaction so the message is delivered to the broker.
                threadSession.commit();
            }
        }catch (JMSException e){
            LOGGER.error("Failed to send message to queue {}", disname, e);
            discardThreadResources();
        }catch (InterruptedException e){
            // Restore the flag so the caller can see that the thread was interrupted.
            Thread.currentThread().interrupt();
            LOGGER.error("Interrupted while sending to queue {}", disname, e);
        }
    }

    // Closes and forgets the calling thread's session and producer after a failure.
    private void discardThreadResources(){
        Session stale = sessionLocal.get();
        threadLocal.remove();
        sessionLocal.remove();
        if (stale != null){
            try{
                stale.close();
            }catch (JMSException closeError){
                LOGGER.warn("Failed to close the JMS session after a send failure", closeError);
            }
        }
    }

    // Closes the connection after a failed init() and resets the related fields.
    private void closeConnectionQuietly(){
        if (connection != null){
            try{
                connection.close();
            }catch (JMSException closeError){
                LOGGER.warn("Failed to close the ActiveMQ connection after an init failure", closeError);
            }
        }
        connection = null;
        session = null;
    }
}

消費者:Consumer

package com.mqtest;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Demo JMS consumer that reads text messages from an ActiveMQ queue and logs them.
 *
 * <p>{@link #init()} opens one connection; {@link #getMessage(String)} may then be
 * called from several threads. A JMS {@code Session} is a single-threaded context,
 * so every calling thread lazily gets its own auto-acknowledging session and its
 * own {@code MessageConsumer} instead of sharing one session between threads.
 *
 * @author Maggie.Hao
 * @date 2018/11/5 14:34
 */
public class Consumer{

    private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);

    // Default ActiveMQ user name.
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

    // Default ActiveMQ password.
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

    // Default ActiveMQ broker URL.
    private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    // Connection factory.
    ConnectionFactory connectionFactory;

    // Connection shared by all consuming threads.
    Connection connection;

    // Session created by init(). getMessage() does not use it (each consuming
    // thread owns its own session); the field is kept so that existing
    // same-package code reading it keeps working.
    Session session;

    // Per-thread consumer, valid for the queue name stored in queueNameLocal.
    ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();

    // Number of text messages logged so far, across all threads.
    AtomicInteger count = new AtomicInteger();

    // Per-thread session backing the per-thread consumer.
    private final ThreadLocal<Session> sessionLocal = new ThreadLocal<>();

    // Queue name the calling thread's cached consumer was created for.
    private final ThreadLocal<String> queueNameLocal = new ThreadLocal<>();

    /**
     * Creates the connection factory, opens and starts the connection and creates
     * a non-transacted, auto-acknowledging session.
     *
     * <p>A {@link JMSException} is logged, not rethrown; in that case the
     * half-opened connection is closed and {@code connection} is left {@code null}.
     */
    public void init(){
        try{
            connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        }catch (JMSException e){
            LOGGER.error("Failed to initialise the ActiveMQ consumer", e);
            closeConnectionQuietly();
        }
    }

    /**
     * Receives messages from the given queue, at most one per second, and logs the
     * text of each {@link TextMessage}. Messages of any other type are logged as a
     * warning and skipped instead of failing with a {@code ClassCastException}.
     *
     * <p>Returns when {@code receive()} yields {@code null}, when a JMS error
     * occurs or when the calling thread is interrupted. In the first two cases the
     * calling thread's session and consumer are closed and dropped, so a later
     * call starts with fresh ones. On interruption the interrupt flag is restored.
     *
     * @param disname name of the queue to consume from
     * @throws IllegalStateException if {@link #init()} has not completed successfully
     */
    public void getMessage(String disname){
        if (connection == null){
            throw new IllegalStateException("init() must complete successfully before getMessage() is called");
        }
        try{
            MessageConsumer consumer = consumerForCurrentThread(disname);
            while (true){
                Thread.sleep(1000);
                Message received = consumer.receive();
                if (received == null){
                    // The consumer yielded nothing; do not keep it cached for reuse.
                    discardThreadResources();
                    break;
                }
                if (!(received instanceof TextMessage)){
                    LOGGER.warn("Ignoring non-text message on queue {}: {}", disname, received);
                    continue;
                }
                // No explicit acknowledge(): the session is AUTO_ACKNOWLEDGE.
                TextMessage msg = (TextMessage) received;
                LOGGER.info("{}:Consumer:我是消費者,我正在消費Msg:{}----->{}", Thread.currentThread().getName(), msg.getText(), count.getAndIncrement());
            }
        }catch (JMSException e){
            LOGGER.error("Failed to receive message from queue {}", disname, e);
            discardThreadResources();
        }catch (InterruptedException e){
            // Restore the flag so the caller can see that the thread was interrupted.
            Thread.currentThread().interrupt();
            LOGGER.error("Interrupted while consuming from queue {}", disname, e);
        }
    }

    // Returns the calling thread's consumer for the queue, creating it (and the
    // thread's session) on first use or when the requested queue name changed.
    private MessageConsumer consumerForCurrentThread(String disname) throws JMSException{
        Session threadSession = sessionLocal.get();
        if (threadSession == null){
            threadSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            sessionLocal.set(threadSession);
        }

        MessageConsumer cached = threadLocal.get();
        if (cached != null && disname != null && disname.equals(queueNameLocal.get())){
            return cached;
        }
        if (cached != null){
            // Cached consumer belongs to a different queue; replace it.
            threadLocal.remove();
            queueNameLocal.remove();
            cached.close();
        }

        MessageConsumer created = threadSession.createConsumer(threadSession.createQueue(disname));
        threadLocal.set(created);
        queueNameLocal.set(disname);
        return created;
    }

    // Closes and forgets the calling thread's session and consumer.
    private void discardThreadResources(){
        Session stale = sessionLocal.get();
        threadLocal.remove();
        queueNameLocal.remove();
        sessionLocal.remove();
        if (stale != null){
            try{
                stale.close();
            }catch (JMSException closeError){
                LOGGER.warn("Failed to close the JMS session", closeError);
            }
        }
    }

    // Closes the connection after a failed init() and resets the related fields.
    private void closeConnectionQuietly(){
        if (connection != null){
            try{
                connection.close();
            }catch (JMSException closeError){
                LOGGER.warn("Failed to close the ActiveMQ connection after an init failure", closeError);
            }
        }
        connection = null;
        session = null;
    }
}

啟動生產者:

package com.mqtest;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Entry point that initialises one {@link Producer} and starts several threads
 * which all send to the same queue through it.
 *
 * @author Maggie.Hao
 * @date 2018/11/5 14:34
 */
public class TestProducer{

    private static final Logger LOGGER = LoggerFactory.getLogger(TestProducer.class);

    // Number of sender threads started by main().
    private static final int PRODUCER_THREADS = 5;

    /**
     * Initialises the producer, waits one second and starts the sender threads.
     *
     * @param args ignored
     */
    public static void main(String[] args){
        Producer producer = new Producer();
        producer.init();
        try{
            Thread.sleep(1000);
        }catch (InterruptedException e){
            Thread.currentThread().interrupt();
            LOGGER.error("Interrupted while waiting before starting the producer threads", e);
        }
        for (int i = 0; i < PRODUCER_THREADS; i++){
            new Thread(new ProductorMq(producer)).start();
        }
    }

    /**
     * Worker that keeps calling {@link Producer#sendMessage(String)} and, whenever
     * that call returns, waits ten seconds before trying again. The worker stops
     * once its thread is interrupted.
     */
    private static class ProductorMq implements Runnable{

        private final Producer producter;

        public ProductorMq(Producer producter){
            this.producter = producter;
        }

        @Override
        public void run(){
            while (!Thread.currentThread().isInterrupted()){
                try{
                    producter.sendMessage("Jaycekon-MQ");
                    Thread.sleep(10000);
                }catch (InterruptedException e){
                    // Restore the flag and stop instead of looping forever.
                    Thread.currentThread().interrupt();
                    LOGGER.error("Producer thread interrupted, stopping", e);
                    return;
                }
            }
        }
    }
}

啟動消費者:

package com.mqtest;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Entry point that initialises one {@link Consumer} and starts several threads
 * which all read from the same queue through it.
 *
 * @author Maggie.Hao
 * @date 2018/11/5 15:39
 */
public class TestConsumer{

    private static final Logger LOGGER = LoggerFactory.getLogger(TestConsumer.class);

    // Number of receiver threads started by main().
    private static final int CONSUMER_THREADS = 4;

    /**
     * Initialises the consumer and starts the receiver threads.
     *
     * @param args ignored
     */
    public static void main(String[] args){
        Consumer consumer = new Consumer();
        consumer.init();
        for (int i = 0; i < CONSUMER_THREADS; i++){
            new Thread(new ConsumerMq(consumer)).start();
        }
    }

    /**
     * Worker that keeps calling {@link Consumer#getMessage(String)} and, whenever
     * that call returns, waits ten seconds before trying again. The worker stops
     * once its thread is interrupted.
     */
    private static class ConsumerMq implements Runnable{

        private final Consumer consumer;

        public ConsumerMq(Consumer consumer){
            this.consumer = consumer;
        }

        @Override
        public void run(){
            while (!Thread.currentThread().isInterrupted()){
                try{
                    consumer.getMessage("Jaycekon-MQ");
                    Thread.sleep(10000);
                }catch (InterruptedException e){
                    // Restore the flag and stop instead of looping forever.
                    Thread.currentThread().interrupt();
                    LOGGER.error("Consumer thread interrupted, stopping", e);
                    return;
                }
            }
        }
    }
}

控制台輸出結果:

可以在 http://127.0.0.1:8161/admin/queues.jsp 查看結果。

用戶名和密碼默認都為:admin 

點擊Queues可以看到我們的消息隊列信息

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM