1、准備工具
2、開始項目
VS2013新建一個C#控制台應用程序,項目中添加兩個dll引用,一個是D:\Apache.NMS.ActiveMQ-1.7.2-bin\lib\Apache.NMS\net-4.0目錄下的Apache.NMS.dll,另一個是D:\Apache.NMS.ActiveMQ-1.7.2-bin\build\net-4.0\debug目錄下的Apache.NMS.ActiveMQ.dll。
新建一個類,MyActiveMq.cs,用於對activemq消息隊列接口的封裝,實現如下:
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
- using Apache.NMS;
- using Apache.NMS.ActiveMQ;
- namespace NmsProducerClasses
- {
- public class MyActiveMq
- {
- private IConnectionFactory factory;
- private IConnection connection;
- private ISession session;
- private IMessageProducer prod;
- private IMessageConsumer consumer;
- private ITextMessage msg;
- private bool isTopic = false;
- private bool hasSelector = false;
- private const string ClientID = "clientid";
- private const string Selector = "filter='demo'";
- private bool sendSuccess = true;
- private bool receiveSuccess = true;
- public MyActiveMq(bool isLocalMachine, string remoteAddress)
- {
- try
- {
- //初始化工廠
- if (isLocalMachine)
- {
- factory = new ConnectionFactory("tcp://localhost:61616/");
- }
- else
- {
- factory = new ConnectionFactory("tcp://" + remoteAddress + ":61616/"); //寫tcp://192.168.1.111:61616的形式連接其他服務器上的ActiveMQ服務器
- }
- //通過工廠建立連接
- connection = factory.CreateConnection();
- connection.ClientId = ClientID;
- connection.Start();
- //通過連接創建Session會話
- session = connection.CreateSession();
- }
- catch (System.Exception e)
- {
- sendSuccess = false;
- receiveSuccess = false;
- Console.WriteLine("Exception:{0}", e.Message);
- Console.ReadLine();
- throw e;
- }
- Console.WriteLine("Begin connection...");
- }
- ~MyActiveMq()
- {
- //this.ShutDown();
- }
- /// <summary>
- /// 初始化
- /// </summary>
- /// <param name="topic">選擇是否是Topic</param>
- /// <param name="name">隊列名</param>
- /// <param name="selector">是否設置過濾</param>
- public bool InitQueueOrTopic(bool topic, string name, bool selector = false)
- {
- try
- {
- //通過會話創建生產者、消費者
- if (topic)
- {
- prod = session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(name));
- if (selector)
- {
- consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(name), ClientID, Selector, false);
- hasSelector = true;
- }
- else
- {
- consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(name), ClientID, null, false);
- hasSelector = false;
- }
- isTopic = true;
- }
- else
- {
- prod = session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(name));
- if (selector)
- {
- consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(name), Selector);
- hasSelector = true;
- }
- else
- {
- consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(name));
- hasSelector = false;
- }
- isTopic = false;
- }
- //創建一個發送的消息對象
- msg = prod.CreateTextMessage();
- }
- catch (System.Exception e)
- {
- sendSuccess = false;
- receiveSuccess = false;
- Console.WriteLine("Exception:{0}", e.Message);
- Console.ReadLine();
- throw e;
- }
- return sendSuccess;
- }
- public bool SendMessage(string message, string msgId = "defult", MsgPriority priority = MsgPriority.Normal)
- {
- if (prod == null)
- {
- sendSuccess = false;
- Console.WriteLine("call InitQueueOrTopic() first!!");
- return false;
- }
- Console.WriteLine("Begin send messages...");
- //給這個對象賦實際的消息
- msg.NMSCorrelationID = msgId;
- msg.Properties["MyID"] = msgId;
- msg.NMSMessageId = msgId;
- msg.Text = message;
- Console.WriteLine(message);
- if (isTopic)
- {
- sendSuccess = ProducerSubcriber(message, priority);
- }
- else
- {
- sendSuccess = P2P(message, priority);
- }
- return sendSuccess;
- }
- public string GetMessage()
- {
- if (prod == null)
- {
- Console.WriteLine("call InitQueueOrTopic() first!!");
- return null;
- }
- Console.WriteLine("Begin receive messages...");
- ITextMessage revMessage = null;
- try
- {
- //同步阻塞10ms,沒消息就直接返回null,注意此處時間不能設太短,否則還沒取到消息就直接返回null了!!!
- revMessage = consumer.Receive(new TimeSpan(TimeSpan.TicksPerMillisecond *10)) as ITextMessage;
- }
- catch (System.Exception e)
- {
- receiveSuccess = false;
- Console.WriteLine("Exception:{0}", e.Message);
- Console.ReadLine();
- throw e;
- }
- if (revMessage == null)
- {
- Console.WriteLine("No message received!");
- return null;
- }
- else
- {
- Console.WriteLine("Received message with Correlation ID: " + revMessage.NMSCorrelationID);
- //Console.WriteLine("Received message with Properties'ID: " + revMessage.Properties["MyID"]);
- Console.WriteLine("Received message with text: " + revMessage.Text);
- }
- return revMessage.Text;
- }
- //P2P模式,一個生產者對應一個消費者
- private bool P2P(string message, MsgPriority priority)
- {
- try
- {
- if (hasSelector)
- {
- //設置消息對象的屬性,這個很重要,是Queue的過濾條件,也是P2P消息的唯一指定屬性
- msg.Properties.SetString("filter", "demo"); //P2P模式
- }
- prod.Priority = priority;
- //設置持久化
- prod.DeliveryMode = MsgDeliveryMode.Persistent;
- //生產者把消息發送出去,幾個枚舉參數MsgDeliveryMode是否持久化,MsgPriority消息優先級別,存活時間,當然還有其他重載
- prod.Send(msg, MsgDeliveryMode.Persistent, priority, TimeSpan.MinValue);
- }
- catch (System.Exception e)
- {
- sendSuccess = false;
- Console.WriteLine("Exception:{0}", e.Message);
- Console.ReadLine();
- throw e;
- }
- return sendSuccess;
- }
- //發布訂閱模式,一個生產者多個消費者
- private bool ProducerSubcriber(string message, MsgPriority priority)
- {
- try
- {
- prod.Priority = priority;
- //設置持久化,如果DeliveryMode沒有設置或者設置為NON_PERSISTENT,那么重啟MQ之后消息就會丟失
- prod.DeliveryMode = MsgDeliveryMode.Persistent;
- prod.Send(msg, Apache.NMS.MsgDeliveryMode.Persistent, priority, TimeSpan.MinValue);
- //System.Threading.Thread.Sleep(1000);
- }
- catch (System.Exception e)
- {
- sendSuccess = false;
- Console.WriteLine("Exception:{0}", e.Message);
- Console.ReadLine();
- throw e;
- }
- return sendSuccess;
- }
- public void ShutDown()
- {
- Console.WriteLine("Close connection and session...");
- session.Close();
- connection.Close();
- }
- }
- }
Program.cs代碼如下:
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
- using System.IO;
- using System.Threading;
- namespace NmsProducerClasses
- {
- class Program
- {
- static void Main(string[] args)
- {
- MyActiveMq mymq = new MyActiveMq(isLocalMachine: true, remoteAddress: "");
- mymq.InitQueueOrTopic(topic: false, name: "myqueue", selector: false);
- //mymq.InitQueueOrTopic(topic: false, name: "seletorqueue", selector: true);
- //mymq.InitQueueOrTopic(topic: true, name: "noselectortopic", selector: false);
- //mymq.InitQueueOrTopic(topic: true, name: "selectortopic", selector: true);
- //The full range of priority values (0-9) are supported by the JDBC message store. For KahaDB three priority categories are supported, Low (< 4), Default (= 4) and High (> 4).
- User myuser0 = new User("0000", "Lowest", "img/p.jpg");
- mymq.SendMessage(JsonUtil.ObjectToJson(myuser0), "newid", priority: Apache.NMS.MsgPriority.Lowest);
- User myuser1 = new User("1111", "AboveLow", "img/p.jpg");
- mymq.SendMessage(JsonUtil.ObjectToJson(myuser1), "newid", priority: Apache.NMS.MsgPriority.AboveLow);
- User myuser2 = new User("2222", "AboveNormal", "img/p.jpg");
- mymq.SendMessage(JsonUtil.ObjectToJson(myuser2), "newid", priority: Apache.NMS.MsgPriority.AboveNormal);
- User myuser3 = new User("0000", "BelowNormal", "img/p.jpg");
- mymq.SendMessage(JsonUtil.ObjectToJson(myuser3), "newid", priority: Apache.NMS.MsgPriority.BelowNormal);
- User myuser4 = new User("1111", "High", "img/p.jpg");
- mymq.SendMessage(JsonUtil.ObjectToJson(myuser4), "newid", priority: Apache.NMS.MsgPriority.High);
- User myuser5 = new User("2222", "Highest", "img/p.jpg");
- mymq.SendMessage(JsonUtil.ObjectToJson(myuser5), "newid", priority: Apache.NMS.MsgPriority.Highest);
- User myuser6 = new User("0000", "Low", "img/p.jpg");
- mymq.SendMessage(JsonUtil.ObjectToJson(myuser6), "newid", priority: Apache.NMS.MsgPriority.Low);
- User myuser7 = new User("1111", "Normal", "img/p.jpg");
- mymq.SendMessage(JsonUtil.ObjectToJson(myuser7), "newid", priority: Apache.NMS.MsgPriority.Normal);
- User myuser8 = new User("2222", "VeryHigh", "img/p.jpg");
- mymq.SendMessage(JsonUtil.ObjectToJson(myuser8), "newid", priority: Apache.NMS.MsgPriority.VeryHigh);
- User myuser9 = new User("2222", "VeryLow", "img/p.jpg");
- mymq.SendMessage(JsonUtil.ObjectToJson(myuser8), "newid", priority: Apache.NMS.MsgPriority.VeryLow);
- int num = 20;
- while (num-- > 0)
- {
- mymq.GetMessage();
- //Thread.Sleep(1000);
- }
- mymq.ShutDown();
- //XML測試
- //string xml = XmlTest.ObjToXml();
- //Console.WriteLine("ObjToXml: {0}", xml);
- //Json測試
- //User u = new User() { Id="88", Imgurl="img/88.jpg", Name="haha88"};
- //string jsonstr = JsonUtil.ObjectToJson(u);
- //Console.WriteLine(jsonstr);
- }
- }
3、測試
首先,需要啟動消息隊列,具體啟動及測試消息隊列步驟可見這邊:
點擊打開鏈接
然后,運行項目,運行結果如下:

4、優先級
priority並不能決定消息傳送的嚴格消息,具體原因可見
優先級設置:
在D:\apache-activemq-5.14.0\conf目錄的activemq.xml配置文件中,找到<destinationPolicy>標簽,在其中的<policyEntries>標簽下添加
- <policyEntry queue=">" producerFlowControl="false" prioritizedMessages="true" useCache="false" expireMessagesPeriod="0" queuePrefetch="1" />
- <policyEntry queue=">" strictOrderDispatch="false" />
- <policyEntry queue=">" >
- <pendingMessageLimitStrategy>
- <constantPendingMessageLimitStrategy limit="0"/>
- </pendingMessageLimitStrategy>
- <messageEvictionStrategy>
- <oldestMessageWithLowestPriorityEvictionStrategy/>
- </messageEvictionStrategy>
- </policyEntry>