一、什么是ActiveMQ?
百度百科對ActiveMQ的描述:
二、ActiveMQ有什么優點?哪些情況下適合使用ActiveMQ?
2.1ActiveMQ 優點:
(1) 跨平台(JAVA編寫與平台無關有,ActiveMQ幾乎可以運行在任何的JVM上)
(2) 支持多種語言
(3) 降低系統間模塊的耦合度,解耦(消息的發送方和接收方並不需要彼此聯系,也不需要受對方的影響,即解耦和)
(4) 對Spring的,軟件擴展性好
(5) 自動重連功能
2.2 ActiveMQ的使用場景:
多個項目之間集成
系統前后端隔離,屏蔽高安全區(處理高並發)
再舉兩個例子:(來自知乎上的觀點);
例一:
假設用戶在你的軟件中注冊,服務端收到用戶的注冊請求后,它會做這些操作:
校驗用戶名等信息,如果沒問題會在數據庫中添加一個用戶記錄
如果是用郵箱注冊會給你發送一封注冊成功的郵件,手機注冊則會發送一條短信
分析用戶的個人信息,以便將來向他推薦一些志同道合的人,或向那些人推薦他
發送給用戶一個包含操作指南的系統通知 等等……
但是對於用戶來說,注冊功能實際只需要第一步,只要服務端將他的賬戶信息存到數據庫中他便可以登錄上去做他想做的事情了。至於其他的事情,非要在這一次請求中全 部完成么?值得用戶浪費時間等你處理這些對他來說無關緊要的事情么?所以實際當第一步做完后,服務端就可以把其他的操作放入對應的消息隊列中然后馬上返回用戶結 果,由消息隊列異步的進行這些操作。
或者還有一種情況,同時有大量用戶注冊你的軟件,再高並發情況下注冊請求開始出現一些問題,例如郵件接口承受不住,或是分析信息時的大量計算使cpu滿載,這將會出 現雖然用戶數據記錄很快的添加到數據庫中了,但是卻卡在發郵件或分析信息時的情況,導致請求的響應時間大幅增長,甚至出現超時,這就有點不划算了。面對這種情況一 般也是將這些操作放入消息隊列(生產者消費者模型),消息隊列慢慢的進行處理,同時可以很快的完成注冊請求,不會影響用戶使用其他功能。
所以在軟件的正常功能開發中,並不需要去刻意的尋找消息隊列的使用場景,而是當出現性能瓶頸時,去查看業務邏輯是否存在可以異步處理的耗時操作,如果存在的話便可 以引入消息隊列來解決。否則盲目的使用消息隊列可能會增加維護和開發的成本卻無法得到可觀的性能提升,那就得不償失了。
例二:
跨系統的異步通信,所有需要異步交互的地方都可以使用消息隊列。就像我們除了打電話(同步)以外,還需要發短信,發電子郵件(異步)的通訊方式。
多個應用之間的耦合,由於消息是平台無關和語言無關的,而且語義上也不再是函數調用,因此更適合作為多個應用之間的松耦合的接口。基於消息隊列的耦合,不需要發送 方和接收方同時在線。
在企業應用集成(EAI)中,文件傳輸,共享數據庫,消息隊列,遠程過程調用都可以作為集成的方法。
應用內的同步變異步,比如訂單處理,就可以由前端應用將訂單信息放到隊列,后端應用從隊列里依次獲得消息處理,高峰時的大量訂單可以積壓在隊列里慢慢處理掉。由於 同步通常意味着阻塞,而大量線程的阻塞會降低計算機的性能。
消息驅動的架構(EDA),系統分解為消息隊列,和消息制造者和消息消費者,一個處理流程可以根據需要拆成多個階段(Stage),階段之間用隊列連接起來,前一個階段 處理的結果放入隊列,后一個階段從隊列中獲取消息繼續處理。
應用需要更靈活的耦合方式,如發布訂閱,比如可以指定路由規則。
三、ActiveMQ官方(最新)下載地址 http://activemq.apache.org/
提示 Kafka 這個也是消息中間件也是Apche的這里不做研究
四、ActiveMQ官方(最新)下載地址
4.1 與Tomcat相似解壓即可, 解壓后
bin:其中包含MQ的啟動腳本
conf:包含MQ的所有配置文件
data:日志文件及持久性消息數據
example:MQ的示例
lib:MQ運行所需的所有Lib
webapps:MQ的Web控制台及一些相關的DEMO
activemq-all-5.5.0.jar:所有MQ JAR包的集合,用於用戶系統調用
4.2 啟動ActiveMQ, 進入bin后xp系統雙擊activemq.bat 如果win7系統是32或64位啟動對應的activemq.bat批處理文件
ActiveMQ四種協議的簡單介紹:
TCP協議:默認端口61616
TCP(Transmission Control Protocol 傳輸控制協議)是一種面向連接的、可靠的、基於字節流的傳輸層通信協議,由IETF的RFC 793定義。在簡化的計算機網絡OSI 模型中,它完成第四層傳輸層所指定的功能,用戶數據報協議(UDP)是同一層內[1] 另一個重要的傳輸協議。在因特網協議族(Internet protocol suite)中,TCP層 是位於IP層之上,應用層之下的中間層。不同主機的應用層之間經常需要可靠的、像管道一樣的連接,但是IP層不提供這樣的流機制,而是提供不可靠的包交換。
AMQP協議:默認端口5672
即Advanced Message Queuing Protocol,一個提供統一消息服務的應用層標准高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計。基於此協議 的客戶端與消息中間件可傳遞消息,並不受客戶端/中間件不同產品,不同開發語言等條件的限制。
STOMP協議:默認端口61613
STOMP,Streaming Text Orientated Message Protocol,是流文本定向消息協議,是一種為MOM(Message Oriented Middleware,面向消息的中間件)設計的簡單 文本協議
MQTT協議:默認端口1883
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸)是IBM開發的一個即時通訊協議,有可能成為物聯網的重要組成部分。該協議支持所有平台,幾 乎可以把所有聯網物品和外部連接起來,被用來當做傳感器和致動器(比如通過Twitter讓房屋聯網)的通信協議。
OpenWire協議:(😀控制台上沒有看到)官方網站對其的介紹如下:
OpenWire is our cross language Wire Protocol to allow native access to ActiveMQ from a number of different languages and platforms. The Java OpenWire transport is the default transport in ActiveMQ 4.x or later. For other languages see the following...
ActiveMQ默認使用的TCP連接端口是61616, 通過查看該端口的信息可以測試ActiveMQ是否成功啟動 netstat -an|find “61616”
4.3 訪問ActiveMQ的WEB管理員監控控制台 http://127.0.0.1:8161/admin/
4.3 停止ActiveMQ服務,按Ctrl+Shift+C,之后輸入y即可
注意:啟動ActiveMQ時閃退問題解決方案
解壓縮apache-activemq-5.14.0-bin.zip后雙擊/bin目錄下的activemq.bat批處理文件發現啟動窗口一閃而過無法啟動,最后找到原因是因為在環境變量-系統變量中
JAVA_HOME、classpath、Path這些系統變量中沒有添加JDK相關的安裝路徑。
JAVA_HOME下添加"D:\soft\Java\jdk1.7.0_09"
classpath下添加".;%JAVA_HOME%\lib;%JAVA_HOME%\lib\tools.jar"
Path下添加"%JAVA_HOME%\bin;"
特別注意如果系統變量中沒有classpath變量、JAVA_HOME變量則新增然后再添加上述相關路徑。
上述路徑是我本地JDK版本安裝的路徑。如果JDK安裝在其他版本下則相應的調整下就可以了。
五、使用TCP方式測試ActiveMQ:
Sender code
package yingyue; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Sender { private static final int SEND_NUMBER = 5; public static void main(String[] args) { // ConnectionFactory :連接工廠,JMS 用它創建連接 ConnectionFactory connectionFactory; // Connection :JMS 客戶端到JMS Provider 的連接 Connection connection = null; // Session: 一個發送或接收消息的線程 Session session; // Destination :消息的目的地;消息發送給誰. Destination destination; // MessageProducer:消息發送者 MessageProducer producer; // TextMessage message; // 構造ConnectionFactory實例對象,此處采用ActiveMq的實現jar connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { // 構造從工廠得到連接對象 connection = connectionFactory.createConnection(); // 啟動 connection.start(); // 獲取操作連接 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置 destination = session.createQueue("TestQueue"); // 得到消息生成者【發送者】 producer = session.createProducer(destination); // 設置不持久化,此處學習,實際根據項目決定 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 構造消息,此處寫死,項目就是參數,或者方法獲取 sendMessage(session, producer); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } public static void sendMessage(Session session, MessageProducer producer) throws Exception { for (int i = 1; i <= SEND_NUMBER; i++) { TextMessage message = session .createTextMessage("ActiveMq " + "HelloWorld " + i); // 發送消息到目的地方 System.out.println("發送消息:" + "ActiveMq " + "HelloWorld " + i); producer.send(message); } } }
Receiver code
package yingyue; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Receiver { public static void main(String[] args) { // ConnectionFactory :連接工廠,JMS 用它創建連接 ConnectionFactory connectionFactory; // Connection :JMS 客戶端到JMS Provider 的連接 Connection connection = null; // Session: 一個發送或接收消息的線程 Session session; // Destination :消息的目的地;消息發送給誰. Destination destination; // 消費者,消息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { // 構造從工廠得到連接對象 connection = connectionFactory.createConnection(); // 啟動 connection.start(); // 獲取操作連接 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置 destination = session.createQueue("TestQueue"); consumer = session.createConsumer(destination); while (true) { //設置接收者接收消息的時間,為了便於測試,這里誰定為100s TextMessage message = (TextMessage) consumer.receive(100000); if (null != message) { System.out.println("收到消息:" + message.getText()); } else { break; } } } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } }
如下: