ActiveMQ的使用以及應用場景


兩種消息模式

消息列隊有兩種消息模式,一種是點對點的消息模式,還有一種就是訂閱的模式.;下面來說說這兩種模式。

1、點對點的消息模式

點對點的模式主要建立在一個隊列上面,當連接一個列隊的時候,發送端不需要知道接收端是否正在接收,可以直接向ActiveMQ發送消息,發送的消息,將會先進入隊列中,如果有接收端在監聽,則會發向接收端,如果沒有接收端接收,則會保存在activemq服務器,直到接收端接收消息,點對點的消息模式可以有多個發送端,多個接收端,但是一條消息,只會被一個接收端給接收到,哪個接收端先連上ActiveMQ,則會先接收到,而后來的接收端則接收不到那條消息

2、訂閱模式

訂閱/發布模式,同樣可以有着多個發送端與多個接收端,但是接收端與發送端存在時間上的依賴,就是如果發送端發送消息的時候,接收端並沒有監聽消息,那么ActiveMQ將不會保存消息,將會認為消息已經發送,換一種說法,就是發送端發送消息的時候,接收端不在線,是接收不到消息的,哪怕以后監聽消息,同樣也是接收不到的。這個模式還有一個特點,那就是,發送端發送的消息,將會被所有的接收端給接收到,不類似點對點,一條消息只會被一個接收端給接收到

什么情況下使用ActiveMQ

緊耦合應用系統存在許多問題,但是,要將緊耦合系統重構成松耦合系統是一件值得但比較繁瑣的事情。使用松耦合的主要優勢體現在將同步改為異步。使用異步通信,應用程序將從接收者反饋的等待中解放出來,其他的任務可以得到執行,這樣提高了應用程序的效率。

只要是兩個應用程序間需要通信的情況,都可以考慮使用JMS,不論這種通信是在本地的(就是通信的兩個應用程序在同一台主機上),還是分布在不同機器上。盡管是在同一個主機上的兩個應用程序需要通信也可以使用ActiveMQ。ActiveMQ可以確保消息投遞成功並采用異步方式通信。

多個需要通信的應用程序在同一個機器上的情況下,您可以考慮在執行機上獨立運行ActiveMQ或者將ActiveMQ嵌入到Java應用服務中。無論采用哪種方式,都可以確保應用程序能夠發送和接收消息。您可以選擇訂閱模式(pub/sub)或者采用PTP(point to point)模式,這兩種模式都無需等待執行反饋信息。每一個應用程序都可以簡單的將消息發送給ActiveMQ,然后繼續做其他的工作;應用程序無需阻塞式等待消息的返回。

對於分布在多台主機上的應用程序來說,可以使用多種布置策略。主要包括單一ActiveMQ實例和多ActiveMQ實例。單一ActiveMQ實例是一個簡單解決方案。所有的應用程序都向同一個ActiveMQ中介發送和接收消息,這與上面提到的單機多服務雷同。單一的ActiveMQ可以布置到一台單獨的主機上,也可以和其中的一些服務布置在一起。重要的是,所有的應用必須能夠直接與ActiveMQ中介進行交互,所以,你必須考慮到你的網絡設計。

第二種情況比較復雜,但是有ActiveMQ來負責遠程通信,而不是應用程序自身。在這種場景下,每一個應用程序都會實例化一個ActiveMQ(無論是嵌入式的還是獨立式的),應用程序從其本地的ActiveMQ發送和接收消息。之后這些ActiveMQ實例將會以一種聯合的方式協同工作。消息將會基於每一個應用的要求在多個ActiveMQ中介間傳遞到遠程的處理者。在ActiveMQ中,這種模式被稱為netWork of brokers。采用這種模式對於處理大量的ActiveMQ消息是可行的,但是,我們往往需要減輕網絡拓撲的復雜性,這樣直接將消息投遞到遠程接收者的ActiveMQ是不可行的。在后一種情況下,不同的協議使用可以使ActiveMQ更輕松的傳遞消息。

ActiveMQ配置傳輸連接

ActiveMQ提供了廣泛的連接模式,包括HTTP/S、JGroups、JXTA、muticast、SSL、TCP、UDP、XMPP等。提供了如此多的連接模式表明了ActiveMQ具有較高的靈活性。

配置格式如下:

 1  <transportConnectors>
 2 
 3             <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
 4 
 5             <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
 6 
 7             <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
 8             <transportConnector name="ssl" uri="ssl://0.0.0.0:61617?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
 9             <transportConnector name="stomp" uri="stomp://0.0.0.0:61618?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
10             <transportConnector name="xmpp" uri="xmpp://0.0.0.0:61619?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
11         </transportConnectors>
View Code

生產者和消費着可以使用不同的傳輸協議來傳輸信息。比如生產者用nio協議生產消息,消費者用tcp協議接收消息。

ActiveMQ配置網絡連接

當應用到Broker的集群時,Borker與Broker的通信就用到了網絡連接。

配置格式如下:

1 <networkConnectors>
2 <!-- 動態連接方式
3 <networkConnector name="default-nc" uri="multicast://default"
4                    dynamicOnly="true" networkTTL="3" prefetchSize="1" decreaseNetworkConsumerPriority="true"
5          /> -->
6          <!-- 靜態連接方式 <networkConnector name="host1 and host2" uri="static://(tcp://host1:61616,tcp://host2:61616)"/> -->
7 </networkConnectors>
View Code

 ActiveMQ持久化存儲模式

ActiveMq主要實現了如下幾種存儲:

1.4.1.   AMQ消息存儲—默認的消息存儲

它是一種基於文件存儲的消息數據庫並且不依賴第三方數據庫。配置如下

<amqPersistenceAdapter directory="${activemq.base}/data" maxFileLength="32mb"/>

  

1.4.2.   KahaDB 消息存儲—提供容量的提升和恢復能力

  它是一種新的消息存儲機制,配置如下

<kahaDB directory="${activemq.data}/kahadb" />

1.4.3.   JDBC 消息存儲—消息基於JDBC存儲

1 <persistenceAdapter>
2     <jdbcPersistenceAdapter dataSource="#mysql-ds" />
3 </persistenceAdapter>
4 <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
5     <property name="driverClassName" value="com.mysql.jdbc.Driver" />
6     <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true" />
7     <property name="username" value="activemq" /> <property name="password" value="activemq" />
8     <property name="maxActive" value="200" /> <property name="poolPreparedStatements" value="true" />
9 </bean>
View Code

1.4.4.   Memory 消息存儲—基於內容的消息存儲

  ActiveMQ支持將消息保存到內存中,這種情況沒有動態的緩存存在。

  這種情況的配置很簡單,只要將Broker的“prsistent” 屬性設置為“false”即可。

ActiveMQ攔截器使用

在ActiveMQ中使用攔截器和過濾器的使用多采用插件的形式實現,繼承BrokerFilter實現BrokerPlugin接口類。BrokerFilter實質一個實現Broker接口的類。

日志攔截

日志攔截器是Broker的一個攔截器,默認的日志級別為INFO。你如你想改變日志的級別。這個日志攔截器支持Commons-log和Log4j兩種日志。

<plugins>
      <loggingBrokerPlugin logAll="true" logConnectionEvents="false"/>
</plugins>

  部分參數如下:

屬性名稱 默認值 描述
logAll false 記錄所有事件的日志
logMessageEvents false 記錄消息事件日志
logConnectionEvents true 記錄連接事件日志
logTransactionEvents false 記錄消息事務事件日志
logConsumerEvents false 記錄消息消費者事件日志
logProducerEvents false 記錄消息生產者事件日志
logInternalEvents false  

 

 

 

 

 

 

 

 

 

 

ActiveMQ安全配置

ActiveMQ也可以對各個主題和隊列設置用戶名和密碼,配置如下:

 1 <plugins>
 2     <!-- Configure authentication; Username, passwords and groups -->
 3     <simpleAuthenticationPlugin>
 4         <users>
 5             <authenticationUser username="system" password="manager" groups="users,admins" />
 6             <authenticationUser username="user" password="password" groups="users" />
 7             <authenticationUser username="guest" password="password" groups="guests" />
 8             <authenticationUser username="testUser" password="123456" groups="testGroup" />
 9         </users>
10     </simpleAuthenticationPlugin>
11     <!--  Lets configure a destination based authorization mechanism -->
12     <authorizationPlugin>
13         <map>
14             <authorizationMap>
15                 <authorizationEntries>
16                     <authorizationEntry queue="queue.group.uum" read="users" write="users" admin="users" />
17                     <authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
18                     <authorizationEntry queue="USERS.>" read="users" write="users" admin="users" />
19                     <authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
20                     <authorizationEntry queue="TEST.Q" read="guests" write="guests" />
21                     <authorizationEntry queue="test" read="testGroup" write="testGroup" />
22                     <authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
23                     <authorizationEntry topic="USERS.>" read="users" write="users" admin="users" />
24                     <authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
25                     <authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users ,testGroup" write="guests,users ,testGroup " admin="guests,users ,testGroup " />
26                 </authorizationEntries>
27             </authorizationMap>
28         </map>
29     </authorizationPlugin>
30 </plugins>
View Code

ActiveMQ Async Sends

Acivemq 支持異步和同步發送消息。在 ActiveMQ4.0 以上,所有的異步或同步對

於 Consumer 來說是變得可配置了。默認是在 ConnectionFactory、Connection、

Connection URI等方面配置對於一個基於 Destination  的Consumer來說。

眾所周之,如果你想傳遞給 Slow Consumer 那么你可能使用異步的消息傳遞,但是對於 Fast Consumer 你可能使用同步發送消息。(這樣可以避免同步和上下文切換額外的增加Queue 堵塞花費。如果對於一個 Slow Consumer,你使用同步發送消息可能出現Producer 堵塞等顯現。

ActiveMQ默認設置 dispatcheAsync=true是最好的性能設置。如果你處理的是

Slow Consumer 則使用 dispatcheAsync=true,反之,那你使用的是 Fast Consumer則使用dispatcheAsync=false。

用Connection URI 來配置Async如下:

ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");

用ConnectionFactory 配置Async如下:

((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);

用Connection 配置Async 如下:

((ActiveMQConnection)connection).setUseAsyncSend(true);


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM