第一種:activemq:
1、從官網下載apache-activemq-5.15.3-bin.zip並解壓;
2、啟動activemq, CMD--/bin/activemq start ,訪問127.0.0.1:8161/ 用戶名密碼都默認為admin;
3、新建java工程,引入jar包;可以在解壓的文件夾中獲取如下jar包:
4、開始寫代碼測試;
1、生產者消費者模式(p2p模式):
生產者代碼:

package com.acmq.test.p2p; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Sender { private static final int SEND_NUMBER = 5; static DateFormat df = new SimpleDateFormat("HH:mm:ss:SSS"); static ConnectionFactory connectionFactory; static Connection connection = null; static Session session; static Destination destination; static MessageProducer producer; public static void main(String[] args) { connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("duilie"); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); sendMessage(session, producer); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } public static void sendMessage(Session session, MessageProducer producer) throws Exception { for (int i = 1; i <= SEND_NUMBER; i++) { TextMessage message = session.createTextMessage(" 發送的消息" + i); System.out.println(df.format(new Date())+"發送消息:" + "ActiveMq 發送的消息" + i); Thread.sleep(3000); producer.send(message); } } }
消費者代碼:

package com.acmq.test.p2p; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Reciver { static DateFormat df = new SimpleDateFormat("HH:mm:ss:SSS"); static ConnectionFactory connectionFactory; static Connection connection = null; static Session session; static Destination destination; static MessageConsumer consumer; public static void main(String[] args) { connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("duilie"); consumer = session.createConsumer(destination); while (true) { //監聽和receive只能使用一個 //consumer.setMessageListener(new AcListener()); TextMessage message = (TextMessage) consumer.receive(100000); if (null != message) { System.out.println(df.format(new Date())+"收到消息" + message.getText()); } else { break; } Thread.sleep(3000); } } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } }
消息監聽機制和receive方式接收消失只能使用一個;消息監聽代碼如下:
package com.acmq.test; import java.text.DateFormat; import java.text.DecimalFormat; import java.text.SimpleDateFormat; import java.util.Date; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class AcListener implements MessageListener{ static DateFormat dfm = new SimpleDateFormat("HH:mm:ss:SSS"); @Override public void onMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage msg = (TextMessage)message; System.out.println(dfm.format(new Date())+"收到消息" + msg.getText()); } if (message instanceof MapMessage){ MapMessage map = (MapMessage)message; String stock = map.getString("stock"); double price = map.getDouble("price"); double offer = map.getDouble("offer"); boolean up = map.getBoolean("up"); DecimalFormat df = new DecimalFormat( "#,###,###,##0.00" ); System.out.println(dfm.format(new Date())+"收到消息"+stock + "\t" + df.format(price) + "\t" + df.format(offer) + "\t" + (up?"up":"down")); } } catch (Exception ee) { } } }
2、發布者訂閱者模式:publisher-Subscriber

package com.acmq.test.pubsub; import java.text.DateFormat; import java.text.DecimalFormat; import java.text.SimpleDateFormat; import java.util.Date; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MapMessage; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import com.acmq.test.AcListener; public class Subscriber { static DateFormat dfm = new SimpleDateFormat("HH:mm:ss:SSS"); static ConnectionFactory factory; static Connection connection = null; static Session session; static MessageConsumer messageConsumer; public static void main(String[] args) throws Exception { factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); for (int i = 0; i < 5; i++) { Destination destination = session.createTopic("STOCKS." + i); messageConsumer = session.createConsumer(destination); messageConsumer.setMessageListener(new AcListener()); //new Thread(new SubThread(i, session)).start();; } } } class SubThread implements Runnable{ DateFormat dfm = new SimpleDateFormat("HH:mm:ss:SSS"); public int num; public Session session; public SubThread(int num,Session session){ this.num = num; this.session = session; } @Override public void run() { while (true) { try { Destination destination = session.createTopic("STOCKS." + num); MessageConsumer messageConsumer = session.createConsumer(destination); MapMessage map = (MapMessage) messageConsumer.receive(100000); if (null != map) { String stock = map.getString("stock"); double price = map.getDouble("price"); double offer = map.getDouble("offer"); boolean up = map.getBoolean("up"); DecimalFormat df = new DecimalFormat("#,###,###,##0.00"); System.out.println(dfm.format(new Date())+ "收到消息" + stock + "\t" + df.format(price) + "\t" + df.format(offer) + "\t" + (up ? "up" : "down")); } } catch (Exception e) { e.printStackTrace(); } } } }

package com.acmq.test.pubsub; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQMapMessage; public class Publisher { public static final int SEND_NUMBER = 5; static DateFormat df = new SimpleDateFormat("HH:mm:ss:SSS"); static ConnectionFactory factory; static Connection connection = null; static Session session; static Destination[] destinations; static MessageProducer producer; public static void main(String[] args) throws Exception{ factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); connection = factory.createConnection(); try { connection.start(); } catch (JMSException jmse) { connection.close(); throw jmse; } session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(null); //設置topic destinations = new Destination[SEND_NUMBER]; for(int i = 0; i < SEND_NUMBER; i++) { destinations[i] = session.createTopic("STOCKS." + i); } //發送消息 sendMessage(); //關閉連接 if (connection != null) { connection.close(); } } static void sendMessage() throws JMSException { for(int i = 0; i < SEND_NUMBER; i++) { Message message = createStockMessage(i, session); System.out.println(df.format(new Date())+ "Sending: " + ((ActiveMQMapMessage)message).getContentMap() + " on destination: " + destinations[i]); producer.send(destinations[i], message); } } static Message createStockMessage(int stock, Session session) throws JMSException { MapMessage message = session.createMapMessage(); message.setString("stock", stock+""); message.setDouble("price", 1.00); message.setDouble("offer", 0.01); message.setBoolean("up", true); return message; } }
監聽代碼如上所示;
3、請求回復模式:request-response

package com.acmq.test.reqres; import java.util.UUID; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class MqClient { public static void main(String[] args) { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection; try { connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination adminQueue = session.createQueue("client"); MessageProducer producer = session.createProducer(adminQueue); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //臨時隊列,用來接收回復 Destination tempDest = session.createTemporaryQueue(); MessageConsumer responseConsumer = session.createConsumer(tempDest); responseConsumer.setMessageListener(new ClientListener()); TextMessage txtMessage = session.createTextMessage(); txtMessage.setText("ClientMessage"); txtMessage.setJMSReplyTo(tempDest); String correlationId = UUID.randomUUID().toString(); txtMessage.setJMSCorrelationID(correlationId); producer.send(txtMessage); } catch (JMSException e) { e.printStackTrace(); } } }

package com.acmq.test.reqres; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; public class MqServer { public static void main(String[] args) { setupMessageQueueConsumer(); } private static void setupMessageQueueConsumer() { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection; try { connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination adminQueue = session.createQueue("client"); MessageConsumer consumer = session.createConsumer(adminQueue); consumer.setMessageListener(new ServerListener(session)); } catch (JMSException e) { e.printStackTrace(); } } }

package com.acmq.test.reqres; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class ClientListener implements MessageListener{ @Override public void onMessage(Message message) { String messageText = null; try { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; messageText = textMessage.getText(); System.out.println("收到回復: " + messageText); } } catch (JMSException e) { //Handle the exception appropriately e.printStackTrace(); } } }

package com.acmq.test.reqres; import javax.jms.DeliveryMode; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; public class ServerListener implements MessageListener { Session session; public ServerListener(Session session) { this.session = session; } @Override public void onMessage(Message message) { try { MessageProducer replyProducer = session.createProducer(null); replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); TextMessage response = session.createTextMessage(); if (message instanceof TextMessage) { TextMessage txtMsg = (TextMessage) message; String messageText = txtMsg.getText(); System.out.println("收到消息:" + messageText); if("ClientMessage".equals(messageText)){ response.setText("ServerReply"); response.setJMSCorrelationID(message.getJMSCorrelationID()); replyProducer.send(message.getJMSReplyTo(), response); } } } catch (Exception e) { e.printStackTrace(); } } }
4、測試代碼;
第二種:rocketmq
1、從官網下載rocketmq-all-4.2.0-bin-release.zip;(這個mq之前是阿里的,后來給了Apache了,所以官網是Apache的)
2、解壓文件,並設置HOME;啟動是需要設置,如圖:
3、啟動nameserver,如圖所示,啟動后默認端口為9876;
4、啟動broker; 啟動時需配置nameserver地址;mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true
5、啟動日志在用戶文件夾下的logs文件夾下面;
6、編寫代碼:
新建工程,引入下圖所示依賴jar包,所以包都可以在下載的壓縮文件里面找到;在lib文件夾下;

package com.rocketmq.test; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setInstanceName("Producer"); producer.start(); for (int i = 0; i < 100; i++) { try { { Message msg = new Message("TopicTest1", // topic "TagA", // tag "OrderID001", // key ("Hello A1").getBytes());// body SendResult sendResult = producer.send(msg); System.out.println(sendResult); } { Message msg = new Message("TopicTest2", // topic "TagB", // tag "OrderID0034", // key ("Hello B2").getBytes());// body SendResult sendResult = producer.send(msg); System.out.println(sendResult); } { Message msg = new Message("TopicTest3", // topic "TagC", // tag "OrderID061", // key ("Hello C3").getBytes());// body SendResult sendResult = producer.send(msg); System.out.println(sendResult); } } catch (Exception e) { e.printStackTrace(); } TimeUnit.MILLISECONDS.sleep(1000); } producer.shutdown(); } }

package com.rocketmq.test; import java.util.List; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; public class PushConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setInstanceName("Consumber"); /** * 訂閱指定topic下tags分別等於TagA或TagC或TagD */ consumer.subscribe("TopicTest1", "TagA || TagC || TagD"); /** * 訂閱指定topic下所有消息<br> * 注意:一個consumer對象可以訂閱多個topic */ consumer.subscribe("TopicTest2", "*"); consumer.subscribe("TopicTest1", "TagC"); consumer.registerMessageListener(new MessageListenerConcurrently() { /** * 默認msgs里只有一條消息,可以通過設置consumeMessageBatchMaxSize參數來批量接收消息 */ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size()); MessageExt msg = msgs.get(0); if (msg.getTopic().equals("TopicTest1")) { // 執行TopicTest1的消費邏輯 if (msg.getTags() != null && msg.getTags().equals("TagA")) { // 執行TagA的消費 System.out.println(new String(msg.getBody())); } else if (msg.getTags() != null && msg.getTags().equals("TagC")) { // 執行TagC的消費 } else if (msg.getTags() != null && msg.getTags().equals("TagD")) { // 執行TagD的消費 } } else if (msg.getTopic().equals("TopicTest2")) { System.out.println(new String(msg.getBody())); }else if(msg.getTopic().equals("TopicTest3")){ System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /** * Consumer對象在使用之前必須要調用start初始化,初始化一次即可<br> */ consumer.start(); System.out.println("Consumer Started."); } }
7、運行測試,需硬盤空閑空間達到4G以上;