這種方式有個問題,activemq1有消息沒消費完但是突然宕機,雖然程序會自動連到activemq2。但是activemq1的消息只有等機器恢復后才會被消費。
1.啟動:我這里使用的是apache-activemq-5.13.3,是在windows下使用的,發現根據文檔說的雙擊activemq.bat啟動不了,那就只好使用命令啟動,CMD進入到apache-activemq-5.13.3\bin下,輸入activemqbat start。這樣就可以啟動了。
2.主從配置:第一個activemq解壓到apache-activemq-5.13.3,第二個解壓到apache-activemq-5.13.3-2
第一個activemq直接輸入命令啟動
第二個需要修改參數:a.打開apache-activemq-5.13.3-2\conf\activemq.xml,修改broker標簽里面的brokerName,不要和第一個相同就行
b.修改activemq.xml中的transportConnectors,刪除其他,只留一個openwire就行,修改uri里面的端口號
c.在transportConnectors上面添加(如果一會兒啟動的時候這里報錯,請手動敲打下面三行,不要復制)
<networkConnectors>
<networkConnector uri="static:(tcp://localhost:61616)" duplex="true"/>
</networkConnectors>

d.修改\conf\jetty.xml文件的115行,端口號隨便寫一個。(這里是jetty的訪問端口)
配置文件修改完成,啟動第一個activemq,啟動第二個activemq。
接下來是代碼中brokerURL需要改成使用failover。這樣啟動生產者和消費者后,程序就可以在主從直接自動切換(可以嘗試輪流關閉主從)。
生產者代碼如下:
1 import javax.jms.Connection; 2 import javax.jms.ConnectionFactory; 3 import javax.jms.DeliveryMode; 4 import javax.jms.Destination; 5 import javax.jms.MessageProducer; 6 import javax.jms.Session; 7 import javax.jms.TextMessage; 8 9 import org.apache.activemq.ActiveMQConnection; 10 import org.apache.activemq.ActiveMQConnectionFactory; 11 12 public class Sender { 13 public static void main(String[] args) { 14 // ConnectionFactory :連接工廠,JMS 用它創建連接 15 ConnectionFactory connectionFactory; 16 // Connection :JMS 客戶端到JMS Provider 的連接 17 Connection connection = null; 18 // Session: 一個發送或接收消息的線程 19 Session session; 20 // Destination :消息的目的地;消息發送給誰. 21 Destination destination; 22 // MessageProducer:消息發送者 23 MessageProducer producer; 24 // TextMessage message; 25 // 構造ConnectionFactory實例對象,此處采用ActiveMq的實現jar 26 String brokerURL = "failover://(tcp://localhost:61616,tcp://localhost:61617)"; 27 connectionFactory = new ActiveMQConnectionFactory( 28 ActiveMQConnection.DEFAULT_USER, 29 ActiveMQConnection.DEFAULT_PASSWORD, brokerURL); 30 try { 31 // 構造從工廠得到連接對象 32 connection = connectionFactory.createConnection(); 33 // 啟動 34 connection.start(); 35 // 獲取操作連接 36 session = connection.createSession(Boolean.TRUE, 37 Session.AUTO_ACKNOWLEDGE); 38 destination = session.createQueue("FirstQueue"); 39 // 得到消息生成者 40 producer = session.createProducer(destination); 41 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 42 while (true) { 43 sendMessage(session, producer); 44 session.commit();// commit后消息才會發出去 45 Thread.sleep(1000); 46 } 47 } catch (Exception e) { 48 e.printStackTrace(); 49 } finally { 50 try { 51 if (null != connection) 52 connection.close(); 53 } catch (Throwable ignore) { 54 } 55 } 56 } 57 58 static int i = 1; 59 60 public static void sendMessage(Session session, MessageProducer producer) 61 throws Exception { 62 TextMessage message = session.createTextMessage("ActiveMq 發送的消息" + i); 63 // 發送消息到目的地方 64 System.out.println("發送消息:" + "ActiveMq 發送的消息" + i); 65 producer.send(message); 66 i++; 67 } 68 }
消費者代碼如下:
1 import javax.jms.Connection; 2 import javax.jms.ConnectionFactory; 3 import javax.jms.Destination; 4 import javax.jms.JMSException; 5 import javax.jms.Message; 6 import javax.jms.MessageConsumer; 7 import javax.jms.MessageListener; 8 import javax.jms.Session; 9 import javax.jms.TextMessage; 10 11 import org.apache.activemq.ActiveMQConnection; 12 import org.apache.activemq.ActiveMQConnectionFactory; 13 14 public class Receiver { 15 public static void main(String[] args) { 16 // ConnectionFactory :連接工廠,JMS 用它創建連接 17 ConnectionFactory connectionFactory; 18 // Connection :JMS 客戶端到JMS Provider 的連接 19 Connection connection = null; 20 // Session: 一個發送或接收消息的線程 21 Session session; 22 // Destination :消息的目的地;消息發送給誰. 23 Destination destination; 24 // 消費者,消息接收者 25 MessageConsumer consumer; 26 String brokerURL = "failover://(tcp://localhost:61616,tcp://localhost:61617)"; 27 // String brokerURL = "tcp://localhost:61616"; 28 connectionFactory = new ActiveMQConnectionFactory( 29 ActiveMQConnection.DEFAULT_USER, 30 ActiveMQConnection.DEFAULT_PASSWORD, 31 brokerURL); 32 try { 33 // 構造從工廠得到連接對象 34 connection = connectionFactory.createConnection(); 35 // 啟動 36 connection.start(); 37 // 獲取操作連接 38 session = connection.createSession(Boolean.FALSE, 39 Session.AUTO_ACKNOWLEDGE); 40 destination = session.createQueue("FirstQueue"); 41 consumer = session.createConsumer(destination); 42 consumer.setMessageListener(new MyListener()); 43 System.out.println("started..."); 44 while(true){ 45 } 46 } catch (Exception e) { 47 e.printStackTrace(); 48 } finally { 49 try { 50 if (null != connection) 51 connection.close(); 52 } catch (Throwable ignore) { 53 } 54 } 55 } 56 } 57 class MyListener implements MessageListener{ 58 59 public void onMessage(Message message) { 60 TextMessage textMessage = (TextMessage) message; 61 try { 62 System.out.println("收到消息:"+textMessage.getText()); 63 } catch (JMSException e) { 64 e.printStackTrace(); 65 } 66 } 67 }
以上代碼部分摘自網絡
這是配置主從的一個方案,還有一種方案是使用文件系統。
