消息中間件ActiveMQ使用詳解


消息中間件ActiveMQ使用詳解

一、消息中間件的介紹

介紹

消息隊列 是指利用 高效可靠消息傳遞機制 進行與平台無關的 數據交流,並基於 數據通信 來進行分布式系統的集成。

特點(作用)

  • 應用解耦
  • 異步通信
  • 流量削峰
  • (海量)日志處理
  • 消息通訊
  • …...

應用場景

根據消息隊列的特點,可以衍生出很多場景,或者說很多場景都能用到。下面舉幾個例子:

1)異步通信

​ 注冊時的短信、郵件通知,減少響應時間;

2)應用解耦

​ 信息發送者和消息接受者無需耦合,比如調用第三方;

3)流量削峰

​ 例如秒殺系統;

二、消息中間件的對比

1.ActiveMQ

官網:activemq.apache.org/

簡介:

ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現,盡管JMS規范出台已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。

特點:

  1. 支持來自Java,C,C ++,C#,Ruby,Perl,Python,PHP的各種跨語言客戶端和協議

  2. 完全支持JMS客戶端和Message Broker中的企業集成模式

  3. 支持許多高級功能,如消息組,虛擬目標,通配符和復合目標

  4. 完全支持JMS 1.1和J2EE 1.4,支持瞬態,持久,事務和XA消息

  5. Spring支持,以便ActiveMQ可以輕松嵌入到Spring應用程序中,並使用Spring的XML配置機制進行配置

  6. 專為高性能集群,客戶端 - 服務器,基於對等的通信而設計

  7. CXF和Axis支持,以便ActiveMQ可以輕松地放入這些Web服務堆棧中以提供可靠的消息傳遞

  8. 可以用作內存JMS提供程序,非常適合單元測試JMS

  9. 支持可插拔傳輸協議,例如in-VM,TCP,SSL,NIO,UDP,多播,JGroups和JXTA傳輸

  10. 使用JDBC和高性能日志支持非常快速的持久性

2.RabbitMQ

官網:www.rabbitmq.com/

簡介:

RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。RabbitMQ輕巧且易於部署在雲端。 它支持多種消息傳遞協議。 RabbitMQ可以部署在分布式和聯合配置中,以滿足高規模,高可用性需求。RabbitMQ可運行在許多操作系統和雲環境中,並為大多數流行語言提供廣泛的開發工具。(來自官網翻譯)

AMQP (Advanced MessageQueue):高級消息隊列協議。它是應用層協議的一個開放標准,為面向消息的中間件設計,基於此協議的客戶端與消息中間件可傳遞消息,並不受產品、開發語言等條件的限制。

RabbitMQ最初廣泛應用於金融行業,根據官網描述,它具有如下特點:

特點:

  1. 異步消息傳遞:支持多種消息協議,消息隊列,傳送確認,靈活的路由到隊列,多種交換類型;
  2. 支持幾乎所有最受歡迎的編程語言:Java,C,C ++,C#,Ruby,Perl,Python,PHP等等;
  3. 可以部署為高可用性和吞吐量的集群; 跨多個可用區域和區域進行聯合;
  4. 可插入的身份驗證,授權,支持TLS和LDAP。;
  5. 提供了一個易用的用戶界面,使得用戶可以監控和管理消息 Broker 的許多方面;
  6. 提供了許多插件,來從多方面進行擴展,也可以編寫自己的插件。

3. Kafka

官網:kafka.apache.org/

簡介:

Kafka是由Apache軟件基金會開發的一個開源流處理平台,由Scala和Java編寫。Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據。 這種動作(網頁瀏覽,搜索和其他用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 這些數據通常是由於吞吐量的要求而通過處理日志和日志聚合來解決。 對於像Hadoop的一樣的日志數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行加載機制來統一線上和離線的消息處理,也是為了通過集群來提供實時的消息。

Kafka它主要用於處理活躍的流式數據,因此Kafaka在大數據系統中使用較多。

特點:

  1. 同時為發布和訂閱提供高吞吐量。據了解,Kafka每秒可以生產約25萬消息(50 MB),每秒處理55萬消息(110 MB)。

  2. 可進行持久化操作。將消息持久化到磁盤,因此可用於批量消費,例如ETL,以及實時應用程序。通過將數據持久化到硬盤以及replication防止數據丟失。

  3. 分布式系統,易於向外擴展。所有的producer、broker和consumer都會有多個,均為分布式的。無需停機即可擴展機器。

  4. 消息被處理的狀態是在consumer端維護,而不是由server端維護。當失敗時能自動平衡。

  5. 支持online和offline的場景。

4. RocketMQ

官網:rocketmq.apache.org/

簡介:

RocketMQ是阿里開源的消息中間件,目前在Apache孵化,使用純Java開發,具有高吞吐量、高可用性、適合大規模分布式系統應用的特點。RocketMQ思路起源於Kafka,但並不是簡單的復制,它對消息的可靠傳輸及事務性做了優化,目前在阿里集團被廣泛應用於交易、充值、流計算、消息推送、日志流式處理、binglog分發等場景,支撐了阿里多次雙十一活動。

特點:

  1. 支持發布/訂閱(Pub/Sub)和點對點(P2P)消息模型
  2. 在一個隊列中可靠的先進先出(FIFO)和嚴格的順序傳遞
  3. 支持拉(pull)和推(push)兩種消息模式
  4. 單一隊列百萬消息的堆積能力
  5. 支持多種消息協議,如 JMS、MQTT 等
  6. 分布式高可用的部署架構,滿足至少一次消息傳遞語義
  7. 提供 docker 鏡像用於隔離測試和雲集群部署
  8. 提供配置、指標和監控等功能豐富的 Dashboard

三、ActiveMQ的安裝

1.安裝步驟

activemq在各個系統下都有對應的安裝包。以下來演示Linux系統下安裝activemq。

進入apache-activemq-5.15.8/bin目錄,啟動activemq./activemq start

輸出以上信息,表示啟動成功。

2.安裝遇到的問題

在安裝過程中,通過查看activemq的運行狀態,

顯示以上。

通過./bin/activemq console 命令查看運行日志:

主機名中包含非法字符;

那么解決辦法就很簡單了,改主機名:

1、方法一使用hostnamectl命令

hostnamectl set-hostname 主機名

2、方法二:修改配置文件 /etc/hostname 保存退出

修改完成之后重啟即可,這里我使用的是方法一:

hostnamectl set-hostname activemq

查看運行狀態:

五、ActiveMQ頁面介紹

待ActiveMQ安裝啟動好,訪問http://ip:8161/admin,登錄名和密碼都是admin(在配置文件中可修改),進入ActiveMQ的主頁:

下面來介紹每個菜單的功能:

1.Queue消息隊列頁面

Name:消息隊列的名稱。

Number Of Pending Messages:未被消費的消息數目。

Number Of Consumers:消費者的數量。

Messages Enqueued:進入隊列的消息 ;進入隊列的總消息數目,包括已經被消費的和未被消費的。 這個數量只增不減。

Messages Dequeued:出了隊列的消息,可以理解為是被消費掉的消息數量。在Queues里它和進入隊列的總數量相等(因為一個消息只會被成功消費一次),如果暫時不等是因為消費者還沒來得及消費。

2.Topic主題頁面

Name:主題名稱。

Number Of Pending Messages:未被消費的消息數目。

Number Of Consumers:消費者的數量。

Messages Enqueued:進入隊列的消息 ;進入隊列的總消息數目,包括已經被消費的和未被消費的。 這個數量只增不減。

Messages Dequeued:出了隊列的消息,可以理解為是被消費掉的消息數量。在Topics里,因為多消費者從而導致數量會比入隊列數高。

3.Subscribers查看訂閱者頁面

查看訂閱者信息,只在Topics消息類型中這個頁面才會有數據。

4.Connections查看連接數頁面

六、簡單使用

引入jar包:

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.7.0</version>
        </dependency>

1.點對點(P2P)模型

​ 點對點模型,采用的是隊列(Queue)作為消息載體。在該模式中,一條消息只能被一個消費者消費,沒有被消費的,只能留在隊列中,等待被消費,或者超時。舉個例子,如果隊列中有10條消息,有兩個消費者,就是一個消費者消費5條信息,你一條我一條。以下以代碼演示。

消息發布者:

public static void main(String[] args) throws JMSException {
    /*
     * 實現步驟
     * 1.建立ConnectionFactory工廠對象,需要填入用戶名、密碼、連接地址(一般使用默認,如果沒有修改的話)
     * 2.通過ConnectionFactory對象創建一個Connection連接,並且調用Connection的start方法開啟連接,Connection方法默認是關閉的
     * 3.通過Connection對象創建Session會話(上下文環境對象),用於接收消息,參數1是是否啟用事物,參數2是簽收模式,一般設置為自動簽收
     * 4.通過Session對象創建Destination對象,指的是一個客戶端用來制定生產消息目標和消費消息來源的對象。在PTP的模式中,Destination被稱作隊列,在Pub/Sub模式中,Destination被稱作主題(Topic)
     * 5.通過Session對象創建消息的發送和接收對象(生產者和消費者)
     * 6.通過MessageProducer的setDeliverMode方法為其設置持久化或者非持久化特性
     * 7.使用JMS規范的TextMessage形式創建數據(通過Session對象),並用MessageProducer的send方法發送數據。客戶端同理。記得關閉
     */
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
            ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://94.191.49.192:61616");
    Connection connection = connectionFactory.createConnection();
    connection.start();
    Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
    Destination destination = session.createQueue("queue");
    MessageProducer producer = session.createProducer(destination);
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    for (int i=0;i<=5;i++) {
        TextMessage textMessage = session.createTextMessage();
        textMessage.setText("我是第"+i+"消息");
        producer.send(textMessage);
    }
    if(connection!=null){
        connection.close();
    }
}

消息消費者:

    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://94.191.49.192:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("queue");
        MessageConsumer consumer = session.createConsumer(destination);
        while (true){
            TextMessage message = (TextMessage) consumer.receive();
            if (message==null){
                break;
            }
            System.out.println(message.getText());
        }
        if(connection!=null){
            connection.close();
        }
    }

先啟動兩個消費者,在啟動發布者:

2.發布/訂閱(Pub/Sub)模型

發布/訂閱模型采用的是主題(Topic)作為消息通訊載體。該模式類似微信公眾號的模式。發布者發布一條信息,然后將該信息傳遞給所有的訂閱者。注意:訂閱者想要接收到該信息,必須在該信息發布之前訂閱。

發布者發布信息:

     public static void main(String[] args) throws JMSException, IOException {
        // 創建一個ConnectionFactory對象連接MQ服務器
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://94.191.49.192:61616");
        // 創建一個連接對象
        Connection connection;
        connection = connectionFactory.createConnection();
        // 開啟連接
        connection.start();
        // 使用Connection對象創建一個Session對象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 創建一個Destination對象。topic對象
        Topic topic = session.createTopic("test-topic");
        // 使用Session對象創建一個消費者對象。
        MessageConsumer consumer = session.createConsumer(topic);
        // 接收消息
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                // 打印結果
                TextMessage textMessage = (TextMessage) message;
                String text;
                try {
                    text = textMessage.getText();
                    System.out.println("這是接收到的消息:" + text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }

            }
        });
        System.out.println("topic消費者啟動。。。。");
        // 等待接收消息
        System.in.read();
        // 關閉資源
        consumer.close();
        session.close();
        connection.close();
    }

訂閱者訂閱信息:

    public static void main(String[] args) throws JMSException {
        // 1、創建一個連接工廠對象,需要指定服務的ip及端口。
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://94.191.49.192:61616");
        // 2、使用工廠對象創建一個Connection對象。
        Connection connection = connectionFactory.createConnection();
        // 3、開啟連接,調用Connection對象的start方法。
        connection.start();
        // 4、創建一個Session對象。
        // 第一個參數:是否開啟事務。如果true開啟事務,第二個參數無意義。一般不開啟事務false。
        // 第二個參數:應答模式。自動應答或者手動應答。一般自動應答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5、使用Session對象創建一個Destination對象。兩種形式queue、topic,現在應該使用topic
        Topic topic = session.createTopic("test-topic");
        // 6、使用Session對象創建一個Producer對象。
        MessageProducer producer = session.createProducer(topic);
        // 7、創建一個Message對象,可以使用TextMessage。
        for (int i = 0; i < 50; i++) {
            TextMessage textMessage = session.createTextMessage("第" + i + "一個ActiveMQ隊列目的地的消息");
            // 8、發送消息
            producer.send(textMessage);
        }
        // 9、關閉資源
        producer.close();
        session.close();
        connection.close();
    }

訂閱者要提前訂閱,所以先運行訂閱者。

3.兩種模式對比

1)由以上,我們可以總結出ActiveMQ的實現步驟:

  • 建立ConnectionFactory工廠對象,需要填入用戶名、密碼、連接地址
  • 通過ConnectionFactory對象創建一個Connection連接
  • 通過Connection對象創建Session會話
  • 通過Session對象創建Destination對象;在P2P的模式中,Destination被稱作隊列(Queue),在Pub/Sub模式中,Destination被稱作主題(Topic)
  • 通過Session對象創建消息的發送和接收對象
  • 發送消息
  • 關閉資源

2)可以看出,P2P模式和Pub/Sub模式,在實現上的區別是通過Session創建的Destination對象不一樣,在P2P的模式中,Destination被稱作隊列(Queue),在Pub/Sub模式中,Destination被稱作主題(Topic)

七、參考

  1. https://www.jianshu.com/p/0363ac9ff574
  2. https://juejin.im/post/5adaaae351882567356415eb

作者:追夢1819
原文:https://www.cnblogs.com/yanfei1819/p/10615605.html
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM