gRPC(2):客戶端創建和調用原理


 

1. gRPC 客戶端創建流程

1.1 背景

gRPC 是在 HTTP/2 之上實現的 RPC 框架,HTTP/2 是第 7 層(應用層)協議,它運行在 TCP(第 4 層 - 傳輸層)協議之上,相比於傳統的 REST/JSON 機制有諸多的優點:

  1. 基於 HTTP/2 之上的二進制協議(Protobuf 序列化機制);
  2. 一個連接上可以多路復用,並發處理多個請求和響應;
  3. 多種語言的類庫實現;
  4. 服務定義文件和自動代碼生成(.proto 文件和 Protobuf 編譯工具)。

此外,gRPC 還提供了很多擴展點,用於對框架進行功能定制和擴展,例如,通過開放負載均衡接口可以無縫的與第三方組件進行集成對接(Zookeeper、域名解析服務、SLB 服務等)。

一個完整的 RPC 調用流程示例如下:

gRPC 的 RPC 調用與上述流程相似,下面我們一起學習下 gRPC 的客戶端創建和服務調用流程。

1.2 業務代碼示例

以 gRPC 入門級的 helloworld Demo 為例,客戶端發起 RPC 調用的代碼主要包括如下幾部分:

  1. 根據 hostname 和 port 創建 ManagedChannelImpl;
  2. 根據 helloworld.proto 文件生成的 GreeterGrpc 創建客戶端 Stub,用來發起 RPC 調用;
  3. 使用客戶端 Stub(GreeterBlockingStub)發起 RPC 調用,獲取響應。

相關示例代碼如下所示(HelloWorldClient 類):

HelloWorldClient(ManagedChannelBuilder<?> channelBuilder) {
    channel = channelBuilder.build();
    blockingStub = GreeterGrpc.newBlockingStub(channel);
    futureStub = GreeterGrpc.newFutureStub(channel);
    stub = GreeterGrpc.newStub(channel);
  }
  public void blockingGreet(String name) {
    logger.info("Will try to greet " + name + " ...");
    HelloRequest request = HelloRequest.newBuilder().setName(name).build();
    try {
      HelloReply response = blockingStub
              .sayHello(request);
...

1.3 RPC 調用流程

gRPC 的客戶端調用主要包括基於 Netty 的 HTTP/2 客戶端創建、客戶端負載均衡、請求消息的發送和響應接收處理四個流程。

1.3.1 客戶端調用總體流程

gRPC 的客戶端調用總體流程如下圖所示:

gRPC 的客戶端調用流程如下:

  1. 客戶端 Stub(GreeterBlockingStub) 調用 sayHello(request),發起 RPC 調用;
  2. 通過 DnsNameResolver 進行域名解析,獲取服務端的地址信息(列表),隨后使用默認的 LoadBalancer 策略,選擇一個具體的 gRPC 服務端實例;
  3. 如果與路由選中的服務端之間沒有可用的連接,則創建 NettyClientTransport 和 NettyClientHandler,發起 HTTP/2 連接;
  4. 對請求消息使用 PB(Protobuf)做序列化,通過 HTTP/2 Stream 發送給 gRPC 服務端;
  5. 接收到服務端響應之后,使用 PB(Protobuf)做反序列化;
  6. 回調 GrpcFuture 的 set(Response) 方法,喚醒阻塞的客戶端調用線程,獲取 RPC 響應。

需要指出的是,客戶端同步阻塞 RPC 調用阻塞的是調用方線程(通常是業務線程),底層 Transport 的 I/O 線程(Netty 的 NioEventLoop)仍然是非阻塞的。

1.3.2 ManagedChannel 創建流程

ManagedChannel 是對 Transport 層 SocketChannel 的抽象,Transport 層負責協議消息的序列化和反序列化,以及協議消息的發送和讀取。

ManagedChannel 將處理后的請求和響應傳遞給與之相關聯的 ClientCall 進行上層處理,同時,ManagedChannel 提供了對 Channel 的生命周期管理(鏈路創建、空閑、關閉等)。

ManagedChannel 提供了接口式的切面 ClientInterceptor,它可以攔截 RPC 客戶端調用,注入擴展點,以及功能定制,方便框架的使用者對 gRPC 進行功能擴展。

ManagedChannel 的主要實現類 ManagedChannelImpl 創建流程如下:

流程關鍵技術點解讀:

  1. 使用 builder 模式創建 ManagedChannelBuilder 實現類 NettyChannelBuilder,NettyChannelBuilder 提供了 buildTransportFactory 工廠方法創建 NettyTransportFactory,最終用於創建 NettyClientTransport;
  2. 初始化 HTTP/2 連接方式:采用 plaintext 協商模式還是默認的 TLS 模式,HTTP/2 的連接有兩種模式,h2(基於 TLS 之上構建的 HTTP/2)和 h2c(直接在 TCP 之上構建的 HTTP/2);
  3. 創建 NameResolver.Factory 工廠類,用於服務端 URI 的解析,gRPC 默認采用 DNS 域名解析方式。

ManagedChannel 實例構造完成之后,即可創建 ClientCall,發起 RPC 調用。

1.3.3 ClientCall 創建流程

完成 ManagedChannelImpl 創建之后,由 ManagedChannelImpl 發起創建一個新的 ClientCall 實例。ClientCall 的用途是業務應用層的消息調度和處理,它的典型用法如下:

 call = channel.newCall(unaryMethod, callOptions);
 call.start(listener, headers);
 call.sendMessage(message);
 call.halfClose();
 call.request(1);
 // wait for listener.onMessage()

ClientCall 實例的創建流程如下所示:

流程關鍵技術點解讀:

  1. ClientCallImpl 的主要構造參數是 MethodDescriptor 和 CallOptions,其中 MethodDescriptor 存放了需要調用 RPC 服務的接口名、方法名、服務調用的方式(例如 UNARY 類型)以及請求和響應的序列化和反序列化實現類。

    CallOptions 則存放了 RPC 調用的其它附加信息,例如超時時間、鑒權信息、消息長度限制和執行客戶端調用的線程池等。

  2. 設置壓縮和解壓縮的注冊類(CompressorRegistry 和 DecompressorRegistry),以便可以按照指定的壓縮算法對 HTTP/2 消息做壓縮和解壓縮。

ClientCallImpl 實例創建完成之后,就可以調用 ClientTransport,創建 HTTP/2 Client,向 gRPC 服務端發起遠程服務調用。

1.3.4 基於 Netty 的 HTTP/2 Client 創建流程

gRPC 客戶端底層基於 Netty4.1 的 HTTP/2 協議棧框架構建,以便可以使用 HTTP/2 協議來承載 RPC 消息,在滿足標准化規范的前提下,提升通信性能。

gRPC HTTP/2 協議棧(客戶端)的關鍵實現是 NettyClientTransport 和 NettyClientHandler,客戶端初始化流程如下所示:

流程關鍵技術點解讀:

  1. NettyClientHandler 的創建:級聯創建 Netty 的 Http2FrameReader、Http2FrameWriter 和 Http2Connection,用於構建基於 Netty 的 gRPC HTTP/2 客戶端協議棧。

  2. HTTP/2 Client 啟動:仍然基於 Netty 的 Bootstrap 來初始化並啟動客戶端,但是有兩個細節需要注意:

    • NettyClientHandler(實際被包裝成 ProtocolNegotiator.Handler,用於 HTTP/2 的握手協商)創建之后,不是由傳統的 ChannelInitializer 在初始化 Channel 時將 NettyClientHandler 加入到 pipeline 中,而是直接通過 Bootstrap 的 handler 方法直接加入到 pipeline 中,以便可以立即接收發送任務。

    • 客戶端使用的 work 線程組並非通常意義的 EventLoopGroup,而是一個 EventLoop:即 HTTP/2 客戶端使用的 work 線程並非一組線程(默認線程數為 CPU 內核 * 2),而是一個 EventLoop 線程。這個其實也很容易理解,一個 NioEventLoop 線程可以同時處理多個 HTTP/2 客戶端連接,它是多路復用的,對於單個 HTTP/2 客戶端,如果默認獨占一個 work 線程組,將造成極大的資源浪費,同時也可能會導致句柄溢出(並發啟動大量 HTTP/2 客戶端)。

  3. WriteQueue 創建:Netty 的 NioSocketChannel 初始化並向 Selector 注冊之后(發起 HTTP 連接之前),立即由 NettyClientHandler 創建 WriteQueue,用於接收並處理 gRPC 內部的各種 Command,例如鏈路關閉指令、發送 Frame 指令、發送 Ping 指令等。

HTTP/2 Client 創建完成之后,即可由客戶端根據協商策略發起 HTTP/2 連接。如果連接創建成功,后續即可復用該 HTTP/2 連接,進行 RPC 調用。

1.3.5 HTTP/2 連接創建流程

HTTP/2 在 TCP 連接之初通過協商的方式進行通信,只有協商成功,才能進行后續的業務層數據發送和接收。

HTTP/2 的版本標識分為兩類:

  1. 基於 TLS 之上構架的 HTTP/2, 即 HTTPS,使用 h2 表示(ALPN):0x68 與 0x32;
  2. 直接在 TCP 之上構建的 HTTP/2, 即 HTTP,使用 h2c 表示。

HTTP/2 連接創建,分為兩種:通過協商升級協議方式和直接連接方式。

假如不知道服務端是否支持 HTTP/2,可以先使用 HTTP/1.1 進行協商,客戶端發送協商請求消息(只含消息頭),報文示例如下:

GET / HTTP/1.1
Host: 127.0.0.1
Connection: Upgrade, HTTP2-Settings
Upgrade: h2c
HTTP2-Settings: <base64url encoding of HTTP/2 SETTINGS payload>

服務端接收到協商請求之后,如果不支持 HTTP/2,則直接按照 HTTP/1.1 響應返回,雙方通過 HTTP/1.1 進行通信,報文示例如下:

HTTP/1.1 200 OK
Content-Length: 28
Content-Type: text/css

body...

如果服務端支持 HTTP/2, 則協商成功,返回 101 結果碼,通知客戶端一起升級到 HTTP/2 進行通信,示例報文如下:

HTTP/1.1 101 Switching Protocols
Connection: Upgrade
Upgrade: h2c

[ HTTP/2 connection...

101 響應之后,服務需要發送 SETTINGS 幀作為連接序言,客戶端接收到 101 響應之后,也必須發送一個序言作為回應,示例如下:

PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n
SETTINGS 幀

客戶端序言發送完成之后,可以不需要等待服務端的 SETTINGS 幀,而直接發送業務請求 Frame。

假如客戶端和服務端已經約定使用 HTTP/2, 則可以免去 101 協商和切換流程,直接發起 HTTP/2 連接,具體流程如下所示:

幾個關鍵點:

  1. 如果已經明確知道服務端支持 HTTP/2,則可免去通過 HTTP/1.1 101 協議切換方式進行升級,TCP 連接建立之后即可發送序言,否則只能在接收到服務端 101 響應之后發送序言;
  2. 針對一個連接,服務端第一個要發送的幀必須是 SETTINGS 幀,連接序言所包含的 SETTINGS 幀可以為空;
  3. 客戶端可以在發送完序言之后發送應用幀數據,不用等待來自服務器端的序言 SETTINGS 幀。

gRPC 支持三種 Protocol Negotiator 策略:

  1. PlaintextNegotiator:明確服務端支持 HTTP/2,采用 HTTP 直接連接的方式與服務端建立 HTTP/2 連接,省去 101 協議切換過程;
  2. PlaintextUpgradeNegotiator:不清楚服務端是否支持 HTTP/2,采用 HTTP/1.1 協商模式切換升級到 HTTP/2;
  3. TlsNegotiator:在 TLS 之上構建 HTTP/2,協商采用 ALPN 擴展協議,以 "h2" 作為協議標識符。

下面我們以 PlaintextNegotiator 為例,了解下基於 Netty 的 HTTP/2 連接創建流程:

1.3.6 負載均衡策略

總體上看,RPC 的負載均衡策略有兩大類:

  1. 服務端負載均衡(例如代理模式、外部負載均衡服務)
  2. 客戶端負載均衡(內置負載均衡策略和算法,客戶端實現)

外部負載均衡模式如下所示:

以代理 LB 模式為例:RPC 客戶端向負載均衡代理發送請求,負載均衡代理按照指定的路由策略,將請求消息轉發到后端可用的服務實例上。負載均衡代理負責維護后端可用的服務列表,如果發現某個服務不可用,則將其剔除出路由表。

代理 LB 模式的優點是客戶端不需要實現負載均衡策略算法,也不需要維護后端的服務列表信息,不直接跟后端的服務進行通信,在做網絡安全邊界隔離時,非常實用。例如通過 Nginx 做 L7 層負載均衡,將互聯網前端的流量安全的接入到后端服務中。

代理 LB 模式通常支持 L4(Transport)和 L7(Application) 層負載均衡,兩者各有優缺點,可以根據 RPC 的協議特點靈活選擇。L4/L7 層負載均衡對應場景如下:

  • L4 層:對時延要求苛刻、資源損耗少、RPC 本身采用私有 TCP 協議;
  • L7 層:有會話狀態的連接、HTTP 協議簇(例如 Restful)。

客戶端負載均衡策略由客戶端內置負載均衡能力,通過靜態配置、域名解析服務(例如 DNS 服務)、訂閱發布(例如 Zookeeper 服務注冊中心)等方式獲取 RPC 服務端地址列表,並將地址列表緩存到客戶端內存中。

每次 RPC 調用時,根據客戶端配置的負載均衡策略由負載均衡算法從緩存的服務地址列表中選擇一個服務實例,發起 RPC 調用。

客戶端負載均衡策略工作原理示例如下:

gRPC 默認采用客戶端負載均衡策略,同時提供了擴展機制,使用者通過自定義實現 NameResolver 和 LoadBalancer,即可覆蓋 gRPC 默認的負載均衡策略,實現自定義路由策略的擴展。

gRPC 提供的負載均衡策略實現類如下所示:

  • PickFirstBalancer:無負載均衡能力,即使有多個服務端地址可用,也只選擇第一個地址;
  • RoundRobinLoadBalancer:"RoundRobin" 負載均衡策略。

gRPC 負載均衡流程如下所示:

流程關鍵技術點解讀:

  1. 負載均衡功能模塊的輸入是客戶端指定的 hostName、需要調用的接口名和方法名等參數,輸出是執行負載均衡算法后獲得的 NettyClientTransport,通過 NettyClientTransport 可以創建基於 Netty HTTP/2 的 gRPC 客戶端,發起 RPC 調用;
  2. gRPC 系統默認提供的是 DnsNameResolver,它通過 InetAddress.getAllByName(host) 獲取指定 host 的 IP 地址列表(本地 DNS 服務),對於擴展者而言,可以繼承 NameResolver 實現自定義的地址解析服務,例如使用 Zookeeper 替換 DnsNameResolver,把 Zookeeper 作為動態的服務地址配置中心,它的偽代碼示例如下:

    第一步:繼承 NameResolver,實現 start(Listener listener) 方法:

    void start(Listener listener)
    {
    // 獲取 ZooKeeper 地址,並連接
    // 創建 Watcher,並實現 process(WatchedEvent event),監聽地址變更
    // 根據接口名和方法名,調用 getChildren 方法,獲取發布該服務的地址列表
    // 將地址列表加到 List 中
    // 調用 NameResolver.Listener.onAddresses(), 通知地址解析完成

    第二步:創建 ManagedChannelBuilder 時,指定 Target 的地址為 Zookeeper 服務端地址,同時設置 nameResolver 為 Zookeeper NameResolver, 示例代碼如下所示:

    this(ManagedChannelBuilder.forTarget(zookeeperAddr)
         .loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance())
         .nameResolverFactory(new ZookeeperNameResolverProvider())
         .usePlaintext(false));
  3. LoadBalancer 負責從 nameResolver 中解析獲得的服務端 URL 中按照指定路由策略,選擇一個目標服務端地址,並創建 ClientTransport。同樣,可以通過覆蓋 handleResolvedAddressGroups 實現自定義負載均衡策略。

通過 LoadBalancer + NameResolver,可以實現靈活的負載均衡策略擴展。例如基於 Zookeeper、etcd 的分布式配置服務中心方案。

1.3.7 RPC 請求消息發送流程

gRPC 默認基於 Netty HTTP/2 + PB 進行 RPC 調用,請求消息發送流程如下所示:

流程關鍵技術點解讀:

  1. ClientCallImpl 的 sendMessage 調用,主要完成了請求對象的序列化(基於 PB)、HTTP/2 Frame 的初始化;
  2. ClientCallImpl 的 halfClose 調用將客戶端准備就緒的請求 Frame 封裝成自定義的 SendGrpcFrameCommand,寫入到 WriteQueue 中;
  3. WriteQueue 執行 flush() 將 SendGrpcFrameCommand 寫入到 Netty 的 Channel 中,調用 Channel 的 write 方法,被 NettyClientHandler 攔截到,由 NettyClientHandler 負責具體的發送操作;
  4. NettyClientHandler 調用 Http2ConnectionEncoder 的 writeData 方法,將 Frame 寫入到 HTTP/2 Stream 中,完成請求消息的發送。

1.3.8 RPC 響應接收和處理流程

gRPC 客戶端響應消息的接收入口是 NettyClientHandler,它的處理流程如下所示:

流程關鍵技術點解讀:

  1. NettyClientHandler 的 onHeadersRead(int streamId, Http2Headers headers, boolean endStream) 方法會被調用兩次,根據 endStream 判斷是否是 Stream 結尾;
  2. 請求和響應的關聯:根據 streamId 可以關聯同一個 HTTP/2 Stream,將 NettyClientStream 緩存到 Stream 中,客戶端就可以在接收到響應消息頭或消息體時還原出 NettyClientStream,進行后續處理;
  3. RPC 客戶端調用線程的阻塞和喚醒使用到了 GrpcFuture 的 wait 和 notify 機制,來實現客戶端調用線程的同步阻塞和喚醒;
  4. 客戶端和服務端的 HTTP/2 Header 和 Data Frame 解析共用同一個方法,即 MessageDeframer 的 deliver()。

客戶端源碼分析

gRPC 客戶端調用原理並不復雜,但是代碼卻相對比較繁雜。下面圍繞關鍵的類庫,對主要功能點進行源碼分析。

NettyClientTransport 功能和源碼分析

NettyClientTransport 的主要功能如下:

  • 通過 start(Listener transportListener) 創建 HTTP/2 Client,並連接 gRPC 服務端;
  • 通過 newStream(MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) 創建 ClientStream;
  • 通過 shutdown() 關閉底層的 HTTP/2 連接。

以啟動 HTTP/2 客戶端為例進行講解(NettyClientTransport 類):

EventLoop eventLoop = group.next();
    if (keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED) {
      keepAliveManager = new KeepAliveManager(
          new ClientKeepAlivePinger(this), eventLoop, keepAliveTimeNanos, keepAliveTimeoutNanos,
          keepAliveWithoutCalls);
    }
    handler = NettyClientHandler.newHandler(lifecycleManager, keepAliveManager, flowControlWindow,
        maxHeaderListSize, Ticker.systemTicker(), tooManyPingsRunnable);
    HandlerSettings.setAutoWindow(handler);
    negotiationHandler = negotiator.newHandler(handler);

根據啟動時配置的 HTTP/2 協商策略,以 NettyClientHandler 為參數創建 ProtocolNegotiator.Handler。

創建 Bootstrap,並設置 EventLoopGroup,需要指出的是,此處並沒有使用 EventLoopGroup,而是它的一種實現類 EventLoop,原因在前文中已經說明,相關代碼示例如下(NettyClientTransport 類):

Bootstrap b = new Bootstrap();
    b.group(eventLoop);
    b.channel(channelType);
    if (NioSocketChannel.class.isAssignableFrom(channelType)) {
      b.option(SO_KEEPALIVE, true);
    }

創建 WriteQueue 並設置到 NettyClientHandler 中,用於接收內部的各種 QueuedCommand,初始化完成之后,發起 HTTP/2 連接,代碼如下(NettyClientTransport 類):

handler.startWriteQueue(channel);
    channel.connect(address).addListener(new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture future) throws Exception {
        if (!future.isSuccess()) {
          ChannelHandlerContext ctx = future.channel().pipeline().context(handler);
          if (ctx != null) {
            ctx.fireExceptionCaught(future.cause());
          }
          future.channel().pipeline().fireExceptionCaught(future.cause());
        }

2.2 NettyClientHandler 功能和源碼分析

NettyClientHandler 繼承自 Netty 的 Http2ConnectionHandler,是 gRPC 接收和發送 HTTP/2 消息的關鍵實現類,也是 gRPC 和 Netty 的交互橋梁,它的主要功能如下所示:

  • 發送各種協議消息給 gRPC 服務端;
  • 接收 gRPC 服務端返回的應答消息頭、消息體和其它協議消息;
  • 處理 HTTP/2 協議相關的指令,例如 StreamError、ConnectionError 等。

協議消息的發送:無論是業務請求消息,還是協議指令消息,都統一封裝成 QueuedCommand,由 NettyClientHandler 攔截並處理,相關代碼如下所示(NettyClientHandler 類):

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
          throws Exception {
    if (msg instanceof CreateStreamCommand) {
      createStream((CreateStreamCommand) msg, promise);
    } else if (msg instanceof SendGrpcFrameCommand) {
      sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
    } else if (msg instanceof CancelClientStreamCommand) {
      cancelStream(ctx, (CancelClientStreamCommand) msg, promise);
    } else if (msg instanceof SendPingCommand) {
      sendPingFrame(ctx, (SendPingCommand) msg, promise);
    } else if (msg instanceof GracefulCloseCommand) {
      gracefulClose(ctx, (GracefulCloseCommand) msg, promise);
    } else if (msg instanceof ForcefulCloseCommand) {
      forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
    } else if (msg == NOOP_MESSAGE) {
      ctx.write(Unpooled.EMPTY_BUFFER, promise);
    } else {
      throw new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
    }

協議消息的接收:NettyClientHandler 通過向 Http2ConnectionDecoder 注冊 FrameListener 來監聽 RPC 響應消息和協議指令消息,相關接口如下:

FrameListener 回調 NettyClientHandler 的相關方法,實現協議消息的接收和處理:

需要指出的是,NettyClientHandler 並沒有實現所有的回調接口,對於需要特殊處理的幾個方法進行了重載,例如 onDataRead 和 onHeadersRead。

2.3 ProtocolNegotiator 功能和源碼分析

ProtocolNegotiator 用於 HTTP/2 連接創建的協商,gRPC 支持三種策略並有三個實現子類:

gRPC 的 ProtocolNegotiator 實現類完全遵循 HTTP/2 相關規范,以 PlaintextUpgradeNegotiator 為例,通過設置 Http2ClientUpgradeCodec,用於 101 協商和協議升級,相關代碼如下所示(PlaintextUpgradeNegotiator 類):

public Handler newHandler(GrpcHttp2ConnectionHandler handler) {
      Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(handler);
      HttpClientCodec httpClientCodec = new HttpClientCodec();
      final HttpClientUpgradeHandler upgrader =
          new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, 1000);
      return new BufferingHttp2UpgradeHandler(upgrader);
    }

2.4 LoadBalancer 功能和源碼分析

LoadBalancer 負責客戶端負載均衡,它是個抽象類,gRPC 框架的使用者可以通過繼承的方式進行擴展。

gRPC 當前已經支持 PickFirstBalancer 和 RoundRobinLoadBalancer 兩種負載均衡策略,未來不排除會提供更多的策略。

以 RoundRobinLoadBalancer 為例,它的工作原理如下:根據 PickSubchannelArgs 來選擇一個 Subchannel(RoundRobinLoadBalancerFactory 類):

public PickResult pickSubchannel(PickSubchannelArgs args) {
      if (size > 0) {
        return PickResult.withSubchannel(nextSubchannel());
      }
      if (status != null) {
        return PickResult.withError(status);
      }
      return PickResult.withNoResult();
    }

再看下 Subchannel 的選擇算法(Picker 類):

private Subchannel nextSubchannel() {
      if (size == 0) {
        throw new NoSuchElementException();
      }
      synchronized (this) {
        Subchannel val = list.get(index);
        index++;
        if (index >= size) {
          index = 0;
        }
        return val;
      }
    }

即通過順序的方式從服務端列表中獲取一個 Subchannel。 如果用戶需要定制負載均衡策略,則可以在 RPC 調用時,使用如下代碼(HelloWorldClient 類):

this(ManagedChannelBuilder.forAddress(host, port).loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance())
.nameResolverFactory(new ZkNameResolverProvider()) .usePlaintext(true));

2.5 ClientCalls 功能和源碼分析

ClientCalls 提供了各種 RPC 調用方式,包括同步、異步、Streaming 和 Unary 方式等,相關方法如下所示:

下面一起看下 RPC 請求消息的發送和應答接收相關代碼。

2.5.1 RPC 請求調用源碼分析

請求調用主要有兩步:請求 Frame 構造和 Frame 發送,請求 Frame 構造代碼如下所示(ClientCallImpl 類):

public void sendMessage(ReqT message) {
    Preconditions.checkState(stream != null, "Not started");
    Preconditions.checkState(!cancelCalled, "call was cancelled");
    Preconditions.checkState(!halfCloseCalled, "call was half-closed");
    try {
      InputStream messageIs = method.streamRequest(message);
      stream.writeMessage(messageIs);
...

使用 PB 對請求消息做序列化,生成 InputStream,構造請求 Frame:

private int writeUncompressed(InputStream message, int messageLength) throws IOException {
    if (messageLength != -1) {
      statsTraceCtx.outboundWireSize(messageLength);
      return writeKnownLengthUncompressed(message, messageLength);
    }
    BufferChainOutputStream bufferChain = new BufferChainOutputStream();
    int written = writeToOutputStream(message, bufferChain);
    if (maxOutboundMessageSize >= 0 && written > maxOutboundMessageSize) {
      throw Status.INTERNAL
          .withDescription(
              String.format("message too large %d > %d", written , maxOutboundMessageSize))
          .asRuntimeException();
    }
    writeBufferChain(bufferChain, false);
    return written;
}

Frame 發送代碼如下所示:

public void writeFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {
      ByteBuf bytebuf = frame == null ? EMPTY_BUFFER : ((NettyWritableBuffer) frame).bytebuf();
      final int numBytes = bytebuf.readableBytes();
      if (numBytes > 0) {
        onSendingBytes(numBytes);
        writeQueue.enqueue(
            new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream),
            channel.newPromise().addListener(new ChannelFutureListener() {
              @Override
              public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                  transportState().onSentBytes(numBytes);
                }
              }
            }), flush);

NettyClientHandler 接收到發送事件之后,調用 Http2ConnectionEncoder 將 Frame 寫入 Netty HTTP/2 協議棧(NettyClientHandler 類):

private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
      ChannelPromise promise) {
    encoder().writeData(ctx, cmd.streamId(), cmd.content(), 0, cmd.endStream(), promise);
  }

2.5.2 RPC 響應接收和處理源碼分析

響應消息的接收入口是 NettyClientHandler,包括 HTTP/2 Header 和 HTTP/2 DATA Frame 兩部分,代碼如下(NettyClientHandler 類):

private void onHeadersRead(int streamId, Http2Headers headers, boolean endStream) {
    NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
    stream.transportHeadersReceived(headers, endStream);
    if (keepAliveManager != null) {
      keepAliveManager.onDataReceived();
    }
  }

如果參數 endStream 為 True,說明 Stream 已經結束,調用 transportTrailersReceived,通知 Listener close,代碼如下所示(AbstractClientStream2 類):

if (stopDelivery || isDeframerStalled()) {
        deliveryStalledTask = null;
        closeListener(status, trailers);
      } else {
        deliveryStalledTask = new Runnable() {
          @Override
          public void run() {
            closeListener(status, trailers);
          }
        };
      }

讀取到 HTTP/2 DATA Frame 之后,調用 MessageDeframer 的 deliver 對 Frame 進行解析,代碼如下(MessageDeframer 類):

private void deliver() {
    if (inDelivery) {
      return;
    }
    inDelivery = true;
    try {
          while (pendingDeliveries > 0 && readRequiredBytes()) {
        switch (state) {
          case HEADER:
            processHeader();
            break;
          case BODY:
            processBody();
...

將 Frame 轉換成 InputStream 之后,通知 ClientStreamListenerImpl,調用 messageRead(final InputStream message),將 InputStream 反序列化為響應對象,相關代碼如下所示(ClientStreamListenerImpl 類):

public void messageRead(final InputStream message) {
      class MessageRead extends ContextRunnable {
        MessageRead() {
          super(context);
        }
        @Override
        public final void runInContext() {
          try {
            if (closed) {
              return;
            }
            try {
              observer.onMessage(method.parseResponse(message));
            } finally {
              message.close();
            }

當接收到 endOfStream 之后,通知 ClientStreamListenerImpl,調用它的 close 方法,如下所示(ClientStreamListenerImpl 類):

private void close(Status status, Metadata trailers) {
      closed = true;
      cancelListenersShouldBeRemoved = true;
      try {
        closeObserver(observer, status, trailers);
      } finally {
        removeContextListenerAndCancelDeadlineFuture();
      }
    }

最終調用 UnaryStreamToFuture 的 onClose 方法,set 響應對象,喚醒阻塞的調用方線程,完成 RPC 調用,代碼如下(UnaryStreamToFuture 類):

public void onClose(Status status, Metadata trailers) {
      if (status.isOk()) {
        if (value == null) {
          responseFuture.setException(
              Status.INTERNAL.withDescription("No value received for unary call")
                  .asRuntimeException(trailers));
        }
        responseFuture.set(value);
      } else {
        responseFuture.setException(status.asRuntimeException(trailers));
      }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM