1 ActiveMQ簡介
1.1 ActiveMQ是什么
ActiveMQ是一個消息隊列應用服務器(推送服務器)。支持JMS規范。
1.1.1 JMS概述
全稱:Java Message Service ,即為Java消息服務,是一套java消息服務的API標准。(標准即接口)
實現了JMS標准的系統,稱之為JMS Provider。
1.1.2 消息隊列
1.1.2.1 概念
消息隊列是在消息的傳輸過程中保存消息的容器,提供一種不同進程或者同一進程不同線程直接通訊的方式。
Producer:消息生產者,負責產生和發送消息到 Broker;
Broker:消息處理中心。負責消息存儲、確認、重試等,一般其中會包含多個 queue;
Consumer:消息消費者,負責從 Broker 中獲取消息,並進行相應處理;
1.1.2.2 常見消息隊列應用
(1)、ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現。
(2)、RabbitMQ
RabbitMQ是一個在AMQP基礎上完成的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。開發語言為Erlang。
(3)、RocketMQ
由阿里巴巴定義開發的一套消息隊列應用服務。
1.2 ActiveMQ能做什么
(1)實現兩個不同應用(程序)之間的消息通訊。
(2)實現同一個應用,不同模塊之間的消息通訊。(確保數據發送的穩定性)
1.3 ActiveMQ下載
ActiveMQ官網地址: http://activemq.apache.org
ActiveMQ下載地址:http://activemq.apache.org/download-archives.html
--可供下載的歷史版本
--說明:
ActiveMQ 5.10.x以上版本必須使用JDK1.8才能正常使用。
ActiveMQ 5.9.x及以下版本使用JDK1.7即可正常使用。
|
--根據操作系統,選擇下載版本。(本教程下載Linux版本)
|
1.4 ActiveMQ主要特點
(1)支持多語言、多協議客戶端。語言: Java,C,C++,C#,Ruby,Perl,Python,PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
(2)對Spring的支持,ActiveMQ可以很容易整合到Spring的系統里面去。
(3)支持高可用、高性能的集群模式。
2 入門示例
2.1 需求
使用ActiveMQ實現消息隊列模型。
2.2 配置步驟說明
(1)搭建ActiveMQ消息服務器。
(2)創建一個java項目。
(3)創建消息生產者,發送消息。
(4)創建消息消費者,接收消息。
2.3 第一部分:搭建ActiveMQ消息服務器
2.3.1 第一步:下載、上傳至Linux
--說明:確保已經安裝了jdk
|
2.3.2 第二步:安裝到/usr/local/activemq目錄
(1)解壓到/usr/local目錄下
[root@node07192 ~]# tar -zxvf apache-activemq-5.9.0-bin.tar.gz -C /usr/local |
(2)修改名稱為activemq
[root@node07192 ~]# cd /usr/local/ [root@node07192 local]# mv apache-activemq-5.9.0/ activemq |
2.3.3 第三步:啟動ActiveMQ服務器
--說明:ActiveMQ是免安裝軟件,解壓即可啟動服務。
[root@node07192 local]# cd activemq/bin [root@node07192 bin]# ./activemq start |
--查看ActiveMQ啟動狀態
[root@node07192 bin]# ./activemq status
|
2.3.4 第四步:瀏覽器訪問ActiveMQ管理界面
2.3.4.1 Step1:查看ActiveMQ管理界面的服務端口。在/conf/jetty.xml中
--訪問管理控制台的服務端口,默認為:8161
[root@node07192 bin]# cd ../conf [root@node07192 conf]# vim jetty.xml
|
2.3.4.2 Step2:查看ActiveMQ用戶、密碼。在/conf/users.properties中:
--默認的用戶名、密碼均為amdin
[root@node07192 conf]# vim users.properties
|
2.3.4.3 Step3:訪問ActiveMQ管理控制台。地址:http://ip:8161/
--注意:防火牆是沒有配置該服務的端口的。
因此,要訪問該服務,必須在防火牆中配置。
(1)修改防火牆,開放8161端口
[root@node07192 conf]# vim /etc/sysconfig/iptables |
(2)重啟防火牆
[root@node07192 conf]# service iptables restart |
(3)登錄管理控制台
--登陸,用戶名、密碼均為admin
|
--控制台主界面
|
--搭建ActiveMQ服務器成功!!!
2.4 第二部分:創建java項目,導入jar包
--導包說明:
ActiveMQ的解壓包中,提供了運行ActiveMQ的所有jar。
|
--創建項目
|
2.5 第三部分:創建消息生成者,發送消息
--說明:ActiveMQ是實現了JMS規范的。在實現消息服務的時候,必須基於API接口規范。
2.5.1 JMS常用的API說明
下述API都是接口類型,定義在javax.jms包中,是JMS標准接口定義。ActiveMQ完全實現這一套api標准。
2.5.1.1 ConnectionFactory
鏈接工廠, 用於創建鏈接的工廠類型。
2.5.1.2 Connection
鏈接,用於建立訪問ActiveMQ連接的類型, 由鏈接工廠創建。
2.5.1.3 Session
會話, 一次持久有效、有狀態的訪問,由鏈接創建。
2.5.1.4 Destination & Queue & Topic
目的地, 即本次訪問ActiveMQ消息隊列的地址,由Session會話創建。
(1)interface Queue extends Destination
(2)Queue:隊列模型,只有一個消費者。消息一旦被消費,默認刪除。
(3)Topic:主題訂閱中的消息,會發送給所有的消費者同時處理。
2.5.1.5 Message
消息,在消息傳遞過程中數據載體對象,是所有消息【文本消息TextMessage,對象消息ObjectMessage等】具體類型的頂級接口,可以通過會話創建或通過會話從ActiveMQ服務中獲取。
2.5.1.6 MessageProducer
消息生成者, 在一次有效會話中, 用於發送消息給ActiveMQ服務的工具,由Session會話創建。
2.5.1.7 MessageCustomer
消息消費者【消息訂閱者,消息處理者】, 在一次有效會話中, 用於ActiveMQ服務中獲取消息的工具,由Session會話創建。
我們定義的消息生產者和消費者,都是基於上面API實現的。
2.5.2 第一步:創建MyProducer類,定義sendMessage方法
package cn.gzsxt.mq.producer;
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MyProducer {
// 定義鏈接工廠 ConnectionFactory connectionFactory = null; // 定義鏈接 Connection connection = null; // 定義會話 Session session = null; // 定義目的地 Destination destination = null; // 定義消息生成者 MessageProducer producer = null; // 定義消息 Message message = null;
public void sendToMQ(){
try{
/* * 創建鏈接工廠 * ActiveMQConnectionFactory - 由ActiveMQ實現的ConnectionFactory接口實現類. * 構造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL) * userName - 訪問ActiveMQ服務的用戶名, 用戶名可以通過jetty-realm.properties配置文件配置. * password - 訪問ActiveMQ服務的密碼, 密碼可以通過jetty-realm.properties配置文件配置. * brokerURL - 訪問ActiveMQ服務的路徑地址. 路徑結構為 - 協議名://主機地址:端口號 * 此鏈接基於TCP/IP協議. */ connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616");
// 創建鏈接對象 connection = connectionFactory.createConnection(); // 啟動鏈接 connection.start();
/* * 創建會話對象 * 方法 - connection.createSession(boolean transacted, int acknowledgeMode); * transacted - 是否使用事務, 可選值為true|false * true - 使用事務, 當設置此變量值, 則acknowledgeMode參數無效, 建議傳遞的acknowledgeMode參數值為 * Session.SESSION_TRANSACTED * false - 不使用事務, 設置此變量值, 則acknowledgeMode參數必須設置. * acknowledgeMode - 消息確認機制, 可選值為: * Session.AUTO_ACKNOWLEDGE - 自動確認消息機制 * Session.CLIENT_ACKNOWLEDGE - 客戶端確認消息機制 * Session.DUPS_OK_ACKNOWLEDGE - 有副本的客戶端確認消息機制 */ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創建目的地, 目的地命名即隊列命名, 消息消費者需要通過此命名訪問對應的隊列 destination = session.createQueue("test-mq");
// 創建消息生成者, 創建的消息生成者與某目的地對應, 即方法參數目的地. producer = session.createProducer(destination);
// 創建消息對象, 創建一個文本消息, 此消息對象中保存要傳遞的文本數據. message = session.createTextMessage("hello,activeme");
// 發送消息 producer.send(message); System.out.println("消息發送成功!"); }catch(Exception e){ e.printStackTrace(); System.out.println("訪問ActiveMQ服務發生錯誤!!"); }finally{ try { // 回收消息發送者資源 if(null != producer) producer.close(); } catch (JMSException e) { e.printStackTrace(); } try { // 回收會話資源 if(null != session) session.close(); } catch (JMSException e) { e.printStackTrace(); } try { // 回收鏈接資源 if(null != connection) connection.close(); } catch (JMSException e) { e.printStackTrace(); } } }
}
|
2.5.3 第二步:創建一個測試類MessageTest
--添加junit類庫,快捷鍵ctrl+1
package cn.gzsxt.mq.test;
import org.junit.Test;
import cn.gzsxt.mq.producer.MyProducer;
public class MessageTest {
@Test public void sendToMQ(){ MyProducer producer = new MyProducer(); producer.sendToMQ(); } } |
2.5.4 第三步:測試
(1)設置防火牆,配置61616端口。注意修改之后重啟防火牆。
(2)測試結果:
--查看控制台
|
--查看ActiveMQ管理控制界面
|
--消息發送成功!!!
2.6 第四部分:創建消息消費者,消費消息
2.6.1 第一步:創建MyConsumer類
package cn.gzsxt.mq.consumer;
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/** * @ClassName:MyConsumer * @Description: 消息消費者代碼 */ public class MyConsumer {
// 定義鏈接工廠 ConnectionFactory connectionFactory = null; // 定義鏈接 Connection connection = null; // 定義會話 Session session = null; // 定義目的地 Destination destination = null; // 定義消息消費者 MessageConsumer consumer = null; // 定義消息 Message message = null;
public void recieveFromMQ(){
try{
/* * 創建鏈接工廠 * ActiveMQConnectionFactory - 由ActiveMQ實現的ConnectionFactory接口實現類. * 構造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL) * userName - 訪問ActiveMQ服務的用戶名, 用戶名可以通過jetty-realm.properties配置文件配置. * password - 訪問ActiveMQ服務的密碼, 密碼可以通過jetty-realm.properties配置文件配置. * brokerURL - 訪問ActiveMQ服務的路徑地址. 路徑結構為 - 協議名://主機地址:端口號 * 此鏈接基於TCP/IP協議. */ connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616");
// 創建鏈接對象 connection = connectionFactory.createConnection(); // 啟動鏈接 connection.start();
/* * 創建會話對象 * 方法 - connection.createSession(boolean transacted, int acknowledgeMode); * transacted - 是否使用事務, 可選值為true|false * true - 使用事務, 當設置此變量值, 則acknowledgeMode參數無效, 建議傳遞的acknowledgeMode參數值為 * Session.SESSION_TRANSACTED * false - 不使用事務, 設置此變量值, 則acknowledgeMode參數必須設置. * acknowledgeMode - 消息確認機制, 可選值為: * Session.AUTO_ACKNOWLEDGE - 自動確認消息機制 * Session.CLIENT_ACKNOWLEDGE - 客戶端確認消息機制 * Session.DUPS_OK_ACKNOWLEDGE - 有副本的客戶端確認消息機制 */ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創建目的地, 目的地命名即隊列命名, 消息消費者需要通過此命名訪問對應的隊列 destination = session.createQueue("test-mq");
// 創建消息消費者, 創建的消息消費者與某目的地對應, 即方法參數目的地. consumer = session.createConsumer(destination);
// 從ActiveMQ服務中獲取消息 message = consumer.receive();
TextMessage tMsg = (TextMessage) message;
System.out.println("從MQ中獲取的消息是:"+tMsg.getText());
}catch(Exception e){ e.printStackTrace(); System.out.println("訪問ActiveMQ服務發生錯誤!!");
}finally{ try { // 回收消息消費者資源 if(null != consumer) consumer.close(); } catch (JMSException e) { e.printStackTrace(); } try { // 回收會話資源 if(null != session) session.close(); } catch (JMSException e) { e.printStackTrace(); } try { // 回收鏈接資源 if(null != connection) connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } |
2.6.2 第二步:修改測試類MessageTest,新增測試方法
@Test public void recieveFromMQ(){ MyConsumer consumer = new MyConsumer(); consumer.recieveFromMQ(); } |
2.6.3 第三步:測試
--查看Eclipse控制台
|
--查看ActiveMQ管理控制界面
|
--消息被消費了,測試成功!!!
3 ActiveMQ監聽器
問題:在前面的示例中,我們發現消費者每次只能消費一條消息。當隊列中有多條消息的時候,我們需要多次運行消費者,才能消費完這些消息。很麻煩!!!!如何解決這個問題呢?我們希望一次將所有的消息全部接收。
答:使用ActiveMQ監聽器來監聽隊列,持續消費消息。
3.1 配置步驟說明
(1)創建一個監聽器對象。
(2)修改消費者代碼,加載監聽器。
3.2 配置步驟
3.2.1 第一步:創建監聽器MyListener類
--說明:自定義監聽器需要實現MessageListener接口
package cn.gzsxt.mq.listener;
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage;
public class MyListener implements MessageListener{
@Override public void onMessage(Message message) {
if(null!=message){ TextMessage tMsg = (TextMessage) message;
try { System.out.println("從MQ中獲取的消息是:"+tMsg.getText()); } catch (JMSException e) { e.printStackTrace(); } } } } |
3.2.2 第二步:修改MyConsumer代碼,加載監聽器
--說明:監聽器需要持續加載,因此消費程序不能結束。
這里我們使用輸入流阻塞消費線程結束。(實際開發中,使用web項目加載)
package cn.gzsxt.mq.consumer;
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import cn.gzsxt.mq.listener.MyListener;
/** * @ClassName:MyConsumer * @Description: 消息消費者代碼 */ public class MyConsumer {
// 定義鏈接工廠 ConnectionFactory connectionFactory = null; // 定義鏈接 Connection connection = null; // 定義會話 Session session = null; // 定義目的地 Destination destination = null; // 定義消息消費者 MessageConsumer consumer = null; // 定義消息 Message message = null;
public Message recieveFromMQ(){
try{
/* * 創建鏈接工廠 * ActiveMQConnectionFactory - 由ActiveMQ實現的ConnectionFactory接口實現類. * 構造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL) * userName - 訪問ActiveMQ服務的用戶名, 用戶名可以通過jetty-realm.properties配置文件配置. * password - 訪問ActiveMQ服務的密碼, 密碼可以通過jetty-realm.properties配置文件配置. * brokerURL - 訪問ActiveMQ服務的路徑地址. 路徑結構為 - 協議名://主機地址:端口號 * 此鏈接基於TCP/IP協議. */ connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616");
// 創建鏈接對象 connection = connectionFactory.createConnection(); // 啟動鏈接 connection.start();
/* * 創建會話對象 * 方法 - connection.createSession(boolean transacted, int acknowledgeMode); * transacted - 是否使用事務, 可選值為true|false * true - 使用事務, 當設置此變量值, 則acknowledgeMode參數無效, 建議傳遞的acknowledgeMode參數值為 * Session.SESSION_TRANSACTED * false - 不使用事務, 設置此變量值, 則acknowledgeMode參數必須設置. * acknowledgeMode - 消息確認機制, 可選值為: * Session.AUTO_ACKNOWLEDGE - 自動確認消息機制 * Session.CLIENT_ACKNOWLEDGE - 客戶端確認消息機制 * Session.DUPS_OK_ACKNOWLEDGE - 有副本的客戶端確認消息機制 */ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創建目的地, 目的地命名即隊列命名, 消息消費者需要通過此命名訪問對應的隊列 destination = session.createQueue("test-mq");
// 創建消息消費者, 創建的消息消費者與某目的地對應, 即方法參數目的地. consumer = session.createConsumer(destination);
// // 從ActiveMQ服務中獲取消息 // message = consumer.receive(); // // TextMessage tMsg = (TextMessage) message; // // System.out.println("從MQ中獲取的消息是:"+tMsg.getText()); //加載監聽器 consumer.setMessageListener(new MyListener()); //監聽器需要持續加載,這里我們使用輸入流阻塞當前線程結束。 System.in.read();
}catch(Exception e){ e.printStackTrace(); System.out.println("訪問ActiveMQ服務發生錯誤!!");
}finally{ try { // 回收消息消費者資源 if(null != consumer) consumer.close(); } catch (JMSException e) { e.printStackTrace(); } try { // 回收會話資源 if(null != session) session.close(); } catch (JMSException e) { e.printStackTrace(); } try { // 回收鏈接資源 if(null != connection) connection.close(); } catch (JMSException e) { e.printStackTrace(); } } return message; } } |
3.3 測試
(1)多次運行生產者,發送多條消息到隊列中。
|
(2)運行消費者。觀察結果
--查看Eclipse控制台,一次消費了3條消息
|
--查看ActiveMQ管理控制界面,所有消息都被消費了!
|
--測試成功!!!
4 ActiveMQ消息服務模式
問題:在入門示例中,只能向一個消費者發送消息。但是有一些場景,需求有多個消費者都能接收到消息,比如:美團APP每天的消息推送。該如何實現呢?
答:ActiveMQ是通過不同的服務模式來解決這個問題的。
所以,要搞清楚這個問題,必須知道ActiveMQ有哪些應用模式。
4.1 PTP模式(point to point)
--消息模型
|
消息生產者生產消息發送到queue中,然后消息消費者從queue中取出並且消費消息。
消息被消費以后,queue中不再有存儲,所以消息消費者不可能消費到已經被消費的消息。
Queue支持存在多個消費者,但是對一個消息而言,只會有一個消費者可以消費、其它的則不能消費此消息了。
當消費者不存在時,消息會一直保存,直到有消費消費
我們的入門示例,就是采用的這種PTP服務模式。
4.2 TOPIC(主題訂閱模式)
--消息模型
|
消息生產者(發布)將消息發布到topic中,同時有多個消息消費者(訂閱)消費該消息。
和點對點方式不同,發布到topic的消息會被所有訂閱者消費。
當生產者發布消息,不管是否有消費者。都不會保存消息
所以,主題訂閱模式下,一定要先有消息的消費者(訂閱者),后有消息的生產者(發布者)。
我們前面已經實現了PTP模式,下面我們來實現TOPIC模式。
5 Topic模式實現
5.1 配置步驟說明
(1)搭建ActiveMQ消息服務器。(已實現)
(2)創建主題訂閱者。
(3)創建主題發布者。
5.2 配置步驟
5.2.1 第一部分:搭建消息服務器。(已實現)
5.2.2 第二部分:創建主題訂閱者MySubscriber
--說明:主題訂閱模式下,可以有多個訂閱者。我們這里用多線程來模擬。
配置步驟:
(1)創建訂閱者(線程類)。
(2)修改測試類。
(3)查看測試結果。
5.2.2.1 第一步:創建MySubscriber類,實現Runnable接口
package cn.gzsxt.mq.subscribe;
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicSession; import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MySubscirber implements Runnable{
TopicConnectionFactory factory = null; TopicConnection connection = null; TopicSession session = null; Topic topic = null; TopicSubscriber subscriber = null; Message message =null;
@Override public void run() { try{ // 創建鏈接工廠 factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616"); // 通過工廠創建一個連接 connection = factory.createTopicConnection(); // 啟動連接 connection.start(); // 創建一個session會話 session = connection.createTopicSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 創建一個消息隊列 topic = session.createTopic("gzsxt.topic"); // 創建消息制作者 subscriber = session.createSubscriber(topic); message = subscriber.receive(); if(null!=message){ TextMessage tMsg = (TextMessage) message; System.out.println(Thread.currentThread().getName()+"訂閱的內容是:"+tMsg.getText()); } } catch (Exception e) { e.printStackTrace(); System.out.println("消息訂閱異常"); } finally { // 關閉釋放資源 if (session != null) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (session != null) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } } |
5.2.2.2 第二步:修改測試類MessageTest
--說明:junit單元測試,不支持多線程測試。
所以,這里我們在測試類的main方法中測試。
--修改MessageTest類,新增main方法。
public static void main(String[] args) { MySubscirber subscirber = new MySubscirber(); Thread t1 = new Thread(subscirber); Thread t2 = new Thread(subscirber); t1.start(); t2.start(); } |
5.2.2.3 第三步:查看測試結果
--查看AcitveMQ管理界面
|
--測試成功!!!
5.2.3 第三部分:創建主題發布者MyPublisher
-配置步驟說明:
(1)創建發布者
(2)修改測試類測試
(3)查看測試結果
5.2.3.1 第一步:創建MyPublish類
package cn.gzsxt.mq.topic;
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MyPublisher {
TopicConnectionFactory factory = null; TopicConnection connection = null; TopicSession session = null; Topic topic = null; TopicPublisher publisher = null; Message message =null;
public void publishTopic(){ try { // 創建鏈接工廠 factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616"); // 通過工廠創建一個連接 connection = factory.createTopicConnection(); // 啟動連接 connection.start(); // 創建一個session會話 session = connection.createTopicSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 創建一個消息隊列 topic = session.createTopic("gzsxt.topic"); // 創建主題發布者 publisher = session.createPublisher(topic); // 創建消息 message = session.createTextMessage("hello,topic"); // 發布消息 publisher.publish(message); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); }finally { // 關閉釋放資源 if (publisher != null) { try { publisher.close(); } catch (JMSException e) { e.printStackTrace(); } } if (session != null) { try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } } |
5.2.3.2 第二步:修改測試類,新增測試方法
@Test public void publishTopic(){ MyPublisher publisher = new MyPublisher(); publisher.publishTopic(); } |
5.2.3.3 第三步:查看測試結果
--查看Eclipse控制。發現訂閱者消費到了消息
|
--查看ActiveMQ主界面
|
--消息被消費了。測試成功!
5.3 Topic小結
(1)Topic模式能夠實現多個訂閱者同時消費消息。
(2)Topic主題模式下,消息不會保存,只有在線的訂閱者才會接收到消息。
通常可以用來解決公共消息推送的相關業務。
6 ActiveMQ持久化
問題:當隊列中有未被消費的消息時,我們重新啟動ActiveMQ服務器后,發現消息仍然在隊列中。消息時如何保持的呢?
答:ActiveMQ是支持持久化的,可以永久保存消息。
消息是保存在內存中的。當內存空間不足,或者ActiveMQ服務關閉的時候,消息會被持久化到磁盤上。
被消費的時候,再加載到內存空間中。
--說明:ActiveMQ持久化方式在/conf/activemq.xml中指定
[root@node07192 conf]# vim activemq.xml |
6.1 kahadb方式
是ActiveMQ默認的持久化策略。不會保存已經被消費過的消息。
|
--消息存儲位置
|
6.2 AMQ方式(已過時)
--說明:5.3版本之前,現在已經過時,不考慮。
6.3 JDBC持久化方式(了解)
ActiveMQ將數據持久化到數據庫中。可以使用任意的數據庫。
本教程中使用MySQL數據庫。
6.3.1 配置步驟說明
(1)創建數據庫。
(2)添加數據庫連接jar依賴到ActiveMQ服務器。
(3)修改ActiveMQ配置,創建數據源。
(4)修改ActiveMQ配置,修改持久化方式為jdbc
6.3.2 配置步驟
6.3.2.1 第一步:創建數據庫
數據庫最好不要跟ActiveMQ服務器在同一台機器。
因為當cpu線程資源不足時,往隊列中寫入消息時,如果數據庫上一次持久化還沒結束,容易造成線程阻塞。
|
6.3.2.2 第二步:添加jar依賴
--配置數據源時,是支持連接池的。我們這里使用dbcp2作為連接池。
將jdbc驅動、dbcp2的jar上傳到/lib/目錄下。
|
6.3.2.3 第三步:修改/conf/activemq.xml,創建數據源
--在<broker>節點外,創建數據源節點
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://192.168.7.149:3306/activemq?relaxAutoCommit=true"/> <property name="username" value="root"/> <property name="password" value="gzsxt"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean> |
--數據源節點位置如下如下
|
6.3.2.4 第四步:修改/conf/activemq.xml,修改為jdbc持久化方式
--在<broker>節點內部,注釋kahadb方式,添加jdbc方式
添加如下配置:
<persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true”/> </persistenceAdapter> |
--注意:注釋<kahaDB>節點
|
6.3.3 測試
(1)重新啟動ActiveMQ
[root@node07192 bin]# ./activemq restart |
(2)查看數據庫,發現生成了三張表
|
(3)運行入門示例中的測試類,往隊列中寫入一條消息
--數據庫表activemq_msgs中,新寫入了一條數據
|
--配置成功!!!
6.3.4 三張表說明
數據表名稱 |
作用 |
activemq_msgs |
存儲消息,Queue和Topic都存儲在這個表中 |
activemq_acks |
用於存儲訂閱關系。訂閱模式下有效 |
activemq_lock |
集群模式下,存儲主從節點關系 |
6.3.5 補充說明
Jdbc持久化方式,只要Mysql數據庫穩定運行,就能保證隊列中消息的安全。
安全級別高,但是效率低。
因此,在實際開發中,除非是像銀行這類對數據安全極高的業務,我們一般都是使用默認持久化方式kahadb。
7 ActiveMQ應用場景
7.1 多模塊解耦(模塊之間消息通訊)
我們判斷一個程序的優劣,有一個很重要的指標:高內聚、低耦合。
高內聚:同一個模塊中,功能是高度緊密的。
低耦合:各模塊之間,業務盡量不要交叉。
但是有一些業務功能,必須涉及到兩個不同的業務,那我們就要想辦法,盡量將它們解耦開來。
以我們前面學習的solr為例。我們知道solr的數據來自數據庫。這就意味着,當數據庫中的商品發生變化時,我們需要同步更新索引庫。
這個時候我們就可以使用消息隊列模型來解耦添加添加業務和同步索引庫業務。
|
--后面的電商項目中,會重點講解這個應用場景!!!
7.2 流量削峰(解決並發請求)
|
訂單處理,就可以由前端應用將訂單信息放到隊列,后端應用從隊列里依次獲得消息處理,高峰時的大量訂單可以積壓在隊列里慢慢處理掉。由於同步通常意味着阻塞,而大量線程的阻塞會降低計算機的性能。
7.3 日志處理
日志處理是指將消息隊列用在日志處理中,比如Kafka的應用,解決大量日志傳輸的問題。架構簡化如下:
|
7.4 同步業務異步處理
需要:當我們在網站注冊的時候,有時候需要認證郵箱或者手機號,這個時候保存數據到數據庫之前,需要先等待認證結束。如果說認證程序耗時比較大,會影響影響用戶注冊的業務。
這個時候,我們可以使用消息隊列模型,將同步執行的業務,通過隊列,變成異步處理
|
(1)在保存數據到數據庫的時候,只需要將用戶的郵箱寫入隊列,不需要等待郵箱認證程序執行結束,才把數據保存到數據庫。
(2)認證程序,通過監聽隊列,從中獲取用戶的郵箱地址,發送認證鏈接。
8 Spring整合ActiveMQ
8.1 必要性
Spring已經整合了jms規范了(spring-jms.jar),而ActiveMQ是實現了jms規范的。這就意味着Spring整合ActiveMQ是非常方便的。
並且Spring-jms,提供了一個JmsTemplate類,用來簡化消息讀寫的業務代碼。Spring整合ActivMQ之后,就可以使用該類,簡化開發!!!
8.2 需求
使用Spring整合ActiveMQ,模擬限時搶購下的流量削峰問題。
8.3 配置步驟說明
(1)搭建環境。(創建項目,導入jar包)
(2)spring整合SpringMVC。
(3)spring整合ActiveMQ
8.4 配置步驟
8.4.1 第一部分:創建項目(使用maven)
8.4.1.1 第一步:使用maven創建項目
--注意:maven創建web項目時,默認創建web.xml文件。
在/WEB-INF/目錄下,手動創建一個web.xml文件。
|
8.4.1.2 第二步:導入pom依賴
導包說明:
Spring核心包+AOP
common-logging
activemq核心包
spring整合jms包
jsp相關依賴
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.gzsxt.activemq</groupId> <artifactId>activemq-demo-02-spring</artifactId> <version>1.0</version> <packaging>war</packaging>
<dependencies> <!-- ActiveMQ客戶端完整jar包依賴 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency>
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.9.0</version> </dependency>
<!-- Spring-JMS插件相關jar包依賴 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.1.6.RELEASE</version> </dependency> <!-- Spring框架上下文jar包依賴 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.1.6.RELEASE</version> </dependency> <!-- SpringMVC插件jar包依賴 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>4.1.6.RELEASE</version> </dependency> <!-- jsp相關 --> <dependency> <groupId>jstl</groupId> <artifactId>jstl</artifactId> <version>1.2</version> </dependency> <dependency> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> <version>2.5</version> <scope>provided</scope> </dependency> <dependency> <groupId>javax.servlet</groupId> <artifactId>jsp-api</artifactId> <version>2.0</version> <scope>provided</scope> </dependency> </dependencies> <build> <plugins> <!-- 配置Tomcat插件 --> <plugin> <groupId>org.apache.tomcat.maven</groupId> <artifactId>tomcat7-maven-plugin</artifactId> <version>2.2</version> <configuration> <port>9099</port> <path>/</path> </configuration> </plugin> </plugins> </build> </project> |
8.4.2 第二部分:spring整合springmvc
8.4.2.1 第一步:修改web.xml,配置springmvc核心控制器
<?xml version="1.0" encoding="UTF-8"?> <web-app version="2.5" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xml="http://www.w3.org/XML/1998/namespace" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_2_5.xsd "> <!-- 編碼過濾器 --> <filter> <filter-name>characterEncodingFilter</filter-name> <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class> <init-param> <param-name>encoding</param-name> <param-value>utf-8</param-value> </init-param> </filter> <filter-mapping> <filter-name>characterEncodingFilter</filter-name> <url-pattern>/*</url-pattern> </filter-mapping>
<!-- 配置springmvc核心控制器 --> <servlet> <servlet-name>dispatcherServlet</servlet-name> <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param> <param-name>contextConfigLocation</param-name> <param-value>classpath:springmvc.xml</param-value> </init-param>
<load-on-startup>1</load-on-startup> </servlet> <servlet-mapping> <servlet-name>dispatcherServlet</servlet-name> <url-pattern>*.action</url-pattern> </servlet-mapping>
</web-app> |
8.4.2.2 第二步:配置springmvc.xml核心配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:mvc="http://www.springframework.org/schema/mvc" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd">
<context:component-scan base-package="cn.gzsxt.controller" /> <mvc:annotation-driven />
</beans>
|
8.4.2.3 第三步:創建相關jsp頁面
--訂單頁面order.jsp
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <title>Insert title here</title> </head> <body> <form action="/save.action" method="post"> 用戶編號:<input type="text" name="userid"><br> 訂單金額:<input type="text" name="price"><br> <input type="submit" value="提交"> </form> </body> </html> |
--成功頁面success.jsp
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <title>Insert title here</title> </head> <body> 訂單提交成功!!!請稍后去結算中心支付。。。 </body> </html> |
8.4.2.4 第四步:java代碼實現
--創建訂單Order類
package cn.gzsxt.jms.pojo;
public class Order {
private Integer id;
private Integer userid;
private float price;
public Order() { super(); }
public Integer getId() { return id; }
public void setId(Integer id) { this.id = id; }
public Integer getUserid() { return userid; }
public void setUserid(Integer userid) { this.userid = userid; }
public float getPrice() { return price; }
public void setPrice(float price) { this.price = price; }
} |
--創建OrderController類
package cn.gzsxt.jms.controller;
import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping;
import cn.gzsxt.jms.pojo.Order;
@Controller public class OrderController {
@RequestMapping("/save.action") public String save(Order order){
System.out.println("當前提交的訂單用戶是:"+order.getUserid()+",訂單金額:"+order.getPrice());
return "/success.jsp"; } } |
8.4.2.5 第五步:整合測試
--以tomcat插件啟動項目,訪問訂單業務,提交訂單
|
--整合springmvc成功!!!
8.4.3 第三部分:Spring整合ActiveMQ
整合步驟說明:
(1)搭建ActiveMQ服務器。(已實現)
(2)創建消息生產者
(3)創建消息消費者
(4)spring整合activemq
8.4.3.1 第一步:搭建ActiveMQ服務器。(已實現)
8.4.3.2 第二步:創建消息生成者OrderProducer
--說明:在這里,我們注入JmsTemplate類,來簡化代碼
package cn.gzsxt.jms.producer;
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.ObjectMessage; import javax.jms.Session;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Component;
import cn.gzsxt.jms.pojo.Order;
@Component public class OrderProducer {
@Autowired private JmsTemplate jmsTemplate;
//注意:內部類調用外部類屬性,需要用final修飾 public void sendToMQ(final Order order){ //指定隊列名稱 order-mq jmsTemplate.send("order-mq", new MessageCreator() {
@Override public Message createMessage(Session session) throws JMSException { //ActiveMQ處理對象消息時,對象需要實現序列化 Message message = session.createObjectMessage(order);
return message; } }); } } |
--注意事項
(1)ActiveMQ處理對象時,對象必須實現序列化
--修改Order類,實現序列化接口
|
(2)匿名內部類訪問外部類屬性,該屬性需要用final修飾。
8.4.3.3 第三步:創建消息消費者OrderListener類
--這里使用監聽器模式
package cn.gzsxt.jms.listener;
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.ObjectMessage;
import org.springframework.stereotype.Component;
import cn.gzsxt.jms.pojo.Order;
@Component public class OrderListener implements MessageListener{
@Override public void onMessage(Message message) {
if(null!=message){ ObjectMessage oMsg = (ObjectMessage) message;
try { Order order = (Order) oMsg.getObject(); System.out.println("當前提交的訂單用戶是:"+order.getUserid()+",訂單金額:"+order.getPrice()); /* * 偽代碼: * * orderDao.save(order); */ } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } |
8.4.3.4 第四步:spring整合ActiveMQ
--創建spring-jms.xml文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.1.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">
<!-- 1、配置activemq連接工程 使用連接池好處:鏈接只需要初始化一次,每次要使用的時候,直接從連接池獲取,用完之后還給連接池。省去了每次創建、銷毀連接的時間。 --> <bean name="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.23.12:61616"/> <property name="userName" value="admin"/> <property name="password" value="admin"/> </bean> </property> <property name="maxConnections" value="20"></property> </bean>
<!-- 2、spring整合activemq鏈接工廠 可以緩存session。 --> <bean name="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="pooledConnectionFactory"></property> <property name="sessionCacheSize" value="5"></property> </bean>
<!-- 3、spring整合消息操作對象JmsTemplate 使用jmsTemplate可以簡化代碼,不需要自己去創建消息的發送對象。 --> <bean name="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="cachingConnectionFactory"></property> </bean>
<!-- 4、spring加載監聽器 acknowledge="auto" 表示消息獲取之后,自動出隊列 container-type 表示的容器的類型 default|simple default:支持session緩存。 --> <jms:listener-container acknowledge="auto" container-type="default" destination-type="queue" connection-factory="cachingConnectionFactory"> <!-- 指定監聽器 destination="order-mq" 指定監聽的是哪一個隊列 ref="orderListener" 指定監聽器對象 使用注解的時候,對象的名稱是類名首字母小寫 --> <jms:listener destination="order-mq" ref="orderListener"/> </jms:listener-container>
</beans> |
8.4.3.5 第五步:修改web.xml文件,加載jms配置
<!-- 配置springmvc核心控制器 --> <servlet> <servlet-name>dispatcherServlet</servlet-name> <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param> <param-name>contextConfigLocation</param-name> <!-- <param-value>classpath:springmvc.xml</param-value> --> <param-value>classpath:spring*.xml</param-value> </init-param>
<load-on-startup>1</load-on-startup> </servlet> <servlet-mapping> <servlet-name>dispatcherServlet</servlet-name> <url-pattern>*.action</url-pattern> </servlet-mapping> |
8.5 整合測試
8.5.1 第一步:修改OrderController類
--注入OrderProducer,修改業務邏輯
@Controller public class OrderController { @Autowired private OrderProducer producer;
@RequestMapping("/save.action") public String save(Order order){
// System.out.println("當前提交的訂單用戶是:"+order.getUserid()+",訂單金額:"+order.getPrice());
producer.sendToMQ(order);
return "/success.jsp"; } } |
8.5.2 第二步:重新啟動項目,提交多個訂單
--查看Eclipse控制台
|
--查看ActiveMQ控制台
|
--整合成功!!!