記錄core中GRPC長連接導致負載均衡不均衡問題 二,解決長連接問題


題外話:

1.這幾天收到蔚來的面試邀請,但是自己沒做准備,並且遠程面試,還在上班時間,再加上老東家對我還不錯.沒想着換工作,導致在自己工位上做算法題不想被人看見,然后非常緊張.估計over了.不過沒事,接下來知道哪里不足補哪里繼續我的grpc源碼解析

2.上期的博客,記錄了grpc源碼及創建grpc的過程,其實說到底就是圍繞GrpcChannel,通過httpclient做長連接處理這次來分析下,具體的實現規律

3.直接上github地址:https://github.com/BestHYC/GRPCHelper

4.大家還是得多做題,不然面試都過不去,都不會看你代碼。

一:查看創建HttpClient的源碼

        public HttpClient CreateClient(string name)
        {
            if (name == null)
            {
                throw new ArgumentNullException(nameof(name));
            }
 
            HttpMessageHandler handler =  _activeHandlers.GetOrAdd(name, _entryFactory).Value;
            var client = new HttpClient(handler, disposeHandler: false);
 
            HttpClientFactoryOptions options = _optionsMonitor.Get(name);
            for (int i = 0; i < options.HttpClientActions.Count; i++)
            {
                options.HttpClientActions[i](client);
            }
 
            return client;
        }
View Code

可以看見,在處理HttpName的時候,通過GetOrAdd來提供HttpClient,那么可以得到一個事實就是,其實AddHttpClient(name),只不過用來標識,其實底層沒做特別處理,即使我僅僅AddHttpClient(),在創建HttpClient的時候使用CreateClient("AA"),另外一個地方同樣使用CreateClient("AA"),這兩個HttpClient在未dispose情況下,還是會共用一個句柄

二:查看Grpc中HttpClinet使用場景

也是查看DefaultGrpcClientFactory創建中var httpClient = _httpClientFactory.CreateClient(name);而由前面源碼可知,name可以當成同一個Grpc客戶端名稱。那么得到,
同一個GrpcClient共用同一個HttpClient,不同的客戶端還是會產生2個鏈接,我們來抓包測試下

            StringBuilder sb = new StringBuilder();
            for (Int32 i = 0; i < 2; i++)
            {
                var result = client.SayHelloAsync(new HelloRequest() { Name = i.ToString() }).ResponseAsync.Result;
                sb.Append(result.Message);
                sb.Append("client1的執行結果");
                var result1 = client1.SayHelloAsync(new HelloRequest() { Name = i.ToString() }).ResponseAsync.Result;
                sb.AppendLine(result1.Message);
            }
            return sb.ToString();

如果按照正常邏輯是公用同一個端口號。但是查看可以發現,client兩次復用一個端口,Client1兩次也是復用一個端口,但是這兩個客戶端不公用同一個端口號
可以證明我們結合上面代碼的邏輯是正確的。
在反證明一次,如果共用一個HttpClient那么端口號相同。那么采用原始創建GrpcChannel方式

        [HttpGet("DoubleSamePortByChannel")]
        public String DoubleSamePortByChannel()
        {
            StringBuilder sb = new StringBuilder();
            var channel = GrpcChannel.ForAddress("");
            for (Int32 i = 0; i < 100; i++)
            {
                var client = new Greeter.GreeterClient(channel);
                var result = client.SayHelloAsync(new HelloRequest() { Name = i.ToString() }).ResponseAsync.Result;
                sb.Append(result.Message);
                sb.Append("client1的執行結果");
                var client1 = new Greeter1.Greeter1Client(channel);
                var result1 = client1.SayHelloAsync(new HelloRequest() { Name = i.ToString() }).ResponseAsync.Result;
                sb.AppendLine(result1.Message);
            }
            return sb.ToString();
        }

因為共用一個Channel,所以HttpClient是公用的。抓包可以看到,他們復用同一個端口號。
結論:如果共用同一個HttpClient,那么復用同一個端口號,如果使用不同的HttpClient,那么即使是基於Http2.0也是不同的端口號

三:改動源碼,解決長連接問題

改動前需要確定幾個目的:
1.避免每次AddGrpcClient()注入,隨時注入隨時啟用
2.每次客戶端能夠復用連接,那么就復用。
3.當請求量比較大的時候,每個端口最多保證10次調用,然后啟用新的HttpClient,使用新的http端口號
4.保證可以調用多個站點集合,但是由於正常情況下,大部分站點都是相同的,這里就不做拓展,拓展開來其實都一致。

3.1.解決GrpcClient的注入問題。

難點:1.注入當前站點。2.解決創建Client的注入問題。

3.1.1:注入當前站點,采用最原始的方式,直接GrpcClientFactoryOptions的CurrentValue,而不是通過Option的Get獲取通過名稱的配置,
缺點是共同使用而不是單點配置,當然完全可以改,這里就不做拓展了。

          public static IHttpClientBuilder AddMyGrpcClient<TClient>(this IServiceCollection services, String url)
            where TClient : class
        {
            if (services == null)
            {
                throw new ArgumentNullException(nameof(services));
            }
            services.Configure<GrpcClientFactoryOptions>(options => options.Address = new Uri(url));
            var name = TypeNameHelper.GetTypeDisplayName(typeof(TClient), fullName: false);
            return services.AddGrpcClientCore<TClient>(name);
        }

3.1.2:注入當前GrpcClient,由於看源碼得到,所有的Client都是通過DefaultClientActivator<T>創建,修改代碼

private void AddClient(Type type)
        {
            if (type == null) return;
            Func<ObjectFactory> result = () => ActivatorUtilities.CreateFactory(type, new Type[] { typeof(CallInvoker), });
            if (_createActivator.ContainsKey(type))
            {
                _createActivator[type] = result;
            }
            else
            {
                _createActivator.Add(type, result);
            }
        }
        private Object m_lock = new Object();
        public TClient CreateClient<TClient>(CallInvoker callInvoker)
        {
            if (!_createActivator.ContainsKey(typeof(TClient)))
            {
                lock (m_lock)
                {
                    if (!_createActivator.ContainsKey(typeof(TClient)))
                    {
                        AddClient(typeof(TClient));
                    }
                }
            }
            return (TClient)Activator(typeof(TClient))(_services, new object[] { callInvoker });
        }

增加AddClient,這樣在創建新的客戶端時候去判斷是否存在,不存在就新增,而不是原來只新增注入的Client。
3.1.3.限制請求數量,這種通過注入的方式,留給大家自己擴展吧,因為我發現一個基於GRPCChanel的原始版本

4.不修改注入方式,而是采用直連方式

4.1.創建Client,基於表達式實現

private static Dictionary<String, Func<GrpcChannel, Object>> m_expression = new Dictionary<String, Func<GrpcChannel, Object>>();
        private T GetFunc<T>(GrpcChannel channel)
        {
            String name = typeof(T).FullName;
            if (m_expression.ContainsKey(name)) return (T)m_expression[name].Invoke(channel);
            var argumentType = new[] { typeof(GrpcChannel) };
            var constructor = typeof(T).GetConstructor(
                BindingFlags.Instance | BindingFlags.Public,
                null,
                argumentType,
                null);
            var param = Expression.Parameter(typeof(GrpcChannel), "channel");
            var constructorCallExpression = Expression.New(constructor, param);
            var constructorCallingLambda = Expression
              .Lambda<Func<GrpcChannel, Object>>(constructorCallExpression, param).Compile();
            m_expression.Add(name, constructorCallingLambda);
            return (T)constructorCallingLambda(channel);
        }

4.2.創建GrpcChannel的代碼實現,並且每次請求只允許10次

     public T GetHttpClient<T>()
        {
            lock (m_lock)
            {
                HttpClient client = null;
                foreach (var item in m_httpclients)
                {
                    if (item.Value < 10)
                    {
                        m_currentname = item.Key;
                        break;
                    }
                }
                if (String.IsNullOrWhiteSpace(m_currentname))
                {
                    String guid = Guid.NewGuid().ToString();
                    m_currentname = guid;
                    m_httpclients.Add(guid, 0);
                }
                m_httpclients[m_currentname] += 1;
                client = m_httpclientfactory.CreateClient(m_currentname);
                GrpcChannelOptions options = new GrpcChannelOptions()
                {
                    HttpClient = client
                };
                var channel = GrpcChannel.ForAddress("http://localhost:6001", options);
                var client1 = GetFunc<T>(channel);
                return client1;
            }
        }
        public void Dispose()
        {
            lock (m_lock)
            {
                if (m_currentname == null) return;
                if (m_httpclients.TryGetValue(m_currentname, out Int32 num))
                {
                    if (num <= 0) return;
                    m_httpclients[m_currentname] = num - 1;
                }
            }
        }
View Code

五:測試是否成功

[HttpGet("GrpcHelper")]
public String GetInfotest([FromServices] GrpcHelper grpcHelper)
{
StringBuilder sb = new StringBuilder();
Int32 a = 0;
for (Int32 i = 0; i < 100; i++)
{
Task.Run(() =>
{

using (var factory = grpcHelper.CreateClientFactory())
{
var client = factory.GetHttpClient<Greeter.GreeterClient>();
var result = client.SayHelloAsync(new GrpcService1.HelloRequest() { Name = "hongyichao " + Environment.MachineName }).ResponseAsync.Result.Message;
sb.Append(result);
}

}).ContinueWith(t => Interlocked.Increment(ref a));
}
while (Volatile.Read(ref a) < 100)
{
Thread.Sleep(100);
}
return JsonConvert.SerializeObject(sb);
}

 

最終100個連接使用了3個端口號就可以解決。這樣既解決了只復用單個端口號,又解決了單鏈接無法復用端口號問題。解決


最終在吐槽下自己,昨天面試渣成啥樣了。下一篇開始研究Rabbitmq了。另外,大家得注意,現在都流行代碼測試。多做做題。

不然即使像我這種老司機,也在很簡單很簡單的題目上遭遇滑鐵盧。但是也不能一味着寫算法,也多多看源碼,畢竟這是我們的工作

分割線----------------------

最后吐槽下:寫完之后,公司上層決定不采用我的實現方式,因為問我能不能保證完全可以,我說得先上去測試下,才能下結論。然后測試機會都沒,直接全部將grpc換成api的形式了。哎,之前的努力都付諸東流。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM