springboot整合ActiveMQ 2(主備模式,負載均衡)


  單個 MQ 節點總是不可靠的,一旦該節點出現故障,MQ 服務就不可用了,勢必會產生較大的損失。這里記錄 activeMQ 如何開啟主從備份,一旦 master(主節點故障),slave(從節點)立即提供服務,實現原理是運行多個 MQ 使用同一個持久化數據源,這里以 jdbc 數據源為例。同一時間只有一個節點(節點 A)能夠搶到數據庫的表鎖,其他節點進入阻塞狀態,一旦 A 發生錯誤崩潰,其他節點就會重新獲取表鎖,獲取到鎖的節點成為 master,其他節點為 slave,如果節點 A 重新啟動,也將成為 slave。

  主從備份解決了單節點故障的問題,但是同一時間提供服務的只有一個 master,顯然是不能面對數據量的增長,所以還需要一種橫向拓展的集群方式來解決面臨的問題。

一、activeMQ 設置

1、平台版本說明:

  • 平台:windows
  • activeMQ 版本:5.9.1,下載地址
  • jdk 版本:1.8

2、下載 jdbc 依賴

  下載下面三個依賴包,放到 activeMQ 安裝目錄下的 lib 文件夾中。

mysql 驅動

dhcp 依賴

commons-pool2 依賴

二、主從備份

1、修改 jettty

  首先修改 conf->jetty.xml,這里是修改 activemq 的 web 管理端口,管理界面賬號密碼默認為 admin/admin

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="port" value="8161"/>
</bean>

2、修改 activemq.xml

  然后修改 conf->activemq.xml

  • 設置連接方式

    默認是下面五種連接方式都打開,這里我們只要 tcp,把其他的都注釋掉,然后在這里設置 activemq 的服務端口,可以看到每種連接方式都對應一個端口。

    <transportConnectors>
      <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
      <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
      <!-- <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
      <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
      <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
      <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> -->
    </transportConnectors>
    
  • 設置 jdbc 數據庫

    mysql 數據庫中創建 activemq 庫,在broker標簽的下面也就是根標簽beans的下一級創建一個 bean 節點,內容如下:

    <bean id="mysql-qs" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/>
        <property name="username" value="root"/>
        <property name="password" value="123456"/>
        <property name="poolPreparedStatements" value="true"/>
    </bean>
    
  • 設置數據源

    首先修改 broker 節點,設置 name 和 persistent(默認為 true),也可不做修改,修改后如下:

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="mq1" persistent="true" dataDirectory="${activemq.data}">
    

    然后設置持久化方式,使用到我們之前設置的 mysql-qs

    <persistenceAdapter>
      <!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
      <jdbcPersistenceAdapter dataDirectory="${activemq.base}/activemq-data" dataSource="#mysql-qs"/>
    </persistenceAdapter>
    

3、啟動

  設置完畢后啟動 activemq(雙擊 bin 中的 acitveMQ.jar),啟動完成后可以看到如下日志信息:

 INFO | Using a separate dataSource for locking: org.apache.commons.dbcp2.BasicDataSource@179ece50
 INFO | Attempting to acquire the exclusive lock to become the Master broker
 INFO | Becoming the master on dataSource: org.apache.commons.dbcp2.BasicDataSource@179ece50

​ 接着我們修改一下 tcp 服務端口,改為 61617,然后重新啟動,日志信息如下:

 INFO | Using a separate dataSource for locking: org.apache.commons.dbcp2.BasicDataSource@179ece50
 INFO | Attempting to acquire the exclusive lock to become the Master broker
 INFO | Failed to acquire lock.  Sleeping for 10000 milli(s) before trying again...
 INFO | Failed to acquire lock.  Sleeping for 10000 milli(s) before trying again...
 INFO | Failed to acquire lock.  Sleeping for 10000 milli(s) before trying again...
 INFO | Failed to acquire lock.  Sleeping for 10000 milli(s) before trying again...

可以看到從節點一直在嘗試獲取表鎖成為主節點,這樣一旦主節點失效,從節點能夠立刻取代主節點提供服務。這樣我們便實現了主從備份。

三、負載均衡

  activemq 可以實現多個 mq 之間進行路由,假設有兩個 mq,分別為 brokerA 和 brokerB,當一條消息發送到 brokerA 的隊列 test 中,有一個消費者連上了 brokerB,並且想要獲取 test 隊列,brokerA 中的 test 隊列就會路由到 brokerB 上。

   開啟負載均衡需要設置networkConnectors節點,靜態路由配置如下:

<networkConnectors>
  <networkConnector uri="static:failover://(tcp://localhost:61616,tcp://localhost:61617)"           duplex="false"/>
</networkConnectors>

brokerA 和 brokerB 都要設置該配置,以連上對方。

四、測試

1、建立 mq

  組建兩組 broker,每組做主從配置。

  • brokerA:
    • 主:設置 web 管理端口 8761,設置 mq 名稱mq,設置數據庫地址為 activemq,設置 tcp 服務端口 61616,設置負載均衡靜態路由static:failover://(tcp://localhost:61618,tcp://localhost:61619),然后啟動
    • 從:上面的基礎上修改 tcp 服務端口為 61617,然后啟動
  • brokerB:
    • 主:設置 web 管理端口 8762,設置 mq 名稱mq1,設置數據庫地址 activemq1,設置 tcp 服務端口 61618,設置負載均衡靜態路由static:failover://(tcp://localhost:61616,tcp://localhost:61617),然后啟動
    • 從:上面的基礎上修改 tcp 服務端口為 61619,然后啟動

2、springboot 測試

   沿用上一篇的項目,修改配置文件的 broker-url 為failover:(tcp://localhost:61616,tcp://localhost:61617,tcp://localhost:61618,tcp://localhost:61619),然后啟動項目訪問會在控制台看到如下日志:

2018-07-31 15:09:25.076  INFO 12780 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport       : Successfully connected to tcp://localhost:61618
1I'm from queue1:hello
2018-07-31 15:09:26.599  INFO 12780 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport       : Successfully connected to tcp://localhost:61618
2I'm from queue1:hello
2018-07-31 15:09:29.002  INFO 12780 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport       : Successfully connected to tcp://localhost:61616
1I'm from queue1:hello
2018-07-31 15:09:34.931  INFO 12780 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport       : Successfully connected to tcp://localhost:61618
2I'm from queue1:hello

證明負載均衡成功。

本文原創發布於:https://www.tapme.top/blog/detail/2018-09-06-10-38


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM