MSMQ實現自定義序列化存儲


在使用MSMQ的時候一般只會使用默認的XML序列化來對消息進行存儲,但XML存儲的缺點是序列化體積相對比較大和效率上有點低.其實.net提供非常簡單的方式讓我們實現不同序列化方式來存儲MSMQ信息,如json,protobuf等.為了能夠讓開發人員實現自定義序列化的消息存儲,.NET提供了IMessageFormatter這樣一個接口,只需要簡單地實現這個接口就可以對MSMQ的消息進行處理.以下講解如何實現json和protobuf的messageformater.

IMessageFormatter

// 摘要:
    //     從“消息隊列”消息體序列化或反序列化對象。
    [TypeConverter(typeof(MessageFormatterConverter))]
    public interface IMessageFormatter : ICloneable
    {
        // 摘要:
        //     在類中實現時,確定格式化程序是否可以反序列化消息的內容。
        //
        // 參數:
        //   message:
        //     要檢查的 System.Messaging.Message。
        //
        // 返回結果:
        //     如果格式化程序可以反序列化消息,則為 true;否則為 false。
        bool CanRead(Message message);
        //
        // 摘要:
        //     在類中實現時,讀取給定消息中的內容並創建包含該消息中的數據的對象。
        //
        // 參數:
        //   message:
        //     The System.Messaging.Message to deserialize.
        //
        // 返回結果:
        //     反序列化的消息。
        object Read(Message message);
        //
        // 摘要:
        //     在類中實現時,將對象序列化到消息體中。
        //
        // 參數:
        //   message:
        //     System.Messaging.Message,它將包含序列化的對象。
        //
        //   obj:
        //     要序列化到消息中的對象。
        void Write(Message message, object obj);
    }

接口非常簡單,主要規范了MSMQ寫入和讀取的規則.

Json Formater

public class JsonFormater<T> : IMessageFormatter
    {

        public bool CanRead(Message message)
        {
            return message.BodyStream != null && message.BodyStream.Length > 0;
        }

        [ThreadStatic]
        private static byte[] mBuffer;

        public object Read(Message message)
        {
            if(mBuffer== null)
                mBuffer = new byte[4096];
            int count =(int)message.BodyStream.Length;
            message.BodyStream.Read(mBuffer, 0, count);
            return Newtonsoft.Json.JsonConvert.DeserializeObject(Encoding.UTF8.GetString(mBuffer, 0, count), typeof(T));

        }

        [System.ThreadStatic]
        private static System.IO.MemoryStream mStream;

        public void Write(Message message, object obj)
        {
            if (mStream == null)
                mStream = new System.IO.MemoryStream(4096);
            mStream.Position = 0;
            mStream.SetLength(4095);
            string value = Newtonsoft.Json.JsonConvert.SerializeObject(obj);
            int count = Encoding.UTF8.GetBytes(value, 0, value.Length, mStream.GetBuffer(), 0);
            mStream.SetLength(count);
            message.BodyStream = mStream;
        }

        public object Clone()
        {
            return this;
        }
    }

Protobuf Formater

public class ProtobufFormater<T> : IMessageFormatter
    {
        public bool CanRead(Message message)
        {
            return message.BodyStream != null && message.BodyStream.Length > 0;
        }

        public object Read(Message message)
        {
            return ProtoBuf.Meta.RuntimeTypeModel.Default.Deserialize(message.BodyStream, null, typeof(T));
        }

        [System.ThreadStatic]
        private static System.IO.MemoryStream mStream ;

        public void Write(Message message, object obj)
        {
            if (mStream == null)
                mStream = new System.IO.MemoryStream(4096);
            mStream.Position = 0;
            mStream.SetLength(0);
            ProtoBuf.Meta.RuntimeTypeModel.Default.Serialize(mStream, obj);
            message.BodyStream = mStream;
        }

        public object Clone()
        {
            return this;
        }
       
    }

使用Formater

使有自定義Formater比較簡單,只需要指定MessageQueue的Formatter屬性即可.

MessageQueue queue = new MessageQueue(@".\private$\Test");
            queue.Formatter = new JsonFormater<User>();

簡單的測性能測試

針對json,protobuf這兩種自定義序列化和默認的XML序列化性能上有多大差異,各自進行100000條寫入和讀取的耗時情況.

System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
            sw.Reset();
            sw.Start();
            for (int i = 0; i < 100000; i++)
            {
                queue.Send(user);
            }
            sw.Stop();
            Console.WriteLine("MSMQ send xml formater:" + sw.Elapsed.TotalMilliseconds + "ms");
            sw.Reset();
            sw.Start();
            for (int i = 0; i < 100000; i++)
            {
                User result = (User)queue.Receive().Body;
            }
            sw.Stop();
            Console.WriteLine("MSMQ receive xml formater:" + sw.Elapsed.TotalMilliseconds + "ms");

            queue.Formatter = new JsonFormater<User>();
            sw.Reset();
            sw.Start();
            for (int i = 0; i < 100000; i++)
            {
                queue.Send(user);
            }
            sw.Stop();
            Console.WriteLine("MSMQ send Json formater:" + sw.Elapsed.TotalMilliseconds + "ms");
            sw.Reset();
            sw.Start();
            for (int i = 0; i < 100000; i++)
            {
                User result = (User)queue.Receive().Body;
            }
            sw.Stop();
            Console.WriteLine("MSMQ receive json formater:" + sw.Elapsed.TotalMilliseconds + "ms");

            queue.Formatter = new ProtobufFormater<User>();
            sw.Reset();
            sw.Start();
            for (int i = 0; i < 100000; i++)
            {
                queue.Send(user);
            }
            sw.Stop();
            Console.WriteLine("MSMQ send Protobuf formater:" + sw.Elapsed.TotalMilliseconds+"ms");
            sw.Reset();
            sw.Start();
            for (int i = 0; i < 100000; i++)
            {
                User result = (User)queue.Receive().Body;
            }
            sw.Stop();
            Console.WriteLine("MSMQ receive Protobuf formater:" + sw.Elapsed.TotalMilliseconds + "ms");

測試結果

從測試來看還是protobuf效率上占優點:)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM