參考這位博主的文章搭建:https://www.cnblogs.com/jaycekon/p/6225058.html
ActiveMQ官網下載地址:http://activemq.apache.org/download.html

我下載的是 Windows 版本。
下載解壓之後,進入 D:\config\apache-activemq-5.15.7\bin\win64 目錄,
雙擊運行 activemq.bat,啟動本地 MQ 服務。


控制台輸出 started 說明啟動成功。
接下來是代碼部分:
生產者:Producer
package com.mqtest; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.*; import java.util.concurrent.atomic.AtomicInteger; /** * @author Maggie.Hao * @date 2018/11/5 14:31 */ public class Producer{ private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class); //ActiveMq 的默認用戶名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; //ActiveMq 的默認登錄密碼 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //ActiveMQ 的鏈接地址 private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL; AtomicInteger count = new AtomicInteger(0); //鏈接工廠 ConnectionFactory connectionFactory; //鏈接對象 Connection connection; //事務管理 Session session; ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>(); public void init(){ LOGGER.info("Product init"); try{ //創建一個鏈接工廠 // connectionFactory = new ActiveMQConnectionFactory("admin","demo","tcp://127.0.0.1:61616"); connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL); //從工廠中創建一個鏈接 connection = connectionFactory.createConnection(); //開啟鏈接 connection.start(); //創建一個事務(這里通過參數可以設置事務的級別) session = connection.createSession(true, Session.SESSION_TRANSACTED); }catch (JMSException e){ LOGGER.error("", e); } } public void sendMessage(String disname){ try{ //創建一個消息隊列 Queue queue = session.createQueue(disname); //消息生產者 MessageProducer messageProducer = null; if (threadLocal.get() != null){ messageProducer = threadLocal.get(); }else{ messageProducer = session.createProducer(queue); threadLocal.set(messageProducer); } while (true){ Thread.sleep(1000); int num = count.getAndIncrement(); //創建一條消息 TextMessage msg; msg = session.createTextMessage(Thread.currentThread().getName() + "==Productor:我現在正在生產東西!,count:" + num); LOGGER.info("msg:{} + {}", msg, num); //發送消息 messageProducer.send(msg); //提交事務 session.commit(); } }catch (JMSException 
e){ LOGGER.error("", e); }catch (InterruptedException e){ LOGGER.error("", e); } } }
消費者:Consumer
package com.mqtest; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.*; import java.util.concurrent.atomic.AtomicInteger; /** * @author Maggie.Hao * @date 2018/11/5 14:34 */ public class Consumer{ private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class); private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL; ConnectionFactory connectionFactory; Connection connection; Session session; ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>(); AtomicInteger count = new AtomicInteger(); public void init(){ try{ connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); }catch (JMSException e){ LOGGER.error("", e); } } public void getMessage(String disname){ try{ Queue queue = session.createQueue(disname); MessageConsumer consumer = null; if (threadLocal.get() != null){ consumer = threadLocal.get(); }else{ consumer = session.createConsumer(queue); threadLocal.set(consumer); } while (true){ Thread.sleep(1000); TextMessage msg = (TextMessage) consumer.receive(); if (msg != null){ msg.acknowledge(); LOGGER.info("{}:Consumer:我是消費者,我正在消費Msg:{}----->{}", Thread.currentThread().getName(), msg.getText(), count.getAndIncrement()); }else{ break; } } }catch (JMSException e){ LOGGER.error("", e); }catch (InterruptedException e){ LOGGER.error("", e); } } }
啟動生產者:
package com.mqtest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author Maggie.Hao * @date 2018/11/5 14:34 */ public class TestProducer{ private static final Logger LOGGER = LoggerFactory.getLogger(TestProducer.class); public static void main(String[] args){ Producer producer = new Producer(); producer.init(); TestProducer testMq = new TestProducer(); try{ Thread.sleep(1000); }catch (InterruptedException e){ LOGGER.error("", e); } //Thread 1 new Thread(testMq.new ProductorMq(producer)).start(); //Thread 2 new Thread(testMq.new ProductorMq(producer)).start(); //Thread 3 new Thread(testMq.new ProductorMq(producer)).start(); //Thread 4 new Thread(testMq.new ProductorMq(producer)).start(); //Thread 5 new Thread(testMq.new ProductorMq(producer)).start(); } private class ProductorMq implements Runnable{ Producer producter; public ProductorMq(Producer producter){ this.producter = producter; } @Override public void run(){ while (true){ try{ producter.sendMessage("Jaycekon-MQ"); Thread.sleep(10000); }catch (InterruptedException e){ LOGGER.error("{}", e); } } } } }
啟動消費者:
package com.mqtest;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Launcher that initialises one {@link Consumer} and drives it from several threads.
 *
 * @author Maggie.Hao
 * @date 2018/11/5 15:39
 */
public class TestConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(TestConsumer.class);

    /** Number of receiver threads started by {@link #main(String[])}. */
    private static final int CONSUMER_THREADS = 4;

    /**
     * Initialises a consumer and starts the receiver threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Consumer consumer = new Consumer();
        consumer.init();
        for (int i = 0; i < CONSUMER_THREADS; i++) {
            new Thread(new ConsumerMq(consumer)).start();
        }
    }

    /**
     * Worker that repeatedly calls {@link Consumer#getMessage(String)} on the shared consumer,
     * pausing ten seconds whenever that call returns, until the thread is interrupted.
     */
    private static class ConsumerMq implements Runnable {

        private final Consumer consumer;

        ConsumerMq(Consumer consumer) {
            this.consumer = consumer;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    consumer.getMessage("Jaycekon-MQ");
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition ends the worker.
                    Thread.currentThread().interrupt();
                    LOGGER.error("", e);
                }
            }
        }
    }
}
控制台輸出結果:

可以在 http://127.0.0.1:8161/admin/queues.jsp 查看結果
用戶名和密碼默認都為:admin
點擊Queues可以看到我們的消息隊列信息

