兩種消息模式
消息列隊有兩種消息模式,一種是點對點的消息模式,還有一種就是訂閱的模式.;下面來說說這兩種模式。
1、點對點的消息模式
點對點的模式主要建立在一個隊列上面,當連接一個列隊的時候,發送端不需要知道接收端是否正在接收,可以直接向ActiveMQ發送消息,發送的消息,將會先進入隊列中,如果有接收端在監聽,則會發向接收端,如果沒有接收端接收,則會保存在activemq服務器,直到接收端接收消息,點對點的消息模式可以有多個發送端,多個接收端,但是一條消息,只會被一個接收端給接收到,哪個接收端先連上ActiveMQ,則會先接收到,而后來的接收端則接收不到那條消息
2、訂閱模式
訂閱/發布模式,同樣可以有着多個發送端與多個接收端,但是接收端與發送端存在時間上的依賴,就是如果發送端發送消息的時候,接收端並沒有監聽消息,那么ActiveMQ將不會保存消息,將會認為消息已經發送,換一種說法,就是發送端發送消息的時候,接收端不在線,是接收不到消息的,哪怕以后監聽消息,同樣也是接收不到的。這個模式還有一個特點,那就是,發送端發送的消息,將會被所有的接收端給接收到,不類似點對點,一條消息只會被一個接收端給接收到
什么情況下使用ActiveMQ
緊耦合應用系統存在許多問題,但是,要將緊耦合系統重構成松耦合系統是一件值得但比較繁瑣的事情。使用松耦合的主要優勢體現在將同步改為異步。使用異步通信,應用程序將從接收者反饋的等待中解放出來,其他的任務可以得到執行,這樣提高了應用程序的效率。
只要是兩個應用程序間需要通信的情況,都可以考慮使用JMS,不論這種通信是在本地的(就是通信的兩個應用程序在同一台主機上),還是分布在不同機器上。盡管是在同一個主機上的兩個應用程序需要通信也可以使用ActiveMQ。ActiveMQ可以確保消息投遞成功並采用異步方式通信。
多個需要通信的應用程序在同一個機器上的情況下,您可以考慮在執行機上獨立運行ActiveMQ或者將ActiveMQ嵌入到Java應用服務中。無論采用哪種方式,都可以確保應用程序能夠發送和接收消息。您可以選擇訂閱模式(pub/sub)或者采用PTP(point to point)模式,這兩種模式都無需等待執行反饋信息。每一個應用程序都可以簡單的將消息發送給ActiveMQ,然后繼續做其他的工作;應用程序無需阻塞式等待消息的返回。
對於分布在多台主機上的應用程序來說,可以使用多種布置策略。主要包括單一ActiveMQ實例和多ActiveMQ實例。單一ActiveMQ實例是一個簡單解決方案。所有的應用程序都向同一個ActiveMQ中介發送和接收消息,這與上面提到的單機多服務雷同。單一的ActiveMQ可以布置到一台單獨的主機上,也可以和其中的一些服務布置在一起。重要的是,所有的應用必須能夠直接與ActiveMQ中介進行交互,所以,你必須考慮到你的網絡設計。
第二種情況比較復雜,但是有ActiveMQ來負責遠程通信,而不是應用程序自身。在這種場景下,每一個應用程序都會實例化一個ActiveMQ(無論是嵌入式的還是獨立式的),應用程序從其本地的ActiveMQ發送和接收消息。之后這些ActiveMQ實例將會以一種聯合的方式協同工作。消息將會基於每一個應用的要求在多個ActiveMQ中介間傳遞到遠程的處理者。在ActiveMQ中,這種模式被稱為netWork of brokers。采用這種模式對於處理大量的ActiveMQ消息是可行的,但是,我們往往需要減輕網絡拓撲的復雜性,這樣直接將消息投遞到遠程接收者的ActiveMQ是不可行的。在后一種情況下,不同的協議使用可以使ActiveMQ更輕松的傳遞消息。
ActiveMQ配置傳輸連接
ActiveMQ提供了廣泛的連接模式,包括HTTP/S、JGroups、JXTA、muticast、SSL、TCP、UDP、XMPP等。提供了如此多的連接模式表明了ActiveMQ具有較高的靈活性。
配置格式如下:

1 <transportConnectors> 2 3 <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> 4 5 <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireformat.maxFrameSize=104857600"/> 6 7 <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireformat.maxFrameSize=104857600"/> 8 <transportConnector name="ssl" uri="ssl://0.0.0.0:61617?maximumConnections=1000&wireformat.maxFrameSize=104857600"/> 9 <transportConnector name="stomp" uri="stomp://0.0.0.0:61618?maximumConnections=1000&wireformat.maxFrameSize=104857600"/> 10 <transportConnector name="xmpp" uri="xmpp://0.0.0.0:61619?maximumConnections=1000&wireformat.maxFrameSize=104857600"/> 11 </transportConnectors>
生產者和消費着可以使用不同的傳輸協議來傳輸信息。比如生產者用nio協議生產消息,消費者用tcp協議接收消息。
ActiveMQ配置網絡連接
當應用到Broker的集群時,Borker與Broker的通信就用到了網絡連接。
配置格式如下:

1 <networkConnectors> 2 <!-- 動態連接方式 3 <networkConnector name="default-nc" uri="multicast://default" 4 dynamicOnly="true" networkTTL="3" prefetchSize="1" decreaseNetworkConsumerPriority="true" 5 /> --> 6 <!-- 靜態連接方式 <networkConnector name="host1 and host2" uri="static://(tcp://host1:61616,tcp://host2:61616)"/> --> 7 </networkConnectors>
ActiveMQ持久化存儲模式
ActiveMq主要實現了如下幾種存儲:
1.4.1. AMQ消息存儲—默認的消息存儲
它是一種基於文件存儲的消息數據庫並且不依賴第三方數據庫。配置如下
<amqPersistenceAdapter directory="${activemq.base}/data" maxFileLength="32mb"/>
1.4.2. KahaDB 消息存儲—提供容量的提升和恢復能力
它是一種新的消息存儲機制,配置如下
<kahaDB directory="${activemq.data}/kahadb" />
1.4.3. JDBC 消息存儲—消息基於JDBC存儲

1 <persistenceAdapter> 2 <jdbcPersistenceAdapter dataSource="#mysql-ds" /> 3 </persistenceAdapter> 4 <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> 5 <property name="driverClassName" value="com.mysql.jdbc.Driver" /> 6 <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true" /> 7 <property name="username" value="activemq" /> <property name="password" value="activemq" /> 8 <property name="maxActive" value="200" /> <property name="poolPreparedStatements" value="true" /> 9 </bean>
1.4.4. Memory 消息存儲—基於內容的消息存儲
ActiveMQ支持將消息保存到內存中,這種情況沒有動態的緩存存在。
這種情況的配置很簡單,只要將Broker的“prsistent” 屬性設置為“false”即可。
ActiveMQ攔截器使用
在ActiveMQ中使用攔截器和過濾器的使用多采用插件的形式實現,繼承BrokerFilter實現BrokerPlugin接口類。BrokerFilter實質一個實現Broker接口的類。
日志攔截
日志攔截器是Broker的一個攔截器,默認的日志級別為INFO。你如你想改變日志的級別。這個日志攔截器支持Commons-log和Log4j兩種日志。
<plugins> <loggingBrokerPlugin logAll="true" logConnectionEvents="false"/> </plugins>
部分參數如下:
屬性名稱 | 默認值 | 描述 |
logAll | false | 記錄所有事件的日志 |
logMessageEvents | false | 記錄消息事件日志 |
logConnectionEvents | true | 記錄連接事件日志 |
logTransactionEvents | false | 記錄消息事務事件日志 |
logConsumerEvents | false | 記錄消息消費者事件日志 |
logProducerEvents | false | 記錄消息生產者事件日志 |
logInternalEvents | false |
ActiveMQ安全配置
ActiveMQ也可以對各個主題和隊列設置用戶名和密碼,配置如下:

1 <plugins> 2 <!-- Configure authentication; Username, passwords and groups --> 3 <simpleAuthenticationPlugin> 4 <users> 5 <authenticationUser username="system" password="manager" groups="users,admins" /> 6 <authenticationUser username="user" password="password" groups="users" /> 7 <authenticationUser username="guest" password="password" groups="guests" /> 8 <authenticationUser username="testUser" password="123456" groups="testGroup" /> 9 </users> 10 </simpleAuthenticationPlugin> 11 <!-- Lets configure a destination based authorization mechanism --> 12 <authorizationPlugin> 13 <map> 14 <authorizationMap> 15 <authorizationEntries> 16 <authorizationEntry queue="queue.group.uum" read="users" write="users" admin="users" /> 17 <authorizationEntry queue=">" read="admins" write="admins" admin="admins" /> 18 <authorizationEntry queue="USERS.>" read="users" write="users" admin="users" /> 19 <authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users" /> 20 <authorizationEntry queue="TEST.Q" read="guests" write="guests" /> 21 <authorizationEntry queue="test" read="testGroup" write="testGroup" /> 22 <authorizationEntry topic=">" read="admins" write="admins" admin="admins" /> 23 <authorizationEntry topic="USERS.>" read="users" write="users" admin="users" /> 24 <authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users" /> 25 <authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users ,testGroup" write="guests,users ,testGroup " admin="guests,users ,testGroup " /> 26 </authorizationEntries> 27 </authorizationMap> 28 </map> 29 </authorizationPlugin> 30 </plugins>
ActiveMQ Async Sends
Acivemq 支持異步和同步發送消息。在 ActiveMQ4.0 以上,所有的異步或同步對
於 Consumer 來說是變得可配置了。默認是在 ConnectionFactory、Connection、
Connection URI等方面配置對於一個基於 Destination 的Consumer來說。
眾所周之,如果你想傳遞給 Slow Consumer 那么你可能使用異步的消息傳遞,但是對於 Fast Consumer 你可能使用同步發送消息。(這樣可以避免同步和上下文切換額外的增加Queue 堵塞花費。如果對於一個 Slow Consumer,你使用同步發送消息可能出現Producer 堵塞等顯現。
ActiveMQ默認設置 dispatcheAsync=true是最好的性能設置。如果你處理的是
Slow Consumer 則使用 dispatcheAsync=true,反之,那你使用的是 Fast Consumer則使用dispatcheAsync=false。
用Connection URI 來配置Async如下:
ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");
用ConnectionFactory 配置Async如下:
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
用Connection 配置Async 如下:
((ActiveMQConnection)connection).setUseAsyncSend(true);