一、部署和啟動ActiveMQ
去官網下載:http://activemq.apache.org/
我下載的是apache-activemq-5.12.0-bin.tar.gz,
解壓到本地目錄,進入到bin路徑下,
運行activemq啟動ActiveMQ。
運行方式:
啟動 ./activemq start
ActiveMQ默認使用的TCP連接端口是61616,
5.0以上版本默認啟動時,開啟了內置的Jetty服務器,可以進入控制台查看管理。
啟動ActiveMQ以后,登陸:http://localhost:8161/admin/,
默認用戶名admin/admin。

這里我在虛擬機里啟動,訪問地址:
http://192.168.106.128:8161/admin/
ActiveMQ的控制台功能十分強大,管理起來也很直觀。
二、使用Java連接
1.創建POM文件
在Eclipse中新建Java工程,這里使用Maven管理依賴,
下面是pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>activemq-sample</groupId> <artifactId>activemq-sample</artifactId> <version>0.0.1-SNAPSHOT</version> <name>activemq-sample</name> <description>an activemq practice</description> <build> <sourceDirectory>src</sourceDirectory> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.7</source> <target>1.7</target> </configuration> </plugin> <!-- activemq-core 5.7.0 使用bunble打包,需要添加相關插件 --> <plugin> <groupId>org.apache.felix</groupId> <artifactId>maven-bundle-plugin</artifactId> <extensions>true</extensions> </plugin> </plugins> </build> <dependencies> <!-- activemq的maven依賴 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> <type>bundle</type> </dependency> </dependencies> </project>
在第一次添加activemq的maven依賴時報錯,后來發現activemq-core 5.7.0采用了bundle的打包方式,
必須在pom中配置maven-bundle-plugin。
2.創建消息創建者 MsgProducer:
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* @Description: Message Producer
* @author: Bing Yue
*/
public class MsgProducer {
//如果你在本地啟動,可以直接使用空的ActiveMQConnectionFactory構造函數
private static final String BROKER_URL="failover://tcp://192.168.106.128:61616";
public static void main(String[] args) throws JMSException, InterruptedException{
//創建連接工廠
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(BROKER_URL);
//獲得連接
Connection conn = connectionFactory.createConnection();
//start
conn.start();
//創建Session,此方法第一個參數表示會話是否在事務中執行,第二個參數設定會話的應答模式
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//創建隊列
Destination dest = session.createQueue("test-queue");
//創建消息生產者
MessageProducer producer = session.createProducer(dest);
for (int i=0;i<100;i++) {
//初始化一個mq消息
TextMessage message = session.createTextMessage("這是第 " + i+" 條消息!");
//發送消息
producer.send(message);
System.out.println("send message:消息"+i);
//暫停3秒
Thread.sleep(3000);
}
//關閉mq連接
conn.close();
}
}
3.創建消息接收者 MsgProducer:
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
*
* @Description: Message Consumer
* @author: Bing Yue
*/
public class MsgConsumer implements MessageListener {
private static final String BROKER_URL="failover://tcp://192.168.106.128:61616";
public static void main(String[] args) throws JMSException{
//創建連接工廠
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(BROKER_URL);
//獲得連接
Connection conn = connectionFactory.createConnection();
//start
conn.start();
//創建Session,此方法第一個參數表示會話是否在事務中執行,第二個參數設定會話的應答模式
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//創建隊列
Destination dest = session.createQueue("test-queue");
//創建消息生產者
MessageConsumer consumer = session.createConsumer(dest);
//初始化MessageListener
MsgConsumer msgConsumer = new MsgConsumer();
//給消費者設定監聽對象
consumer.setMessageListener(msgConsumer);
}
/**
* 消費者需要實現MessageListener接口
* 接口有一個onMessage(Message message)需要在此方法中做消息的處理
*/
@Override
public void onMessage(Message msg) {
TextMessage txtMessage = (TextMessage)msg;
try {
System.out.println("get message:" + txtMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
運行MsgProducer,
登錄后台查看test-queue隊列,可以看到發出的消息正在等待被處理:

運行MsgConsumer,接收消息並在控制台打印:

通過這個實例可以對ActiveMQ的應用有一個簡單的了解。
代碼地址:https://github.com/bingyue/activemq-sample
在實際開發中,通常還需要設置優先級處理,大部分情況下,消息的發送和接收方都會啟用多線程,
通過線程池來提高處理效率,解耦的同時保持業務處理能力。
