activemq消息隊列的使用及應用docker部署常見問題及注意事項
docker用https://hub.docker.com/r/rmohr/activemq/
配置在/data/docker/activemq/conf
重啟命令:docker restart activemq
查看進程:docker ps | grep activemq
$ docker ps | grep activemq
927860512db9 rmohr/activemq:5.15.4-alpine
從上面可以看到版本是activemq:5.15.4-alpine
================
要改activemq的默認配置:
用持久化消息,開啟事務模式,將臨時文件限制盡可能的調大。
將prefetch設為1,每次處理1條消息,處理完再去取
自己發送消息的邏輯代碼,要在方法里加上try catch,避免因程序邏輯錯誤導致重連才行

activemq.xml
如果帳號在配置文件目錄下沒有權限上傳覆蓋文件可以采用sudo vi activemq.xml的命令進行編輯。
1.設置預取限制,指定topic消費者的預取限制。
<policyEntry topic=">" > 改為
<policyEntry topic=">" topicPrefetch="1">
broker為該主題最多保存1000條消息,如果消息數目超過了1000,舊消息將被丟棄
<constantPendingMessageLimitStrategy limit="1000"/>
2.去掉非必需的通信協議(Client與Broker、Broker與Broker之間使用該協議進行通信),只留下TCP協議(61616是broker的監聽端口)
3.增加
<subscriptionRecoveryPolicy>
<!--恢復最近30分鍾內的信息-->
<timedSubscriptionRecoveryPolicy recoverDuration="1800000"/>
</subscriptionRecoveryPolicy>
4.持久化默認文件大小32M改成128MB
<kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="128mb"/>
topicPrefetch="10" 這個今天測試了沒達到效果,還要再測試下改其他配置
都啟動后再發送的消息幾個客戶端都可以消費
5.持久化消息
activemq上配置使用<broker persistent="true"
生產者代碼設置持久化
//設置持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
開啟事務模式
//支持事務sender設置為true,receiver則必須設置為Boolean.FALSE, Session.AUTO_ACKNOWLEDGE 才行
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
6.run.sh要把ca=`docker rmi sendemail-service`
echo $ca
改成
docker rmi sendemail-service -f &
echo 'docker rmi success'
activemq用的是默認的賬號密碼admin,admin,從安全角度出發的話,如果外網可以訪問到,activemq的賬號密碼也需要改掉的,生產者,消費者也要用對應的賬號密碼連接,需要同時改配置和代碼
=======================
目錄:
一:JMQ的兩種消息模式
1.1:點對點的消息模式
1.2:訂閱模式
二:點對點的實現代碼
2.1:點對點的發送端
2.2:點對點的接收端
三:訂閱/發布模式的實現代碼
3.1:訂閱模式的發送端
3.2:訂閱模式的接收端
四:發送消息的數據類型
4.1:傳遞javabean對象
4.2:發送文件
五:ActiveMQ的應用
5.1:保證消息的成功處理
5.2:避免消息隊列的並發
5.3:消息有效期的管理
5.4:過期消息,處理失敗的消息如何處理
六:ActiveMQ的安全配置
6.1:管理后台的密碼設置
=====================
生產者(發送消息)Java代碼:
public static void sendMessage(String data) { ConnectionFactory connectionFactory; // ConnectionFactory--連接工廠,JMS用它創建連接 // Provider 的連接 Connection connection = null; // Connection :JMS 客戶端到JMS Session session; // Session: 一個發送或接收消息的線程 Destination destination; // Destination :消息的目的地;消息發送給誰. MessageProducer producer; // MessageProducer:消息發送者 // 構造ConnectionFactory實例對象,此處采用ActiveMq的實現jar connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, SendSms.mqconnection); try { // 構造從工廠得到連接對象 connection = connectionFactory.createConnection(); // 啟動 connection.start(); // 獲取操作連接,生產者事務的要設置為TRUE才行 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 創建隊列名稱testQueue destination = session.createQueue("testQueue"); // 得到消息生成者【發送者】 producer = session.createProducer(destination); // 設置持久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 構造消息,項目就是參數,或者方法獲取 TextMessage message = session.createTextMessage(data); System.out.println("send data to testQueue"); producer.send(message); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } }
消費者(接收消息)Java代碼:
public static void receiverMessage() { // ConnectionFactory :連接工廠,JMS 用它創建連接 ConnectionFactory connectionFactory; // Connection :JMS 客戶端到JMS Provider 的連接 Connection connection = null; // Session: 一個發送或接收消息的線程 Session session = null; // Destination :消息的目的地;消息發送給誰. Destination destination; // 消費者,消息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, SendSms.mqconnection); try { // 構造從工廠得到連接對象 connection = connectionFactory.createConnection(); // 啟動 connection.start(); //receiver則必須設置為Boolean.FALSE, Session.AUTO_ACKNOWLEDGE 才行 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 創建隊列名稱,需要跟sender的一致 destination = session.createQueue("testQueue"); consumer = session.createConsumer(destination); SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date time = sdf.parse(sdf.format(new Date())); System.out.println("start listeningSmsQueueMessage " + time); while (true) { Date time2 = sdf.parse(sdf.format(new Date())); // 設置接收者接收消息的時間 // TextMessage message = (TextMessage) consumer.receive(receivetime); TextMessage message = (TextMessage) consumer.receive(); if (null != message) { System.out.println(time2 + "getSmsQueueMessage:" + message.getText()); SendSms.sender(message.getText()); } else { System.out.println("getSmsQueueMessage:null"); } } } catch (Exception e) { System.out.println("receiverMessage error:"+e.getMessage()); try { Thread.sleep(5000); } catch (InterruptedException e1) { System.out.println("InterruptedException error:"+e1.getMessage()); } //SendSms.listeningSms();//這個是自己發送消息的邏輯代碼,要在方法里加上try catch,避免因程序邏輯錯誤導致重連才行 } finally { System.out.println("finally session close "); if(session != null){ try { session.close(); } catch (JMSException ignore) { System.out.println("session.close error:"+ignore.getMessage()); } } System.out.println("finally connection close "); if (null != connection) { try { connection.close(); } catch (JMSException ignore) { System.out.println("connection.close error:"+ignore.getMessage()); } } } }
activemq.xml 配置內容:
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> <!-- START SNIPPET: example --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <!-- Allows us to use system properties as variables in this configuration file --> <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <value>file:${activemq.conf}/credentials.properties</value> </property> </bean> <!-- Allows accessing the server log --> <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery" lazy-init="false" scope="singleton" init-method="start" destroy-method="stop"> </bean> <!-- The <broker> element is used to configure the ActiveMQ broker. --> <broker xmlns="http://activemq.apache.org/schema/core" persistent="true" brokerName="localhost" dataDirectory="${activemq.data}"> <destinationPolicy> <policyMap> <policyEntries> <policyEntry topic=">" topicPrefetch="1"> <!-- The constantPendingMessageLimitStrategy is used to prevent slow topic consumers to block producers and affect other consumers by limiting the number of messages that are retained For more information, see: http://activemq.apache.org/slow-consumer-handling.html --> <pendingMessageLimitStrategy> <constantPendingMessageLimitStrategy limit="1000"/> </pendingMessageLimitStrategy> <subscriptionRecoveryPolicy> <timedSubscriptionRecoveryPolicy recoverDuration="1800000"/> </subscriptionRecoveryPolicy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> <!-- The managementContext is used to configure how ActiveMQ is exposed in JMX. By default, ActiveMQ uses the MBean server that is started by the JVM. For more information, see: http://activemq.apache.org/jmx.html --> <managementContext> <managementContext createConnector="false"/> </managementContext> <!-- Configure message persistence for the broker. The default persistence mechanism is the KahaDB store (identified by the kahaDB tag). For more information, see: http://activemq.apache.org/persistence.html --> <persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="128mb"/> </persistenceAdapter> <!-- The systemUsage controls the maximum amount of space the broker will use before disabling caching and/or slowing down producers. For more information, see: http://activemq.apache.org/producer-flow-control.html --> <systemUsage> <systemUsage> <memoryUsage> <memoryUsage percentOfJvmHeap="70" /> </memoryUsage> <storeUsage> <storeUsage limit="100 gb"/> </storeUsage> <tempUsage> <tempUsage limit="50 gb"/> </tempUsage> </systemUsage> </systemUsage> <!-- The transport connectors expose ActiveMQ over a given protocol to clients and other brokers. For more information, see: http://activemq.apache.org/configuring-transports.html --> <transportConnectors> <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <!-- <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> --> </transportConnectors> <!-- destroy the spring context on shutdown to stop jetty --> <shutdownHooks> <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" /> </shutdownHooks> </broker> <!-- Enable web consoles, REST and Ajax APIs and demos The web consoles requires by default login, you can disable this in the jetty.xml file Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details --> <import resource="jetty.xml"/> </beans> <!-- END SNIPPET: example -->
