NetMQ:.NET輕量級消息隊列


前言

首先我現在是在一家游戲工作做服務端的,這幾天我們服務端游戲做了整個底層框架的替換,想必做過游戲的也都知道,在游戲里面會有很多的日志需要記錄,量也是比較大的;在沒有換框架之前我們存日志和游戲運行都是在一套框架里面的,所以做起來比較冗余,也會給游戲服務器帶來比較大的壓力;現在在這套框架就是把存日志的單獨分開做了起來;最后我們老大就選擇了NetMQ, 之前沒有接觸過NetMq 但是總體也是完成了這個日志服務器的編寫,所以在這里也是分享了一下;

什么是NetMQ

簡單說就是ZeroMQ的.net開源版本 吊炸天的速度你懂的

下載NetMQ

Install-Package NetMQ

Demo

1) 客戶端

using Games.BaseModel.LogEntities;
using NetMQ;
using System;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;

namespace NetMQTest
{
    class Program
    {
        static void Main(string[] args)
        {
            Run();
            Console.ReadLine();
        }

        public static void Run()
        {
            using (var ctx = NetMQContext.Create())
            using (var sender = ctx.CreatePushSocket())
            {
                sender.Connect("tcp://192.168.1.138:5556");
                for (int i = 0; i < 10000; i++)
                {
                    using (var sm = new MemoryStream())
                    {
                        var entityLog = new LoginLog { IP = "192.168.1.1", RoleName = "測試", Acc = "test", FromPlat = "1" };
                        var binaryFormatter = new BinaryFormatter();
                        binaryFormatter.Serialize(sm, entityLog);
                        var content = sm.ToArray();
                        sender.Send(content);
                    }
                    Console.WriteLine(i);
                }
            }
        }
    }
}

2)服務端(這里面是我真是游戲里面所用到的,基本意思和原理是一樣的,對於不同的場景需要自己進行修改)

using Games.BaseModel.LogEntities;
using Games.DBHandler;
using Games.DBHandler.Dapper;
using Games.Model.ConfigEntities;
using NetMQ;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.IO;
using System.Linq;
using System.Runtime.Serialization.Formatters.Binary;
using System.Threading.Tasks;

namespace NetMQ_Code
{
    public class Program
    {
        private static ServerConfigNew _serverConfig;

        private static IDataAccessor _dataAccessor;

        private static string _connection;

        private static readonly Dictionary<string, IDataAccessor> DicServerConfigNew = new Dictionary<string, IDataAccessor>();

        private static void Main(string[] args)
        {
            Console.WriteLine("NetMQ准備啟動 . . . ");
            _connection = ConfigurationManager.ConnectionStrings["Config"].ConnectionString;
            Task task = new Task(Function);
            task.Start();
            Console.WriteLine("啟動成功!");
            Console.ReadKey();

        }

        private static void Function()
        {
            using (NetMQContext ctx = NetMQContext.Create())
            {
                //接收消息的套接字
                using (var receiver = ctx.CreatePullSocket())
                {
                    receiver.Bind("tcp://*:5556");
                    //處理任務
                    while (true)
                    {
                        var receivedBytes = receiver.Receive();
                        using (var sm = new MemoryStream(receivedBytes, 0, receivedBytes.Length))
                        {
                            //采用二進制方式
                            var binaryFormatter = new BinaryFormatter();
                            sm.Position = 0;
                            var entity = binaryFormatter.Deserialize(sm) as LogEntity;
                            var serverId = entity.ServerID;
                            if (DicServerConfigNew.ContainsKey(serverId))
                            {
                                EntitySchemaSet.InitSchema(entity.GetType());
                                DicServerConfigNew.TryGetValue(serverId, out _dataAccessor);
                                _dataAccessor.SaveLog(entity);
                            }
                            else
                            {
                                _serverConfig = DapplerUtil.Instance.Query<ServerConfigNew>(_connection, "SELECT * FROM ServerConfig where Id=" + serverId).FirstOrDefault();
                                if (_serverConfig != null)
                                {
                                    EntitySchemaSet.InitSchema(entity.GetType());
                                    using (_dataAccessor = DataAccessorFactory.Instance.CreateAccessor("Game", _serverConfig.GameDBConnection, DbProviderType.MsSql))
                                    {
                                        DicServerConfigNew.Add(serverId, _dataAccessor);
                                        _dataAccessor.SaveLog(entity);
                                    }
                                }
                                else
                                {
                                    Console.WriteLine("serverConfig is null");
                                }
                            }
                            Console.WriteLine("Insert:" + entity.GetType());
                        }
                    }
                }
            }
        }
    }
}

運行結果

總結

NetMQ目前還不支持持久化消息,所以,可靠性不是特別高.消息隊列基本的特性都滿足,效率也很高.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM