ActiveMQ簡單實現之一對一生產和消費


第一步: 下載ActiveMQ工具包 url:http://activemq.apache.org

第二部解壓並啟動:

全家福

啟動方式: 注 不要直接啟動bin目錄下的bat  按系統版本啟動相應的bat  比如我的是64位

啟動完畢 進入控制台  默認地址 http://127.0.0.1:8161/admin   賬號admin密碼admin

第三部:創建java工程簡單實現簡單單點發布和消費 導入activemq-all-xxx.jar

創建消息生產者類Producer.java

package com.sgor.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * 消息生產者類
 * @author xixiao
 *
 */
public class Producer {
	public static void main(String[] args){
		ConnectionFactory connectionFactory; //創建鏈接工廠
		Connection connection = null;//鏈接
		Session session;//創建會話
		Destination destination;//消息目的地 消息隊列
		MessageProducer messageProducer;//消息生產者
		//實例化鏈接工廠  參數為 用戶,密碼,url
		connectionFactory = new ActiveMQConnectionFactory("xixiao", "xixiao", ActiveMQConnection.DEFAULT_BROKER_URL);
		try {
			connection=connectionFactory.createConnection();//通過鏈接工廠創建鏈接
			connection.start();//啟動鏈接
			//創建會話 Session.AUTO_ACKNOWLEDGE。receive 或MessageListener.onMessage()成功返回的時候,自動確認收到消息。
			session =connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
			//創建一個消息隊列名稱為hello ActiveMQ 消息隊列中可包含需要發布消息 
			destination = session.createQueue("Hello ActiveMQ");
			//將創建的消息隊列hello ActiveMQ交給消息發布者messageProdecer
			messageProducer=session.createProducer(destination);
			for (int i = 0; i <5 ; i++) {
				//生產5條消息
				TextMessage message=session.createTextMessage(i+"條消息");
				System.out.println(message.getText());
                    //發布消息 messageProducer.send(message); } //提交事物 session.commit(); } catch (JMSException e) { e.printStackTrace(); }finally{ try { //關閉連接 connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } }

 創建消息消費類

 這里使用了一個Listener實現了MessageListener中的onMessage  當這個方法成功返回時會話可以自動確認消息被消費

package com.sgor.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {
	public static void main(String[] args) {
		ConnectionFactory connectionFactory; 
		Connection connection = null;
		Session session;
		Destination destination;
		MessageConsumer messageConsumer;
		connectionFactory = new ActiveMQConnectionFactory("xixiao", "xixiao", ActiveMQConnection.DEFAULT_BROKER_URL);
		try {
			//通過鏈接工廠創建鏈接
			connection  = connectionFactory.createConnection();
			connection.start();//啟動鏈接
			//這里直接設為false  不需要事物
			session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			destination = session.createQueue("Hello ActiveMQ"); //創建消息隊列 用於接收發布的消息
			messageConsumer = session.createConsumer(destination);
			/**
			 * 監聽生產者方式接受消息,生產者產生消息才開始接收 需要監聽器 實現MessageListener (javax.jms.MessageListener包)
			 */
			messageConsumer.setMessageListener(new Listener());
		} catch (JMSException e) {
			e.printStackTrace();
		}
		
		
	}
}

 Listener

package com.sgor.activemq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class Listener implements MessageListener{

	@Override
	public void onMessage(Message arg0) {
		try {
			System.out.println("消息:"+((TextMessage)arg0).getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}
		
	}

}

 測試一下:

首先生產消息

查看控制台》》點擊Queues 待處理消息5  消費者0 列隊中的消息5   成功生產了5條消息

接下來執行消費類,將隊列中的消息確認

打開控制台查看消息確認情況   待消費消息0 消費者1 消費消息5   消息被1個客戶消費

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM