gRPC的簡單使用


前言

八月初的時候,在公司內部做了一個主題為《gRPC的簡單使用》的分享,其實就是和小伙伴們扯扯淡,現在抽空回憶一下,也算是一個小小的總結吧。

現在市面上耳熟能詳的RPC框架也很多,下面列舉幾個遇到比較多的。

  1. 谷歌的gRPC
  2. 推特的Thrift
  3. 阿里的Dubbo
  4. 。。。。

它們都是支持多語言的,相對來說,這三個之中,Dubbo支持的語言略微少一點。現在在一個公司內都能見到多種語言的技術棧都已經是十分常見的事了,好比我司,都有JAVA,C#,Python三種語言了,所以在多語言支持這方面,在技術選型的時候,肯定是要有所考慮的。

下面進入正式的主題,gRPC。

gRPC的簡單介紹

gRPC是一個現代的開源高性能RPC框架,可以在任何環境中運行。它可以高效地將數據中心內和跨數據中心的服務連接起來,並支持可插拔的負載平衡、跟蹤、健康檢查和身份驗證。同時,它還把設備,移動應用程序和瀏覽器連接到后端服務的分布式計算變得很容易。

gRPC有什么優點呢?

  1. 簡單的服務定義 (使用Protocol Buffers定義服務,這是一個功能強大的二進制序列化工具集和語言)
  2. 跨語言和平台工作 (在微服務式架構中有效地連接多語言服務(10+種語言支持)並能自動為各種語言和平台的服務生成慣用的客戶端和服務器存根)
  3. 快速啟動並擴展 (使用單行安裝運行時和開發環境,並使用框架每秒擴展到數百萬個RPC)
  4. 雙向流媒體和集成的身份驗證 (雙向流媒體和集成的身份驗證 基於http/2的傳輸的雙向流和完全集成的可插拔身份驗證)

gRPC在使用的時候有4種模式供我們選擇

  1. 一元RPC(Unary RPCs ):這是最簡單的定義,客戶端發送一個請求,服務端返回一個結果
  2. 服務器流RPC(Server streaming RPCs):客戶端發送一個請求,服務端返回一個流給客戶端,客戶從流中讀取一系列消息,直到讀取所有消息
  3. 客戶端流RPC(Client streaming RPCs ):客戶端通過流向服務端發送一系列消息,然后等待服務端讀取完數據並返回處理結果
  4. 雙向流RPC(Bidirectional streaming RPCs):客戶端和服務端都可以獨立向對方發送或接受一系列的消息。客戶端和服務端讀寫的順序是任意。

我們要根據具體的場景來決定選擇那一種。

這里只介紹一元RPC。正常來說,一元RPC應該可以滿足我們日常60~70%的需求了吧。

基本用法

gRPC的基本用法可以簡單的分為三個點:

  • 服務的定義,即proto文件的編寫
  • 服務端代碼編寫
  • 客戶端代碼編寫

下面我們依次來看一下

服務的定義

既然要定義一個服務,肯定是知道了這個服務要完成什么事之后。

在定義之前,要對proto3和proto2有所了解。不過proto3是推薦的格式。所以我們基本上只要用proto3就可以了。

下面先來看一個后面要用到的proto文件。

syntax = "proto3";

option csharp_namespace = "XXXService";

package UserInfo;

service UserInfoService {
  rpc GetList(GetUserListRequest) returns (GetUserListReply){}
  rpc GetById(GetUserByIdRequest) returns (GetUserByIdRelpy){}
  rpc Save(SaveUserRequest) returns (SaveUserReply){}
}


message GetUserByIdRequest {
	int32 id = 1;
}

message GetUserByIdRelpy{
	int32 id = 1;
	string name = 2;
	int32 age = 3;
	int64 create_time = 4;
}

message GetUserListRequest {
	int32 id = 1;
	string name = 2;	
}

message GetUserListReply {
  message MsgItem {
    int32 id = 1;
	string name = 2;
	int32 age = 3;
	int64 create_time = 4;
   }
   int32 code = 1;
   string msg = 2;
   repeated MsgItem data = 3;
}

message SaveUserRequest {
	string name = 1;
	int32 age = 2;	
}

message SaveUserReply {
   int32 code = 1;
   string msg = 2;
}

它有下面的幾個部分

  1. syntax , 指定要用那個版本的語法
  2. service , 指定rpc服務的接口,簡單理解成我們平時定義的接口
  3. message , 指定要傳輸的消息體,簡單理解成我們平常用的 DTO
  4. package , 指定包名
  5. option , 可選參數的定義,不同語言有不同的選項

其實看上去還是比較容易懂的。至少一眼看過去能知道是些什么意思。

如果對proto3還沒有了解的,可以參考這個文檔Language Guide (proto3),里面很清楚的介紹了一些數據類型和不同語言數據類型的對應關系。

這里有一個要注意的是,時間類型,在proto3中,沒有datetime類型,過去很長一段時間,我們是只能用時間戳來表示時間,也就是定義一個長整型,現在是可以用timestamp表處理了。

在寫服務端和客戶端代碼之前,我們需要根據proto文件生成對應的代碼。

一個命令即可搞定。

protoc --proto_path=IMPORT_PATH \
           --cpp_out=DST_DIR \
           --java_out=DST_DIR \
           --python_out=DST_DIR \
           --go_out=DST_DIR \
           --objc_out=DST_DIR \
           --csharp_out=DST_DIR \
           path/to/file.proto

現在時代進步的這么快,不少語言已經有工具做了集成,可以在build項目的時候就生成對應的文件了,不需要我們再單獨去執行一次上面的那個命令。

好比我們的.NET項目,可以在ItemGroup中直接指定Protobuf,然后告訴它,proto文件是那個,是要生成服務端代碼還是客戶端代碼。

可以看看下面這個具體的例子。

<Project Sdk="Microsoft.NET.Sdk.Web">

  <PropertyGroup>
    <TargetFramework>netcoreapp2.1</TargetFramework>
  </PropertyGroup>

  <ItemGroup>
    <Protobuf Include="Protos\userinfo.proto" GrpcServices="Server" />
  </ItemGroup>
  
  <ItemGroup>
    <PackageReference Include="Microsoft.AspNetCore.App" />
    <PackageReference Include="Microsoft.AspNetCore.Razor.Design" Version="2.1.2" PrivateAssets="All" />
    <PackageReference Include="Google.Protobuf" Version="3.8.0" />
    <PackageReference Include="Grpc.Core" Version="1.22.0" />
    <PackageReference Include="Grpc.Tools" Version="1.22.0">
      <PrivateAssets>all</PrivateAssets>
      <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
    </PackageReference>
  </ItemGroup>

</Project>

再往下,就是寫代碼了。

服務端代碼編寫

服務端代碼分兩部分,一部分是服務具體的實現,一部分是服務怎么起來。

先來看看服務的具體實現。

namespace MyBasedServiceA
{
    using Grpc.Core;
    using System.Linq;
    using System.Threading.Tasks;

    public class UserInfoServiceImpl : UserInfoService.UserInfoServiceBase
    {
        public override Task<GetUserByIdRelpy> GetById(GetUserByIdRequest request, ServerCallContext context)
        {
            var result = new GetUserByIdRelpy();

            var user = FakeUserInfoDb.GetById(request.Id);

            result.Id = user.Id;
            result.Name = user.Name;
            result.Age = user.Age;
            result.CreateTime = user.CreateTime;

            return Task.FromResult(result);
        }

        public override Task<GetUserListReply> GetList(GetUserListRequest request, ServerCallContext context)
        {
            var result = new GetUserListReply();

            var userList = FakeUserInfoDb.GetList(request.Id, request.Name);

            result.Code = 0;
            result.Msg = "成功";
            result.Data.AddRange(userList.Select(x => new GetUserListReply.Types.MsgItem
            {
                Id = x.Id,
                Age = x.Age,
                CreateTime = x.CreateTime,
                Name = x.Name
            }));

            return Task.FromResult(result);
        }

        public override Task<SaveUserReply> Save(SaveUserRequest request, ServerCallContext context)
        {
            var result = new SaveUserReply();

            var flag = FakeUserInfoDb.Save(request.Name, request.Age);

            result.Code = 0;
            result.Msg = "成功";
            
            return Task.FromResult(result);
        }
    }
}

可以看到上面的代碼,我們只要繼承由proto文件生成的一個基類,然后去重寫它的實現,就可以認為是實現了一個服務。這個其實就是寫我們具體的業務邏輯,大boss有什么需求,堆上去就好了。

然后來看第二部分,服務怎么起來。

在這里我選擇的方案是使用通用主機來跑。當然也可以直接在Startup的Configure方法中去啟動服務。只要能起來就行 😄

namespace MyBasedServiceA
{
    using Grpc.Core;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;
    using System;
    using System.Threading;
    using System.Threading.Tasks;

    public class MyBasedServiceAHostedService : BackgroundService
    {
        private readonly ILogger _logger;

        private Server _server;

        public MyBasedServiceAHostedService(ILoggerFactory loggerFactory)
        {
            this._logger = loggerFactory.CreateLogger<MyBasedServiceAHostedService>();

            _server = new Server
            {
                Services = { UserInfoService.BindService(new UserInfoServiceImpl()) },
                // ServerCredentials.Insecure還是沒有用https,只用於演示
                // 生產環境,建議還是弄個證書
                Ports = { new ServerPort("0.0.0.0", 9999, ServerCredentials.Insecure) }                
            };
        }
      
        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _server.Start();
            return Task.CompletedTask;
        }
    }
}

然后是Program中的代碼。

namespace MyBasedServiceA
{
    using Microsoft.AspNetCore.Builder;
    using Microsoft.AspNetCore.Hosting;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;

    public class Program
    {
        public static void Main(string[] args)
        {
            var host = new HostBuilder()
                .ConfigureLogging((hostContext, configLogging) =>
                {
                    configLogging.AddConsole();
                    configLogging.AddDebug();
                })
                .ConfigureServices((hostContext, services) =>
                {
                    services.AddHostedService<MyBasedServiceAHostedService>();
                })
                .Build();

            host.Run();
        }
    }
}

到這里,服務端已經可以了。

下面就是客戶端了。

客戶端代碼編寫

在這里客戶端,我們寫兩個,一個基於C#(.net core), 一個基於python。剛好也驗證一下gRPC的多語言。

正常來說,我們所說的客戶端可能很大一部分是對外的WEB API了,就是說api的內部實現,是rpc的調用,而對外的是常見的返回JSON的rest api。

我們先通過控制台來體驗一下它的客戶端調用。

C#(.net core)客戶端

class Program
{
    static void Main(string[] args)
    {
        var channel = new Channel("localhost:9999", ChannelCredentials.Insecure);
        var client = new UserInfoService.UserInfoServiceClient(channel);

        var saveResponse = client.Save(new SaveUserRequest { Age = 99, Name = "c#name" });

        Console.WriteLine($"Save received: code = {saveResponse.Code} ,  msg = {saveResponse.Msg}");

        var getListResponse = client.GetList(new GetUserListRequest { });

        Console.WriteLine($"GetList received: code =  {getListResponse.Code} ,  msg = {getListResponse.Msg}");

        foreach (var item in getListResponse.Data)
        {
            Console.WriteLine(item.Name);
        }

        Console.ReadKey();
    }
}

其實這種方式我們很容易聯想到WCF,都是生成代碼,可以直接點出來的方法,強類型的使用體驗。不過我是基本沒有用過WCF的,貌似暴露了年齡了,逃~~

python客戶端

python要想運行gRPC相關的,要先安裝 grpcio-tools,然后再用命令生成相應的文件。

# 安裝
pip install grpcio-tools

# 生成
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. ./userinfo.proto

具體實現

import grpc
import userinfo_pb2, userinfo_pb2_grpc

_HOST = 'localhost'
_PORT = '9999'

def run():
    conn = grpc.insecure_channel(_HOST + ':' + _PORT)

    client = userinfo_pb2_grpc.UserInfoServiceStub(channel=conn)

    saveResponse = client.Save(userinfo_pb2.SaveUserRequest(name="pyname", age=39))
    print("Save received: code = " + str(saveResponse.code) + ", msg = "+ saveResponse.msg)

    getListResponse = client.GetList(userinfo_pb2.GetUserListRequest())
    print("GetList received: code = " + str(getListResponse.code) + ", msg = "+ getListResponse.msg)

    for d in getListResponse.data:
        print(d.name)

if __name__ == '__main__':
    run()

同樣也是很簡潔。

運行效果

在服務端起來的情況下,先運行.net core的客戶端,然后再運行python的客戶端,結果大致如下。

注: 在調用的時候,有幾個概念要知道!!

  1. gRPC中沒有采用傳統的timeout方式去處理,而是采用了Deadline機制,覺得這個機制和我們的CancellationToken很相似
  2. 無論是客戶端還是服務端,都可以隨時取消RPC

可以看到我們現在的地址都是硬編碼的,因為只有一個節點,然后在線上環境,都會是多節點的,所以我們需要有服務注冊和服務發現,下面我們就結合consul來完成服務注冊與發現。

服務治理(注冊與發現)

當然現在可選的工具還是有很多的,consul,etcd,eureka等,當然最好的還是直接上K8S,不過我們公司還有很長的一段路才能上,所以我們就怎么簡單怎么來了。

下面我們調整一下服務端的代碼,讓gRPC的服務可以注冊到consul上面。

public class MyBasedServiceAHostedService : BackgroundService
{
    private readonly Microsoft.Extensions.Logging.ILogger _logger;
    private readonly IConfiguration _configuration;
    private readonly IConsulClient _consulClient;

    private Server _server;
    private AgentServiceRegistration registration;

    public MyBasedServiceAHostedService(ILoggerFactory loggerFactory, IConfiguration configuration, IConsulClient consulClient, IHostingEnvironment environment)
    {
        this._logger = loggerFactory.CreateLogger<MyBasedServiceAHostedService>();
        this._configuration = configuration;
        this._consulClient = consulClient;

        var port = _configuration.GetValue<int>("AppSettings:Port");

        _logger.LogInformation($"{environment.EnvironmentName} Current Port is : {port}");

        // global logger for grpc
        GrpcEnvironment.SetLogger(new GrpcAdapterLogger(loggerFactory));

        var address = GetLocalIP();

        _logger.LogInformation($"{environment.EnvironmentName} Current IP is : {address}");

        registration = new AgentServiceRegistration()
        {
            ID = $"MyBasedServiceA-{Guid.NewGuid().ToString("N")}",
            Name = "MyBasedServiceA",
            Address = address,
            Port = port,
            Check = new AgentServiceCheck
            {
                TCP = $"{address}:{port}",
                DeregisterCriticalServiceAfter = TimeSpan.FromSeconds(5),
                Interval = TimeSpan.FromSeconds(10),
                Timeout = TimeSpan.FromSeconds(5)
            } 
        };

        _server = new Server
        {
            Ports = { new ServerPort("0.0.0.0", port, ServerCredentials.Insecure) }                
        };

        // not production record some things
        if (!environment.IsProduction())
        {
            _server.Services.Add(UserInfoService.BindService(new UserInfoServiceImpl()).Intercept(new AccessLogInterceptor(loggerFactory)));
        }
        else
        {
            _server.Services.Add(UserInfoService.BindService(new UserInfoServiceImpl()));
        }
    }
  
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {            
        await _consulClient.Agent.ServiceDeregister(registration.ID);
        await _consulClient.Agent.ServiceRegister(registration);
        _logger.LogInformation($"Registering with Consul {registration.ID} OK");  
        _server.Start();
    }

    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Unregistering from Consul");

        await _consulClient.Agent.ServiceDeregister(registration.ID);
        await _server.KillAsync();
        await base.StopAsync(cancellationToken);
    }

    private string GetLocalIP()
    {
        try
        {
            string hostName = Dns.GetHostName(); 
            IPHostEntry ipEntry = Dns.GetHostEntry(hostName);
            for (int i = 0; i < ipEntry.AddressList.Length; i++)
            {
                if (ipEntry.AddressList[i].AddressFamily == AddressFamily.InterNetwork)
                {
                    return ipEntry.AddressList[i].ToString();
                }
            }
            return "127.0.0.1";
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Get Local Ip error");
            return "127.0.0.1";
        }
    }
}

然后客戶端要定義一個從consul中讀取實例的方法。

從consul中讀取對應service的健康實例,正常會用定時輪訓的方式去讀取,或者寫入短時間的緩存。

public class FindService : IFindService 
{
    private readonly ILogger _logger;
    private readonly IConsulClient _consulClient;
    private readonly ConcurrentDictionary<string, (List<string> List, DateTimeOffset Expiration)> _dict;

    public FindService(ILoggerFactory loggerFactory, IConsulClient consulClient)
    {
        _logger = loggerFactory.CreateLogger<FindService>();
        _consulClient = consulClient;
        _dict = new ConcurrentDictionary<string, (List<string> List, DateTimeOffset Expiration)>();
    }

    public async Task<string> FindServiceAsync(string serviceName)
    {
        var key = $"SD:{serviceName}";

        if (_dict.TryGetValue(key, out var item) && item.Expiration > DateTimeOffset.UtcNow)
        {
            _logger.LogInformation($"Read from cache");
            return item.List[new Random().Next(0, item.List.Count)];              
        }
        else
        {
            var queryResult = await _consulClient.Health.Service(serviceName, string.Empty, true);

            var result = new List<string>();
            foreach (var serviceEntry in queryResult.Response)
            {
                result.Add(serviceEntry.Service.Address + ":" + serviceEntry.Service.Port);
            }

            _logger.LogInformation($"Read from consul : {string.Join(",", result)}");

            if (result != null && result.Any())
            {
                // for demonstration, we make expiration a little big
                var val = (result, DateTimeOffset.UtcNow.AddSeconds(600));

                _dict.AddOrUpdate(key, val, (x, y) => val);

                var count = result.Count;
                return result[new Random().Next(0, count)];
            }

            return "";
        }
    }
}

調用的時候。

private async Task<(UserInfoService.UserInfoServiceClient Client, string Msg)> GetClientAsync(string name)
{
    var target = await _findService.FindServiceAsync(name);
    _logger.LogInformation($"Current target = {target}");

    if (string.IsNullOrWhiteSpace(target))
    {
        return (null, "can not find a service");
    }
    else
    {
        var channel = new Channel(target, ChannelCredentials.Insecure);

        var client = new UserInfoService.UserInfoServiceClient(channel);
        return (client, string.Empty);
    }
}

然后我們編寫docker-compose.yml, 讓它在docker中跑

version: '3.4'

services:
  xxxserver1:
    image: ${DOCKER_REGISTRY-}xxxserver
    build:
      context: .
      dockerfile: MyBasedServiceA/Dockerfile
    ports:
      - "9999:9999"  # 綁定容器的9999端口到主機的9999端口
    depends_on:
      - consuldev      
    networks:  
      backend:

  xxxserver2:
    image: ${DOCKER_REGISTRY-}xxxserver
    build:
      context: .
      dockerfile: MyBasedServiceA/Dockerfile
    ports:
      - "9995:9999"   # 綁定容器的9999端口到主機的9995端口
    depends_on:
      - consuldev      
    networks:  
      backend:      
      
  xxxclient:
    image: ${DOCKER_REGISTRY-}xxxclient
    build:
      context: .
      dockerfile: XXXService/Dockerfile
    ports:
      - "9000:80"
    depends_on:
      - consuldev    
      - xxxserver1
      - xxxserver2
    networks:  
      backend:

  consuldev:
    image: consul:latest    
    ports:
      - "8300:8300"
      - "8400:8400"
      - "8500:8500"    
    networks:  
      backend:

networks:  
  backend:      
    driver: bridge

運行結果如下:

當用 docker 把這幾個服務都跑起來之后, 可以看到類似下面的輸出

也可以用docker ps命令來看一下那幾個服務是不是真的在運行。

同時,我們打開consul的UI界面,可以看到我們服務端的兩個實例已經注冊上來了。

當我們用客戶端去訪問的時候,可以發現,第一次它是從consul中取下來了兩個ip,然后隨機選了一個進行訪問。

我們把其中一個服務端(0.4)stop,用來模擬某個節點出現異常,被剔除的情況 ,可以發現consul上面已經看不到了,只剩下0.3這個節點了。

如果我們的調度策略沒有及時將"死掉"的節點剔除,就會出現下面的這種情況。

最后,把stop的服務端啟動,模擬恢復正常,這個時候可以發現無論調度到那個節點都可以正常訪問了。

.NET Core 2.x 和 .NET Core 3.0的細微區別

在.NET Core 2.x中,我們的Server,是需要手動控制的,在.NET Core 3.0中,可以認為它已經和Kestrel融為一體了,不再需要我們再手動去Start了。

同樣的,服務的實現,也和Endpoint Routing緊密的結合在一起了,不再和之前一樣了。

可以看看下面的例子,可能會發現,這是一種熟悉的不能再熟悉的感覺。

public class Startup
{        
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddGrpc(x=> 
        {
            x.EnableDetailedErrors = true;
            x.Interceptors.Add<AccessLogInterceptor>();
        });
    }
  
    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }

        app.UseRouting();

        app.UseEndpoints(endpoints =>
        {
            endpoints.MapGrpcService<GreeterService>();

            endpoints.MapGrpcService<UserService>();

            endpoints.MapGet("/", async context =>
            {
                await context.Response.WriteAsync("Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909");
            });
        });
    }
}

客戶端的用法也和以前不一樣了,直接看一個例子就很清晰了。

[HttpGet]
public async Task<string> GetAsync(CancellationToken cancellationToken)
{
    // fixed https error
    var httpclientHandler = new HttpClientHandler
    {
        ServerCertificateCustomValidationCallback = (message, cert, chain, error) => true
    };

    var httpClient = new HttpClient(httpclientHandler)
    {
        // The port number(5001) must match the port of the gRPC server.
        BaseAddress = new Uri("https://localhost:5001")
    };

    try
    {
        var client = GrpcClient.Create<UserInfoRpcService.UserInfoRpcServiceClient>(httpClient);

        var callOptions = new Grpc.Core.CallOptions()
            // StatusCode=Cancelled
            .WithCancellationToken(cancellationToken)
            // StatusCode=DeadlineExceeded
            .WithDeadline(DateTime.UtcNow.AddMilliseconds(2000));

        var reply = await client.GetByIdAsync(new GetUserByIdRequest { Id = 1 }, callOptions);

        return reply.Name;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "some exception occure");

        return "error";
    }            
}

從它的用法上,我們也有熟悉的面孔 HttpClient

雖然gRPC是基於HTTP/2的,但是可以看到我們上面小節的例子中,還是能夠指定不使用的。然而到.NET Core 3.0之后,我們就必須要使用https了,不然客戶端就是調不通的。同樣的,我們也可以在grpc-dotnet的倉庫上面看到,如果想不使用HTTP/2,就讓我們用回之前的老庫,不要用新庫,James Newton-King就是這么直接。

https://github.com/grpc/grpc-dotnet/issues/277

https://github.com/grpc/grpc-dotnet/issues/405

https://github.com/grpc/grpc-dotnet/issues/431

擴展閱讀

文中出現的示例代碼都可以在下面這個倉庫找到

catcherwong-archive/2019


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM