ActiveMQ使用


ActiveMQ介紹

ActiveMQ是一種消息中間件,有兩種模式,一種點對點模式 發布者將發布的消息發送給服務器,等待用戶監聽並接受數據;第二種訂閱模式 發布者將消息發布給消息服務器,讓服務器將所有的數據直接轉發給再監聽的用戶,進行一對多通信(類似微信公眾號)。

ActiveMQ主要是為了降低程序間的耦合,以及異步執行處理時間較長的代碼(如網絡通信)。

點對點模式:

  發布者發布8條信息,這時有3個用戶在監聽服務器消息,則3個用戶共同消費這8條消息。服務器中的每條消息只能被一個用戶消費,這種模式服務器會存儲發布者發布的數據,當未被用戶接收的數據則會留在服務器中,等待下個監聽服務器的用戶接收數據。

 

訂閱模式(持久訂閱模式/非持久訂閱模式):

  發布者發布消息給消息服務器,消息服務器則將消息直接轉發給監聽的用戶,這要求發布者發布消息的同時用戶也在監聽消息,若沒有用戶監聽, 則不保留數據,認為數據已發送完成。也就是發布者發布時,用戶沒在監聽消息,則不會在收到該數據。即使用戶以后再監聽也接收不到

  

  持久訂閱模式:訂閱者會注冊一個clientId,當訂閱者離線時,ActiveMQ會為這個 ID 保存所有擁有這個ID的主題的消息,當訂閱者連接時,則會通過自己的clientId得到所有自己處於離線時所要接收主題消息

  非持久訂閱模式:只有當訂閱者處於連接狀態才會接收到發布者發布出來的消息,並且發送完成后ActiveMQ則將消息丟棄。

在程序中使用ActiveMQ

  從官網中下載activeMQ,下載地址:http://activemq.apache.org/download.html

  解壓后,打開目錄下的bin,根據自己的系統選擇win32或win64安裝Active服務,並開啟activeMQ

  開啟后瀏覽器訪問該地址:http://127.0.0.1:8161/,選擇 Manage ActiveMQ broker,輸入賬號密碼,默認都是admin

 

 

  創建一個maven項目,在pom.xml文件中引入jar包

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.9.0</version>
</dependency>

 點對點模式

創建一個消費者和一個生產者的類

Consumer.java 消費者通過消息監聽器監聽服務器上的信息

package cn.lcf.activeMQ;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Hello world!
 *
 */
public class Consumer {
    //設置連接地址
    private static final String url = "tcp://127.0.0.1:61616";
    //設置消息隊列名稱
    private static final String queueName = "queue-text";
    
    public static void main(String[] args) throws JMSException {
        // 1、創建連接工廠
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        // 2、創建連接對象
        Connection createConnection = connectionFactory.createConnection();
        // 3、啟動連接
        createConnection.start();
        // 4、創建會話     createSession第一個參數表示是否支持事務,第二個參數是客戶端接收確認模式,Session.AUTO_ACKNOWLEDGE是自動確認,Session.CLIENT_ACKNOWLEDGE 客戶端通過調用消息的 acknowledge 方法簽收消息。
        Session createSession = createConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
        // 5、創建消息目標 
         Queue createQueue = createSession.createQueue(queueName); 
         // 6 、創建消費者 
         MessageConsumer createConsumer = createSession.createConsumer(createQueue); 
         // 7、設置消費者監聽 
         createConsumer.setMessageListener(new MessageListener() { 
             @Override 
             public void onMessage(Message message) { 
                 TextMessage textMessage = (TextMessage) message; 
                 try { 
                     System.out.println("接收的消息為" + textMessage.getText()); 
                 } catch (JMSException e) {
                     // TODO Auto-generated catch block e.printStackTrace();
                 } 
            } 
         });
    }
}

  producer.java  生產者跟消費者差不多,這是第6步開始,變成創建生產者,並發送消息,發送完成之后需要關閉連接。

package cn.lcf.activeMQ;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Hello world!
 *
 */
public class Producer 
{
    private static final String url = "tcp://127.0.0.1:61616";
    
    private static final String queueName = "queue-text";
    
    public static void main( String[] args ) throws JMSException
    {
        // 1、創建連接工廠
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        // 2、創建連接對象
        Connection createConnection = connectionFactory.createConnection();
        // 3、啟動連接
        createConnection.start();
        // 4、創建會話
        Session createSession = createConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5、創建消息目標
        Queue createQueue = createSession.createQueue(queueName);
        // 6、創建生產者
        MessageProducer createProducer = createSession.createProducer(createQueue);
        for (int i=0;i<100;++i) {
            // 7、創建消息
            TextMessage textMessage = createSession.createTextMessage("666  " + i);
            // 8、發布消息
            createProducer.send(textMessage);
            System.out.println("發送的消息為:" + "666  " + i);
        }
        
        // 9、關閉連接
        createConnection.close();
        
    }
}

運行下生產者的類 Producer.java,將消息存到ActiveMQ服務器上

查看ActiveMQ中的隊列信息

擁有100條信息,未出列,這時候運行一個消費者(Consumer.java)去消費這100條信息

查看ActiveMQ上的信息,100條信息全被這個1個消費者接收

 

清空,然后同時運行三個消費者(Consumer.java執行三次后可在console切換不同類的控制台),在運行一個生產者

可以看到這100條消息被這三個消費者平分了。

點對點模式主要用於消除程序高並發高峰對數據庫造成的巨大壓力,可以通過使用消息隊列,讓消費者進程從消息隊列中獲取數據,然后異步將數據寫入數據庫,由於消息隊列的服務處理速度遠快於數據庫,因此用戶的響應延遲可得到有效改善。

 

訂閱模式(非持久訂閱)

  將消費者(Consumer.java)修改成以下內容,寫法相同,只是session不再是創建隊列消費者,而是創建主題消費者

package cn.lcf.TestActiveMQ;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消費者
 *
 */
public class Consumer {
    private static final String URL = "tcp://127.0.0.1:61616";
    //訂閱模式名稱
    private static final String topicName = "topic-name";
    
    public static void main(String[] args) throws JMSException {
        //創建連接工廠
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        //創建連接
        Connection createConnection = connectionFactory.createConnection();//打開連接
        createConnection.start();
        //創建會話
        Session createSession = createConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //創建發布/訂閱模式消息
        Topic createTopic = createSession.createTopic(topicName);//        非持久訂閱
        //創建消費者
        MessageConsumer createConsumer = createSession.createConsumer(createTopic);         //設置消費者監聽
        createConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message; try { System.out.println("接收的消息為:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } });
    }
}

將生產者(Producer.java)修改成以下內容

package cn.lcf.TestActiveMQ;

import java.util.Enumeration;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 生產者
 *
 */
public class Producer {

    private static final String URL = "tcp://127.0.0.1:61616";
    //發布/訂閱模式名稱
    private static final String topicName = "topic-name";
    
    public static void main(String[] args) throws JMSException {
        
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        
        Connection createConnection = connectionFactory.createConnection();
        
        createConnection.start();
        
        Session createSession = createConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //發布/訂閱模式
        Topic createTopic = createSession.createTopic(topicName);
        
        MessageProducer createProducer = createSession.createProducer(createTopic);
        
        for (int i = 0; i < 100; i++) {
            TextMessage textMessage = createSession.createTextMessage("666 " + i);
            createProducer.send(textMessage);
            System.out.println("發送的消息為:" + textMessage.getText());
        }
        createConnection.close();
    }
}

之后先運行消費者,在運行生產者,消費者才能接受到信息,否則生產者發布信息時若沒有在監聽的消費者則會將信息丟棄,這樣消費者是接收不到信息的。

同時運行多個消費者,在運行生產者,消費者將獲取生產者發布的所有消息

 

 

訂閱模式(持久訂閱)

持久訂閱模式的客戶端需要創建一個鏈接id,以保證服務器確認該客戶端是否已消費信息,創建完訂閱模式,之后不再是創建一個消費者,而是創建一個帶有id的用戶,這個用戶id是唯一的,若有兩個相同的id連接,則會報錯

public static void main(String[] args) throws JMSException {
        //創建連接工廠
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        //創建連接
        Connection createConnection = connectionFactory.createConnection();
        //創建客戶端ID
        createConnection.setClientID("333");
        //打開連接
        createConnection.start();
        //創建會話
        Session createSession = createConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        //創建發布/訂閱模式消息
        Topic createTopic = createSession.createTopic(topicName);
        
        //創建持久訂閱 即未在發布者發布時監聽消息,在之后也能接收消息
        TopicSubscriber subscriber = createSession.createDurableSubscriber(createTopic, "333");
        
        subscriber.setMessageListener(new MessageListener() {
            
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("接受消息:" + textMessage.getText());
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
    }

生產者需要將消息模式設為持久訂閱模式

public static void main(String[] args) throws JMSException {
        
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        
        Connection createConnection = connectionFactory.createConnection();
        
        createConnection.start();
        
        Session createSession = createConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //發布/訂閱模式
        Topic createTopic = createSession.createTopic(topicName);
        
        MessageProducer createProducer = createSession.createProducer(createTopic);
        //設置為持久訂閱模式
        createProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
        
        for (int i = 0; i < 100; i++) {
            TextMessage textMessage = createSession.createTextMessage("666 " + i);
            createProducer.send(textMessage);
            System.out.println("發送的消息為:" + textMessage.getText());
        }
        createConnection.close();
    }

運行用戶(Consumer.java),創建連接id,之后將用戶連接關閉,啟動生產者(Producer.java)發布消息,最后在重新連接用戶獲取信息。當用戶離線狀態時,發布者發布的消息會將信息存在activeMQ服務器上,等待用戶監聽時將消息發送給用戶。

生成的用戶會在subscribers中顯示

運行發布者(Producer.java),並且用戶處於離線狀態,則會顯示消息等待出列。

 

最后再次連接上用戶(Consumer.java),則用戶的能立即獲取消息。

持久傳輸和非持久傳輸最大的區別是:采用持久傳輸時,傳輸的消息會保存到磁盤中(messages are persisted to disk/database),即“存儲轉發”方式。先把消息存儲到磁盤中,然后再將消息“轉發”給訂閱者。

采用非持久傳輸時,發送的消息不會存儲到磁盤中。

采用持久傳輸時,當Borker宕機 恢復后,消息還在。采用非持久傳輸,Borker宕機重啟后,消息丟失。比如,當生產者將消息投遞給Broker后,Broker將該消息存儲到磁盤中,在Broker將消息發送給Subscriber之前,Broker宕機了,如果采用持久傳輸,Broker重啟后,從磁盤中讀出消息再傳遞給Subscriber;如果采用非持久傳輸,這條消息就丟失了。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM