消息隊列,目前在實際的開發項目中應用十分廣泛。本文主要介紹入門級的ActiveMQ的基本使用以及相關的概念。
一、JMS
全稱 Java Message Service,即Java消息服務。JMS是一套Java的面向消息中間件的API接口規范,用於在不同應用程序中異步的發送消息。JMS本身語言無關,絕大多數的消息中間件廠商都提供了對JMS的支持。基於JMS實現的消息中間件,也叫做JMS Provider。
消息服務,傳遞的載體自然是消息(Message)。在JMS中,消息主體可以簡單分為幾個類型:簡單文本(TextMessage)、可序列化的對象 (ObjectMessage)、屬性集合 (MapMessage)、字節流 (BytesMessage)、原始值流 (StreamMessage)。
JMS中,有一整套的名詞提供,下面簡單說說相關的名詞以及解釋:
1、Destination
目的地。JMS Provider(消息中間件)進行維護,用於對消息(Message)對象進行管理。MessageProducer需要指定Destination才能進行發送消息,MessageConsumer需要指定Destination才能進項消息的接收消費。
2、Producer
消息生產者,負責發送消息到Destination目的地。應用接口為MessageProducer。
3、Consumer
消息接收者,負責接收消費指定Destination的消息,應用接口為MessageConsumer。
4、Message
消息體,一般常用的有:TextMessage、ObjectMessage、BytesMessage。
5、ConnectionFactory
連接工廠。用於創建連接信息的。
6、Connection
連接。用於和ActiveMQ服務端建立連接,一般由連接工廠創建。
7、Session
會話。Session是操作消息的接口。可以通過session創建生產者、消費者、消息等信息。Session支持事務特征,當需要批處理(發送或者接收)消息的時候,可以將這些操作放到一個事務中進行。
8、Queue和Topic
Queue - 隊列目的地。Topic - 主題目的地。都是Destination的子接口。
Queue:一般隊列中的一條消息,默認的只能被一個消費者消費。消費完成即刪除。
Topic:消息會發送給所有訂閱的消費者。消息不會持久化,也即如果發消息時不存在訂閱關系,則消息直接丟棄。
9、PTP
point to point,點對點模型。針對Queue實現的消息處理方式。
10、Pub/Sub
Publish & Subscribe,發布訂閱模型。針對Topic實現的消息處理方式。
二、ActiveMQ簡介
ActiveMQ是純Java編寫的消息中間件服務,完全支持JMS規范。支持多種語言編寫客戶端:C、C++、C#、Java、PHP、Python等。應用協議包括:OpenWire、STOMP、WS-Notification、MQTT以及AMQP。對Spring的支持非常好,可以很容易的集成到現有的Spring系統中去使用。在消息的持久化上,支持jdbc和journal兩種方式的使用。另外,在集群搭建上,也比較容易。上手難度比較低,適合大多數的中小型項目使用。
三、ActiveMQ安裝
1、下載安裝包
ActiveMQ官網下載包,注意的是,ActiveMQ 5.10.x以上版本需要JDK1.8的環境。其他只需要1.7環境即可。
2、 上傳linux服務器
本文下載上傳的是最新的 5.15.9版本。
3、解壓安裝文件
tar -zxf apache-activemq-5.15.9-bin.tar.gz
4、檢查權限
ls -al apache-activemq-5.15.9/bin
如果權限不足的話,會無法執行,修改文件權限:chmod 755 activemq
5、移動到集中目錄(可選)
cp -r apache-activemq-5.15.9 /usr/local/activemq
6、ActiveMQ配置文件簡介
配置文件目錄為${activemq_home}/conf,對配置文件的修改,都必須重新啟動ActiveMQ才能生效。
6.1、activemq.xml
就是Spring配置文件,配置了MQ使用的默認的對象組件。
broker - ActiveMQ的實例標簽,配置的內容基本在此標簽內部
destinationPolicy - 配置目的地的規則信息
persistenceAdapter - 配置持久化策略
systemUsage - 內存信息設置
transportConnectors - 配置連接端口信息,一般Java使用最多的就是openwire協議,也就是基於tcp的協議訪問,默認開放端口61616,可自定義修改。
6.2、jetty.xml
ActiveMQ默認控制台的配置文件,也是個Spring的配置文件。一般在標簽 jettyPort 中可以修改控制台的訪問端口,默認是8161。控制台管理的時候需要用戶名密碼登錄,默認為admin:admin。
7、啟動或停止ActiveMQ
啟動:${activemq_home}/bin/activemq start
重啟:${activemq_home}/bin/activemq restart
停止:${activemq_home}/bin/activemq stop
8、測試ActiveMQ
8.1、查看是否成功啟動:jps命令,查看是否有activemq.jar的信息:

8.2、檢查進程信息:ps -ef|grep activemq

8.3、ActiveMQ控制台
一般,使用瀏覽器訪問ActiveMQ控制台管理信息,地址格式:http://ip:port/admin
其中端口設置見上述 6.2節中提及。默認為8161。訪問正確,出現下面界面幾位成功啟動:

9、查看ActiveMQ狀態
${activemq_home}/bin/activemq status

四、ActiveMQ基本使用
上述安裝完ActiveMQ之后,就可以使用Java代碼去進行訪問操作啦。下面開始介紹使用。
編寫生產者:
package com.cfang.mq.simpleCase;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class SimpleProducer {
public static void main(String[] args) {
SimpleProducer simpleProducer = new SimpleProducer();
simpleProducer.sendMsg("我有一只小毛驢");
}
public void sendMsg(String msg) {
ConnectionFactory factory = null; //連接工廠
Connection connection = null; //連接對象
Session session = null; //session會話
Destination destination = null; //目的地
MessageProducer producer = null; //生產者
Message message = null; //消息
try {
//創建連接工廠,前兩個參數是做安全認證使用,本例中尚未開啟。
factory = new ActiveMQConnectionFactory(null, null, "tcp://172.31.31.160:61616");
//通過工廠創建連接對象
connection = factory.createConnection();
//啟動連接。生產者通常來說不是必須顯式啟動的,在發送消息的時候,會檢測是否啟動,未啟動的話會先進行啟動操作。
connection.start();
/**
* 根據連接對象信息,創建session會話信息。
* 第一個參數為是否開啟事務特性。
* false - 不開啟事務。使用比較多的配置。
* true - 開啟事務。如果開啟事務,這第二個參數默認無效了,建議還是寫成Session.SESSION_TRANSACTED
* 第二個參數表示消息確認機制。
* AUTO_ACKNOWLEDGE - 自動消息確認。消息消費者接受處理消息后,自動發送確認信息
* CLIENT_ACKNOWLEDGE - 手動確認。消息消費者在接受處理消息后,必須手動發起確認ack信息
* DUPS_OK_ACKNOWLEDGE - 有副本的手動確認機制。
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//創建目的地,參數是目的地名稱,也即隊列名。
destination = session.createQueue("tp_simple_queue");
//創建消息生產者,參數為目的地,也可以不指定,在發送消息的時候再指定
producer = session.createProducer(destination);
//創建消息
message = session.createTextMessage(msg);
//發送到ActiveMQ指定的目的地中
producer.send(message);
System.out.println("=====send msg ok!=====");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 回收資源
if(producer != null){ // 回收消息發送者
try {
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if(session != null){ // 回收會話對象
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if(connection != null){ // 回收連接對象
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
編寫消費者
package com.cfang.mq.simpleCase;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class SimpleConsumer {
public static void main(String[] args) {
SimpleConsumer simpleConsumer = new SimpleConsumer();
System.out.println("=====receive msg: " + simpleConsumer.receiveMsg());
}
public String receiveMsg() {
String result = "";
ConnectionFactory factory = null; //連接工廠
Connection connection = null; //連接對象
Session session = null; //session會話
Destination destination = null; //目的地
MessageConsumer consumer = null; //生產者
Message message = null; //消息
try {
factory = new ActiveMQConnectionFactory(null, null, "tcp://172.31.31.160:61616");
connection = factory.createConnection();
//不同於生產者存在自動啟動機制,消息的消費者必須顯式的手動啟動連接
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//創建目的地,參數是目的地名稱,也即隊列名。
destination = session.createQueue("tp_simple_queue");
//創建消費者,參數為目的地,也可以不指定,在發送消息的時候再指定
consumer = session.createConsumer(destination);
//接收一條消息
message = consumer.receive();
//手動確認
// message.acknowledge();
result = ((TextMessage)message).getText();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 回收資源
if(consumer != null){ // 回收消息發送者
try {
consumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if(session != null){ // 回收會話對象
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if(connection != null){ // 回收連接對象
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
return result;
}
}
五、ActiveMQ安全認證
上面簡單的寫了個生產者、消費者的案例,其中在創建連接工廠ConnectionFactory的時候,前兩個參數都沒有去設置值,程序運行也沒有任何的問題。這是因為安裝的ActiveMQ尚未啟用安全認證插件,這種情況下,只要知道mq的地址信息,均可以連接上去進行消息的收發。
在一些特定的需求中,需要對ActiveMQ的連接進行認證,下面介紹ActiveMQ的安全認證配置。
ActiveMQ的安全認證配置,是基於用戶名密碼校驗的。啟用安全認證,需要對activemq.xml進行修改,具體修改辦法:
在broker標簽中,增加安全認證插件:
<plugins>
<!-- use JAAS to authenticate using the login.config file on the classpath to configure JAAS -->
<!-- 添加jaas認證插件activemq在login.config里面定義,詳細見login.config-->
<jaasAuthenticationPlugin configuration="activemq" />
<!-- lets configure a destination based authorization mechanism -->
<authorizationPlugin>
<map>
<authorizationMap>
<authorizationEntries>
<authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
<authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
<authorizationEntry topic="ActiveMQ.Advisory.>" read="admins" write="admins" admin="admins"/>
<authorizationEntry queue="ActiveMQ.Advisory.>" read="admins" write="admins" admin="admins"/>
</authorizationEntries>
</authorizationMap>
</map>
</authorizationPlugin>
</plugins>
/conf/login.config配置內容:
activemq {
org.apache.activemq.jaas.PropertiesLoginModule required
org.apache.activemq.jaas.properties.user="users.properties"
org.apache.activemq.jaas.properties.group="groups.properties";
};
其中 user 代表的是用戶的配置文件信息,group 代表的是用戶組信息配置文件。
/conf/user.properties,配置文件中格式為:用戶名=密碼
admin=admin
/conf/group.properties,配置文件中格式為:用戶組名=用戶名,用戶名,用戶名
admins=admin
確認添加完畢后,需要重啟ActiveMQ。之后Java應用程序創建訪問ConnectionFactory的時候,必須指定上述配置文件中正確的用戶名密碼,否則會報錯如下:

