activemq消息隊列的使用及應用docker部署常見問題及注意事項


activemq消息隊列的使用及應用docker部署常見問題及注意事項

docker用https://hub.docker.com/r/rmohr/activemq/
配置在/data/docker/activemq/conf
重啟命令:docker restart activemq
查看進程:docker ps | grep activemq

$ docker ps | grep activemq
927860512db9 rmohr/activemq:5.15.4-alpine
從上面可以看到版本是activemq:5.15.4-alpine

================

要改activemq的默認配置:
用持久化消息,開啟事務模式,將臨時文件限制盡可能的調大。
將prefetch設為1,每次處理1條消息,處理完再去取

自己發送消息的邏輯代碼,要在方法里加上try catch,避免因程序邏輯錯誤導致重連才行

activemq.xml

如果帳號在配置文件目錄下沒有權限上傳覆蓋文件可以采用sudo vi activemq.xml的命令進行編輯。

1.設置預取限制,指定topic消費者的預取限制。
<policyEntry topic=">" > 改為
<policyEntry topic=">" topicPrefetch="1">
broker為該主題最多保存1000條消息,如果消息數目超過了1000,舊消息將被丟棄
<constantPendingMessageLimitStrategy limit="1000"/>

2.去掉非必需的通信協議(Client與Broker、Broker與Broker之間使用該協議進行通信),只留下TCP協議(61616是broker的監聽端口)

3.增加
<subscriptionRecoveryPolicy>
<!--恢復最近30分鍾內的信息-->
<timedSubscriptionRecoveryPolicy recoverDuration="1800000"/>
</subscriptionRecoveryPolicy>

4.持久化默認文件大小32M改成128MB
<kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="128mb"/>

topicPrefetch="10" 這個今天測試了沒達到效果,還要再測試下改其他配置
都啟動后再發送的消息幾個客戶端都可以消費

5.持久化消息
activemq上配置使用<broker persistent="true"
生產者代碼設置持久化
//設置持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);

開啟事務模式
//支持事務sender設置為true,receiver則必須設置為Boolean.FALSE, Session.AUTO_ACKNOWLEDGE 才行
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

6.run.sh要把ca=`docker rmi sendemail-service`
echo $ca
改成
docker rmi sendemail-service -f &
echo 'docker rmi success'

activemq用的是默認的賬號密碼admin,admin,從安全角度出發的話,如果外網可以訪問到,activemq的賬號密碼也需要改掉的,生產者,消費者也要用對應的賬號密碼連接,需要同時改配置和代碼


=======================
目錄:
一:JMQ的兩種消息模式
1.1:點對點的消息模式
1.2:訂閱模式
二:點對點的實現代碼
2.1:點對點的發送端
2.2:點對點的接收端
三:訂閱/發布模式的實現代碼
3.1:訂閱模式的發送端
3.2:訂閱模式的接收端
四:發送消息的數據類型
4.1:傳遞javabean對象

4.2:發送文件
五:ActiveMQ的應用
5.1:保證消息的成功處理
5.2:避免消息隊列的並發
5.3:消息有效期的管理
5.4:過期消息,處理失敗的消息如何處理
六:ActiveMQ的安全配置
6.1:管理后台的密碼設置

=====================

生產者(發送消息)Java代碼:

public static void sendMessage(String data) {
        ConnectionFactory connectionFactory; // ConnectionFactory--連接工廠,JMS用它創建連接
        // Provider 的連接
        Connection connection = null; // Connection :JMS 客戶端到JMS
        Session session; // Session: 一個發送或接收消息的線程
        Destination destination; // Destination :消息的目的地;消息發送給誰.
        MessageProducer producer; // MessageProducer:消息發送者
        // 構造ConnectionFactory實例對象,此處采用ActiveMq的實現jar
        connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, SendSms.mqconnection);
        try { // 構造從工廠得到連接對象
            connection = connectionFactory.createConnection();
            // 啟動
            connection.start();
            // 獲取操作連接,生產者事務的要設置為TRUE才行
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 創建隊列名稱testQueue
            destination = session.createQueue("testQueue");
            // 得到消息生成者【發送者】
            producer = session.createProducer(destination);
            // 設置持久化
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            // 構造消息,項目就是參數,或者方法獲取
            TextMessage message = session.createTextMessage(data);
            System.out.println("send data to testQueue");
            producer.send(message);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }

消費者(接收消息)Java代碼:

public static void receiverMessage() {

        // ConnectionFactory :連接工廠,JMS 用它創建連接
        ConnectionFactory connectionFactory;
        // Connection :JMS 客戶端到JMS Provider 的連接
        Connection connection = null;
        // Session: 一個發送或接收消息的線程
        Session session = null;
        // Destination :消息的目的地;消息發送給誰.
        Destination destination;
        // 消費者,消息接收者
        MessageConsumer consumer;
        connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, SendSms.mqconnection);
        try {
            // 構造從工廠得到連接對象
            connection = connectionFactory.createConnection();
            // 啟動
            connection.start();
            //receiver則必須設置為Boolean.FALSE, Session.AUTO_ACKNOWLEDGE 才行
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            // 創建隊列名稱,需要跟sender的一致
            destination = session.createQueue("testQueue");
            consumer = session.createConsumer(destination);
            SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date time = sdf.parse(sdf.format(new Date()));
            System.out.println("start listeningSmsQueueMessage " + time);
            while (true) {
                Date time2 = sdf.parse(sdf.format(new Date()));
                // 設置接收者接收消息的時間
//                TextMessage message = (TextMessage) consumer.receive(receivetime);
                TextMessage message = (TextMessage) consumer.receive();
                if (null != message) {
                    System.out.println(time2 + "getSmsQueueMessage:" + message.getText());
                    SendSms.sender(message.getText());
                } else {
                    System.out.println("getSmsQueueMessage:null");
                }
            }
        } catch (Exception e) {
            System.out.println("receiverMessage error:"+e.getMessage());
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e1) {
                System.out.println("InterruptedException error:"+e1.getMessage());
            }
            //SendSms.listeningSms();//這個是自己發送消息的邏輯代碼,要在方法里加上try catch,避免因程序邏輯錯誤導致重連才行
        } finally {
            System.out.println("finally session close ");
            if(session != null){
                try {
                    session.close();
                } catch (JMSException ignore) {
                    System.out.println("session.close error:"+ignore.getMessage());
                }
            }
            System.out.println("finally connection close ");
            if (null != connection) {
                try {
                    connection.close();
                } catch (JMSException ignore) {
                    System.out.println("connection.close error:"+ignore.getMessage());
                }
            }
        }
    }

activemq.xml 配置內容:

<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<!-- START SNIPPET: example -->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
        </property>
    </bean>

   <!-- Allows accessing the server log -->
    <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
          lazy-init="false" scope="singleton"
          init-method="start" destroy-method="stop">
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" persistent="true" brokerName="localhost" dataDirectory="${activemq.data}">

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" topicPrefetch="1">
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    -->
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                  
                  <subscriptionRecoveryPolicy>
                    <timedSubscriptionRecoveryPolicy recoverDuration="1800000"/>
                  </subscriptionRecoveryPolicy>
                  
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>


        <!--
            The managementContext is used to configure how ActiveMQ is exposed in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:

            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <!--
            Configure message persistence for the broker. The default persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
            For more information, see:

            http://activemq.apache.org/persistence.html
        -->
        <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"  journalMaxFileLength="128mb"/>
        </persistenceAdapter>


          <!--
            The systemUsage controls the maximum amount of space the broker will
            use before disabling caching and/or slowing down producers. For more information, see:
            http://activemq.apache.org/producer-flow-control.html
          -->
          <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <!--
            The transport connectors expose ActiveMQ over a given protocol to
            clients and other brokers. For more information, see:

            http://activemq.apache.org/configuring-transports.html
        -->
        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <!-- <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> -->
        </transportConnectors>

        <!-- destroy the spring context on shutdown to stop jetty -->
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>

    </broker>

    <!--
        Enable web consoles, REST and Ajax APIs and demos
        The web consoles requires by default login, you can disable this in the jetty.xml file

        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
    -->
    <import resource="jetty.xml"/>

</beans>
<!-- END SNIPPET: example -->

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM