ActiveMQ的應用實例


一、部署和啟動ActiveMQ

去官網下載:http://activemq.apache.org/

我下載的是apache-activemq-5.12.0-bin.tar.gz,

解壓到本地目錄,進入到bin路徑下,
運行activemq啟動ActiveMQ。

運行方式:
啟動 ./activemq start

ActiveMQ默認使用的TCP連接端口是61616,
5.0以上版本默認啟動時,開啟了內置的Jetty服務器,可以進入控制台查看管理。

啟動ActiveMQ以后,登陸:http://localhost:8161/admin/

默認用戶名admin/admin

這里我在虛擬機里啟動,訪問地址:
http://192.168.106.128:8161/admin/

ActiveMQ的控制台功能十分強大,管理起來也很直觀。

二、使用Java連接

1.創建POM文件

在Eclipse中新建Java工程,這里使用Maven管理依賴,
下面是pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>activemq-sample</groupId>
<artifactId>activemq-sample</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>activemq-sample</name>
<description>an activemq practice</description>
<build>
<sourceDirectory>src</sourceDirectory>
<plugins>

<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<!-- activemq-core 5.7.0 使用bunble打包,需要添加相關插件 -->
<plugin> 
<groupId>org.apache.felix</groupId> 
<artifactId>maven-bundle-plugin</artifactId> 
<extensions>true</extensions> 
</plugin>

</plugins>
</build>
<dependencies>
<!-- activemq的maven依賴 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
<type>bundle</type>
</dependency>

</dependencies>

</project>

  

在第一次添加activemq的maven依賴時報錯,后來發現activemq-core 5.7.0采用了bundle的打包方式,

必須在pom中配置maven-bundle-plugin。

2.創建消息創建者 MsgProducer:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
* @Description: Message Producer
* @author: Bing Yue
*/
public class MsgProducer {
//如果你在本地啟動,可以直接使用空的ActiveMQConnectionFactory構造函數
private static final String BROKER_URL="failover://tcp://192.168.106.128:61616";

public static void main(String[] args) throws JMSException, InterruptedException{
//創建連接工廠
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(BROKER_URL);
//獲得連接
Connection conn = connectionFactory.createConnection();
//start
conn.start();

//創建Session,此方法第一個參數表示會話是否在事務中執行,第二個參數設定會話的應答模式
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

//創建隊列
Destination dest = session.createQueue("test-queue");
//創建消息生產者
MessageProducer producer = session.createProducer(dest);

for (int i=0;i<100;i++) {
//初始化一個mq消息
TextMessage message = session.createTextMessage("這是第 " + i+" 條消息!");
//發送消息
producer.send(message);
System.out.println("send message:消息"+i);
//暫停3秒
Thread.sleep(3000);
}

//關閉mq連接
conn.close();
}

}

  

3.創建消息接收者 MsgProducer:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
* 
* @Description: Message Consumer
* @author: Bing Yue
*/
public class MsgConsumer implements MessageListener {

private static final String BROKER_URL="failover://tcp://192.168.106.128:61616";

public static void main(String[] args) throws JMSException{

//創建連接工廠
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(BROKER_URL);
//獲得連接
Connection conn = connectionFactory.createConnection();
//start
conn.start();

//創建Session,此方法第一個參數表示會話是否在事務中執行,第二個參數設定會話的應答模式
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//創建隊列
Destination dest = session.createQueue("test-queue");
//創建消息生產者
MessageConsumer consumer = session.createConsumer(dest);

//初始化MessageListener
MsgConsumer msgConsumer = new MsgConsumer();

//給消費者設定監聽對象
consumer.setMessageListener(msgConsumer);

}


/**
* 消費者需要實現MessageListener接口
* 接口有一個onMessage(Message message)需要在此方法中做消息的處理
*/
@Override
public void onMessage(Message msg) {
TextMessage txtMessage = (TextMessage)msg;
try {
System.out.println("get message:" + txtMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}

}
}

  

運行MsgProducer,

登錄后台查看test-queue隊列,可以看到發出的消息正在等待被處理:

 

運行MsgConsumer,接收消息並在控制台打印:

通過這個實例可以對ActiveMQ的應用有一個簡單的了解。

代碼地址:https://github.com/bingyue/activemq-sample

在實際開發中,通常還需要設置優先級處理,大部分情況下,消息的發送和接收方都會啟用多線程,
通過線程池來提高處理效率,解耦的同時保持業務處理能力。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM