優雅的處理Redis訪問超時


很長一段時間以來,一直在項目中使用Redis作為輔助存儲,確切來說是利用Redis的內存存儲,而不是將其作為緩存。比如常見的利用Set集合來判斷某個數值是否存在,或者將來自不同請求的數據放在Redis中進行拼接然后一起寫入MySQL等數據庫。
這種存儲目的的使用要求對Redis的訪問不能失敗(如果作為緩存使用,是接受失敗的),所以作為存儲目的使用代碼中要對請求Redis的代碼進行異常處理以及重試等。
在最初的代碼中采用了最常見的方法如try ... catch ...處理異常,遞歸進行重試,類似:

//偽代碼
public void Process(int retry)
{
    if(retry>3)
    {
        //記錄錯誤
        return;
    }
    try
    {
        //業務代碼
    } 
    catch(Exception ex)
    {
        //重試
        ++retry;
        Process(retry);
    }
}

后來有一天看到了園友Jeffcky推薦的Polly庫,瞬間眼前一亮,這才是我們處理異常和重試所需要的東西。
關於Polly的使用,可以參考Jeffcky的博文或者Polly項目的GitHub主頁(文檔很詳細)。
大致的代碼結構如:

var tsArr = new TimeSpan[]
{
    TimeSpan.FromSeconds(1),
    TimeSpan.FromSeconds(1)
};
// 構造一種重試測試(其它可選的包括熔斷等)
var policy = Policy
    .Handle<Exception>()
    .WaitAndRetryAsync(tsArr);

// 需要有Polly調用的業務代碼,以異步方法為例
async Task SomeToInvoke()
{
       // 一些異步調用
}

// 使用Polly執行業務代碼(如不需要捕獲異常可選用其它重載)
var pollyRet = await policy.ExecuteAndCaptureAsync(SomeToInvoke);
// 處理返回值判斷調用是否成功,或發生了什么異常

下面一步步來看博主的實現過程。

先放上一些測試所用的代碼,首先是創建Redis連接的接口和類,它們是從NopCommerce項目一個早起版本借(chao)鑒(xi)來的(文件名都沒改,為了測試方便代碼略有改動),一直用着沒啥大問題就這樣用了。

public interface IRedisConnectionWrapper : IDisposable
{
    IDatabase Database(int? db = null);
    IServer Server(EndPoint endPoint);
    EndPoint[] GetEndpoints();
    void FlushDb(int? db = null);
}
public class RedisConnectionWrapper : IRedisConnectionWrapper
{
    private readonly Lazy<string> _connectionString;
    private readonly Lazy<string> _auth;

    private volatile ConnectionMultiplexer _connection;
    private readonly object _lock = new object();

    public RedisConnectionWrapper(string server, string pswd)
    {
        this._connectionString = new Lazy<string>(() => server);
        this._auth = new Lazy<string>(() => pswd);
    }

    private ConnectionMultiplexer GetConnection()
    {
        if (_connection != null && _connection.IsConnected) return _connection;

        lock (_lock)
        {
            if (_connection != null && _connection.IsConnected) return _connection;

            if (_connection != null)
            {
                _connection.Dispose();
            }

            var options = new ConfigurationOptions();
            options.EndPoints.Add(_connectionString.Value);
            if (!string.IsNullOrEmpty(_auth.Value))
                options.Password = _auth.Value;

            _connection = ConnectionMultiplexer.Connect(options);
        }

        return _connection;
    }

    public IDatabase Database(int? db = null)
    {
        return GetConnection().GetDatabase(db ?? -1);
    }

    public IServer Server(EndPoint endPoint)
    {
        return GetConnection().GetServer(endPoint);
    }

    public EndPoint[] GetEndpoints()
    {
        return GetConnection().GetEndPoints();
    }

    public void FlushDb(int? db = null)
    {
        var endPoints = GetEndpoints();

        foreach (var endPoint in endPoints)
        {
            Server(endPoint).FlushDatabase(db ?? -1);
        }
    }

    public void Dispose()
    {
        if (_connection != null)
        {
            _connection.Dispose();
        }
    }
}

對於StackExchange.Redis來說是比較標准的連接創建方式,順便看了下新版的NopCommerce代碼中,代碼有了些小改進,增加了一個雙重鎖。有需要的園友可以自行去下載新的。

接着開始考慮重試問題,為了代碼看起來更簡潔,決定嘗試通過動態代理將捕捉異常並重試的操作作為切面注入。說到動態代理,第一個想到肯定是Castle.Core(前身為CastleDynamicProxy)。動態代理可以選擇接口或者是類,如果是類的話需要方法是虛方法。看了下StackExchange.Redis的代碼,幾個實現類都是internal,方法也都是非virtual。所以只能只能自己寫一個類包一下。
這個類就是一個殼,為了我們切面注入。下面的代碼只保留的一個方法,其它的省略。另外Castle.Core的動態代理是不支持異步方法的,所以先用Redis的同步接口做下嘗試。

public class RedisDatabaseWrapper:IDatabase
{
    private IDatabase _redisDb;

    public RedisDatabaseWrapper(IRedisConnectionWrapper redisConnectionWrapper)
    {
        _redisDb = redisConnectionWrapper.Database();
    }

    public virtual bool SetContains(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None)
    {
        return _redisDb.SetContains(key, value, flags);
    }

    // 省略其它所有方法...
}

安裝Castle.Core,並開始實現動態代理類。

public class RetryByPollyInterceptor : IInterceptor
{
    public async void Intercept(IInvocation invocation)
    {
        var isAsync = IsAsyncMethod(invocation.Method);
        if (isAsync)
            InterceptAsync(invocation);
        else
            InterceptSync(invocation);
    }

    private void InterceptSync(IInvocation invocation)
    {
        var tsArr = new TimeSpan[]
        {
            TimeSpan.FromSeconds(1),
            TimeSpan.FromSeconds(1)
        };

        Action<Exception, TimeSpan, int, Context> action = (ex, ts, idx, ctx) =>
        {
            Console.WriteLine($"Polly Exp:{ex.GetType()} {ex.Message} Try:{idx} ");

            var invca = (IInvocation)ctx["inv"];
            if (idx == 2)
            {
                var type = invca.Method.ReturnType;
                if (type == typeof(void)) return;
                var ret = type.IsValueType ? Activator.CreateInstance(type) : null;
                invca.ReturnValue = ret;
            }
        };

        var policy = Policy
            .Handle<TimeoutException>()
            .Or<RedisConnectionException>()
            .Or<Exception>()
            .WaitAndRetry(tsArr, action);

        void OrignalInvoke()
        {
            invocation.Proceed();
        }

        var pollyRet = policy.ExecuteAndCapture(OrignalInvoke, new Dictionary<string, object>() { ["inv"] = invocation });
        if (pollyRet.Outcome != OutcomeType.Successful)
        {
            Console.WriteLine($"Polly Ret Type:{pollyRet.Outcome} Exp:{pollyRet.ExceptionType} Msg:{pollyRet.FinalException?.Message}");
        }
    }

    private void InterceptAsync(IInvocation invocation)
    {
            // 異步方法代理,下文會討論
    }

    private static bool IsAsyncMethod(MethodInfo method)
    {
        return (
            method.ReturnType == typeof(Task) ||
            (method.ReturnType.IsGenericType && method.ReturnType.GetGenericTypeDefinition() == typeof(Task<>))
        );
    }
}

注意
這個方法也是經過多次嘗試才最終完成,可以看到這里預留了處理異步代理的方法,后文會詳細說。對於同步方法這段代碼可以完美的捕獲異常並重試。不用在外側代碼進行catch。當然內部發生異常並多次重試仍失敗后會返回非期望的結果,還是需要根據業務的需要對返回值進行判斷。
這段代碼最值得注意的是這幾行:

Action<Exception, TimeSpan, int, Context> action = (ex, ts, idx, ctx) =>
{
    Console.WriteLine($"Polly Exp:{ex.GetType()} {ex.Message} Try:{idx} ");

    var invca = (IInvocation)ctx["inv"];
    if (idx == 2)
    {
        var type = invca.Method.ReturnType;
        if (type == typeof(void)) return;
        var ret = type.IsValueType ? Activator.CreateInstance(type) : null;
        invca.ReturnValue = ret;
    }
};

由於我們設置重試兩次,當第二次發生異常時,我們強制給方法返回值賦一個返回值,這樣可以讓外部調用方法正常執行下去而不會由於無法獲取代理方法的返回值而報空引用異常。

接着看看其它組成部分。在博主目前大部分項目中都使用Autofac作為容器,我們需要注冊一下用到的類。並且通過Autofac的Castle.Core插件,可以注冊動態代理,這樣就不用通過給類添加Attribute的方式來添加代理,這是個人比較喜歡的風格。

var builder = new ContainerBuilder();

builder.Register(c => new RetryByPollyInterceptor()); //動態代理類

builder.RegisterType<RedisDatabaseWrapper>().As<IDatabase>().EnableInterfaceInterceptors().InterceptedBy(typeof(RetryByPollyInterceptor)).SingleInstance(); //添加動態代理

builder.RegisterType<RedisConnectionWrapper>().As<IRedisConnectionWrapper>()
    .WithParameters(new[]
    {
        new NamedParameter("server", "127.0.0.1"),
        new NamedParameter("pswd",""),
    }).SingleInstance();

Container = builder.Build();

可以用下面的代碼來測試一下上面這些方法。

public void ReadTest(long start, long end)
{
    for (var i = start; i <= end; i++)
    {
        var exists = _redisDb.SetContains(RedisKey, i);
    }
}

可以使用Windows版的Redis,直接運行redis-server.exe來啟動服務。然后直接關閉redis-server程序來模擬服務端失敗,或者直接禁用網卡來模擬網絡失敗。
可以看到Polly會進行重試並且捕獲異常,也就說在ReadTest中感知不到異常。

搞定了同步方法,開始嘗試動態代理異步方法。添加Redis異步接口的實現並注冊:

public class RedisDatabaseAsyncWrapper:IDatabaseAsync
{
    private IDatabase _redisDb;

    public RedisDatabaseAsyncWrapper(IRedisConnectionWrapper redisConnectionWrapper)
    {
        _redisDb = redisConnectionWrapper.Database();
    }
    
    public virtual async Task<bool> SetContainsAsync(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None)
    {  
        return await _redisDb.SetContainsAsync(key, value, flags);
    }

    // 省略其它實現..
}

//注冊異步實現
builder.RegisterType<RedisDatabaseAsyncWrapper>().As<IDatabaseAsync>().EnableInterfaceInterceptors().InterceptedBy(typeof(RetryByPollyInterceptor)).SingleInstance();

//異步代理
private void InterceptAsync(IInvocation invocation)
{
    var tsArr = new TimeSpan[]
    {
        TimeSpan.FromSeconds(1),
        TimeSpan.FromSeconds(1)
    };

    var policy = Policy
        .Handle<TimeoutException>()
        .Or<RedisConnectionException>()
        .Or<Exception>()
        .WaitAndRetry(tsArr);

    void OrignalInvoke()
    {
        try
        {
            invocation.Proceed();
        }
        catch (Exception e)
        {
            var geneType = invocation.Method.ReturnType.GenericTypeArguments[0];
            var ret = geneType.IsValueType ? Activator.CreateInstance(geneType) : null;
            invocation.ReturnValue = Task.FromResult(ret);

            Console.WriteLine(e);
        }
    }
    
    var pollyRet = policy.ExecuteAndCapture(OrignalInvoke,
        new Dictionary<string, object>() { ["inv"] = invocation });
    if (pollyRet.Outcome != OutcomeType.Successful)
    {
        Console.WriteLine(
            $"Polly Ret Type:{pollyRet.Outcome} Exp:{pollyRet.ExceptionType} Msg:{pollyRet.FinalException?.Message}");

        var invca = (IInvocation)pollyRet.Context["inv"];

        var type = invca.Method.ReturnType;
        if (type == typeof(void)) return;
        if (type.IsGenericType)
        {
            var geneType = invca.Method.ReturnType.GenericTypeArguments[0];
            var ret = geneType.IsValueType ? Activator.CreateInstance(geneType) : null;
            invca.ReturnValue = Task.FromResult(ret);
        }
        else
        {
            invca.ReturnValue = Task.FromResult(0);
        }
    }
}

這里直接告訴各位我的嘗試結果是無論如何都無法通過Polly來捕獲異常。即上面代碼中,OrignalInvoke方法中try...catch...抓不到異常,異常直接被扔給了外部方法。具體原因由於本人比較菜也比較懶沒有仔細研究,大概可能就是用一個同步環境去調異步環境的方法沒有特殊處理所以出的問題。有知道的園友評論中指點下。
如果是把invocation.Proceed()放在Task中,到是異常不會拋到外側,但會因為被代理的方法取不到返回值而報空引用錯誤。原因大概應該是Castle.Core沒有取到這個異步構造中的返回值。

經過一番嘗試后放棄。在查找解決方法的過程中還發現一個名為Castle.Core.AsyncInterceptor的庫,給Castle.Core添加動態代理異步函數的功能,但此擴展的文檔實在過長,而且粗略看了下還不支持針對Autofac等IoC容器的擴展,直接放棄。

后來機緣巧合看到了園友Lemon大神的介紹其AspectCore庫的文章。留言問了下對異步方法支持的情況,Lemon大神立刻給了回復,還附送了一些使用的特別提示。於是立馬安裝嘗試。

首先是最重要的代理方法,AspectCore原生對異步方法提供支持,代碼寫起來很簡單:

public class RetryByPollyAspectCoreInterceptor : AbstractInterceptorAttribute
{
    public override async Task Invoke(AspectContext context, AspectDelegate next)
    {
        var tsArr = new TimeSpan[]
        {
            TimeSpan.FromSeconds(1),
            TimeSpan.FromSeconds(1)
        };

        var policy = Policy
            .Handle<AspectInvocationException>(ex=>ex.InnerException?.GetType()==typeof(TimeoutException))
            .Or<AspectInvocationException>(ex=>ex.InnerException?.GetType()==typeof(RedisConnectionException))
            .WaitAndRetryAsync(tsArr);

        async Task OrignalInvoke()
        {
            await context.Invoke(next);
        }

        var pollyRet = await policy.ExecuteAndCaptureAsync(OrignalInvoke,new Dictionary<string, object>() { ["ctx"] = context});
        if (pollyRet.Outcome != OutcomeType.Successful)
        {
            Console.WriteLine($"Polly Ret Type:{pollyRet.Outcome} Exp:{pollyRet.ExceptionType} Msg:{pollyRet.FinalException?.Message}");

            var ctx = (AspectContext)pollyRet.Context["ctx"];
            var type = ctx.ProxyMethod.ReturnType;
            if (type == typeof(void)) return;
            if (type.IsGenericType)
            {
                var geneType = type.GenericTypeArguments[0];
                dynamic ret = geneType.IsValueType ? Activator.CreateInstance(geneType) : null;
                ctx.ReturnValue = Task.FromResult(ret);
            }
            else
            {
                var ret = type.IsValueType ? Activator.CreateInstance(type) : null;
                ctx.ReturnValue = Task.FromResult(ret);
            }
        }
    }
}

AspectCore也有Autofac的擴展,注冊也是非常簡單:

builder.RegisterDynamicProxy();
不過AspectCore還是需要給被代理的類添加Attribute:
[RetryByPollyAspectCoreInterceptor]
public class RedisDatabaseAsyncWrapper:IDatabaseAsync
{
    ...
}

希望大神可以擴展AspectCore的Autofac插件實現無需Attribute的代理設置。

2018/01/18補充

根據Lemon大神在評論中指點,AspectCore可以使用如下方式在Autofac注冊中進行全局AOP注入:

builder.RegisterDynamicProxy(config =>
{
    config.Interceptors.AddTyped<RetryByPollyAspectCoreInterceptor>(
        Predicates.ForService("IDatabaseAsync"));
});

最后可以使用下面的代碼測試這個異步的重試實現:

public async Task ReadTestAsync(long start, long end)
{
    var total = end - start;
    for (var i = 0; i <= total; i++)
    {
        var item = i + start;
        var exists = await _redisDb.SetContainsAsync(RedisKey, item);
    }
}

可以看到代理方法完美的處理了異常。

文末,在這個異步方法越來越多的新時代再次強烈推薦AspectCore
感謝各位大神提供了這么多好用的庫。感謝各位園友閱讀本文。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM