前言
.Net Core gRPC常見的重試策略。
gRPC RetryPolicy
RetryPolicy 是微軟官方提供的一種重試策略。允許在創建gRPC的時候配置一次重試策略。
var defaultMethodConfig = new MethodConfig
{
Names = { MethodName.Default },
RetryPolicy = new RetryPolicy
{
MaxAttempts = 5,
InitialBackoff = TimeSpan.FromSeconds(1),
MaxBackoff = TimeSpan.FromSeconds(5),
BackoffMultiplier = 1.5,
RetryableStatusCodes = { StatusCode.Unavailable }
}
};
var channel = GrpcChannel.ForAddress("https://localhost:5000", new GrpcChannelOptions
{
ServiceConfig = new ServiceConfig { MethodConfigs = { defaultMethodConfig } }
});
創建一個 RetryPolicy 重試配置,在創建 gRPC 的指定重試配置,重試策略可以按方法配置,而方法可以使用 Names 屬性進行匹配 MethodName.Default 將應用於此通道調用的所有 gRPC 方法。RetryPolicy 應該是最簡單的方式來實現重試了,但是它也有弊端,它沒有留下擴展的入口,想加個日志查看不可以。
- MaxAttempts:最大調用嘗試次數,包括原始嘗試次數,值必須大於1。
- InitialBackoff:重試嘗試之間初始的延遲。每次嘗試后,當前值將乘以 BackoffMultiplier。這個值是必須的,且值必須大於 0。
- MaxBackoff:重試嘗試之間的最大延遲,這個值是必須的,且值必須大於 0。
- BackoffMultiplier:每次重試嘗試后,InitialBackoff 會乘以該值,並將在乘數大於 1 的情況下以指數方式增加。這個值是必須的,且值必須大於 0。
- RetryableStatusCodes:狀態代碼的集合。 匹配的狀態將會重試,狀態代碼為 Unavailable 的會進行重試。
gRPC hedging
var defaultMethodConfig = new MethodConfig
{
Names = { MethodName.Default },
HedgingPolicy = new HedgingPolicy
{
HedgingDelay = TimeSpan.FromSeconds(1),
MaxAttempts = 5,
NonFatalStatusCodes = { StatusCode.Unavailable }
}
};
var channel = GrpcChannel.ForAddress("https://localhost:5000", new GrpcChannelOptions
{
ServiceConfig = new ServiceConfig { MethodConfigs = { defaultMethodConfig } }
});
Hedging 是一種微軟提供的一種備選重試策略。 Hedging 允許在不等待gRPC響應的情況下,發送多個調用請求,並且只會取第一個請求的返回結果,所以你的這個gRPC服務必須具有冪等性(冪等性的實質是一次或多次請求同一個資源,其結果是相同的。其關注的是對資源產生的影響(副作用)而不是結果,結果可以不同)。
- MaxAttempts: 發送的調用數量上限。 MaxAttempts 表示所有嘗試的總數,包括原始嘗試。 值受 GrpcChannelOptions.MaxRetryAttempts(默認值為 5)的限制。 必須為該選項提供值,且值必須大於 2。
- HedgingDelay:除去第一次調用,后續 hedging 調用將按該值延遲發送。 如果延遲設置為零或 null,那么所有所有調用都將立即發送。 默認值為 0。
- NonFatalStatusCodes:如果返回非致命狀態代碼將會繼續發送請求, 否則,將取消未完成的請求,並將錯誤返回到應。
HttpClientFactory 組合 Polly
gRPC 的調用請求最終還是由 HttpClient 來發送,所以我們可以結合 HttpClientFactory 和 Polly 來實現重試功能,引用 Microsoft.Extensions.Http.Polly 包。
AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
services.AddGrpcClient<UserClient>(options =>
{
options.Address = new Uri("http://localhost:5005");
}).AddPolicyHandler(
HttpPolicyExtensions.HandleTransientHttpError()
.OrResult(res => res.StatusCode != HttpStatusCode.OK)
.RetryAsync(5, (result, count) =>
{
Console.WriteLine($"StatusCode:{result.Result?.StatusCode},Exception:{result.Exception?.Message},正在進行第{count}次重試");
}));
通過 AddPolicyHandler 來擴展需要處理的異常和處理機制。

還可以通過 WaitANdRetryAsync 來指定重試的時間
AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
services.AddGrpcClient<UserClient>(options => { options.Address = new Uri("http://localhost:5005"); })
.AddPolicyHandler(
HttpPolicyExtensions.HandleTransientHttpError()
.OrResult(res => res.StatusCode != HttpStatusCode.OK)
.WaitAndRetryAsync(
5,
retryAttempt => TimeSpan.FromSeconds(3),
(result, time, count, context) =>
{
Console.WriteLine($"正在進行第{count}次重試,間隔{time.TotalSeconds}秒");
}
));

Interceptor
Interceptor 是gRPC中的攔截器,類似 MVC 中的中間件和過濾器。
public class ClientLoggerInterceptor : Interceptor
{
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(
TRequest request,
ClientInterceptorContext<TRequest, TResponse> context,
AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
{
return Policy<AsyncUnaryCall<TResponse>> // Return type for single request - single response call.
.Handle<RpcException>(s => s.StatusCode != StatusCode.OK)
.OrResult(asyncCall =>
{
if (asyncCall.GetAwaiter().IsCompleted)
{
return asyncCall.GetStatus().StatusCode == StatusCode.OK;
}
try
{
asyncCall.ResponseAsync.Wait();
}
catch (AggregateException)
{
return true;
}
return false;
})
.WaitAndRetryAsync(3, m => TimeSpan.FromSeconds(2), (result, time, count, context) =>
{
Console.WriteLine($"正在進行第{count}次重試。間隔{time.TotalSeconds}秒");
})
.ExecuteAsync(() => Task.FromResult(continuation(request, context))).Result;
}
}
private static async Task Main(string[] args)
{
AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
var channel = GrpcChannel.ForAddress("http://localhost:5000");
var intercept = channel.Intercept(new ClientLoggerInterceptor());
var client = new Greeter.GreeterClient(intercept);
var response = await client.SayHelloAsync(
new HelloRequest
{
Name = "World"
});
Console.WriteLine(response.Message);
}

總結
簡單的實現了gRPC的重試策略,在開發過程可以根據自己的需求進行拓展。