ActiveMQ主從配置


這種方式有個問題,activemq1有消息沒消費完但是突然宕機,雖然程序會自動連到activemq2。但是activemq1的消息只有等機器恢復后才會被消費。

1.啟動:我這里使用的是apache-activemq-5.13.3,是在windows下使用的,發現根據文檔說的雙擊activemq.bat啟動不了,那就只好使用命令啟動,CMD進入到apache-activemq-5.13.3\bin下,輸入activemqbat start。這樣就可以啟動了。

2.主從配置:第一個activemq解壓到apache-activemq-5.13.3,第二個解壓到apache-activemq-5.13.3-2

  第一個activemq直接輸入命令啟動

  第二個需要修改參數:a.打開apache-activemq-5.13.3-2\conf\activemq.xml,修改broker標簽里面的brokerName,不要和第一個相同就行

            b.修改activemq.xml中的transportConnectors,刪除其他,只留一個openwire就行,修改uri里面的端口號

            c.在transportConnectors上面添加(如果一會兒啟動的時候這里報錯,請手動敲打下面三行,不要復制)

              <networkConnectors>

                <networkConnector uri="static:(tcp://localhost:61616)" duplex="true"/>
              </networkConnectors>


            d.修改\conf\jetty.xml文件的115行,端口號隨便寫一個。(這里是jetty的訪問端口)

配置文件修改完成,啟動第一個activemq,啟動第二個activemq。

接下來是代碼中brokerURL需要改成使用failover。這樣啟動生產者和消費者后,程序就可以在主從直接自動切換(可以嘗試輪流關閉主從)。

 

生產者代碼如下:

 1 import javax.jms.Connection;
 2 import javax.jms.ConnectionFactory;
 3 import javax.jms.DeliveryMode;
 4 import javax.jms.Destination;
 5 import javax.jms.MessageProducer;
 6 import javax.jms.Session;
 7 import javax.jms.TextMessage;
 8 
 9 import org.apache.activemq.ActiveMQConnection;
10 import org.apache.activemq.ActiveMQConnectionFactory;
11 
12 public class Sender {
13     public static void main(String[] args) {
14         // ConnectionFactory :連接工廠,JMS 用它創建連接
15         ConnectionFactory connectionFactory;
16         // Connection :JMS 客戶端到JMS Provider 的連接
17         Connection connection = null;
18         // Session: 一個發送或接收消息的線程
19         Session session;
20         // Destination :消息的目的地;消息發送給誰.
21         Destination destination;
22         // MessageProducer:消息發送者
23         MessageProducer producer;
24         // TextMessage message;
25         // 構造ConnectionFactory實例對象,此處采用ActiveMq的實現jar
26         String brokerURL = "failover://(tcp://localhost:61616,tcp://localhost:61617)";
27         connectionFactory = new ActiveMQConnectionFactory(
28                 ActiveMQConnection.DEFAULT_USER,
29                 ActiveMQConnection.DEFAULT_PASSWORD, brokerURL);
30         try {
31             // 構造從工廠得到連接對象
32             connection = connectionFactory.createConnection();
33             // 啟動
34             connection.start();
35             // 獲取操作連接
36             session = connection.createSession(Boolean.TRUE,
37                     Session.AUTO_ACKNOWLEDGE);
38             destination = session.createQueue("FirstQueue");
39             // 得到消息生成者
40             producer = session.createProducer(destination);
41                         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
42             while (true) {
43                 sendMessage(session, producer);
44                 session.commit();// commit后消息才會發出去
45                 Thread.sleep(1000);
46             }
47         } catch (Exception e) {
48             e.printStackTrace();
49         } finally {
50             try {
51                 if (null != connection)
52                     connection.close();
53             } catch (Throwable ignore) {
54             }
55         }
56     }
57 
58     static int i = 1;
59 
60     public static void sendMessage(Session session, MessageProducer producer)
61             throws Exception {
62         TextMessage message = session.createTextMessage("ActiveMq 發送的消息" + i);
63         // 發送消息到目的地方
64         System.out.println("發送消息:" + "ActiveMq 發送的消息" + i);
65         producer.send(message);
66         i++;
67     }
68 }    
producer

 

消費者代碼如下:

 1 import javax.jms.Connection;
 2 import javax.jms.ConnectionFactory;
 3 import javax.jms.Destination;
 4 import javax.jms.JMSException;
 5 import javax.jms.Message;
 6 import javax.jms.MessageConsumer;
 7 import javax.jms.MessageListener;
 8 import javax.jms.Session;
 9 import javax.jms.TextMessage;
10 
11 import org.apache.activemq.ActiveMQConnection;
12 import org.apache.activemq.ActiveMQConnectionFactory;
13 
14 public class Receiver {
15     public static void main(String[] args) {
16         // ConnectionFactory :連接工廠,JMS 用它創建連接
17         ConnectionFactory connectionFactory;
18         // Connection :JMS 客戶端到JMS Provider 的連接
19         Connection connection = null;
20         // Session: 一個發送或接收消息的線程
21         Session session;
22         // Destination :消息的目的地;消息發送給誰.
23         Destination destination;
24         // 消費者,消息接收者
25         MessageConsumer consumer;
26         String brokerURL = "failover://(tcp://localhost:61616,tcp://localhost:61617)";
27 //        String brokerURL = "tcp://localhost:61616";
28         connectionFactory = new ActiveMQConnectionFactory(
29                 ActiveMQConnection.DEFAULT_USER,
30                 ActiveMQConnection.DEFAULT_PASSWORD,
31                 brokerURL);
32         try {
33             // 構造從工廠得到連接對象
34             connection = connectionFactory.createConnection();
35             // 啟動
36             connection.start();
37             // 獲取操作連接
38             session = connection.createSession(Boolean.FALSE,
39                     Session.AUTO_ACKNOWLEDGE);
40             destination = session.createQueue("FirstQueue");
41             consumer = session.createConsumer(destination);
42             consumer.setMessageListener(new MyListener());
43             System.out.println("started...");
44             while(true){
45             }
46         } catch (Exception e) {
47             e.printStackTrace();
48         } finally {
49             try {
50                 if (null != connection)
51                     connection.close();
52             } catch (Throwable ignore) {
53             }
54         }
55     }
56 }
57 class MyListener implements MessageListener{
58     
59     public void onMessage(Message message) {
60         TextMessage textMessage = (TextMessage) message;
61         try {
62             System.out.println("收到消息:"+textMessage.getText());
63         } catch (JMSException e) {
64             e.printStackTrace();
65         }
66     }
67 }
Receiver

 

以上代碼部分摘自網絡

 

這是配置主從的一個方案,還有一種方案是使用文件系統。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM