一、編譯
雖然ActiveMQ提供了發布版本,但是建議同學們自己下載源代碼編譯,以后萬一有坑,還可以嘗試自己改改源碼。
1.1 https://github.com/apache/activemq/releases 到這里下載最新的release版源碼(當前最新版本為5.13.2),並解壓到某個目錄(以下用$ACTIVEMQ_HOME代替解壓根目錄)
1.2 編譯
cd $ACTIVEMQ_HOME mvn clean install -Dmaven.test.skip=true
編譯成功后,在$ACTIVEMQ_HOME/assembly/target下會生成可xxx.bin.tar.gz的可執行文件壓縮包
二、啟動
將編譯后得到的xxx.bin.tar.gz解壓,然后執行
tar -zxvf apache-activemq-5.13.2-bin.tar.gz cd apache-activemq-5.13.2/bin ./activemq start
后面的可選參數還有 status、restart、stop、list等,不清楚的地方,直接 --help 查看。
注:生產環境中,可能會對activemq的jvm內存設置上限,可以直接修改bin/activemq啟動腳本,vi bin/activemq 找到下面的位置:
# Note: This function uses globally defined variables # - $ACTIVEMQ_PIDFILE : the name of the pid file # - $ACTIVEMQ_OPTS : Additional options ACTIVEMQ_OPTS="-server -Xms512M -Xmx512M -XX:PermSize=64M -XX:MaxPermSize=128M " # - $ACTIVEMQ_SUNJMX_START : options for JMX settings # - $ACTIVEMQ_SSL_OPTS : options for SSL encryption
設置ACTIVEMQ_OPTS即可,然后重啟activemq,建議啟動成功后,用jinfo {activemq的pid} 來驗證查看一下
三、管理界面
啟動成功后,可以瀏覽 http://localhost:8161/admin/
默認用戶名、密碼:admin/admin
管理界面是用jetty做容器的,如果想修改管理界面的端口,可以編輯../conf/jetty.xml,找到下面這一段:
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"> <!-- the default port number for the web console --> <property name="host" value="0.0.0.0"/> <property name="port" value="8161"/> </bean>
用戶名/密碼是在 ../conf/jetty-realm.properties 里,比如要增加一個管理員jimmy/123456,可參考下面修改:
admin: admin, admin jimmy: 123456, admin user: user, user
注:管理界面有一個小坑,ActiveMQ 5.13.2與jdk1.8兼容性有點問題,如果使用jdk1.8,管理界面進入Queues標簽頁時,偶爾會報錯,但是並不影響消息正常收發,只是無法從界面上查看隊列情況,如果出現該問題,可將jdk版本降至1.7,同時最好清空data目錄下的所有數據,再重啟activemq即可。
2016-06-18 注:最新版的5.13.3已經修復了這個bug,建議大家使用最新版本。
四、示例代碼
通常消息隊列都支持二種模式:基於主題(topic)的發布(Publish)/訂閱(Subscribe)模式、點對點(p2p)模式,下面的示例代碼為p2p場景。
先給出gradle項目的依賴項
dependencies { compile "org.springframework:spring-core:4.2.5.RELEASE" compile "org.springframework:spring-beans:4.2.5.RELEASE" compile "org.springframework:spring-context:4.2.5.RELEASE" compile "org.springframework:spring-jms:4.2.3.RELEASE" compile 'org.apache.activemq:activemq-all:5.13.2' compile 'org.apache.commons:commons-pool2:2.4.2' testCompile group: 'junit', name: 'junit', version: '4.12' }
4.1 spring配置文件

1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> 5 6 <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> 7 <property name="connectionFactory"> 8 <bean class="org.apache.activemq.ActiveMQConnectionFactory"> 9 <!--broker服務的地址--> 10 <property name="brokerURL" value="tcp://localhost:61616"/> 11 <!--默認值為1000,如果不需要這么大,可以調小--> 12 <property name="maxThreadPoolSize" value="100"/> 13 </bean> 14 </property> 15 </bean> 16 17 <bean id="dest" class="org.apache.activemq.command.ActiveMQQueue"> 18 <!--隊列名稱--> 19 <property name="physicalName" value="myQueue"/> 20 </bean> 21 22 <bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 23 <property name="connectionFactory" ref="jmsFactory"/> 24 <!--默認的隊列--> 25 <property name="defaultDestination" ref="dest"/> 26 <!--接收超時時間10秒--> 27 <property name="receiveTimeout" value="10000"/> 28 </bean> 29 30 </beans>
注:brokerURL的地址是在conf/activemq.xml里定義里,見下面的片段

1 <transportConnectors> 2 <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> 3 <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> 4 <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> 5 <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> 6 <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> 7 <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> 8 </transportConnectors>
另外,連接ActiveMQ默認情況下,沒有任何安全機制,也就是說任何人只要知道brokerURL都能連接,這顯然不安全,可以在activemq.xml里,找到<broker>節點,緊貼它的地方添加下面這段:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}"> <plugins> <simpleAuthenticationPlugin> <users> <authenticationUser username="${activemq.username}" password="${activemq.password}" groups="users,admins"/> </users> </simpleAuthenticationPlugin> </plugins> ... </broker>
那么問題來了,這個${activemq.username}及${activemq.password}的值是在哪里定義的呢?仍然在activemq.xml里找答案,在最開始的地方有一段:
1 <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> 2 <property name="locations"> 3 <value>file:${activemq.conf}/credentials.properties</value> 4 </property> 5 </bean>
換句話說,conf/credentials.properties這里保存的就是連接activemq的用戶名和密碼,啟用連接的安全機制后,spring的配置文件要做如下調整:

1 <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> 2 <property name="connectionFactory"> 3 <bean class="org.apache.activemq.ActiveMQConnectionFactory"> 4 <!--broker服務的地址--> 5 <property name="brokerURL" value="tcp://localhost:61616"/> 6 <!--默認值為1000,如果不需要這么大,可以調小--> 7 <property name="maxThreadPoolSize" value="100"/> 8 <property name="userName" value="system"/> 9 <property name="password" value="manager"/> 10 </bean> 11 </property> 12 </bean>
4.2 生產者代碼
發送消息的代碼有二種寫法:
a)利用spring-jms的JmsTemplate
package com.cnblogs.yjmyzz.activemq; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jms.core.JmsTemplate; /** * ActiveMQ消息發送示例(利用JMSTemplate) * Author:菩提樹下的楊過 http://yjmyzz.cnblogs.com */ public class JmsTemplateProducer { public static void main(String[] args) { ApplicationContext context = new ClassPathXmlApplicationContext("spring-context.xml"); JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class); System.out.println("准備發送消息..."); int max = 100000; Long start = System.currentTimeMillis(); for (int i = 0; i < max; i++) { jmsTemplate.convertAndSend("message test:" + i); } Long end = System.currentTimeMillis(); Long elapse = end - start; int perform = Double.valueOf(max / (elapse / 1000d)).intValue(); System.out.print("發送 " + max + " 條消息,耗時:" + elapse + "毫秒,平均" + perform + "條/秒"); } }
b) 利用activeMQ的Producer
package com.cnblogs.yjmyzz.activemq; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.pool.PooledConnectionFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import javax.jms.*; import java.io.IOException; /** * ActiveMQ消息發送示例(利用Producer) * Author:菩提樹下的楊過 http://yjmyzz.cnblogs.com */ public class ActiveMQProducer { public static void main(String[] args) throws JMSException, IOException, InterruptedException { ApplicationContext context = new ClassPathXmlApplicationContext("spring-context.xml"); PooledConnectionFactory connectionFactory = context.getBean(PooledConnectionFactory.class); ActiveMQQueue destination = context.getBean(ActiveMQQueue.class); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(destination); System.out.println("准備發送消息..."); int max = 100000; Long start = System.currentTimeMillis(); for (int i = 0; i < max; i++) { TextMessage msg = session.createTextMessage("message test:" + i); //msg.setIntProperty("id", i); producer.send(msg); } Long end = System.currentTimeMillis(); Long elapse = end - start; int perform = Double.valueOf(max / (elapse / 1000d)).intValue(); System.out.print("發送 " + max + " 條消息,耗時:" + elapse + "毫秒,平均" + perform + "條/秒"); //producer.send(session.createTextMessage("SHUTDOWN")); //Thread.sleep(1000 * 3); //connection.close(); System.exit(0); } }
這二種方式在性能上差不多,4核8G的mac book pro上,大致每秒可以寫入3k+條消息。但是從代碼量來講,明顯JmsTemplate的代碼量更少,推薦使用。
4.3 消費者代碼
當然也可以用JmsTemplate接收消息,但是一般得自己去寫while(true)循環,而且默認情況下,上下文如果不是同一個連接,JmsTemplate A發出的消息,JmsTemplate B是接收不到的,所以不建議這種方式。最好參考下面的示例,使用JMS的MessageLisenter去監聽消息,這也是JMS規范建議的標准做法:
package com.cnblogs.yjmyzz.activemq; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.pool.PooledConnectionFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import javax.jms.*; import java.io.IOException; /** * ActiveMQ消息接收示例(使用MessageListener) * Author:菩提樹下的楊過 http://yjmyzz.cnblogs.com */ public class MessageListenerConsumer { public static void main(String[] args) throws JMSException, IOException { ApplicationContext context = new ClassPathXmlApplicationContext("spring-context.xml"); PooledConnectionFactory connectionFactory = context.getBean(PooledConnectionFactory.class); ActiveMQQueue destination = context.getBean(ActiveMQQueue.class); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new ActiveMQListener()); System.in.read(); } static class ActiveMQListener implements MessageListener { @Override public void onMessage(Message message) { try { if (message instanceof TextMessage) { System.out.println(((TextMessage) message).getText()); } } catch (JMSException e) { e.printStackTrace(); } } } }
4.4 嵌入式Broker
類似jetty、tombat之類可以內嵌到代碼中啟動一樣,ActiveMQ也可以直接在代碼中內嵌啟動,這個很方便一些輕量級的使用場景,示例代碼如下:
public class EmbbedBroker { public static void main(String[] args) throws Exception { BrokerService broker = new BrokerService(); broker.addConnector("tcp://localhost:61616"); broker.start(); System.out.println("ActiveMQ 已啟動!"); } }
關於嵌入式Broker的更多細節,可以參考 http://activemq.apache.org/how-do-i-embed-a-broker-inside-a-connection.html
4.5 消息的自動確認與手動確認
在接收消息時,如果Session使用的是 Session.AUTO_ACKNOWLEDGE,即:
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
則消息一旦被接受,不論onMessage()里的業務邏輯執行成功與否,消息都將從ActiveMQ的隊列里立刻刪除。如果希望業務處理成功后,再通知ActiveMQ刪除消息,可以改成:
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
然后onMessage方法調用message.acknowledge手動確認,參考以下代碼:
static class ActiveMQListener implements MessageListener { @Override public void onMessage(Message message) { try { if (message instanceof TextMessage) { System.out.println(((TextMessage) message).getText()); message.acknowledge(); //手動確認消息 } } catch (JMSException e) { e.printStackTrace(); } } }