ActiveMQ是一款功能強大的消息服務器,它支持許多種開發語言,例如Java, C, C++, C#等等。企業級消息服務器無論對服務器穩定性還是速度,要求都很高,而ActiveMQ的分布式集群則能很好的滿足這一需求,下面說說ActiveMQ的幾種集群配置。
Queue consumer clusters
此集群讓多個消費者同時消費一個隊列,若某個消費者出問題無法消費信息,則未消費掉的消息將被發給其他正常的消費者,結構圖如下:
Broker clusters
此種配置是一個消費者連接到多個broker集群的中的一個broker,當該broker出問題時,消費者自動連接到其他一個正常的broker。消費者使用 failover:// 協議來連接broker。
failover:(tcp://localhost:61616,tcp://localhost:61617)
failover官網介紹 http://activemq.apache.org/failover-transport-reference.html
broker之間的通過靜態發現(static discovery)和動態發現(dynamic discovery)來維持彼此發現,下面來介紹靜態發現和動態發現的機制:
靜態發現:
靜態發現通過配置固定的broker uri來發現彼此,配置語法如下:
static:(uri1,uri2,uri3,...)?options
例如:
static:(tcp://localhost:61616,tcp://remotehost:61617?trace=false,vm://localbroker)?initialReconnectDelay=100
更多靜態發現介紹,見ActiveMQ官網 http://activemq.apache.org/static-transport-reference.html
動態發現:
動態發現機制是在各個broker啟動時通過Fanout transport來發現彼此,配置舉例如下:
1 <broker name="foo"> 2 <transportConnectors> 3 <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/> 4 </transportConnectors> 5 ... 6 </broker>
更多動態發現機制介紹,見官網http://activemq.apache.org/discovery-transport-reference.html
Networks of brokers
多個broker組成集群,當其中一個broker的消費者出問題導致消息堆積無法消費掉時,通過ActiveMQ支持的Network of Broker方案可將該broker堆積的消息轉發到其他有消費者的broker。該方案主要有以下兩種配置方式:
1、為broker配置文件配置networkConnector元素
2、使用發現機制互相探測broker
Here is an example of using the fixed list of URIs:
1 <?xml version="1.0" encoding="UTF-8"?> 2 3 <beans xmlns="http://activemq.org/config/1.0"> 4 5 <broker brokerName="receiver" persistent="false" useJmx="false"> 6 <networkConnectors> 7 <!-- Static discovery --> 8 <networkConnector uri="static:(tcp://localhost:62001)"/> 9 <!-- MasterSlave Discovery --> 10 <!--<networkConnector uri="masterslave:(tcp://host1:61616,tcp://host2:61616,tcp://..)"/> --> 11 </networkConnectors> 12 13 <persistenceAdapter> 14 <memoryPersistenceAdapter/> 15 </persistenceAdapter> 16 17 <transportConnectors> 18 <transportConnector uri="tcp://localhost:62002"/> 19 </transportConnectors> 20 </broker> 21 22 </beans>
This example uses multicast discovery:
1 <?xml version="1.0" encoding="UTF-8"?> 2 3 <beans xmlns="http://activemq.org/config/1.0"> 4 5 <broker name="sender" persistent="false" useJmx="false"> 6 <networkConnectors> 7 <networkConnector uri="multicast://default"/> 8 </networkConnectors> 9 10 <persistenceAdapter> 11 <memoryPersistenceAdapter/> 12 </persistenceAdapter> 13 14 <transportConnectors> 15 <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/> 16 </transportConnectors> 17 </broker> 18 19 </beans>
Master Slave
通過部署多個broker實例,一個master和多個slave關系的broker來達到高可用性,有三種方案:
1、Master-Slave
2、SharedFile System Master Slave
3、JDBCMaster Slave
第一種方案由於只可以由兩個AMQ實例組件,實際應用場景並不廣泛;
第三種方案支持N個AMQ實例組網,但他的性能會受限於數據庫;
第二種方案同樣支持N個AMQ實例組網,基於kahadb存儲策略,亦可以部署在分布式文件系統上,應用靈活、高效且安全。
Master Slave方案當其中一個broker啟動並拿到獨占鎖時自動成為master,其他后續的broker則一直等待鎖,當master宕機釋放鎖時其他slave拿到獨占鎖則自動成為master,部署結構如下:
第二種方案的配置只需修改config文件夾下activemq.xml文件,修改消息持久化使用的方案:
1 <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="D:/Platform/mq_share_file"> 2 ... 3 <persistenceAdapter> 4 <kahaDB directory="D:/Platform/mq_share_file/kahadb" enableIndexWriteAsync="true" enableJournalDiskSyncs="false"/> 5 </persistenceAdapter> 6 ... 7 </broker>
消息生產者代碼:
1 public class P2PSender { 2 private static final String QUEUE = "client1-to-client2"; 3 4 public static void main(String[] args) { 5 // ConnectionFactory :連接工廠,JMS用它創建連接 6 ConnectionFactory connectionFactory; 7 // Connection :JMS客戶端到JMS Provider的連接 8 Connection connection = null; 9 // Session:一個發送或接收消息的線程 10 Session session; 11 // Destination :消息的目的地;消息發送給誰. 12 Destination destination; 13 // MessageProducer:消息發送者 14 MessageProducer producer; 15 // TextMessage message; 16 // 構造ConnectionFactory實例對象,此處采用ActiveMq的實現 17 connectionFactory = new ActiveMQConnectionFactory( 18 "failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=0,tcp://localhost:61617?wireFormat.maxInactivityDuration=0)"); 19 try { 20 // 構造從工廠得到連接對象 21 connection = connectionFactory.createConnection(); 22 // 啟動 23 connection.start(); 24 // 獲取操作連接 25 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 26 destination = session.createQueue(QUEUE); 27 // 獲取session,FirstQueue是一個服務器的queue destination = session.createQueue("FirstQueue"); 28 // 得到消息生成者【發送者】 29 producer = session.createProducer(destination); 30 // 設置不持久化 31 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 32 // 構造消息 33 sendMessage(session, producer); 34 // session.commit(); 35 connection.close(); 36 } catch (Exception e) { 37 e.printStackTrace(); 38 } finally { 39 if (null != connection) { 40 try { 41 connection.close(); 42 } catch (JMSException e) { 43 e.printStackTrace(); 44 } 45 } 46 } 47 } 48 49 public static void sendMessage(Session session, MessageProducer producer) throws Exception { 50 for (int i = 1; i <= 1; i++) { 51 Date d = new Date(); 52 TextMessage message = session.createTextMessage("ActiveMQ發送消息" + i + " " + new Date()); 53 System.out.println("發送消息:ActiveMQ發送的消息" + i + " " + new Date()); 54 producer.send(message); 55 } 56 } 57 }
消息消費者代碼:
1 public class P2PReceiver { 2 private static final String QUEUE = "client1-to-client2"; 3 4 public static void main(String[] args) { 5 // ConnectionFactory :連接工廠,JMS用它創建連接 6 ConnectionFactory connectionFactory; 7 // Connection :JMS客戶端到JMS Provider的連接 8 Connection connection = null; 9 // Session:一個發送或接收消息的線程 10 Session session; 11 // Destination :消息的目的地;消息發送給誰. 12 Destination destination; 13 // 消費者,消息接收者 14 MessageConsumer consumer; 15 connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=0,tcp://localhost:61617?wireFormat.maxInactivityDuration=0)"); 16 try { 17 // 得到連接對象 18 connection = connectionFactory.createConnection(); 19 // 啟動 20 connection.start(); 21 // 獲取操作連接 22 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 23 // 創建Queue 24 destination = session.createQueue(QUEUE); 25 consumer = session.createConsumer(destination); 26 while (true) { 27 TextMessage message = (TextMessage) consumer.receive(); 28 if (null != message) { 29 System.out.println("收到消息" + message.getText()); 30 } 31 } 32 } catch (Exception e) { 33 e.printStackTrace(); 34 } finally { 35 try { 36 if (null != connection) 37 connection.close(); 38 } catch (Throwable ignore) { 39 } 40 } 41 } 42 }