StackExchange.Redis 訪問封裝類


最近需要在C#中使用Redis,在Redis的官網找到了ServiceStack.Redis,最后在測試的時候發現這是個坑,4.0已上已經收費,后面只好找到3系列的最終版本,最后測試發現還是有BUG或者是我不會用。沒有辦法,最好找到了StackExchange.Redis,支持異步的客戶端,據說性能比ServiceStack.Redis更好,而且據說Stack Overflow也是使用的這個客戶端,里面有支持4.0和4.5的版本,現將我在使用中的封裝類提供出來給大家參考(參考了網上的一些資料):

 

    using DotNet.Log;

    /// <summary>
    /// StackExchangeRedisHelper
    /// 
    /// 在StackExchange.Redis中最重要的對象是ConnectionMultiplexer類, 它存在於StackExchange.Redis命名空間中。
    /// 這個類隱藏了Redis服務的操作細節,ConnectionMultiplexer類做了很多東西, 在所有調用之間它被設計為共享和重用的。
    /// 不應該為每一個操作都創建一個ConnectionMultiplexer 。 ConnectionMultiplexer是線程安全的 , 推薦使用下面的方法。
    /// 在所有后續示例中 , 都假定你已經實例化好了一個ConnectionMultiplexer類,它將會一直被重用 ,
    /// 現在我們來創建一個ConnectionMultiplexer實例。它是通過ConnectionMultiplexer.Connect 或者 ConnectionMultiplexer.ConnectAsync,
    /// 傳遞一個連接字符串或者一個ConfigurationOptions 對象來創建的。
    /// 連接字符串可以是以逗號分割的多個服務的節點.
    /// 
    /// 
    /// 注意 : 
    /// ConnectionMultiplexer 實現了IDisposable接口當我們不再需要是可以將其釋放的 , 這里我故意不使用 using 來釋放他。 
    /// 簡單來講創建一個ConnectionMultiplexer是十分昂貴的 , 一個好的主意是我們一直重用一個ConnectionMultiplexer對象。
    /// 一個復雜的的場景中可能包含有主從復制 , 對於這種情況,只需要指定所有地址在連接字符串中(它將會自動識別出主服務器)
    ///  ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("server1:6379,server2:6379");
    /// 假設這里找到了兩台主服務器,將會對兩台服務進行裁決選出一台作為主服務器來解決這個問題 , 這種情況是非常罕見的 ,我們也應該避免這種情況的發生。
    /// 
    /// 
    /// 這里有個和 ServiceStack.Redis 大的區別是沒有默認的連接池管理了。沒有連接池自然有其利弊,最大的好處在於等待獲取連接的等待時間沒有了,
    /// 也不會因為連接池里面的連接由於沒有正確釋放等原因導致無限等待而處於死鎖狀態。缺點在於一些低質量的代碼可能導致服務器資源耗盡。不過提供連接池等阻塞和等待的手段是和作者的設計理念相違背的。StackExchange.Redis這里使用管道和多路復用的技術來實現減少連接
    /// 
    /// 參考:http://www.cnblogs.com/Leo_wl/p/4968537.html
    /// 
    /// 修改記錄
    /// 
    ///        2016.04.07 版本:1.0 SongBiao    主鍵創建。
    ///        
    /// <author>
    ///        <name>SongBiao</name>
    ///        <date>2016.04.07</date>
    /// </author>
    /// </summary>
    public static class StackExchangeRedisHelper
    {
        private static readonly string Coonstr = ConfigurationManager.ConnectionStrings["RedisExchangeHosts"].ConnectionString;
        private static object _locker = new Object();
        private static ConnectionMultiplexer _instance = null;
        /// <summary>
        /// 使用一個靜態屬性來返回已連接的實例,如下列中所示。這樣,一旦 ConnectionMultiplexer 斷開連接,便可以初始化新的連接實例。
        /// </summary>
        public static ConnectionMultiplexer Instance
        {
            get
            {
                if (_instance == null)
                {
                    lock (_locker)
                    {
                        if (_instance == null || !_instance.IsConnected)
                        {
                            _instance = ConnectionMultiplexer.Connect(Coonstr);
                        }
                    }
                }
                //注冊如下事件
                _instance.ConnectionFailed += MuxerConnectionFailed;
                _instance.ConnectionRestored += MuxerConnectionRestored;
                _instance.ErrorMessage += MuxerErrorMessage;
                _instance.ConfigurationChanged += MuxerConfigurationChanged;
                _instance.HashSlotMoved += MuxerHashSlotMoved;
                _instance.InternalError += MuxerInternalError;
                return _instance;
            }
        }
        static StackExchangeRedisHelper()
        {
        }

        /// <summary>
        /// 
        /// </summary>
        /// <returns></returns>
        public static IDatabase GetDatabase()
        {
            return Instance.GetDatabase();
        }

        /// <summary>
        /// 這里的 MergeKey 用來拼接 Key 的前綴,具體不同的業務模塊使用不同的前綴。
        /// </summary>
        /// <param name="key"></param>
        /// <returns></returns>
        private static string MergeKey(string key)
        {
            return BaseSystemInfo.SystemCode + key;
        }
        /// <summary>
        /// 根據key獲取緩存對象
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="key"></param>
        /// <returns></returns>
        public static T Get<T>(string key)
        {
            key = MergeKey(key);
            return Deserialize<T>(GetDatabase().StringGet(key));
        }
        /// <summary>
        /// 根據key獲取緩存對象
        /// </summary>
        /// <param name="key"></param>
        /// <returns></returns>
        public static object Get(string key)
        {
            key = MergeKey(key);
            return Deserialize<object>(GetDatabase().StringGet(key));
        }

        /// <summary>
        /// 設置緩存
        /// </summary>
        /// <param name="key"></param>
        /// <param name="value"></param>
        public static void Set(string key, object value)
        {
            key = MergeKey(key);
            GetDatabase().StringSet(key, Serialize(value));
        }

        /// <summary>
        /// 判斷在緩存中是否存在該key的緩存數據
        /// </summary>
        /// <param name="key"></param>
        /// <returns></returns>
        public static bool Exists(string key)
        {
            key = MergeKey(key);
            return GetDatabase().KeyExists(key);  //可直接調用
        }

        /// <summary>
        /// 移除指定key的緩存
        /// </summary>
        /// <param name="key"></param>
        /// <returns></returns>
        public static bool Remove(string key)
        {
            key = MergeKey(key);
            return GetDatabase().KeyDelete(key);
        }

        /// <summary>
        /// 異步設置
        /// </summary>
        /// <param name="key"></param>
        /// <param name="value"></param>
        public static async Task SetAsync(string key, object value)
        {
            key = MergeKey(key);
            await GetDatabase().StringSetAsync(key, Serialize(value));
        }

        /// <summary>
        /// 根據key獲取緩存對象
        /// </summary>
        /// <param name="key"></param>
        /// <returns></returns>
        public static async Task<object> GetAsync(string key)
        {
            key = MergeKey(key);
            object value = await GetDatabase().StringGetAsync(key);
            return value;
        }

        /// <summary>
        /// 實現遞增
        /// </summary>
        /// <param name="key"></param>
        /// <returns></returns>
        public static long Increment(string key)
        {
            key = MergeKey(key);
            //三種命令模式
            //Sync,同步模式會直接阻塞調用者,但是顯然不會阻塞其他線程。
            //Async,異步模式直接走的是Task模型。
            //Fire - and - Forget,就是發送命令,然后完全不關心最終什么時候完成命令操作。
            //即發即棄:通過配置 CommandFlags 來實現即發即棄功能,在該實例中該方法會立即返回,如果是string則返回null 如果是int則返回0.這個操作將會繼續在后台運行,一個典型的用法頁面計數器的實現:
            return GetDatabase().StringIncrement(key, flags: CommandFlags.FireAndForget);
        }

        /// <summary>
        /// 實現遞減
        /// </summary>
        /// <param name="key"></param>
        /// <param name="value"></param>
        /// <returns></returns>
        public static long Decrement(string key, string value)
        {
            key = MergeKey(key);
            return GetDatabase().HashDecrement(key, value, flags: CommandFlags.FireAndForget);
        }

        /// <summary>
        /// 序列化對象
        /// </summary>
        /// <param name="o"></param>
        /// <returns></returns>
        static byte[] Serialize(object o)
        {
            if (o == null)
            {
                return null;
            }
            BinaryFormatter binaryFormatter = new BinaryFormatter();
            using (MemoryStream memoryStream = new MemoryStream())
            {
                binaryFormatter.Serialize(memoryStream, o);
                byte[] objectDataAsStream = memoryStream.ToArray();
                return objectDataAsStream;
            }
        }

        /// <summary>
        /// 反序列化對象
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="stream"></param>
        /// <returns></returns>
        static T Deserialize<T>(byte[] stream)
        {
            if (stream == null)
            {
                return default(T);
            }
            BinaryFormatter binaryFormatter = new BinaryFormatter();
            using (MemoryStream memoryStream = new MemoryStream(stream))
            {
                T result = (T)binaryFormatter.Deserialize(memoryStream);
                return result;
            }
        }
        /// <summary>
        /// 配置更改時
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private static void MuxerConfigurationChanged(object sender, EndPointEventArgs e)
        {
            LogHelper.WriteInfoLog("Configuration changed: " + e.EndPoint);
        }
        /// <summary>
        /// 發生錯誤時
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private static void MuxerErrorMessage(object sender, RedisErrorEventArgs e)
        {
            LogHelper.WriteInfoLog("ErrorMessage: " + e.Message);
        }
        /// <summary>
        /// 重新建立連接之前的錯誤
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private static void MuxerConnectionRestored(object sender, ConnectionFailedEventArgs e)
        {
            LogHelper.WriteInfoLog("ConnectionRestored: " + e.EndPoint);
        }
        /// <summary>
        /// 連接失敗 , 如果重新連接成功你將不會收到這個通知
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private static void MuxerConnectionFailed(object sender, ConnectionFailedEventArgs e)
        {
            LogHelper.WriteInfoLog("重新連接:Endpoint failed: " + e.EndPoint + ", " + e.FailureType + (e.Exception == null ? "" : (", " + e.Exception.Message)));
        }
        /// <summary>
        /// 更改集群
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private static void MuxerHashSlotMoved(object sender, HashSlotMovedEventArgs e)
        {
            LogHelper.WriteInfoLog("HashSlotMoved:NewEndPoint" + e.NewEndPoint + ", OldEndPoint" + e.OldEndPoint);
        }
        /// <summary>
        /// redis類庫錯誤
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private static void MuxerInternalError(object sender, InternalErrorEventArgs e)
        {
            LogHelper.WriteInfoLog("InternalError:Message" + e.Exception.Message);
        }

        //場景不一樣,選擇的模式便會不一樣,大家可以按照自己系統架構情況合理選擇長連接還是Lazy。
        //建立連接后,通過調用ConnectionMultiplexer.GetDatabase 方法返回對 Redis Cache 數據庫的引用。從 GetDatabase 方法返回的對象是一個輕量級直通對象,不需要進行存儲。

        /// <summary>
        /// 使用的是Lazy,在真正需要連接時創建連接。
        /// 延遲加載技術
        /// 微軟azure中的配置 連接模板
        /// </summary>
        //private static Lazy<ConnectionMultiplexer> lazyConnection = new Lazy<ConnectionMultiplexer>(() =>
        //{
        //    //var options = ConfigurationOptions.Parse(constr);
        //    ////options.ClientName = GetAppName(); // only known at runtime
        //    //options.AllowAdmin = true;
        //    //return ConnectionMultiplexer.Connect(options);
        //    ConnectionMultiplexer muxer = ConnectionMultiplexer.Connect(Coonstr);
        //    muxer.ConnectionFailed += MuxerConnectionFailed;
        //    muxer.ConnectionRestored += MuxerConnectionRestored;
        //    muxer.ErrorMessage += MuxerErrorMessage;
        //    muxer.ConfigurationChanged += MuxerConfigurationChanged;
        //    muxer.HashSlotMoved += MuxerHashSlotMoved;
        //    muxer.InternalError += MuxerInternalError;
        //    return muxer;
        //});


        #region  當作消息代理中間件使用 一般使用更專業的消息隊列來處理這種業務場景
        /// <summary>
        /// 當作消息代理中間件使用
        /// 消息組建中,重要的概念便是生產者,消費者,消息中間件。
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="message"></param>
        /// <returns></returns>
        public static long Publish(string channel, string message)
        {
            ISubscriber sub = Instance.GetSubscriber();
            //return sub.Publish("messages", "hello");
            return sub.Publish(channel, message);
        }

        /// <summary>
        /// 在消費者端得到該消息並輸出
        /// </summary>
        /// <param name="channelFrom"></param>
        /// <returns></returns>
        public static void Subscribe(string channelFrom)
        {
            ISubscriber sub = Instance.GetSubscriber();
            sub.Subscribe(channelFrom, (channel, message) =>
            {
                Console.WriteLine((string)message);
            });
        }
        #endregion

        /// <summary>
        /// GetServer方法會接收一個EndPoint類或者一個唯一標識一台服務器的鍵值對
        /// 有時候需要為單個服務器指定特定的命令
        /// 使用IServer可以使用所有的shell命令,比如:
        /// DateTime lastSave = server.LastSave();
        /// ClientInfo[] clients = server.ClientList();
        /// 如果報錯在連接字符串后加 ,allowAdmin=true;
        /// </summary>
        /// <returns></returns>
        public static IServer GetServer(string host, int port)
        {
            IServer server = Instance.GetServer(host, port);
            return server;
        }

        /// <summary>
        /// 獲取全部終結點
        /// </summary>
        /// <returns></returns>
        public static EndPoint[] GetEndPoints()
        {
            EndPoint[] endpoints = Instance.GetEndPoints();
            return endpoints;
        }

    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM