因為本文會用到集群介紹,因此准備了三台虛擬機(當然讀者也可以使用一個虛擬機,然后使用不同的端口來模擬實現偽集群):
192.168.209.133 test1 192.168.209.134 test2 192.168.209.135 test3
因為ActiveMQ是java編寫,因此需要java的運行環境,這個不做介紹,網上有一堆的教程。
其次,下載ActiveMQ包,官網下載地址:https://archive.apache.org/dist/activemq/ ,讀者可以選擇一個版本下載使用,比如,當前最新版是5.16.1,下載地址:https://archive.apache.org/dist/activemq/5.16.1/
官網下載速度比較慢,可能需要一兩個小時,有百度網盤會員的可以移步百度網盤:https://pan.baidu.com/s/1-ToyzCqo_ypk7FXcjgI3yg (提取碼: jcd7 )
單機安裝與配置
首先將tar包傳到linux上去(我這里使用的是Ubuntu16.04),然后在tar包所在目錄進行解壓:
# -C 表示解壓出來的文件保存目錄,默認是tar包所在目錄,這里我選擇的是/opt目錄,注意修改權限,如果你是root用戶,則不需要修改
sudo tar -zxf apache-activemq-5.16.1-bin.tar.gz -C /opt
其實我們下載好的tar包是編譯打包好的,只需解壓就可以啟動了:
# 使用后台線程啟動
sudo ./bin/activemq start
# 使用控制台方式啟動,這種方式會造成當前shell阻塞,如果想使用服務單元或者supervisor這樣的工具做守護進程,那么應該采用這種啟動方式
sudo ./bin/activemq console
# 停止應用
sudo ./bin/activemq stop
# 重啟應用
sudo ./bin/activemq restart
第一次建議采用控制台console的形式啟動,看時候會報錯,報錯則會打印異常信息,方便我們排查,啟動后大概是這樣的:
注意輸出的信息,可以看到:
1、ActiveMQ版本是5.16.1,使用KahaDB做持久化,版本7
2、openwire方式連接啟動,監聽:tcp://test1:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600,還有ampq、stomp、mqtt、ws等分別在不同端口啟動監聽(注,這里的test1是在hosts中添加的,看我上面提供的三個虛擬機)
3、Jetty服務啟動,其實就是啟動ActiveMQ的管理后台
4、ActiveMQ管理后台地址:http://127.0.0.1:8161/,ActiveMQ的Jolokia API地址:http://127.0.0.1:8161/api/jolokia/,Jolokia API主要是通過API接口獲取ActiveMQ的狀態信息,方便第三方程序進行監控
主要到,上面服務啟動后,管理后台啟動在127.0.0.1:8161,這表明我們只能在本地訪問,虛擬機以外的主機是訪問不了的,所以我需要修改相關配置。
配置文件在 conf 目錄下,我們常用到的配置文件就是activemq.xml以及jetty.xml,里面全是一些 java bean(有點少見的東西)。activemq.xml主要是ActiveMQ相關配置,而jetty.xml主要是管理后台的一些配置,接下來介紹一些常用的幾個配置:
1、修改Connector
打開conf/activemq.xml,找到 transportConnectors 節點配置,注釋掉多余的連接協議,只留自己需要的就行了,比如我一般用openwire,那我就將其它的注釋掉。
說明一下,為什么要這么做,主要是沒必要,也可以避免端口沖突,比如,rabbitmq默認采用ampq監聽5672端口,這時你會發現activemq啟動不了,因為端口沖突。
<transportConnectors> <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <!--<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>--> </transportConnectors>
需要注意的是,連接端口也是在上面的uri中去配置。
2、查看持久化方式配置
在conf/activemq.xml找到 persistenceAdapter 節點,這時消息持久化模式配置,可以看到默認的方式是kahaDB,數據目錄是data/kahadb
<persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter>
這個節點只是說明一下,因為后續介紹ActiveMQ集群模式就是修改這個節點kahaDB為replicatedLevelDB進行配置,所以這個暫時可以不用動,當然如果你願意,可以修改成AMQ、JDBC等其他方式。
3、死信隊列配置
ActiveMQ提供了眾多的策略,這里以死信隊列策略配置為例來介紹。
在conf/activemq.xml找到 destinationPolicy 節點,這里配置的是一些隊列和topic的策略,當然包括死信隊列了,默認策略配置如下:
說明: policyEntry 節點是配置入口
topic=">" 表示對所有的topic生效,我們也可以增加一個policyEntry節點,用途queue屬性來表示對隊列生效,如下文介紹
pendingMessageLimitStrategy 消息限制策略,只對Topic且非持久化訂閱者有效,用於當通道中有大量的消息積壓時,broker可以保留的消息量,這是為了防止Topic中有慢速消費者,導致整個通道消息積壓
constantPendingMessageLimitStrategy
保留固定條數的消息,如果消息量超過了限制,將使用消息剔除策略移除消息
ActiveMQ死信隊列配置策略主要有兩種:共享的死信策略(SharedDeadLetterStrategy)、單獨的死信策略(IndividualDeadLetterStrategy)
共享的死信策略(SharedDeadLetterStrategy)
共享的死信策略(SharedDeadLetterStrategy)表示將所有的死信消息保存在一個共享隊列中,這是ActiveMQ的默認策略
常用配置屬性:
deadLetterQueue:共享隊列名稱,默認的隊列名為 ActiveMQ.DLQ
processExpired:表示是否將過期消息放入死信隊列,默認為true
processNonPersistent:表示是否將“非持久化”消息放入死信隊列,默認為false
需要注意的是,之前共享的死信策略配置可以是這樣子:
<deadLetterStrategy> <sharedDeadLetterStrategy deadLetterQueue="DLQ.Queue"/> </deadLetterStrategy>
但是不知道從什么時候起,這樣配置會拋出異常:
Caused by: java.lang.IllegalStateException: Cannot convert value of type 'java.lang.String' to required type 'org.apache.activemq.command.ActiveMQDestination' for property 'deadLetterQueue': no matching editors or conversion strategy found at org.springframework.beans.TypeConverterDelegate.convertIfNecessary(TypeConverterDelegate.java:307) at org.springframework.beans.AbstractNestablePropertyAccessor.convertIfNecessary(AbstractNestablePropertyAccessor.java:588) ... 52 more
但是我們可以使用bean去配置,如:
<bean id="deadLetterQueue" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg name="name" value="DLQ.Queue"/> </bean> ...... <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}"> <destinationPolicy> <policyMap> <policyEntries> <policyEntry queue=">" > <deadLetterStrategy> <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy"> <property name="deadLetterQueue" ref="deadLetterQueue" /> </bean> </deadLetterStrategy> </policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
....
</broker>
上面的配置中,定義了一個id為deadLetterQueue的bean,然后在策略中,queue=">" 表示對所有隊列生效,如果想對topic生效,改成 topic=“>” 即可,而策略時一個bean,class指向SharedDeadLetterStrategy,它的屬性deadLetterQueue指向我們前面定義的那個bean的id
其實,上面的配置就是說,對於所有的隊列的死信消息,通通發到一個名為DQL.Queue的隊列中去。
單獨的死信策略(IndividualDeadLetterStrategy)
單獨的死信策略(IndividualDeadLetterStrategy)表示把死信消息放入各自的死信通道中,區分的條件時使用不同的前綴,死信通道可以是隊列,也可以是topic 常用配置屬性: queuePrefix:當死信隊列是queue時使用的前綴,默認是 ActiveMQ.DLQ.Queue. topicPrefix:當死信隊列是topic是使用的前綴,默認是 ActiveMQ.DLQ.Topic. useQueueForTopicMessages:表示是否將Topic的死信消息保存在Queue中,默認為true,表示Topic的死信消息保存至Queue中
processExpired:表示是否將過期消息放入死信隊列,默認為true
processNonPersistent:表示是否將“非持久化”消息放入死信隊列,默認為false
例如:
<destinationPolicy> <policyMap> <policyEntries> <policyEntry queue=">" > <deadLetterStrategy> <individualDeadLetterStrategy queuePrefix="queue.DLQ." useQueueForQueueMessages="false"/> </deadLetterStrategy> </policyEntry> <policyEntry topic=">" > <pendingMessageLimitStrategy> <constantPendingMessageLimitStrategy limit="1000"/> </pendingMessageLimitStrategy> <deadLetterStrategy> <individualDeadLetterStrategy topicPrefix="topic.DLQ." useQueueForQueueMessages="true"/> </deadLetterStrategy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy>
上面的例子中,queue=">" 表示對所有隊列啟用策略,即對每個的隊列的死信消息,保存到一個以queue.DLQ.[隊列名]的topic中,比如一個名為demo的隊列對應的死信通道是一個名稱是queue.DLQ.demo的topic。
同理,topic=">"表示對所有topic生效,表示將每個topic的死信消息保存在一個已topic.DLQ.[topic名]的隊列中,比如一個名稱為demo的topic對應的死信通道是一個名稱為topic.DLQ.demo的隊列
4、管理后台啟動配置
前面說到默認情況下啟動時,ActiveMQ管理后台綁定到127.0.01:8161,這表示我們只能在本地訪問,這肯定是不能接受的,我們需要修改這個地址。
打開conf/jetty.xml,找到id="jettyPort"的bean,
修改這里的host成0.0.0.0,端口隨意,也可以使用默認的8161:
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"> <!-- the default port number for the web console --> <property name="host" value="0.0.0.0"/> <property name="port" value="8161"/> </bean>
保存后啟動ActiveMQ,然后就可以在瀏覽器上訪問管理后台了,比如我部署的服務器IP是192.168.209.133,那么就訪問http://192.168.209.133:8161,接着會要求輸入賬號和密碼,默認都是admin(至於怎么添加用戶及管理權限,這就不在本文范圍內了,有空再單獨用博文介紹),登錄后進入歡迎也,然后在點擊Manage ActiveMQ broker進入管理后台:
展示的管理界面如下,至於相關頁面的操作信息等等,就不做介紹了:
集群部署及配置
說起ActiveMQ的集群,其實它是一種主從架構,是一種和redis集群中的主從模式很像,就是集群只有主節點提供服務,子節點只是備用,當主節點掛了,就會選用一個子節點作為主節點繼續提供服務,如:
1、正常情況下是主節點提供服務,子節點作為備用只從主節點同步數據
2、當主節點掛了,就會重新選擇一個子節點作為主節點重新提供服務
3、如果之后主節點又起來了,此時它將變作為子節點留作備用
根據ActiveMQ官網的介紹,ActiveMQ集群有三種部署方式:基於共享文件系統的主從架構(文件系統如:SAN)、基於數據庫的主從架構,基於Zookeeper的主從架構(http://activemq.apache.org/masterslave.html)
這里介紹基於Zookeeper的主從架構。
准備部署
1、因為是基於Zookeeper的主從架構,因此我們需要一個Zookeeper集群服務,可以參考博文:Zookeeper基礎教程(二):Zookeeper安裝
我這里采用集群地址就是 192.168.209.133:2181,192.168.209.134:2181,192.168.209.135:2181 (其實就是在當前准備部署ActiveMQ的三台虛擬機上部署的Zookeeper集群)
2、安裝上面介紹的單機部署的方式,先在三個節點都安裝部署完成ActiveMQ
3、接着就到最重要的環節了。
打開conf/activemq.xml,修改 persistenceAdapter 節點中的持久化方式為 replicatedLevelDB ,如:
test1節點
<persistenceAdapter> <!--<kahaDB directory="${activemq.data}/kahadb"/>--> <replicatedLevelDB directory="${activemq.data}/leveldb" replicas="3" bind="tcp://0.0.0.0:61619" zkAddress="192.168.209.133:2181,192.168.209.134:2181,192.168.209.135:2181" zkPath="/activemq" hostname="test1" /> </persistenceAdapter>
test2節點
<persistenceAdapter> <!--<kahaDB directory="${activemq.data}/kahadb"/>--> <replicatedLevelDB directory="${activemq.data}/leveldb" replicas="3" bind="tcp://0.0.0.0:61619" zkAddress="192.168.209.133:2181,192.168.209.134:2181,192.168.209.135:2181" zkPath="/activemq" hostname="test2" /> </persistenceAdapter>
test3節點
<persistenceAdapter> <!--<kahaDB directory="${activemq.data}/kahadb"/>--> <replicatedLevelDB directory="${activemq.data}/leveldb" replicas="3" bind="tcp://0.0.0.0:61619" zkAddress="192.168.209.133:2181,192.168.209.134:2181,192.168.209.135:2181" zkPath="/activemq" hostname="test3" /> </persistenceAdapter>
說明一下:
directory:表示的是當前節點數據文件保存的目錄,不存在就會自動創建,默認值是LevelDB
,注意,這個目錄是根據ActiveMQ的根目錄來設置,比如上面的配置,是將數據文件保存在ActiveMQ的根目錄下的 data/leveldb 目錄下 replicas:集群節點數,因為采用Zookeeper,要求這個值至少為3
bind:集群通信地址端口配置,就是說如果當前節點變成了master節點,然后slave節點會根據這個地址端口進行數據同步。
zkAddress:Zookeeper集群地址,默認值是 127.0.0.1:2181
zkPath:根ZNode節點配置,你總不希望Zookeeper只為一個ActiveMQ集群服務吧?默認值是 /default
zkPassword:如果Zookeeper有認證,那這個就是認證信息,比如我這里沒有,就可以不用配置
hostname:集群內節點通信時采用的hostname,可以認為就是節點的名字,每個節點應該設置成不一樣的hostname,建議采用IP作為hostname的值,如果不設置,ActiveMQ將會自動創建一個hostname
注意,集群要求每個節點對Zookeeper的配置都一樣,比如我這里,要求每個節點的 replicas、zkAddress、zkPath 配置值是一樣的。replicatedLevelDB節點更多的配置可以參考官網介紹:https://activemq.apache.org/replicated-leveldb-store
在啟動Zookeeper之后,在每個節點ActiveMQ的根目錄下使用下面的命令啟動ActiveMQ:
# 這里采用console啟動,因為它會輸出日志,如果報錯方便我們處理,不報錯的話,可以在使用start后台啟動: sudo ./bin/activemq start
sudo ./bin/activemq console
啟動之后,查看Zookeeper,在上面配置的zkPath節點下會有每個broker節點的配置:
管理后台
集群啟動之后,我們可以查看管理后台,但是需要注意的是,ActiveMQ集群是master-slave模式,只有master節點對外提供服務,slave節點只是備份,因此,如果要訪問master節點的管理后台。
比如上圖,我這邊test3被選為master,因此我訪問的是:http://192.168.209.135:8161
那么你問,如果master節點掛了呢?那就對不起了,原master節點的管理后台地址就訪問不了了,你必須訪問新的master節點的管理后台才行,如果嫌麻煩,可以自行使用nginx做處理。
另外,如果集群部署啟動報錯:java.io.IOException: com/google/common/util/concurrent/internal/InternalFutureFailureAccess
結語
ActiveMQ安裝好了,我們也可以使用程序訪問試試,提示一下,單機部署的ActiveMQ和集群部署的ActiveMQ在代碼中的訪問地址形式可能不一樣,比如我用的C#使用 Apache.NMS.ActiveMQ 第三方包訪問ActiveMQ:
單機部署時訪問地址是:
string brokerUri = "activemq:tcp://192.168.209.133:61616"; NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);
集群部署時訪問地址是:
string brokerUri = "activemq:failover:(tcp://192.168.209.133:61616,tcp://192.168.209.134:61616,tcp://192.168.209.135:61616)"; NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);
注意:集群部署時訪問地址中的端口是conf/activemq.xml中transportConnectors節點下配置的端口,不是replicatedLevelDB節點bind屬性配置的節點,bind屬性配置的節點是如果當前節點是master時,它與其它slave的通信端口。
另外,知道怎么部署了,我們還可以試下ActiveMQ眾多的策略配置及用法,還可以嘗試基於共享文件系統的主從架構(文件系統如:SAN)、基於數據庫的主從架構搭建集群,想更深入點的話可以看看集群的負載均衡配置原理。
ActiveMQ的安裝部署就介紹到這里吧,主要是還有安裝啟動過程中還有幾個報錯,等之后再寫吧。