ActiveMQ的靜態網絡鏈接


--------------------------------------------------------------------

(1)ActiveMQ的networkConnector是什么

  在某些情況下,需要多個ActiveMQ的Broker做集群,那么就涉及到Broker到Broker的通信,這個就稱為ActiveMQ的networkConnector.

  ActiveMQ的networkConnector默認是單向的,一個Broker在一端發送消息,另一個Broker在另一端接收消息,這就是所謂的"橋接"。ActiveMQ也支持雙向鏈接,創建一個雙向的通道對於兩個Broker不僅發送消息而且也能從相同的通道接收消息,通常作為duplex connector來映射,如下:

(2)有兩種配置Client到Broker的鏈接方式

  第一種: Client通過Staticlly配置的方式去連接Broker(靜態鏈接)

  第二種:  Client通過discover agent來dynamically的發現Brokers(動態鏈接)

Static networks:

  Static networkConnector是用於創建一個靜態的配置對於網絡中的多個Broker,這種協議用於復合url,一個復合url包括多個url地址,格式如下:

  static:(uri1,uri2,uri3, ...)?key=value

1 <networkConnectors>
2     <networkConnector name="local network" uri="static://(tcp://ip:prot,tcp://ip:port)"/>
3 </networkConnectors>

  在activemq.xml配置如下:

 下面啟動兩個Broker實例,一個端口:61616,另一個端口:61716,然后通過程序往61616端口的Broker發送數據,再從61716端口的Broker接收數據

JmsSend程序如下:

 1 import javax.jms.Connection;
 2 import javax.jms.ConnectionFactory;
 3 import javax.jms.Destination;
 4 import javax.jms.MessageProducer;
 5 import javax.jms.Session;
 6 import javax.jms.TextMessage;
 7 
 8 import org.apache.activemq.ActiveMQConnectionFactory;
 9 
10 public class JmsSend {
11     public static void main(String[] args) throws Exception {
12         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.81:61616");
13         Connection connection = connectionFactory.createConnection();
14         connection.start();
15         
16         Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
17         Destination destination = session.createQueue("my-queue");
18         
19         MessageProducer producer = session.createProducer(destination);
20         for(int i = 0;i < 10;i++){
21             TextMessage message = session.createTextMessage("message,1212 --->" + i);
22             Thread.sleep(1000);
23             //通過生產者發出消息
24             producer.send(message);
25         }
26         session.commit();
27         session.close();
28         connection.close();
29     }
30 }

JmsReceiver程序如下:

 1 import javax.jms.Connection;
 2 import javax.jms.ConnectionFactory;
 3 import javax.jms.Destination;
 4 import javax.jms.MessageConsumer;
 5 import javax.jms.Session;
 6 import javax.jms.TextMessage;
 7 
 8 import org.apache.activemq.ActiveMQConnectionFactory;
 9 
10 public class JmsReceiver {
11     public static void main(String[] args) throws Exception {
12         ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://192.168.1.81:61716");
13         Connection connection =  cf.createConnection();
14         connection.start();
15         
16         Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
17         Destination destination = session.createQueue("my-queue");
18         MessageConsumer consumer = session.createConsumer(destination);
19         int i = 0;
20         while(i < 10){
21             i++;
22             TextMessage message = (TextMessage)consumer.receive();
23             session.commit();
24             System.out.println("接收到的消息是:"+message.getText());
25         }
26         session.close();
27         connection.close();
28     }
29 }

運行結果:

(3)networkConnector配置的可用屬性

  1.name: 默認的bridge
  2.dynamicOnly: 默認是false,如果為true,持久訂閱被激活時才創建對應的網絡持久訂閱。默認是啟動時激活
  3.decreaseNetworkConsumerPriority: 默認是false。設定消費者優先權,如果為true,網絡的消費者優先級降低為-5。如果為false,則默認跟本地消費者一樣為0
  4.networkTTL: 默認是1,網絡中用於消息和訂閱消費的broker數量
  5.messageTTL: 默認是1,網絡中用於消息的broker數量
  6.consumerTTL: 默認是1,網絡中用於消費的broker數量
  7.conduitSubscriptions: 默認true,是否把同一個broker的多個consumer當做一個來處理(在做集群的時候如果有多個consumer,需要設置為false)
  8.dynamicallyIncludedDestinations:默認為空,要包括的動態消息地址,類適於excludedDestinations,如:

1 <dynamicallyIncludedDestinations>
2     <queue physicalName="include.test.foo"/>
3     <topic physicalName="include.test.bar"/>
4 </dynamicallyIncludedDestinations>    

  9.staticallyIncludedDestinations:默認為空,要包括的靜態消息地址。類似於excludedDestinations,如:

1 <staticallyIncludedDestinations>
2     <queue physicalName="always.include.queue"/>
3 </staticallyIncludedDestinations>

  10.excludedDestinations: 默認為空,指定排除的地址,示例如下:

 1 <networkConnectors>
 2   <networkConnector uri="static://(tcp://localhost:61617)" name="bridge" dynamicOnly="false" conduitSubscriptions="true"                                             decreaseNetworkConsumerPriority="false">
 5   <excludedDestinations>
 6     <queue physicalName="exclude.test.foo">
 7     <topic physicalName="exclude.test.bar">
 8   </excludedDestinations>
 9   <dynamicallyIncludedDestinations>
10     <queue physicalName="include.test.foo"/>
11     <topic physicalName="include.test.bar"/>
12   </dynamicallyIncludedDestinations>
13   <staticallyIncludedDestinations>
14     <queue physicalName="always.include.queue"/>
15   </staticallyIncludedDestinations>
16   </networkConnector>
17 </networkConnectors>

  12. prefetchSize: 默認是1000,持有的未確認的最大消息數量,必須大於0,因為網絡消費者不能自己輪詢消息
  13. suppressDuplicateQueueSubscriptions: 默認false,如果為true,重復的訂閱關系一產生即被阻止
  14. bridgeTempDestinations: 默認true,是否廣播advisory messages來創建臨時的destination
  15. alwaysSyncSend: 默認false,如果為true,非持久化消息也將使用request/reply方式代替oneway方式發送到遠程broker
  16. staticBridge: 默認false,如果為true,只有staticallyIncludedDestinations中配置的destination可以被處理

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM