Beetle在Tcp通訊中使用Protobuf


  Protobuf是google制定的一種對象序列化格式,而在.net下的實現有protobuf-net.而protobuf-net在序列化方面有着出色的性能,效率是.net二進制序列化幾倍,而序列化后所占的空間也少於.net二進制序列化;除了以上兩個優勢外Protobuf有着一個更大的優勢就是和其他平台交互的兼容性,在現有大部分流行的語言平台中基本都有Protobuf的實現.因此采用protobuf進行對象序列化是個不錯的選擇.接下來詳細講解Beetle實現對protobuf-net支持.

定義協議格式

為了保證TCP數據流正確處理,首先要做的事情還是要制定一個處理協議,來保證數據處理的有效性.

數據包同樣也是兩部分組件,頭描述消息總長度,消息體是主內容.由於Protobuf-net序列化的數據交不包括消息類型,所以在協議中必須包括類型名稱用於提供給對方實始化對應的類型過行反序列化操作.

實現具體的分析器和消息適配器

協議制定后就可以進行分析器的實現,由於采用頭4字節描述大小,所以分析器從HeadSizeOfPackage基礎類派生下載重寫相關方法即可;完整實現代碼如下:

using System;
using System.Collections.Generic;
using System.Text;
using Beetle;
namespace Beetle.Packages
{
    public class ProtobufPackage : Beetle.HeadSizeOfPackage
    {

        public ProtobufPackage()
        {
        }

        public ProtobufPackage(Beetle.TcpChannel channel)
            : base(channel)
        {

        }

        protected override void WriteMessageType(IMessage msg, BufferWriter writer)
        {

        }

        public override object WriteCast(object message)
        {
            MessageAdapter ma = new MessageAdapter();
            ma.Message = message;
            return ma;
        }

        public override object ReadCast(object message)
        {
            return ((MessageAdapter)message).Message;
        }

        protected override IMessage ReadMessageByType(BufferReader reader, out object typeTag)
        {
            typeTag = "ProtobufAdapter";
            return new MessageAdapter();
        }

        static Dictionary<string, Type> mTypes = new Dictionary<string, Type>(256);

        static Dictionary<Type, string> mNames = new Dictionary<Type, string>(256);

        public static void LoadAssembly()
        {
            try
            {
                string path = AppDomain.CurrentDomain.DynamicDirectory;
                LoadAssembly(path);
                path = AppDomain.CurrentDomain.BaseDirectory;
                LoadAssembly(path);
            }
            catch
            {
            }
        }

        public static void LoadAssembly(string path)
        {
            if (!string.IsNullOrEmpty(path))
            {
                foreach (string item in System.IO.Directory.GetFiles(path, "*.dll"))
                {
                    try
                    {
                        LoadAssembly(System.Reflection.Assembly.LoadFile(item));
                    }
                    catch
                    {
                    }
                }
            }
        }

        public static void LoadAssembly(System.Reflection.Assembly assembly)
        {
            foreach (Type t in assembly.GetTypes())
            {
                ProtoBuf.ProtoContractAttribute[] pc = Smark.Core.Functions.GetTypeAttributes<ProtoBuf.ProtoContractAttribute>(t, false);
                if (pc.Length > 0)
                {
                    string name = t.Name;
                    if (!string.IsNullOrEmpty(pc[0].Name))
                        name = pc[0].Name;
                    mTypes.Add(name, t);
                    mNames.Add(t, name);
                }
            }
        }

        class MessageAdapter : Beetle.IMessage
        {
            public object Message
            {
                get;
                set;
            }

            public void Load(Beetle.BufferReader reader)
            {
                string type = reader.ReadString();
                Beetle.ByteArraySegment segment = ByteArraySegment;
                reader.ReadByteArray(segment);
                using (System.IO.Stream stream = new System.IO.MemoryStream(segment.Array, 0, segment.Count))
                {
                    Message = ProtoBuf.Meta.RuntimeTypeModel.Default.Deserialize(stream, null, mTypes[type]);
                }

            }

            public void Save(Beetle.BufferWriter writer)
            {
                writer.Write(mNames[Message.GetType()]);
                Beetle.ByteArraySegment segment = ByteArraySegment;
                using (System.IO.Stream stream = new System.IO.MemoryStream(segment.Array))
                {
                    ProtoBuf.Meta.RuntimeTypeModel.Default.Serialize(stream, Message);
                    segment.SetInfo(0, (int)stream.Position);

                }
                writer.Write(segment);


            }

            [ThreadStatic]
            private static ByteArraySegment mByteArraySegment = null;

            public ByteArraySegment ByteArraySegment
            {
                get
                {
                    if (mByteArraySegment == null)
                        mByteArraySegment = new ByteArraySegment(TcpUtils.DataPacketMaxLength);
                    return mByteArraySegment;
                }
            }

        }
    }
}

分析主要重寫了ReadCast,WriteCast,而兩個Cast方法主要是把消息進行一個適配器包裝到一個IMessage對象中提供給組件處理.通過適配器MessageAdapter來實現終於對象的序列化和反序列化操作,並整合的流中.為了方便處理消息對應稱名稱還添加了分析程序類型來加載對應的類型和名稱關系映射.接下來訂制一個簡單的注冊對象

    [ProtoContract]
    public class Register
    {
        [ProtoMember(1)]
        public string UserName { get; set; }
         [ProtoMember(2)]
        public string EMail { get; set; }
         [ProtoMember(3)]
        public DateTime ResponseTime { get; set; }
    }

實現相應的TCP服務

協議分析器擴展完成后就通過它來實現一個基於protobuf對象處理的TCP交互服務.

    class Program : Beetle.ServerBase<Beetle.Packages.ProtobufPackage>
    {
        static void Main(string[] args)
        {
            Beetle.Packages.ProtobufPackage.LoadAssembly(typeof(Program).Assembly);
            Beetle.TcpUtils.Setup("beetle");
            Program server = new Program();
            server.Open(9034);
            Console.WriteLine("server start @9034");
            Console.Read();
        }

        protected override void OnConnected(object sender, Beetle.ChannelEventArgs e)
        {
            base.OnConnected(sender, e);
            Console.WriteLine("{0} connected", e.Channel.EndPoint);
        }
        protected override void OnDisposed(object sender, Beetle.ChannelDisposedEventArgs e)
        {
            base.OnDisposed(sender, e);
            Console.WriteLine("{0} disposed", e.Channel.EndPoint);
        }
        protected override void OnError(object sender, Beetle.ChannelErrorEventArgs e)
        {
            base.OnError(sender, e);
            Console.WriteLine("{0} error {1}", e.Channel.EndPoint, e.Exception.Message);
        }
        protected override void OnMessageReceive(Beetle.PacketRecieveMessagerArgs e)
        {
            Messages.Register register = (Messages.Register)e.Message;
            register.ResponseTime = DateTime.Now;
            e.Channel.Send(register);
        }
    }

TCP服務的實現和原來的實現方式一致,只是繼承的ServerBase的泛型參是基於protobuf的協議分析器類型.

連接到服務進行對象通訊

同樣接入服務端的代碼只是改變一下泛型參類型即可

        private void cmdConnect_Click(object sender, EventArgs e)
        {
            try
            {
                channel = Beetle.TcpServer.CreateClient<Beetle.Packages.ProtobufPackage>(txtIPAddress.Text, 9034, OnReceive);
                channel.ChannelDisposed += OnDisposed;
                channel.ChannelError += OnError;
                channel.BeginReceive();
                cmdRegister.Enabled = true;
                cmdConnect.Enabled = false;
            }
            catch (Exception e_)
            {
                MessageBox.Show(e_.Message);
            }
        }
        private void OnReceive(Beetle.PacketRecieveMessagerArgs e)
        {
            Register reg = (Register)e.Message;
            Invoke(new Action<Register>(r =>
            {
                txtREMail.Text = r.EMail;
                txtRName.Text = r.UserName;
                txtResponseTime.Text = r.ResponseTime.ToString();
            }), reg);
        }
        private void OnDisposed(object sender, Beetle.ChannelEventArgs e)
        {
            Invoke(new Action<Beetle.ChannelEventArgs>(s =>
            {
                txtStatus.Text = "disconnect!";
                cmdRegister.Enabled = false;
                cmdConnect.Enabled = true;
            }), e);
        }
        private void OnError(object sender, Beetle.ChannelErrorEventArgs e)
        {
            Invoke(new Action<Beetle.ChannelErrorEventArgs>(r =>
            {
                txtStatus.Text = r.Exception.Message;
            }), e);
        }

把對象發送給服務端

            Register register = new Register();
            register.EMail = txtEMail.Text;
            register.UserName = txtName.Text;
            channel.Send(register);

運行效果

下載代碼:Code

總結

由於Protobuf制定的協議是開放的,所以很多平台下都有相關實現包括:c++,java,php等.通過整合protobuf作為協議載體可以方便地和其他平台進行TCP數據交互整合.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM