一、概述
ActiveMQ是Apache出品的消息中間件(MOM),它遵循JMS規范(Java Message Service)。它為企業消息傳遞提供高可用,出色性能,可擴展,穩定和安全保障。ActiveMQ使用Apache許可協議。因此,任何人都可以使用和修改它而不必反饋任何改變。這對於商業上將ActiveMQ用在重要用途的人尤為關鍵。MOM的工作是在分布式的各應用之間調度事件和消息,使之到達指定的接收者。所以高可用,高性能,高可擴展性尤為關鍵。
特點:
1、支持多種語言編寫客戶端
2、對spring的支持,很容易和spring整合
3、支持多種傳輸協議:TCP,SSL,NIO,UDP等
4、支持AJAX
消息形式:
1、點對點(queue)
2、一對多(topic)
二、服務安裝
2.1、下載
http://activemq.apache.org/download.html
2.2、解壓並運行xxx\apache-activemq-5.15.9\bin\win64\activemq.bat
2.3、訪問:http://localhost:8161/admin/queues.jsp 用戶名密碼默認admin/admin
2.4、使用Maven導入對應依賴包
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.4</version> </dependency>
注意:不同版本jdk對應不同版本MQ
MQ版本號 Build-Jdk 依賴JDK
apache-activemq-5.0.0 1.5.0_12 1.5+
apache-activemq-5.1.0 1.5.0_12 1.5+
apache-activemq-5.2.0 1.5.0_15 1.5+
apache-activemq-5.3.0 1.5.0_17 1.5+
apache-activemq-5.4.0 1.5.0_19 1.5+
apache-activemq-5.5.0 1.6.0_23 1.6+
apache-activemq-5.6.0 1.6.0_26 1.6+
apache-activemq-5.7.0 1.6.0_33 1.6+
apache-activemq-5.8.0 1.6.0_37 1.6+
apache-activemq-5.9.0 1.6.0_51 1.6+
apache-activemq-5.10.0 1.7.0_12-ea 1.7+
apache-activemq-5.11.0 1.7.0_60 1.7+
apache-activemq-5.12.0 1.7.0_80 1.7+
apache-activemq-5.13.0 1.7.0_80 1.7+
apache-activemq-5.14.0 1.7.0_80 1.7+
apache-activemq-5.15.0 1.8.0_112 1.8+
三、編碼使用
3.1、Queue—隊列模式
step1:創建生產者
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * @author 51ma * @dateTime 2019年8月16日 下午2:53:23 */ public class MqQueueProduct { private static final String url="tcp://127.0.0.1:61616";//服務地址,端口默認61616 private static final String queueName="queue-test";//要創建的消息名稱 public static void main(String[] args) throws JMSException { //1.創建ConnectiongFactory,綁定地址 ConnectionFactory factory=new ActiveMQConnectionFactory(url); //2.創建Connection Connection connection= factory.createConnection(); //3.啟動連接 connection.start(); //4.創建會話 Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.創建一個目標 Destination destination=session.createQueue(queueName); //6.創建一個生產者 MessageProducer producer=session.createProducer(destination); for (int i = 0; i < 20; i++) { //7.創建消息 TextMessage textMessage=session.createTextMessage("我是消息生產者:"+i); //8.發送消息 producer.send(textMessage); System.out.println("發送消息:"+i); } connection.close(); } }
運行:
step1:創建消費者
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * @author 51ma * @dateTime 2019年8月16日 下午2:56:18 */ public class MqQueueConsumer { private static final String url="tcp://127.0.0.1:61616";//端口默認 private static final String queueName="queue-test";//要消費的消息名稱 public static void main(String[] args) throws JMSException { //1.創建ConnectiongFactory,綁定地址 ConnectionFactory factory=new ActiveMQConnectionFactory(url); //2.創建Connection Connection connection= factory.createConnection(); //3.啟動連接 connection.start(); //4.創建會話 Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.創建一個目標 Destination destination=session.createQueue(queueName); //6.創建一個消費者 MessageConsumer consumer=session.createConsumer(destination); //7.創建一個監聽器 consumer.setMessageListener(new MessageListener() { public void onMessage(Message arg0) { TextMessage textMessage=(TextMessage)arg0; try { System.out.println("接收消息:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } }
運行:
3.2 Topic-主題模式
step1:創建生產者
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * @author 51ma * @dateTime 2019年8月16日 下午2:53:23 */ public class MqTopicProduct { private static final String url="tcp://127.0.0.1:61616";//服務地址,端口默認61616 private static final String topicName="topic-test";//要創建的消息名稱 public static void main(String[] args) throws JMSException { //1.創建ConnectiongFactory,綁定地址 ConnectionFactory factory=new ActiveMQConnectionFactory(url); //2.創建Connection Connection connection= factory.createConnection(); //3.啟動連接 connection.start(); //4.創建會話 Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.創建一個目標 Destination destination=session.createTopic(topicName); //6.創建一個生產者 MessageProducer producer=session.createProducer(destination); for (int i = 0; i < 15; i++) { //7.創建消息 TextMessage textMessage=session.createTextMessage("我是消息生產者:"+i); //8.發送消息 producer.send(textMessage); System.out.println("發送消息:"+i); } connection.close(); } }
運行:
step2:創建消費者
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * @author 51ma * @dateTime 2019年8月16日 下午2:53:23 */ public class MqTopicConsumer { private static final String url="tcp://127.0.0.1:61616";//端口默認 private static final String topicName="topic-test";//要消費的消息名稱 public static void main(String[] args) throws JMSException { //1.創建ConnectiongFactory,綁定地址 ConnectionFactory factory=new ActiveMQConnectionFactory(url); //2.創建Connection Connection connection= factory.createConnection(); //3.啟動連接 connection.start(); //4.創建會話 Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.創建一個目標 Destination destination=session.createTopic(topicName); //6.創建一個消費者 MessageConsumer consumer=session.createConsumer(destination); //7.創建一個監聽器 consumer.setMessageListener(new MessageListener() { public void onMessage(Message arg0) { TextMessage textMessage=(TextMessage)arg0; try { System.out.println("接收消息:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } }
運行:
發現沒有接收到生產者消息,先運行消費者再運行生產者,消費者才能接收到
四、結論
4.1、隊列模式的消費者在運行后也能收到之前的消息,不過屬於輪流進行消費,每個消費者接收不到完整的消息
4.2、主題模式的消費者在運行后只能收到之后的消息,運行之前生產的消息接收不到;並且每個消費者都能接收全部的消息