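Most articles online implement Redis high availability with redis-sentinel or Redis Cluster. But is there a simpler way when, as in my case, there are only two Redis instances? My experiments show that a plain master/standby setup is enough. (Our actual requirement is simple: two Redis instances run on different machines, and when one of them goes down the application must be able to keep reading and writing Redis; the master/slave configuration may be changed manually.) A simple requirement gets a simple implementation. First download the source from https://github.com/StackExchange/StackExchange.Redis and start the master and slave instances under StackExchange.Redis-master\Redis Configs. My final demo code is: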

using System;
using System.Threading;
using StackExchange.Redis;

class Program
{
    static IDatabase database;
    static ConnectionMultiplexer conn;

    static void Main(string[] args)
    {
        ConfigurationOptions option = new ConfigurationOptions()
        {
            EndPoints = { { "127.0.0.1", 6379 }, { "127.0.0.1", 6380 } },
            AllowAdmin = true, // needed for the MakeMaster / SlaveOf calls below
        };
        conn = ConnectionMultiplexer.Connect(option);
        database = conn.GetDatabase();
        Random rand = new Random();
        while (true)
        {
            string val = "gavin_" + rand.Next(1, 999999).ToString();
            TestWriteRead(val);
            Thread.Sleep(100);
        }
    }

    static void TestWriteRead(string value)
    {
        string key = "gavinteststring";
        try
        {
            database.StringSet(key, value);
            Console.WriteLine($"Write {key}={value} succeeded");
        }
        catch (Exception ex)
        {
            // Failover: promote whichever server is still connected and point
            // the unreachable one at points[1] (the slave in this demo).
            var points = conn.GetEndPoints();
            foreach (var item in points)
            {
                var server = conn.GetServer(item);
                if (server.IsConnected)
                {
                    server.MakeMaster(ReplicationChangeOptions.All);
                }
                else
                {
                    server.SlaveOf(points[1], CommandFlags.FireAndForget);
                }
            }
            // Retry the write once after the switch.
            database.StringSet(key, value);
            Console.WriteLine($"Write {key}={value} succeeded");
            //Console.WriteLine($"Write {key}={value} failed: " + ex.ToString());
            // Console.ReadKey();
        }

        string temp = string.Empty;
        try
        {
            temp = database.StringGet(key);
            Console.WriteLine($"Read {key}={temp} succeeded");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Read {key} failed: " + ex.ToString());
        }
    }
}
Ignore the code in my catch block for now. When I shut down the Redis master, the program failed with:
No connection is available to service this operation: SET gavinteststring; An existing connection was forcibly closed by the remote host.; IOCP: (Busy=0,Free=1000,Min=4,Max=1000), WORKER: (Busy=0,Free=1023,Min=4,Max=1023), Local-CPU: 100%
Looking at the source, the ExecuteSyncImpl method of ConnectionMultiplexer contains this fragment:
if (!TryPushMessageToBridge(message, processor, source, ref server))
{
    throw ExceptionFactory.NoConnectionAvailable(IncludeDetailInExceptions, message.Command, message, server, GetServerSnapshot());
}
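In other words, StackExchange.Redis could not find a Redis server instance to serve the command. Tracing further, the code that actually selects the server is the AnyConnected method of ConnectionMultiplexer: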
internal ServerEndPoint AnyConnected(ServerType serverType, uint startOffset, RedisCommand command, CommandFlags flags)
{
    var tmp = serverSnapshot;
    int len = tmp.Length;
    ServerEndPoint fallback = null;
    for (int i = 0; i < len; i++)
    {
        var server = tmp[(int)(((uint)i + startOffset) % len)];
        if (server != null && server.ServerType == serverType && server.IsSelectable(command))
        {
            if (server.IsSlave)
            {
                switch (flags)
                {
                    case CommandFlags.DemandSlave:
                    case CommandFlags.PreferSlave:
                        return server;
                    case CommandFlags.PreferMaster:
                        fallback = server;
                        break;
                }
            }
            else
            {
                switch (flags)
                {
                    case CommandFlags.DemandMaster:
                    case CommandFlags.PreferMaster:
                        return server;
                    case CommandFlags.PreferSlave:
                        fallback = server;
                        break;
                }
            }
        }
    }
    return fallback;
}
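Because the master is down, the only reachable server is the slave, but the flags for a write such as SET are CommandFlags.DemandMaster (the 2017-4-14 update below shows where that comes from), so no server is found. I therefore promoted the current slave to master with server.MakeMaster(ReplicationChangeOptions.All); and expected that to be enough, but it still failed. Then I wondered whether also turning the old master into a slave would help: server.SlaveOf(points[1],CommandFlags.FireAndForget); (During testing I also tried the Quit method; it exists in a Debug build, but the Release build reports that there is no such method.) With both calls in place the demo kept writing after the master was shut down.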
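The decision table in AnyConnected is easier to follow when run in isolation. What follows is a minimal sketch, not library code: `Routing`, `FakeServer` and `Pick` are hypothetical stand-ins written for this post, the enum values are assumed to mirror CommandFlags in StackExchange.Redis 1.x, and the start offset and server-type checks are left out. It models server selection only, so it does not explain why the old master also had to be turned into a slave.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the master/slave part of CommandFlags (values assumed).
enum Routing { PreferMaster = 0, DemandMaster = 4, PreferSlave = 8, DemandSlave = 12 }

// Hypothetical stand-in for ServerEndPoint.
class FakeServer
{
    public string Name;
    public bool IsSlave;
    public bool IsConnected;
}

static class SelectionDemo
{
    // Same decision table as AnyConnected, without the offset and server-type checks.
    public static FakeServer Pick(IList<FakeServer> servers, Routing flags)
    {
        FakeServer fallback = null;
        foreach (var server in servers)
        {
            if (!server.IsConnected) continue; // stands in for IsSelectable(command)
            if (server.IsSlave)
            {
                if (flags == Routing.DemandSlave || flags == Routing.PreferSlave) return server;
                if (flags == Routing.PreferMaster) fallback = server;
            }
            else
            {
                if (flags == Routing.DemandMaster || flags == Routing.PreferMaster) return server;
                if (flags == Routing.PreferSlave) fallback = server;
            }
        }
        return fallback;
    }

    static void Main()
    {
        var servers = new List<FakeServer>
        {
            new FakeServer { Name = "6379", IsSlave = false, IsConnected = false }, // master is down
            new FakeServer { Name = "6380", IsSlave = true, IsConnected = true },   // slave is up
        };

        // A write demands a master: the slave does not qualify, so nothing is returned.
        Console.WriteLine(Pick(servers, Routing.DemandMaster)?.Name ?? "(none)");

        // Once the survivor is seen as a master, the same request finds it.
        servers[1].IsSlave = false;
        Console.WriteLine(Pick(servers, Routing.DemandMaster)?.Name ?? "(none)");
    }
}
```

The first line printed is (none), which corresponds to the "No connection is available" error, and the second is 6380.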
Later I wrapped the code above into a method:

// Needs: using System; using System.Collections.Generic; using System.Linq; using System.Net; using StackExchange.Redis;
void ChangeMaster(IDatabase database)
{
    var mex = database.Multiplexer;
    var endpoints = mex.GetEndPoints();
    if (endpoints.Length < 2)
    {
        return; // only switch master/standby when there is more than one endpoint
    }

    // Work out which servers are reachable
    List<EndPoint> connectedPoints = new List<EndPoint>();
    List<EndPoint> disconnectedPoints = new List<EndPoint>();
    foreach (var item in endpoints)
    {
        var server = mex.GetServer(item);
        if (server.IsConnected)
        {
            connectedPoints.Add(item);
        }
        else
        {
            disconnectedPoints.Add(item);
        }
    }

    var connectedPoint = connectedPoints.FirstOrDefault();
    if (connectedPoint == null)
    {
        throw new Exception("No Redis server is available");
    }

    // Promote the first reachable server, then make every other server its slave
    mex.GetServer(connectedPoint).MakeMaster(ReplicationChangeOptions.All);
    for (int i = 1; i < connectedPoints.Count; i++)
    {
        mex.GetServer(connectedPoints[i]).SlaveOf(connectedPoint, CommandFlags.FireAndForget);
    }
    foreach (var item in disconnectedPoints)
    {
        mex.GetServer(item).SlaveOf(connectedPoint, CommandFlags.FireAndForget);
    }
}
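The try/failover/retry pattern from the demo's catch block can be pulled out so that every call site does not repeat it. The helper below is only a sketch under my own assumptions: `RunWithFailover` is a hypothetical name, it retries exactly once, and it is written against plain delegates so that it runs without a Redis server. The `Main` method simulates a master that is down until the failover action runs.

```csharp
using System;

static class Failover
{
    // Runs op; if it throws, runs failover once and retries op a single time.
    // A second failure propagates to the caller.
    public static T RunWithFailover<T>(Func<T> op, Action failover)
    {
        try
        {
            return op();
        }
        catch (Exception)
        {
            failover();
            return op();
        }
    }
}

static class FailoverDemo
{
    static void Main()
    {
        bool masterUp = false; // simulated state: the master starts out down
        int failovers = 0;

        bool result = Failover.RunWithFailover(
            () =>
            {
                if (!masterUp) throw new InvalidOperationException("no master");
                return true;
            },
            () => { failovers++; masterUp = true; });

        Console.WriteLine($"result={result}, failovers={failovers}");
    }
}
```

Against a real connection the call would look like Failover.RunWithFailover(() => database.StringSet(key, value), () => ChangeMaster(database)). Catching a narrower type, such as the library's RedisConnectionException, would avoid switching masters on unrelated errors.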
-----------------------------------------2017-4-14--------------------------------------------------------
As we know, every read or write to Redis is wrapped in a Message:
protected Message(int db, CommandFlags flags, RedisCommand command)
{
    bool dbNeeded = RequiresDatabase(command);
    if (db < 0)
    {
        if (dbNeeded)
        {
            throw ExceptionFactory.DatabaseRequired(false, command);
        }
    }
    else
    {
        if (!dbNeeded)
        {
            throw ExceptionFactory.DatabaseNotRequired(false, command);
        }
    }
    bool masterOnly = IsMasterOnly(command);
    Db = db;
    this.command = command;
    this.flags = flags & UserSelectableFlags;
    if (masterOnly) SetMasterOnly();
    createdDateTime = DateTime.UtcNow;
    createdTimestamp = System.Diagnostics.Stopwatch.GetTimestamp();
}

internal void SetMasterOnly()
{
    switch (GetMasterSlaveFlags(flags))
    {
        case CommandFlags.DemandSlave:
            throw ExceptionFactory.MasterOnly(false, command, null, null);
        case CommandFlags.DemandMaster:
            // already fine as-is
            break;
        case CommandFlags.PreferMaster:
        case CommandFlags.PreferSlave:
        default:
            // we will run this on the master, then
            flags = SetMasterSlaveFlags(flags, CommandFlags.DemandMaster);
            break;
    }
}

internal static CommandFlags SetMasterSlaveFlags(CommandFlags everything, CommandFlags masterSlave)
{
    // take away the two flags we don't want, and add back the ones we care about
    return (everything & ~(CommandFlags.DemandMaster | CommandFlags.DemandSlave | CommandFlags.PreferMaster | CommandFlags.PreferSlave)) | masterSlave;
}
Here the command determines whether the message must go to the master; if it must, SetMasterOnly is called to set the flags. So which commands require the master?
public static bool IsMasterOnly(RedisCommand command)
{
    switch (command)
    {
        case RedisCommand.APPEND: case RedisCommand.BITOP: case RedisCommand.BLPOP: case RedisCommand.BRPOP:
        case RedisCommand.BRPOPLPUSH: case RedisCommand.DECR: case RedisCommand.DECRBY: case RedisCommand.DEL:
        case RedisCommand.EXPIRE: case RedisCommand.EXPIREAT: case RedisCommand.FLUSHALL: case RedisCommand.FLUSHDB:
        case RedisCommand.GETSET: case RedisCommand.HDEL: case RedisCommand.HINCRBY: case RedisCommand.HINCRBYFLOAT:
        case RedisCommand.HMSET: case RedisCommand.HSET: case RedisCommand.HSETNX: case RedisCommand.INCR:
        case RedisCommand.INCRBY: case RedisCommand.INCRBYFLOAT: case RedisCommand.LINSERT: case RedisCommand.LPOP:
        case RedisCommand.LPUSH: case RedisCommand.LPUSHX: case RedisCommand.LREM: case RedisCommand.LSET:
        case RedisCommand.LTRIM: case RedisCommand.MIGRATE: case RedisCommand.MOVE: case RedisCommand.MSET:
        case RedisCommand.MSETNX: case RedisCommand.PERSIST: case RedisCommand.PEXPIRE: case RedisCommand.PEXPIREAT:
        case RedisCommand.PFADD: case RedisCommand.PFMERGE: case RedisCommand.PSETEX: case RedisCommand.RENAME:
        case RedisCommand.RENAMENX: case RedisCommand.RESTORE: case RedisCommand.RPOP: case RedisCommand.RPOPLPUSH:
        case RedisCommand.RPUSH: case RedisCommand.RPUSHX: case RedisCommand.SADD: case RedisCommand.SDIFFSTORE:
        case RedisCommand.SET: case RedisCommand.SETBIT: case RedisCommand.SETEX: case RedisCommand.SETNX:
        case RedisCommand.SETRANGE: case RedisCommand.SINTERSTORE: case RedisCommand.SMOVE: case RedisCommand.SPOP:
        case RedisCommand.SREM: case RedisCommand.SUNIONSTORE: case RedisCommand.ZADD: case RedisCommand.ZINTERSTORE:
        case RedisCommand.ZINCRBY: case RedisCommand.ZREM: case RedisCommand.ZREMRANGEBYLEX: case RedisCommand.ZREMRANGEBYRANK:
        case RedisCommand.ZREMRANGEBYSCORE: case RedisCommand.ZUNIONSTORE:
            return true;
        default:
            return false;
    }
}
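The effect of SetMasterOnly on a master-only command such as SET can be checked with plain integer arithmetic. The sketch below uses a local enum, `DemoFlags`, that is assumed to carry the same numeric values as CommandFlags in StackExchange.Redis 1.x (PreferMaster = 0, FireAndForget = 2, DemandMaster = 4, PreferSlave = 8, DemandSlave = 12); `SetMasterSlave` is a hypothetical copy of the expression in SetMasterSlaveFlags, not the library method itself.

```csharp
using System;

// Local mirror of a few CommandFlags values (assumed, see the text above).
enum DemoFlags
{
    PreferMaster = 0,
    FireAndForget = 2,
    DemandMaster = 4,
    PreferSlave = 8,
    DemandSlave = 12,
}

static class FlagDemo
{
    // The four routing values together cover bits 4 and 8, i.e. the mask 12.
    const DemoFlags MasterSlaveMask =
        DemoFlags.DemandMaster | DemoFlags.DemandSlave | DemoFlags.PreferMaster | DemoFlags.PreferSlave;

    // Same expression as SetMasterSlaveFlags: clear the routing bits, then OR in the new choice.
    public static DemoFlags SetMasterSlave(DemoFlags everything, DemoFlags masterSlave)
    {
        return (everything & ~MasterSlaveMask) | masterSlave;
    }

    static void Main()
    {
        // The caller asks for a fire-and-forget write that prefers a slave.
        DemoFlags requested = DemoFlags.FireAndForget | DemoFlags.PreferSlave;

        // SET is master-only, so the routing bits are replaced by DemandMaster.
        DemoFlags effective = SetMasterSlave(requested, DemoFlags.DemandMaster);

        Console.WriteLine((int)requested); // 10
        Console.WriteLine((int)effective); // 6
    }
}
```

The non-routing bit (FireAndForget) survives, while PreferSlave is silently replaced by DemandMaster. This is why the SET in the demo could not fall back to the slave.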
Executing a script uses the ScriptEvalMessage class; its constructors are:
private sealed class ScriptEvalMessage : Message, IMultiMessage
{
    private readonly RedisKey[] keys;
    private readonly string script;
    private readonly RedisValue[] values;
    private byte[] asciiHash, hexHash;

    public ScriptEvalMessage(int db, CommandFlags flags, string script, RedisKey[] keys, RedisValue[] values)
        : this(db, flags, ResultProcessor.ScriptLoadProcessor.IsSHA1(script) ? RedisCommand.EVALSHA : RedisCommand.EVAL, script, null, keys, values)
    {
        if (script == null) throw new ArgumentNullException(nameof(script));
    }

    public ScriptEvalMessage(int db, CommandFlags flags, byte[] hash, RedisKey[] keys, RedisValue[] values)
        : this(db, flags, RedisCommand.EVAL, null, hash, keys, values)
    {
        if (hash == null) throw new ArgumentNullException(nameof(hash));
    }

    private ScriptEvalMessage(int db, CommandFlags flags, RedisCommand command, string script, byte[] hexHash, RedisKey[] keys, RedisValue[] values)
        : base(db, flags, command)
    {
        this.script = script;
        this.hexHash = hexHash;
        if (keys == null) keys = RedisKey.EmptyArray;
        if (values == null) values = RedisValue.EmptyArray;
        for (int i = 0; i < keys.Length; i++) keys[i].AssertNotNull();
        this.keys = keys;
        for (int i = 0; i < values.Length; i++) values[i].AssertNotNull();
        this.values = values;
    }
    // (the excerpt ends here; the rest of the class is not quoted)
}
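In other words, EVAL and EVALSHA are not in the master-only list, so the library allows a script to run on a slave, and many read commands can run on a slave as well. (A script that writes will still be rejected by a read-only slave.)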
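For a read command this means the caller's routing hint is kept. As a small sketch, assuming the same two local instances as the demo and the 1.x flag name CommandFlags.PreferSlave (called PreferReplica in 2.x releases), a GET can be pointed at the slave like this:

```csharp
using System;
using StackExchange.Redis;

class ReadFromSlaveDemo
{
    static void Main()
    {
        // Assumes a master on 6379 and a slave on 6380, as in the demo above.
        var conn = ConnectionMultiplexer.Connect("127.0.0.1:6379,127.0.0.1:6380");
        IDatabase database = conn.GetDatabase();

        // SET is in IsMasterOnly, so this is always routed to the master.
        database.StringSet("gavinteststring", "hello");

        // GET is not master-only, so the PreferSlave hint is kept:
        // a connected slave is used if there is one, otherwise the master.
        string value = database.StringGet("gavinteststring", CommandFlags.PreferSlave);
        Console.WriteLine(value);
    }
}
```

Replication is asynchronous, so a read sent to the slave immediately after a write may not see the new value yet.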