ActiveMQ集群搭建


一、 Activemq主備搭建

  Shared Filesystem Master-Slave方式
  shared filesystem Master-Slave部署方式主要是通過共享存儲目錄來實現master和slave的熱備,所有的ActiveMQ應用都在不斷地獲取共享目錄的控制權,哪個應用搶到了控制權,它就成為master。
  多個共享存儲目錄的應用,誰先啟動,誰就可以最早取得共享目錄的控制權成為master,其他的應用就只能作為slave。

1  搭建配置步驟搭建 master-slave (一主 一備份)

准備mq的1節點 activemq-1
准備mq的2節點 activemq-2

  特點:
    只能本地不能分布式 和 集群。

針對每一個activemq的節點進行配置:

1.1 配置節點1:
首先創建共享目錄,並創建兩個節點
如圖:

  •  配置activemq-1,需要修改持久數據庫位置,修改:activemq-1/conf/activemq.xml 

  • 配置activemq-1,需要修改activemq-1/conf/activemq.xml

  如圖:修改成61617

  • 配置activemq-1 ,需要修改activemq-1/conf/jetty.xml

2.2 配置節點2:

  • 配置activemq-2,需要修改 持久目錄文件,修改:activemq-2/conf/activemq.xml 

  • 配置activemq-2,需要修改activemq-2/conf/activemq.xml

  如圖:修改成61618

  • 配置activemq-1 ,需要修改activemq-2/conf/jetty.xml

2 測試master-slave
2.1 生產者

 1 public class ProduceQueue {


 2     @Test

 3     public void sendMessage() throws Exception{

 4         //1.創建一個連接工廠 connectionfactory 參數:就是要連接的服務器的地址

 5         ConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://192.168.25.130:61617,tcp://192.168.25.130:61618)");

 6         //2.通過工廠獲取連接對象 創建連接 7         Connection connection = factory.createConnection();

 8         //3.開啟連接 9         connection.start();

10         //4.創建一個session對象  提供發送消息等方法

11         // 第一個參數:表示是否開啟分布式事務(JTA)  一般是false 不開啟。

12         // 第二個參數:就是設置消息的應答模式
13         // 如果 第一個參數為false時,第二個參數設置才有意義。用的是自動應答14         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
15         //5.創建目的地 (destination)  queue 參數:目的地的名稱

16         Queue queue = session.createQueue("queue-test-cluster");

17         //6.創建個生產者18         MessageProducer producer = session.createProducer(queue);

19         //7.構建消息的內容20         TextMessage textMessage = session.createTextMessage("queue測試發送的消息");
21         // 8.發送消息22         producer.send(textMessage);

23         //9.關閉資源24         producer.close();

25         session.close();

26         connection.close();

27     }
28 
}

2.2 消費者

 1 public class ConsumerQueue {


 2     @Test

 3     public void consumer() throws Exception{

 4         //1.創建連接的工廠 5         ConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://192.168.25.130:61617,tcp://192.168.25.130:61618)");

 6         //2.創建連接 7         Connection connection = factory.createConnection();

 8         //3.開啟連接 9         connection.start();

10         //4.創建session

11         // 第一個參數:表示是否開啟分布式事務(JTA)  一般是false 不開啟。

12         // 第二個參數:就是設置消息的應答模式   如果 第一個參數為false時,第二個參數設置才有意義。用的是自動應答13         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

14         // 5.創建接收消息的一個目的地15         Queue queue = session.createQueue("queue-test-cluster");

16         // 6.創建消費者17         MessageConsumer consumer = session.createConsumer(queue);

18         // 7.接收消息 打印

19         // 第一種20         /*while(true){

21             Message message = consumer.receive(1000000);//設置接收消息的超時時間

22             //沒有接收到消息就跳出循環

23             if(message==null){

24                 break;

25             }

26             if(message instanceof TextMessage){

27                 TextMessage message2 = (TextMessage) message;

28                 System.out.println("接收的消息為"+message2.getText());

29                 }

30          }*/31         //第二種


32         // 設置一個監聽器

33         // System.out.println("start");

34         // 這里其實開辟了一個新的線程35         consumer.setMessageListener(new MessageListener() {


36             //當有消息的時候會執行以下的邏輯37             @Override

38             public void onMessage(Message message) {

39                 if(message instanceof TextMessage){

40                     TextMessage message2 = (TextMessage) message;

41                     try {

42                         System.out.println("接收的消息為"+message2.getText());

43                     } catch (JMSException e) {

44                         e.printStackTrace();

45                     }

46                 }

47             }

48         });

49         //System.out.println("end");50         Thread.sleep(199999);

51         // 8.關閉資源52         consumer.close();

53         session.close();

54         connection.close();

55     }


56 }

先啟動的成為主節點,平常主節點工作,slave不工作但是一直做監聽。當主節點掛掉,slave接手工作。

二、 基於zookeeper的activemq集群搭建(推薦)

2.1 基於可復制的 LevelDB
  LevelDB 是 Google 開發的一套用於持久化數據的高性能類庫。 LevelDB 並不是一種服務,用戶需要自行實現 Server。 是單進程的服務,能夠處理十億級別規模 Key-Value 型數據,占用內存小。
  http://activemq.apache.org/replicated-leveldb-store.html

 

  高可用的原理:使用 ZooKeeper(集群)注冊所有的 ActiveMQ Broker。只有其中的一個 Broker 可以提供服務,被視為 Master,其他的 Broker 處於待機狀態,被視為 Slave。如果 Master 因故障而不能提供服務,ZooKeeper 會從 Slave 中選舉出一個 Broker 充當 Master。Slave 連接 Master 並同步他們的存儲狀態, Slave 不接受客戶端連接。所有的存儲操作都將被復制到連接至 Master 的 Slaves。 如果 Master 宕了,得到了最新更新的 Slave 會成為 Master。 故障節點在恢復后會重新加入到集群中並連接 Master 進入 Slave 模式。所有需要同步的 disk 的消息操作都將等待存儲狀態被復制到其他法定節點的操作完成才能完成。所以,如果你配置了 replicas=3,那么法定大小是(3/2)+1=2。 Master 將會存儲並更新然后等待 (2-1)=1 個Slave 存儲和更新完成,才匯報 success。 至於為什么是 2-1,熟悉 Zookeeper 的應該知道,有一個 node要作為觀擦者存在。當一個新的 Master 被選中,你需要至少保障一個法定 node 在線以能夠找到擁有最新狀態的 node。這個 node 可以成為新的 Master。因此,推薦運行至少 3 個 replica nodes,以防止一個 node失敗了,服務中斷。(原理與 ZooKeeper 集群的高可用實現方式類似)。

2.2 集群單機環境規划
  定義環境:3個activemq節點(node01 node02 node03)

Ip 集群通信端口 節點消息連接端口 Jetty后台運行端口
192.168.25.130 63631 51515 8361
192.168.25.130 63632 51516 8362
192.168.25.130 63633 51517 8363

2.3 配置zookeeper集群

見相關教程;

2.4 節點1的配置

  在activemq.xml中配置:

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerCluster" dataDirectory="${activemq.data}">

  brokerName指定一個名字:任意即可。但是整個集群的所有的配置項名稱都應該是一致的。

  broker標簽下的配置:

<persistenceAdapter> 
  <!-- kahaDB directory="${activemq.data}/kahadb"/ --> 
  <replicatedLevelDB 
  directory="${activemq.data}/leveldb" 
  replicas="3" 
  bind="tcp://0.0.0.0:63631" 
  zkAddress="192.168.25.130:2181,192.168.25.130:2182,192.168.25.130:2183" 
  hostname="localhost" 
  zkPath="/activemq2/leveldb-stores"/> 
</persistenceAdapter>

  jetty.xml:配置項:

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
  <!-- the default port number for the web console -->
  <property name="host" value="0.0.0.0"/>
  <property name="port" value="8361"/>
</bean>

2.5 節點2的配置
  參考節點1搭建即可,注意:端口不要一樣。

  搭建后的截圖:


  node01 -node03 是activemq的3個節點。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM