從頭開始搭建一個Spring boot+ActiveMQ高可用分布式環境


背景

目前公司項目中有用到activemq,兩台機器上分別通過共享文件方式搭建了master-slave集群,但兩台機器之間並未組建broker cluster,而是在客戶端通過軟負載的方式隨機選擇一組提供服務來達到集群擴展的目的。

上面的方案主要問題在於需要通過軟負載去實現分布式的負載均衡算法,需要解決一系列問題。

下面的文章就在原有基礎上組建broker cluser(activemq自帶),基於學習的目的通過一次搭建過程來體驗下(畢竟我不是運維人員),下面是效果圖:不需要軟負載。

為了簡單,broker cluster只創建兩組,而且全部節點部署在同一台機器上。

節點名稱 tcp open-write端口 管理台端口 共享文件
master-a 61616 8161 /Users/iss/data/activemq/activemq-ha-a
slave-a 61617 8162 /Users/iss/data/activemq/activemq-ha-a
master-b 61618 8163 /Users/iss/data/activemq/activemq-ha-b
slave-b 61619 8164 /Users/iss/data/activemq/activemq-ha-b

activemq安裝

由於最新的版本需要jdk1.8,我這里選擇的是支持jdk1.7的5.14.3

簡單運行,我們只需要修改兩個端口即可:這兩文件在activemq安裝目錄的conf中

  • activemq.xml

這里只用到tcp,所以將其它的可以全部刪除,修改uri中的端口61616為節點的端口。


 <transportConnectors>
    <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
  • jetty.xml

修改下面的port就行,這是activemq的管理系統端口。


<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
         <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8161"/>
</bean>

端口修改好之后,執行下面的腳本即可啟動,然后在data目錄下查看activemq.log。


bin/activemq start

master-slave搭建

為了防止activemq單節點出現故障影響提供服務,所以需要有一個備份的節點當主節點出現故障時馬上替補上。這里采用共享文件的方式,原理就是讓參與高可用的所有節點共用一個數據文件目錄,通過文件鎖的方式來決定誰是master誰是slave。我們需要做的就是將多個節點的數據目錄配置成相同的就行。

環境變量

在bin目錄下有個env文件,里面指定了activemq所使用到的各類變量,數據目錄路徑修改 ACTIVEMQ_DATA:

# Active MQ installation dirs
# ACTIVEMQ_HOME="<Installationdir>/"
# ACTIVEMQ_BASE="$ACTIVEMQ_HOME"
# ACTIVEMQ_CONF="$ACTIVEMQ_BASE/conf"
 ACTIVEMQ_DATA="/Users/iss/data/activemq/activemq-ha-a/data"
# ACTIVEMQ_TMP="$ACTIVEMQ_BASE/tmp"

先啟動master,然后再啟動slave,如果配置正常,在slave的啟動日志中會輸出如下日志,表示已經有master鎖定,自己將以slave角色運行。

2018-01-01 01:46:56,769 | INFO  | Database /Users/iss/data/activemq/activemq-ha-a/data/kahadb/lock is locked by another server. This broker is now in slave mode waiting a lock to be acquired | org.apache.activemq.store.SharedFileLocker | main

當master-a出現故障時系統會自動被slave-a取代。

brocker-cluster搭建

上面的高可用只是解決了單點故障問題,同一時間提供服務的只有master一個節點,這顯示無法面對數據量的增長需求,所以就需要一種可擴展節點的集群方式來解決面臨的問題。讓一個broker與其它broker互相通信,我們這里采用靜態uri方式,做法還是修改activemq.xml:

master-a與slave-a組成一個broker-a;master-b與slave-b組成一個broker-b,broker-a與broker-b組成broker cluster

  • broker-a配置修改

讓其能與broker-b通信


<networkConnectors>
    <networkConnector uri="static:(tcp://localhost:61618,tcp://localhost:61619)" duplex="false"/>
</networkConnectors>
  • broker-b配置修改

讓其能與broker-a通信


<networkConnectors>
    <networkConnector uri="static:(tcp://localhost:61616,tcp://localhost:61617)" duplex="false"/>
</networkConnectors>

由於本文出於簡單演示的目的,只組建了兩個broker,它們相互之間的通信配置也很容易。當broker實例比較多時,相互之前的橋接通信的配置還需要仔細研究,待后續補充......

spring-boot示例

整個工程結構如下,包含一個生產消息的,一個消費消息的。

pom引入依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

創建activemq啟動配置類

  • brocker url

配置整個集群的url,包含全部master,slave,本文總共是4個。

  • JmsMessagingTemplate

發送消息時支持類,是對JmsTemplate的進一步包裝。

  • JmsListenerContainerFactory

@ComponentScan(basePackages = {"com.jim.framework.activemq"})
@Configuration
public class ActivemqConfiguration {

    private static final String BROKER_URL="failover:(tcp://192.168.10.222:61616,tcp://192.168.10.222:61617,tcp://192.168.10.222:61618,tcp://192.168.10.222:61619)";

    @Bean
    public Queue productActiveMQQueue(){
        return new ActiveMQQueue("jim.queue.product");
    }


    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue() {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setConnectionFactory(new ActiveMQConnectionFactory(BROKER_URL));
        return bean;
    }

    @Bean
    public JmsMessagingTemplate jmsMessagingTemplate(){
        return new JmsMessagingTemplate(new ActiveMQConnectionFactory(BROKER_URL));
    }
}

定義消息發送接口

public interface ProductSendMessage {

    void sendMessage(Object message);
}

實現消息生產者


@Service
public class ProductProducer implements ProductSendMessage {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired
    private Queue productActiveMQQueue;


    @Override
    public void sendMessage(Object message) {

        this.jmsMessagingTemplate.convertAndSend(this.productActiveMQQueue,message);
    }
}

實現消息消費者

@JmsListener,這個注解即標識監聽哪一個消息隊列。


@Component
public class ProductConsumer {

    @JmsListener(destination = "jim.queue.product",containerFactory = "jmsListenerContainerQueue")
    public void receiveQueue(String text) {
        System.out.println("Consumer,productId:"+text);
    }

}

客戶端調用

簡單的一個web工程,訪問某個鏈接時發送消息


@RestController
@RequestMapping("/product")
public class ProductController{

    @Autowired
    private ProductProducer productProducer;

    @RequestMapping("/{productId}")
    public Long getById(@PathVariable final long productId) {

        this.productProducer.sendMessage(productId);
        return productId;
    }

}

當訪問請求后,看看消費方的輸出:請求分別轉發到了61616以及61618兩個master上了,實現了自動負載均衡。


2018-01-01 09:03:43.683  INFO 18418 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport       : Successfully connected to tcp://192.168.10.222:61616
Consumer,productId:80
2018-01-01 09:03:45.794  INFO 18418 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport       : Successfully connected to tcp://192.168.10.222:61616
Consumer,productId:80
2018-01-01 09:03:47.745  INFO 18418 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport       : Successfully connected to tcp://192.168.10.222:61618
Consumer,productId:80
2018-01-01 09:03:49.669  INFO 18418 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport       : Successfully connected to tcp://192.168.10.222:61616
Consumer,productId:80

模似一個master出現故障,停止master-a后出現這樣的日志,顯然activemq客戶端已經檢測到。

2018-01-01 11:25:19.348  WARN 18418 --- [222:61616@55277] o.a.a.t.failover.FailoverTransport       : Transport (tcp://192.168.10.222:61616) failed , attempting to automatically reconnect: {}

java.io.EOFException: null
    at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[na:1.8.0_121]
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268) ~[activemq-client-5.14.5.jar:5.14.5]
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240) ~[activemq-client-5.14.5.jar:5.14.5]
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232) ~[activemq-client-5.14.5.jar:5.14.5]
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215) ~[activemq-client-5.14.5.jar:5.14.5]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]

再次請求測試鏈接:發現在停止到master-a后,slave-a(61617)已經成功取代原來的master-a(61616),現在請求已經成功負載到新的master上。


2018-01-01 11:25:19.383  INFO 18418 --- [ActiveMQ Task-3] o.a.a.t.failover.FailoverTransport       : Successfully reconnected to tcp://192.168.10.222:61618
2018-01-01 11:26:47.652  INFO 18418 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport       : Successfully connected to tcp://192.168.10.222:61618
Consumer,productId:80
2018-01-01 11:26:55.408  INFO 18418 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport       : Successfully connected to tcp://192.168.10.222:61618
Consumer,productId:80
2018-01-01 11:26:57.446  INFO 18418 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport       : Successfully connected to tcp://192.168.10.222:61617
Consumer,productId:80

本文源碼

https://github.com/jiangmin168168/jim-framework/tree/master/jim-framework-activemq


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM