ActiveMQ筆記(1):編譯、安裝、示例代碼


一、編譯

雖然ActiveMQ提供了發布版本,但是建議同學們自己下載源代碼編譯,以后萬一有坑,還可以嘗試自己改改源碼。

1.1 https://github.com/apache/activemq/releases 到這里下載最新的release版源碼(當前最新版本為5.13.2),並解壓到某個目錄(以下用$ACTIVEMQ_HOME代替解壓根目錄)

1.2 編譯

1
2
cd  $ACTIVEMQ_HOME
mvn clean  install  -Dmaven. test .skip= true

編譯成功后,在$ACTIVEMQ_HOME/assembly/target下會生成可xxx.bin.tar.gz的可執行文件壓縮包

 

二、啟動

將編譯后得到的xxx.bin.tar.gz解壓,然后執行 

1
2
3
tar  -zxvf apache-activemq-5.13.2-bin. tar .gz
cd  apache-activemq-5.13.2 /bin
. /activemq  start

后面的可選參數還有 status、restart、stop、list等,不清楚的地方,直接 --help 查看。

注:生產環境中,可能會對activemq的jvm內存設置上限,可以直接修改bin/activemq啟動腳本,vi bin/activemq 找到下面的位置:

 

1
2
3
4
5
6
# Note: This function uses globally defined variables
# - $ACTIVEMQ_PIDFILE : the name of the pid file
# - $ACTIVEMQ_OPTS : Additional options
ACTIVEMQ_OPTS= "-server -Xms512M -Xmx512M -XX:PermSize=64M -XX:MaxPermSize=128M "
# - $ACTIVEMQ_SUNJMX_START : options for JMX settings
# - $ACTIVEMQ_SSL_OPTS : options for SSL encryption

設置ACTIVEMQ_OPTS即可,然后重啟activemq,建議啟動成功后,用jinfo {activemq的pid} 來驗證查看一下  

 

 

三、管理界面

啟動成功后,可以瀏覽 http://localhost:8161/admin/

默認用戶名、密碼:admin/admin

管理界面是用jetty做容器的,如果想修改管理界面的端口,可以編輯../conf/jetty.xml,找到下面這一段:

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8161"/>
</bean>

用戶名/密碼是在 ../conf/jetty-realm.properties 里,比如要增加一個管理員jimmy/123456,可參考下面修改:

1
2
3
admin: admin, admin
jimmy: 123456, admin
user: user, user 

注:管理界面有一個小坑,ActiveMQ 5.13.2與jdk1.8兼容性有點問題,如果使用jdk1.8,管理界面進入Queues標簽頁時,偶爾會報錯,但是並不影響消息正常收發,只是無法從界面上查看隊列情況,如果出現該問題,可將jdk版本降至1.7,同時最好清空data目錄下的所有數據,再重啟activemq即可。

2016-06-18 注:最新版的5.13.3已經修復了這個bug,建議大家使用最新版本。

  

四、示例代碼

通常消息隊列都支持二種模式:基於主題(topic)的發布(Publish)/訂閱(Subscribe)模式、點對點(p2p)模式,下面的示例代碼為p2p場景。

先給出gradle項目的依賴項

1
2
3
4
5
6
7
8
9
dependencies {
     compile  "org.springframework:spring-core:4.2.5.RELEASE"
     compile  "org.springframework:spring-beans:4.2.5.RELEASE"
     compile  "org.springframework:spring-context:4.2.5.RELEASE"
     compile  "org.springframework:spring-jms:4.2.3.RELEASE"
     compile  'org.apache.activemq:activemq-all:5.13.2'
     compile  'org.apache.commons:commons-pool2:2.4.2'
     testCompile group:  'junit' , name:  'junit' , version:  '4.12'
}

4.1 spring配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<!--broker服務的地址-->
<property name="brokerURL" value="tcp://localhost:61616"/>
<!--默認值為1000,如果不需要這么大,可以調小-->
<property name="maxThreadPoolSize" value="100"/>
</bean>
</property>
</bean>

<bean id="dest" class="org.apache.activemq.command.ActiveMQQueue">
<!--隊列名稱-->
<property name="physicalName" value="myQueue"/>
</bean>

<bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsFactory"/>
<!--默認的隊列-->
<property name="defaultDestination" ref="dest"/>
<!--接收超時時間10秒-->
<property name="receiveTimeout" value="10000"/>
</bean>

</beans>

復制代碼
 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
 5 
 6     <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
 7         <property name="connectionFactory">
 8             <bean class="org.apache.activemq.ActiveMQConnectionFactory">
 9                 <!--broker服務的地址-->
10                 <property name="brokerURL" value="tcp://localhost:61616"/>
11                 <!--默認值為1000,如果不需要這么大,可以調小-->
12                 <property name="maxThreadPoolSize" value="100"/>
13             </bean>
14         </property>
15     </bean>
16 
17     <bean id="dest" class="org.apache.activemq.command.ActiveMQQueue">
18         <!--隊列名稱-->
19         <property name="physicalName" value="myQueue"/>
20     </bean>
21 
22     <bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
23         <property name="connectionFactory" ref="jmsFactory"/>
24         <!--默認的隊列-->
25         <property name="defaultDestination" ref="dest"/>
26         <!--接收超時時間10秒-->
27         <property name="receiveTimeout" value="10000"/>
28     </bean>
29 
30 </beans>
復制代碼

注:brokerURL的地址是在conf/activemq.xml里定義里,見下面的片段

<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

復制代碼
1 <transportConnectors>
2  <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
3  <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
4  <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
5  <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
6  <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
7  <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
8 </transportConnectors>
復制代碼

另外,連接ActiveMQ默認情況下,沒有任何安全機制,也就是說任何人只要知道brokerURL都能連接,這顯然不安全,可以在activemq.xml里,找到<broker>節點,緊貼它的地方添加下面這段:

復制代碼
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

        <plugins>   
            <simpleAuthenticationPlugin>   
                <users>   
                    <authenticationUser username="${activemq.username}" password="${activemq.password}" groups="users,admins"/>   
                </users>   
            </simpleAuthenticationPlugin>   
        </plugins>
...
   </broker>
復制代碼

那么問題來了,這個activemq.usernameactivemq.username及{activemq.password}的值是在哪里定義的呢?仍然在activemq.xml里找答案,在最開始的地方有一段:

1 <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
2     <property name="locations">
3         <value>file:${activemq.conf}/credentials.properties</value>
4     </property>
5 </bean>

換句話說,conf/credentials.properties這里保存的就是連接activemq的用戶名和密碼,啟用連接的安全機制后,spring的配置文件要做如下調整:

<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<!--broker服務的地址-->
<property name="brokerURL" value="tcp://localhost:61616"/>
<!--默認值為1000,如果不需要這么大,可以調小-->
<property name="maxThreadPoolSize" value="100"/>
<property name="userName" value="system"/>
<property name="password" value="manager"/>
</bean>
</property>
</bean>

復制代碼
 1 <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
 2     <property name="connectionFactory">
 3         <bean class="org.apache.activemq.ActiveMQConnectionFactory">
 4             <!--broker服務的地址-->
 5             <property name="brokerURL" value="tcp://localhost:61616"/>
 6             <!--默認值為1000,如果不需要這么大,可以調小-->
 7             <property name="maxThreadPoolSize" value="100"/>
 8             <property name="userName" value="system"/>
 9             <property name="password" value="manager"/>
10         </bean>
11     </property>
12 </bean>
復制代碼

 

4.2 生產者代碼

發送消息的代碼有二種寫法:

a)利用spring-jms的JmsTemplate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package  com.cnblogs.yjmyzz.activemq;
 
import  org.springframework.context.ApplicationContext;
import  org.springframework.context.support.ClassPathXmlApplicationContext;
import  org.springframework.jms.core.JmsTemplate;
 
/**
  * ActiveMQ消息發送示例(利用JMSTemplate)
  * Author:菩提樹下的楊過 http://yjmyzz.cnblogs.com
  */
public  class  JmsTemplateProducer {
 
     public  static  void  main(String[] args) {
 
         ApplicationContext context =  new  ClassPathXmlApplicationContext( "spring-context.xml" );
         JmsTemplate jmsTemplate = context.getBean(JmsTemplate. class );
 
         System.out.println( "准備發送消息..." );
         int  max =  100000 ;
         Long start = System.currentTimeMillis();
         for  ( int  i =  0 ; i < max; i++) {
             jmsTemplate.convertAndSend( "message test:"  + i);
         }
         Long end = System.currentTimeMillis();
         Long elapse = end - start;
         int  perform = Double.valueOf(max / (elapse / 1000d)).intValue();
 
         System.out.print( "發送 "  + max +  " 條消息,耗時:"  + elapse +  "毫秒,平均"  + perform +  "條/秒" );
     }
}

b) 利用activeMQ的Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package  com.cnblogs.yjmyzz.activemq;
 
import  org.apache.activemq.command.ActiveMQQueue;
import  org.apache.activemq.pool.PooledConnectionFactory;
import  org.springframework.context.ApplicationContext;
import  org.springframework.context.support.ClassPathXmlApplicationContext;
 
import  javax.jms.*;
import  java.io.IOException;
 
/**
  * ActiveMQ消息發送示例(利用Producer)
  * Author:菩提樹下的楊過 http://yjmyzz.cnblogs.com
  */
public  class  ActiveMQProducer {
 
     public  static  void  main(String[] args)  throws  JMSException, IOException, InterruptedException {
         ApplicationContext context =  new  ClassPathXmlApplicationContext( "spring-context.xml" );
         PooledConnectionFactory connectionFactory = context.getBean(PooledConnectionFactory. class );
         ActiveMQQueue destination = context.getBean(ActiveMQQueue. class );
         Connection connection = connectionFactory.createConnection();
         connection.start();
         Session session = connection.createSession( false , Session.AUTO_ACKNOWLEDGE);
         MessageProducer producer = session.createProducer(destination);
 
         System.out.println( "准備發送消息..." );
         int  max =  100000 ;
         Long start = System.currentTimeMillis();
 
         for  ( int  i =  0 ; i < max; i++) {
             TextMessage msg = session.createTextMessage( "message test:"  + i);
             //msg.setIntProperty("id", i);
             producer.send(msg);
         }
         Long end = System.currentTimeMillis();
         Long elapse = end - start;
         int  perform = Double.valueOf(max / (elapse / 1000d)).intValue();
 
         System.out.print( "發送 "  + max +  " 條消息,耗時:"  + elapse +  "毫秒,平均"  + perform +  "條/秒" );
 
         //producer.send(session.createTextMessage("SHUTDOWN"));
         //Thread.sleep(1000 * 3);
         //connection.close();
         System.exit( 0 );
     }
}

這二種方式在性能上差不多,4核8G的mac book pro上,大致每秒可以寫入3k+條消息。但是從代碼量來講,明顯JmsTemplate的代碼量更少,推薦使用。

4.3 消費者代碼

當然也可以用JmsTemplate接收消息,但是一般得自己去寫while(true)循環,而且默認情況下,上下文如果不是同一個連接,JmsTemplate A發出的消息,JmsTemplate B是接收不到的,所以不建議這種方式。最好參考下面的示例,使用JMS的MessageLisenter去監聽消息,這也是JMS規范建議的標准做法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package  com.cnblogs.yjmyzz.activemq;
 
import  org.apache.activemq.command.ActiveMQQueue;
import  org.apache.activemq.pool.PooledConnectionFactory;
import  org.springframework.context.ApplicationContext;
import  org.springframework.context.support.ClassPathXmlApplicationContext;
import  javax.jms.*;
import  java.io.IOException;
 
 
/**
  * ActiveMQ消息接收示例(使用MessageListener)
  * Author:菩提樹下的楊過 http://yjmyzz.cnblogs.com
  */
public  class  MessageListenerConsumer {
 
     public  static  void  main(String[] args)  throws  JMSException, IOException {
         ApplicationContext context =  new  ClassPathXmlApplicationContext( "spring-context.xml" );
         PooledConnectionFactory connectionFactory = context.getBean(PooledConnectionFactory. class );
         ActiveMQQueue destination = context.getBean(ActiveMQQueue. class );
         Connection connection = connectionFactory.createConnection();
         connection.start();
         Session session = connection.createSession( false , Session.AUTO_ACKNOWLEDGE);
         MessageConsumer consumer = session.createConsumer(destination);
         consumer.setMessageListener( new  ActiveMQListener());
         System.in.read();
     }
 
 
     static  class  ActiveMQListener  implements  MessageListener {
         @Override
         public  void  onMessage(Message message) {
             try  {
                 if  (message  instanceof  TextMessage) {
                     System.out.println(((TextMessage) message).getText());
                 }
             catch  (JMSException e) {
                 e.printStackTrace();
             }
         }
     }
}

4.4 嵌入式Broker

類似jetty、tombat之類可以內嵌到代碼中啟動一樣,ActiveMQ也可以直接在代碼中內嵌啟動,這個很方便一些輕量級的使用場景,示例代碼如下:

1
2
3
4
5
6
7
8
public  class  EmbbedBroker {
     public  static  void  main(String[] args)  throws  Exception {
         BrokerService broker =  new  BrokerService();
         broker.addConnector( "tcp://localhost:61616" );
         broker.start();
         System.out.println( "ActiveMQ 已啟動!" );
     }
}

關於嵌入式Broker的更多細節,可以參考 http://activemq.apache.org/how-do-i-embed-a-broker-inside-a-connection.html

4.5 消息的自動確認與手動確認

在接收消息時,如果Session使用的是 Session.AUTO_ACKNOWLEDGE,即:

1
Session session = connection.createSession( false , Session.AUTO_ACKNOWLEDGE);

則消息一旦被接受,不論onMessage()里的業務邏輯執行成功與否,消息都將從ActiveMQ的隊列里立刻刪除。如果希望業務處理成功后,再通知ActiveMQ刪除消息,可以改成:

1
Session session = connection.createSession( false , Session.CLIENT_ACKNOWLEDGE);

然后onMessage方法調用message.acknowledge手動確認,參考以下代碼:

1
2
3
4
5
6
7
8
9
10
11
12
13
static  class  ActiveMQListener  implements  MessageListener {
     @Override
     public  void  onMessage(Message message) {
         try  {
             if  (message  instanceof  TextMessage) {
                 System.out.println(((TextMessage) message).getText());
                 message.acknowledge();  //手動確認消息
             }
         catch  (JMSException e) {
             e.printStackTrace();
         }
     }
}

  

作者: 菩提樹下的楊過
出處: http://yjmyzz.cnblogs.com 
本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM