ActiveMQ(一)初步接觸-編寫Demo


聲明 轉載請注明出處! Reprint please indicate the source!

http://www.hiknowledge.top/?p=86&preview=true

MessageQueue是分布式的系統里經常要用到的組件。

什么是JMS

JMS即Java消息服務(Java Message Service)應用程序接口,是一個Java平台中關於面向消息中間件(MOM)的API,用於在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。Java消息服務是一個與具體平台無關的API,絕大多數MOM提供商都對JMS提供支持。

(引用自百度百科)

JMS 在其中扮演的角色與JDBC 很相似,正如JDBC 提供了一套用於訪問各種不同關系數據庫的公共API,JMS 也提供了獨立於特定廠商的企業消息系統訪問方式。JMS 的編程過程很簡單,概括為:應用程序A 發送一條消息到消息服務器(也就是JMS Provider)的某個目得地(Destination),然后消息服務器把消息轉發給應用程序B。因為應用程序A 和應用程序B 沒有直接的代碼關連,所以兩者實現了解偶。

JMS消息解耦

(引用自博客)

JMS的用途

  • 解耦
  • 數據的可靠傳輸
  • 保證數據不重發,不丟失
  • 能夠實現跨平台操作,能夠為不同操作系統上的軟件集成數據傳送服務。

消息的傳遞模型

JMS支持兩種消息傳遞模型:

點對點(point-to-point,簡稱PTP)和發布/訂閱(publish/subscribe,簡稱pub/sub)。這兩種消息傳遞模型非常相似,但有以下區別:

  • a. PTP消息傳遞模型規定了一條消息之恩能夠傳遞費一個接收方。
  • b. Pub/sub消息傳遞模型允許一條消息傳遞給多個接收方 每個模型都通過擴展公用基類來實現。例如:javax.jms.Queue和Javax.jms.Topic都擴展自javax.jms.Destination類。

上面兩種消息傳遞模型里,我們都需要定義消息生產者和消費者,生產者吧消息發送到JMS Provider的某個目標地址(Destination),消息從該目標地址傳送至消費者。消費者可以同步或異步接收消息,一般而言,異步消息消費者的執行和伸縮性都優於同步消息接收者,體現在:

  1. 異步消息接收者創建的網絡流量比較小。單向對東消息,並使之通過管道進入消息監聽器。管道操作支持將多條消息聚合為一個網絡調用。
  2. 異步消息接收者使用線程比較少。異步消息接收者在不活動期間不使用線程。同步消息接收者在接收調用期間內使用線程,結果線程可能會長時間保持空閑,尤其是如果該調用中指定了阻塞超時。
  3. 對於服務器上運行的應用程序代碼,使用異步消息接收者幾乎總是最佳選擇,尤其是通過消息驅動Bean。使用異步消息接收者可以防止應用程序代碼在服務器上執行阻塞操作。而阻塞操作會是服務器端線程空閑,甚至會導致死鎖。阻塞操作使用所有線程時則發生死鎖。如果沒有空余的線程可以處理阻塞操作自身解鎖所需的操作,這該操作永遠無法停止阻塞。

(引用自博客)

什么是ActiveMQ

ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現,盡管JMS規范出台已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。

(引用自百度百科)

官方主頁

權威書籍

ActiveMQ in Action

環境配置

Maven依賴

引入核心包

<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-core</artifactId>
  <version>5.7.0</version>
</dependency>

TIPS:如果你引入的是下面的activemq-all.jar,且工程中已經引入了SLF4J,會與activemq-all.jar中的SLF4J發生沖突。

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.14.4</version>
</dependency>

啟動broker

要運行起來demo得先,啟動broker。我是在虛擬機上測試的。ip:192.168.235.100

進入到 執行:

activemq start

測試

訪問頁面 http://192.168.235.100:8161/

broker管理頁面

默認用戶名/密碼:admin/admin

接口介紹

API文檔

由於歷史原因,JMS提供有四套API接口。

  • JMS1.0 定義了兩個域相關的API,queue、topic。
  • JMS1.1 引入了一組新API,也叫做傳統API。
  • JMS2.0 引入了一組簡化API,擁有傳統API所有的特性,同時接口更少、使用更方便。

每組API挺一組不同的接口集合,擁有連接到JMS提供者、發送和接收消息。它們共享一組代表消息、消息目的地和其他各方面功能特性的通用接口。所有的接口都在javax.jms下。

