ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現,盡管JMS規范出台已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。
特性
- 多種語言和協議編寫客戶端。語言: Java、C、C++、C#、Ruby、Perl、Python、PHP。應用協議:OpenWire、Stomp REST、WS Notification、XMPP、AMQP
- 完全支持JMS1.1和J2EE 1.4規范 (持久化,XA消息,事務)
- 對Spring的支持,ActiveMQ可以很容易內嵌到使用Spring的系統里面去,而且也支持Spring2.0的特性
- 通過了常見J2EE服務器(如 Geronimo、JBoss 4、GlassFish、WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何兼容J2EE 1.4 商業服務器上
- 支持多種傳送協議:in-VM、TCP、SSL、NIO、UDP、JGroups、JXTA
- 支持通過JDBC和journal提供高速的消息持久化
- 從設計上保證了高性能的集群,客戶端-服務器,點對點
- 支持Ajax
- 支持與Axis的整合
- 可以很容易得調用內嵌JMS provider,進行測試
環境准備
我以windows7為列子
- windows 7
- jdk1.6+ 並在windows下配置好環境變量等,具體可以看 http://jingyan.baidu.com/article/ff41162596a77912e4823716.html
- 下載最新版activemq http://activemq.apache.org/activemq-5121-release.html
安裝
解壓下載好的安裝包,至本地任何磁盤
啟動activemq服務
啟動成功后的界面是
如果出現major.minor version51.0 之類的錯誤,都是java版本安裝錯誤的問題,安裝1.6+以上的版本就能解決
啟動成功后,http://localhost:8161/admin,默認用戶名和密碼admin/admin。如果你想修改用戶名和密碼的話,在conf/jetty-realm.properties中修改即可。
如果需要修改端口可以在jetty文件中修改
管理員界面如下
ActiviteMQ接收和發送消息基本流程
摘自 http://www.cnblogs.com/hoojo/p/active_mq_jms_apache_activeMQ.html
發送消息的基本步驟:
(1)、創建連接使用的工廠類JMS ConnectionFactory
(2)、使用管理對象JMS ConnectionFactory建立連接Connection,並啟動
(3)、使用連接Connection 建立會話Session
(4)、使用會話Session和管理對象Destination創建消息生產者MessageSender
(5)、使用消息生產者MessageSender發送消息
消息接收者從JMS接受消息的步驟
(1)、創建連接使用的工廠類JMS ConnectionFactory
(2)、使用管理對象JMS ConnectionFactory建立連接Connection,並啟動
(3)、使用連接Connection 建立會話Session
(4)、使用會話Session和管理對象Destination創建消息接收者MessageReceiver
(5)、使用消息接收者MessageReceiver接受消息,需要用setMessageListener將MessageListener接口綁定到MessageReceiver消息接收者必須實現了MessageListener接口,需要定義onMessage事件方法。
C#消息隊列寫入
ActiveMQ官方網站下載最新版的Apache.NMS,網址:http://activemq.apache.org/nms/download.html,需要下載Apache.NMS和Apache.NMS.ActiveMQ兩個bin包
http://www.apache.org/dyn/closer.lua/activemq/apache-nms/1.7.0/Apache.NMS.ActiveMQ-1.7.1-bin.zip
創建C#項目,將這兩個DLL添加到項目中。
寫入地址如下
寫入賬號,密碼如下
寫入代碼如下:
namespace ConsoleApplication21 { class Program { static void Main(string[] args) { String QueuesNameESF = "queue://test.log"; Uri _uri = new Uri(String.Concat("activemq:tcp://10.58.8.239:61616")); IConnectionFactory factory = new ConnectionFactory(_uri); using (IConnection conn = factory.CreateConnection("admin", "manager")) { using (ISession session = conn.CreateSession()) { IDestination destination = SessionUtil.GetDestination(session, QueuesNameESF); using (IMessageProducer producer = session.CreateProducer(destination)) { conn.Start(); //可以寫入字符串,也可以是一個xml字符串等 ITextMessage request = session.CreateTextMessage("messsage"); producer.Send(request); } } } } }
寫入后,可以查看
讀取代碼如下:
using System; using Apache.NMS; using Apache.NMS.ActiveMQ; using Apache.NMS.Util; namespace ConsoleApplication21 { class Program { static void Main(string[] args) { String QueuesNameESF = "queue://test.log"; Uri _uri = new Uri(String.Concat("activemq:tcp://10.58.8.239:61616?wireFormat.maxInactivityDuration=0")); IConnectionFactory factory = new ConnectionFactory(_uri); using (IConnection conn = factory.CreateConnection("admin", "manager")) { using (ISession session = conn.CreateSession()) { conn.Start(); IDestination destination = SessionUtil.GetDestination(session, QueuesNameESF); using (IMessageConsumer consumer = session.CreateConsumer(destination)) { ITextMessage message = consumer.Receive() as ITextMessage; Console.Write(message.Text); } } } Console.ReadLine(); } } }
作者:釋迦苦僧 出處:http://www.cnblogs.com/woxpp/p/5001373.html 本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接。