第一步: 下載ActiveMQ工具包 url:http://activemq.apache.org

第二部解壓並啟動:
全家福

啟動方式: 注 不要直接啟動bin目錄下的bat 按系統版本啟動相應的bat 比如我的是64位


啟動完畢 進入控制台 默認地址 http://127.0.0.1:8161/admin 賬號admin密碼admin

第三部:創建java工程簡單實現簡單單點發布和消費 導入activemq-all-xxx.jar

創建消息生產者類Producer.java

package com.sgor.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 消息生產者類
* @author xixiao
*
*/
public class Producer {
public static void main(String[] args){
ConnectionFactory connectionFactory; //創建鏈接工廠
Connection connection = null;//鏈接
Session session;//創建會話
Destination destination;//消息目的地 消息隊列
MessageProducer messageProducer;//消息生產者
//實例化鏈接工廠 參數為 用戶,密碼,url
connectionFactory = new ActiveMQConnectionFactory("xixiao", "xixiao", ActiveMQConnection.DEFAULT_BROKER_URL);
try {
connection=connectionFactory.createConnection();//通過鏈接工廠創建鏈接
connection.start();//啟動鏈接
//創建會話 Session.AUTO_ACKNOWLEDGE。receive 或MessageListener.onMessage()成功返回的時候,自動確認收到消息。
session =connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//創建一個消息隊列名稱為hello ActiveMQ 消息隊列中可包含需要發布消息
destination = session.createQueue("Hello ActiveMQ");
//將創建的消息隊列hello ActiveMQ交給消息發布者messageProdecer
messageProducer=session.createProducer(destination);
for (int i = 0; i <5 ; i++) {
//生產5條消息
TextMessage message=session.createTextMessage(i+"條消息");
System.out.println(message.getText());
//發布消息
messageProducer.send(message);
}
//提交事物
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}finally{
try {
//關閉連接
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
創建消息消費類
這里使用了一個Listener實現了MessageListener中的onMessage 當這個方法成功返回時會話可以自動確認消息被消費
package com.sgor.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Consumer {
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Destination destination;
MessageConsumer messageConsumer;
connectionFactory = new ActiveMQConnectionFactory("xixiao", "xixiao", ActiveMQConnection.DEFAULT_BROKER_URL);
try {
//通過鏈接工廠創建鏈接
connection = connectionFactory.createConnection();
connection.start();//啟動鏈接
//這里直接設為false 不需要事物
session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("Hello ActiveMQ"); //創建消息隊列 用於接收發布的消息
messageConsumer = session.createConsumer(destination);
/**
* 監聽生產者方式接受消息,生產者產生消息才開始接收 需要監聽器 實現MessageListener (javax.jms.MessageListener包)
*/
messageConsumer.setMessageListener(new Listener());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
Listener
package com.sgor.activemq;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class Listener implements MessageListener{
@Override
public void onMessage(Message arg0) {
try {
System.out.println("消息:"+((TextMessage)arg0).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
測試一下:
首先生產消息

查看控制台》》點擊Queues 待處理消息5 消費者0 列隊中的消息5 成功生產了5條消息

接下來執行消費類,將隊列中的消息確認

打開控制台查看消息確認情況 待消費消息0 消費者1 消費消息5 消息被1個客戶消費

