ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現
一.特性列表
⒈ 多種語言和協議編寫客戶端。語言: Java,C,C++,C#,Ruby,Perl,Python,PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
⒉ 完全支持JMS1.1和J2EE 1.4規范 (持久化,XA消息,事務)
⒊ 對Spring的支持,ActiveMQ可以很容易內嵌到使用Spring的系統里面去,而且也支持Spring2.0的特性
⒋ 通過了常見J2EE服務器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何兼容J2EE 1.4 商業服務器上
⒌ 支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
⒍ 支持通過JDBC和journal提供高速的消息持久化
⒎ 從設計上保證了高性能的集群,客戶端-服務器,點對點
⒏ 支持Ajax
⒐ 支持與Axis的整合
⒑ 可以很容易得調用內嵌JMS provider,進行測試
二、Jms規范里的兩種message傳輸方式Topic和Queue,兩者的對比如下:
Topic
Queue
概要
Publish Subscribe messaging 發布訂閱消息
Point-to-Point 點對點
有無狀態
topic數據默認不落地,是無狀態的。
Queue數據默認會在mq服務器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存儲。
完整性保障
並不保證publisher發布的每條數據,Subscriber都能接受到。
Queue保證每條數據都能被receiver接收。
消息是否會丟失
一般來說publisher發布消息到某一個topic時,只有正在監聽該topic地址的sub能夠接收到消息;如果沒有sub在監聽,該topic就丟失了。
Sender發送消息到目標Queue,receiver可以異步接收這個Queue上的消息。Queue上的消息如果暫時沒有receiver來取,也不會丟失。
消息發布接收策略
一對多的消息發布接收策略,監聽同一個topic地址的多個sub都能收到publisher發送的消息。Sub接收完通知mq服務器
一對一的消息發布接收策略,一個sender發送的消息,只能有一個receiver接收。receiver接收完后,通知mq服務器已接收,mq服務器對queue里的消息采取刪除或其他操作。
三、安裝
1.首先去http://activemq.apache.org/download.html 下載最新版本
2.解壓后,目錄如下:
+bin (windows下面的bat和unix/linux下面的sh)
+conf (activeMQ配置目錄,包含最基本的activeMQ配置文件)
+data (默認是空的)
+docs (index,replease版本里面沒有文檔,-.-b不知道為啥不帶)
+example (幾個例子
+lib (activemMQ使用到的lib)
-apache-activemq-4.1-incubator.jar (ActiveMQ的binary)
-LICENSE.txt
-NOTICE.txt
-README.txt
-user-guide.html
3.使用bin\activemq.bat啟動服務
四、創建.Net Producer
1: class Program
2: {
3: static IConnectionFactory _factory = null;
4: static IConnection _connection = null;
5: static ITextMessage _message = null;
6: static ITextMessage _message2 = null;
7: static void Main(string[] args)
8: {
9: //創建工廠
10: _factory = new ConnectionFactory("tcp://localhost:61616/");
11: try
12: {
13: //創建連接
14: using (_connection = _factory.CreateConnection())
15: {
16: //創建會話
17: using (ISession session = _connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
18: {
19: //創建一個主題
20: IDestination destination = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("topic");
21:
22: //創建生產者
23: IMessageProducer producer = session.CreateProducer(destination);
24:
25: int counter = 0;
26: Console.WriteLine("請輸入你要發送數據,然后回車!");
27:
28: while (true)
29: {
30: string msg = Console.ReadLine();
31: if (msg != string.Empty)
32: {
33: Console.BackgroundColor = ConsoleColor.Blue;
34: Console.Beep();
35: counter++;
36:
37: //創建一個文本消息
38: _message = producer.CreateTextMessage("This is .Net AcitveMQ Message....");
39: //_message2 = producer.CreateTextMessage("<Root><First>Test</First><Root>");
40:
41: Console.WriteLine("正在發送第{0}組發送數據...........", counter);
42: //發送消息
43: producer.Send(_message, MsgDeliveryMode.Persistent, MsgPriority.Normal,
44: TimeSpan.MaxValue);
45:
46: Console.WriteLine("'"+_message.Text+"'已經發送!");
47: //producer.Send(_message2, MsgDeliveryMode.Persistent, MsgPriority.Normal,
48: // TimeSpan.MinValue);
49: //Console.WriteLine("'" + _message2.Text + "'已經發送!");
50: producer.Send(msg, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.MinValue);
51: Console.WriteLine("你輸入的數據'" + msg + "'已經發送!");
52: Console.WriteLine("**************************************************");
53: Console.BackgroundColor = ConsoleColor.Black;
54: Console.WriteLine("請輸入你要發送數據,然后回車!");
55: }
56: }
57: }
58: }
59: }
60: catch (Exception ex)
61: {
62: Console.WriteLine(ex.ToString());
63: }
64: Console.ReadLine();
65: }
66: }
五、創建Java Consumer
1: import javax.jms.Connection;
2: import javax.jms.ConnectionFactory;
3: import javax.jms.Session;
4: import javax.jms.TextMessage;
5: import javax.jms.Topic;
6: import javax.jms.TopicSubscriber;
7:
8: import org.apache.activemq.ActiveMQConnectionFactory;
9:
10: public class ActiveMQConsumer {
11: public static void main(String[] args) {
12: // ConnectionFactory :連接工廠,JMS 用它創建連接
13: ConnectionFactory connectionFactory;
14: // Connection :JMS 客戶端到JMS Provider 的連接
15: Connection connection = null;
16: // Session: 一個發送或接收消息的線程
17: Session session;
18: Topic topic;
19: // 消費者,消息接收者
20: TopicSubscriber subscriber;
21: connectionFactory = new ActiveMQConnectionFactory(
22: //ActiveMQConnection.DEFAULT_USER,
23: //ActiveMQConnection.DEFAULT_PASSWORD,
24: "tcp://localhost:61616");
25: try {
26: // 構造從工廠得到連接對象
27: connection = connectionFactory.createConnection();
28: connection.setClientID("Customer");
29: // 啟動
30: connection.start();
31: // 獲取操作連接
32: session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
33: topic = session.createTopic("topic");
34: subscriber= session.createDurableSubscriber(topic, "Customer", null, false);
35:
36: while (true) {
37: //TextMessage message = (TextMessage) subscriber.receive(10000);
38: //TextMessage message = (TextMessage) subscriber.receiveNoWait();
39: TextMessage message= (TextMessage) subscriber.receive();
40: if (null != message) {
41: System.out.println("收到消息" + message.getText());
42:
43: } else {
44: break;
45: }
46: }
47: } catch (Exception e) {
48: e.printStackTrace();
49: } finally {
50: try {
51: if (null != connection)
52: connection.close();
53: } catch (Throwable ignore) {
54: }
55: }
56: }
57: }
六、參考資料
http://heisetoufa.iteye.com/blog/1908335
http://www.cnblogs.com/sunwei2012/archive/2010/03/22/1691919.html
http://www.cnblogs.com/tenghoo/archive/2009/11/23/activeMQ_client.html
http://www.cnblogs.com/CopyPaster/archive/2012/04/27/2473179.html