使用ServiceStack.Redis的連接池在操作多台Redis的時候並不會對出現故障的Redis進行排除切換,這樣就會導致應用會還是會分配到故障的Redis服務中導致應用處理錯誤.這次對ServiceStack.Redis連接池的改造主要實現兩個功能:1)對故障的Redis服務在輪循的時候排除,2)定期檢測故障的Redis服務,如果服務正常則恢復到輪盾環節中.(ServiceStack.Redis的代碼結構還是很不錯修改起來也很方便)
增加基於Host的連接池功能
ServiceStack.Redis連接池的連接存儲結構相對簡單,只是用一些簡單的數組進行處理也沒有明確按Host划分,所以修改起來比較麻煩.通過查看代碼決定在RedisEndPoint添加一系列的功能.包括:連接獲取,回收,有效性驗測等功能.詳細代碼如下:
public class RedisEndPoint : EndPoint { public RedisEndPoint(string host, int port) : base(host, port) { } public RedisEndPoint(string host, int port, string password) : this(host,port) { this.Password = password; } public string Password { get; set; } public bool RequiresAuth { get { return !string.IsNullOrEmpty(Password); } } private System.Collections.Generic.Stack<RedisClient> mStack = new System.Collections.Generic.Stack<RedisClient>(); private bool mAvailable = true; private int mLastDetectTime = 0; private void PingHost(object state) { try { RedisClient client = Redis.RedisClientFactory.Instance.CreateRedisClient(Host, Port); client.Password = Password; client.EndPoint = this; client.Ping(); Push(client); mAvailable = true; } catch { } mIsDetecting = false; } private bool mIsDetecting = false; public bool Detect() { if (!mAvailable) { if (System.Math.Abs(System.Environment.TickCount - mLastDetectTime) >= 10000) { mLastDetectTime = System.Environment.TickCount; if (!mIsDetecting) { mIsDetecting = true; System.Threading.ThreadPool.QueueUserWorkItem(PingHost); } } } return mAvailable; } public RedisClient Pop() { lock (mStack) { if (mStack.Count > 0) return mStack.Pop(); } RedisClient client = Redis.RedisClientFactory.Instance.CreateRedisClient(Host, Port); client.EndPoint = this; client.Password = Password; return client; } public void Push(RedisClient client) { lock (mStack) { if (!client.HadExceptions) { mStack.Push(client); } else { client.ClientManager = null; client.Dispose(); while (mStack.Count > 0) { client = mStack.Pop(); client.ClientManager = null; client.Dispose(); } mAvailable = false; } } } }
比較重要的功能主要是回收和檢測,在連接回收的時候判斷連接是否存在異常(從代碼來看HadExceptions的設置是由SocketError引發的,因此可以判斷當這個值為True的時候存在網絡異常),如果是則把當前節點標識為不可用,並把池中的所有連接進行清除關閉.檢測方法主要是每隔10秒對redis服務進行一個連接和ping操作,如果成功該節點恢復到有效狀態.
修改PooledRedisClientManager
為了讓新連接池的代碼生效,必須修改PooledRedisClientManager幾個地方,主要是連接獲了和連接回收幾個方法的代碼.
GetInActiveWriteClient
/// <summary> /// Called within a lock /// </summary> /// <returns></returns> private RedisClient GetInActiveWriteClient() { var desiredIndex = WritePoolIndex % writeClients.Length; //this will loop through all hosts in readClients once even though there are 2 for loops //both loops are used to try to get the prefered host according to the round robin algorithm for (int x = 0; x < ReadWriteHosts.Count; x++) { var nextHostIndex = (desiredIndex + x) % ReadWriteHosts.Count; var nextHost = ReadWriteHosts[nextHostIndex]; if (nextHost.Detect()) { RedisClient client = nextHost.Pop(); if (client != null) { if (nextHost.RequiresAuth) client.Password = nextHost.Password; client.Id = RedisClientCounter++; client.ClientManager = this; client.NamespacePrefix = NamespacePrefix; client.ConnectionFilter = ConnectionFilter; return client; } } //for (var i = nextHostIndex; i < writeClients.Length; i += ReadWriteHosts.Count) //{ // if (writeClients[i] != null && !writeClients[i].Active && !writeClients[i].HadExceptions) // return writeClients[i]; // else if (writeClients[i] == null || writeClients[i].HadExceptions) // { // if (writeClients[i] != null) // writeClients[i].DisposeConnection(); // var client = RedisClientFactory.CreateRedisClient(nextHost.Host, nextHost.Port); // if (nextHost.RequiresAuth) // client.Password = nextHost.Password; // client.Id = RedisClientCounter++; // client.ClientManager = this; // client.NamespacePrefix = NamespacePrefix; // client.ConnectionFilter = ConnectionFilter; // writeClients[i] = client; // return client; // } //} } return null; }
把代碼改成直接檢測當明的Host是否有效,如果是則獲取連接並返回,這里只修改的writerclient,類里面還有readclient的方法也相對應用進行修改.
GetClient方法代碼
/// <summary> /// Returns a Read/Write client (The default) using the hosts defined in ReadWriteHosts /// </summary> /// <returns></returns> public IRedisClient GetClient() { lock (writeClients) { AssertValidReadWritePool(); RedisClient inActiveClient; inActiveClient = GetInActiveWriteClient(); if(inActiveClient == null) throw new TimeoutException(PoolTimeoutError); //while ((inActiveClient = GetInActiveWriteClient()) == null) //{ // if (PoolTimeOut.HasValue) // { // // wait for a connection, cry out if made to wait too long // if (!Monitor.Wait(writeClients, PoolTimeOut.Value)) // throw new TimeoutException(PoolTimeoutError); // } // else // Monitor.Wait(writeClients); //} WritePoolIndex++; inActiveClient.Active = true; if (this.ConnectTimeout != null) { inActiveClient.ConnectTimeout = this.ConnectTimeout.Value; } if( this.SocketSendTimeout.HasValue ) { inActiveClient.SendTimeout = this.SocketSendTimeout.Value; } if( this.SocketReceiveTimeout.HasValue ) { inActiveClient.ReceiveTimeout = this.SocketReceiveTimeout.Value; } inActiveClient.NamespacePrefix = NamespacePrefix; //Reset database to default if changed if (inActiveClient.Db != Db) { inActiveClient.ChangeDb(Db); } return inActiveClient; } }
DisposeClient方法代碼
public void DisposeClient(RedisNativeClient client) { if (client.EndPoint != null) { client.EndPoint.Push((RedisClient)client); return; } //lock (readClients) //{ // for (var i = 0; i < readClients.Length; i++) // { // var readClient = readClients[i]; // if (client != readClient) continue; // client.Active = false; // Monitor.PulseAll(readClients); // return; // } //}
通過以上簡單的代碼修改后ServiceStack.Redis的連接池就具備了故意遷移和恢復的功能:)