ActiveMQ初體驗


首先介紹下MQ,MQ英文名MessageQueue,中文名也就是大家用的消息隊列,干嘛用的呢,說白了就是一個消息的接受和轉發的容器,可用於消息推送。

下面介紹主題,就是今天為大家介紹的ActiveMQ:

他是Apache出品的一個開源的消息隊列軟件,運行在JVM下,支持多種語言,如JAVA,C++,C#。

現在先為大家介紹下如何配置ActiveMQ的服務器端:

1、當然是下載軟件了

去官方網站下載:http://activemq.apache.org/ 我下載的是apache-activemq-5.8.0-bin 5.8版本,當然開源的也是支持下載source的,需要自己編譯下,這里不做過多介紹

2、解壓后,進入\apache-activemq-5.8.0\bin\win32啟動activemq.bat。系統會自動執行啟動過程,當然一般安裝失敗的情況是沒有裝JVM環境,啟動成功應該是這樣

3、打開瀏覽器輸入http://localhost:8161/admin/默認配置是這個,當然你也可以更改這個配置

 

 

4、至此,服務端啟動完畢

—————————————————————————————————完美分割———————————————————————————————————

現在就是今天的主題了,怎樣在C#中使用ActiveMQ提供的API實現消息的訂閱和發布

1、首先需要下載ActiveMQ提供的API函數,這個也從官網下Apache.NMS-1.6.0-bin和Apache.NMS.ActiveMQ-1.6.0-bin 這兩個dll都在下載后文件夾的bin目錄下;

2、將這兩個文件在項目中引用;

3、在ActiveMQ中,有兩個概念,一個是生產者(Producer),另一個是消費者(Consumer),生產者就是我們常說的發布者,而消費者,就是訂閱者,這樣解釋可能更好理解一下吧,如果知道發布訂閱模式的話,不知道的話,字面意思也很好理解,發布者就是發布消息的,而訂閱者通過訂閱事件,將消息接收到;

4、直接上代碼了,Winform下的代碼,如果不想處理界面線程回調問題,可以使用Console程序

5、Produce

 1 using System;
 2 using System.Collections.Generic;
 3 using System.ComponentModel;
 4 using System.Data;
 5 using System.Drawing;
 6 using System.Linq;
 7 using System.Text;
 8 using System.Windows.Forms;
 9 using Apache.NMS;
10 using Apache.NMS.ActiveMQ;
11 
12 namespace MqProducer
13 {
14     public partial class ProducerDemo : Form
15     {
16         //聲明連接對象工廠
17         private IConnectionFactory factory;
18 
19         public ProducerDemo()
20         {
21             InitializeComponent();
22             InitProducer();
23         }
24 
25         public void InitProducer()
26         {
27             try
28             {
29                 //初始化工廠,這里默認的URL是不需要修改的
30                 factory = new ConnectionFactory("tcp://localhost:61616");
31 
32             }
33             catch
34             {
35                 lbMessage.Text = "初始化失敗!!";
36             }
37         }
38 
39         private void btnConfirm_Click(object sender, EventArgs e)
40         {
41             //通過工廠建立連接
42             using (IConnection connection = factory.CreateConnection())
43             {
44                 //通過連接創建Session會話
45                 using (ISession session = connection.CreateSession())
46                 {
47                     //通過會話創建生產者,方法里面new出來的是MQ中的Queue
48                     IMessageProducer prod = session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("firstQueue"));
49                     //創建一個發送的消息對象
50                     ITextMessage message = prod.CreateTextMessage();
51                     //給這個對象賦實際的消息
52                     message.Text = txtMessage.Text;
53                     //設置消息對象的屬性,這個很重要哦,是Queue的過濾條件,也是P2P消息的唯一指定屬性
54                     message.Properties.SetString("filter","demo");
55                     //生產者把消息發送出去,幾個枚舉參數MsgDeliveryMode是否長鏈,MsgPriority消息優先級別,發送最小單位,當然還有其他重載
56                     prod.Send(message, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);
57                     lbMessage.Text = "發送成功!!";
58                     txtMessage.Text = "";
59                     txtMessage.Focus();
60                 }
61             }
62 
63         }
64     }
65 }

6、consumer

 1 using System;
 2 using System.Collections.Generic;
 3 using System.ComponentModel;
 4 using System.Data;
 5 using System.Drawing;
 6 using System.Linq;
 7 using System.Text;
 8 using System.Windows.Forms;
 9 using Apache.NMS;
10 using Apache.NMS.ActiveMQ;
11 
12 namespace MqConsumer
13 {
14     public partial class ConsumerDemo : Form
15     {
16         public ConsumerDemo()
17         {
18             InitializeComponent();
19             InitConsumer();
20         }
21 
22         public void InitConsumer()
23         {
24             //創建連接工廠
25             IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616");
26             //通過工廠構建連接
27             IConnection connection = factory.CreateConnection();
28             //這個是連接的客戶端名稱標識
29             connection.ClientId = "firstQueueListener";
30             //啟動連接,監聽的話要主動啟動連接
31             connection.Start();
32             //通過連接創建一個會話
33             ISession session = connection.CreateSession();
34             //通過會話創建一個消費者,這里就是Queue這種會話類型的監聽參數設置
35             IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("firstQueue"), "filter='demo'");
36             //注冊監聽事件
37             consumer.Listener += new MessageListener(consumer_Listener);
38             //connection.Stop();
39             //connection.Close();  
40 
41         }
42 
43         void consumer_Listener(IMessage message)
44         {
45             ITextMessage msg = (ITextMessage)message;
46             //異步調用下,否則無法回歸主線程
47             tbReceiveMessage.Invoke(new DelegateRevMessage(RevMessage),msg);
48 
49         }
50 
51         public delegate void DelegateRevMessage(ITextMessage message);
52 
53         public void RevMessage(ITextMessage message)
54         {
55             tbReceiveMessage.Text += string.Format(@"接收到:{0}{1}", message.Text, Environment.NewLine);
56         }
57     }
58 }

7、啟動界面這就完事了

  

今天咱說的就是一個皮毛,我也是下午接到任務要做MQ方面的開發,才臨時抱的佛教,希望拋磚引玉,大家繼續鑽研,當然沒事可以把API的代碼下來自己看看以上是怎么實現的,開源的好處就不多說了,自己做功課去了


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM