ActiveMQ
入門概述
MQ 的產品種類和對比
kafka
編程語言:scala。
大數據領域的主流MQ。
rabbitmq
編程語言:erlang
基於erlang語言,不好修改底層,不要查找問題的原因,不建議選用。
rocketmq
編程語言:java
適用於大型項目。適用於集群。
activemq
編程語言:java
適用於中小型項目。
MQ 的產生背景
系統之間直接調用存在的問題?
微服務架構后,鏈式調用是我們在寫程序時候的一般流程,為了完成一個整體功能會將其拆分成多個函數(或子模塊),比如模塊A調用模塊B,模塊B調用模塊C,模塊C調用模塊D。但在大型分布式應用中,系統間的RPC交互繁雜,一個功能背后要調用上百個接口並非不可能,從單機架構過渡到分布式微服務架構的通例。這些架構會有哪些問題?
系統之間接口耦合比較嚴重
每新增一個下游功能,都要對上游的相關接口進行改造;
舉個例子:如果系統A要發送數據給系統B和系統C,發送給每個系統的數據可能有差異,因此系統A對要發送給每個系統的數據進行了組裝,然后逐一發送;
當代碼上線后又新增了一個需求:把數據也發送給D,新上了一個D系統也要接受A系統的數據,此時就需要修改A系統,讓他感知到D系統的存在,同時把數據處理好再給D。在這個過程你會看到,每接入一個下游系統,都要對系統A進行代碼改造,開發聯調的效率很低。其整體架構如下

面對大流量並發時,容易被沖垮
每個接口模塊的吞吐能力是有限的,這個上限能力如果是堤壩,當大流量(洪水)來臨時,容易被沖垮。
舉個例子秒殺業務:上游系統發起下單購買操作,就是下單一個操作,很快就完成。然而,下游系統要完成秒殺業務后面的所有邏輯(讀取訂單,庫存檢查,庫存凍結,余額檢查,余額凍結,訂單生產,余額扣減,庫存減少,生成流水,余額解凍,庫存解凍)。
等待同步存在性能問題
RPC接口上基本都是同步調用,**整體的服務性能遵循 [ 木桶理論 ] **,即整體系統的耗時取決於鏈路中最慢的那個接口。比如A調用B/C/D都是50ms,但此時B又調用了B1,花費2000ms,那么直接就拖累了整個服務性能。

根據上述的幾個問題,在設計系統時可以明確要達到的目標:
1,要做到系統解耦,當新的模塊接進來時,可以做到代碼改動最小;能夠解耦
2,設置流量緩沖池,可以讓后端系統按照自身吞吐能力進行消費,不被沖垮;能削峰
3,強弱依賴梳理能將非關鍵調用鏈路的操作異步化並提升整體系統的吞吐能力;能夠異步
MQ 的主要作用
- 異步。調用者無需等待。
- 解耦。解決了系統之間耦合調用的問題。
- 削峰。抵御洪峰流量,保護了主業務。
MQ 的定義
面向消息的中間件(Message-Oriented Middleware)MOM能夠很好的解決以上問題。是指利用高效可靠的消息傳遞機制與平台無關的數據交流,並基於數據通信來進行分布式系統的集成。通過提供消息傳遞和消息排隊模型在分布式環境下提供應用解耦,彈性伸縮,冗余存儲、流量削峰,異步通信,數據同步等功能。
大致的過程是這樣的:發送者把消息發送給消息服務器,消息服務器將消息存放在若干隊列/主題topic中,在合適的時候,消息服務器回將消息轉發給接受者。在這個過程中,發送和接收是異步的,也就是發送無需等待,而且發送者和接受者的生命周期也沒有必然的關系;尤其在發布pub/訂閱sub模式下,也可以完成一對多的通信,即讓一個消息有多個接受者。( 類似微信公眾號 )

MQ 的特點
采用異步處理模式
消息發送者可以發送一個消息而無須等待響應。消息發送者將消息發送到一條虛擬的通道(主題或者隊列)上;
消息接收者則訂閱或者監聽該愛通道。一條消息可能最終轉發給一個或者多個消息接收者,這些消息接收者都無需對消息發送者做出同步回應。整個過程都是異步的。
案例:
也就是說,一個系統跟另一個系統之間進行通信的時候,假如系統A希望發送一個消息給系統B,讓他去處理。但是系統A不關注系統B到底怎么處理或者有沒有處理好,所以系統A把消息發送給MQ,然后就不管這條消息的“死活了”,接着系統B從MQ里面消費出來處理即可。至於怎么處理,是否處理完畢,什么時候處理,都是系統B的事兒,與系統A無關。
應用系統之間解耦合
發送者和接受者不必了解對方,只需要確認消息。
發送者和接受者不必同時在線。
MQ的缺點
兩個系統之間不能同步調用,不能實時回復,不能響應某個調用的回復。
安裝 ActiveMQ
安裝啟動 ActiveMQ
官網下載
https://activemq.apache.org/components/classic/download/
上傳壓縮包
上傳壓縮包到 Linux 系統的 opt
目錄下。
解壓
tar -zxf apache-activemq-5.16.2-bin.tar.gz
創建文件夾
mkdir /usr/local/activemq
移動解壓文件夾
mv apache-activemq-5.16.2 /usr/local/activemq/apache-activemq-5.16.2
啟動 ActiveMQ
cd /usr/local/activemq/apache-activemq-5.16.2/bin
進入 bin mul
./activemq start
啟動,默認的啟動端口 61616
./activemq restart
重新啟動
./activemq stop
關閉
ActiveMQ 控制台
ActiveMQ 占用的端口
后台端口:61616
前台端口:8161
打開 端口
firewall-cmd --permanent --add-port=8161/tcp
開放前台端口
firewall-cmd --permanent --add-port=61616/tcp
開放后台端口
firewall-cmd --reload
重新載入
修改 conf 目錄下的 jetty.xml 文件
將 host
屬性從 127.0.0.1
修改為 0.0.0.0

默認用戶名和密碼都為 amind

登錄后的頁面

Java 編碼實現 ActiveMQ 通信
IDEA 創建 Maven 工程

POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>activemq</artifactId>
<groupId>org.hong</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>activemq-demo</artifactId>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- activemq 所需要的jar 包-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.16.2</version>
</dependency>
<!-- activemq 和 spring 整合的基礎包-->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>4.18</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
</dependencies>
</project>
JMS編碼總體規范

Destination是目的地。Destination分為兩種:隊列 ( 一對一 ) 和主題 ( 一對多 )。
隊列 ( Queue ) 案例
隊列消息生產者的入門案例
代碼實現
package org.hong.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduce {
private static final String ACTIVE_URL = "tcp://192.168.200.130:61616";
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
// 1.按照給定的url, 創建連接工廠, 使用默認的用戶名和密碼
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
// 2.通過連接工廠獲得連接connection並啟動
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 3.創建會話session
// 3.1.參數一: 事務 參數二: 簽收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.創建目的地(具體是隊列還是主題topic)
Queue queue = session.createQueue(QUEUE_NAME);
// 5.創建消息的生產者
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 3; i++) {
// 6.Session創建消息
TextMessage textMessage = session.createTextMessage("msg---" + i);// 理解為一個字符串
// 7.MessageProducer發送消息給MQ
producer.send(textMessage);
}
// 8.關閉資源
producer.close();
session.close();
connection.close();
System.out.println("消息發布到MQ完成");
}
}
ActiveMQ 控制台

谷歌翻譯:

ActiveMQ 控制台列說明
英文 | 解釋 | 詳細信息 |
---|---|---|
Number Of Pending Messages | 等待消費的消息 | 這個是未出隊列的數量,公式=總接收數-總出隊列數。 |
Number Of Consumers | 消費者數量 | 消費者端的消費者數量。只計算當前為連接狀態的。 |
Messages Enqueued | 進隊列的總消息量 | 包括出隊列的。這個數只增不減。 |
Messages Dequeued | 出隊消息數 | 可以理解為是消費者消費掉的數量。 |
總結:
當有一個消息進入這個隊列時,等待消費的消息是1,進入隊列的消息是1。
當消息消費后,等待消費的消息是0,進入隊列的消息是1,出隊列的消息是1。
當再來一條消息時,等待消費的消息是1,進入隊列的消息就是2。
隊列消息消費者的入門案例
代碼實現
介紹
同步阻塞方式( receive()
)。訂閱者或接收者調用MessageConsumer的receive()方法來接收消息,receive方法在能夠接收到消息之前 ( 或超時之前 ) 將一直阻塞。
package org.hong.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsConsumer {
private static final String ACTIVE_URL = "tcp://192.168.200.130:61616";
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
// 1.按照給定的url, 創建連接工廠, 使用默認的用戶名和密碼
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
// 2.通過連接工廠獲得連接connection並啟動
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 3.創建會話session
// 3.1.參數一: 事務 參數二: 簽收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.創建目的地(具體是隊列還是主題topic)
Queue queue = session.createQueue(QUEUE_NAME);
// 5.創建消息消費者
MessageConsumer consumer = session.createConsumer(queue);
while (true) {
// 6.消費消息
TextMessage textMessage = (TextMessage) consumer.receive();
if(textMessage != null){
System.out.println("消費者接收到消息:" + textMessage.getText());
}else{
break;
}
}
// 7.關閉資源
consumer.close();
session.close();
connection.close();
}
}
控制台
消息正常取出,但是程序並沒有結束,依據在運行;由於我們調用的是 receive() 無參方法
,消費者會一直等待,直到獲取到消息,因此程序不會停止。

ActiveMQ 控制台
消息消費者數量為1。

receive 方法詳解
receive() 空參方法
有消息取出消息;沒有消息一直等待,直到有消息。
receive(long time) 帶參方法
有消息取出消息;沒有消息等待指定毫秒數,如果還是沒有消息,返回 null。
Number Of Consumers 介紹
上面的案例:消費者端使用的是 receive() 空參方法
,ActiveMQ 控制台中 Number Of Consumers
的值為1。
現在強制結束消費者端的程序,再次查看 ActiveMQ 控制台中 Number Of Consumers
的值變成了0。