傳統API接口

傳統API接口模型圖 傳統API接口模型圖

Destination 接口是Queue和Topic的頂層接口。

接口 用處
ConnectionFactory 用戶用來創建到JMS提供者的連接的被管對象。JMS客戶通過可移植的接口訪問連接,這樣當下層的實現改變時,代碼不需要進行修改。 管理員在JNDI名字空間中配置連接工廠,這樣,JMS客戶才能夠查找到它們。根據消息類型的不同,用戶將使用隊列連接工廠,或者主題連接工廠。
Connection 連接代表了應用程序和消息服務器之間的通信鏈路。在獲得了連接工廠后,就可以創建一個與JMS提供者的連接。根據不同的連接類型,連接允許用戶創建會話,以發送和接收隊列和主題到目標。
Session 表示一個單線程的上下文,用於發送和接收消息。由於會話是單線程的,所以消息是連續的,就是說消息是按照發送的順序一個一個接收的。會話的好處是它支持事務。如果用戶選擇了事務支持,會話上下文將保存一組消息,直到事務被提交才發送這些消息。在提交事務之前,用戶可以使用回滾操作取消這些消息。一個會話允許用戶創建消息生產者來發送消息,創建消息消費者來接收消息。
Destination 目標是一個包裝了消息目標標識符的被管對象,消息目標是指消息發布和接收的地點,或者是隊列,或者是主題。JMS管理員創建這些對象,然后用戶通過JNDI發現它們。和連接工廠一樣,管理員可以創建兩種類型的目標,點對點模型的隊列,以及發布者/訂閱者模型的主題。
MessageConsumer 由會話創建的對象,用於接收發送到目標的消息。消費者可以同步地(阻塞模式),或異步(非阻塞)接收隊列和主題類型的消息。
MessageProducer 由會話創建的對象,用於發送消息到目標。用戶可以創建某個目標的發送者,也可以創建一個通用的發送者,在發送消息時指定目標。
Message 是在消費者和生產者之間傳送的對象,也就是說從一個應用程序創送到另一個應用程序。一個消息有三個主要部分:消息頭(必須):包含用於識別和為消息尋找路由的操作設置。一組消息屬性(可選):包含額外的屬性,支持其他提供者和用戶的兼容。可以創建定制的字段和過濾器(消息選擇器)。

一個消息體(可選):允許用戶創建五種類型的消息(文本消息,映射消息,字節消息,流消息和對象消息)。消息接口非常靈活,並提供了許多方式來定制消息的內容。

簡化API接

簡化API接口模型圖 簡化版API接口模型圖

demo注意:ActiveMQ是沒有實現簡化版接口的。不僅ActiveMQ,很多廠商也沒有支持簡化版API接口。

點對點模式

點對點模式,有點類似關系數據庫。從編程角度,它里面的Acknowledge,就類似於數據庫的commit。Connection連接、Session會話、工廠模式等,在設計上與數據庫很像。

ActiveMQ中Queue實現了點對點模型。

JMSProducer.java

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/** * Created with IntelliJ IDEA.<br> * Description: JMS ActiveMQ Demo測試 消息生產者<br> * 運行前,需要打開本地的activemq。 * 如果需要更改broker地址,要提前運行相應的broker。 * User: jahen<br> * Date: 2017-04-02<br> * Time: 11:06<br> */
public class JMSProducer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默認連接
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默認密碼
// private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默認連接地址 為 failover://tcp://localhost:61616
    private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定連接地址 (my VM)
    private static final int SENDNUM = 10; // 發送的消息數量

    public static void main(String[] args) {
        ConnectionFactory connectionFactory; // 連接工程,生產Connection
        Connection connection = null; // 連接
        Session session; // 會話 接受或者發送消息的線程
        Destination destination; // 消息的目的地
        MessageProducer messageProducer; // 消息生產者

        // 實例化連接工廠
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        // 創建連接
        try {
            connection = connectionFactory.createConnection();
            connection.start(); // 啟動連接

            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 提交事務,自動確認

            destination = session.createQueue("FirstQueue"); // 創建消息隊列
            messageProducer = session.createProducer(destination); // 創建消息發送者

            sendMessage(session, messageProducer); // 發送消息
            session.commit(); // 提交事務
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (connection!=null)
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
        }
    }

    /** * 發送消息 * @param session 會話 * @param messageProducer 消息生產者 */
    private static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
        for (int i=0; i<JMSProducer.SENDNUM; i++) {
            TextMessage message = session.createTextMessage("ActiveMQ 發送的消息 "+i);
            System.out.println("發送消息: ActiveMQ 發送的消息 "+i);
            messageProducer.send(message);
        }
    }
}

運行一下JMSProudcer,生產10條消息。

JMSConsumer.java

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;

import javax.jms.*;

/** * Created with IntelliJ IDEA.<br> * Description: 消息消費者1-點對點模式<br> * 實現方式1 循環檢測<br> * User: jahen<br> * Date: 2017-04-02<br> * Time: 13:44<br> */
public class JMSConsumer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默認連接
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默認密碼
    // private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默認連接地址 為 failover://tcp://localhost:61616
    private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定連接地址 (my VM)

    public static void main(String args[]) {
        ConnectionFactory connectionFactory; // 連接工程,生產Connection
        Connection connection = null; // 連接
        Session session; // 會話 接受或者發送消息的線程
        Destination destination; // 消息的目的地
        MessageConsumer messageConsumer; // 消息消費者

        // 實例化連接工廠
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消費消息不需要事務,自動確認
            destination = session.createQueue("FirstQueue"); // 創建消息隊列

            messageConsumer = session.createConsumer(destination); // 創建消息消費者

            while (true) {
                TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);// 設置延時為100s
                if (textMessage!=null) { // 接收到消息
                    System.out.println("接收的消息:"+textMessage.getText());
                }else {
                    break;
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

運行一下JMSConsumer,消費10條消息。

image

這種方式消費消息,通過循環檢查,顯然是不高明的。

下面,通過設置監聽的方式,實現消息消費。

再次運行一下JMSProudcer,生產10條消息。

又生產了10條消息

首先實現一下監聽器

Listenr.java

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/** * Created with IntelliJ IDEA.<br> * Description: 消息監聽者<br> * User: jahen<br> * Date: 2017-04-02<br> * Time: 14:30<br> */
public class Listener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("收到消息:" + ((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

JMSConsumer2.java

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/** * Created with IntelliJ IDEA.<br> * Description: 消息消費者2-點對點模式<br> * 實現方式2 設置監聽<br> * User: jahen<br> * Date: 2017-04-02<br> * Time: 13:44<br> */
public class JMSConsumer2 {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默認連接
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默認密碼
    // private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默認連接地址 為 failover://tcp://localhost:61616
    private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定連接地址 (my VM)

    public static void main(String args[]) {
        ConnectionFactory connectionFactory; // 連接工程,生產Connection
        Connection connection = null; // 連接
        Session session; // 會話 接受或者發送消息的線程
        Destination destination; // 消息的目的地
        MessageConsumer messageConsumer; // 消息消費者

        // 實例化連接工廠
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消費消息不需要事務,自動確認
            destination = session.createQueue("FirstQueue"); // 創建消息隊列

            messageConsumer = session.createConsumer(destination); // 創建消息消費者

            messageConsumer.setMessageListener(new Listener());// 注冊消息監聽
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

運行一下JMSConsumer2,新產生的消息被消費了。

新產生的消息被消費了

消息發布/訂閱模式

發布/訂閱模式是一對多的關系。

注意:發布/訂閱要先運行訂閱,再運行發布才能收到消息。

發布者和訂閱者之間有時間上的依賴性。針對某個主題Topic的訂閱者,它必須創建一個訂閱者之后,才能消費發布者的消息,而且為了消費消息,訂閱者必須保持運行的狀態。

Topic 實現了發布/訂閱模型。

JMSConsumer.java

package com.jahentao.activemq;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/** * Created with IntelliJ IDEA.<br> * Description: 消息消費者-發布訂閱模式 消息訂閱者<br> * 實現方式 設置監聽<br> * 消息訂閱者1<br> * User: jahen<br> * Date: 2017-04-02<br> * Time: 13:44<br> */
public class JMSConsumer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默認連接
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默認密碼
    // private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默認連接地址 為 failover://tcp://localhost:61616
    private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定連接地址 (my VM)

    public static void main(String args[]) {
        ConnectionFactory connectionFactory; // 連接工程,生產Connection
        Connection connection = null; // 連接
        Session session; // 會話 接受或者發送消息的線程
        Destination destination; // 消息的目的地
        MessageConsumer messageConsumer; // 消息消費者

        // 實例化連接工廠
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消費消息不需要事務,自動確認
// destination = session.createQueue("FirstQueue"); // 創建消息隊列
            destination = session.createTopic("FirstTopic"); // 創建消息訂閱者
            messageConsumer = session.createConsumer(destination); // 創建消息消費者

            messageConsumer.setMessageListener(new Listener());// 注冊消息監聽
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

Listener.java

package com.jahentao.activemq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/** * Created with IntelliJ IDEA.<br> * Description: 訂閱者1消息監聽器<br> * User: jahen<br> * Date: 2017-04-02<br> * Time: 14:46:52<br> */
public class Listener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("訂閱者一 收到消息:" + ((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

JMSConsumer2.java

package com.jahentao.activemq;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/** * Created with IntelliJ IDEA.<br> * Description: 消息消費者-發布訂閱模式 消息訂閱者<br> * 實現方式 設置監聽<br> * 消息訂閱者2<br> * User: jahen<br> * Date: 2017-04-02<br> * Time: 13:44<br> */
public class JMSConsumer2 {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默認連接
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默認密碼
    // private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默認連接地址 為 failover://tcp://localhost:61616
    private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定連接地址 (my VM)

    public static void main(String args[]) {
        ConnectionFactory connectionFactory; // 連接工程,生產Connection
        Connection connection = null; // 連接
        Session session; // 會話 接受或者發送消息的線程
        Destination destination; // 消息的目的地
        MessageConsumer messageConsumer; // 消息消費者

        // 實例化連接工廠
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消費消息不需要事務,自動確認
// destination = session.createQueue("FirstQueue"); // 創建消息隊列
            destination = session.createTopic("FirstTopic"); // 創建消息訂閱者
            messageConsumer = session.createConsumer(destination); // 創建消息消費者

            messageConsumer.setMessageListener(new Listener2());// 注冊消息監聽
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

Listener2.java

package com.jahentao.activemq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/** * Created with IntelliJ IDEA.<br> * Description: 訂閱者2消息監聽器<br> * User: jahen<br> * Date: 2017-04-02<br> * Time: 14:46:52<br> */
public class Listener2 implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("訂閱者二 收到消息:" + ((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

首先,分別運行JMSConsumer、JMSConsumer2進行訂閱。

2個訂閱者

JMSProducer.java

package com.jahentao.activemq;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/** * Created with IntelliJ IDEA.<br> * Description: JMS ActiveMQ Demo測試 發布訂閱模式 消息發布者<br> * 運行前,需要打開本地的activemq。 * 如果需要更改broker地址,要提前運行相應的broker。 * User: jahen<br> * Date: 2017-04-02<br> * Time: 14:42:59<br> */
public class JMSProducer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默認連接
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默認密碼
// private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默認連接地址 為 failover://tcp://localhost:61616
    private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定連接地址 (my VM)
    private static final int SENDNUM = 10; // 發送的消息數量

    public static void main(String[] args) {
        ConnectionFactory connectionFactory; // 連接工程,生產Connection
        Connection connection = null; // 連接
        Session session; // 會話 接受或者發送消息的線程
        Destination destination; // 消息的目的地
        MessageProducer messageProducer; // 消息生產者

        // 實例化連接工廠
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        // 創建連接
        try {
            connection = connectionFactory.createConnection();
            connection.start(); // 啟動連接

            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 提交事務,自動確認

// destination = session.createQueue("FirstQueue"); // 創建消息隊列
            destination = session.createTopic("FirstTopic"); // 創建主題
            messageProducer = session.createProducer(destination); // 創建消息發送者

            sendMessage(session, messageProducer); // 發送消息
            session.commit(); // 提交事務
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (connection!=null)
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
        }
    }

    /** * 發送消息 * @param session 會話 * @param messageProducer 消息生產者 */
    private static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
        for (int i = 0; i< JMSProducer.SENDNUM; i++) {
            TextMessage message = session.createTextMessage("ActiveMQ 發送的消息 "+i);
            System.out.println("發送消息: ActiveMQ 發送的消息 "+i);
            messageProducer.send(message);
        }
    }
}

然后運行JMSProducer。

發布消息訂閱者收到消息

參考

java1234上發布的教程"一頭扎進ActiveMQ"

這里學習的源碼,托管在碼雲


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM