.NET ActiveMQ類庫


ActiveMQ .NET類庫

ActiveMQ是一種開源的,實現了JMS規范的,面向消息(MOM)的中間件,為應用程序提供高效的、可擴展的、穩定的和安全的企業級消息通信。

0. 准備

使用Nuget管理控制台安裝最新版Apache.NMS.ActiveMQ

    PM> Install-Package Apache.NMS.ActiveMQ

1. IMessageQueue 隊列接口

    /// <summary>
    /// 消息隊列接口
    /// </summary>
    public interface IMessageQueue
    {

        /// <summary>
        /// 打開連接
        /// </summary>
        void Open();

        /// <summary>
        /// 關閉連接
        /// </summary>
        void Close();
    }

2. ActiveMQ 抽象基類

/// <summary>
/// ActiveMQ
/// </summary>
public abstract class ActiveMQ
{
    #region 監聽連接對象
    protected IConnection _connection;
    protected ISession _session;
    protected IMessageConsumer _consumer;
    #endregion

    /// <summary>
    /// 連接地址
    /// </summary>
    public string BrokerUri { get; set; }

    /// <summary>
    /// 用於登錄的用戶名,必須和密碼同時指定
    /// </summary>
    public string UserName { get; set; }

    /// <summary>
    /// 用於登錄的密碼,必須和用戶名同時指定
    /// </summary>
    public string Password { get; set; }

    /// <summary>
    /// 隊列名稱
    /// </summary>
    public string QueueName { get; set; }

    /// <summary>
    /// 指定使用隊列的模式
    /// </summary>
    public MQMode MQMode { get; set; }
}

隊列模式:

/// <summary>
/// 隊列模式
/// </summary>
public enum MQMode
{
    /// <summary>
    /// 隊列,點對點模式。
    /// 使用此模式。一個生產者向隊列存入一條消息之后,只有一個消費者能觸發消息接收事件。
    /// </summary>
    Queue,

    /// <summary>
    /// 主題,發布者/訂閱模式。
    /// 使用此模式,一個生產者向隊列存入一條消息之后,所有訂閱當前的主題的消費者都能觸發消息接收事件。
    /// 使用此模式,必須先創建消費者,再創建生產者。
    /// </summary>
    Topic
}

3. ActiveMQProducer 生產者

/// <summary>
/// ActiveMQ生產者,打開連接,向指定隊列中發送數據
/// </summary>
public class ActiveMQProducer : ActiveMQ, IMessageQueue, IDisposable
{
    /// <summary>
    /// 隊列緩存字典
    /// </summary>
    private ConcurrentDictionary<string, IMessageProducer> _concrtProcuder = new ConcurrentDictionary<string, IMessageProducer>();

    /// <summary>
    /// 打開連接
    /// </summary>
    public void Open()
    {
        if (string.IsNullOrWhiteSpace(this.BrokerUri))
            throw new MemberAccessException("未指定BrokerUri");
        if (string.IsNullOrWhiteSpace(this.QueueName))
            throw new MemberAccessException("未指定QueueName");

        var factory = new ConnectionFactory(this.BrokerUri);
        if (string.IsNullOrWhiteSpace(this.UserName) && string.IsNullOrWhiteSpace(this.Password))
            _connection = factory.CreateConnection();
        else
            _connection = factory.CreateConnection(this.UserName, this.Password);
        _connection.Start();
        _session = _connection.CreateSession();

        CreateProducer(this.QueueName);
    }


    /// <summary>
    /// 關閉連接
    /// </summary>
    public void Close()
    {
        IMessageProducer _p = null;
        foreach (var p in this._concrtProcuder)
        {
            if (this._concrtProcuder.TryGetValue(p.Key, out _p))
            {
                _p?.Close();
            }
        }
        this._concrtProcuder.Clear();

        _session?.Close();
        _connection?.Close();
    }

    /// <summary>
    /// 向隊列發送數據
    /// </summary>
    /// <typeparam name="T">數據類型</typeparam>
    /// <param name="body">數據</param>
    public void Put<T>(T body)
    {
        Send(this.QueueName, body);
    }

    /// <summary>
    /// 向指定隊列發送數據
    /// </summary>
    /// <typeparam name="T">數據類型</typeparam>
    /// <param name="body">數據</param>
    /// <param name="queueName">指定隊列名</param>
    public void Put<T>(T body, string queueName)
    {
        Send(queueName, body);
    }

    /// <summary>
    /// 創建隊列
    /// </summary>
    /// <param name="queueName"></param>
    private IMessageProducer CreateProducer(string queueName)
    {
        if (_session == null)
        {
            Open();
        }

        //創建新生產者
        Func<string, IMessageProducer> CreateNewProducter = (name) =>
        {
            IMessageProducer _newProducer = null;
            switch (MQMode)
            {
                case MQMode.Queue:
                    {
                        _newProducer = _session.CreateProducer(new ActiveMQQueue(name));
                        break;
                    }
                case MQMode.Topic:
                    {
                        _newProducer = _session.CreateProducer(new ActiveMQTopic(name));
                        break;
                    }
                default:
                    {
                        throw new Exception(string.Format("無法識別的MQMode類型:{0}", MQMode.ToString()));
                    }
            }
            return _newProducer;
        };

        return this._concrtProcuder.GetOrAdd(queueName, CreateNewProducter);
    }

    /// <summary>
    /// 發送數據
    /// </summary>
    /// <param name="queueName">隊列名稱</param>
    /// <typeparam name="T"></typeparam>
    /// <param name="body">數據</param>
    private void Send<T>(string queueName, T body)
    {
        var producer = CreateProducer(queueName);
        IMessage msg;
        if (body is byte[])
        {
            msg = producer.CreateBytesMessage(body as byte[]);
        }
        else if (body is string)
        {
            msg = producer.CreateTextMessage(body as string);
        }
        else
        {
            msg = producer.CreateObjectMessage(body);
        }
        if (msg != null)
        {
            producer.Send(msg, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.MinValue);
        }
    }

    /// <summary>
    /// 執行與釋放或重置非托管資源相關的應用程序定義的任務。
    /// </summary>
    public void Dispose()
    {
        this.Close();
    }
}

4. ActiveMQConsumer 消費者

/// <summary>
/// ActiveMQ消費者,打開連接,監聽隊列,接收到數據之后觸發回調
/// </summary>
public class ActiveMQConsumer : ActiveMQ, IMessageQueue, IDisposable
{
    /// <summary>
    /// 接收到數據回調,ActiveMQ原生IMessage類型
    /// </summary>
    public Action<IMessage> OnMessageReceived { get; set; }

    /// <summary>
    /// 接收到消息回調(業務數據對象, 根據自己的業務靈活替換)
    /// </summary>
    public Action<DataCenterMessage> OnDataCenterMessageReceived { get; set; }

    /// <summary>
    /// 打開連接
    /// </summary>
    public void Open()
    {
        if (string.IsNullOrWhiteSpace(this.BrokerUri))
            throw new MemberAccessException("未指定BrokerUri");
        if (string.IsNullOrWhiteSpace(this.QueueName))
            throw new MemberAccessException("未指定QueueName");

        var factory = new ConnectionFactory(this.BrokerUri);
        if (string.IsNullOrWhiteSpace(this.UserName) && string.IsNullOrWhiteSpace(this.Password))
            _connection = factory.CreateConnection();
        else
            _connection = factory.CreateConnection(this.UserName, this.Password);
        _connection.Start();
        _session = _connection.CreateSession(AcknowledgementMode.AutoAcknowledge);

        switch (MQMode)
        {
            case MQMode.Queue:
                {
                    _consumer = _session.CreateConsumer(new ActiveMQQueue(this.QueueName));
                    break;
                }
            case MQMode.Topic:
                {
                    _consumer = _session.CreateConsumer(new ActiveMQTopic(this.QueueName));
                    break;
                }
            default:
                {
                    throw new Exception(string.Format("無法識別的MQMode類型:{0}", MQMode.ToString()));
                }
        }
    }

    /// <summary>
    /// 關閉連接
    /// </summary>
    public void Close()
    {
        _consumer?.Close();
        _session?.Close();
        _connection?.Close();
    }

    /// <summary>
    /// 開始監聽
    /// </summary>
    public void StartListen()
    {
        if (_consumer == null)
        {
            Open();
        }

        _consumer.Listener += new MessageListener(msg =>
        {
            if (OnMessageReceived != null)
                OnMessageReceived(msg);

            //轉換為業務需要的數據對象
            if (OnDataCenterMessageReceived != null)
            {
                var objectMessage = msg as ActiveMQObjectMessage;
                if (objectMessage != null)
                {
                    var dataCenterMsg = objectMessage.Body as DataCenterMessage;
                    if (dataCenterMsg != null)
                    {
                        OnDataCenterMessageReceived(dataCenterMsg);
                    }
                }
            }
        });
    }

    /// <summary>
    /// 執行與釋放或重置非托管資源相關的應用程序定義的任務。
    /// </summary>
    public void Dispose()
    {
        this.Close();
    }
}

5. 擴展方法

/// <summary>
/// 擴展方法類
/// </summary>
public static class ExtendMethods
{
    /// <summary>
    /// 將對象轉換為bytes
    /// </summary>
    /// <param name="obj"></param>
    /// <returns>bytes</returns>
    public static byte[] ToBytes<T>(this T obj) where T : class
    {
        if (obj == null)
            return null;
        using (var ms = new MemoryStream())
        {
            var formatter = new BinaryFormatter();
            formatter.Serialize(ms, obj);
            return ms.GetBuffer();
        }
    }

    /// <summary>
    /// 將bytes轉換為對象
    /// </summary>
    /// <typeparam name="T"></typeparam>
    /// <param name="bytes"></param>
    /// <returns></returns>
    public static T ToObject<T>(this byte[] bytes) where T : class
    {
        if (bytes == null)
            return default(T);
        using (var ms = new MemoryStream(bytes))
        {
            var formatter = new BinaryFormatter();
            return formatter.Deserialize(ms) as T;
        }
    }
}

6. 使用示例:

        #region 生產者
        var producer = new ActiveMQProducer();
        producer.BrokerUri = @"tcp://127.0.0.1:61616/";
        producer.UserName = "admin";
        producer.Password = "admin";
        producer.QueueName = "TestQueueName";
        producer.MQMode = MQMode.Queue;

        producer.Open();
        var message = new DataCenterMessage()
        {
            //初始化業務數據對象...
        };

        //發送到隊列, Put對象類必須使用[Serializable]注解屬性
        producer.Put(message);
        #endregion

        #region 消費者
        var consumer = new ActiveMQConsumer();
        consumer.BrokerUri = @"tcp://127.0.0.1:61616/";
        consumer.UserName = "admin";
        consumer.Password = "admin";
        consumer.QueueName = "TestQueueName";
        consumer.MQMode = MQMode.Queue;

        consumer.OnMessageReceived = (msg) =>
        {
            var bytesMessage = msg as ActiveMQBytesMessage;
            if (bytesMessage != null)
            {
                var buffer = new byte[bytesMessage.BodyLength];
                bytesMessage.WriteBytes(buffer);
                var result = buffer.ToObject<DataCenterMessage>();
                Debug.WriteLine(result);
            }
        };

        consumer.OnDataCenterMessageReceived = (msg) =>
        {
            Debug.Write(msg);
        };

        consumer.Open();
        consumer.StartListen();


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM