gRPC源碼分析2-Server的建立


gRPC中,Server、Client共享的Class不是很多,所以我們可以單獨的分別講解Server和Client的源碼。

 

通過第一篇,我們知道對於gRPC來說,建立Server是非常簡單的,還記得怎么寫的?還是以example里 HelloWorldServer 例子來看

server = ServerBuilder.forPort(port)
.addService(new GreeterImpl())
.build()
.start();

 

你沒有看錯,就是這么幾行搞定。

 

如果需要看懂gRPC的源碼,首先有幾點需要明白

  • Builder模式生成Entity

  • Provider(SPI)模式解耦,動態選擇服務提供方

  • abstract class用於擴展

 

0. 流程圖

1. Builder

ServerBuilder是一個抽象類,不同的服務提供方(Provider),將繼承實現它。如何找到這些繼承者呢?ServerProvider就是用來找到不同的provider的。

 

2. Provider

 

如上圖,ServerProvider也是一個抽象類,實現者都有哪些呢?我們通過SPI模式找到他們。

 

通過搜索文件知道gRPC中 io.grpc.ServerProvider 的實現方只有:Netty

 

io.grpc.netty.NettyServerProvider,
這個類就是ServerProvider的實現者,它的builderForPort返回ServerBuilder

3. NettyServer

最后,我們來看下當鏈接建立時是如何創建handle的。

public void initChannel(Channel ch) throws Exception {
  eventLoopReferenceCounter.retain();
  ch.closeFuture().addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) {
      eventLoopReferenceCounter.release();
    }
  });
  NettyServerTransport transport = new NettyServerTransport(ch, protocolNegotiator,
      maxStreamsPerConnection, flowControlWindow, maxMessageSize, maxHeaderListSize);
  ServerTransportListener transportListener;
  // This is to order callbacks on the listener, not to guard access to channel.
  synchronized (NettyServer.this) {
    if (channel != null && !channel.isOpen()) {
      // Server already shutdown.
      ch.close();
      return;
    }

    transportListener = listener.transportCreated(transport);
  }
  transport.start(transportListener);
}

看code可知,當一個鏈接建立時,會生成一個NettyServerTransport,所有的數據處理都將在這里實現。

 

4. NettyServerTransport

public void start(ServerTransportListener listener) {
  Preconditions.checkState(this.listener == null, "Handler already registered");
  this.listener = listener;

  // Create the Netty handler for the pipeline.
  final NettyServerHandler grpcHandler = createHandler(listener);
  HandlerSettings.setAutoWindow(grpcHandler);

  // Notify when the channel closes.
  channel.closeFuture().addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
      notifyTerminated(grpcHandler.connectionError());
    }
  });

  ChannelHandler negotiationHandler = protocolNegotiator.newHandler(grpcHandler);
  channel.pipeline().addLast(negotiationHandler);
}

我們看到當調用start方法是,最重要的就是createHandle,在這個方法里將看到如何綁定HTTP/2的處理器的。

 

5. NettyServerHandle

static NettyServerHandler newHandler(ServerTransportListener transportListener,
                                     int maxStreams,
                                     int flowControlWindow,
                                     int maxHeaderListSize,
                                     int maxMessageSize) {
  Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
  // 就是一個log
  Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class);
  Http2HeadersDecoder headersDecoder = new GrpcHttp2ServerHeadersDecoder(maxHeaderListSize);
  // reader
  Http2FrameReader frameReader = new Http2InboundFrameLogger(
      new DefaultHttp2FrameReader(headersDecoder), frameLogger);
  // writer
  Http2FrameWriter frameWriter =
      new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger);
  return newHandler(frameReader, frameWriter, transportListener, maxStreams, flowControlWindow,
      maxMessageSize);
}

@VisibleForTesting
static NettyServerHandler newHandler(Http2FrameReader frameReader, Http2FrameWriter frameWriter,
                                     ServerTransportListener transportListener,
                                     int maxStreams,
                                     int flowControlWindow,
                                     int maxMessageSize) {
  Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive");
  Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive");
  Preconditions.checkArgument(maxMessageSize > 0, "maxMessageSize must be positive");
// 一個channel一個connection
  Http2Connection connection = new DefaultHttp2Connection(true);

  // Create the local flow controller configured to auto-refill the connection window.
  connection.local().flowController(
      new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));


  Http2ConnectionEncoder encoder = new DefaultHttp2ConnectionEncoder(connection, frameWriter);
  Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
      frameReader);

  Http2Settings settings = new Http2Settings();
  settings.initialWindowSize(flowControlWindow);
  settings.maxConcurrentStreams(maxStreams);

  return new NettyServerHandler(transportListener, decoder, encoder, settings, maxMessageSize);
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM