ActiveMQ 事務、集群、持久訂閱者、ActiveMQ監控


JMS介紹

JMS是什么?

  JMS的全稱Java Message Service,既Java消息服務。

  JMS是SUN提供的旨在統一各種MOM(Message-Oriented Middleware)系統接口的規范,它包含點對點(Point to Point,PTP)發布/訂閱(Publish/Subscribe,pub/sub)兩種消息模型,提供可靠消息傳輸、事務和消息過濾等機制。

  ActiveMQ是Apache出品的開源項目,他是JMS規范的一個實現。

MOM是什么?

  MOM(Message-Oriented Middleware)面向消息的中間件,使用消息中間件來協調消息傳輸操作。

MOM需要提供API和管理工具

  • 客戶端調用API,把消息發送到消息中間件指定的目的地。在消息發送之后,客戶端會繼續執行其他的工作。
  • 接收方收到這個消息確認之前,消息中間件一直保留該消息。

JMS的作用是什么?

  在不同應用之間進行通信或者從一個系統傳輸數據到另外一個系統。兩個應用程序之間,或分布式系統中發送消息,進行異步通信,程序或應用之間解耦

  它主要用於在生產者和消費者之間進行消息傳遞,生產者負責產生消息,而消費者負責接收消息。把它應用到實際的業務需求中的話我們可以在特定的時候利用生產者生成-消息,並進行發送,對應的消費者在接收到對應的消息后去完成對應的業務邏輯。

JMS的應用場景

主要可以應用於規模和復雜度較高的分布式系統:

  • 異步通信:客戶發出調用后,不用等待服務對象完成處理並返回結果后就能繼續執行;
  • 客戶和服務對象的生命周期解耦合:客戶進行和服務對象進行不需要都正常運行;如果由於服務對象崩潰或網絡故障導致客戶的請求不可達,不會影響客戶端正常響應;
  • 一對一或一對多通信:客戶的一次調用可以發送給一個或多個目標對象;

JMS中的角色

三種角色:生產者(Java應用程序)、消費者(Java應用程序)、消息中間件(MQ)

JMS消息模型

點對點模型(基於隊列)

  • 消息的生產者和消費者之間沒有時間上的相關性。
  • 生產者把消息發送到隊列中(Queue),可以有多個發送者,但只能被一個消費者消費。一個消息只能被一個消費者消費一次。
  • 消費者無需訂閱,當消費者未消費到消息時就會處於阻塞狀態

發布者/訂閱者模型(基於主題的)

  • 生產者和消費者之間有時間上的相關性,訂閱一個主題的消費者只能消費自它訂閱之后發布的消息
  • 生產者將消息發送到主題上(Topic)
  • 消費者必須先訂閱,JMS規范允許提供客戶端創建持久訂閱

JMS消息組成

消息頭

消息正文

  JMS定義了五種不同的消息正文格式,以及調用的消息類型,允許你發送並接收一些不同形式的數據,提供現有消息格式的一些級別的兼容性。

  • StreamMessage --Java原始值得數據流
  • MapMessage --一套名稱-值對
  • TextMessage --一個字符串對象
  • ObjectMessage --一個序列化的Java對象
  • BytesMessage --一個字節的數據流

消息屬性

總結

1、JMS是什么?是指定消息發送和接收的一套標准

2、JMS的角色:生產者、消費者、MOM消息中間件

3、JMS消息模型:點對點、發布訂閱模型

4、JMS消息正文:Stream、Map、Text、Byte、Object

ActiveMQ介紹

什么是ActiveMQ

  MQ,既Message Queue,就是消息隊列的意思。

  ActiveMQ是Apache出品,最流行,能力強勁的開源消息總線。ActiveMQ是一個完全支持JMS1.1和J2EE 1.4規范的JMS Provider實現,盡管JMS規范出台已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演着特殊地位。

ActiveMQ主要特點

  1. 多種語言和協議編寫客戶端,語言:CC++C#DelphiErlangAdobe FlashHaskellJavaJavaScriptPerlPHPPikePythonRuby
  2. 支持Java消息服務(JMS) 1.1 版本
  3. 對Srping的支持,ActiveMQ可以很容易內嵌到使用Spring的系統里面去,而且也支持Spring2.0的特性
  4. 協議支持包括:OpenWire、REST、STOMP、WS-Notification、MQTT、XMPP以及AMQP
  5. 集群

ActiveMQ下載安裝

下載

http://activemq.apache.org/components/classic/download/

下載版本(我使用的版本最新):5.15.12

安裝jdk(必須要安裝)

1、先卸載系統自帶的jdk

1、查看安裝的jdk
rpm -qa | grep java

2、卸載系統自帶jdk
rpm -e --nodeps 包名

2、安裝JDK,ActiveMQ是使用Java開發的

  當前最新版本下載地址:http://www.oracle.com/technetwork/java/javase/downloads/index.html

  歷史版本下載地址:  http://www.oracle.com/technetwork/java/javase/archive-139210.html   

我下載的是1.8,點我直達

鏈接: https://pan.baidu.com/s/1DZGsJuLUrhpEQm7jaSKTwg  密碼: baa5

3、解壓到指定位置

tar -zxvf jdk-8u202-linux-x64.tar.gz

4、修改/etc/profile文件

在最下面添加兩行代碼

 export JAVA_HOME=/cyb/soft/jdk1.8.0_202
 export PATH=$JAVA_HOME/bin:$PATH

5、執行source操作

source /etc/profile

6、檢查是否安裝成功

java -version

安裝ActiveMQ

1、解壓縮

tar -zxvf apache-activemq-5.15.12-bin.tar.gz

2、啟動ActiveMQ

cd apache-activemq-5.15.12/bin/

./activemq start

3、訪問ActiveMQ后台

地址:http://192.168.191.132:8161/admin/
賬戶:admin
密碼:admin

4、訪問測試

注:為什么端口是8161,因為ActiveMQ用的內嵌web服務器jetty,端口可以修改,配置文件在/conf/jetty.xml

補充

  ActiveMQ與jdk是有版本對應關系的!!!!!

ActiveMQ使用

創建Demo工程

  • 消息生產者:activemq-producer-demo工程(jar)
  • 消息消費者:activemq-consumer-demo工程(jar)

添加Maven依賴

  生產者和消費者都要加入以下依賴

    <dependencies>
        <!--activemq依賴-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.12</version>
        </dependency>
        <!--junit依賴-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

點對點模式演示

提供者(activemq-producer-demo)

package com.cyb.activemq.producer;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.junit.Test;

import javax.jms.*;

public class Producer {
    @Test
    public void testQueueProducer() throws Exception {
        Connection connection = null;
        MessageProducer producer = null;
        Session session = null;
        try {
            //第一步:創建ConnectionFactory,用於連接broker
            String brokerURL = "tcp://192.168.1.106:61616";
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
            //設置消息發送為同步發送
            ((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true);
            //設置
            ((ActiveMQConnectionFactory) connectionFactory).setProducerWindowSize(1000);
            //第二步:通過工廠,創建Connection
            connection = connectionFactory.createConnection();
            ((ActiveMQConnection) connection).setUseAsyncSend(true);
            //第三步:連接啟動
            connection.start();
            //第四步:通過連接獲取session會話
            //第一個參數:是否啟用ActiveMQ事務,如果為true,第二個參數無用
            //第二個參數:應答模式,AUTO_ACKNOWLEDGE為自動應答
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //第五步:通過session創建destination,兩種目的地:Queue、Topic
            //參數:消息隊列的名稱,在后台管理系統中可以看到
            Queue queue = session.createQueue("cyb-queue");
            //第六步:通過session創建MessageProducer
            producer = session.createProducer(queue);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            //第七步:創建Message
            //方式一
            //TextMessage message=new ActiveMQTextMessage();
            //message.setText("queue test");
            //方式二
            TextMessage message1 = session.createTextMessage("博客園地址:https://www.cnblogs.com/chenyanbin/");
            //第八步:通過producer發送消息
            producer.send(message1);
            //session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //第九步:關閉資源
            producer.close();
            session.close();
            connection.close();
        }
    }
}

消費者(activemq-consumer-demo)

package com.cyb.activemq.consumer;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;

import javax.jms.*;

public class Consumer {
    @Test
    public void testQueueConsumer() throws Exception{
        //第一步:創建ConnectionFactory
        String brokerURL="tcp://192.168.1.106:61616";
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(brokerURL);
        //第二步:通過工廠,創建Connection
        Connection connection=connectionFactory.createConnection();
        //第三步:打開鏈接
        connection.start();
        //第四步:通過Connection創建session
        Session session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        //第五步:通過session創建Consumer
        Queue queue=session.createQueue("cyb-queue");
        MessageConsumer consumer=session.createConsumer(queue);
        //第六步:通過consumer接收信息(兩種方式:1、receive方法接收(同步);2、通過監聽器接收(異步))
        //方式1、receive方法接收信息
        Message message=consumer.receive(100000);
        //第七步:處理信息
        if (message!=null&&message instanceof TextMessage){
            TextMessage tm=(TextMessage)message;
            System.out.println(tm.getText());
        }

        //方式2:監聽器接收信息
//        consumer.setMessageListener(new MessageListener() {
//            @Override
//            public void onMessage(Message message) {
//                //第七步:處理信息
//                if (message instanceof TextMessage){
//                    TextMessage tm=(TextMessage)message;
//                    try{
//                        System.out.println(tm.getText());
//                    }
//                    catch (Exception e){
//                        e.printStackTrace();
//                    }
//                }
//            }
//        });
        //session.commit();
        //第八步:關閉資源
        consumer.close();
        session.close();
        connection.close();
    }

}

測試

發布訂閱模式演示

提供者(activemq-producer-demo)

    @Test
    public void testTopicProducer() throws Exception {
        Connection connection = null;
        MessageProducer producer = null;
        Session session = null;
        try {
            //第一步:創建ConnectionFactory,用於連接broker
            String brokerURL = "tcp://192.168.1.106:61616";
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
            //設置消息發送為同步發送
            ((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true);
            //設置
            ((ActiveMQConnectionFactory) connectionFactory).setProducerWindowSize(1000);
            //第二步:通過工廠,創建Connection
            connection = connectionFactory.createConnection();
            ((ActiveMQConnection) connection).setUseAsyncSend(true);
            //第三步:連接啟動
            connection.start();
            //第四步:通過連接獲取session會話
            //第一個參數:是否啟用ActiveMQ事務,如果為true,第二個參數無用
            //第二個參數:應答模式,AUTO_ACKNOWLEDGE為自動應答
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //第五步:通過session創建destination,兩種目的地:Queue、Topic
            //參數:消息隊列的名稱,在后台管理系統中可以看到
            Topic topic=session.createTopic("cyb-topic");
            //第六步:通過session創建MessageProducer
            producer = session.createProducer(topic);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            //第七步:創建Message
            //方式一
            //TextMessage message=new ActiveMQTextMessage();
            //message.setText("queue test");
            //方式二
            TextMessage message1 = session.createTextMessage("topic->博客園地址:https://www.cnblogs.com/chenyanbin/");
            //第八步:通過producer發送消息
            producer.send(message1);
            //session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //第九步:關閉資源
            producer.close();
            session.close();
            connection.close();
        }
    }

消費者(activemq-consumer-demo)

    @Test
    public void testTopicProducer() throws Exception {
        Connection connection = null;
        MessageProducer producer = null;
        Session session = null;
        try {
            //第一步:創建ConnectionFactory,用於連接broker
            String brokerURL = "tcp://192.168.1.106:61616";
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
            //設置消息發送為同步發送
            ((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true);
            //設置
            ((ActiveMQConnectionFactory) connectionFactory).setProducerWindowSize(1000);
            //第二步:通過工廠,創建Connection
            connection = connectionFactory.createConnection();
            ((ActiveMQConnection) connection).setUseAsyncSend(true);
            //第三步:連接啟動
            connection.start();
            //第四步:通過連接獲取session會話
            //第一個參數:是否啟用ActiveMQ事務,如果為true,第二個參數無用
            //第二個參數:應答模式,AUTO_ACKNOWLEDGE為自動應答
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //第五步:通過session創建destination,兩種目的地:Queue、Topic
            //參數:消息隊列的名稱,在后台管理系統中可以看到
            Topic topic=session.createTopic("cyb-topic");
            //第六步:通過session創建MessageProducer
            producer = session.createProducer(topic);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            //第七步:創建Message
            //方式一
            //TextMessage message=new ActiveMQTextMessage();
            //message.setText("queue test");
            //方式二
            TextMessage message1 = session.createTextMessage("topic->博客園地址:https://www.cnblogs.com/chenyanbin/");
            //第八步:通過producer發送消息
            producer.send(message1);
            //session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //第九步:關閉資源
            producer.close();
            session.close();
            connection.close();
        }
    }

測試

  先啟動消費者,在啟動提供者

自定義BrokerServer

package com.cyb.activemq;

import org.apache.activemq.broker.BrokerService;

public class MyBrokerServer {
    public static void main(String[] args) {
        BrokerService brokerService=new BrokerService();
        String bindAddress="tcp://localhost:61616";
        try
        {
            brokerService.setUseJmx(true);
            brokerService.addConnector(bindAddress);
            brokerService.start();
        }
        catch (Exception e){
            e.printStackTrace();
        }
    }
}

JMS事務

創建事務

創建事務的方法:createSession(paramA,paramB);

  • paramA是設置事務的,paramB設置acknowledgment mode(應答模式)
  • paramA設置為true時,paramB的值忽略,acknowledgment mode被jms服務器設置Session.SESSION_TRANSACTED。
  • paramA設置為false時,paramB的值可為Session.AUTO_ACKNOWLEDGE,Session.CLTENT_ACKNOWLEDGE,Session.DUPS_OK_ACKNOWLEDGE其中一個。

事務的應答模式

  • JMS消息被應答確認后,才會認為是被成功消費,broker才會將消息清除掉
  • 消息的消費包含三個階段:客戶端接收消息、客戶端處理消息、消息被確認

SESSION_TRANSACTED(開啟事務,默認):

  當一個事務被commit的時候,消息確認就會自動發生。如果開啟了事務,最后沒有執行commit方法,那么消費者會重復消費該消息

AUTO_ACKNOWLEDGE:

  自動確認,當客戶成功的從receive方法返回的時候,或者從MessageListener.onMessage方法成功返回的時候,會話自動確認客戶收到的消息。

CLIENT_ACKNOWLEDGE(針對消費者):

  客戶端確認。客戶端接收到消息后,必須調用 javax.jmx.Message的acknowledge方法,broker才會刪除消息。(默認是批量確認)

Message.acknowledge();

DUPS_OK_ACKNOWLEDGE:

  允許副本的確認模式。一旦接收方應用程序的方法調用從處理消息處返回,會話對象就會確認消息的接收,而且允許重復確認。如果是重復的消息,那么JMS provider必須將消息頭的JMSRedelivered字段設置為true。

事務的作用

  在一個JMS客戶端,可以使用本地事務來組合消息的發送和接收,JMS Session接口提供了commit和rollback方法。

  開啟事務之后,JMS Provider會緩存每個生產者當前生產的所有消息,直到commit或rollback。在事務未提交之前,消息時不會被持久化存儲的,也不會被消費者消費。

  • commit:操作將會導致生產者事務中所有的消息被持久存儲,消費者的所有消息都被確認。
  • rollback:操作將會導致生產者事務中所有的消息被清除,消費者的所有消息不被確認。

消息生產者處理

消息的持久化和非持久化

  ActiveMQ支持兩種傳輸模式:持久傳輸和非持久傳輸,默認情況下使用的是持久傳輸

兩者差異

  • 采用持久傳輸時,傳輸的消息會保存到磁盤中,既“存儲轉發”模式,先把消息存儲到磁盤中,然后再將消息“轉發”給訂閱者。當Borker宕機恢復后,消息還在
  • 采用非持久傳輸時,發送的消息不會存儲到磁盤中。當Borker宕機重啟后,消息丟失

通過MessageProducer類的setDeliveryMode設置傳輸模式

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

消息同步發送和異步發送

producer 發送消息有同步和異步兩種模式,可以通過以下方式設置

1、設置ConnectionFactory時指定使用異步

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.109:61616?jms.useAsyncSend=true");

2、不在構造函數中指定,而是修改ConnectionFactory配置

            //第一步:創建ConnectionFactory,用於連接broker
            String brokerURL = "tcp://192.168.1.109:61616";
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
            //設置消息發送為異步發送
            ((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true);

3、在實例化后的ActiveMQConnection對象中設置異步發送

String brokerURL = "tcp://192.168.1.109:61616";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
Connection connection = connectionFactory.createConnection();
((ActiveMQConnection) connection).setUseAsyncSend(true);

在不考慮事務的情況下:

  • producer發送持久化消息是同步發送,發送是阻塞的,直到收到確認。
  • producer發送非持久化消息時異步發送,異步發送不會等待broker的確認,不阻塞。

  消息生產者使用持久傳遞模式發送消息的時候,producer.send(message)方法會被阻塞,直到broker發送一個確認消息給生產者,這個確認消息暗示broker已經成功接收消息並把消息保存到二級存儲中。這個過程通常稱為同步發送。

  如果應用程序能容忍一些消息的丟失,那么可以使用異步發送。異步發送不會受到broker的確認之前一直阻塞Producer.send方法。

生產者流量控制

ProducerWindowSize

  在ActiveMQ5.0版本中,我們可以分別一個共享連接上的各個生產者進行流量控制,而不需要掛起整個連接。“流量控制”意味着當代理(broker)檢測目標(destination)的內存,或臨時文件空間或文件存儲空間超過了限制,消息的流量可以被減慢。生產者將會被阻塞直至資源可用,或者受到一個JMSException異常

  • 同步發送的消息將會自動對每一個生產者使用流量控制;除非你使用了 useAsynSend標志,否則這將對同步發送的持久性消息都適用。
  • 適用異步發送的生產者不需要等待來自代理的任何確認消息;所以,如果內存限制被超過了,你不會被通知。如果你真的想知道什么時候代理的限制被超過了,你需要配置ProducerWindowSize這一連接選項,這樣就算是異步消息也會對每一個生產者進行流量控制。

3種方式設置ProducerWindowSize

方式一、

            String brokerURL = "tcp://192.168.1.109:61616";
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
            ((ActiveMQConnectionFactory) connectionFactory).setProducerWindowSize(1000);

方式二、

  在brokerURL中設置:"tcp://192.168.1.109:61616?jms.produceWindowSize=1000",這種設置將會對所有的produce生效。

方式三、

  在destinationUrl中設置"cyb-queue?producer.windowSize=1000",此參數只會對使用此Destination實例的producer生效,將會覆蓋brokerUrl中的producerWindowSize值

配置說明:

  ProducerWindowSize是一個生產者在等到確認消息之前,可以發送給代理的數據最大byte,這個確認消息用來告訴生產者,代理已經收到之前發送的消息了。

  它主要用來約束在異步發送時producer端允許異步發送的(尚未ACK)的消息尺寸,且只對異步發送有意義

  值越大,意味着消耗Broker服務器的內存就越大

alwaysSyncSend

  如果你要發送非持久化的消息(消息默認是異步發送的),並且想要每次都得到隊列或者主題的內存限制是否達到,你只需將連接工廠配置為“alwaysSyncSend”,雖然這樣會變得稍微慢一點,但是這將保證當出現內存問題時,你的消息生產者能夠及時得到通知。

((ActiveMQConnection) connection).setAlwaysSyncSend(true);

如何提升消息發送效率?()

  • 在某些場景下,我們的Producer的個數非常有限的,可能只有幾個,比如基於Queue的“訂單接入網管”(生成訂單原始信息並負責傳遞),但是響應的Consumer的個數相對較多,在整體上Producer效能小於Consumer。
  • 還有一些場景,Producer的數量非常多,消息量也很大,但是Consumer的個數或者效能相對較低,比如“用戶點擊流”、“用戶消息Push系統”等

消息持久化

1、持久化類型的消息,對broker端性能消耗遠遠大於非持久化類型

2、這歸結於ActiveMQ本身對持久化消息確保“最終一致性”,持久化意味着“消息不丟失”,即當broker接收到消息后需要一次強制性磁盤同步

3、對於Consumer在消費消息后,也會觸發磁盤寫入

4、通常broker端還會開啟相關的“過期消息檢測”線程,將存儲器中的數據載入內存並檢測,這個過程也是內存,磁盤IO消耗的。由此可見,持久化類型的消息從始至終,都在“拖累”系統的性能和吞吐能力。

消息屬性

1、通過Producer發送消息(Message)中,除了消息本身的負荷體之外(Consumer),還有大量的JMS屬性和Properties可以設置,因為JMS中,支持對JMS屬性和properties使用selector,那么這些內容將會加大和復雜化message header,我們盡可能的在properties中攜帶更少

異步發送

1、如果消息是非持久性的,或者Session是基於事務的,建議開發者不要關閉異步發送;這是提升Producer發送效率的重要的策略。

2、設置合適的windowSize,開啟Broker端“Flow Control”等

事務

對於Producer而言,使用事務並不會消耗Broker太多的性能,主要會占用內存,所有未提交的事務消息,都會保存在內存中,有些基於日志的存儲器,事務類型的持久化消息暫存在額外的文件中,直到日志提交或回滾后清除。所以,Producer端不要在事務中,積壓太多的消息,盡可能早的提交事務。

提升Consumer消費速率

無論是Queue還是Topic,快速的Consumer,無疑是提升整體效能的最好手段。

選擇合適的存儲器

  activeMQ目前支持JDBC、kahadb、LevelDB三種存儲器。

  JDBC主要面向基於RDBMS方向,通常如果消息不僅面向ActiveMQ,還可能被用於第三方平台的操作,JDBC的特點就是透明度高,可擴展方案較多(擴展成本高)。

  kahadb和LevelDB,同屬於日志存儲+BTree索引,性能很高,對於消息較多(單位尺寸較小),消費速度較快的應用,是最好的選擇,這兩種存儲器也最常用,推薦LevelDB

Broker Server處理

導讀

  以下內容都是修改:vim /cyb/soft/apache-activemq-5.15.12/conf/activemq.xml

流量控制

設置指定隊列和主題失效

  可以通過在代理配置中,將適當的目的地(destination)的策略(policy)中的producerFlowControl標志設置為false,使代理商特定的JMS隊列和主題不適用流量控制

<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry topic="FOO.>" producerFlowControl="false"/>
    </policyEntries>
  </policyMap>
</destinationPolicy>

生存內存限制

  注意,在ActiveMQ 5.x中引入了新的file cursor,非持久化消息會被刷到臨時文件存儲中來減少內存使用量。所以,你會發現queue的memoryLimit永遠達不到,因為file cursor花不了多少內存,如果你真的要把所有非持久化消息保存在內存中,並且當memoryLimit達到時停止producer,你應該配置<vmQueueCursor>。

<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">      
    <pendingQueuePolicy>    
        <vmQueueCursor/>  
    </pendingQueuePolicy>
</policyEntry>

上面的片段能保證,所有的消息保存在內存中,並且每一個隊列只有1Mb的限制。

配置生產者客戶端的異常

  應對Broker代理空間不足,而導致不確定的阻塞send()操作的一種替代方案,就是將其配置成客戶端拋出一個異常。通過將sendFailIfNoSpace屬性設置為true,代理將會引起send()方法失敗,並拋出javax.jmx.ResourceAllocationException異常,傳播到客戶端

<systemUsage>
 <systemUsage sendFailIfNoSpace="true">
   <memoryUsage>
     <memoryUsage limit="20 mb"/>
   </memoryUsage>
 </systemUsage>
</systemUsage>

這個屬性的好處是,客戶端可以捕獲javax.jms.ResourceAllocationException異常,稍等一下,並重試send()操作,而不是無限期地傻等下去。

 從5.3.1版本之后,sendFailIfNoSpaceAfterTimeout 屬性被加了進來。這個屬性同樣導致send()方法失敗,並在客戶端拋出異常,但僅當等待了指定時間之后才觸發。如果在配置的等待時間過去之后,代理上的空間仍然沒有被釋放,僅當這個時候send()方法才會失敗,並且在客戶端拋出異常。下面是一個示例:

<systemUsage>
 <systemUsage sendFailIfNoSpaceAfterTimeout="3000">
   <memoryUsage>
     <memoryUsage limit="20 mb"/>
   </memoryUsage>
 </systemUsage>
</systemUsage>

定義超時的單位是毫秒,所以上面的例子將會在使send()方法失敗並對客戶端拋出異常之前,等待三秒。這個屬性的優點是,它僅僅阻塞配置指定的時間,而不是立即另發送失敗,或者無限期阻塞。這個屬性不僅在代理端提供了一個改進,還對客戶端提供了一個改進,使得客戶端能捕獲異常,等待一下並重試send()操作。

使用流量控制無效

  一個普通的需求是使流量控制無效,使得消息分布能夠持續,直到所有可用的磁盤被掛起的消息耗盡。要這樣做,你可以使用消息游標。

ActiveMQ的消息游標分為三種類型

  • Store-based
  • VM
  • File-based

系統占用(重要)

你還可以通過<systemUsage>元素的一些屬性來減慢生產者。來看一眼下面的例子:

<systemUsage>
  <systemUsage>
    <memoryUsage>
      <memoryUsage limit="64 mb" />
    </memoryUsage>
    <storeUsage>
      <storeUsage limit="100 gb" />
    </storeUsage>
    <tempUsage>
      <tempUsage limit="10 gb" />
    </tempUsage>
  </systemUsage>
</systemUsage>

你可以為非持久化的消息(NON_PERSISTENT messages)設置內存限制,為持久化消息(PERSISTENT messages)設置磁盤空間,以及為臨時消息設置總的空間,代理將在減慢生產者之前使用這些空間。使用上述的默認設置,代理將會一直阻塞sen()方法的調用,直至一些消息被消費,並且代理有了可用空間。默認值如上例所述,你可能需要根據你的環境增加這些值。

解決消費緩慢及無法消費的問題(重要)

  其中broker中還以單獨設置生產者使用的 producerSystemUsage和消費者使用 consumerSystemUsage,格式跟systemUsage一樣。

  默認情況下,沒有配置 producerSystemUsage 和consumerSystemUsage,則生產者和消費者都使用 systemUsage。

問題:

  可能會因為生產者線程把內存用完,導致消費者線程處理緩慢甚至無法消費的問題。這種情況下,添加消費端的機器和消費者數量可能都無法增加消費的速度。

解決辦法:

  在broker上設置 splitSystemUsageForProducersConsumers="true",使得生產者線程和消費者線程各使用各的內存。

  默認是 生產者線程內存:消費者線程內存 => 6:4

  也可以通過如下兩個參數設置生產者線程內存和消費者內存各一半:

producerSystemUsagePortion = "50"
consumerSystemUsagePortion = "50"
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" splitSystemUsageForProducersConsumers="true" producerSystemUsagePortion = "50" consumerSystemUsagePortion = "50">

消息定時刪除(重要)

<broker xmlns="http://activemq.apache.org/schema/core" schedulePeriodForDestinationPurge="10000">
    <destinationPolicy>
       <policyMap>
          <policyEntries>
             <policyEntry topic=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="30000"/>
          </policyEntries>
       </policyMap>
    </destinationPolicy>     
  </broker>

實現定時自動清除無效的Topic和Queue需要設置三個屬性

  • schedulePeriodForDestinationPurge:執行清理任務的周期,單位是毫秒
  • gclnactiveDestinations="true":啟動清理功能
  • inactiveTiomoutBeforeGC="3000":Topic或Queue超時時間,在規定的時間內,無有效訂閱,沒有入隊記錄,超時就會被清理。

持久化存儲方式

KahaDB基於文件的存儲(默認)

  KahaDB是從ActiveMQ 5.4開始 默認的持久化插件,也是我們項目現在使用的持久化方式。KahaDB恢復時間遠遠小於其前身AMQ並且使用更少的數據文件,所以可以完全替代AMQ。KahaDB的持久化機制同樣是基於日志文件,索引和緩存。

        <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="16mb"/>
        </persistenceAdapter>

  directory:指定持久化消息的存儲目錄

  journalMaxFileLength:指定保存消息的日志文件大小,具體根據你的實際應用配置

KahaDB主要特性

  1. 日志形式存儲消息
  2. 消息索引以B-Tree結構存儲,可以快速更新;
  3. 完全支持JMS事務
  4. 支持多種恢復機制

AMQ 基於文件的存儲

  性能高於JDBC,寫入消息時,會將消息寫入日志文件,由於很高。為了提升性能,創建消息主鍵索引,並且提供緩存機制,進一步提升性能。每個日志文件的大小都是有限制的(默認32m,可配置)。

  當超過這個大小,系統會重新建立一個文件。當所有的消息都消費完成,系統會刪除這個文件或者歸檔

主要的缺點是:

  • AMQ Message會為每一個Destination創建一個索引,如果使用了大量的Queue,索引文件的大小會占用很多磁盤空間
  • 由於索引巨大,一旦Broker崩潰,重建索引的速度會非常慢
        <persistenceAdapter>
               <amqPersistenceAdapter directory="${activemq.data}/activemq-data" maxFileLength="32mb"/>
        </persistenceAdapter>

 JDBC基於數據庫的存儲

1、首先將以下驅動放到lib目錄下,驅動包和ActiveMQ我已上傳至百度雲,下面有連接供下載

驅動包,百度雲盤地址:https://pan.baidu.com/s/1veqFD2k49x5m97FA6CAwJA  密碼: gea6

2、修改配置文件:conf/activemq.xml

        <persistenceAdapter>
           <!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
        <jdbcPersistenceAdapter dataSource="#activemq-db" createTablesOnStartup="true" />
        </persistenceAdapter>

dataSource指定持久化數據庫的bean,createTablesOnStartup是否在啟動的時候創建數據表,默認使用true,這樣每次啟動都會去創建數據表了,一般第一次啟動的時候設置為true,之后改成false

3、在配置文件中的broker節點外增加以下內容

   <bean id="activemq-db" class="org.apache.commons.dbcp.BasicDataSource">
      <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
      <property name="url" value="jdbc:mysql://192.168.31.206:3306/activemq"/>
      <property name="username" value="root"/>
      <property name="password" value="root"/>
      <property name="maxActive" value="200"/>
      <property name="poolPreparedStatements" value="true"/>
    </bean>

4、從配置中可以看出數據庫的名稱是activemq,需要手動在mysql中增加這個庫,然后重啟消息隊列,你會發現多了三張表

  • activeme_acks ->存儲持久訂閱的信息
  • activemq_lock ->鎖表(用來做集群的時候,實現master選舉的表)
  • activemq_msgs ->消息表

 補充:

  mysql必須支持遠程連接!!!!

控制台:
1、mysql -uroot -proot

2、GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' WITH GRANT OPTION;

Memory基於內存

  基於內存的消息存儲,就是消息存儲在內存中。persistent="false":表示不設置持久化存儲,直接存儲到內存中

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" persistent="false">

消息消費者處理

prefetch機制

  prefetch即在activemq中消費者預獲取消息數量,重要的調優參數之一。當消費者存活時,broker將會批量push prefetchSize條消息給消費者,消費者也可以配合optimizeAcknowledge來批量確認它們。由於broker批量push消息給消費者,提高了網絡傳輸效率,默認為1000。

  broker端將會根據consumer指定的prefetchSize來決定pendingBuffer的大小,prefetchSize越大,broker批量發送的消息就會越多,如果消費者消費速度較快,再配合optimizeAck,這將是相對完美的消息傳送方案。

  不過,prefetchSize也會帶來一定的問題,在Queue中(Topic中沒有效果),broker將使用“輪詢”方式來平衡多個消費者之間的消息傳送數量,如果消費者消費速度較慢,而prefetchSize較大,這將不利於消息量在多個消費者之間平衡。通常情況下,如果consumer數量較多,或者消費速度較慢,或者消息量較少時,我們應該設定prefetchSize為較小的值。

設置prefetchSize的方式如下:

            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue("cyb-queue?customer.prefetchSize=100");

prefetch值建議在destinationUrl中指定,因為在brokerUrl中指定比較繁瑣;在brokerUrl中,queuePrefetchSize和topicPrefetchSize都需要單獨設置:"&jms.prefetchPolicy.queuePrefetch=12&jms.prefetchPolicy.topicPrefetch=12"等逐個指定。

optimizeACK機制

  optimizeACK,可優化的消息ACK策略,關系到是否批量確認消息的策略,這個是Consumer端最重要的調優參數之一。optimizeAcknowledge表示是否開啟“優化ACK選項”,當開啟optimizeACK策略后,只有當optimizeACK為true,也只會當session的ACK_MODE為AUTO_ACKNOWLEDGE時才會生效。

該參數的具體含義和消費端的處理如下:

  • 當consumer.optimizeACK有效時,如果客戶端已經消費但尚未確認的消息(deliveredMessage)達到prefetch*0.65,從consumer端將會自動進行ACK。
  • 同事如果離上一次ACK的時間間隔,已經超過“optimizeAcknowledgeTimeout”毫秒,也會導致自動進行ACK。
            String brokerURL = "tcp://192.168.31.215:61616?jms.optimizeAcknowledge=true&jms.optimizeAcknowledgeTimeOut=30000";
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);

ACK模型和類型介紹

ACK模型

  ACK模型是確定應答的時機

  • AUTO_ACKNOWLEDGE = 1 ->自動確認
  • CLIENT_ACKNOWLEDGE = 2 ->客戶端手動確認
  • DUPS_OK_ACKNOWLEDGE = 3 ->自動批量確認
  • SESSION_TRANSACTED = 0 ->事務提交並確認

ACK類型

  ACK類型是確定應答的類型,客戶端根據ACK類型的不同,需要不同的處理,比如消息重發。

  Client端指定了ACK模式,但是在Client與broker在交換ACK指令的時候,還需要告知ACK_TYPE,ACK_TYPE表示此確認指定的類型,不同的ACK_TYPE將傳遞着消息的狀態,broker可以根據不同的ACK_TYPE對消息進行不同的操作。

  比如Consumer消費消息時出現異常,就需要向broker發送ACK指定,ACK_TYPE為“REDELIVERED_ACK_TYPE”

,那么broker就會重新發送此消息。在JMS API中並沒有定義ACK_TYPE,因為它通常是一種內部機制,並不會面向開發者。ActiveMQ中定義了如下幾種ACK_TYPE

  • DELIVERED_ACK_TYPE = 0 消息“已接收”,但尚未處理結束
  • STANDARD_ACK_TYPE = 2 “標准”類型,通常表示為消息“處理成功”,broker端可以刪除消息了
  • POSION_ACK_TYPE = 1 消息“錯誤”,通常表示“拋棄”此消息,比如消息重發多次后,都無法正常處理時,消息將會被刪除或DLQ(死信隊列)
  • REDELIVERED_ACK_TYPE = 3 消息需“重發”,比如consumer處理消息時拋出異常,broker稍后會重新發送此消息
  • INDIVIDUAL_ACK_TYPE = 4 表示只確認“單條消息”,無論在任何ACK_MODE下
  • UNMATCHED_ACK_TYPE = 5  在Topic中,如果一條消息在轉發給“訂閱者”時,發現此消息不合符Selector過濾條件,那么此消息將不會轉發給訂閱者,消息將會被存儲引擎刪除

重發機制

  可以在brokerUrl中配置“redelivery”策略,比如當一條消息處理異常時,broker端還可以重發的最大次數。當消息需要broker端重發時,consumer會首先在本地的“deliveredMessage隊列”(Consumer已經接收但未確認的消息隊列)刪除它,善后向broker發送“REDELIVERED_ACK_TYPE”類型的確認指令,broker將會把指令中指定的消息重新添加到pendingQueue中,直到合適的時機,再次push給client。

持久化訂閱和非持久化訂閱

注意事項:

  1. 持久化訂閱和非持久化訂閱針對的消息模型是Pub/Sub,而不是P2P
  2. 持久化訂閱需要消費者先執行訂閱,然后生產者再發送消息
  3. 如果消費者宕機,而又不想丟失它宕機期間的消息,就需要開啟持久訂閱。如果對於同一個消息有多個消費者需要開啟持久訂閱的情況,則設置的clientID不能相同

消費者

    public void testTopicConsumer2() throws Exception {
        //第一步:創建ConnectionFactory
        String brokerURL = "tcp://192.168.31.215:61616";
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
        //第二步:通過工廠,創建Connection
        Connection connection = connectionFactory.createConnection();
        //設置持久訂閱的客戶端ID
        String clientId = "10086";
        connection.setClientID(clientId);
        //第三步:打開鏈接
        connection.start();
        //第四步:通過Connection創建session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //第五步:通過session創建Consumer
        Topic topic = session.createTopic("cyb-topic");

        //創建持久訂閱的消費者客戶端
        //第一個參數是指定Topic
        //第二個參數是自定義的ClientId
        MessageConsumer consumer = session.createDurableSubscriber(topic, "client1-sub");
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                //第七步:處理信息
                if (message instanceof TextMessage){
                    TextMessage tm=(TextMessage)message;
                    try{
                        System.out.println(tm.getText());
                    }
                    catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }
        });
        //session.commit();
        //第八步:關閉資源
        consumer.close();
        session.close();
        connection.close();
    }

提供者

    public void testTopicProducer() throws Exception {
        Connection connection = null;
        MessageProducer producer = null;
        Session session = null;
        try {
            //第一步:創建ConnectionFactory,用於連接broker
            String brokerURL = "tcp://192.168.31.215:61616";
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
            //設置消息發送為同步發送
            ((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(false);
            //設置
            //((ActiveMQConnectionFactory) connectionFactory).setProducerWindowSize(1000);
            //第二步:通過工廠,創建Connection
            connection = connectionFactory.createConnection();
            ((ActiveMQConnection) connection).setUseAsyncSend(false);
            //第三步:連接啟動
            connection.start();
            //第四步:通過連接獲取session會話
            //第一個參數:是否啟用ActiveMQ事務,如果為true,第二個參數無用
            //第二個參數:應答模式,AUTO_ACKNOWLEDGE為自動應答
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //第五步:通過session創建destination,兩種目的地:Queue、Topic
            //參數:消息隊列的名稱,在后台管理系統中可以看到
            Topic topic = session.createTopic("cyb-topic");
            //第六步:通過session創建MessageProducer
            producer = session.createProducer(topic);
            //producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            //第七步:創建Message
            //方式一
            //TextMessage message=new ActiveMQTextMessage();
            //message.setText("queue test");
            //方式二
            TextMessage message1 = session.createTextMessage("topic->博客園地址:https://www.cnblogs.com/chenyanbin/");
            //第八步:通過producer發送消息
            producer.send(message1,DeliveryMode.PERSISTENT,1,1000*60*5);
            //session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //第九步:關閉資源
            producer.close();
            session.close();
            connection.close();
        }
    }

測試

  此處就不測試了,因為當初測試時候,踩了一次坑,測試結果,已經記錄到另一篇博客:ActiveMQ 持久訂閱者,執行結果與初衷相違背,驗證離線訂閱者無效,問題解決

ActiveMQ集群

ActiveMQ集群配置

刪除一些不用的端口

修改activemq.xml配置文件

方式一:

在任意一台Linux機器上,activemq.xml的broker 標簽下,添加以下內容,然后重啟即可

 

方式二

  還在修改activemq.xml,在broker標簽下,加入以下內容,去掉duplex="true",配置對方的ip地址,若有多個逗號隔開即可“,” 然后重啟

測試

提供者端代碼,brokerUrl中加入容錯機制,若果第一個沒連上,就連接第一個,默認先連接第一個

failover:(tcp://192.168.1.108:61616,tcp://192.168.1.109:61616)
    public void testQueueProducer() throws Exception {
        Connection connection = null;
        MessageProducer producer = null;
        Session session = null;
        try {
            //第一步:創建ConnectionFactory,用於連接broker
            String brokerURL = "failover:(tcp://192.168.1.108:61616,tcp://192.168.1.109:61616)";
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
            //設置消息發送為同步發送
            //((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true);
            //設置
            ((ActiveMQConnectionFactory) connectionFactory).setProducerWindowSize(1000);
            //第二步:通過工廠,創建Connection
            connection = connectionFactory.createConnection();

            //((ActiveMQConnection) connection).setUseAsyncSend(true);
            //第三步:連接啟動
            connection.start();
            //第四步:通過連接獲取session會話
            //第一個參數:是否啟用ActiveMQ事務,如果為true,第二個參數無用
            //第二個參數:應答模式,AUTO_ACKNOWLEDGE為自動應答
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //第五步:通過session創建destination,兩種目的地:Queue、Topic
            //參數:消息隊列的名稱,在后台管理系統中可以看到
            Queue queue = session.createQueue("cyb-queue");
            //第六步:通過session創建MessageProducer
            producer = session.createProducer(queue);
            //producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            //第七步:創建Message
            //方式一
            //TextMessage message=new ActiveMQTextMessage();
            //message.setText("queue test");
            //方式二
            TextMessage message1 = session.createTextMessage("博客園地址:https://www.cnblogs.com/chenyanbin/");
            //第八步:通過producer發送消息
            producer.send(message1);
            //session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //第九步:關閉資源
            producer.close();
            session.close();
            connection.close();
        }
    }

消費者端代碼

    public void testQueueConsumer() throws Exception {
        //第一步:創建ConnectionFactory
        String brokerURL = "tcp://192.168.1.109:61616";
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
        //第二步:通過工廠,創建Connection
        Connection connection = connectionFactory.createConnection();
        //第三步:打開鏈接
        connection.start();
        //第四步:通過Connection創建session
        Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
        //第五步:通過session創建Consumer
        Queue queue = session.createQueue("cyb-queue");
        MessageConsumer consumer = session.createConsumer(queue);
        //第六步:通過consumer接收信息(兩種方式:1、receive方法接收(同步);2、通過監聽器接收(異步))
        //方式1、receive方法接收信息
        Message message = consumer.receive(100000);
        //第七步:處理信息
        if (message != null && message instanceof TextMessage) {
            TextMessage tm = (TextMessage) message;
            System.out.println(tm.getText());
            message.acknowledge();
        }

        //方式2:監聽器接收信息
//        consumer.setMessageListener(new MessageListener() {
//            @Override
//            public void onMessage(Message message) {
//                //第七步:處理信息
//                if (message instanceof TextMessage){
//                    TextMessage tm=(TextMessage)message;
//                    try{
//                        System.out.println(tm.getText());
//                    }
//                    catch (Exception e){
//                        e.printStackTrace();
//                    }
//                }
//            }
//        });
        //session.commit();
        //第八步:關閉資源
        consumer.close();
        session.close();
        connection.close();
    }

演示

  這里我們可以看到,提供者先連接192.168.1.108這台機器,消費者去消費192.168.1.109,照樣可以消費成功,監控平台上,也可以看到響應信息


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM