上一篇介紹了基於ZK的ActiveMQ HA方案,雖然理解起來比較容易,但是有二個不足:
1) 占用的節點數過多,1個zk集群至少3個節點,1個activemq集群也至少得3個節點,但其實正常運行時,只有一個master節點在對外響應,換句話說,花6個節點的成本只為了保證1個activemq master節點的高可用,太浪費資源了。
2) 性能下降太明顯,比起單節點的activemq,性能下降了近1個數量級。
這一篇將介紹基於networks of brokers的HA方案,不需要借助zk等第3方組件,只需要2個activemq節點就能達到類似效果,進入正題之前,先來簡單談談Broker這個概念。
Broker一詞的原意是『經紀人、中間人』,用在ActiveMQ的構架中,即:Broker作為Producer與Consumer的中間人(或代理人),生產者不用知道消費者在哪里、如何消費這些細節,只要將消息扔給中間人Broker即可,類似的,消費者也不用關心消息是從哪個生產者過來的,它只知道這是從Broker那里拿來的,如果畫一張圖來描述,就是下面這樣(引用自本文最后參考文章中的圖片)
那么,當生產者將消息發給Broker時,會發生什么?下圖描述的就是這個過程:
1) 生產者將消息發給Broker
2) Broker將消息落地存儲
3) 然后給生產者反饋:事情我已經辦妥了!
繼續,再來看看消費者又是如何跟Broker打交道的:
1) Broker將接收到的消息,從db中取出
2) 然后發送給消費者
3) 如果消費者使用的是自動確認模式(即:Session.AUTO_ACKNOWLEDGE),則Consumer會馬上告訴Broker:ok,消息我已經收到了。
4) 然后進行自己的業務處理
5) Broker一旦收到確認,將會馬上更新消息的狀態為已消費(或直接刪除,取決於持久化的實現機制)(注:雖然圖中步驟5排在步驟4之后,但是步驟4、5幾乎是同時發生的)
在一些大型應用中,如果一個Broker出現性能瓶頸抗不住壓力,可能會拆分成多個Broker,如下圖所示:
(注:上圖中箭頭的方法並非數據流向,而應該理解成調用關系,即:Producer調用Broker1,Consumer調用Broker2...)
Producer將消息發送給Broker1,而Consumer從另一個Broker2接收消息,有點類似數據庫讀寫分離的意思,這樣系統的性能可以提升一定程度的提升,但是問題來了,Broker1上的消息,如何"同步"(見下面的注釋)到Broker2呢,這就依賴networkConnector的配置。
注:同步這個詞用在這里可能不太准確,但也找不到一個更精確的詞來描述,實際上,二個broker用上述機制組建成小集群后,如果生產者連接到broker1,消費者連接到broker2,當消息發送到broker1后,broker1不會將該消息復制一份到broker2,而是等消費者從broker2上消費該消息時,這條消息才從broker1取到broker2上,相當於此時broker2是消費者,從broker1消費了一條消息,然后broker2上就有這條消息了,最終消費者才能broker2上拿到這條消息。

1 <beans 2 xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 5 http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> 6 7 <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> 8 <property name="locations"> 9 <value>file:${activemq.conf}/credentials.properties</value> 10 </property> 11 </bean> 12 13 <broker xmlns="http://activemq.apache.org/schema/core" brokerName="activemq-1"> 14 <networkConnectors> 15 <networkConnector uri="static:(tcp://127.0.0.1:61626)"/> 16 </networkConnectors> 17 <persistenceAdapter> 18 <kahaDB directory="${activemq.data}/kahadb"/> 19 </persistenceAdapter> 20 <transportConnectors> 21 <transportConnector name="openwire" 22 uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> 23 </transportConnectors> 24 </broker> 25 26 <import resource="jetty.xml"/> 27 </beans>
注意:14-16行及21-22行,該Broker對外暴露61616端口,同時"連接"到61626端口(即另1個broker),最終的效果相當於,如果有producer把消息發到61616(broker1),則從另一個broker(61626端口)上也能消費這條消息。
明白這些基本原理后,在61626對應的activemq上,也做類似的配置,只不過"連接方向"正好相反,參考以下配置:

1 <beans 2 xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 5 http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> 6 7 <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> 8 <property name="locations"> 9 <value>file:${activemq.conf}/credentials.properties</value> 10 </property> 11 </bean> 12 13 <broker xmlns="http://activemq.apache.org/schema/core" brokerName="activemq-2"> 14 <networkConnectors> 15 <networkConnector uri="static:(tcp://127.0.0.1:61616)"/> 16 </networkConnectors> 17 <persistenceAdapter> 18 <kahaDB directory="${activemq.data}/kahadb"/> 19 </persistenceAdapter> 20 <transportConnectors> 21 <transportConnector name="openwire" 22 uri="tcp://0.0.0.0:61626?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> 23 </transportConnectors> 24 </broker> 25 26 <import resource="jetty.xml"/> 27 </beans>
(注:如果希望2個activemq上都能訪問admin管理界面,jetty.xml中的端口要修改,不要沖突)
這樣,activemq-1與activemq-2這二個broker就互為主備,發給你的消息會同步到我,發給我的消息也會同步到你,實現了HA,示意圖如下:
Producer與Consumer連接到activemq時,配置文件可以這么寫:
1 <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> 2 <property name="connectionFactory"> 3 <bean class="org.apache.activemq.ActiveMQConnectionFactory"> 4 <!--broker服務的地址--> 5 <property name="brokerURL" value="failover:(tcp://localhost:61616,tcp://localhost:61626)"/> 6 ... 7 </bean> 8 </property> 9 </bean>
這種HA方案的優點是占用的節點數更少(只需要2個節點),而且2個broker都可以響應消息的接收與發送,性能比zookeeper方案要好一些。
最后,再給一個簡化配置的技巧,以上述的2節點HA方案中,二個activemq的配置文件都要加networkConnector配置,如果想減輕配置的工作量,也可以只在其中一個activemq設置,參考以下片段:
<networkConnectors> <networkConnector uri="static:(tcp://127.0.0.1:61626)" duplex="true"/> </networkConnectors>
即:在61616這個activemq的配置文件中,添加networkConnector時,增加duplex="true",也就是雙工通訊的意思,這樣61616與61626這二個activemq上的broker就建立了雙向通訊連接,另一個activemq上就無需額外配置了(注:如果在61626上配置了,反而會報錯)
參考文章:
http://www.jakubkorab.net/2011/11/understanding-activemq-broker-networks.html
http://activemq.apache.org/networks-of-brokers.html
最后貼二段程序運行的輸出日志,以說明同步機制的正確性,打消回復中“大鵬520”的顧慮:
背景: *.*.*.15 與 *.*.*.16 作為HA(雙主)的activemq集群,發送程序只連接到15發消息,然后退出。 接收程序只從16上收消息,如果收到了,表明15上的消息同步到16。
下面是發送程序的輸出片段:(注意輸出日志中關於IP的部分,這是只連接到*.*.*.[b]15[/b]上發送的) (注:部分敏感信息,比如真實IP前綴,公司package名,用*代替了)
14:53:09,996 <*.DemoSender> INFO [main]: 准備發送消息...
14:53:10,270 <org.apache.activemq.transport.WireFormatNegotiator> DEBUG [main]: Sending: WireFormatInfo { version=11, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, StackTraceEnabled=true, CacheEnabled=true, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, Host=*.*.*.15, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000}, magic=[A,c,t,i,v,e,M,Q]}
14:53:10,306 <org.apache.activemq.transport.InactivityMonitor> DEBUG [ActiveMQ Transport: tcp:///*.*.*.15:61616@50616]: Using min of local: WireFormatInfo { version=11, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, StackTraceEnabled=true, CacheEnabled=true, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, Host=*.*.*.15, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=11, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, StackTraceEnabled=true, CacheEnabled=true, TightEncodingEnabled=true, MaxFrameSize=104857600, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000}, magic=[A,c,t,i,v,e,M,Q]}
14:53:10,306 <org.apache.activemq.transport.WireFormatNegotiator> DEBUG [ActiveMQ Transport: tcp:///*.*.*.15:61616@50616]: Received WireFormat: WireFormatInfo { version=11, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, StackTraceEnabled=true, CacheEnabled=true, TightEncodingEnabled=true, MaxFrameSize=104857600, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000}, magic=[A,c,t,i,v,e,M,Q]}
14:53:10,306 <org.apache.activemq.transport.WireFormatNegotiator> DEBUG [ActiveMQ Transport: tcp:///*.*.*.15:61616@50616]: tcp:///*.*.*.15:61616@50616 before negotiation: OpenWireFormat{version=11, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
14:53:10,307 <org.apache.activemq.transport.WireFormatNegotiator> DEBUG [ActiveMQ Transport: tcp:///*.*.*.15:61616@50616]: tcp:///*.*.*.15:61616@50616 after negotiation: OpenWireFormat{version=11, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600}
14:53:10,416 <org.springframework.jms.core.JmsTemplate> DEBUG [main]: Executing callback on JMS Session: PooledSession { ActiveMQSession {id=ID:yangjunmings-MacBook-Pro.local-50615-1461480790088-1:1:1,started=false} java.lang.Object@5a56cdac }
14:53:10,471 <org.springframework.jms.core.JmsTemplate> DEBUG [main]: Sending created message: ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = message test:0}
14:53:10,516 <*.DemoSender> INFO [main]: 消息0發送完成!
14:53:11,520 <org.springframework.jms.core.JmsTemplate> DEBUG [main]: Executing callback on JMS Session: PooledSession { ActiveMQSession {id=ID:yangjunmings-MacBook-Pro.local-50615-1461480790088-1:1:1,started=false} java.lang.Object@5a56cdac }
14:53:11,521 <org.springframework.jms.core.JmsTemplate> DEBUG [main]: Sending created message: ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = message test:1}
運行完以后,關掉發送程序,然后啟動接收程序:
14:54:40,965 <org.apache.activemq.transport.WireFormatNegotiator> DEBUG [main]: Sending: WireFormatInfo { version=11, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, StackTraceEnabled=true, CacheEnabled=true, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, Host=*.*.*.16, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000}, magic=[A,c,t,i,v,e,M,Q]}
14:54:41,002 <org.apache.activemq.transport.InactivityMonitor> DEBUG [ActiveMQ Transport: tcp:///*.*.*.16:61616@50642]: Using min of local: WireFormatInfo { version=11, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, StackTraceEnabled=true, CacheEnabled=true, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, Host=*.*.*.16, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=11, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, StackTraceEnabled=true, CacheEnabled=true, TightEncodingEnabled=true, MaxFrameSize=104857600, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000}, magic=[A,c,t,i,v,e,M,Q]}
14:54:41,003 <org.apache.activemq.transport.WireFormatNegotiator> DEBUG [ActiveMQ Transport: tcp:///*.*.*.16:61616@50642]: Received WireFormat: WireFormatInfo { version=11, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, StackTraceEnabled=true, CacheEnabled=true, TightEncodingEnabled=true, MaxFrameSize=104857600, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000}, magic=[A,c,t,i,v,e,M,Q]}
14:54:41,003 <org.apache.activemq.transport.WireFormatNegotiator> DEBUG [ActiveMQ Transport: tcp:///*.*.*.16:61616@50642]: tcp:///*.*.*.16:61616@50642 before negotiation: OpenWireFormat{version=11, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
14:54:41,004 <org.apache.activemq.transport.WireFormatNegotiator> DEBUG [ActiveMQ Transport: tcp:///*.*.*.16:61616@50642]: tcp:///*.*.*.16:61616@50642 after negotiation: OpenWireFormat{version=11, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600}
14:54:41,163 <org.apache.activemq.thread.TaskRunnerFactory> DEBUG [main]: Initialized TaskRunnerFactory[ActiveMQ Session Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@485966cc[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
14:54:41,181 <*.DemoListener> INFO [ActiveMQ Session Task-1]: message test:0
注意接收程序中IP的部分,這是從*.*.*.[b]16[/b]上接收到的,說明消息已經從 *.*.*.15同步到*.*.*.16上了,否則不可能收到消息。