A master/standby failover demo with StackExchange.Redis


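Most articles online implement Redis high availability with redis-sentinel or Redis Cluster. But is there a simpler way when, as in my case, there are only two Redis instances? My experiments show that a plain master/standby setup is enough. Our actual requirement is simple: two Redis instances run on different machines, and when one instance goes down the application must be able to keep reading and writing Redis. The master/replica configuration can be fixed by hand afterwards. A simple requirement allows a simple implementation. First download the source from https://github.com/StackExchange/StackExchange.Redis, then start the master and replica instances under StackExchange.Redis-master\Redis Configs. My final demo code is as follows: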

    using System;
    using System.Threading;
    using StackExchange.Redis;

    class Program
    {
        static IDatabase database;
        static ConnectionMultiplexer conn;

        static void Main(string[] args)
        {
            ConfigurationOptions option = new ConfigurationOptions()
            {
                EndPoints =
                {
                    { "127.0.0.1", 6379 },
                    { "127.0.0.1", 6380 }
                },
                AllowAdmin = true, // needed for MakeMaster / SlaveOf
            };

            conn = ConnectionMultiplexer.Connect(option);
            database = conn.GetDatabase();

            Random rand = new Random();

            // Write and read back a random value every 100 ms.
            while (true)
            {
                string val = "gavin_" + rand.Next(1, 999999).ToString();
                TestWriteRead(val);
                Thread.Sleep(100);
            }
        }

        static void TestWriteRead(string value)
        {
            string key = "gavinteststring";
            try
            {
                database.StringSet(key, value);
                Console.WriteLine($"Write {key}={value} succeeded");
            }
            catch (Exception ex)
            {
                // The write failed: promote the server that is still connected
                // and make the unreachable one a replica.
                // points[1] is hard-coded for this two-instance demo.
                var points = conn.GetEndPoints();
                foreach (var item in points)
                {
                    var server = conn.GetServer(item);
                    if (server.IsConnected)
                    {
                        server.MakeMaster(ReplicationChangeOptions.All);
                    }
                    else
                    {
                        server.SlaveOf(points[1], CommandFlags.FireAndForget);
                    }
                }
                // Retry the write against the new master.
                database.StringSet(key, value);
                Console.WriteLine($"Write {key}={value} succeeded");
                //Console.WriteLine($"Write {key}={value} failed: " + ex.ToString());
                //Console.ReadKey();
            }

            string temp = string.Empty;
            try
            {
                temp = database.StringGet(key);
                Console.WriteLine($"Read {key}={temp} succeeded");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Read {key} failed: " + ex.ToString());
            }
        }
    }

Ignore the code inside my catch block for now. When I shut down the Redis master, the program threw this error:

No connection is available to service this operation: SET gavinteststring; An existing connection was forcibly closed by the remote host.; IOCP: (Busy=0,Free=1000,Min=4,Max=1000), WORKER: (Busy=0,Free=1023,Min=4,Max=1023), Local-CPU: 100%

Looking at the source, I found this snippet in the ExecuteSyncImpl method of ConnectionMultiplexer:

if (!TryPushMessageToBridge(message, processor, source, ref server))
{
throw ExceptionFactory.NoConnectionAvailable(IncludeDetailInExceptions, message.Command, message, server, GetServerSnapshot());
}

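In other words, StackExchange.Redis could not find a Redis server instance to send the command to. Tracing further, the code that actually looks up the server is in the AnyConnected method of ConnectionMultiplexer: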

 internal ServerEndPoint AnyConnected(ServerType serverType, uint startOffset, RedisCommand command, CommandFlags flags)
        {
            var tmp = serverSnapshot;
            int len = tmp.Length;
            ServerEndPoint fallback = null;
            for (int i = 0; i < len; i++)
            {
                var server = tmp[(int)(((uint)i + startOffset) % len)];
                if (server != null && server.ServerType == serverType && server.IsSelectable(command))
                {
                    if (server.IsSlave)
                    {
                        switch (flags)
                        {
                            case CommandFlags.DemandSlave:
                            case CommandFlags.PreferSlave:
                                return server;
                            case CommandFlags.PreferMaster:
                                fallback = server;
                                break;
                        }
                    } else
                    {
                        switch (flags)
                        {
                            case CommandFlags.DemandMaster:
                            case CommandFlags.PreferMaster:
                                return server;
                            case CommandFlags.PreferSlave:
                                fallback = server;
                                break;
                        }
                    }
                }
            }
            return fallback;
        }

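Because the master is down, the only reachable server is the slave, but for this SET (a write command) the flags are CommandFlags.DemandMaster, so no server is found. My first idea was to promote the current slave to master with server.MakeMaster(ReplicationChangeOptions.All); I expected that to be enough, but it still failed. Then I wondered whether it would work if I also turned the old master into a slave with server.SlaveOf(points[1],CommandFlags.FireAndForget);. (While testing I also tried the quit method; it was available in the debug build, but the release build reported that no such method exists.) With both calls in place, the write succeeded again after the master was shut down.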
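To see why the lookup comes back empty, here is a small self-contained sketch of the selection loop above. It does not use the library: `Flags`, `FakeServer` and `Pick` are hypothetical stand-ins that only mirror the branch structure of AnyConnected, and the server-type and IsSelectable checks are collapsed into a single `Connected` field on the assumption that a disconnected server is never selectable.

```csharp
using System;

// Hypothetical stand-ins; the real types live inside StackExchange.Redis.
enum Flags { PreferMaster, DemandMaster, PreferSlave, DemandSlave }

sealed class FakeServer
{
    public string Name;
    public bool IsSlave;
    public bool Connected; // assumed to play the role of IsSelectable(command)
}

static class SelectionSketch
{
    // Same branch structure as AnyConnected: return an exact match at once,
    // remember a fallback only for the Prefer* flags, otherwise return null.
    public static FakeServer Pick(FakeServer[] servers, Flags flags)
    {
        FakeServer fallback = null;
        foreach (var server in servers)
        {
            if (server == null || !server.Connected) continue;
            if (server.IsSlave)
            {
                if (flags == Flags.DemandSlave || flags == Flags.PreferSlave) return server;
                if (flags == Flags.PreferMaster) fallback = server;
            }
            else
            {
                if (flags == Flags.DemandMaster || flags == Flags.PreferMaster) return server;
                if (flags == Flags.PreferSlave) fallback = server;
            }
        }
        return fallback;
    }

    public static void Main()
    {
        var servers = new[]
        {
            new FakeServer { Name = "127.0.0.1:6379", IsSlave = false, Connected = false }, // master is down
            new FakeServer { Name = "127.0.0.1:6380", IsSlave = true,  Connected = true  },
        };
        Console.WriteLine(Pick(servers, Flags.DemandMaster) == null); // True: nothing can serve a write
        Console.WriteLine(Pick(servers, Flags.PreferMaster)?.Name);   // 127.0.0.1:6380: falls back to the slave
    }
}
```

In this model only the Prefer* flags ever produce a fallback, which is why a DemandMaster write has nowhere to go until some connected server stops reporting itself as a slave.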

Later I wrapped the code above into a simple method:

        // Requires: using System; using System.Collections.Generic;
        //           using System.Linq; using System.Net; using StackExchange.Redis;
        void ChangeMaster(IDatabase database)
        {
            var mex = database.Multiplexer;
            var endpoints = mex.GetEndPoints();
            // Only switch master/standby when there is more than one endpoint.
            if (endpoints.Length < 2)
            {
                return;
            }
            List<EndPoint> connectedPoints = new List<EndPoint>();
            List<EndPoint> disconnectedPoints = new List<EndPoint>();
            foreach (var item in endpoints)
            {
                // Work out which servers are still reachable.
                var server = mex.GetServer(item);
                if (server.IsConnected)
                {
                    connectedPoints.Add(item);
                }
                else
                {
                    disconnectedPoints.Add(item);
                }
            }
            var connectedPoint = connectedPoints.FirstOrDefault();
            if (connectedPoint == null)
            {
                throw new Exception("No Redis server is available");
            }
            // Promote the first reachable server to master.
            mex.GetServer(connectedPoint).MakeMaster(ReplicationChangeOptions.All);
            // Every other server, reachable or not, becomes its slave.
            for (int i = 1; i < connectedPoints.Count; i++)
            {
                mex.GetServer(connectedPoints[i]).SlaveOf(connectedPoint, CommandFlags.FireAndForget);
            }
            foreach (var item in disconnectedPoints)
            {
                mex.GetServer(item).SlaveOf(connectedPoint, CommandFlags.FireAndForget);
            }
        }
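One way to wire such a method into every write is a small retry wrapper: run the operation, and if it fails with a connection error, run the recovery step once and try again. The helper below is a hypothetical sketch and not part of StackExchange.Redis; `FailoverSketch`, `RetryOnce` and its parameters are names chosen for illustration, and the exception type is left generic so the sketch compiles without the library.

```csharp
using System;

static class FailoverSketch
{
    // Runs op; if it throws TException, runs recover once and retries op.
    // A second failure propagates to the caller.
    public static T RetryOnce<TException, T>(Func<T> op, Action recover)
        where TException : Exception
    {
        try
        {
            return op();
        }
        catch (TException)
        {
            recover();
            return op();
        }
    }
}
```

Assuming the "No connection is available" failure surfaces as RedisConnectionException (my reading of the library, not something verified here), a write could then be written as `FailoverSketch.RetryOnce<RedisConnectionException, bool>(() => database.StringSet(key, value), () => ChangeMaster(database));`.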

 -----------------------------------------2017-4-14--------------------------------------------------------

As we know, every read or write sent to Redis is wrapped in a Message:

protected Message(int db, CommandFlags flags, RedisCommand command)
        {
            bool dbNeeded = RequiresDatabase(command);
            if (db < 0)
            {
                if (dbNeeded)
                {
                    throw ExceptionFactory.DatabaseRequired(false, command);
                }
            }
            else
            {
                if (!dbNeeded)
                {
                    throw ExceptionFactory.DatabaseNotRequired(false, command);
                }
            }

            bool masterOnly = IsMasterOnly(command);
            Db = db;
            this.command = command;
            this.flags = flags & UserSelectableFlags;
            if (masterOnly) SetMasterOnly();

            createdDateTime = DateTime.UtcNow;
            createdTimestamp = System.Diagnostics.Stopwatch.GetTimestamp();
        }
 internal void SetMasterOnly()
        {
            switch (GetMasterSlaveFlags(flags))
            {
                case CommandFlags.DemandSlave:
                    throw ExceptionFactory.MasterOnly(false, command, null, null);
                case CommandFlags.DemandMaster:
                    // already fine as-is
                    break;
                case CommandFlags.PreferMaster:
                case CommandFlags.PreferSlave:
                default: // we will run this on the master, then
                    flags = SetMasterSlaveFlags(flags, CommandFlags.DemandMaster);
                    break;
            }
        }
        internal static CommandFlags SetMasterSlaveFlags(CommandFlags everything, CommandFlags masterSlave)
        {
            // take away the two flags we don't want, and add back the ones we care about
            return (everything & ~(CommandFlags.DemandMaster | CommandFlags.DemandSlave | CommandFlags.PreferMaster | CommandFlags.PreferSlave))
                            | masterSlave;
        }
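The masking in SetMasterSlaveFlags is easier to follow with concrete bits. The sketch below uses a hypothetical `DemoFlags` enum; its numeric values are assumptions picked so that the four master/slave choices share two bits, and they may not match the real CommandFlags values in the library.

```csharp
using System;

[Flags]
enum DemoFlags
{
    PreferMaster  = 0,
    FireAndForget = 2,
    DemandMaster  = 4,
    PreferSlave   = 8,
    DemandSlave   = 12,
}

static class FlagSketch
{
    // All bits that encode the master/slave choice (assumed layout).
    const DemoFlags MasterSlaveMask =
        DemoFlags.DemandMaster | DemoFlags.DemandSlave | DemoFlags.PreferMaster | DemoFlags.PreferSlave;

    // Clear the master/slave bits, then OR in the requested choice.
    public static DemoFlags SetMasterSlave(DemoFlags everything, DemoFlags masterSlave)
        => (everything & ~MasterSlaveMask) | masterSlave;

    public static void Main()
    {
        // The caller asked for a fire-and-forget command, preferably on a slave.
        var requested = DemoFlags.FireAndForget | DemoFlags.PreferSlave;
        // A master-only command overrides the preference but keeps the other flags.
        var effective = SetMasterSlave(requested, DemoFlags.DemandMaster);
        Console.WriteLine(effective == (DemoFlags.FireAndForget | DemoFlags.DemandMaster)); // True
    }
}
```

Unrelated flags such as FireAndForget survive the rewrite. Only the master/slave preference is replaced, which matches the default branch of SetMasterOnly.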

Here the command decides whether the message must go to the master. If it must, SetMasterOnly is called to adjust the flags. So which commands require the master?

 public static bool IsMasterOnly(RedisCommand command)
        {
            switch (command)
            {
                case RedisCommand.APPEND:
                case RedisCommand.BITOP:
                case RedisCommand.BLPOP:
                case RedisCommand.BRPOP:
                case RedisCommand.BRPOPLPUSH:
                case RedisCommand.DECR:
                case RedisCommand.DECRBY:
                case RedisCommand.DEL:
                case RedisCommand.EXPIRE:
                case RedisCommand.EXPIREAT:
                case RedisCommand.FLUSHALL:
                case RedisCommand.FLUSHDB:
                case RedisCommand.GETSET:
                case RedisCommand.HDEL:
                case RedisCommand.HINCRBY:
                case RedisCommand.HINCRBYFLOAT:
                case RedisCommand.HMSET:
                case RedisCommand.HSET:
                case RedisCommand.HSETNX:
                case RedisCommand.INCR:
                case RedisCommand.INCRBY:
                case RedisCommand.INCRBYFLOAT:
                case RedisCommand.LINSERT:
                case RedisCommand.LPOP:
                case RedisCommand.LPUSH:
                case RedisCommand.LPUSHX:
                case RedisCommand.LREM:
                case RedisCommand.LSET:
                case RedisCommand.LTRIM:
                case RedisCommand.MIGRATE:
                case RedisCommand.MOVE:
                case RedisCommand.MSET:
                case RedisCommand.MSETNX:
                case RedisCommand.PERSIST:
                case RedisCommand.PEXPIRE:
                case RedisCommand.PEXPIREAT:
                case RedisCommand.PFADD:
                case RedisCommand.PFMERGE:
                case RedisCommand.PSETEX:
                case RedisCommand.RENAME:
                case RedisCommand.RENAMENX:
                case RedisCommand.RESTORE:
                case RedisCommand.RPOP:
                case RedisCommand.RPOPLPUSH:
                case RedisCommand.RPUSH:
                case RedisCommand.RPUSHX:
                case RedisCommand.SADD:
                case RedisCommand.SDIFFSTORE:
                case RedisCommand.SET:
                case RedisCommand.SETBIT:
                case RedisCommand.SETEX:
                case RedisCommand.SETNX:
                case RedisCommand.SETRANGE:
                case RedisCommand.SINTERSTORE:
                case RedisCommand.SMOVE:
                case RedisCommand.SPOP:
                case RedisCommand.SREM:
                case RedisCommand.SUNIONSTORE:
                case RedisCommand.ZADD:
                case RedisCommand.ZINTERSTORE:
                case RedisCommand.ZINCRBY:
                case RedisCommand.ZREM:
                case RedisCommand.ZREMRANGEBYLEX:
                case RedisCommand.ZREMRANGEBYRANK:
                case RedisCommand.ZREMRANGEBYSCORE:
                case RedisCommand.ZUNIONSTORE:
                    return true;
                default:
                    return false;
            }
        }

When we execute a script, the ScriptEvalMessage class is used. Its constructors are:

   private sealed class ScriptEvalMessage : Message, IMultiMessage
        {
            private readonly RedisKey[] keys;
            private readonly string script;
            private readonly RedisValue[] values;
            private byte[] asciiHash, hexHash;
            public ScriptEvalMessage(int db, CommandFlags flags, string script, RedisKey[] keys, RedisValue[] values)
                : this(db, flags, ResultProcessor.ScriptLoadProcessor.IsSHA1(script) ? RedisCommand.EVALSHA : RedisCommand.EVAL, script, null, keys, values)
            {
                if (script == null) throw new ArgumentNullException(nameof(script));
            }
            public ScriptEvalMessage(int db, CommandFlags flags, byte[] hash, RedisKey[] keys, RedisValue[] values)
                : this(db, flags, RedisCommand.EVAL, null, hash, keys, values)
            {
                if (hash == null) throw new ArgumentNullException(nameof(hash));
            }

            private ScriptEvalMessage(int db, CommandFlags flags, RedisCommand command, string script, byte[] hexHash, RedisKey[] keys, RedisValue[] values)
                : base(db, flags, command)
            {
                this.script = script;
                this.hexHash = hexHash;

                if (keys == null) keys = RedisKey.EmptyArray;
                if (values == null) values = RedisValue.EmptyArray;
                for (int i = 0; i < keys.Length; i++)
                    keys[i].AssertNotNull();
                this.keys = keys;
                for (int i = 0; i < values.Length; i++)
                    values[i].AssertNotNull();
                this.values = values;
            }     

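EVAL and EVALSHA do not appear in the IsMasterOnly list above, so a script can be executed on a slave, and many query commands can run on a slave as well.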

 

