前言
其實Grpc攔截器是我以前研究過,但是我看網上相關C#版本的源碼解析相對少一點,所以筆者借這篇文章給大家分享下Grpc攔截器的實現,廢話不多說,直接開講(Grpc的源碼看着很方便,包自動都能還原成功。.Net源碼就硬生啃。。。弄了半天沒還原成功😂)。
ps:
- 本篇文章主要是講解源碼,並不進行舉例Demo,所以讀者盡量先寫一個小Demo,看看生成的代碼,然后伴隨着看文章。
- 如果沒有用過Grpc的讀者,可以先寫個小Demo,可以看官網點擊這里,主要是查看下通過Proto文件生成的代碼的格式。
- 這篇文章講解分別從客戶端和服務端兩部分講解(實現有點不一樣),篇幅原因只講解一元調用的示例,其他形式的調用其實是類似的。
Client端
Interceptor和CallInvoker抽象類
public abstract class Interceptor
{
//一元調用同步攔截器
public virtual TResponse BlockingUnaryCall<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context, BlockingUnaryCallContinuation<TRequest, TResponse> continuation)
where TRequest : class
where TResponse : class
{
return continuation(request, context);
}
//一元調用異步攔截器
public virtual AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(TRequest request, ClientInterceptorContext<TRequest, TResponse> context, AsyncUnaryCallContinuation<TRequest, TResponse> continuation)
where TRequest : class
where TResponse : class
{
return continuation(request, context);
}
}
public abstract class CallInvoker
{
//一元調用同步攔截器
public abstract TResponse BlockingUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
where TRequest : class
where TResponse : class;
//一元調用異步攔截器
public abstract AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
where TRequest : class
where TResponse : class;
}
首先我們要理解這兩個抽象類分別是干什么的,上述代碼講解:
- Interceptor我們知道,在實現自定義的攔截器時,需要繼承這個類,並對某些方法進行自定義的實現,而continuation就是調用下一個攔截器。
- 其實CallInvoker其實就是客戶端構造的對象,主要用於調用遠程服務,通過你自己實現的Demo可以看到,先創建Channel,然后通過Channe創建默認的CallInvoker,而在創建Client通過proto生成的文件里可以看到對應的重載構造函數。
添加攔截器
public static class CallInvokerExtensions
{
//增加一個攔截器
public static CallInvoker Intercept(this CallInvoker invoker, Interceptor interceptor)
{
return new InterceptingCallInvoker(invoker, interceptor);
}
//增加一組攔截器
public static CallInvoker Intercept(this CallInvoker invoker, params Interceptor[] interceptors)
{
//檢查是否為Null
GrpcPreconditions.CheckNotNull(invoker, nameof(invoker));
GrpcPreconditions.CheckNotNull(interceptors, nameof(interceptors));
//反轉集合,構造對象
foreach (var interceptor in interceptors.Reverse())
{
invoker = Intercept(invoker, interceptor);
}
return invoker;
}
//篇幅原因,這種方式這里不進行講解,大家可以自己翻下源碼看下,主要作用就是增加用戶自定義的額外報文值,類似Http請求中的Header
public static CallInvoker Intercept(this CallInvoker invoker, Func<Metadata, Metadata> interceptor)
{
return new InterceptingCallInvoker(invoker, new MetadataInterceptor(interceptor));
}
}
上述代碼總結:
- 添加一個攔截器,則直接創建一個InterceptingCallInvoker對象返回,而它必定繼承CallInvoker。
- 添加一組攔截器,則將集合反轉,然后構造Invoker。
- 而在客戶端proto生成的代碼中可以看到,方法的調用是通過CallInvoker對象調用的,讀者可以看一下你自己生成的代碼。
InterceptingCallInvoker類
internal class InterceptingCallInvoker : CallInvoker
{
//下一個invoker對象
readonly CallInvoker invoker;
//當前的攔截器
readonly Interceptor interceptor;
public InterceptingCallInvoker(CallInvoker invoker, Interceptor interceptor)
{
this.invoker = GrpcPreconditions.CheckNotNull(invoker, nameof(invoker));
this.interceptor = GrpcPreconditions.CheckNotNull(interceptor, nameof(interceptor));
}
//一元同步調用
public override TResponse BlockingUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
{
return interceptor.BlockingUnaryCall(
request,
new ClientInterceptorContext<TRequest, TResponse>(method, host, options),
//當前請求參數和上下文,調用下一個BlockingUnaryCall
(req, ctx) => invoker.BlockingUnaryCall(ctx.Method, ctx.Host, ctx.Options, req));
}
//一元異步調用
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
{
return interceptor.AsyncUnaryCall(
request,
new ClientInterceptorContext<TRequest, TResponse>(method, host, options),
//當前請求參數和上下文,調用下一個BlockingUnaryCall
(req, ctx) => invoker.AsyncUnaryCall(ctx.Method, ctx.Host, ctx.Options, req));
}
}
//默認的CallInvoker,也就是不加任何攔截器時候的實現
public class DefaultCallInvoker : CallInvoker
{
readonly Channel channel;
public DefaultCallInvoker(Channel channel)
{
this.channel = GrpcPreconditions.CheckNotNull(channel);
}
//一元同步調用
public override TResponse BlockingUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
{
var call = CreateCall(method, host, options);
return Calls.BlockingUnaryCall(call, request);
}
//一元異步調用
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string host, CallOptions options, TRequest request)
{
var call = CreateCall(method, host, options);
return Calls.AsyncUnaryCall(call, request);
}
}
上述代碼總結:
- 構建InterceptingCallInvoker對象時,會保留當前攔截器對象和下一個invoker對象,以方便調用。
- 當前攔截器對象的在調用方法時,第三個參數是委托,而這個委托就是Interceptor對應方法里面的continuation參數,客戶端通過它來調用下一個攔截器。
- 而DefaultInvoker里面其實是內部調用遠程服務,也就是默認實現,而這個是在通過Channel來構造Client的時候構造出來的。
Client總結
- 貫穿上面的代碼可以看出,不管是調用單個添加攔截器,或者鏈式添加單個攔截器,又或者是添加一組攔截器,最終必然返回CallInvoker對象,而CallInvoker對象是在proto生成的代碼中可以看到,在調用對應方法時是由CallInvoker對象調用的。
- 關於構建InterceptingCallInvoker ,其實可以和設計模式中的裝飾着模式關聯下,剛開始只構建了默認的DefaultInvoke(這個里面其實是構建連接,調用server端),然后在這基礎上添加其他不同的攔截器功能,返回最終的CallInvoker對象。
- 需要注意的是,當鏈式添加單個攔截器時,比如Intercept(a).Intercept(b).Intercept(c),那么最終執行的順序是c(continuation前)->b(continuation前)->a->b(continuation后)->c(continuation后)。如果一次添加一組攔截器Intercept(a,b,c),那么最終執行的順序是:a(continuation前)->b(continuation前)->c->b(continuation后)->a(continuation后)。
Server端
Interceptor抽象類和ServerServiceDefinition類
public abstract class Interceptor
{
//服務端一元調用攔截器
public virtual Task<TResponse> UnaryServerHandler<TRequest, TResponse>(TRequest request, ServerCallContext context, UnaryServerMethod<TRequest, TResponse> continuation)
where TRequest : class
where TResponse : class
{
return continuation(request, context);
}
}
public class ServerServiceDefinition
{
//方法列表,也就是服務端寫的那些方法
readonly IReadOnlyList<Action<ServiceBinderBase>> addMethodActions;
internal ServerServiceDefinition(List<Action<ServiceBinderBase>> addMethodActions)
{
this.addMethodActions = addMethodActions.AsReadOnly();
}
//給方法綁定服務,也就是綁定攔截器,一會的源碼會提到
internal void BindService(ServiceBinderBase serviceBinder)
{
//給每個方法都綁定一下攔截器
foreach (var addMethodAction in addMethodActions)
{
addMethodAction(serviceBinder);
}
}
//創建Builder,可以在proto文件中生成的代碼看到,會有調用這個方法
public static Builder CreateBuilder()
{
return new Builder();
}
public class Builder
{
//檢測是否有同名方法,這是不被允許的
readonly Dictionary<string, object> duplicateDetector = new Dictionary<string, object>();
//服務端方法集合
readonly List<Action<ServiceBinderBase>> addMethodActions = new List<Action<ServiceBinderBase>>();
public Builder()
{
}
//可以看到在proto生成的代碼中,有調用AddMethod,將方法添加到集合中
public Builder AddMethod<TRequest, TResponse>(
Method<TRequest, TResponse> method,
UnaryServerMethod<TRequest, TResponse> handler)
where TRequest : class
where TResponse : class
{
duplicateDetector.Add(method.FullName, null);
addMethodActions.Add((serviceBinder) => serviceBinder.AddMethod(method, handler));
return this;
}
//這中間省略了除一元調用的其他調用,有興趣的可以自己翻下源碼
//初始化build,將上面的方法列表添加到其中
public ServerServiceDefinition Build()
{
return new ServerServiceDefinition(addMethodActions);
}
}
}
上述代碼總結:
- 對應每個service,都會維護一個方法的集合,然后把用戶定義的方法添加到集合中(在proto生成的代碼中可以看到)。
- 在給每個方法添加攔截器時(當然目前看不出來,下面會說),會給每個方法都加上,也就是說,它們之間是互不影響的。
添加攔截器
public static class ServerServiceDefinitionExtensions
{
//單個添加攔截器
public static ServerServiceDefinition Intercept(this ServerServiceDefinition serverServiceDefinition, Interceptor interceptor)
{
GrpcPreconditions.CheckNotNull(serverServiceDefinition, nameof(serverServiceDefinition));
GrpcPreconditions.CheckNotNull(interceptor, nameof(interceptor));
//構造新的ServiceBinder
var binder = new InterceptingServiceBinder(interceptor);
//將攔截器綁定到每個方法上
serverServiceDefinition.BindService(binder);
//生成並返回新的service
return binder.GetInterceptedServerServiceDefinition();
}
//添加一組攔截器
public static ServerServiceDefinition Intercept(this ServerServiceDefinition serverServiceDefinition, params Interceptor[] interceptors)
{
GrpcPreconditions.CheckNotNull(serverServiceDefinition, nameof(serverServiceDefinition));
GrpcPreconditions.CheckNotNull(interceptors, nameof(interceptors));
foreach (var interceptor in interceptors.Reverse())
{
serverServiceDefinition = Intercept(serverServiceDefinition, interceptor);
}
return serverServiceDefinition;
}
//只保留了一元調用的代碼
private class InterceptingServiceBinder : ServiceBinderBase
{
//創建一個空的Builder
readonly ServerServiceDefinition.Builder builder = ServerServiceDefinition.CreateBuilder();
//當前攔截器
readonly Interceptor interceptor;
public InterceptingServiceBinder(Interceptor interceptor)
{
this.interceptor = GrpcPreconditions.CheckNotNull(interceptor, nameof(interceptor));
}
//構造新的Builder
internal ServerServiceDefinition GetInterceptedServerServiceDefinition()
{
return builder.Build();
}
//添加一元調用的方法,而這個就是你自定義的攔截器
public override void AddMethod<TRequest, TResponse>(
Method<TRequest, TResponse> method,
UnaryServerMethod<TRequest, TResponse> handler)
{
builder.AddMethod(method, (request, context) => interceptor.UnaryServerHandler(request, context, handler));
}
//這里省略了一部分代碼。。。
}
}
其實到這里,咱們再串聯上個小部分的代碼,應該就能看出一些端倪,上述代碼總結:
- 這里鏈式添加或者單次添加一組,它和客戶端攔截器調用順序其實是一致的。
- 我們結合目前上面Server端的所有代碼,可以大概看出,當我們不添加任何攔截器時,ServerServiceDefinition對象里面的方法集合列表僅僅包含用戶定義的方法委托集合。然而當我們添加攔截器時,它代碼的執行順序則是,構建InterceptingServiceBinder->調用BindService方法,原來的委托集合開始執行,構造新的委托,而調用的AddMethod則是InterceptingServiceBinder對象里面的AddMethod,handler則是我們寫的攔截器里面的continuation,用於傳遞。
- 最終我們就會得到一個ServerServiceDefinition對象。當然,上述我們只看到了構造對象,而這個對象在哪里調用的呢?我們繼續往下看。
DefaultServiceBinder類
internal static class ServerServiceDefinitionExtensions
{
//在寫服務端的時候,我們需要綁定服務,而在綁定服務的時候需要先調用靜態BindService方法(可以在proto生成的代碼中看到這個方法),然后添加Services時,內部會調用GetCallHandlers方法。
internal static ReadOnlyDictionary<string, IServerCallHandler> GetCallHandlers(this ServerServiceDefinition serviceDefinition)
{
//構建默認的ServiceBinder,里面其實是執行構造的最終handler
var binder = new DefaultServiceBinder();
//調用BindService方法,將執行集合委托
serviceDefinition.BindService(binder);
//返回集合列表
return binder.GetCallHandlers();
}
private class DefaultServiceBinder : ServiceBinderBase
{
readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<string, IServerCallHandler>();
internal ReadOnlyDictionary<string, IServerCallHandler> GetCallHandlers()
{
return new ReadOnlyDictionary<string, IServerCallHandler>(this.callHandlers);
}
public override void AddMethod<TRequest, TResponse>(
Method<TRequest, TResponse> method,
UnaryServerMethod<TRequest, TResponse> handler)
{
//每個方法名稱對應的一個handler
callHandlers.Add(method.FullName, ServerCalls.UnaryCall(method, handler));
}
}
}
上述代碼總結:
- 在構造出ServerServiceDefinition對象時,用戶再將對象綁定到grpc的Servers時,開始執行GetCallHandlers方法,把它又重新構建一遍。
- grpc默認的會構造一個集合,key是方法全名,value則是IServerCallHandler,實際上每次請求進來會檢索方法名,然后執行IServerCallHandler內部的HandleCall方法(這個是在源碼里面可以看到😜)。
- ServerCalls.UnaryCall想了解的可以看下源碼,實質上內部就是執行handler,而這個handler就是用戶構建的最終ServerServiceDefinition。
Server總結
- 通過上面我們可以看出,其大致思路可Client端實現很相像,只不過最終返回的是ServerServiceDefinition對象,而這個對象從剛開始默認handler(用戶重寫的Server端方法),到添加攔截器時在上面的封裝,而這個攔截器又通過InterceptingServiceBinder類將其添加進去,它們都繼承了ServiceBinderBase,通過構造最終的Builder對象來返回最終的ServerServiceDefinition。
- 最終的ServerServiceDefinition在我們寫的服務端Demo中可以看到,它被添加到Servers中,而在這時候調用GetCallHandlers生成最終的以方法名為key,handler為value的集合。
- 當有請求進來時,我們只需要根據方法名找到對應的handler,然后把參數傳遞進去,再執行handler就可以把攔截器和自己定義的方法全部走一遍,這些有興趣的可以參考下源碼。
總結
關於Grpc的攔截器,相信你看完之后會有一定的收獲,這里我再額外說一些其他的關於閱讀Grpc源碼時的小tips:
- 默認情況下,服務啟動時,只有4個后台線程去消費請求(和計算機的CPU數量有關),但是請求的執行默認是通過添加線程池任務來執行的,當然也可以設置不通過線程池執行,直接執行時要注意防止阻塞。
- 默認情況下,Grpc支持同一時間同時處理8000個請求(也和計算機的CPU數量有關),如果有更多的請求應該就被阻塞了。這個數量是可以開發人員去調節的。
以上就是筆者對Grpc攔截器的理解,本篇文章也主要是希望給讀者提供源碼閱讀思路,可能會有偏差,還請評論指正😂。