1.ActiveMQ集群介紹
1.為什么要集群?
實現高可用,以排除單點故障引起的服務中斷
實現負載均衡,以提升效率為更多客戶提供服務
2.集群方式
客戶端集群:讓多個消費者消費同一個隊列
Broker Cluster:多個Broker之間同步消息(做不了高可用,可以實現負載均衡)
Master-Slave:高可用(做不了負載均衡)
3.ActiveMq失效轉移
允許當其中一台消息服務器宕機時,客戶端在傳輸層上重新連接其他消息服務器。
語法:failover:(uri1,...uriN)?transportOptions
transportOptions參數說明:randomize:默認為true,表示在URI列表中選擇URI連接時是否采用隨機策略
initialReconnectDelay默認為10,默認10毫秒,表示第一次嘗試重連之間等待的時間。
maxReconnectDelay:默認30000,單位毫秒,最長重連時間的間隔
4.Master/Slave集群配置
Share nothing storage master/slave(已經過時,5.8之后移除)
shared storage master/slave 共享存儲(實際上是共享同一文件夾,只不過采用排他鎖,所以只有一個master節點可以訪問,當此服務宕機,另一台slave可以快速強占排他鎖,所以不會造成數據丟失,使用同一文件夾下的東西。如果多態服務器的話需要搭建文件共享服務器)
Replicated LevelDB Store 基於復制的LevelDB Store
1.共享存儲原理:(獲取排他鎖才可以提供消息服務)--簡單方式
2.Replicated LevelDB Store 基於復制的LevelDB Store原理(基於zookeper)
3.兩種方式對比:
高可用 | 負載均衡 | |
Master/Slave | 是 | 否 |
Broker Cluster | 否 | 是 |
4.三台機器的完美集群方案:(實現高可用和負載均衡)
2.ActiveMQ集群配置
1.方案介紹
方案如下: B與C采用master-slave共享文件夾存儲(任一時刻只有一個可以占有排他鎖,也就是只有一個可以提供服務),當A宕機之后B會獲取資源鎖提供服務---實現高可用
A與B、A與C都采用Broker 集群,可以同時提供服務,不管B與C誰獲取鎖,A都可以提供服務(A只能提供服務消費消息,不生成消息)---實現負載均衡
配置如下:(同一個電腦不同端口模擬集群)
2.配置文件 (修改的配置文件都是在apache-activemq-5.15.6\conf文件夾下)
(1)activemq-a下面的配置:---A服務器
activemq.xml:(注釋掉其他協議,服務端口使用61616端口;增加靜態網絡連接器,同時連接B與C)
jetty.xml: 采用默認的8161端口
(2)activemq-b下面的配置:---B服務器
activemq.xml:(增加與A的靜態連接器,修改服務端口采用61617,修改共享文件夾的地址)
jitty.xml:修改端口采用8162
(3)activemq-c下面的配置:---C服務器
activemq.xml:(增加與A的靜態連接器,修改服務端口采用61618,修改共享文件夾的地址)
jitty.xml:修改端口采用8163
3.依次啟動ActiveMq進行測試
啟動順序是:A->B->C,下面通過端口信息驗證集群:
liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8161 TCP 0.0.0.0:8161 0.0.0.0:0 LISTENING 2423496 TCP [::]:8161 [::]:0 LISTENING 2423496 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8162 TCP 0.0.0.0:8162 0.0.0.0:0 LISTENING 2423756 TCP [::]:8162 [::]:0 LISTENING 2423756 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8163 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61616 TCP 0.0.0.0:61616 0.0.0.0:0 LISTENING 2423496 TCP 127.0.0.1:55338 127.0.0.1:61616 ESTABLISHED 2423756 TCP 127.0.0.1:61616 127.0.0.1:55338 ESTABLISHED 2423496 TCP [::]:61616 [::]:0 LISTENING 2423496 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61617 TCP 0.0.0.0:61617 0.0.0.0:0 LISTENING 2423756 TCP 127.0.0.1:55345 127.0.0.1:61617 ESTABLISHED 2423496 TCP 127.0.0.1:61617 127.0.0.1:55345 ESTABLISHED 2423756 TCP [::]:61617 [::]:0 LISTENING 2423756 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61618
由於先啟動的B,B占有排斥鎖,所以B(8162-61617)處於監聽狀態,而C與B采用共享文件排他鎖集群,所以C處於阻塞狀態,也就是沒有監聽端口,被阻塞。
停掉B服務器,模擬B服務器宕機的情況,再次查看端口:
liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8161 TCP 0.0.0.0:8161 0.0.0.0:0 LISTENING 2423496 TCP [::]:8161 [::]:0 LISTENING 2423496 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8162 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8163 TCP 0.0.0.0:8163 0.0.0.0:0 LISTENING 2423848 TCP [::]:8163 [::]:0 LISTENING 2423848 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61616 TCP 0.0.0.0:61616 0.0.0.0:0 LISTENING 2423496 TCP 127.0.0.1:56704 127.0.0.1:61616 ESTABLISHED 2423848 TCP 127.0.0.1:61616 127.0.0.1:56704 ESTABLISHED 2423496 TCP [::]:61616 [::]:0 LISTENING 2423496 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61617 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61618 TCP 0.0.0.0:61618 0.0.0.0:0 LISTENING 2423848 TCP 127.0.0.1:56791 127.0.0.1:61618 ESTABLISHED 2423496 TCP 127.0.0.1:61618 127.0.0.1:56791 ESTABLISHED 2423848 TCP [::]:61618 [::]:0 LISTENING 2423848 liqiang@root MINGW64 ~/Desktop
由於B宕機,所以C會強占排他鎖,也就是會監聽端口,所以可以看到8163與61618端口的監聽狀態。
現在模擬C也宕機,將C服務器也停掉。再次查看端口信息:
liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8161 TCP 0.0.0.0:8161 0.0.0.0:0 LISTENING 2423496 TCP [::]:8161 [::]:0 LISTENING 2423496 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8162 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8163 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61616 TCP 0.0.0.0:61616 0.0.0.0:0 LISTENING 2423496 TCP [::]:61616 [::]:0 LISTENING 2423496 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61617 TCP 127.0.0.1:57242 127.0.0.1:61617 SYN_SENT 2423496 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61618 liqiang@root MINGW64 ~/Desktop
此時A服務器仍然在監聽端口。
上面證明集群搭建成功,下面重新開啟A->B->C服務器開始程序驗證。
4.程序驗證集群
服務器開啟順序性:A->B->C(此時B占有排他鎖,B監聽端口,A一直監聽)
liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61617 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61616 TCP 0.0.0.0:61616 0.0.0.0:0 LISTENING 2428232 TCP 127.0.0.1:60520 127.0.0.1:61616 ESTABLISHED 2427192 TCP 127.0.0.1:61616 127.0.0.1:60520 ESTABLISHED 2428232 TCP [::]:61616 [::]:0 LISTENING 2428232 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61617 TCP 0.0.0.0:61617 0.0.0.0:0 LISTENING 2427192 TCP 127.0.0.1:60513 127.0.0.1:61617 ESTABLISHED 2428232 TCP 127.0.0.1:61617 127.0.0.1:60513 ESTABLISHED 2427192 TCP [::]:61617 [::]:0 LISTENING 2427192 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61618 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8161 TCP 0.0.0.0:8161 0.0.0.0:0 LISTENING 2428232 TCP [::]:8161 [::]:0 LISTENING 2428232 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8162 TCP 0.0.0.0:8162 0.0.0.0:0 LISTENING 2427192 TCP [::]:8162 [::]:0 LISTENING 2427192 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8163
代碼測試:
生產者: url加了失效策略,而且采用隨機選取,生產消息的地址只有B與C服務器地址
package cn.qlq.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * 生產消息 * * @author QiaoLiQiang * @time 2018年9月18日下午11:04:41 */ public class MsgProducer { private static final String url = "failover:(tcp://localhost:61617,tcp://localhost:61618)?randomize=true"; private static final String queueName = "myQueue"; public static void main(String[] args) throws JMSException { // 1創建ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); // 2.由connectionFactory創建connection Connection connection = connectionFactory.createConnection(); // 3.啟動connection connection.start(); // 4.創建Session===第一個參數是是否事務管理,第二個參數是應答模式 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5.創建Destination(Queue繼承Queue) Queue destination = session.createQueue(queueName); // 6.創建生產者producer MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 100; i++) { // 7.創建Message,有好多類型,這里用最簡單的TextMessage TextMessage tms = session.createTextMessage("textMessage:" + i); // 8.生產者發送消息 producer.send(tms); System.out.println("send:" + tms.getText()); } // 9.關閉connection connection.close(); } }
消費者:url加了失效策略,而且采用隨機選取,生產消息的地址有ABC服務器
package cn.qlq.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消費消息 * * @author QiaoLiQiang * @time 2018年9月18日下午11:26:41 */ public class MsgConsumer { private static final String url = "failover:(tcp://localhost:61616,tcp://localhost:61617,tcp://localhost:61618)?randomize=true"; private static final String queueName = "myQueue"; public static void main(String[] args) throws JMSException { // 1創建ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); // 2.由connectionFactory創建connection Connection connection = connectionFactory.createConnection(); // 3.啟動connection connection.start(); // 4.創建Session===第一個參數是是否事務管理,第二個參數是應答模式 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5.創建Destination(Queue繼承Queue) Queue destination = session.createQueue(queueName); // 6.創建消費者consumer MessageConsumer consumer = session.createConsumer(destination); // 7.給消費者綁定監聽器(消息的監聽是一個異步的過程,不可以關閉連接,綁定監聽器線程是一直開啟的,處於阻塞狀態,所以可以在程序退出關閉) consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { // 7.1由於消費者接受的是TextMessage,所以強轉一下 TextMessage tms = (TextMessage) message; try { System.out.println("接收消息:" + tms.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } }
啟動生產者發布100條消息:
管理界面查看:(消息被發布在B服務器,B占有排他鎖,C處於阻塞)
A服務不生產消息所有A不會消費消息:(通過A查看網絡連接器)
關掉B服務器之后查看C服務器:(驗證B與C共享同一文件夾,且強占排他鎖)
查看共享文件夾:
此時A與C處於監聽狀態,啟動消費者:
liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8161 TCP 0.0.0.0:8161 0.0.0.0:0 LISTENING 2431676 TCP [::]:8161 [::]:0 LISTENING 2431676 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8162 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8163 TCP 0.0.0.0:8163 0.0.0.0:0 LISTENING 2432188 TCP [::]:8163 [::]:0 LISTENING 2432188 TCP [::1]:8163 [::1]:63600 TIME_WAIT 0 TCP [::1]:8163 [::1]:63648 TIME_WAIT 0 TCP [::1]:8163 [::1]:63649 TIME_WAIT 0 TCP [::1]:8163 [::1]:63650 TIME_WAIT 0 TCP [::1]:8163 [::1]:63651 TIME_WAIT 0 TCP [::1]:8163 [::1]:63652 TIME_WAIT 0
查看控制台如下:連接到A服務器消費信息
總結:上面的配置方案B與C是為了實現高可用,也就是一台宕機之后另一台馬上強占排他鎖提供服務(需要共享文件夾實現共用同一文件夾下的資源與鎖),B與C可以生產消息,也可以提供消費消息,但是同一時刻只有一個提供服務。
提供A是為了與B、C實現負載均衡,A不生產消息,但是可以消費消息,替B或者C分擔壓力。