ActiveMQ的發布者/訂閱者模型示例


ActiveMQ的發布者/訂閱者模型入門示例

(1)下載安裝activemq,啟動activeMQ。

  詳細步驟參考博客:http://www.cnblogs.com/DFX339/p/9050878.html

   

(2)創建maven項目,java項目或者web項目都可以。

 項目源碼下載地址:https://github.com/DFX339/activeMQ_demo.git

 目錄結構如下:(queue包下的是activemq隊列模型的入門示例,需要的可以參考 http://www.cnblogs.com/DFX339/p/9050950.html)

 需要編寫的文件:MQProducer.java    Listener01.java     MQConsumer01.java     Listener02.java     MQConsumer02.java    pom.xml

 

消息發布者的定義: MQProducer.java

主要步驟:

  /**
         * 1.創建連接工廠
         * 2.創建連接實例
         * 3、啟動連接
         * 4、創建session創建接收或發送的線程實例(創建session的時候定義是否要啟用事務,
         * 且事務類型是Auto_ACKNOWLEDGE也就是消費者成功在Listern中獲得消息返回時,會話自動確定用戶收到消息)
         * 5、創建隊列(消息發送的目的地)
         * 6、創建消息發送者
         * 7、創建消息
         * 8、發送消息
         * 9、session.commit();提交千萬不要忘記了
         */
 代碼如下:

package cn.dfx.activeMQ_demo.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 訂閱消息的發送消息方
 * @author Administrator
 *
 */
public class MQProducer {
    
    public static void main(String[] args){
        
        /**
         * 1.創建連接工廠
         * 2.創建連接實例
         * 3、啟動連接
         * 4、創建session創建接收或發送的線程實例(創建session的時候定義是否要啟用事務,
         * 且事務類型是Auto_ACKNOWLEDGE也就是消費者成功在Listern中獲得消息返回時,會話自動確定用戶收到消息)
         * 5、創建隊列(消息發送的目的地)
         * 6、創建消息發送者
         * 7、創建消息
         * 8、發送消息
         * 9、session.commit();提交千萬不要忘記了 
         */
        
        ConnectionFactory connFactory = null;
        Connection conn = null;
        Session session = null;
        Destination destination = null;
        
        //連接參數定義: 用戶名 密碼 url 
        String name = "system";
        String password = "manager";
        String url = "failover://tcp://localhost:61616";
        
        System.out.println("消息發布者開始發布消息了……");
        try{
            
            //創建連接工廠
            //這里的連接參數可以使用常量:Constants.MQ_NAME, Constants.MQ_PASSWORD, Constants.MQ_BROKETURL
            connFactory = new ActiveMQConnectionFactory(name,password,url);
            
            //通過連接工廠創建連接實例
            conn = connFactory.createConnection();
            
            //啟動連接
            conn.start();
            
            //4、創建session創建接收或發送的線程實例(創建session的時候定義是否要啟用事務,且事務類型是Auto_ACKNOWLEDGE也就是消費者成功在Listern中獲得消息返回時,會話自動確定用戶收到消息)
            //也可以使用:session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
            
            //創建隊列,也就是消息發送的目的地
            destination = session.createTopic("FirstTopic");
            
            //創建消息發布者
            MessageProducer  messageProducer = session.createProducer(destination);
            
            //創建需要發送的消息
            TextMessage textMessage = session.createTextMessage();
            textMessage.setText("Hello,broadcast message NO.1!");
            
            //發送消息
            messageProducer.send(textMessage);
            
            //一定要記得這個,提交呀,
            session.commit();
            
            System.out.println("消息發布者:"+textMessage.getText());
            
        }catch(JMSException e){
             e.printStackTrace();
        } finally {
            
            //關閉連接
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }    
            }
        
        }
    
    }
}

 

消息訂閱者01的定義: MQConsumer01.java

主要步驟:

  /**
         * 1.創建連接工廠
         * 2.創建連接實例
         * 3、啟動連接
         * 4、  創建接收或發送的線程實例(消費者就不需要開啟事務了)
         * 5、創建隊列(消息發送的目的地)
         * 6、創建消息接收者
         * 7、注冊消息監聽
         */
  代碼示例:

package cn.dfx.activeMQ_demo.topic;

import java.io.IOException;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 訂閱消息的消息接收方01
 * @author Administrator
 *
 */
public class MQConsumer01 {
    
    public static void main(String[] args) throws IOException{
        
        /**
         * 1.創建連接工廠
         * 2.創建連接實例
         * 3、啟動連接
         * 4、  創建接收或發送的線程實例(消費者就不需要開啟事務了)
         * 5、創建隊列(消息發送的目的地)
         * 6、創建消息接收者
         * 7、注冊消息監聽
         */
        
        ConnectionFactory connFactory = null;
        Connection conn = null;
        Session session = null;
        Destination destination = null;
        
        //創建連接工廠需要的參數
        String name  = "system";
        String password = "manager";
        String url = "failover://tcp://localhost:61616";
        
        try{
            
            //創建連接工廠
            connFactory = new ActiveMQConnectionFactory(name,password,url);
            
            //創建連接實例
            conn = connFactory.createConnection();
            
            //啟動連接
            conn.start();
            
            //創建session(創建接收或發送的線程實例(消費者就不需要開啟事務了))
            session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
            
            //創建消息目的地(消費者從這里讀取消息)
            destination = session.createTopic("FirstTopic");
            
            //創建消費者
            MessageConsumer messageConsumer = session.createConsumer(destination);
            
            //消費者讀取消息,監聽消息
            messageConsumer.setMessageListener(new Listener01());
            
            System.out.println("訂閱者01已經准備好接收消息!");
            
//            //8、程序等待接收用戶消息
//            System.in.read();
//            //9、關閉資源
//            messageConsumer.close();
//            session.close();
//            conn.close();
            
        }catch(JMSException e){
            e.printStackTrace();
        }
    }
}

 

消息訂閱者01的監聽器的編寫: Listener01.java

package cn.dfx.activeMQ_demo.topic;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 消息訂閱 消息接收方的訂閱者01的監聽器
 * @author Administrator
 *
 */
public class Listener01 implements MessageListener {

    @Override
    public void onMessage(Message message) {
        
        try{
            System.out.println("訂閱者01接收到的消息為:"+((TextMessage) message).getText());
        }catch(JMSException e){
            e.printStackTrace();
        }
        
    }

}

 

 

消息訂閱者02的定義: MQConsumer02.java

主要步驟:

  /**
         * 1.創建連接工廠
         * 2.創建連接實例
         * 3、啟動連接
         * 4、創建接收或發送的線程實例(消費者就不需要開啟事務了)
         * 5、創建隊列(消息發送的目的地)
         * 6、創建消息接收者
         * 7、注冊消息監聽
         */
  代碼示例:

package cn.dfx.activeMQ_demo.topic; import java.io.IOException; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * 訂閱消息的消息接收方02 * @author Administrator * */ public class MQConsumer02 { public static void main(String[] args) throws IOException{ /** * 1.創建連接工廠 * 2.創建連接實例 * 3、啟動連接 * 4、 創建接收或發送的線程實例(消費者就不需要開啟事務了) * 5、創建隊列(消息發送的目的地) * 6、創建消息接收者 * 7、注冊消息監聽 */ ConnectionFactory connFactory = null; Connection conn = null; Session session = null; Destination destination = null; //創建連接工廠需要的參數 String name = "system"; String password = "manager"; String url = "failover://tcp://localhost:61616"; try{ //創建連接工廠 connFactory = new ActiveMQConnectionFactory(name,password,url); //創建連接實例 conn = connFactory.createConnection(); //啟動連接  conn.start(); //創建session(創建接收或發送的線程實例(消費者就不需要開啟事務了)) session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); //創建消息目的地(消費者從這里讀取消息) destination = session.createTopic("FirstTopic"); //創建消費者 MessageConsumer messageConsumer = session.createConsumer(destination); //消費者讀取消息,監聽消息 messageConsumer.setMessageListener(new Listener01()); System.out.println("訂閱者02已經准備好接收消息!");  }catch(JMSException e){ e.printStackTrace(); } } }

 

消息訂閱者02的監聽器的編寫: Listener02.java

package cn.dfx.activeMQ_demo.topic; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * 消息訂閱 消息接收方的訂閱者02的監聽器 * @author Administrator * */ public class Listener01 implements MessageListener { @Override public void onMessage(Message message) { try{ System.out.println("訂閱者02接收到的消息為:"+((TextMessage) message).getText()); }catch(JMSException e){ e.printStackTrace(); } } }

 

(3)測試步驟

1、先啟動activemq

  啟動方法:找到activemq安裝目錄,進入bin目錄,雙擊activemq.bat

  啟動結果:

  

 

2、啟動消息訂閱者01

  啟動方法: 打開MQConsumer01.java --> 右擊 --> Run as  --> java Application

  啟動結果:

  

 

3、啟動消息訂閱者02

  啟動方法: 打開MQConsumer02.java --> 右擊 --> Run as  --> java Application

  啟動結果:

  

 

4、啟動消息發布者

  啟動方法: 打開MQProducer.java --> 右擊 --> Run as  --> java Application

  啟動結果:(訂閱者會自動讀取消息,可以切換控制台查詢讀取情況)

  

  訂閱者01讀取消息后:

  

 

  訂閱者02讀取消息后:

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM