ActiveMQ集群


1.ActiveMQ集群介紹

1.為什么要集群?   

  實現高可用,以排除單點故障引起的服務中斷

  實現負載均衡,以提升效率為更多客戶提供服務

2.集群方式

  客戶端集群:讓多個消費者消費同一個隊列

  Broker Cluster:多個Broker之間同步消息(做不了高可用,可以實現負載均衡)

  Master-Slave:高可用(做不了負載均衡)

3.ActiveMq失效轉移

  允許當其中一台消息服務器宕機時,客戶端在傳輸層上重新連接其他消息服務器。

  語法:failover:(uri1,...uriN)?transportOptions

    transportOptions參數說明:randomize:默認為true,表示在URI列表中選擇URI連接時是否采用隨機策略

                initialReconnectDelay默認為10,默認10毫秒,表示第一次嘗試重連之間等待的時間。

                maxReconnectDelay:默認30000,單位毫秒,最長重連時間的間隔

 4.Master/Slave集群配置

  Share nothing storage master/slave(已經過時,5.8之后移除)

  shared storage master/slave 共享存儲(實際上是共享同一文件夾,只不過采用排他鎖,所以只有一個master節點可以訪問,當此服務宕機,另一台slave可以快速強占排他鎖,所以不會造成數據丟失,使用同一文件夾下的東西。如果多態服務器的話需要搭建文件共享服務器)

  Replicated LevelDB Store 基於復制的LevelDB Store

1.共享存儲原理:(獲取排他鎖才可以提供消息服務)--簡單方式

    

 2.Replicated LevelDB Store 基於復制的LevelDB Store原理(基於zookeper)

3.兩種方式對比:

  高可用 負載均衡
Master/Slave
Broker Cluster

 

4.三台機器的完美集群方案:(實現高可用和負載均衡)

 

2.ActiveMQ集群配置

1.方案介紹

 方案如下: B與C采用master-slave共享文件夾存儲(任一時刻只有一個可以占有排他鎖,也就是只有一個可以提供服務),當A宕機之后B會獲取資源鎖提供服務---實現高可用

       A與B、A與C都采用Broker 集群,可以同時提供服務,不管B與C誰獲取鎖,A都可以提供服務(A只能提供服務消費消息,不生成消息)---實現負載均衡

  

 

配置如下:(同一個電腦不同端口模擬集群)

    

 

 2.配置文件 (修改的配置文件都是在apache-activemq-5.15.6\conf文件夾下)

   

 

(1)activemq-a下面的配置:---A服務器

activemq.xml:(注釋掉其他協議,服務端口使用61616端口;增加靜態網絡連接器,同時連接B與C)

 jetty.xml:  采用默認的8161端口

 

(2)activemq-b下面的配置:---B服務器

 activemq.xml:(增加與A的靜態連接器,修改服務端口采用61617,修改共享文件夾的地址)

 

 

 

jitty.xml:修改端口采用8162

 

 

(3)activemq-c下面的配置:---C服務器

 activemq.xml:(增加與A的靜態連接器,修改服務端口采用61618,修改共享文件夾的地址)

 

 

jitty.xml:修改端口采用8163

 

3.依次啟動ActiveMq進行測試 

  啟動順序是:A->B->C,下面通過端口信息驗證集群:

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8161
  TCP    0.0.0.0:8161           0.0.0.0:0              LISTENING       2423496
  TCP    [::]:8161              [::]:0                 LISTENING       2423496

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8162
  TCP    0.0.0.0:8162           0.0.0.0:0              LISTENING       2423756
  TCP    [::]:8162              [::]:0                 LISTENING       2423756
liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8163

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 61616
  TCP    0.0.0.0:61616          0.0.0.0:0              LISTENING       2423496
  TCP    127.0.0.1:55338        127.0.0.1:61616        ESTABLISHED     2423756
  TCP    127.0.0.1:61616        127.0.0.1:55338        ESTABLISHED     2423496
  TCP    [::]:61616             [::]:0                 LISTENING       2423496

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 61617
  TCP    0.0.0.0:61617          0.0.0.0:0              LISTENING       2423756
  TCP    127.0.0.1:55345        127.0.0.1:61617        ESTABLISHED     2423496
  TCP    127.0.0.1:61617        127.0.0.1:55345        ESTABLISHED     2423756
  TCP    [::]:61617             [::]:0                 LISTENING       2423756
liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 61618

  由於先啟動的B,B占有排斥鎖,所以B(8162-61617)處於監聽狀態,而C與B采用共享文件排他鎖集群,所以C處於阻塞狀態,也就是沒有監聽端口,被阻塞。

 

 停掉B服務器,模擬B服務器宕機的情況,再次查看端口:

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8161
  TCP    0.0.0.0:8161           0.0.0.0:0              LISTENING       2423496
  TCP    [::]:8161              [::]:0                 LISTENING       2423496

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8162

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8163
  TCP    0.0.0.0:8163           0.0.0.0:0              LISTENING       2423848
  TCP    [::]:8163              [::]:0                 LISTENING       2423848

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 61616
  TCP    0.0.0.0:61616          0.0.0.0:0              LISTENING       2423496
  TCP    127.0.0.1:56704        127.0.0.1:61616        ESTABLISHED     2423848
  TCP    127.0.0.1:61616        127.0.0.1:56704        ESTABLISHED     2423496
  TCP    [::]:61616             [::]:0                 LISTENING       2423496

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 61617

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 61618
  TCP    0.0.0.0:61618          0.0.0.0:0              LISTENING       2423848
  TCP    127.0.0.1:56791        127.0.0.1:61618        ESTABLISHED     2423496
  TCP    127.0.0.1:61618        127.0.0.1:56791        ESTABLISHED     2423848
  TCP    [::]:61618             [::]:0                 LISTENING       2423848

liqiang@root MINGW64 ~/Desktop

  由於B宕機,所以C會強占排他鎖,也就是會監聽端口,所以可以看到8163與61618端口的監聽狀態。

 

  現在模擬C也宕機,將C服務器也停掉。再次查看端口信息:

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8161
  TCP    0.0.0.0:8161           0.0.0.0:0              LISTENING       2423496
  TCP    [::]:8161              [::]:0                 LISTENING       2423496

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8162

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8163

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 61616
  TCP    0.0.0.0:61616          0.0.0.0:0              LISTENING       2423496
  TCP    [::]:61616             [::]:0                 LISTENING       2423496

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 61617
  TCP    127.0.0.1:57242        127.0.0.1:61617        SYN_SENT        2423496

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 61618

liqiang@root MINGW64 ~/Desktop

 

  此時A服務器仍然在監聽端口。

  上面證明集群搭建成功,下面重新開啟A->B->C服務器開始程序驗證。

4.程序驗證集群

服務器開啟順序性:A->B->C(此時B占有排他鎖,B監聽端口,A一直監聽)

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 61617

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 61616
  TCP    0.0.0.0:61616          0.0.0.0:0              LISTENING       2428232
  TCP    127.0.0.1:60520        127.0.0.1:61616        ESTABLISHED     2427192
  TCP    127.0.0.1:61616        127.0.0.1:60520        ESTABLISHED     2428232
  TCP    [::]:61616             [::]:0                 LISTENING       2428232

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 61617
  TCP    0.0.0.0:61617          0.0.0.0:0              LISTENING       2427192
  TCP    127.0.0.1:60513        127.0.0.1:61617        ESTABLISHED     2428232
  TCP    127.0.0.1:61617        127.0.0.1:60513        ESTABLISHED     2427192
  TCP    [::]:61617             [::]:0                 LISTENING       2427192

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 61618

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8161
  TCP    0.0.0.0:8161           0.0.0.0:0              LISTENING       2428232
  TCP    [::]:8161              [::]:0                 LISTENING       2428232

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8162
  TCP    0.0.0.0:8162           0.0.0.0:0              LISTENING       2427192
  TCP    [::]:8162              [::]:0                 LISTENING       2427192
liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8163

 

代碼測試:

生產者:  url加了失效策略,而且采用隨機選取,生產消息的地址只有B與C服務器地址

package cn.qlq.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 生產消息
 * 
 * @author QiaoLiQiang
 * @time 2018年9月18日下午11:04:41
 */
public class MsgProducer {

    private static final String url = "failover:(tcp://localhost:61617,tcp://localhost:61618)?randomize=true";
    private static final String queueName = "myQueue";

    public static void main(String[] args) throws JMSException {
        // 1創建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        // 2.由connectionFactory創建connection
        Connection connection = connectionFactory.createConnection();
        // 3.啟動connection
        connection.start();
        // 4.創建Session===第一個參數是是否事務管理,第二個參數是應答模式
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5.創建Destination(Queue繼承Queue)
        Queue destination = session.createQueue(queueName);
        // 6.創建生產者producer
        MessageProducer producer = session.createProducer(destination);
        for (int i = 0; i < 100; i++) {
            // 7.創建Message,有好多類型,這里用最簡單的TextMessage
            TextMessage tms = session.createTextMessage("textMessage:" + i);
            // 8.生產者發送消息
            producer.send(tms);

            System.out.println("send:" + tms.getText());
        }
        // 9.關閉connection
        connection.close();
    }

}

 

消費者:url加了失效策略,而且采用隨機選取,生產消息的地址有ABC服務器

package cn.qlq.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消費消息
 * 
 * @author QiaoLiQiang
 * @time 2018年9月18日下午11:26:41
 */
public class MsgConsumer {

    private static final String url = "failover:(tcp://localhost:61616,tcp://localhost:61617,tcp://localhost:61618)?randomize=true";
    private static final String queueName = "myQueue";

    public static void main(String[] args) throws JMSException {
        // 1創建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        // 2.由connectionFactory創建connection
        Connection connection = connectionFactory.createConnection();
        // 3.啟動connection
        connection.start();
        // 4.創建Session===第一個參數是是否事務管理,第二個參數是應答模式
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5.創建Destination(Queue繼承Queue)
        Queue destination = session.createQueue(queueName);
        // 6.創建消費者consumer
        MessageConsumer consumer = session.createConsumer(destination);
        // 7.給消費者綁定監聽器(消息的監聽是一個異步的過程,不可以關閉連接,綁定監聽器線程是一直開啟的,處於阻塞狀態,所以可以在程序退出關閉)
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                // 7.1由於消費者接受的是TextMessage,所以強轉一下
                TextMessage tms = (TextMessage) message;
                try {
                    System.out.println("接收消息:" + tms.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }

}

 

啟動生產者發布100條消息:

管理界面查看:(消息被發布在B服務器,B占有排他鎖,C處於阻塞)

 

 A服務不生產消息所有A不會消費消息:(通過A查看網絡連接器)

 

關掉B服務器之后查看C服務器:(驗證B與C共享同一文件夾,且強占排他鎖)

 

查看共享文件夾:

 

 此時A與C處於監聽狀態,啟動消費者:

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8161
  TCP    0.0.0.0:8161           0.0.0.0:0              LISTENING       2431676
  TCP    [::]:8161              [::]:0                 LISTENING       2431676

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8162

liqiang@root MINGW64 ~/Desktop
$ netstat -ano |findstr 8163
  TCP    0.0.0.0:8163           0.0.0.0:0              LISTENING       2432188
  TCP    [::]:8163              [::]:0                 LISTENING       2432188
  TCP    [::1]:8163             [::1]:63600            TIME_WAIT       0
  TCP    [::1]:8163             [::1]:63648            TIME_WAIT       0
  TCP    [::1]:8163             [::1]:63649            TIME_WAIT       0
  TCP    [::1]:8163             [::1]:63650            TIME_WAIT       0
  TCP    [::1]:8163             [::1]:63651            TIME_WAIT       0
  TCP    [::1]:8163             [::1]:63652            TIME_WAIT       0

 

查看控制台如下:連接到A服務器消費信息

 

 總結:上面的配置方案B與C是為了實現高可用,也就是一台宕機之后另一台馬上強占排他鎖提供服務(需要共享文件夾實現共用同一文件夾下的資源與鎖),B與C可以生產消息,也可以提供消費消息,但是同一時刻只有一個提供服務。

     提供A是為了與B、C實現負載均衡,A不生產消息,但是可以消費消息,替B或者C分擔壓力。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM