題外話:
1.這幾天收到蔚來的面試邀請,但是自己沒做准備,並且遠程面試,還在上班時間,再加上老東家對我還不錯.沒想着換工作,導致在自己工位上做算法題不想被人看見,然后非常緊張.估計over了.不過沒事,接下來知道哪里不足補哪里繼續我的grpc源碼解析
2.上期的博客,記錄了grpc源碼及創建grpc的過程,其實說到底就是圍繞GrpcChannel,通過httpclient做長連接處理這次來分析下,具體的實現規律
3.直接上github地址:https://github.com/BestHYC/GRPCHelper
4.大家還是得多做題,不然面試都過不去,都不會看你代碼。
一:查看創建HttpClient的源碼

public HttpClient CreateClient(string name) { if (name == null) { throw new ArgumentNullException(nameof(name)); } HttpMessageHandler handler = _activeHandlers.GetOrAdd(name, _entryFactory).Value; var client = new HttpClient(handler, disposeHandler: false); HttpClientFactoryOptions options = _optionsMonitor.Get(name); for (int i = 0; i < options.HttpClientActions.Count; i++) { options.HttpClientActions[i](client); } return client; }
可以看見,在處理HttpName的時候,通過GetOrAdd來提供HttpClient,那么可以得到一個事實就是,其實AddHttpClient(name),只不過用來標識,其實底層沒做特別處理,即使我僅僅AddHttpClient(),在創建HttpClient的時候使用CreateClient("AA"),另外一個地方同樣使用CreateClient("AA"),這兩個HttpClient在未dispose情況下,還是會共用一個句柄
二:查看Grpc中HttpClinet使用場景
也是查看DefaultGrpcClientFactory創建中var httpClient = _httpClientFactory.CreateClient(name);而由前面源碼可知,name可以當成同一個Grpc客戶端名稱。那么得到,
同一個GrpcClient共用同一個HttpClient,不同的客戶端還是會產生2個鏈接,我們來抓包測試下
StringBuilder sb = new StringBuilder(); for (Int32 i = 0; i < 2; i++) { var result = client.SayHelloAsync(new HelloRequest() { Name = i.ToString() }).ResponseAsync.Result; sb.Append(result.Message); sb.Append("client1的執行結果"); var result1 = client1.SayHelloAsync(new HelloRequest() { Name = i.ToString() }).ResponseAsync.Result; sb.AppendLine(result1.Message); } return sb.ToString();
如果按照正常邏輯是公用同一個端口號。但是查看可以發現,client兩次復用一個端口,Client1兩次也是復用一個端口,但是這兩個客戶端不公用同一個端口號
可以證明我們結合上面代碼的邏輯是正確的。
在反證明一次,如果共用一個HttpClient那么端口號相同。那么采用原始創建GrpcChannel方式
[HttpGet("DoubleSamePortByChannel")] public String DoubleSamePortByChannel() { StringBuilder sb = new StringBuilder(); var channel = GrpcChannel.ForAddress(""); for (Int32 i = 0; i < 100; i++) { var client = new Greeter.GreeterClient(channel); var result = client.SayHelloAsync(new HelloRequest() { Name = i.ToString() }).ResponseAsync.Result; sb.Append(result.Message); sb.Append("client1的執行結果"); var client1 = new Greeter1.Greeter1Client(channel); var result1 = client1.SayHelloAsync(new HelloRequest() { Name = i.ToString() }).ResponseAsync.Result; sb.AppendLine(result1.Message); } return sb.ToString(); }
因為共用一個Channel,所以HttpClient是公用的。抓包可以看到,他們復用同一個端口號。
結論:如果共用同一個HttpClient,那么復用同一個端口號,如果使用不同的HttpClient,那么即使是基於Http2.0也是不同的端口號
三:改動源碼,解決長連接問題
改動前需要確定幾個目的:
1.避免每次AddGrpcClient()注入,隨時注入隨時啟用
2.每次客戶端能夠復用連接,那么就復用。
3.當請求量比較大的時候,每個端口最多保證10次調用,然后啟用新的HttpClient,使用新的http端口號
4.保證可以調用多個站點集合,但是由於正常情況下,大部分站點都是相同的,這里就不做拓展,拓展開來其實都一致。
3.1.解決GrpcClient的注入問題。
難點:1.注入當前站點。2.解決創建Client的注入問題。
3.1.1:注入當前站點,采用最原始的方式,直接GrpcClientFactoryOptions的CurrentValue,而不是通過Option的Get獲取通過名稱的配置,
缺點是共同使用而不是單點配置,當然完全可以改,這里就不做拓展了。
public static IHttpClientBuilder AddMyGrpcClient<TClient>(this IServiceCollection services, String url) where TClient : class { if (services == null) { throw new ArgumentNullException(nameof(services)); } services.Configure<GrpcClientFactoryOptions>(options => options.Address = new Uri(url)); var name = TypeNameHelper.GetTypeDisplayName(typeof(TClient), fullName: false); return services.AddGrpcClientCore<TClient>(name); }
3.1.2:注入當前GrpcClient,由於看源碼得到,所有的Client都是通過DefaultClientActivator<T>創建,修改代碼
private void AddClient(Type type) { if (type == null) return; Func<ObjectFactory> result = () => ActivatorUtilities.CreateFactory(type, new Type[] { typeof(CallInvoker), }); if (_createActivator.ContainsKey(type)) { _createActivator[type] = result; } else { _createActivator.Add(type, result); } } private Object m_lock = new Object(); public TClient CreateClient<TClient>(CallInvoker callInvoker) { if (!_createActivator.ContainsKey(typeof(TClient))) { lock (m_lock) { if (!_createActivator.ContainsKey(typeof(TClient))) { AddClient(typeof(TClient)); } } } return (TClient)Activator(typeof(TClient))(_services, new object[] { callInvoker }); }
增加AddClient,這樣在創建新的客戶端時候去判斷是否存在,不存在就新增,而不是原來只新增注入的Client。
3.1.3.限制請求數量,這種通過注入的方式,留給大家自己擴展吧,因為我發現一個基於GRPCChanel的原始版本
4.不修改注入方式,而是采用直連方式
4.1.創建Client,基於表達式實現
private static Dictionary<String, Func<GrpcChannel, Object>> m_expression = new Dictionary<String, Func<GrpcChannel, Object>>(); private T GetFunc<T>(GrpcChannel channel) { String name = typeof(T).FullName; if (m_expression.ContainsKey(name)) return (T)m_expression[name].Invoke(channel); var argumentType = new[] { typeof(GrpcChannel) }; var constructor = typeof(T).GetConstructor( BindingFlags.Instance | BindingFlags.Public, null, argumentType, null); var param = Expression.Parameter(typeof(GrpcChannel), "channel"); var constructorCallExpression = Expression.New(constructor, param); var constructorCallingLambda = Expression .Lambda<Func<GrpcChannel, Object>>(constructorCallExpression, param).Compile(); m_expression.Add(name, constructorCallingLambda); return (T)constructorCallingLambda(channel); }
4.2.創建GrpcChannel的代碼實現,並且每次請求只允許10次

public T GetHttpClient<T>() { lock (m_lock) { HttpClient client = null; foreach (var item in m_httpclients) { if (item.Value < 10) { m_currentname = item.Key; break; } } if (String.IsNullOrWhiteSpace(m_currentname)) { String guid = Guid.NewGuid().ToString(); m_currentname = guid; m_httpclients.Add(guid, 0); } m_httpclients[m_currentname] += 1; client = m_httpclientfactory.CreateClient(m_currentname); GrpcChannelOptions options = new GrpcChannelOptions() { HttpClient = client }; var channel = GrpcChannel.ForAddress("http://localhost:6001", options); var client1 = GetFunc<T>(channel); return client1; } } public void Dispose() { lock (m_lock) { if (m_currentname == null) return; if (m_httpclients.TryGetValue(m_currentname, out Int32 num)) { if (num <= 0) return; m_httpclients[m_currentname] = num - 1; } } }
五:測試是否成功
[HttpGet("GrpcHelper")] public String GetInfotest([FromServices] GrpcHelper grpcHelper) { StringBuilder sb = new StringBuilder(); Int32 a = 0; for (Int32 i = 0; i < 100; i++) { Task.Run(() => { using (var factory = grpcHelper.CreateClientFactory()) { var client = factory.GetHttpClient<Greeter.GreeterClient>(); var result = client.SayHelloAsync(new GrpcService1.HelloRequest() { Name = "hongyichao " + Environment.MachineName }).ResponseAsync.Result.Message; sb.Append(result); } }).ContinueWith(t => Interlocked.Increment(ref a)); } while (Volatile.Read(ref a) < 100) { Thread.Sleep(100); } return JsonConvert.SerializeObject(sb); }
最終100個連接使用了3個端口號就可以解決。這樣既解決了只復用單個端口號,又解決了單鏈接無法復用端口號問題。解決
最終在吐槽下自己,昨天面試渣成啥樣了。下一篇開始研究Rabbitmq了。另外,大家得注意,現在都流行代碼測試。多做做題。
不然即使像我這種老司機,也在很簡單很簡單的題目上遭遇滑鐵盧。但是也不能一味着寫算法,也多多看源碼,畢竟這是我們的工作
分割線----------------------
最后吐槽下:寫完之后,公司上層決定不采用我的實現方式,因為問我能不能保證完全可以,我說得先上去測試下,才能下結論。然后測試機會都沒,直接全部將grpc換成api的形式了。哎,之前的努力都付諸東流。