異步監聽式消費者(MessageListener)
package org.hong.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class JmsConsumer {
private static final String ACTIVE_URL = "tcp://192.168.200.130:61616";
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
// 1.按照給定的url, 創建連接工廠, 使用默認的用戶名和密碼
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
// 2.通過連接工廠獲得連接connection並啟動
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 3.創建會話session
// 3.1.參數一: 事務 參數二: 簽收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.創建目的地(具體是隊列還是主題topic)
Queue queue = session.createQueue(QUEUE_NAME);
// 5.創建消息消費者
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(message -> {
// message就是監聽器獲得到的消息對象
if(message != null && message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消費者接收到消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 讓主線程不要結束。因為一旦主線程結束了,其他的線程(如此處的監聽消息的線程)也都會被迫結束。
// 實際開發中,我們的程序會一直運行,這句代碼都會省略。
System.in.read();
consumer.close();
session.close();
connection.close();
}
}
隊列消息(Queue)總結
兩種消費方式
同步阻塞方式(receive)
訂閱者或接收者抵用MessageConsumer的receive()方法來接收消息,receive方法在能接收到消息之前(或超時之前)將一直阻塞。
異步非阻塞方式(監聽器onMessage())
訂閱者或接收者通過MessageConsumer的setMessageListener(MessageListener listener)注冊一個消息監聽器,當消息到達之后,系統會自動調用監聽器MessageListener的onMessage(Message message)方法。
隊列的特點
- 每個消息只能有一個消費者,類似1對1的關系。好比個人快遞自己領自己的。
- 消息的生產者和消費者之間沒有時間上的相關性。無論消費者在生產者發送消息的時候是否處於運行狀態,消費者都可以提取消息。好比我們發送短信,發送者發送后不見得接收者會即收即看。
- 消息被消費后隊列中不會再存儲,所以消費者不會消費到已經被消費掉的消息。
消息消費情況
情況1:只啟動消費者1。
結果:消費者1會消費所有的數據。
情況2:先啟動消費者1,再啟動消費者2。
結果:消費者1消費所有的數據。消費者2不會消費到消息。
情況3:生產者發布6條消息,在此之前已經啟動了消費者1和消費者2。
結果:消費者1和消費者2平攤了消息。各自消費3條消息。
疑問:怎么去將消費者1和消費者2不平均分攤呢?而是按照各自的消費能力去消費。我覺得,現在ActiveMQ就是這樣的機制。
JMS 編碼步驟

- 創建 ConnectionFacory
- 通過 ConnectionFacory 創建 JMS Connection
- 啟動 JMS Connection
- 通過 Connection 創建 JMS Session
- 創建 JMS Destination
- 創建 JMS Producer 或者創建 JMS Message 並設置 Destination
- 創建 JMS Consumer 或者是注冊一個 JMS Message Listener
- 發送或者接收 JMS Message
- 關閉所有的 JMS 資源 ( Connection、Session、Producer、Consumer 等 )
主題 ( Topic ) 案例
介紹
在發布訂閱消息傳遞域中,目的地被稱為主題(topic)
發布/訂閱消息傳遞域的特點如下:
- 生產者將消息發布到topic中,每個消息可以有多個消費者,屬於1:N的關系;、
- 生產者和消費者之間有時間上的相關性。訂閱某一個主題的消費者只能消費自它訂閱之后發布的消息。
- 生產者生產時,topic不保存消息它是無狀態的不落地,假如無人訂閱就去生產,那就是一條廢消息,所以,一般先啟動消費者再啟動生產者。
默認情況下如上所述,但是JMS規范允許客戶創建持久訂閱,這在一定程度上放松了時間上的相關性要求。持久訂閱允許消費者消費它在未處於激活狀態時發送的消息。一句話,好比我們的微信公眾號訂閱
主題生產者入門案例
package org.hong.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduceTopic {
private static final String ACTIVE_URL = "tcp://192.168.200.130:61616";
private static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws JMSException {
// 1.按照給定的url, 創建連接工廠, 使用默認的用戶名和密碼
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
// 2.通過連接工廠獲得連接connection並啟動
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 3.創建會話session
// 3.1.參數一: 事務 參數二: 簽收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.創建目的地(具體是隊列還是主題topic)
Topic topic = session.createTopic(TOPIC_NAME);
// 5.創建消息的生產者
MessageProducer producer = session.createProducer(topic);
for (int i = 0; i < 3; i++) {
// 6.Session創建消息
TextMessage textMessage = session.createTextMessage("topicmsg---" + i);// 理解為一個字符串
// 7.MessageProducer發送消息給MQ
producer.send(textMessage);
}
// 8.關閉資源
producer.close();
session.close();
connection.close();
System.out.println("Topic消息發布到MQ完成");
}
}
主題消費者入門案例
package org.hong.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class JmsConsumerTopic {
private static final String ACTIVE_URL = "tcp://192.168.200.130:61616";
private static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws JMSException, IOException {
// 1.按照給定的url, 創建連接工廠, 使用默認的用戶名和密碼
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
// 2.通過連接工廠獲得連接connection並啟動
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 3.創建會話session
// 3.1.參數一: 事務 參數二: 簽收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4.創建目的地(具體是隊列還是主題topic)
Topic topic = session.createTopic(TOPIC_NAME);
// 5.創建消息消費者
MessageConsumer consumer = session.createConsumer(topic);
// 6.使用監聽的方式消費消息
consumer.setMessageListener(message -> {
// message就是監聽器獲得到的消息對象
if(message != null && message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消費者接收到消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 7.關閉資源
System.in.read();
consumer.close();
session.close();
connection.close();
}
}
測試
先啟動消費者,再啟動生產者
啟動2個消費者
IDEA 設置一下啟動項。

ActiveMQ 控制台
可以看到 topic01
現在有2個消費者,正常;其他的主題是 ActiveMQ 自動生成的,不用管,后面再說。

啟動生產者
剛剛生成出來的消息立馬被消費掉,正常。
生產者控制台打印
消息生產完成

消費者控制台打印
1號消費者和2號消費者都消費了3條消息


ActiveMQ 控制台

先啟動生產者,再啟動消費者
啟動生產者

啟動消費者
控制台打印
沒有消費到任何消息

ActiveMQ 控制台
出隊消息為0

隊列和主題的比較
比較項目 | Topic 模式隊列 | Queue 模式隊列 |
---|---|---|
工作模式 | "訂閱-發布" 模式,如果當前沒有訂閱者,消息將會被丟棄。如果有多個訂閱者,那么這些訂閱者都會收到消息 | "負載均衡" 模式,如果當前沒有消費者,消息也不會丟棄;如果有多個消費者,那么一條消息也只會發送給其中一個消費者,並且要求消費者ack消息。 |
有無狀態 | 無狀態 | Queue數據默認會在MQ服務器上以文件行形式保存,比如 ActiveMQ 一般保存在 $AMQ_HOME\data\kr-store\data 下面,也可以配置成 DB 存儲。 |
傳遞完整性 | 如果沒有訂閱者,消息會被丟棄。 | 消息不會被丟棄。 |
處理效率 | 由於消息要按照訂閱者的數量進行復制,所以處理性能會隨着訂閱者的增加而明顯降低,並且還要結合不同的消息協議自身的性能差異。 | 由於一條消息只發送給一個消費者,所有就算消費者再多,性能也不會有明顯的降低。當然不同消息協議的具體性能也是有差異的。 |
JMS 規范
JMS 是什么
什么是Java消息服務?
Java消息服務指的是兩個應用程序之間進行異步通信的API,它為標准協議和消息服務提供了一組通用接口,包括創建、發送、讀取消息等,用於支持Java應用程序開發。在JavaEE中,當兩個應用程序使用JMS進行通信時,它們之間不是直接相連的,而是通過一個共同的消息收發服務組件關聯起來以達到解耦/異步削峰的效果。
JMS 的組成結構和特點

消息頭
MS 的消息頭有哪些屬性:
- JMSDestination:消息目的地
- JMSDeliveryMode:消息持久化模式
- JMSExpiration:消息過期時間
- JMSPriority:消息的優先級
- JMSMessageID:消息的唯一標識符。后面我們會介紹如何解決冪等性。
說明: 消息的生產者可以 set 這些屬性,消息的消費者可以 get 這些屬性。這些屬性在 send 方法里面也可以設置。
package org.hong.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduceTopic {
private static final String ACTIVE_URL = "tcp://192.168.200.130:61616";
private static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(topic);
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("topicmsg---" + i);// 理解為一個字符串
// 這里可以指定每個消息的目的地
textMessage.setJMSDestination(topic);
/*
* 持久模式和非持久模式。
* 一條持久性的消息:應該被傳送“一次僅僅一次”,這就意味着如果JMS提供者出現故障,該消息並不會丟失,它會在服務器恢復之后再次傳遞。
* 一條非持久的消息:最多會傳遞一次,這意味着服務器出現故障,該消息將會永遠丟失。
*/
textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
/*
* 可以設置消息在一定時間后過期,默認是永不過期。
* 消息過期時間,等於Destination的send方法中的timeToLive值加上發送時刻的GMT時間值。
* 如果timeToLive值等於0,則JMSExpiration被設為0,表示該消息永不過期。
* 如果發送后,在消息過期時間之后還沒有被發送到目的地,則該消息被清除。
*/
textMessage.setJMSExpiration(1000);
/*
* 消息優先級,從0-9十個級別,0-4是普通消息5-9是加急消息。
* JMS不要求MQ嚴格按照這十個優先級發送消息但必須保證加急消息要先於普通消息到達。默認是4級。
*/
textMessage.setJMSPriority(10);
// 唯一標識每個消息的標識。MQ會給我們默認生成一個,我們也可以自己指定。
textMessage.setJMSMessageID("ABCD");
// 上面有些屬性在send方法里也能設置
producer.send(textMessage);
}
producer.close();
session.close();
connection.close();
System.out.println("Topic消息發布到MQ完成");
}
}
消息體
概述
消息體是具體封裝的消息數據,一共有5中消息體格式;發送和接收的消息體類型必須一致。
消息體格式
- TextMessage:普通字符串消息,包含一個 String
- MapMessage:一個 Map 類型的消息,key 為 String 類型,值為 Java 的基本數據類型和 String 類型
- BytesMessage:二進制數組消息,包含一個 byte[]
- StreamMessage:Java 數據流消息,用標准流操作來順序的填充和讀取。
- ObjectMessage:對象消息,包含一個可序列化的 Java 對象
生產者代碼
package org.hong.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduce {
private static final String ACTIVE_URL = "tcp://192.168.200.130:61616";
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 3; i++) {
// 發送TextMessage
TextMessage textMessage = session.createTextMessage("msg---" + i);
producer.send(textMessage);
// 發送MapMessage
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("k1", "v" + i); // MQ中MapMessage中的key是可以重復的
producer.send(mapMessage);
}
producer.close();
session.close();
connection.close();
System.out.println("消息發布到MQ完成");
}
}
消費者代碼
package org.hong.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class JmsConsumer {
private static final String ACTIVE_URL = "tcp://192.168.200.130:61616";
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(message -> {
// 獲取TextMessage對象
if(message != null && message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消費者接收到消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
// 獲取MapMessage對象
if(message != null && message instanceof MapMessage){
MapMessage mapMessage = (MapMessage) message;
try {
System.out.println("消費者接收到消息:" + mapMessage.getString("k1"));
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.in.read();
consumer.close();
session.close();
connection.close();
}
}
測試
順序任意,查看消費者控制台打印。

雖然我們設置的鍵都是 k1
,依舊取出了3個值,並且值沒有重復。
消息屬性
如果需要除消息頭字段之外的值,那么可以使用消息屬性。他是識別/去重/重點標注等操作,非常有用的方法。
他們是以屬性名和屬性值對的形式制定的。可以將屬性是為消息頭得擴展,屬性指定一些消息頭沒有包括的附加信息,比如可以在屬性里指定消息選擇器。消息的屬性就像可以分配給一條消息的附加消息頭一樣。它們允許開發者添加有關消息的不透明附加信息。它們還用於暴露消息選擇器在消息過濾時使用的數據。
消息屬性的 API:


JMS 的可靠性
事務偏生產方,簽收偏消費方。
PERSISTENT 持久性
參數設置說明
可以設置消息生產者生產的消息整體是否持久化,或者設置消息是否持久化 ( 使用 JMS 消息頭 )。默認是持久化的。
非持久化
服務器宕機,消息不存在。
// 生產者對象設置是否持久化
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
持久化
當服務器宕機,消息依然存在
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
持久的 Queue
持久化消息這是隊列的默認傳送模式,此模式保證這些消息只被傳送一次和成功使用一次。對於這些消息,可靠性是優先考慮的因素。
可靠性的另一個重要方面是確保持久性消息傳送至目標后,消息服務在向消費者傳送它們之前不會丟失這些消息。
持久的 Topic
topic默認就是非持久化的,因為生產者生產消息時,消費者也要在線,這樣消費者才能消費到消息。
topic消息持久化,只要消費者向MQ服務器注冊過,所有生產者發布成功的消息,該消費者都能收到,不管是MQ服務器宕機還是消費者不在線。
注意:
- 一定要先運行一次消費者,等於向MQ注冊,類似我訂閱了這個主題。
- 然后再運行生產者發送消息。
- 之后無論消費者是否在線,都會收到消息。如果不在線的話,下次連接的時候,會把沒有收過的消息都接收過來。
持久化topic生產者代碼
package org.hong.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduceTopicPersistence {
private static final String ACTIVE_URL = "tcp://192.168.200.130:61616";
private static final String TOPIC_NAME = "topic-Persistence";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(topic);
// 設置持久化topic
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("topicmsg---" + i);
producer.send(textMessage);
}
producer.close();
session.close();
connection.close();
System.out.println("TopicPersistence消息發布到MQ完成");
}
}
持久化topic消費者代碼
package org.hong.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class JmsConsumerTopicPersistence {
private static final String ACTIVE_URL = "tcp://192.168.200.130:61616";
private static final String TOPIC_NAME = "topic-Persistence";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
Connection connection = activeMQConnectionFactory.createConnection();
// 設置客戶端ID。向MQ服務器注冊自己的名稱; 先設置再啟動start
connection.setClientID("z3");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
// 創建一個topic訂閱者對象。參數一是topic,參數二是訂閱名稱
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "topicTest");
topicSubscriber.setMessageListener(message -> {
if(message != null && message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("訂閱者接收到消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.in.read();
topicSubscriber.close();
session.close();
connection.close();
}
}
測試
先啟動一次訂閱者向 ActiveMQ 注冊自己


訂閱者斷開連接

生產者生產消息

消息生產后再次啟動訂閱者
由於訂閱者向MQ訂閱了消息,雖然消息生產時訂閱者不在線,但是等到訂閱者再次上線后,訂閱者依舊能接收到消息

Transaction 事務
關閉事務
只要執行 send
,就進入隊列中。關閉事務,那第二個簽收參數的設置需要有效
開啟事務
先執行 send
再執行 commit
,消息才被真正的提交到隊列中。消息需要批量發送,需要緩沖區處理。
生產者案例
開啟事務不提交
package org.hong.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduceTx {
private static final String ACTIVE_URL = "tcp://192.168.200.130:61616";
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 參數一設置為true, 開啟事務
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("msg---" + i);
producer.send(textMessage);
}
producer.close();
session.close();
connection.close();
System.out.println("消息發布到MQ完成");
}
}
運行程序觀察 ActiveMQ 控制台
消息並沒有進入到隊列中

開啟事務回滾
package org.hong.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduceTx {
private static final String ACTIVE_URL = "tcp://192.168.200.130:61616";
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 參數一設置為true, 開啟事務
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("msg---" + i);
producer.send(textMessage);
}
// 回滾事務
session.rollback();
producer.close();
session.close();
connection.close();
System.out.println("消息發布到MQ完成");
}
}
運行程序觀察 ActiveMQ 控制台
消息並沒有進入到隊列中

開啟事務提交
package org.hong.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduceTx {
private static final String ACTIVE_URL = "tcp://192.168.200.130:61616";
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 參數一設置為true, 開啟事務
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("msg---" + i);
producer.send(textMessage);
}
// 提交事務
session.commit();
producer.close();
session.close();
connection.close();
System.out.println("消息發布到MQ完成");
}
}
運行程序觀察 ActiveMQ 控制台

消費者案例
開啟事務不提交
package org.hong.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class JmsConsumerTx {
private static final String ACTIVE_URL = "tcp://192.168.200.130:61616";
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 參數一設置為true, 開啟事務
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
while (true) {
TextMessage textMessage = (TextMessage) consumer.receive(4000L);
if(textMessage != null){
System.out.println("消費者接收到消息:" + textMessage.getText());
}else{
break;
}
}
consumer.close();
session.close();
connection.close();
}
}
運行程序,觀察控制台打印和 ActiveMQ 控制台
消息被消費,但是 ActiveMQ 的隊列中,消息依舊存在,會造成重復消費


開啟事務回滾
package org.hong.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class JmsConsumerTx {
private static final String ACTIVE_URL = "tcp://192.168.200.130:61616";
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 參數一設置為true, 開啟事務
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
while (true) {
TextMessage textMessage = (TextMessage) consumer.receive(4000L);
if(textMessage != null){
System.out.println("消費者接收到消息:" + textMessage.getText());
}else{
break;
}
}
// 回滾事務
session.rollback();
consumer.close();
session.close();
connection.close();
}
}
運行程序,觀察控制台打印和 ActiveMQ 控制台
消息被消費,但是 ActiveMQ 的隊列中,消息依舊存在,也會造成重復消費。


開啟事務提交
package org.hong.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class JmsConsumerTx {
private static final String ACTIVE_URL = "tcp://192.168.200.130:61616";
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 參數一設置為true, 開啟事務
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
while (true) {
TextMessage textMessage = (TextMessage) consumer.receive(4000L);
if(textMessage != null){
System.out.println("消費者接收到消息:" + textMessage.getText());
}else{
break;
}
}
// 回滾事務
session.commit();
consumer.close();
session.close();
connection.close();
}
}
運行程序,觀察控制台打印和 ActiveMQ 控制台
消息被消費,但是 ActiveMQ 的隊列中,消息依舊存在,也會造成重復消費。


Acknowledge 簽收
非事務
自動簽收
// Session.AUTO_ACKNOWLEDGE 自動簽收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
這里就不演示了,效果就於入門案例一樣。沒什么好說的。
手動簽收
// Session.CLIENT_ACKNOWLEDGE 手動簽收
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
實例代碼
故名思意,如果不簽收消息不會出隊列,會造成重復消費。
package org.hong.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class JmsConsumerTx {
private static final String ACTIVE_URL = "tcp://192.168.200.130:61616";
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 參數二設置為2(Session.CLIENT_ACKNOWLEDGE), 代表手動簽收
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
while (true) {
TextMessage textMessage = (TextMessage) consumer.receive(4000L);
if(textMessage != null){
System.out.println("消費者接收到消息:" + textMessage.getText());
// Message對象調用acknowledge()方法進行簽收
textMessage.acknowledge();
}else{
break;
}
}
consumer.close();
session.close();
connection.close();
}
}
允許重復消息
Session.DUPS_OK_ACKNOWLEDGE
事務
事務開啟后,只有 commit
才能將全部消息變為以消費。
自己試試看把!
簽收和事務的關系
- 在事務性會話中,當一個事務被成功提交則消息被自動簽收。如果事務回滾,則消息會被再次傳送。
- 非事務性會話中,消息何時被確認取決於創建會話時的應答模式 ( acknowledgement mode )
- 事務大於簽收
ActiveMQ 的 Borker
簡介
用 ActiveMQ Broker 作為獨立的消息服務器來構建 JAVA 應用。ActiveMQ 也支持在 vm 中通信基於嵌入式的 broker,能夠無縫的集成其他 Java 應用。
說白了,Broker 啟動就是實現了用代碼的形式啟動 ActiveMQ 將 MQ 嵌入到 Java 代碼中,一邊隨時用隨時啟動,在用的時候再去啟動這樣能節省資源,也保證了可靠性。
POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>activemq</artifactId>
<groupId>org.hong</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>activemq-demo</artifactId>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- activemq 所需要的jar 包-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.16.2</version>
</dependency>
<!-- activemq 和 spring 整合的基礎包-->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>4.18</version>
</dependency>
<!-- 引入這個jar包是為了解決(Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.ObjectMapper)錯誤 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.3</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
</dependencies>
</project>
代碼示例
啟動這個程序就相當於啟動了一個 ActiveMQ。
package org.hong.activemq;
import org.apache.activemq.broker.BrokerService;
public class EmbedBroker {
public static void main(String[] args) throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setUseJmx(true);
brokerService.addConnector("tcp://localhost:61616");
brokerService.start();
System.in.read();
}
}
Spring 整合 ActiveMQ
Maven
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>activemq</artifactId>
<groupId>org.hong</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>activemq-demo</artifactId>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- activemq 所需要的jar 包-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.16.2</version>
</dependency>
<!-- activemq 和 spring 整合的基礎包-->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>4.20</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.3</version>
</dependency>
<!-- activeMQ jms 的支持 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>5.3.7</version>
</dependency>
<!-- pool 池化包相關的支持 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.16.2</version>
</dependency>
<!-- aop 相關的支持 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>5.3.7</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.3.7</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
</dependencies>
</project>
application.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd">
<context:component-scan base-package="org.hong.activemq.spring"/>
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.200.130:61616"></property>
</bean>
</property>
<property name="maxConnections" value="100"></property>
</bean>
<!-- 創建一個隊列目的地 -->
<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="spring-active-queue"></constructor-arg>
</bean>
<!-- jms 的工具類 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 注入連接工廠 -->
<property name="connectionFactory" ref="jmsFactory"/>
<!-- 注入默認的目的地 -->
<property name="defaultDestination" ref="destinationQueue"/>
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
</beans>
隊列生產者示例
代碼示例
package org.hong.activemq.spring;
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
@Service
public class SpringMQProduce {
@Autowired
private JmsTemplate jmsTemplate;
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:application.xml");
SpringMQProduce produce = context.getBean(SpringMQProduce.class);
produce.sendTextMessage("Spring 整合 ActiveMQ 案例");
System.out.println("Send Tack Over");
}
public void sendTextMessage(String text){
/*
* 我們只需要將目的地和消息給JmsTemplate, Spring會將Session對象傳遞給我們, 我們根據Session創建Message對象並返回即可
* JmsTemplate會根據目的地自動創建對於的produce並在發送結束后自動關閉produce
* 因為我們在配置文件中給JmsTemplate注入了一個默認的目的地, 因此可以不用指定目的地, JmsTemplate會使用默認的目的地。
*/
jmsTemplate.send(session -> session.createTextMessage(text));
}
}
測試
運行上面的程序

隊列消費者示例
代碼示例
package org.hong.activemq.spring;
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
@Service
public class SpringMQConsumer {
@Autowired
private JmsTemplate jmsTemplate;
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:application.xml");
SpringMQConsumer consumer = context.getBean(SpringMQConsumer.class);
Object messageValue = consumer.getMessageValue();
System.out.println(messageValue);
}
public Object getMessageValue(){
// 沒有指定目的地, 使用默認的目的地
return jmsTemplate.receiveAndConvert();
}
}
測試
運行上面的程序
正常取出之前放入的消息,消息也被正常消費。


主題示例
修改 application.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd">
<context:component-scan base-package="org.hong.activemq.spring"/>
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.200.130:61616"></property>
</bean>
</property>
<property name="maxConnections" value="100"></property>
</bean>
<!-- 創建一個隊列目的地 -->
<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="spring-active-queue"></constructor-arg>
</bean>
<!-- ================== 添加了一個Topic目的地, 並修改了jmsTemplate的默認目的地 ======================== -->
<!-- 創建一個主題目的地 -->
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-active-topic"></constructor-arg>
</bean>
<!-- jms 的工具類 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 注入連接工廠 -->
<property name="connectionFactory" ref="jmsFactory"/>
<!-- 注入默認的目的地 -->
<property name="defaultDestination" ref="destinationTopic"/>
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
</beans>
代碼無需修改
測試
先啟動消費者再啟動生產者。正常收到消息。

監聽器配置
修改 application.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd">
<context:component-scan base-package="org.hong.activemq.spring"/>
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.200.130:61616"></property>
</bean>
</property>
<property name="maxConnections" value="100"></property>
</bean>
<!-- 創建一個隊列目的地 -->
<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="spring-active-queue"></constructor-arg>
</bean>
<!-- 創建一個主題目的地 -->
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-active-topic"></constructor-arg>
</bean>
<!-- 向Spring容器中注冊一個MQ的監聽器 -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsFactory"/>
<property name="destination" ref="destinationTopic"/>
<property name="messageListener" ref="myMessageListener"/>
</bean>
<!-- jms 的工具類 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 注入連接工廠 -->
<property name="connectionFactory" ref="jmsFactory"/>
<!-- 注入默認的目的地 -->
<property name="defaultDestination" ref="destinationTopic"/>
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
</beans>
自定義監聽器
package org.hong.activemq.spring;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
@Component
public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
測試
直接啟動生產者
當我們向 MQ 中放入消息時,我們注冊的監聽器立刻就感知到了新的消息,並取出來進行消費。


SpringBoot 整合 ActiveMQ
隊列
隊列生產者
新建 Maven 工程
工程名:boot-mq-produce
包名:org.hong.boot.activemq
POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-boot-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>2.2.2.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.hong</groupId>
<artifactId>boot-mq-produce</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
</dependencies>
</project>
YAML
server:
port: 7777
spring:
activemq:
# MQ服務器地址
broker-url: tcp://192.168.200.130:61616
# 用戶名和密碼
user: admin
password: admin
jms:
# false=Queue true=Topic
pub-sub-domain: false
# 自己定義隊列名稱
myqueue: boot-activemq-queue
主啟動
package org.hong.boot.activemq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;
@SpringBootApplication
@EnableJms // 開啟JMS
public class MainApp {
public static void main(String[] args) {
SpringApplication.run(MainApp.class, args);
}
}
Config 配置類
package org.hong.boot.activemq.config;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.jms.Queue;
@Configuration
public class ConfigBean {
@Value("${myqueue}")
private String myQueue;
@Bean
public Queue queue(){
return new ActiveMQQueue(myQueue);
}
}
生產者
package org.hong.boot.activemq.produce;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
import javax.jms.Queue;
@Component
public class QueueProduce {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;// JmsMessagingTemplate是JmsTemplate的加強版
@Autowired
private Queue queue;
public void produceMsg(){
jmsMessagingTemplate.convertAndSend(queue, "SpringBoot 整合 ActiveMQ");
}
}
測試單元
package org.hong.boot.activemq;
import org.hong.boot.activemq.produce.QueueProduce;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
@SpringBootTest(classes = MainApp.class)
@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
public class TestActiveMQ {
@Autowired
private QueueProduce queueProduce;
@Test
void testQueueProduceSend(){
queueProduce.produceMsg();
}
}
ActiveMQ 控制台

定時發送
案例修改
修改QueueProduce新增定時投遞方法
// 間隔3秒定投
@Scheduled(fixedDelay = 3000)
public void produceMsgScheduled(){
jmsMessagingTemplate.convertAndSend(queue, "Scheduled" + UUID.randomUUID().toString().substring(0, 6));
System.out.println("produceMsgScheduled send ok");
}
主啟動添加 @EnableScheduling
注解
package org.hong.boot.activemq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableJms // 開啟JMS
@EnableScheduling // 開啟Scheduled注解
public class MainApp {
public static void main(String[] args) {
SpringApplication.run(MainApp.class, args);
}
}
測試
直接運行主啟動了,定時投遞自動運行。自行觀察控制台。
隊列消費者
新建 Maven 工程
工程名:boot-mq-consumer
包名:org.hong.boot.activemq
POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-boot-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>2.2.2.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.hong</groupId>
<artifactId>boot-mq-consumer</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
</dependencies>
</project>
YAML
server:
port: 8888
spring:
activemq:
# MQ服務器地址
broker-url: tcp://192.168.200.130:61616
# 用戶名和密碼
user: admin
password: admin
jms:
# false=Queue true=Topic
pub-sub-domain: false
# 自己定義隊列名稱
myqueue: boot-activemq-queue
主啟動
package org.hong.boot.activemq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;
@SpringBootApplication
@EnableJms
public class MainApp {
public static void main(String[] args) {
SpringApplication.run(MainApp.class, args);
}
}
消費者
package org.hong.boot.activemq.consumer;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.TextMessage;
@Component
public class QueueConsumer {
// 使用@JmsListener標注這個方法是監聽方法, 並指定目的地
// 因為我們在application.yaml中配置的是隊列模式, 因此SpringBoot會根據名字創建隊列的目的地
@JmsListener(destination = "${myqueue}")
public void receive(TextMessage textMessage) throws JMSException {
System.out.println("消費者收到消息: " + textMessage.getText());
}
}
發布訂閱
主題生產者
新建 Maven 工程
工程名:boot-mq-topic-produce
包名:org.hong.boot.activemq.topic
POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-boot-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>2.2.2.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.hong</groupId>
<artifactId>boot-mq-topic-produce</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
</dependencies>
</project>
YAML
server:
port: 6666
spring:
activemq:
# MQ服務器地址
broker-url: tcp://192.168.200.130:61616
# 用戶名和密碼
user: admin
password: admin
jms:
# false=Queue true=Topic
pub-sub-domain: true
# 自己定義主題名稱
mytopic: boot-activemq-topic
主啟動
package org.hong.boot.topic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;
@SpringBootApplication
@EnableScheduling
@EnableJms
public class MainApp {
public static void main(String[] args) {
SpringApplication.run(MainApp.class, args);
}
}
Config 配置類
package org.hong.boot.topic.config;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.jms.Topic;
@Configuration
public class ConfigBean {
@Value("${mytopic}")
private String myTopic;
@Bean
public Topic topic(){
return new ActiveMQTopic(myTopic);
}
}
生產者
package org.hong.boot.topic.produce;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.jms.Topic;
import java.util.UUID;
@Component
public class TopicProduce {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Topic topic;
@Scheduled(fixedDelay = 3000)
public void produceTopic(){
jmsMessagingTemplate.convertAndSend(topic, "主題消息發布" + UUID.randomUUID().toString().substring(0, 6));
}
}
主題消費者
新建 Maven 工程
工程名:boot-mq-topic-consumer
包名:org.hong.boot.activemq.topic
POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>activemq</artifactId>
<groupId>org.hong</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.hong</groupId>
<artifactId>boot-mq-topic-consumer</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
</dependencies>
</project>
YAML
server:
port: 5555
spring:
activemq:
# MQ服務器地址
broker-url: tcp://192.168.200.130:61616
# 用戶名和密碼
user: admin
password: admin
jms:
# false=Queue true=Topic
pub-sub-domain: true
# 自己定義主題名稱
mytopic: boot-activemq-topic
主啟動
package org.hong.boot.activemq.topic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;
@SpringBootApplication
@EnableJms
public class MainApp {
public static void main(String[] args) {
SpringApplication.run(MainApp.class, args);
}
}
消費者
package org.hong.boot.activemq.topic.consumer;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.TextMessage;
@Component
public class TopicConsumer {
@JmsListener(destination = "${mytopic}")
public void receive(TextMessage textMessage) throws JMSException {
System.out.println("消費者收到訂閱的主題: " + textMessage.getText());
}
}
主題測試
先啟動消費者再啟動生產者
啟動多個消費者,IDEA 啟動多個微服務。
每隔3秒,消費者服務會消費一條消息。

ActiveMQ 的傳輸協議
簡介
ActiveMQ 支持的 client-broker 通信協議有:TCP、NIO、UDP、SSL、Http(s)、VM。
啟動配置 Transport Connector
的文件在 ActiveMQ 安裝目錄的 conf/activemq.xml
中的 transportConnectors
標簽之內。

在上文給出的配置文件中。
URI 描述消息的頭部都是采用協議名稱:
描述 amqp 協議的監聽端口時,采用的 URI 描述格式為 amqp://...
。
唯獨在進行 openwire
協議描述時,URI 頭卻采用 tcp://
。這是因為 ActiveMQ 中默認的消息協議就是 openwire。
傳輸協議簡介
Transmission Control Protocol ( TCP )
- 這是默認的 Broker 配置,TCP 的 Client 監聽端口 61616
- 在網絡創數數據前,必須要序列化數據,消息是通過一個叫
wire protocol
來序列化成字節流。默認情況下 ActiveMQ 把wire protocol
叫做OpenWire
,它的目的是促使網絡上的效率和數據快速交互。 - TCP 連接的 URI 形式如:tcp://hostname:port?key=value&key=value,后面是參數是可選的
- TCP 傳輸的優點
- TCP 協議傳輸可靠性高,穩定性強
- 高效性:字節流方式傳遞,效率很高
- 有效性、可用性:應用廣泛,支持任何平台
- 關於 Transport 協議的可配置參數可以參考官網:http://activemq.apache.org/configuring-version-5-transports.html
NEW I/O API Protocol ( NIO )
- NIO 協議和 TCP 協議類似但是 NIO 更側重於底層的訪問操作。它允許開發人員對同一資源可有更多的 client 調用和服務端有更多的負載。
- 適合使用 NIO 協議的場景:
- 可能有大量的 Client 去連接 Broker,一般情況下,大量的 Client 去連接 Broker 是被操作系統的線程所限制的。因此 NIO 的實現比 TCP 需要更少的線程去運行,所以建議使用 NIO 協議
- 可能對於 Broker 有一個很遲鈍的網絡傳輸,NIO 比 TCP 提供更好的性能。
- NIO 連接的 URI 形式:nio//hostname:port?key=value
- Transport Connector 配置示例,參考官網:http://activemq.apache.org/configuring-version-5-transports.html
<transportConnectors>
<transportConnector name="nio" uri="nio://localhost:61618?trace=true"/>
</transportConnectors>
NIO 案例演示
修改 activemq.xml
<transportConnectors>
<transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true"/>
</transportConnectors>
如果不特別指定 ActiveMQ 的網絡監聽端口,那么這些端口都將使用過 BIO 網絡 IO 默認。( OpenWire,STOMP,AMQP......就是默認帶的5個 )。所以為了首先提高單節點的網絡吞吐性能,我們需要明確指定 Active 的網絡 IO 模型。如下所示:URI 格式頭以 nio
開頭,表示這個端口使用以 TCP 協議為基礎的 NIO 網絡 IO 模型。( BIO:阻塞 IO,NIO:非阻塞 IO )
修改完成后重啟 ActiveMQ。
ActiveMQ 控制台

測試
修改入門案例代碼,並且打開 61618 端口。
private static final String ACTIVE_URL = "nio://192.168.200.130:61618";
自行測試,肯定是沒問題的。就算不修改連接地址,一樣不受影響,之前的 tcp 協議依舊能過使用。
NIO 增強
問題
URI 格式頭以 nio
開頭,表示這個端口使用以 TCP 協議為基礎的 NIO 網絡 IO 模型,但是這樣的設置方式,只能使這個端口支持 Openwire 協議,我們這么讓這個端口支持 NIO 網絡 IO 模型,又讓它支持多個協議呢?
解決
使用 auto 關鍵字
使用 +
符號來為端口設置多種特性
<transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600&org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20&org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize=50"/>
測試
private static final String ACTIVE_URL = "nio://192.168.200.130:61618";
private static final String ACTIVE_URL = "tcp://192.168.200.130:61618";
我們使用 tcp 或者 nio 協議訪問 61618 端口進行測試,都不會有問題。
ActiveMQ 的消息存儲和持久化
概述
為了避免意外宕機以后丟失信息,需要做到重啟后可以恢復消息隊列,消息系統一般都會采用持久化機制。ActiveMQ 的消息持久化機制有 JDBC、AMQ、KahanDB 和 LevelDB,無論使用哪種持久化方式,消息的存儲邏輯都是一致的。
就是在發送者將消息發送出去后,消息中心首先將消息存儲到本地數據文件、內存數據庫或者遠程數據庫等再試圖將消息發送給接收者,成功則將消息從存儲中刪除,失敗則繼續嘗試發送。
消息中心啟動以后首先要檢查指定的存儲位置,如果有未發送成功的消息,則需要把消息發送出去。
持久化方式介紹
AMQ ( Message Store )
基於文件的存儲方式,是以前的默認消息存儲,現在不用了。
KahaDB 消息存儲 ( 默認 )
基於日志文件,從 ActiveMQ5.4 開始默認的持久化插件

JDBC 消息存儲
消息基於 JDBC 存儲
LevelDB 消息存儲
這種文件系統是從 ActiveMQ5.8 之后引進的,它和 KahaDB 非常相似,也是基於文件的本地數據庫存儲形式,但是它提供比 KahaDB 更快的持久性。但它不使用自定義 B-Tree 來實現索引預寫日志,而是使用基於 LevelDB 的索引
默認配置
<persistenceAdapter>
<levelDB directory="activemq-data"/>
</persistenceAdapter>
JDBC Message store with ActiveMQ Journal
后面有
JDBC 消息存儲
-
拷貝一個 MySQL 數據庫的驅動包到 lib 文件夾下
-
jdbcPersistenceAdapter 配置
修改
activemq.xml
配置文件,修改如下:修改前
<persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter>
修改后
<persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds"/> </persistenceAdapter>
dataSource
指定將要引用的持久化數據庫的 bean 名稱,可以任意寫,#
不能丟。createTablesOnStartup
是否在啟動的時候創建數據庫,默認值是 true,這樣每次啟動都會去創建數據表,一般是第一次啟動的時候設置為 true 之后改為 false。 -
數據庫連接池配置
修改
conf/activemq.xml
配置文件<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql//192.168.200.1:3306/activemq?relaxAutoCommit=true"/> <property name="username" value="root"/> <property name="password" value="1234"/> <property name="maxTotal" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean>
注意添加的位置。
-
創建數據庫和對應的表
-
創建一個名為 activemq 的數據庫
create database activemq;
-
啟動 ActiveMQ 后運行程序會自動創建表
如果啟動不了,可能是 MySQL 不允許外部連接
-- '%'允許任意主機使用root用戶名的1234密碼進行連接 GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '1234' WITH GRANT OPTION; FLUSH PRIVILEGES;
-
Queue
在沒有消費者消費的情況下會將消息保存到 activemq_msgs 表中,只要有任意一個消費者已經消費過了,消費之后這些消息將會被立即刪除
Topic
一般是先啟動消費訂閱然后再生產的情況下會將消息保存到 activemq_acks 表中。
下划線坑爹
java.lang.IllegalStateException:BeanFactory not initialized or already closed
這是因為你的操作系統的機器名中有 _
符號。
JDBC Message store with ActiveMQ Journal
簡介
這種方式克服了 JDBC Store 的不做,JDBC 每次消息過來,都需要去寫庫和讀庫。ActiveMQ Journal 使用高速緩存寫入技術,大大提高了性能。當消費者的消費速度能夠及時跟上生產者消費的生產速度時,Journal 文件能夠大大減少需要寫入到 DB 中的消息。
舉個栗子
生產者生產了 1000 條消息,這 1000 條消息會保存到 Journal 文件,如果消費者的消費是速度很快的情況下,在 Journal 文件還沒有同步到 DB 之前,消費者已經消費了 90% 消息,那么這個時候只需要同步剩余的 10% 的消息到 DB。如果消費者的消費速度很慢,這個時候 Journal 文件可以使消息以批量方式寫到 DB。
配置
修改 conf/activemq.xml
文件
修改前
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
修改后
<persistenceFactory>
<journalPersistenceAdapterFactory
journalLogFiles="4"
journalLogFileSize="32768"
useJournal="true"
useQuickJournal="true"
dataSource="#mysql-ds"
dataDirectory="activemq-data"/>
</persistenceFactory>

高級特性
異步投遞
概述
ActiveMQ 支持同步、異步兩種方式的模式將消息發送到 broker,模式的選擇對發送延時有巨大的影響。使用異步發送可以顯著的提高發送的性能
ActiveMQ 默認使用異步發送的模式:除非明確指定使用同步發送的方式或者在未使用事務的前提下發送持久化的消息,這兩種情況都是同步發送的。
如果你沒有使用事務且發送的使持久化的消息,每一次發送都是同步發送的且會阻塞 producer 直到 broker 返回一個確認,表示消息已經被安全的持久化到磁盤。確認機制提供了消息安全的保障,但同時會則色客戶端帶來了很大的延時。
很多高性能的應用。允許在失敗的情況下有少量的數據丟失。如果你的應用滿足這個特點,你可以使用異步發送。
異步發送
它可以最大號 producer 端的發送效率。通常在發送消息量比較密集的情況下使用異步發送,它可以很大的提升 producer 性能。不過這也帶來了額外的問題:
- 需要小號較多的 Client 端內存同時也會導致 broker 端性能消耗增加
- 不能有效的確保消息的發送成功。在
useAsyncSend = true
的情況下客戶端需要容忍消息丟失的可能
三種開啟方式
Connection URI
new ActiveMQConnectionFacttory("tcp://localhost:61616?jms.useAsyscSend=true");
ConnectionFatory
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
Connection
((ActiveMQConnection)connection).setUseAsyncSend(true);
異步投遞如何確認發送成功
異步發送丟失消息的場景使:生產者設置 UseAsyncSend=true,使用 producer.send(msg) 持續發送消息。由於消息不阻塞,生產者會認為所有send 的消息均被成功發送至 MQ。如果 MQ 突然宕機,此時生產者端內容中尚未被發送至 MQ 的消息都會丟失。所以,正確的異步發送方法使需要接收回調的。
代碼示例
package org.hong.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.AsyncCallback;
import javax.jms.*;
import java.util.UUID;
public class JmsProduce {
private static final String ACTIVE_URL = "tcp://192.168.200.130:61616";
private static final String QUEUE_NAME = "queue";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
// 消息生產者使用ActiveMQMessageProducer
ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("msg---" + i);
String id = UUID.randomUUID().toString();
textMessage.setJMSMessageID(id);
producer.send(textMessage, new AsyncCallback() {
// 成功的回調
@Override
public void onSuccess() {
System.out.println(id + "發送成功");
}
// 失敗的回調
@Override
public void onException(JMSException e) {
System.out.println(id + "發送失敗");
}
});
}
producer.close();
session.close();
connection.close();
System.out.println("消息發布到MQ完成");
}
}
延遲投遞和定時投遞
修改
conf/activemq.xml
新增 schedulerSupport="true"
屬性。

四大屬性
Property Name | type | description |
---|---|---|
AMQ_SCHEDULED_DELAY | long | 延遲投遞的時間 |
AMQ_SCHEDULED_PERIOD | long | 重復投遞的的時間間隔 |
AMQ_SCHEDULED_REPEAT | int | 重復投遞次數 |
AMQ_SCHEDULED_CRON | String | Cron 表達式 |
代碼案例
package org.hong.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ScheduledMessage;
import javax.jms.*;
public class JmsProduce {
private static final String ACTIVE_URL = "tcp://192.168.200.130:61616";
private static final String QUEUE_NAME = "queue";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
long delay = 3 * 1000;
long period = 4 * 1000;
int repeat = 5;
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("msg---" + i);
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); // 延遲投遞的時間
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period); // 重復投遞的時間間隔
textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat); // 重復投遞次數
producer.send(textMessage);
}
producer.close();
session.close();
connection.close();
System.out.println("消息發布到MQ完成");
}
}
消息重發
消息重發的情況
- Client 用了 Transactions 且在 session 中調用了 rollback()
- Client 用了 Transactions 且在調用 commit() 之前關閉或者沒有 commit()
- Client 在 CLIENT_ACKNOWLEDGE 的傳遞模式下,在 sessino 中調用了 recover()
消息重發時間間隔和重發次數
間隔:1
次數:6
測試
消費者端開啟事務,但是不提交事務,會造成重復消費,但是由於消息重發機制的存在,在進行第7次消費時將無法再消費到數據。
消息去哪了?
一個消息被 redelivedred 超過默認的最大重發次數時,消費端會給 MQ 發送一個 poison ack
表示這個消息有毒,告訴 broker 不要再發了。這個時候 broker 會把這個消息方法 DLQ ( 死信隊列 )。
自定義重發策略
package org.hong.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import javax.jms.*;
import java.io.IOException;
public class JmsConsumer {
private static final String ACTIVE_URL = "tcp://192.168.200.130:61616";
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
// 自定義重發策略
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(3);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
while (true) {
TextMessage textMessage = (TextMessage) consumer.receive(4000L);
if(textMessage != null){
System.out.println("消費者接收到消息:" + textMessage.getText());
}else{
break;
}
}
// session.commit(); 不提交事務
consumer.close();
session.close();
connection.close();
}
}
屬性說明
collisionAvoidanceFactor:設置防止沖突訪問的正負百分比,只用
屬性 | 默認值 | 描述 |
---|---|---|
backOffMultiplier |
5 |
重連時間間隔遞增倍數,只有值大於1和啟動 useExponentialBackOff 參數時才生效。 |
collisionAvoidanceFactor |
0.15 |
設置防止沖突訪問的正負百分比,只有啟動 useCollisionAvoidance 參數時才生效。也就是延遲時間上再加一個時間波動范圍。 |
initialRedeliveryDelay |
1000L |
初始重發延遲(以毫秒為單位)。 |
maximumRedeliveries |
6 |
最大重傳次數,達到最大重連次數后拋出異常。為-1時不限制次數,為0表示不進行重傳。 |
maximumRedeliveryDelay |
-1 |
最大傳送延遲,只在 useExponentialBackOff=true 時生效。 |
redeliveryDelay |
1000L |
重發延遲時間,當 initialRedeliveryDelay=0 時生效。 |
useCollisionAvoidance |
false |
啟用防止沖突功能。 |
useExponentialBackOff |
false |
啟動指數倍數遞增的方式增加延遲時間。 |
死信隊列
ActiveMQ 中引入了 死信隊列(Dead Letter Queue)的概念
。即一條消息在被重發了多次后,將會被 ActiveMQ 移入死信隊列。開發人員可以在這個 Queue 中查看處理出錯的消息,進行人工干預。
