Tip: 此篇已加入.NET Core微服務基礎系列文章索引
一、熔斷、降級與AOP
1.1 啥是熔斷?
在廣義的解釋中,熔斷主要是指為控制股票、期貨或其他金融衍生產品的交易風險,為其單日價格波動幅度規定區間限制,一旦成交價觸及區間上下限,交易則自動中斷一段時間(“熔即斷”),或就此“躺平”而不得超過上限或下限(“熔而不斷”)。
而對於微服務來說,熔斷就是我們常說的“保險絲”,意為當服務出現某些狀況時,切斷服務,從而防止應用程序不斷地常識執行可能會失敗的操作造成系統的“雪崩”,或者大量的超時等待導致系統卡死等情況,很多地方也將其成為“過載保護”。
1.2 啥是降級?
降級的目的就是當某個服務提供者發生故障的時候,向調用方返回一個替代響應或者錯誤響應。
例如:假設有一個短信服務,其調用聯通接口服務器發送短信服務(假設這里調用聯通接口最方便,最省事也最經濟)失敗之后,會嘗試改用移動短信服務器(假設這里調用移動服務器比較不方便也不經濟)發送,如果移動服務器調用也失敗,那么還會嘗試改用電信短信服務器(假設這里調用電信服務器最不省事和最不經濟),如果還失敗,則返回“失敗”響應;
降級的另一個概念也可以看作是服務的“選擇性放棄”,比如在雙11或618等大型的電商活動日中,在高峰值的情形下,一般的電商系統都會采用部分服務的優先級降低或者干脆延時或停止服務,以確保主要的服務能夠使用最大化的資源為客戶提供服務。等待峰值下降之后,再通過處理恢復那些降級的服務的原有優先級。
1.3 啥是AOP?
AOP(Aspect Oriented Programming)意為面向切面編程,它是指在運行時,動態地將代碼切入到類的指定方法、指定位置上的編程思想就是面向切面的編程。比如說,我們在兩個類中,可能都需要在每個方法中做日志。按面向對象的設計方法,我們就必須在兩個類的方法中都加入日志的內容。也許他們是完全相同的,但就是因為面向對象的設計讓類與類之間無法聯系,而不能將這些重復的代碼統一起來。而AOP就是為了解決這個問題而生的,一般而言,我們把切入到指定類指定方法的代碼片段稱為切面,而切入到哪些類、哪些方法則叫切入點。有了AOP,我們就可以把幾個類共有的代碼,抽取到一個切片中,等到需要時再切入對象中去,從而改變其原有的行為。
AOP是OOP(Object Oriented Programming)的補充,OOP從橫向上區分出一個個的類來,而AOP則從縱向上向對象中加入特定的代碼。有了AOP,OOP變得立體了。關於AOP的更多細節和討論,可以瀏覽知乎的這篇帖子:《什么是AOP?》
二、Polly的基本使用
2.1 Polly極簡介紹
Polly是一個被.NET基金會認可的彈性和瞬態故障處理庫,允許我們以非常順暢和線程安全的方式來執諸如行重試,斷路,超時,故障恢復等策略,其主要功能如下:
- 功能1:重試(Retry)
- 功能2:斷路器(Circuit-Breaker)
- 功能3:超時檢測(Timeout)
- 功能4:緩存(Cache)
- 功能5:降級(Fallback)
Polly的策略主要由“故障”和“動作”兩個部分組成,“故障”可以包括異常、超時等情況,“動作”則包括Fallback(降級)、重試(Retry)、熔斷(Circuit-Breaker)等。策略則用來執行業務代碼,當業務代碼出現了“故障”中的情況時就開始執行“動作”。
2.2 Polly基礎使用
*.這里只介紹幾個我們需要用到的功能,其他功能請瀏覽參考資料關於Polly的部分。
(1)通過NuGet安裝,最新版本:6.0.1
NuGet>Install-Package Polly
(2)FallBack => 當出現故障,則進入降級動作
public static void Case1() { ISyncPolicy policy = Policy.Handle<ArgumentException>() .Fallback(() => { Console.WriteLine("Error occured"); }); policy.Execute(() => { Console.WriteLine("Job Start"); throw new ArgumentException("Hello Polly!"); Console.WriteLine("Job End"); }); }
執行結果如下圖所示:這里捕捉的是ArgumentException, 如果想捕捉所有的Exception,請設置Policy.Handle<Exception>,不過這樣就擴大了范圍。
(3)Retry => 重試,比較容易理解
public static void Case2() { ISyncPolicy policy = Policy.Handle<Exception>().Retry(3); try { policy.Execute(() => { Console.WriteLine("Job Start"); if (DateTime.Now.Second % 10 != 0) { throw new Exception("Special error occured"); } Console.WriteLine("Job End"); }); } catch (Exception ex) { Console.WriteLine("There's one unhandled exception : " + ex.Message); } }
執行結果如下圖所示:可以看到,這里重試了三次,仍然沒有滿足條件(DateTime.Now.Second % 10 == 0),因此進入了外部的未處理異常catch塊中。
(4)CircuitBreaker => 短路保護,當一塊業務代碼/服務 出現了N次錯誤,則把“熔斷器”(保險絲)熔斷,等待一段時間后才允許再次執行,在這段等待的時間內如果再執行則直接拋出BrokenCircuitException異常。這個也很好理解,比如我們的手機屏幕密碼,如果輸錯了N次之后,手機會拒絕我們再次輸入,而是讓我們等待20 ~ 30s 之后再輸入,如果等待之后再輸錯N次,則再次進入等待。
這里假設我們設置一個短路保護策略:當發生了故障的時候,則重試了5次還是有故障(代碼中的6代表的是在執行短路保護策略之前允許6次故障),那么久停止服務10s鍾,10s之后再允許重試。
public static void Case3() { // Stop for 10s after retry 6 times ISyncPolicy policy = Policy.Handle<Exception>() .CircuitBreaker(6, TimeSpan.FromSeconds(10)); while (true) { try { policy.Execute(() => { Console.WriteLine("Job Start"); throw new Exception("Special error occured"); Console.WriteLine("Job End"); }); } catch (Exception ex) { Console.WriteLine("There's one unhandled exception : " + ex.Message); } Thread.Sleep(500); } }
執行結果如下圖所示:出現了6次故障之后,直接給我們跑出了短路保護的異常,“The circuit is now open and is not allowing calls”.
(5)Timeout 與 Wrap => Wrap是指策略封裝,可以把多個ISyncPolicy合並到一起執行。Timeout則是指超時處理,但是超時策略一般不能直接使用,而是其其他策略封裝到一起使用。
這里我們封裝兩個策略,一個是基本的Fallback,另一個則是超時策略,如果調用執行時間超過2s則觸發Fallback。
這里涉及到Polly中關於超時的兩個策略:一個是悲觀策略(Pessimistic),一個是樂觀策略(Optimistic)。其中,悲觀策略超時后會直接拋異常,而樂觀策略則不會,而只是觸發CancellationTokenSource.Cancel函數,需要等待委托自行終止操作。一般情況下,我們都會用悲觀策略。
public static void Case4() { try { ISyncPolicy policyException = Policy.Handle<TimeoutRejectedException>() .Fallback(() => { Console.WriteLine("Fallback"); }); ISyncPolicy policyTimeout = Policy.Timeout(3, Polly.Timeout.TimeoutStrategy.Pessimistic); ISyncPolicy mainPolicy = Policy.Wrap(policyTimeout, policyException); mainPolicy.Execute(() => { Console.WriteLine("Job Start..."); Thread.Sleep(5000); //throw new Exception(); Console.WriteLine("Job End..."); }); } catch (Exception ex) { Console.WriteLine($"Unhandled exception : {ex.GetType()} : {ex.Message}"); } }
執行結果如下圖所示:
除此之外,Polly還提供了一些異步方法供調用以實現以上介紹的功能,比如在業務代碼中有一些Http的調用或者IO操作時,不妨用用異步操作來提高一點效率,可以看下面這個例子:
public static async void Case5() { Policy<byte[]> policy = Policy<byte[]>.Handle<Exception>() .FallbackAsync(async c => { Console.WriteLine("Executed Error!"); return new byte[0]; }, async r => { Console.WriteLine(r.Exception); }); policy = policy.WrapAsync(Policy.TimeoutAsync(20, TimeoutStrategy.Pessimistic, async (context, timespan, task) => { Console.WriteLine("Timeout!"); })); var bytes = await policy.ExecuteAsync(async ()=> { Console.WriteLine("Start Job"); HttpClient httpClient = new HttpClient(); var result = await httpClient.GetByteArrayAsync("https://images2018.cnblogs.com/blog/381412/201806/381412-20180606230929894-145212290.png"); Console.WriteLine("Finish Job"); return result; }); Console.WriteLine($"Length of bytes : {bytes.Length}"); }
執行結果如下圖所示:
至於Polly更多的功能和用法,可以參閱官方文檔,這里不再贅述。
三、AspectCore的基本使用
3.1 為什么要用AOP框架
從上面的例子可以看出,如果直接使用Polly,那么就會造成我們的業務代碼中混雜大量的業務無關的代碼。所以,我們會使用AOP的方式來封裝Polly,嗯,首先我們先找一個支持的.NET Core的AOP框架吧,目前大家都在用AspectCore(國產,作者Lemon),它采用動態動態代理/織入,並且支持異步方法的攔截。
快快通過NuGet安裝一個吧:
NuGet>Install-Package AspectCore.Core
3.2 AspectCore的極簡使用
這里假設我們要針對一個類的某些類的某些方法進行攔截,我們一般會經過一下幾個步驟:
(1)編寫一個攔截器,一般繼承自AbstractInterceptorAttribute
/// <summary> /// 自定義攔截器 /// </summary> public class CustomInterceptorAttribute : AbstractInterceptorAttribute { /// <summary> /// 每個被攔截的方法中執行 /// </summary> /// <param name="context"></param> /// <param name="next"></param> /// <returns></returns> public override async Task Invoke(AspectContext context, AspectDelegate next) { try { Console.WriteLine("Before service call"); await next(context); // 執行被攔截的方法 } catch (Exception) { Console.WriteLine("Service threw an exception"); throw; } finally { Console.WriteLine("After service call"); } } }
這里我們通過為被攔截方法增加一些處理前和處理后的logic來實現AOP。
(2)編寫需要被代理攔截的類
/// <summary> /// 實現AoP的兩個要求: /// 1.public 類 /// 2.virtual 方法 /// </summary> public class Person { [CustomInterceptor] public virtual void Say(string message) { Console.WriteLine($"Service calling ... => {message}"); } }
可以看到我們在要攔截的方法Say()的聲明之上加了一個Attribute:CustomInterceptor,正是我們之前新增的。
(3)通過AspectCore創建代理對象實現AOP
public class Program { public static void Main(string[] args) { ProxyGeneratorBuilder proxyGeneratorBuilder = new ProxyGeneratorBuilder(); using (IProxyGenerator proxyGenerator = proxyGeneratorBuilder.Build()) { Person p = proxyGenerator.CreateClassProxy<Person>(); p.Say("edisonchou.cnblogs.com"); } Console.ReadKey(); } }
執行結果如下圖所示:
代碼很清晰,不再解釋。直到這里,我們看到了不管是Polly的使用,還是AspectCore的使用,都存在一些業務無關的聲明代碼,而且我們需要結合Polly和AspectCore才能完整地實現適合ASP.NET Core的熔斷降級組件,下面我們就來模仿Spring Cloud中的Hystrix(可以參考這一篇文章來了解Spring Cloud Hystrix是個啥玩意兒)
四、Polly+AspectCore的結合使用
4.1 封裝一個Hystrix
NuGet>Install-Package Polly
NuGet>Install-Package AspectCore.Core
NuGet>Install-Package Microsoft.Extensions.Caching.Memory
[AttributeUsage(AttributeTargets.Method)] public class HystrixCommandAttribute : AbstractInterceptorAttribute { /// <summary> /// 最多重試幾次,如果為0則不重試 /// </summary> public int MaxRetryTimes { get; set; } = 0; /// <summary> /// 重試間隔的毫秒數 /// </summary> public int RetryIntervalMilliseconds { get; set; } = 100; /// <summary> /// 是否啟用熔斷 /// </summary> public bool IsEnableCircuitBreaker { get; set; } = false; /// <summary> /// 熔斷前出現允許錯誤幾次 /// </summary> public int ExceptionsAllowedBeforeBreaking { get; set; } = 3; /// <summary> /// 熔斷多長時間(毫秒) /// </summary> public int MillisecondsOfBreak { get; set; } = 1000; /// <summary> /// 執行超過多少毫秒則認為超時(0表示不檢測超時) /// </summary> public int TimeOutMilliseconds { get; set; } = 0; /// <summary> /// 緩存多少毫秒(0表示不緩存),用“類名+方法名+所有參數ToString拼接”做緩存Key /// </summary> public int CacheTTLMilliseconds { get; set; } = 0; private static ConcurrentDictionary<MethodInfo, Policy> policies = new ConcurrentDictionary<MethodInfo, Policy>(); private static readonly IMemoryCache memoryCache = new MemoryCache(new Microsoft.Extensions.Caching.Memory.MemoryCacheOptions()); /// <summary> /// HystrixCommandAttribute /// </summary> /// <param name="fallBackMethod">降級的方法名</param> public HystrixCommandAttribute(string fallBackMethod) { this.FallBackMethod = fallBackMethod; } public string FallBackMethod { get; set; } public override async Task Invoke(AspectContext context, AspectDelegate next) { //一個HystrixCommand中保持一個policy對象即可 //其實主要是CircuitBreaker要求對於同一段代碼要共享一個policy對象 //根據反射原理,同一個方法的MethodInfo是同一個對象,但是對象上取出來的HystrixCommandAttribute //每次獲取的都是不同的對象,因此以MethodInfo為Key保存到policies中,確保一個方法對應一個policy實例 policies.TryGetValue(context.ServiceMethod, out Policy policy); lock (policies)//因為Invoke可能是並發調用,因此要確保policies賦值的線程安全 { if (policy == null) { policy = Policy.NoOpAsync();//創建一個空的Policy if (IsEnableCircuitBreaker) { policy = policy.WrapAsync(Policy.Handle<Exception>().CircuitBreakerAsync(ExceptionsAllowedBeforeBreaking, TimeSpan.FromMilliseconds(MillisecondsOfBreak))); } if (TimeOutMilliseconds > 0) { policy = policy.WrapAsync(Policy.TimeoutAsync(() => TimeSpan.FromMilliseconds(TimeOutMilliseconds), Polly.Timeout.TimeoutStrategy.Pessimistic)); } if (MaxRetryTimes > 0) { policy = policy.WrapAsync(Policy.Handle<Exception>().WaitAndRetryAsync(MaxRetryTimes, i => TimeSpan.FromMilliseconds(RetryIntervalMilliseconds))); } Policy policyFallBack = Policy .Handle<Exception>() .FallbackAsync(async (ctx, t) => { AspectContext aspectContext = (AspectContext)ctx["aspectContext"]; var fallBackMethod = context.ServiceMethod.DeclaringType.GetMethod(this.FallBackMethod); Object fallBackResult = fallBackMethod.Invoke(context.Implementation, context.Parameters); //不能如下這樣,因為這是閉包相關,如果這樣寫第二次調用Invoke的時候context指向的 //還是第一次的對象,所以要通過Polly的上下文來傳遞AspectContext //context.ReturnValue = fallBackResult; aspectContext.ReturnValue = fallBackResult; }, async (ex, t) => { }); policy = policyFallBack.WrapAsync(policy); //放入 policies.TryAdd(context.ServiceMethod, policy); } } //把本地調用的AspectContext傳遞給Polly,主要給FallbackAsync中使用,避免閉包的坑 Context pollyCtx = new Context(); pollyCtx["aspectContext"] = context; //Install-Package Microsoft.Extensions.Caching.Memory if (CacheTTLMilliseconds > 0) { //用類名+方法名+參數的下划線連接起來作為緩存key string cacheKey = "HystrixMethodCacheManager_Key_" + context.ServiceMethod.DeclaringType + "." + context.ServiceMethod + string.Join("_", context.Parameters); //嘗試去緩存中獲取。如果找到了,則直接用緩存中的值做返回值 if (memoryCache.TryGetValue(cacheKey, out var cacheValue)) { context.ReturnValue = cacheValue; } else { //如果緩存中沒有,則執行實際被攔截的方法 await policy.ExecuteAsync(ctx => next(context), pollyCtx); //存入緩存中 using (var cacheEntry = memoryCache.CreateEntry(cacheKey)) { cacheEntry.Value = context.ReturnValue; cacheEntry.AbsoluteExpiration = DateTime.Now + TimeSpan.FromMilliseconds(CacheTTLMilliseconds); } } } else//如果沒有啟用緩存,就直接執行業務方法 { await policy.ExecuteAsync(ctx => next(context), pollyCtx); } } }
這個HystrixCommand並非我原創,而是引用的楊中科老師在.NET微服務中的代碼,大家也可以直接通過NuGet安裝這個封裝好的Package:
NuGet>Install-Package RuPeng.HystrixCore
這里不再多講解代碼,因為都有注釋,大家通過一個案例調試以下就了解流程了。
4.2 在ASP.NET Core的使用
(1)為了簡化代理類對象的注入,不用在ASP.NET Core中再通過ProxyGeneratorBuilder進行注入,我們引入一個AspectCore的DI擴展包:
NuGet>Install-Package AspectCore.Extensions.DependencyInjection
(2)改寫Startup類的ConfigureService方法,把返回值從void改為IServiceProvider
// This method gets called by the runtime. Use this method to add services to the container. public IServiceProvider ConfigureServices(IServiceCollection services) { services.AddMvc(); ....... // AoP - AspectCore RegisterServices(this.GetType().Assembly, services); return services.BuildAspectCoreServiceProvider(); }
這里BuildAspectCoreServiceProvider就是讓AspectCore接管注入。RegisterService方法如下所示:
private static void RegisterServices(Assembly asm, IServiceCollection services) { foreach (var type in asm.GetExportedTypes()) { bool hasHystrixCommand = type.GetMethods().Any(m => m.GetCustomAttribute(typeof(HystrixCommandAttribute)) != null); if (hasHystrixCommand) { services.AddSingleton(type); } } }
這里使用反射,篩選出那些帶有HystrixCommandAttribute的類進行注入,從而減少一行一行注入的代碼工作量。
(3)這里假設我們需要進行熔斷保護的方法所在類是一個ProductService類,它主要的功能就是通過HttpClient去調用ProductService的某個API,它的定義如下:
public class ProductService { [HystrixCommand(nameof(GetAllProductsFallBackAsync), IsEnableCircuitBreaker = true, ExceptionsAllowedBeforeBreaking = 3, MillisecondsOfBreak = 1000 * 5)] public virtual async Task<string> GetAllProductsAsync(string productType) { Console.WriteLine($"-->>Starting get product type : {productType}"); string str = null; str.ToString(); // to do : using HttpClient to call outer service to get product list return $"OK {productType}"; } public virtual async Task<string> GetAllProductsFallBackAsync(string productType) { Console.WriteLine($"-->>FallBack : Starting get product type : {productType}"); return $"OK for FallBack {productType}"; } }
這里假設我們主要針對GetAllProductsAsync這個方法進行熔斷保護,假設它會調用另一個Service的獲取產品的接口,這個接口會訪問核心數據庫,其每天的訪問量很大,我們對此接口進行熔斷保護,設置在啟用熔斷保護前允許兩次故障(這里主要指異常),熔斷保護時間為5s。
在Controller中,通過構造函數注入:
[Produces("application/json")] [Route("api/Client")] public class ClientController : Controller { private readonly IClientService clientService; private readonly ProductService productService; public ClientController(IClientService _clientService, ProductService _productService) { clientService = _clientService; productService = _productService; } [HttpGet("{id}")] public async Task<string> Get(int id) { var product = await productService.GetAllProductsAsync("B"); return product; } }
為了能夠在控制台中看到熔斷的信息,我們增加一句Console.WriteLine到HystrixCommandAttribute中:
// 啟用熔斷保護(CircuitBreaker) if (IsEnableCircuitBreaker) { policy = policy.WrapAsync(Policy.Handle<Exception>() .CircuitBreakerAsync(ExceptionsAllowedBeforeBreaking, TimeSpan.FromMilliseconds(MillisecondsOfBreak), (ex, ts) => { // assuem to do logging Console.WriteLine($"Service API OnBreak -- ts = {ts.Seconds}s, ex.message = {ex.Message}"); }, () => {})); }
這樣當Polly啟用熔斷時便會在控制台中輸出一段消息,實際使用中可以往日志中寫一段日志信息。
(4)開起內置服務器進行測試
Step1.借助命令行啟動一個WebAPI程序
Step2.借助Postman/SoapUI等API測試工具,輸入我們的URL,測試結果如下圖所示:
可以看到我們通過在Postman中訪問這個URL從而觸發Service中的異常,兩次異常之后,便進入了熔斷保護時間,此后5s內的訪問都沒有再進行實際代碼的執行,而直接進入了Fallback方法執行降級后的邏輯。5s保護時間之后,則再次進入實際代碼的執行。目前,這個Hystrix還存在一些問題,需繼續完善,還無法正式投入使用,后續會結合Polly和Ocelot,在API網關處做統一熔斷保護。
五、小結
本篇首先介紹了一下熔斷、降級以及AOP的基本概念,然后從兩個流行的庫Polly和AspectCore的基本使用開始了解如何在.NET Core代碼中實現熔斷機制和AOP,最后通過結合Polly+AspectCore封裝了一個Hystrix來介紹了一下如何在ASP.NET Core程序中如何做到標簽式地快速實現熔斷降級機制。后續,會將Polly與Ocelot結合實踐API網關,在Ocelot網關處做統一熔斷保護。
參考資料
楊中科,《.NET微服務直播課課件(第二版)》
guwei,《談談我對服務熔斷、服務降級的理解》
Jeffcky,《已被.NET基金會認可的彈性和瞬態故障處理庫Polly介紹》
Lemon,《Asp.Net Core輕量級Aop解決方案:AspectCore》
Sunday_Xiao,《服務熔斷保護Spring Cloud Hystrix》
Catcher Wong, 《再談Circuit Breaker之使用Polly》
Polly官方文檔,https://github.com/App-vNext/Polly
AspectCore官方文檔,https://github.com/dotnetcore/AspectCore-Framework