ActiveMQ基本使用


消息隊列,目前在實際的開發項目中應用十分廣泛。本文主要介紹入門級的ActiveMQ的基本使用以及相關的概念。

一、JMS

   全稱 Java Message Service,即Java消息服務。JMS是一套Java的面向消息中間件的API接口規范,用於在不同應用程序中異步的發送消息。JMS本身語言無關,絕大多數的消息中間件廠商都提供了對JMS的支持。基於JMS實現的消息中間件,也叫做JMS Provider。

  消息服務,傳遞的載體自然是消息(Message)。在JMS中,消息主體可以簡單分為幾個類型:簡單文本(TextMessage)、可序列化的對象 (ObjectMessage)、屬性集合 (MapMessage)、字節流 (BytesMessage)、原始值流 (StreamMessage)。

  JMS中,有一整套的名詞提供,下面簡單說說相關的名詞以及解釋:

  1、Destination

    目的地。JMS Provider(消息中間件)進行維護,用於對消息(Message)對象進行管理。MessageProducer需要指定Destination才能進行發送消息,MessageConsumer需要指定Destination才能進項消息的接收消費。

  2、Producer

    消息生產者,負責發送消息到Destination目的地。應用接口為MessageProducer。

  3、Consumer

    消息接收者,負責接收消費指定Destination的消息,應用接口為MessageConsumer。

  4、Message

    消息體,一般常用的有:TextMessage、ObjectMessage、BytesMessage。

  5、ConnectionFactory

    連接工廠。用於創建連接信息的。

  6、Connection

    連接。用於和ActiveMQ服務端建立連接,一般由連接工廠創建。

  7、Session

    會話。Session是操作消息的接口。可以通過session創建生產者、消費者、消息等信息。Session支持事務特征,當需要批處理(發送或者接收)消息的時候,可以將這些操作放到一個事務中進行。

  8、Queue和Topic

    Queue - 隊列目的地。Topic - 主題目的地。都是Destination的子接口。

    Queue:一般隊列中的一條消息,默認的只能被一個消費者消費。消費完成即刪除。

    Topic:消息會發送給所有訂閱的消費者。消息不會持久化,也即如果發消息時不存在訂閱關系,則消息直接丟棄。

  9、PTP

    point to point,點對點模型。針對Queue實現的消息處理方式。

  10、Pub/Sub

    Publish & Subscribe,發布訂閱模型。針對Topic實現的消息處理方式。

二、ActiveMQ簡介

   ActiveMQ是純Java編寫的消息中間件服務,完全支持JMS規范。支持多種語言編寫客戶端:C、C++、C#、Java、PHP、Python等。應用協議包括:OpenWire、STOMP、WS-Notification、MQTT以及AMQP。對Spring的支持非常好,可以很容易的集成到現有的Spring系統中去使用。在消息的持久化上,支持jdbc和journal兩種方式的使用。另外,在集群搭建上,也比較容易。上手難度比較低,適合大多數的中小型項目使用。

三、ActiveMQ安裝

   1、下載安裝包

    ActiveMQ官網下載包,注意的是,ActiveMQ 5.10.x以上版本需要JDK1.8的環境。其他只需要1.7環境即可。

  2、 上傳linux服務器

    本文下載上傳的是最新的 5.15.9版本。

  3、解壓安裝文件

    tar -zxf apache-activemq-5.15.9-bin.tar.gz

  4、檢查權限

    ls -al apache-activemq-5.15.9/bin

    如果權限不足的話,會無法執行,修改文件權限:chmod 755 activemq

  5、移動到集中目錄(可選)

    cp -r apache-activemq-5.15.9 /usr/local/activemq

  6、ActiveMQ配置文件簡介

    配置文件目錄為${activemq_home}/conf,對配置文件的修改,都必須重新啟動ActiveMQ才能生效。

    6.1、activemq.xml

      就是Spring配置文件,配置了MQ使用的默認的對象組件。

      broker -  ActiveMQ的實例標簽,配置的內容基本在此標簽內部

      destinationPolicy - 配置目的地的規則信息

      persistenceAdapter - 配置持久化策略

      systemUsage - 內存信息設置

      transportConnectors - 配置連接端口信息,一般Java使用最多的就是openwire協議,也就是基於tcp的協議訪問,默認開放端口61616,可自定義修改。

    6.2、jetty.xml

      ActiveMQ默認控制台的配置文件,也是個Spring的配置文件。一般在標簽 jettyPort 中可以修改控制台的訪問端口,默認是8161。控制台管理的時候需要用戶名密碼登錄,默認為admin:admin。

  7、啟動或停止ActiveMQ

    啟動:${activemq_home}/bin/activemq start

    重啟:${activemq_home}/bin/activemq restart

    停止:${activemq_home}/bin/activemq stop

  8、測試ActiveMQ

    8.1、查看是否成功啟動:jps命令,查看是否有activemq.jar的信息:

    8.2、檢查進程信息:ps -ef|grep activemq

    8.3、ActiveMQ控制台

    一般,使用瀏覽器訪問ActiveMQ控制台管理信息,地址格式:http://ip:port/admin

    其中端口設置見上述 6.2節中提及。默認為8161。訪問正確,出現下面界面幾位成功啟動:

  9、查看ActiveMQ狀態

  ${activemq_home}/bin/activemq status

   

四、ActiveMQ基本使用

   上述安裝完ActiveMQ之后,就可以使用Java代碼去進行訪問操作啦。下面開始介紹使用。

   編寫生產者:

package com.cfang.mq.simpleCase;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class SimpleProducer {

	public static void main(String[] args) {
		SimpleProducer simpleProducer = new SimpleProducer();
		simpleProducer.sendMsg("我有一只小毛驢");
	}
	
	public void sendMsg(String msg) {
		ConnectionFactory factory = null;	//連接工廠
		Connection connection = null;		//連接對象
		Session session = null;				//session會話
		Destination destination = null;		//目的地
		MessageProducer producer = null;	//生產者
		Message message = null;				//消息
		try {
			//創建連接工廠,前兩個參數是做安全認證使用,本例中尚未開啟。
			factory = new ActiveMQConnectionFactory(null, null, "tcp://172.31.31.160:61616");
			//通過工廠創建連接對象
			connection = factory.createConnection();
			//啟動連接。生產者通常來說不是必須顯式啟動的,在發送消息的時候,會檢測是否啟動,未啟動的話會先進行啟動操作。
			connection.start();
			/**
			 * 	根據連接對象信息,創建session會話信息。
			 * 	第一個參數為是否開啟事務特性。
			 * 		false - 不開啟事務。使用比較多的配置。
			 * 		true - 開啟事務。如果開啟事務,這第二個參數默認無效了,建議還是寫成Session.SESSION_TRANSACTED
			 * 	第二個參數表示消息確認機制。
			 * 		AUTO_ACKNOWLEDGE - 自動消息確認。消息消費者接受處理消息后,自動發送確認信息
			 * 		CLIENT_ACKNOWLEDGE - 手動確認。消息消費者在接受處理消息后,必須手動發起確認ack信息
			 * 		DUPS_OK_ACKNOWLEDGE - 有副本的手動確認機制。
			 */
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			//創建目的地,參數是目的地名稱,也即隊列名。
			destination = session.createQueue("tp_simple_queue");
			//創建消息生產者,參數為目的地,也可以不指定,在發送消息的時候再指定
			producer = session.createProducer(destination);
			//創建消息
			message = session.createTextMessage(msg);
			//發送到ActiveMQ指定的目的地中
			producer.send(message);
			System.out.println("=====send msg ok!=====");
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			// 回收資源
			if(producer != null){ // 回收消息發送者
				try {
					producer.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
			if(session != null){ // 回收會話對象
				try {
					session.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
			if(connection != null){ // 回收連接對象
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}
}

  編寫消費者

package com.cfang.mq.simpleCase;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class SimpleConsumer {

	public static void main(String[] args) {
		SimpleConsumer simpleConsumer = new SimpleConsumer();
		System.out.println("=====receive msg: " + simpleConsumer.receiveMsg());
	}
	
	public String receiveMsg() {
		String result = "";
		ConnectionFactory factory = null;	//連接工廠
		Connection connection = null;		//連接對象
		Session session = null;				//session會話
		Destination destination = null;		//目的地
		MessageConsumer consumer = null;	//生產者
		Message message = null;				//消息
		try {
			factory = new ActiveMQConnectionFactory(null, null, "tcp://172.31.31.160:61616");
			connection = factory.createConnection();
			//不同於生產者存在自動啟動機制,消息的消費者必須顯式的手動啟動連接
			connection.start();
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			//創建目的地,參數是目的地名稱,也即隊列名。
			destination = session.createQueue("tp_simple_queue");
			//創建消費者,參數為目的地,也可以不指定,在發送消息的時候再指定
			consumer = session.createConsumer(destination);
			//接收一條消息
			message = consumer.receive();
			//手動確認
//			message.acknowledge();
			result = ((TextMessage)message).getText();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			// 回收資源
			if(consumer != null){ // 回收消息發送者
				try {
					consumer.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
			if(session != null){ // 回收會話對象
				try {
					session.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
			if(connection != null){ // 回收連接對象
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
		return result;
	}
}

 

五、ActiveMQ安全認證 

  上面簡單的寫了個生產者、消費者的案例,其中在創建連接工廠ConnectionFactory的時候,前兩個參數都沒有去設置值,程序運行也沒有任何的問題。這是因為安裝的ActiveMQ尚未啟用安全認證插件,這種情況下,只要知道mq的地址信息,均可以連接上去進行消息的收發。

  在一些特定的需求中,需要對ActiveMQ的連接進行認證,下面介紹ActiveMQ的安全認證配置。

  ActiveMQ的安全認證配置,是基於用戶名密碼校驗的。啟用安全認證,需要對activemq.xml進行修改,具體修改辦法:

    在broker標簽中,增加安全認證插件:

<plugins>
            <!--  use JAAS to authenticate using the login.config file on the classpath to configure JAAS -->
			<!--  添加jaas認證插件activemq在login.config里面定義,詳細見login.config-->
            <jaasAuthenticationPlugin configuration="activemq" />
            <!--  lets configure a destination based authorization mechanism -->
            <authorizationPlugin>
                <map>
                    <authorizationMap>
                        <authorizationEntries>
                            <authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
                            <authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
                            <authorizationEntry topic="ActiveMQ.Advisory.>" read="admins" write="admins" admin="admins"/>
                            <authorizationEntry queue="ActiveMQ.Advisory.>" read="admins" write="admins" admin="admins"/>
                        </authorizationEntries>
                    </authorizationMap>
                </map>
            </authorizationPlugin>
        </plugins>

  /conf/login.config配置內容:

activemq {
    org.apache.activemq.jaas.PropertiesLoginModule required
        org.apache.activemq.jaas.properties.user="users.properties"
        org.apache.activemq.jaas.properties.group="groups.properties";
};

  其中 user 代表的是用戶的配置文件信息,group 代表的是用戶組信息配置文件。

  /conf/user.properties,配置文件中格式為:用戶名=密碼

admin=admin

  /conf/group.properties,配置文件中格式為:用戶組名=用戶名,用戶名,用戶名

admins=admin

  確認添加完畢后,需要重啟ActiveMQ。之后Java應用程序創建訪問ConnectionFactory的時候,必須指定上述配置文件中正確的用戶名密碼,否則會報錯如下:

 

 

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM