使用Active MQ在.net和java系統之間通信


ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現

一.特性列表

⒈ 多種語言和協議編寫客戶端。語言: Java,C,C++,C#,Ruby,Perl,Python,PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
⒉ 完全支持JMS1.1和J2EE 1.4規范 (持久化,XA消息,事務)
⒊ 對Spring的支持,ActiveMQ可以很容易內嵌到使用Spring的系統里面去,而且也支持Spring2.0的特性
⒋ 通過了常見J2EE服務器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何兼容J2EE 1.4 商業服務器上
⒌ 支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
⒍ 支持通過JDBC和journal提供高速的消息持久化
⒎ 從設計上保證了高性能的集群,客戶端-服務器,點對點
⒏ 支持Ajax
⒐ 支持與Axis的整合
⒑ 可以很容易得調用內嵌JMS provider,進行測試

二、Jms規范里的兩種message傳輸方式Topic和Queue,兩者的對比如下:

Topic
Queue

概要
Publish Subscribe messaging 發布訂閱消息
Point-to-Point 點對點

有無狀態
topic數據默認不落地,是無狀態的。

Queue數據默認會在mq服務器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存儲。

完整性保障
並不保證publisher發布的每條數據,Subscriber都能接受到。
Queue保證每條數據都能被receiver接收。

消息是否會丟失
一般來說publisher發布消息到某一個topic時,只有正在監聽該topic地址的sub能夠接收到消息;如果沒有sub在監聽,該topic就丟失了。
Sender發送消息到目標Queue,receiver可以異步接收這個Queue上的消息。Queue上的消息如果暫時沒有receiver來取,也不會丟失。

消息發布接收策略
一對多的消息發布接收策略,監聽同一個topic地址的多個sub都能收到publisher發送的消息。Sub接收完通知mq服務器
一對一的消息發布接收策略,一個sender發送的消息,只能有一個receiver接收。receiver接收完后,通知mq服務器已接收,mq服務器對queue里的消息采取刪除或其他操作。

三、安裝

1.首先去http://activemq.apache.org/download.html 下載最新版本

2.解壓后,目錄如下:

+bin (windows下面的bat和unix/linux下面的sh)

+conf (activeMQ配置目錄,包含最基本的activeMQ配置文件)

+data (默認是空的)

+docs (index,replease版本里面沒有文檔,-.-b不知道為啥不帶)

+example (幾個例子

+lib (activemMQ使用到的lib)

-apache-activemq-4.1-incubator.jar (ActiveMQ的binary)

-LICENSE.txt

-NOTICE.txt

-README.txt

-user-guide.html

3.使用bin\activemq.bat啟動服務

四、創建.Net Producer

   1:      class Program
   2:      {
   3:          static IConnectionFactory _factory = null;
   4:          static IConnection _connection = null;
   5:          static ITextMessage _message = null;
   6:          static ITextMessage _message2 = null;
   7:          static void Main(string[] args)
   8:          {
   9:              //創建工廠
  10:              _factory = new ConnectionFactory("tcp://localhost:61616/");
  11:              try
  12:              {
  13:                  //創建連接
  14:                  using (_connection = _factory.CreateConnection())
  15:                  {
  16:                      //創建會話
  17:                      using (ISession session = _connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
  18:                      {
  19:                          //創建一個主題
  20:                          IDestination destination = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("topic");
  21:   
  22:                          //創建生產者
  23:                          IMessageProducer producer = session.CreateProducer(destination);
  24:   
  25:                          int counter = 0;
  26:                          Console.WriteLine("請輸入你要發送數據,然后回車!");
  27:                          
  28:                          while (true)
  29:                          {
  30:                              string msg = Console.ReadLine();
  31:                              if (msg != string.Empty)
  32:                              {
  33:                                  Console.BackgroundColor = ConsoleColor.Blue;
  34:                                  Console.Beep();
  35:                                  counter++;
  36:                                  
  37:                                  //創建一個文本消息
  38:                                  _message = producer.CreateTextMessage("This is .Net AcitveMQ Message....");
  39:                                  //_message2 = producer.CreateTextMessage("<Root><First>Test</First><Root>");
  40:                                  
  41:                                  Console.WriteLine("正在發送第{0}組發送數據...........", counter);
  42:                                  //發送消息
  43:                                  producer.Send(_message, MsgDeliveryMode.Persistent, MsgPriority.Normal,
  44:                                      TimeSpan.MaxValue);
  45:   
  46:                                  Console.WriteLine("'"+_message.Text+"'已經發送!");
  47:                                  //producer.Send(_message2, MsgDeliveryMode.Persistent, MsgPriority.Normal,
  48:                                  //    TimeSpan.MinValue);
  49:                                  //Console.WriteLine("'" + _message2.Text + "'已經發送!");
  50:                                  producer.Send(msg, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.MinValue);
  51:                                  Console.WriteLine("你輸入的數據'" + msg + "'已經發送!");
  52:                                  Console.WriteLine("**************************************************");
  53:                                  Console.BackgroundColor = ConsoleColor.Black;
  54:                                  Console.WriteLine("請輸入你要發送數據,然后回車!");                                
  55:                              }
  56:                          }                     
  57:                      }
  58:                  }
  59:              }
  60:              catch (Exception ex)
  61:              {
  62:                  Console.WriteLine(ex.ToString());
  63:              }
  64:              Console.ReadLine();
  65:          }
  66:      }

五、創建Java Consumer

   1:  import javax.jms.Connection;
   2:  import javax.jms.ConnectionFactory;
   3:  import javax.jms.Session;
   4:  import javax.jms.TextMessage;
   5:  import javax.jms.Topic;
   6:  import javax.jms.TopicSubscriber;
   7:   
   8:  import org.apache.activemq.ActiveMQConnectionFactory;
   9:   
  10:  public class ActiveMQConsumer {
  11:      public static void main(String[] args) {
  12:          // ConnectionFactory :連接工廠,JMS 用它創建連接
  13:          ConnectionFactory connectionFactory;
  14:          // Connection :JMS 客戶端到JMS Provider 的連接
  15:          Connection connection = null;
  16:          // Session: 一個發送或接收消息的線程
  17:          Session session;
  18:          Topic topic;
  19:          // 消費者,消息接收者
  20:          TopicSubscriber subscriber;
  21:          connectionFactory = new ActiveMQConnectionFactory(
  22:                  //ActiveMQConnection.DEFAULT_USER,
  23:                  //ActiveMQConnection.DEFAULT_PASSWORD,
  24:                  "tcp://localhost:61616");
  25:          try {
  26:              // 構造從工廠得到連接對象
  27:              connection = connectionFactory.createConnection();
  28:              connection.setClientID("Customer");
  29:              // 啟動
  30:              connection.start();
  31:              // 獲取操作連接
  32:              session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);            
  33:              topic = session.createTopic("topic");
  34:              subscriber= session.createDurableSubscriber(topic, "Customer", null, false);
  35:          
  36:              while (true) {
  37:                  //TextMessage message = (TextMessage) subscriber.receive(10000);
  38:                  //TextMessage message = (TextMessage) subscriber.receiveNoWait();
  39:                  TextMessage message= (TextMessage) subscriber.receive();
  40:                  if (null != message) {                    
  41:                      System.out.println("收到消息" + message.getText());
  42:                      
  43:                  } else {
  44:                      break;
  45:                  }
  46:              }
  47:          } catch (Exception e) {
  48:              e.printStackTrace();
  49:          } finally {
  50:              try {
  51:                  if (null != connection)
  52:                      connection.close();
  53:              } catch (Throwable ignore) {
  54:              }
  55:          }
  56:      }    
  57:  }

Y71_XSHX3]UIN8YW8@XH6I4

9KAL]XXE0NGHTGOIKD}6Q8H

MI)Y{(XLS45G(@P`4@]CWSF[4]

六、參考資料

http://heisetoufa.iteye.com/blog/1908335

http://www.cnblogs.com/sunwei2012/archive/2010/03/22/1691919.html

http://www.cnblogs.com/tenghoo/archive/2009/11/23/activeMQ_client.html

http://www.cnblogs.com/CopyPaster/archive/2012/04/27/2473179.html

http://blog.csdn.net/gongqingkui/article/details/8615777

http://blog.csdn.net/mypurse/article/details/6265073


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM