對ServiceStack.Redis的連接池進行故障轉移改造


使用ServiceStack.Redis的連接池在操作多台Redis的時候並不會對出現故障的Redis進行排除切換,這樣就會導致應用會還是會分配到故障的Redis服務中導致應用處理錯誤.這次對ServiceStack.Redis連接池的改造主要實現兩個功能:1)對故障的Redis服務在輪循的時候排除,2)定期檢測故障的Redis服務,如果服務正常則恢復到輪盾環節中.(ServiceStack.Redis的代碼結構還是很不錯修改起來也很方便)

增加基於Host的連接池功能

ServiceStack.Redis連接池的連接存儲結構相對簡單,只是用一些簡單的數組進行處理也沒有明確按Host划分,所以修改起來比較麻煩.通過查看代碼決定在RedisEndPoint添加一系列的功能.包括:連接獲取,回收,有效性驗測等功能.詳細代碼如下:

    public class RedisEndPoint : EndPoint
    {
        public RedisEndPoint(string host, int port) : base(host, port)
        {
        }

        public RedisEndPoint(string host, int port, string password) : this(host,port)
        {
            this.Password = password;
        }

        public string Password { get; set; }

        public bool RequiresAuth { get { return !string.IsNullOrEmpty(Password); } }

        private System.Collections.Generic.Stack<RedisClient> mStack = new System.Collections.Generic.Stack<RedisClient>();

        private bool mAvailable = true;

        private int mLastDetectTime = 0;

        private void PingHost(object state)
        {
            try
            {
                RedisClient client = Redis.RedisClientFactory.Instance.CreateRedisClient(Host, Port);
                client.Password = Password;
                client.EndPoint = this;
                client.Ping();
                Push(client);
                mAvailable = true;
                
            }
            catch
            {

            }
            mIsDetecting = false;
        }

        private bool mIsDetecting = false;

        public bool Detect()
        {
            if (!mAvailable)
            {
                if (System.Math.Abs(System.Environment.TickCount - mLastDetectTime) >= 10000)
                {
                    mLastDetectTime = System.Environment.TickCount;
                    if (!mIsDetecting)
                    {
                        mIsDetecting = true;
                        System.Threading.ThreadPool.QueueUserWorkItem(PingHost);
                    }
                    
                   
                }
            }
            return mAvailable;
        }

        public RedisClient Pop()
        {
            lock (mStack)
            {
                if (mStack.Count > 0)
                   return mStack.Pop();
                
            }
            RedisClient client = Redis.RedisClientFactory.Instance.CreateRedisClient(Host, Port);
            client.EndPoint = this;
            client.Password = Password;
            return client;
        }

        public void Push(RedisClient client)
        {
            lock (mStack)
            {
                if (!client.HadExceptions)
                {
                    mStack.Push(client);
                }
                else
                {
                    client.ClientManager = null;
                    client.Dispose();
                    while (mStack.Count > 0)
                    {
                        client = mStack.Pop();
                        client.ClientManager = null;
                        client.Dispose();
                    }
                    mAvailable = false;
                }
            }
        }
    }

比較重要的功能主要是回收和檢測,在連接回收的時候判斷連接是否存在異常(從代碼來看HadExceptions的設置是由SocketError引發的,因此可以判斷當這個值為True的時候存在網絡異常),如果是則把當前節點標識為不可用,並把池中的所有連接進行清除關閉.檢測方法主要是每隔10秒對redis服務進行一個連接和ping操作,如果成功該節點恢復到有效狀態. 

修改PooledRedisClientManager 

 為了讓新連接池的代碼生效,必須修改PooledRedisClientManager幾個地方,主要是連接獲了和連接回收幾個方法的代碼.

GetInActiveWriteClient

/// <summary>
        /// Called within a lock
        /// </summary>
        /// <returns></returns>
        private RedisClient GetInActiveWriteClient()
        {
            var desiredIndex = WritePoolIndex % writeClients.Length;
            //this will loop through all hosts in readClients once even though there are 2 for loops
            //both loops are used to try to get the prefered host according to the round robin algorithm
            for (int x = 0; x < ReadWriteHosts.Count; x++)
            {
                var nextHostIndex = (desiredIndex + x) % ReadWriteHosts.Count;
                var nextHost = ReadWriteHosts[nextHostIndex];
                if (nextHost.Detect())
                {
                    RedisClient client = nextHost.Pop();
                    if (client != null)
                    {
                        if (nextHost.RequiresAuth)
                            client.Password = nextHost.Password;
                        client.Id = RedisClientCounter++;
                        client.ClientManager = this;
                        client.NamespacePrefix = NamespacePrefix;
                        client.ConnectionFilter = ConnectionFilter;
                        return client;
                    }
                }
                //for (var i = nextHostIndex; i < writeClients.Length; i += ReadWriteHosts.Count)
                //{                    
                //    if (writeClients[i] != null && !writeClients[i].Active && !writeClients[i].HadExceptions)
                //        return writeClients[i];
                //    else if (writeClients[i] == null || writeClients[i].HadExceptions)
                //    {
                //        if (writeClients[i] != null)
                //            writeClients[i].DisposeConnection();
                //        var client = RedisClientFactory.CreateRedisClient(nextHost.Host, nextHost.Port);

                //        if (nextHost.RequiresAuth)
                //            client.Password = nextHost.Password;

                //        client.Id = RedisClientCounter++;
                //        client.ClientManager = this;
                //        client.NamespacePrefix = NamespacePrefix;
                //        client.ConnectionFilter = ConnectionFilter;
                        
                //        writeClients[i] = client;

                //        return client;
                //    }
                //}
            }
            return null;
        }

把代碼改成直接檢測當明的Host是否有效,如果是則獲取連接並返回,這里只修改的writerclient,類里面還有readclient的方法也相對應用進行修改.

 GetClient方法代碼 

        /// <summary>
        /// Returns a Read/Write client (The default) using the hosts defined in ReadWriteHosts
        /// </summary>
        /// <returns></returns>
        public IRedisClient GetClient()
        {
            lock (writeClients)
            {
                AssertValidReadWritePool();

                RedisClient inActiveClient;
                inActiveClient = GetInActiveWriteClient();
                if(inActiveClient == null)
                    throw new TimeoutException(PoolTimeoutError);
                //while ((inActiveClient = GetInActiveWriteClient()) == null)
                //{
                //    if (PoolTimeOut.HasValue)
                //    {
                //        // wait for a connection, cry out if made to wait too long
                //        if (!Monitor.Wait(writeClients, PoolTimeOut.Value))
                //            throw new TimeoutException(PoolTimeoutError);
                //    }
                //    else
                //        Monitor.Wait(writeClients);
                //}

                WritePoolIndex++;
                inActiveClient.Active = true;

                if (this.ConnectTimeout != null)
                {
                    inActiveClient.ConnectTimeout = this.ConnectTimeout.Value;
                }

                if( this.SocketSendTimeout.HasValue )
                {
                    inActiveClient.SendTimeout = this.SocketSendTimeout.Value;
                }
                if( this.SocketReceiveTimeout.HasValue )
                {
                    inActiveClient.ReceiveTimeout = this.SocketReceiveTimeout.Value;
                }

                inActiveClient.NamespacePrefix = NamespacePrefix;

                //Reset database to default if changed
                if (inActiveClient.Db != Db)
                {
                    inActiveClient.ChangeDb(Db);
                }

                return inActiveClient;
            }
        }

DisposeClient方法代碼

        public void DisposeClient(RedisNativeClient client)
        {
            if (client.EndPoint != null)
            {
                client.EndPoint.Push((RedisClient)client);
                return;
            }
            //lock (readClients)
            //{
            //    for (var i = 0; i < readClients.Length; i++)
            //    {
            //        var readClient = readClients[i];
            //        if (client != readClient) continue;
            //        client.Active = false;
            //        Monitor.PulseAll(readClients);
            //        return;
            //    }
            //}

通過以上簡單的代碼修改后ServiceStack.Redis的連接池就具備了故意遷移和恢復的功能:)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM