Spark2.1.0——內置RPC框架詳解


Spark2.1.0——內置RPC框架詳解

         在Spark中很多地方都涉及網絡通信,比如Spark各個組件間的消息互通、用戶文件與Jar包的上傳、節點間的Shuffle過程、Block數據的復制與備份等。在Spark 0.x.x與Spark 1.x.x版本中,組件間的消息通信主要借助於Akka[1],使用Akka可以輕松的構建強有力的高並發與分布式應用。但是Akka在Spark 2.0.0版本中被移除了,Spark官網文檔對此的描述為:“Akka的依賴被移除了,因此用戶可以使用任何版本的Akka來編程了。”Spark團隊的決策者或許認為對於Akka具體版本的依賴,限制了用戶對於Akka不同版本的使用。盡管如此,筆者依然認為Akka是一款非常優秀的開源分布式系統,我參與的一些Java Application或者Java Web就利用Akka的豐富特性實現了分布式一致性、最終一致性以及分布式事務等分布式環境面對的問題。在Spark 1.x.x版本中,用戶文件與Jar包的上傳采用了由Jetty[2]實現的HttpFileServer,但在Spark 2.0.0版本中也被廢棄了,現在使用的是基於Spark內置RPC框架的NettyStreamManager。節點間的Shuffle過程和Block數據的復制與備份這兩個部分在Spark 2.0.0版本中依然沿用了Netty[3],通過對接口和程序進行重新設計將各個組件間的消息互通、用戶文件與Jar包的上傳等內容統一納入到Spark的RPC框架體系中。

         我們先來看看RPC框架的基本架構,如圖1所示。

圖1       Spark內置RPC框架的基本架構

TransportContext內部包含傳輸上下文的配置信息TransportConf和對客戶端請求消息進行處理的RpcHandler。TransportConf在創建TransportClientFactory和TransportServer時都是必須的,而RpcHandler只用於創建TransportServer。TransportClientFactory是RPC客戶端的工廠類。TransportServer是RPC服務端的實現。圖中記號的含義如下:

記號①:表示通過調用TransportContext的createClientFactory方法創建傳輸客戶端工廠TransportClientFactory的實例。在構造TransportClientFactory的實例時,還會傳遞客戶端引導程序TransportClientBootstrap的列表。此外,TransportClientFactory內部還存在針對每個Socket地址的連接池ClientPool,這個連接池緩存的定義如下:

  private final ConcurrentHashMap<SocketAddress, ClientPool> connectionPool;

ClientPool的類型定義如下:

  private static class ClientPool {
    TransportClient[] clients;
    Object[] locks;

    ClientPool(int size) {
      clients = new TransportClient[size];
      locks = new Object[size];
      for (int i = 0; i < size; i++) {
        locks[i] = new Object();
      }
    }
  } 

由此可見,ClientPool實際是由TransportClient的數組構成,而locks數組中的Object與clients數組中的TransportClient按照數組索引一一對應,通過對每個TransportClient分別采用不同的鎖,降低並發情況下線程間對鎖的爭用,進而減少阻塞,提高並發度。

記號②:表示通過調用TransportContext的createServer方法創建傳輸服務端TransportServer的實例。在構造TransportServer的實例時,需要傳遞TransportContext、host、port、RpcHandler以及服務端引導程序TransportServerBootstrap的列表。

         有了對Spark內置RPC框架的基本架構的了解,現在正式介紹Spark的RPC框架所包含的各個組件:

  • TransportContext:傳輸上下文,包含了用於創建傳輸服務端(TransportServer)和傳輸客戶端工廠(TransportClientFactory)的上下文信息,並支持使用TransportChannelHandler設置Netty提供的SocketChannel的Pipeline的實現。
  • TransportConf:傳輸上下文的配置信息。
  • RpcHandler:對調用傳輸客戶端(TransportClient)的sendRPC方法發送的消息進行處理的程序。
  • MessageEncoder:在將消息放入管道前,先對消息內容進行編碼,防止管道另一端讀取時丟包和解析錯誤。
  • MessageDecoder:對從管道中讀取的ByteBuf進行解析,防止丟包和解析錯誤;
  • TransportFrameDecoder:對從管道中讀取的ByteBuf按照數據幀進行解析;
  • RpcResponseCallback:RpcHandler對請求的消息處理完畢后,進行回調的接口。
  • TransportClientFactory:創建傳輸客戶端(TransportClient)的傳輸客戶端工廠類。
  • ClientPool:在兩個對等節點間維護的關於傳輸客戶端(TransportClient)的池子。ClientPool是TransportClientFactory的內部組件。
  • TransportClient:RPC框架的客戶端,用於獲取預先協商好的流中的連續塊。TransportClient旨在允許有效傳輸大量數據,這些數據將被拆分成幾百KB到幾MB的塊。當TransportClient處理從流中獲取的獲取的塊時,實際的設置是在傳輸層之外完成的。sendRPC方法能夠在客戶端和服務端的同一水平線的通信進行這些設置。
  • TransportClientBootstrap:當服務端響應客戶端連接時在客戶端執行一次的引導程序。
  • TransportRequestHandler:用於處理客戶端的請求並在寫完塊數據后返回的處理程序。
  • TransportResponseHandler:用於處理服務端的響應,並且對發出請求的客戶端進行響應的處理程序。
  • TransportChannelHandler:代理由TransportRequestHandler處理的請求和由TransportResponseHandler處理的響應,並加入傳輸層的處理。
  • TransportServerBootstrap:當客戶端連接到服務端時在服務端執行一次的引導程序。
  • TransportServer:RPC框架的服務端,提供高效的、低級別的流服務。

拓展知識:為什么需要MessageEncoder和MessageDecoder?因為在基於流的傳輸里(比如TCP/IP),接收到的數據首先會被存儲到一個socket接收緩沖里。不幸的是,基於流的傳輸並不是一個數據包隊列,而是一個字節隊列。即使你發送了2個獨立的數據包,操作系統也不會作為2個消息處理而僅僅認為是一連串的字節。因此不能保證遠程寫入的數據會被准確地讀取。舉個例子,讓我們假設操作系統的TCP/TP協議棧已經接收了3個數據包:ABC、DEF、GHI。由於基於流傳輸的協議的這種統一的性質,在你的應用程序在讀取數據的時候有很高的可能性被分成下面的片段:AB、CDEFG、H、I。因此,接收方不管是客戶端還是服務端,都應該把接收到的數據整理成一個或者多個更有意義並且讓程序的邏輯更好理解的數據。


[1]  Akka是基於Actor並發編程模型實現的並發的分布式的框架。Akka是用Scala語言編寫的,它提供了Java和Scala兩種語言的API,減少開發人員對並發的細節處理,並保證分布式調用的最終一致性。在附錄B中有關於Akka的進一步介紹,感興趣的讀者不妨一讀。

[2]  Jetty 是一個開源的Servlet容器,它為基於Java的Web容器,例如JSP和Servlet提供運行環境。Jetty是使用Java語言編寫的,它的API以一組JAR包的形式發布。開發人員可以將Jetty容器實例化成一個對象,可以迅速為一些獨立運行的Java應用提供網絡和Web連接。在附錄C中有對Jetty的簡單介紹,感興趣的讀者可以選擇閱讀。

[3]  Netty是由Jboss提供的一個基於NIO的客戶、服務器端編程框架,使用Netty 可以確保你快速、簡單的開發出一個網絡應用,例如實現了某種協議的客戶,服務端應用。附錄G中有對Netty的簡單介紹,感興趣的讀者可以一讀。


 

一、RPC配置TransportConf

         上文提到TransportContext中的TransportConf給Spark的RPC框架提供配置信息,它有兩個成員屬性——配置提供者conf和配置的模塊名稱module。這兩個屬性的定義如下:

  private final ConfigProvider conf;
  private final String module;

其中conf是真正的配置提供者,其類型ConfigProvider是一個抽象類,見代碼清單1。

代碼清單1  ConfigProvider的實現

public abstract class ConfigProvider {
  public abstract String get(String name);

  public String get(String name, String defaultValue) {
    try {
      return get(name);
    } catch (NoSuchElementException e) {
      return defaultValue;
    }
  }

  public int getInt(String name, int defaultValue) {
    return Integer.parseInt(get(name, Integer.toString(defaultValue)));
  }

  public long getLong(String name, long defaultValue) {
    return Long.parseLong(get(name, Long.toString(defaultValue)));
  }

  public double getDouble(String name, double defaultValue) {
    return Double.parseDouble(get(name, Double.toString(defaultValue)));
  }

  public boolean getBoolean(String name, boolean defaultValue) {
    return Boolean.parseBoolean(get(name, Boolean.toString(defaultValue)));
  }
}

從代碼清單1,可以看到ConfigProvider中包括get、getInt、getLong、getDouble、getBoolean等方法,這些方法都是基於抽象方法get獲取值,經過一次類型轉換而實現。這個抽象的get方法將需要子類去實現。

         Spark通常使用SparkTransportConf創建TransportConf,其實現見代碼清單2。

代碼清單2  SparkTransportConf的實現

object SparkTransportConf {
  private val MAX_DEFAULT_NETTY_THREADS = 8
  def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 0): TransportConf = {
    val conf = _conf.clone
    val numThreads = defaultNumThreads(numUsableCores)
    conf.setIfMissing(s"spark.$module.io.serverThreads", numThreads.toString)
    conf.setIfMissing(s"spark.$module.io.clientThreads", numThreads.toString)

    new TransportConf(module, new ConfigProvider {
      override def get(name: String): String = conf.get(name)
    })
  }
  private def defaultNumThreads(numUsableCores: Int): Int = {
    val availableCores =
      if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
    math.min(availableCores, MAX_DEFAULT_NETTY_THREADS)
  }
}

從代碼清單2看到,可以使用SparkTransportConf的fromSparkConf方法來構造TransportConf。傳遞的三個參數分別為SparkConf、模塊名module及可用的內核數numUsableCores。如果numUsableCores小於等於0,那么線程數是系統可用處理器的數量,不過系統的內核數不可能全部用於網絡傳輸使用,所以這里還將分配給網絡傳輸的內核數量最多限制在8個。最終確定的線程數將被用於設置客戶端傳輸線程數(spark.$module.io.clientThreads屬性)和服務端傳輸線程數(spark.$module.io.serverThreads屬性)。fromSparkConf最終構造TransportConf對象時傳遞的ConfigProvider為實現了get方法的匿名的內部類,get的實現實際是代理了SparkConf的get方法。

 

二、RPC客戶端工廠TransportClientFactory

         TransportClientFactory是創建傳輸客戶端(TransportClient)的工廠類。在說明圖3-1中的記號①時提到過TransportContext的createClientFactory方法可以創建TransportClientFactory的實例,其實現見代碼清單3。

代碼清單3  創建客戶端工廠

  public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) {
    return new TransportClientFactory(this, bootstraps);
  }

  public TransportClientFactory createClientFactory() {
    return createClientFactory(Lists.<TransportClientBootstrap>newArrayList());
  }

可以看到TransportContext中有兩個重載的createClientFactory方法,它們最終在構造TransportClientFactory時都會傳遞兩個參數:TransportContext和TransportClientBootstrap列表。TransportClientFactory構造器的實現見代碼清單4。

代碼清單4  TransportClientFactory的構造器

  public TransportClientFactory(
      TransportContext context,
      List<TransportClientBootstrap> clientBootstraps) {
    this.context = Preconditions.checkNotNull(context);
    this.conf = context.getConf();
    this.clientBootstraps = Lists.newArrayList(Preconditions.checkNotNull(clientBootstraps));
    this.connectionPool = new ConcurrentHashMap<>();
    this.numConnectionsPerPeer = conf.numConnectionsPerPeer();
    this.rand = new Random();

    IOMode ioMode = IOMode.valueOf(conf.ioMode());
    this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
    this.workerGroup = NettyUtils.createEventLoop(
        ioMode,
        conf.clientThreads(),
        conf.getModuleName() + "-client");
    this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
      conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());
  } 

TransportClientFactory構造器中的各個變量分別為:

  • context:即參數傳遞的TransportContext的引用;
  • conf:即TransportConf,這里通過調用TransportContext的getConf獲取;
  • clientBootstraps:即參數傳遞的TransportClientBootstrap列表;
  • connectionPool:即針對每個Socket地址的連接池ClientPool的緩存;connectionPool的數據結構較為復雜,為便於讀者理解,這里以圖2來表示connectionPool的數據結構。

圖2       TransportClientFactory的connectionPool

  • numConnectionsPerPeer:即從TransportConf獲取的key為”spark.+模塊名+.io.numConnectionsPerPeer”的屬性值。此屬性值用於指定對等節點間的連接數。這里的模塊名實際為TransportConf的module字段,Spark的很多組件都利用RPC框架構建,它們之間按照模塊名區分,例如RPC模塊的key為“spark.rpc.io.numConnectionsPerPeer”;
  • rand:對Socket地址對應的連接池ClientPool中緩存的TransportClient進行隨機選擇,對每個連接做負載均衡;
  • ioMode:IO模式,即從TransportConf獲取key為”spark.+模塊名+.io.mode”的屬性值。默認值為NIO,Spark還支持EPOLL;
  • socketChannelClass:客戶端Channel被創建時使用的類,通過ioMode來匹配,默認為NioSocketChannel,Spark還支持EpollEventLoopGroup;
  • workerGroup:根據Netty的規范,客戶端只有worker組,所以此處創建workerGroup。workerGroup的實際類型是NioEventLoopGroup;
  • pooledAllocator :匯集ByteBuf但對本地線程緩存禁用的分配器。

TransportClientFactory里大量使用了NettyUtils,關於NettyUtils的具體實現,請看附錄G。[1]


提示:NIO是指Java中New IO的簡稱,其特點包括:為所有的原始類型提供(Buffer)緩沖支持;字符集編碼解碼解決方案;提供一個新的原始I/O 抽象Channel,支持鎖和內存映射文件的文件訪問接口;提供多路非阻塞式(non-bloking)的高伸縮性網絡I/O 。其具體使用屬於Java語言的范疇,本文不過多介紹。


[1] Spark將對Netty框架的使用細節都封裝在NettyUtils工具類中,由於Netty的API使用不屬於本書主要闡述的內容,故此放入附錄G中,對Netty的使用感興趣的讀者可以選擇閱讀。


 2.1、客戶端引導程序TransportClientBootstrap

         TransportClientFactory的clientBootstraps屬性是TransportClientBootstrap的列表。TransportClientBootstrap是在TransportClient上執行的客戶端引導程序,主要對連接建立時進行一些初始化的准備(例如驗證、加密)。TransportClientBootstrap所作的操作往往是昂貴的,好在建立的連接可以重用。TransportClientBootstrap的接口定義見代碼清單5。

代碼清單5         TransportClientBootstrap的定義

public interface TransportClientBootstrap {
  void doBootstrap(TransportClient client, Channel channel) throws RuntimeException;
}

TransportClientBootstrap有兩個實現類:EncryptionDisablerBootstrap和SaslClientBootstrap。為了對TransportClientBootstrap的作用能有更深的了解,這里以EncryptionDisablerBootstrap為例,EncryptionDisablerBootstrap的實現見代碼清單6。

代碼清單6         EncryptionDisablerBootstrap的實現

  private static class EncryptionDisablerBootstrap implements TransportClientBootstrap {
    @Override
    public void doBootstrap(TransportClient client, Channel channel) {
      channel.pipeline().remove(SaslEncryption.ENCRYPTION_HANDLER_NAME);
    }
  }

根據代碼清單6,可以看到EncryptionDisablerBootstrap的作用是移除客戶端管道中的SASL加密。

2.2、創建Rpc客戶端TransportClient

         有了TransportClientFactory,Spark的各個模塊就可以使用它創建RPC客戶端TransportClient了。每個TransportClient實例只能和一個遠端的RPC服務通信,所以Spark中的組件如果想要和多個RPC服務通信,就需要持有多個TransportClient實例。創建TransportClient的方法見代碼清單7(實際為從緩存中獲取TransportClient)。

代碼清單7        從緩存獲取TransportClient 

  public TransportClient createClient(String remoteHost, int remotePort)
      throws IOException, InterruptedException {
    // 創建InetSocketAddress
    final InetSocketAddress unresolvedAddress =
      InetSocketAddress.createUnresolved(remoteHost, remotePort);

    ClientPool clientPool = connectionPool.get(unresolvedAddress);
    if (clientPool == null) {
      connectionPool.putIfAbsent(unresolvedAddress, new ClientPool(numConnectionsPerPeer));
      clientPool = connectionPool.get(unresolvedAddress);
    }
    
    int clientIndex = rand.nextInt(numConnectionsPerPeer); // 隨機選擇一個TransportClient
    TransportClient cachedClient = clientPool.clients[clientIndex];

    if (cachedClient != null && cachedClient.isActive()) {// 獲取並返回激活的TransportClient
      TransportChannelHandler handler = cachedClient.getChannel().pipeline()
        .get(TransportChannelHandler.class);
      synchronized (handler) {
        handler.getResponseHandler().updateTimeOfLastRequest();
      }

      if (cachedClient.isActive()) {
        logger.trace("Returning cached connection to {}: {}",
          cachedClient.getSocketAddress(), cachedClient);
        return cachedClient;
      }
    }

    final long preResolveHost = System.nanoTime();
    final InetSocketAddress resolvedAddress = new InetSocketAddress(remoteHost, remotePort);
    final long hostResolveTimeMs = (System.nanoTime() - preResolveHost) / 1000000;
    if (hostResolveTimeMs > 2000) {
      logger.warn("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
    } else {
      logger.trace("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
    }
    // 創建並返回TransportClient對象
    synchronized (clientPool.locks[clientIndex]) {
      cachedClient = clientPool.clients[clientIndex];

      if (cachedClient != null) {
        if (cachedClient.isActive()) {
          logger.trace("Returning cached connection to {}: {}", resolvedAddress, cachedClient);
          return cachedClient;
        } else {
          logger.info("Found inactive connection to {}, creating a new one.", resolvedAddress);
        }
      }
      clientPool.clients[clientIndex] = createClient(resolvedAddress); 
      return clientPool.clients[clientIndex];
    }
  } 

從代碼清單7得知,創建TransportClient的步驟如下:

  1. 調用InetSocketAddress的靜態方法createUnresolved構建InetSocketAddress(這種方式創建InetSocketAddress,可以在緩存中已經有TransportClient時避免不必要的域名解析),然后從connectionPool中獲取與此地址對應的ClientPool,如果沒有則需要新建ClientPool,並放入緩存connectionPool中;
  2. 根據numConnectionsPerPeer的大小(使用“spark.+模塊名+.io.numConnectionsPerPeer”屬性配置),從ClientPool中隨機選擇一個TransportClient;
  3. 如果ClientPool的clients中在隨機產生索引位置不存在TransportClient或者TransportClient沒有激活,則進入第5)步,否則對此TransportClient進行第4)步的檢查;
  4. 更新TransportClient的channel中配置的TransportChannelHandler的最后一次使用時間,確保channel沒有超時,然后檢查TransportClient是否是激活狀態,最后返回此TransportClient給調用方;
  5. 由於緩存中沒有TransportClient可用,於是調用InetSocketAddress的構造器創建InetSocketAddress對象(直接使用InetSocketAddress的構造器創建InetSocketAddress,會進行域名解析),在這一步驟多個線程可能會產生競態條件(由於沒有同步處理,所以多個線程極有可能同時執行到此處,都發現緩存中沒有TransportClient可用,於是都使用InetSocketAddress的構造器創建InetSocketAddress);
  6. 第5步中創建InetSocketAddress的過程中產生的競態條件如果不妥善處理,會產生線程安全問題,所以到了ClientPool的locks數組發揮作用的時候了。按照隨機產生的數組索引,locks數組中的鎖對象可以對clients數組中的TransportClient一對一進行同步。即便之前產生了競態條件,但是在這一步只能有一個線程進入臨界區。在臨界區內,先進入的線程調用重載的createClient方法創建TransportClient對象並放入ClientPool的clients數組中。當率先進入臨界區的線程退出臨界區后,其他線程才能進入,此時發現ClientPool的clients數組中已經存在了TransportClient對象,那么將不再創建TransportClient,而是直接使用它。

代碼清單7的整個執行過程實際解決了TransportClient緩存的使用以及createClient方法的線程安全問題,並沒有涉及創建TransportClient的實現。TransportClient的創建過程在重載的createClient方法(見代碼清單8)中實現。

代碼清單8         創建TransportClient

  private TransportClient createClient(InetSocketAddress address)
      throws IOException, InterruptedException {
    logger.debug("Creating new connection to {}", address);
    // 構建根引導器Bootstrap並對其進行配置
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(workerGroup)
      .channel(socketChannelClass)
      .option(ChannelOption.TCP_NODELAY, true)
      .option(ChannelOption.SO_KEEPALIVE, true)
      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
      .option(ChannelOption.ALLOCATOR, pooledAllocator);

    final AtomicReference<TransportClient> clientRef = new AtomicReference<>();
    final AtomicReference<Channel> channelRef = new AtomicReference<>();
    // 為根引導程序設置管道初始化回調函數
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
      @Override
      public void initChannel(SocketChannel ch) {
        TransportChannelHandler clientHandler = context.initializePipeline(ch);
        clientRef.set(clientHandler.getClient());
        channelRef.set(ch);
      }
    });

    long preConnect = System.nanoTime();
    ChannelFuture cf = bootstrap.connect(address);// 使用根引導程序連接遠程服務器
    if (!cf.await(conf.connectionTimeoutMs())) {
      throw new IOException(
        String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs()));
    } else if (cf.cause() != null) {
      throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
    }

    TransportClient client = clientRef.get();
    Channel channel = channelRef.get();
    assert client != null : "Channel future completed successfully with null client";

    // Execute any client bootstraps synchronously before marking the Client as successful.
    long preBootstrap = System.nanoTime();
    logger.debug("Connection to {} successful, running bootstraps...", address);
    try {
      for (TransportClientBootstrap clientBootstrap : clientBootstraps) {
        clientBootstrap.doBootstrap(client, channel);// 給TransportClient設置客戶端引導程序
      }
    } catch (Exception e) { // catch non-RuntimeExceptions too as bootstrap may be written in Scala
      long bootstrapTimeMs = (System.nanoTime() - preBootstrap) / 1000000;
      logger.error("Exception while bootstrapping client after " + bootstrapTimeMs + " ms", e);
      client.close();
      throw Throwables.propagate(e);
    }
    long postBootstrap = System.nanoTime();

    logger.info("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
      address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000);

    return client;
  }

從代碼清單8得知,真正創建TransportClient的步驟如下:

  1. 構建根引導器Bootstrap並對其進行配置;
  2. 為根引導程序設置管道初始化回調函數,此回調函數將調用TransportContext的initializePipeline方法初始化Channel的pipeline;
  3. 使用根引導程序連接遠程服務器,當連接成功對管道初始化時會回調初始化回調函數,將TransportClient和Channel對象分別設置到原子引用clientRef與channelRef中;
  4. 給TransportClient設置客戶端引導程序,即設置TransportClientFactory中的TransportClientBootstrap列表;
  5. 最后返回此TransportClient對象。

 三、RPC服務器TransportServer

         TransportServer是RPC框架的服務端,可提供高效的、低級別的流服務。在說明圖1中的記號②時提到過TransportContext的createServer方法用於創建TransportServer,其實現見代碼清單9。

代碼清單9         創建RPC服務端

  public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps) {
    return new TransportServer(this, null, port, rpcHandler, bootstraps);
  }

  public TransportServer createServer(
      String host, int port, List<TransportServerBootstrap> bootstraps) {
    return new TransportServer(this, host, port, rpcHandler, bootstraps);
  }

  public TransportServer createServer(List<TransportServerBootstrap> bootstraps) {
    return createServer(0, bootstraps);
  }

  public TransportServer createServer() {
    return createServer(0, Lists.<TransportServerBootstrap>newArrayList());
  }

代碼清單9中列出了四個名為createServer的重載方法,但是它們最終調用了TransportServer的構造器(見代碼清單10)來創建TransportServer實例。

代碼清單10         TransportServer的構造器

  public TransportServer(
      TransportContext context,
      String hostToBind,
      int portToBind,
      RpcHandler appRpcHandler,
      List<TransportServerBootstrap> bootstraps) {
    this.context = context;
    this.conf = context.getConf();
    this.appRpcHandler = appRpcHandler;
    this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));

    try {
      init(hostToBind, portToBind);
    } catch (RuntimeException e) {
      JavaUtils.closeQuietly(this);
      throw e;
    }
  }

TransportServer的構造器中的各個變量分別為:

  • context:即參數傳遞的TransportContext的引用;
  • conf:即TransportConf,這里通過調用TransportContext的getConf獲取;
  • appRpcHandler:即RPC請求處理器RpcHandler;
  • bootstraps:即參數傳遞的TransportServerBootstrap列表;

         TransportServer的構造器(見代碼清單10)中調用了init方法, init方法用於對TransportServer進行初始化,見代碼清單11。

代碼清單11         TransportServer的初始化

  private void init(String hostToBind, int portToBind) {
    // 根據Netty的API文檔,Netty服務端需同時創建bossGroup和workerGroup
    IOMode ioMode = IOMode.valueOf(conf.ioMode());
    EventLoopGroup bossGroup =
      NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
    EventLoopGroup workerGroup = bossGroup;
    // 創建一個匯集ByteBuf但對本地線程緩存禁用的分配器
    PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
      conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
    // 創建Netty的服務端根引導程序並對其進行配置
    bootstrap = new ServerBootstrap()
      .group(bossGroup, workerGroup)
      .channel(NettyUtils.getServerChannelClass(ioMode))
      .option(ChannelOption.ALLOCATOR, allocator)
      .childOption(ChannelOption.ALLOCATOR, allocator);

    if (conf.backLog() > 0) {
      bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
    }
    if (conf.receiveBuf() > 0) {
      bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
    }
    if (conf.sendBuf() > 0) {
      bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
    }
    // 為根引導程序設置管道初始化回調函數
    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
        RpcHandler rpcHandler = appRpcHandler;
        for (TransportServerBootstrap bootstrap : bootstraps) {
          rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
        }
        context.initializePipeline(ch, rpcHandler);
      }
    });
    // 給根引導程序綁定Socket的監聽端口
    InetSocketAddress address = hostToBind == null ?
        new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
    channelFuture = bootstrap.bind(address);
    channelFuture.syncUninterruptibly();

    port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
    logger.debug("Shuffle server started on port: {}", port);
  }

代碼清單11中TransportServer初始化的步驟如下:

  1. 創建bossGroup和workerGroup;
  2. 創建一個匯集ByteBuf但對本地線程緩存禁用的分配器;
  3. 調用Netty的API創建Netty的服務端根引導程序並對其進行配置;
  4. 為根引導程序設置管道初始化回調函數,此回調函數首先設置TransportServerBootstrap到根引導程序中,然后調用TransportContext的initializePipeline方法初始化Channel的pipeline;
  5. 給根引導程序綁定Socket的監聽端口,最后返回監聽的端口。

小貼士:根據Netty的API文檔,Netty服務端需同時創建bossGroup和workerGroup。

提示:代碼清單11中使用了NettyUtils工具類的很多方法,在附錄G中有對它們的詳細介紹。EventLoopGroup、PooledByteBufAllocator、ServerBootstrap都是Netty提供的API,對於它們的更多介紹,請訪問http://netty.io/


 四、管道初始化

         在代碼清單8創建TransportClient和代碼清單11對TransportServer初始化的實現中都在管道初始化回調函數中調用了TransportContext的initializePipeline方法,initializePipeline方法(見代碼清單12)將調用Netty的API對管道初始化。

代碼清單12         管道初始化

  public TransportChannelHandler initializePipeline(
      SocketChannel channel,
      RpcHandler channelRpcHandler) {
    try {
      TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
      channel.pipeline()
        .addLast("encoder", ENCODER)
        .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
        .addLast("decoder", DECODER)
        .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
        .addLast("handler", channelHandler);
      return channelHandler;
    } catch (RuntimeException e) {
      logger.error("Error while initializing Netty pipeline", e);
      throw e;
    }
  }

 根據代碼清單12,initializePipeline方法的執行步驟如下:

  1. 調用createChannelHandler方法創建TransportChannelHandler,從createChannelHandler的實現(見代碼清單13)中可以看到真正創建TransportClient是在這里發生的。從TransportClient的構造過程看到RpcHandler 與TransportClient毫無關系,TransportClient只使用了TransportResponseHandler。TransportChannelHandler在服務端將代理TransportRequestHandler對請求消息進行處理,並在客戶端代理TransportResponseHandler對響應消息進行處理。
  2. 對管道進行設置,這里的ENCODER(即MessageEncoder)派生自Netty的ChannelOutboundHandler接口;DECODER(即MessageDecoder)、TransportChannelHandler以及TransportFrameDecoder(由工具類NettyUtils的靜態方法createFrameDecoder創建)派生自Netty的ChannelInboundHandler接口;IdleStateHandler同時實現了ChannelOutboundHandler和ChannelInboundHandler接口。根據Netty的API行為,通過addLast方法注冊多個handler時,ChannelInboundHandler按照注冊的先后順序執行;ChannelOutboundHandler按照注冊的先后順序逆序執行,因此在管道兩端(無論是服務端還是客戶端)處理請求和響應的流程如圖3所示。

 代碼清單13         創建TransportChannelHandler

  private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
    TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
    TransportClient client = new TransportClient(channel, responseHandler);
    TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
      rpcHandler);
    return new TransportChannelHandler(client, responseHandler, requestHandler,
      conf.connectionTimeoutMs(), closeIdleConnections);
  }

圖3       管道處理請求和響應的流程圖

 五、TransportChannelHandler詳解

 

         TransportChannelHandler實現了Netty的ChannelInboundHandler[1],以便對Netty管道中的消息進行處理。圖3中的這些Handler(除了MessageEncoder)由於都實現了ChannelInboundHandler接口,作為自定義的ChannelInboundHandler,因而都要重寫channelRead方法。Netty框架使用工作鏈模式來對每個ChannelInboundHandler的實現類的channelRead方法進行鏈式調用。TransportChannelHandler實現的channelRead方法見代碼清單14。

代碼清單14       TransportChannelHandler的channelRead實現

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception {
    if (request instanceof RequestMessage) {
      requestHandler.handle((RequestMessage) request);
    } else if (request instanceof ResponseMessage) {
      responseHandler.handle((ResponseMessage) request);
    } else {
      ctx.fireChannelRead(request);
    }
  }

從代碼清單14看到,當TransportChannelHandler讀取到的request是RequestMessage類型時,則將此消息的處理進一步交給TransportRequestHandler,當request是ResponseMessage時,則將此消息的處理進一步交給TransportResponseHandler。

5.1、MessageHandler的繼承體系

         TransportRequestHandler與TransportResponseHandler都繼承自抽象類MessageHandler,MessageHandler定義了子類的規范,詳細定義見代碼清單15。

代碼清單15         MessageHandler規范

public abstract class MessageHandler<T extends Message> {
  public abstract void handle(T message) throws Exception;
  public abstract void channelActive();
  public abstract void exceptionCaught(Throwable cause);
  public abstract void channelInactive();
}

MessageHandler中定義的各個方法的作用分別為:

  • handle:用於對接收到的單個消息進行處理;
  • channelActive:當channel激活時調用;
  • exceptionCaught:當捕獲到channel發生異常時調用;
  • channelInactive:當channel非激活時調用;

Spark中MessageHandler類的繼承體系如圖4所示。

圖4       MessageHandler類的繼承體系

5.2、Message的繼承體系

         根據代碼清單15,我們知道MessageHandler同時也是一個Java泛型類,其子類能處理的消息都派生自接口Message。Message的定義見代碼清單16。

代碼清單16         Message的定義

public interface Message extends Encodable {
  Type type();
  ManagedBuffer body();
  boolean isBodyInFrame();

Message中定義的三個接口方法的作用分別為:

  • type:返回消息的類型;
  • body:返回消息中可選的內容體;
  • isBodyInFrame:用於判斷消息的主體是否包含在消息的同一幀中。

Message接口繼承了Encodable接口,Encodable的定義見代碼清單17。

代碼清單17         Encodable的定義

public interface Encodable {
  int encodedLength();
  void encode(ByteBuf buf);
}

實現Encodable接口的類將可以轉換到一個ByteBuf中,多個對象將被存儲到預先分配的單個ByteBuf,所以這里的encodedLength用於返回轉換的對象數量。下面一起來看看Message的類繼承體系,如圖5所示。

圖5       Message的類繼承體系

從圖5看到最終的消息實現類都直接或間接的實現了RequestMessage或ResponseMessage接口,其中RequestMessage的具體實現有四種,分別是:

  • ChunkFetchRequest:請求獲取流的單個塊的序列。
  • RpcRequest:此消息類型由遠程的RPC服務端進行處理,是一種需要服務端向客戶端回復的RPC請求信息類型。
  • OneWayMessage:此消息也需要由遠程的RPC服務端進行處理,與RpcRequest不同的是不需要服務端向客戶端回復。
  • StreamRequest:此消息表示向遠程的服務發起請求,以獲取流式數據。

由於OneWayMessage 不需要響應,所以ResponseMessage的對於成功或失敗狀態的實現各有三種,分別是:

  • ChunkFetchSuccess:處理ChunkFetchRequest成功后返回的消息;
  • ChunkFetchFailure:處理ChunkFetchRequest失敗后返回的消息;
  • RpcResponse:處理RpcRequest成功后返回的消息;
  • RpcFailure:處理RpcRequest失敗后返回的消息;
  • StreamResponse:處理StreamRequest成功后返回的消息;
  • StreamFailure:處理StreamRequest失敗后返回的消息;

5.3、ManagedBuffer的繼承體系

         回頭再看看代碼清單16中對body接口的定義,可以看到其返回內容體的類型為ManagedBuffer。ManagedBuffer提供了由字節構成數據的不可變視圖(也就是說ManagedBuffer並不存儲數據,也不是數據的實際來源,這同關系型數據庫的視圖類似)。我們先來看看抽象類ManagedBuffer中對行為的定義,見代碼清單18。

代碼清單18         ManagedBuffer的定義

public abstract class ManagedBuffer {
  public abstract long size();
  public abstract ByteBuffer nioByteBuffer() throws IOException;
  public abstract InputStream createInputStream() throws IOException;
  public abstract ManagedBuffer retain();
  public abstract ManagedBuffer release();
  public abstract Object convertToNetty() throws IOException;
}

ManagedBuffer中定義了六個方法,分別為:

  • size:返回數據的字節數。
  • nioByteBuffer:將數據按照Nio的ByteBuffer類型返回。
  • createInputStream:將數據按照InputStream返回。
  • retain:當有新的使用者使用此視圖時,增加引用此視圖的引用數。
  • release:當有使用者不再使用此視圖時,減少引用此視圖的引用數。當引用數為0時釋放緩沖區。
  • convertToNetty:將緩沖區的數據轉換為Netty的對象,用來將數據寫到外部。此方法返回的數據類型要么是io.netty.buffer.ByteBuf,要么是io.netty.channel.FileRegion。

ManagedBuffer的具體實現有很多,我們可以通過圖6來了解。

圖6       ManagedBuffer的繼承體系

圖6中列出了ManagedBuffer的五個實現類,其中TestManagedBuffer和RecordingManagedBuffer用於測試。NettyManagedBuffer中的緩沖為io.netty.buffer.ByteBuf,NioManagedBuffer中的緩沖為java.nio.ByteBuffer。NettyManagedBuffer和NioManagedBuffer的實現都非常簡單,留給讀者自行閱讀。本節挑選FileSegmentManagedBuffer作為ManagedBuffer具體實現的例子進行介紹。

         FileSegmentManagedBuffer的作用為獲取一個文件中的一段,它一共有四個由final修飾的屬性,全部都通過FileSegmentManagedBuffer的構造器傳入屬性值,這四個屬性為:

  • conf:即TransportConf。
  • file:所要讀取的文件。
  • offset:所要讀取文件的偏移量。
  • length:所要讀取的長度。

下面將逐個介紹FileSegmentManagedBuffer對於ManagedBuffer的實現。

  • NIO方式讀取文件。FileSegmentManagedBuffer實現的nioByteBuffer方法見代碼清單19。

代碼清單19         nioByteBuffer方法的實現

  @Override
  public ByteBuffer nioByteBuffer() throws IOException {
    FileChannel channel = null;
    try {
      channel = new RandomAccessFile(file, "r").getChannel();
      if (length < conf.memoryMapBytes()) {
        ByteBuffer buf = ByteBuffer.allocate((int) length);
        channel.position(offset);
        while (buf.remaining() != 0) {
          if (channel.read(buf) == -1) {
            throw new IOException(String.format("Reached EOF before filling buffer\n" +
              "offset=%s\nfile=%s\nbuf.remaining=%s",
              offset, file.getAbsoluteFile(), buf.remaining()));
          }
        }
        buf.flip();
        return buf;
      } else {
        return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
      }
    } catch (IOException e) {
      try {
        if (channel != null) {
          long size = channel.size();
          throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
            e);
        }
      } catch (IOException ignored) {
        // ignore
      }
      throw new IOException("Error in opening " + this, e);
    } finally {
      JavaUtils.closeQuietly(channel);
    }
  }

nioByteBuffer的實現還是很簡單的,主要利用RandomAccessFile獲取FileChannel,然后使用java.nio.ByteBuffer和FileChannel的API將數據寫入緩沖區java.nio.ByteBuffer中。

  • 文件流方式讀取文件。FileSegmentManagedBuffer實現的createInputStream方法見代碼清單20。

代碼清單20         createInputStream的實現

  @Override
  public InputStream createInputStream() throws IOException {
    FileInputStream is = null;
    try {
      is = new FileInputStream(file);
      ByteStreams.skipFully(is, offset);
      return new LimitedInputStream(is, length);
    } catch (IOException e) {
      try {
        if (is != null) {
          long size = file.length();
          throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
              e);
        }
      } catch (IOException ignored) {
        // ignore
      } finally {
        JavaUtils.closeQuietly(is);
      }
      throw new IOException("Error in opening " + this, e);
    } catch (RuntimeException e) {
      JavaUtils.closeQuietly(is);
      throw e;
    }
  }

createInputStream的實現還是很簡單的,這里不多作介紹。

  • 將數據轉換為Netty對象。FileSegmentManagedBuffer實現的convertToNetty方法見代碼清單21。

代碼清單21         convertToNetty的實現

  @Override
  public Object convertToNetty() throws IOException {
    if (conf.lazyFileDescriptor()) {
      return new DefaultFileRegion(file, offset, length);
    } else {
      FileChannel fileChannel = new FileInputStream(file).getChannel();
      return new DefaultFileRegion(fileChannel, offset, length);
    }
  }
  • 其他方法的實現。其他方法由於實現非常簡單,所以這里就不一一列出了,感興趣的讀者可以自行查閱。

[1] ChannelInboundHandler接口的實現及原理不屬於本書要分析的內容,感興趣的同學可以閱讀Netty的官方文檔或者研究Netty的源碼。

六、服務端RpcHandler詳解

 

         由於TransportRequestHandler實際是把請求消息交給RpcHandler進一步處理的,所以這里對RpcHandler首先做個介紹。RpcHandler是一個抽象類,定義了一些RPC處理器的規范,其主要實現見代碼清單22。

代碼清單22         RpcHandler的實現

public abstract class RpcHandler {

  private static final RpcResponseCallback ONE_WAY_CALLBACK = new OneWayRpcCallback();

  public abstract void receive(
      TransportClient client,
      ByteBuffer message,
      RpcResponseCallback callback);

  public abstract StreamManager getStreamManager();

  public void receive(TransportClient client, ByteBuffer message) {
    receive(client, message, ONE_WAY_CALLBACK);
  }

  public void channelActive(TransportClient client) { }

  public void channelInactive(TransportClient client) { }

  public void exceptionCaught(Throwable cause, TransportClient client) { }

  private static class OneWayRpcCallback implements RpcResponseCallback {

    private static final Logger logger = LoggerFactory.getLogger(OneWayRpcCallback.class);

    @Override
    public void onSuccess(ByteBuffer response) {
      logger.warn("Response provided for one-way RPC.");
    }

    @Override
    public void onFailure(Throwable e) {
      logger.error("Error response provided for one-way RPC.", e);
    }

  }

}

代碼清單22中RpcHandler的各個方法的作用如下:

  • receive:這是一個抽象方法,用來接收單一的RPC消息,具體處理邏輯需要子類去實現。receive接收三個參數,分別是TransportClient、ByteBuffer和RpcResponseCallback。RpcResponseCallback用於對請求處理結束后進行回調,無論處理結果是成功還是失敗,RpcResponseCallback都會被調用一次。RpcResponseCallback的接口定義如下:
public interface RpcResponseCallback {
  void onSuccess(ByteBuffer response);
  void onFailure(Throwable e);
}
  • 重載的receive:只接收TransportClient和ByteBuffer兩個參數,RpcResponseCallback為默認的ONE_WAY_CALLBACK,其類型為OneWayRpcCallback,從代碼清單22中OneWayRpcCallback的實現可以看出其onSuccess和onFailure只是打印日志,並沒有針對客戶端做回復處理。
  • channelActive:當與給定客戶端相關聯的channel處於活動狀態時調用。
  • channelInactive:當與給定客戶端相關聯的channel處於非活動狀態時調用。
  • exceptionCaught:當channel產生異常時調用。
  • getStreamManager:獲取StreamManager,StreamManager可以從流中獲取單個的塊,因此它也包含着當前正在被TransportClient獲取的流的狀態。

介紹完RpcHandler,現在回到TransportRequestHandler的處理過程。TransportRequestHandler處理以上四種RequestMessage的實現見代碼清單23。

代碼清單23         TransportRequestHandler的handle方法

  @Override
  public void handle(RequestMessage request) {
    if (request instanceof ChunkFetchRequest) {
      processFetchRequest((ChunkFetchRequest) request);
    } else if (request instanceof RpcRequest) {
      processRpcRequest((RpcRequest) request);
    } else if (request instanceof OneWayMessage) {
      processOneWayMessage((OneWayMessage) request);
    } else if (request instanceof StreamRequest) {
      processStreamRequest((StreamRequest) request);
    } else {
      throw new IllegalArgumentException("Unknown request type: " + request);
    }
  }

結合代碼清單23,下面逐一詳細分析這四種類型請求的處理過程。

6.1、處理塊獲取請求

         processFetchRequest方法用於處理ChunkFetchRequest類型的消息,其實現見代碼清單24。

代碼清單24         processFetchRequest的實現

  private void processFetchRequest(final ChunkFetchRequest req) {
    if (logger.isTraceEnabled()) {
      logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel),
        req.streamChunkId);
    }

    ManagedBuffer buf;
    try {
      streamManager.checkAuthorization(reverseClient, req.streamChunkId.streamId);
      streamManager.registerChannel(channel, req.streamChunkId.streamId);
      buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex);
    } catch (Exception e) {
      logger.error(String.format("Error opening block %s for request from %s",
        req.streamChunkId, getRemoteAddress(channel)), e);
      respond(new ChunkFetchFailure(req.streamChunkId, Throwables.getStackTraceAsString(e)));
      return;
    }

    respond(new ChunkFetchSuccess(req.streamChunkId, buf));
  }

代碼清單24中的streamManager是通過調用RpcHandler的getStreamManager方法獲取的StreamManager。processFetchRequest的處理都依托於RpcHandler的StreamManager,其處理步驟如下:

  1. 調用StreamManager的checkAuthorization方法,校驗客戶端是否有權限從給定的流中讀取;
  2. 調用StreamManager的registerChannel方法,將一個流和一條(只能是一條)客戶端的TCP連接關聯起來,這可以保證對於單個的流只會有一個客戶端讀取。流關閉之后就永遠不能夠重用了;
  3. 調用StreamManager的getChunk方法,獲取單個的塊(塊被封裝為ManagedBuffer)。由於單個的流只能與單個的TCP連接相關聯,因此getChunk方法不能為了某個特殊的流而並行調用;
  4. 將ManagedBuffer和流的塊Id封裝為ChunkFetchSuccess后,調用respond方法返回給客戶端。

有關StreamManager的具體實現,讀者可以參考《Spark內核設計的藝術》一書5.3.5節介紹的NettyStreamManager和《Spark內核設計的藝術》一書6.9.2節介紹的NettyBlockRpcServer中的OneForOneStreamManager。

6.2、處理RPC請求

         processRpcRequest方法用於處理RpcRequest類型的消息,其實現見代碼清單25。

代碼清單25         processRpcRequest的實現

  private void processRpcRequest(final RpcRequest req) {
    try {
      rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
        @Override
        public void onSuccess(ByteBuffer response) {
          respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
        }

        @Override
        public void onFailure(Throwable e) {
          respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
        }
      });
    } catch (Exception e) {
      logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
      respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
    } finally {
      req.body().release();
    }
  }

代碼清單25中將RpcRequest消息的內容體、發送消息的客戶端以及一個RpcResponseCallback類型的匿名內部類作為參數傳遞給了RpcHandler的receive方法。這就是說真正用於處理RpcRequest消息的是RpcHandler,而非TransportRequestHandler。由於RpcHandler是抽象類(見代碼清單22),其receive方法也是抽象方法,所以具體的操作將由RpcHandler的實現了receive方法的子類來完成。所有繼承RpcHandler的子類都需要在其receive方法的具體實現中回調RpcResponseCallback的onSuccess(處理成功時)或者onFailure(處理失敗時)方法。從RpcResponseCallback的實現來看,無論處理結果成功還是失敗,都將調用respond方法對客戶端進行響應。

6.3、處理流請求

         processStreamRequest方法用於處理StreamRequest類型的消息,其實現見代碼清單26。

代碼清單26         processStreamRequest的實現

  private void processStreamRequest(final StreamRequest req) {
    ManagedBuffer buf;
    try {
      buf = streamManager.openStream(req.streamId);// 將獲取到的流數據封裝為ManagedBuffer
    } catch (Exception e) {
      logger.error(String.format(
        "Error opening stream %s for request from %s", req.streamId, getRemoteAddress(channel)), e);
      respond(new StreamFailure(req.streamId, Throwables.getStackTraceAsString(e)));
      return;
    }

    if (buf != null) {
      respond(new StreamResponse(req.streamId, buf.size(), buf));
    } else {
      respond(new StreamFailure(req.streamId, String.format(
        "Stream '%s' was not found.", req.streamId)));
    }
  }

代碼清單26中也使用了RpcHandler的StreamManager,其處理步驟如下:

  1. 調用StreamManager的openStream方法將獲取到的流數據封裝為ManagedBuffer;
  2. 當成功或失敗時調用respond方法向客戶端響應。

6.4、處理無需回復的RPC請求

         processOneWayMessage方法用於處理StreamRequest類型的消息,其實現見代碼清單27。

代碼清單27         processOneWayMessage的實現

  private void processOneWayMessage(OneWayMessage req) {
    try {
      rpcHandler.receive(reverseClient, req.body().nioByteBuffer());
    } catch (Exception e) {
      logger.error("Error while invoking RpcHandler#receive() for one-way message.", e);
    } finally {
      req.body().release();
    }
  }

processOneWayMessage方法的實現processRpcRequest非常相似,區別在於processOneWayMessage調用了代碼清單22中ONE_WAY_CALLBACK的receive方法,因而processOneWayMessage在處理完RPC請求后不會對客戶端作出響應。

         從以上四種處理的分析可以看出最終的處理都由RpcHandler及其內部組件完成。除了OneWayMessage的消息外,其余三種消息都是最終調用respond方法響應客戶端,其實現見代碼清單28。

代碼清單28         respond的實現

  private void respond(final Encodable result) {
    final SocketAddress remoteAddress = channel.remoteAddress();
    channel.writeAndFlush(result).addListener(
      new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
          if (future.isSuccess()) {
            logger.trace("Sent result {} to client {}", result, remoteAddress);
          } else {
            logger.error(String.format("Error sending result %s to %s; closing connection",
              result, remoteAddress), future.cause());
            channel.close();
          }
        }
      }
    );
  }

可以看到respond方法中實際調用了Channel的writeAndFlush方法[1]來響應客戶端。



[1] Channel的writeAndFlush方法涉及Netty的實現細節及原理,這並不是本書所要闡述的內容,有興趣的讀者可以訪問Netty官網:http://netty.io獲取更多信息。

 

七、服務端引導程序TransportServerBootstrap

       TransportServer的構造器(見代碼清單10)中的bootstraps是TransportServerBootstrap的列表。接口TransportServerBootstrap定義了服務端引導程序的規范,服務端引導程序旨在當客戶端與服務端建立連接之后,在服務端持有的客戶端管道上執行的引導程序。TransportServerBootstrap的定義見代碼清單29。

代碼清單29         TransportServerBootstrap的定義 

public interface TransportServerBootstrap {
  RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler);
}

TransportServerBootstrap的doBootstrap方法將對服務端的RpcHandler進行代理,接收客戶端的請求。TransportServerBootstrap有SaslServerBootstrap和EncryptionCheckerBootstrap兩個實現類。為了更清楚的說明TransportServerBootstrap的意義,我們以SaslServerBootstrap為例,來講解其實現(見代碼清單30)。

代碼清單30         SaslServerBootstrap的doBootstrap實現

  public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler) {
    return new SaslRpcHandler(conf, channel, rpcHandler, secretKeyHolder);
  }

根據代碼清單30,我們知道SaslServerBootstrap的doBootstrap方法實際創建了SaslRpcHandler,SaslRpcHandler負責對管道進行SASL(Simple Authentication and Security Layer)加密。SaslRpcHandler本身也繼承了RpcHandler,所以我們重點來看其receive方法的實現,見代碼清單31。

代碼清單31         SaslRpcHandler的receive方法

  @Override
  public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
    if (isComplete) {
      // 將消息傳遞給SaslRpcHandler所代理的下游RpcHandler並返回
      delegate.receive(client, message, callback);
      return;
    }

    ByteBuf nettyBuf = Unpooled.wrappedBuffer(message);
    SaslMessage saslMessage;
    try {
      saslMessage = SaslMessage.decode(nettyBuf);// 對客戶端發送的消息進行SASL解密
    } finally {
      nettyBuf.release();
    }

    if (saslServer == null) {
      // 如果saslServer還未創建,則需要創建SparkSaslServer
      client.setClientId(saslMessage.appId);
      saslServer = new SparkSaslServer(saslMessage.appId, secretKeyHolder,
        conf.saslServerAlwaysEncrypt());
    }

    byte[] response;
    try {
      response = saslServer.response(JavaUtils.bufferToArray(// 使用saslServer處理已解密的消息
        saslMessage.body().nioByteBuffer()));
    } catch (IOException ioe) {
      throw new RuntimeException(ioe);
    }
    callback.onSuccess(ByteBuffer.wrap(response));

    if (saslServer.isComplete()) {
      logger.debug("SASL authentication successful for channel {}", client);
      isComplete = true;// SASL認證交換已經完成
      if (SparkSaslServer.QOP_AUTH_CONF.equals(saslServer.getNegotiatedProperty(Sasl.QOP))) {
        logger.debug("Enabling encryption for channel {}", client);
        // 對管道進行SASL加密
        SaslEncryption.addToChannel(channel, saslServer, conf.maxSaslEncryptedBlockSize());
        saslServer = null;
      } else {
        saslServer.dispose();
        saslServer = null;
      }
    }
  }

根據代碼清單31,SaslRpcHandler處理客戶端消息的步驟如下:

  1. 如果SASL認證交換已經完成(isComplete等於true),則將消息傳遞給SaslRpcHandler所代理的下游RpcHandler並返回。
  2. 如果SASL認證交換未完成(isComplete等於false),則對客戶端發送的消息進行SASL解密。
  3. 如果saslServer還未創建,則需要創建SparkSaslServer。當SaslRpcHandler接收到客戶端的第一條消息時會做此操作。
  4. 使用saslServer處理已解密的消息,並將處理結果通過RpcResponseCallback的回調方法返回給客戶端。
  5. 如果SASL認證交換已經完成,則將isComplete置為true。
  6. 對管道進行SASL加密。

SaslServerBootstrap是通過SaslRpcHandler對下游RpcHandler進行代理的一種TransportServerBootstrap。EncryptionCheckerBootstrap是另一種TransportServerBootstrap的實現,它通過將自身加入Netty的管道中實現引導,EncryptionCheckerBootstrap的doBootstrap方法的實現見代碼清單32。

代碼清單32         EncryptionCheckerBootstrap的doBootstrap實現

    @Override
    public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler) {
      channel.pipeline().addFirst("encryptionChecker", this);
      return rpcHandler;
    }

         在詳細介紹了TransportChannelHandler之后我們就可以對圖3-3進行擴展,把TransportRequestHandler、TransportServerBootstrap及RpcHandler的處理流程增加進來,如圖7所示。

圖7       RPC框架服務端處理請求、響應流程圖

有讀者可能會問,圖7中並未見TransportServerBootstrap的身影。根據對TransportServerBootstrap的兩種實現的舉例,我們知道TransportServerBootstrap將可能存在於圖中任何兩個組件的箭頭連線中間,起到引導、包裝、代理的作用。

八、客戶端TransportClient詳解

         在介紹完服務端RpcHandler對請求消息的處理之后,現在來看看客戶端發送RPC請求的原理。我們在分析代碼清單13中的createChannelHandler方法時,看到調用了TransportClient的構造器(見代碼清單33),其中TransportResponseHandler的引用將賦給handler屬性。

代碼清單33         TransportClient的構造器 

  public TransportClient(Channel channel, TransportResponseHandler handler) {
    this.channel = Preconditions.checkNotNull(channel);
    this.handler = Preconditions.checkNotNull(handler);
    this.timedOut = false;
  }

TransportClient一共有五個方法用於發送請求,分別為:

  • fetchChunk:從遠端協商好的流中請求單個塊;
  • stream:使用流的ID,從遠端獲取流數據;
  • sendRpc:向服務端發送RPC的請求,通過At least Once Delivery原則保證請求不會丟失;
  • sendRpcSync:向服務端發送異步的RPC的請求,並根據指定的超時時間等待響應;
  • send:向服務端發送RPC的請求,但是並不期望能獲取響應,因而不能保證投遞的可靠性;

本節只選擇最常用的sendRpc和fetchChunk進行分析,其余實現都可以觸類旁通。

8.1、發送RPC請求

         sendRpc方法的實現見代碼清單34。

代碼清單34         sendRpc的實現

  public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
    final long startTime = System.currentTimeMillis();
    if (logger.isTraceEnabled()) {
      logger.trace("Sending RPC to {}", getRemoteAddress(channel));
    }
    // 使用UUID生成請求主鍵requestId
    final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
    handler.addRpcRequest(requestId, callback);// 添加requestId與RpcResponseCallback的引用之間的關系
    // 發送RPC請求
    channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message))).addListener(
      new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
          if (future.isSuccess()) {
            long timeTaken = System.currentTimeMillis() - startTime;
            if (logger.isTraceEnabled()) {
              logger.trace("Sending request {} to {} took {} ms", requestId,
                getRemoteAddress(channel), timeTaken);
            }
          } else {
            String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
              getRemoteAddress(channel), future.cause());
            logger.error(errorMsg, future.cause());
            handler.removeRpcRequest(requestId);
            channel.close();
            try {
              callback.onFailure(new IOException(errorMsg, future.cause()));
            } catch (Exception e) {
              logger.error("Uncaught exception in RPC response callback handler!", e);
            }
          }
        }
      });

    return requestId;
  }

結合代碼清單34,我們知道sendRpc方法的實現步驟如下:

1)    使用UUID生成請求主鍵requestId;

2)    調用addRpcRequest向handler(特別提醒下讀者這里的handler不是RpcHandler,而是通過TransportClient構造器傳入的TransportResponseHandler)添加requestId與回調類RpcResponseCallback的引用之間的關系。TransportResponseHandler的addRpcRequest方法(見代碼清單35)將更新最后一次請求的時間為當前系統時間,然后將requestId與RpcResponseCallback之間的映射加入到outstandingRpcs緩存中。outstandingRpcs專門用於緩存發出的RPC請求信息。

代碼清單35         添加RPC請求到緩存

  public void addRpcRequest(long requestId, RpcResponseCallback callback) {
    updateTimeOfLastRequest();
    outstandingRpcs.put(requestId, callback);
  }

3)    調用Channel的writeAndFlush方法將RPC請求發送出去,這和在代碼清單28中服務端調用的respond方法響應客戶端的一樣,都是使用Channel的writeAndFlush方法。當發送成功或者失敗時會回調ChannelFutureListener的operationComplete方法。如果發送成功,那么只會打印requestId、遠端地址及花費時間的日志,如果發送失敗,除了打印錯誤日志外,還要調用TransportResponseHandler的removeRpcRequest方法(見代碼清單36)將此次請求從outstandingRpcs緩存中移除。

代碼清單36         從緩存中刪除RPC請求

  public void removeRpcRequest(long requestId) {
    outstandingRpcs.remove(requestId);
  }

請求發送成功后,客戶端將等待接收服務端的響應。根據圖3,返回的消息也會傳遞給TransportChannelHandler的channelRead方法(見代碼清單14),根據之前的分析,消息的分析將最后交給TransportResponseHandler的handle方法來處理。TransportResponseHandler的handle方法分別對圖5中的六種ResponseMessage進行處理,由於服務端使用processRpcRequest方法(見代碼清單25)處理RpcRequest類型的消息后返回給客戶端的消息為RpcResponse或RpcFailure,所以我們來看看客戶端的TransportResponseHandler的handle方法是如何處理RpcResponse和RpcFailure,見代碼清單37。

代碼清單37         RpcResponse和RpcFailure消息的處理

    } else if (message instanceof RpcResponse) {
      RpcResponse resp = (RpcResponse) message;
      RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);// 獲取RpcResponseCallback
      if (listener == null) {
        logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding",
          resp.requestId, getRemoteAddress(channel), resp.body().size());
      } else {
        outstandingRpcs.remove(resp.requestId);
        try {
          listener.onSuccess(resp.body().nioByteBuffer());
        } finally {
          resp.body().release();
        }
      }
    } else if (message instanceof RpcFailure) {
      RpcFailure resp = (RpcFailure) message;
      RpcResponseCallback listener = outstandingRpcs.get(resp.requestId); // 獲取RpcResponseCallback
      if (listener == null) {
        logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding",
          resp.requestId, getRemoteAddress(channel), resp.errorString);
      } else {
        outstandingRpcs.remove(resp.requestId);
        listener.onFailure(new RuntimeException(resp.errorString));
      }

從代碼清單37看到,處理RpcResponse的邏輯為:

  1. 使用RpcResponse對應的RpcRequest的主鍵requestId,從outstandingRpcs緩存中獲取注冊的RpcResponseCallback,此處的RpcResponseCallback即為代碼清單34中傳遞給sendRpc方法的RpcResponseCallback;
  2. 移除outstandingRpcs緩存中requestId和RpcResponseCallback的注冊信息;
  3. 調用RpcResponseCallback的onSuccess方法,處理成功響應后的具體邏輯。這里的RpcResponseCallback需要各個使用TransportClient的sendRpc方法的場景中分別實現;
  4. 最后釋放RpcResponse的body,回收資源。

處理RpcFailure的邏輯為:

  1. 使用RpcFailure對應的RpcRequest的主鍵requestId,從outstandingRpcs緩存中獲取注冊的RpcResponseCallback,此處的RpcResponseCallback即為代碼清單34中傳遞給sendRpc方法的RpcResponseCallback;
  2. 移除outstandingRpcs緩存中requestId和RpcResponseCallback的注冊信息;
  3. 調用RpcResponseCallback的onFailure方法,處理失敗響應后的具體邏輯。這里的RpcResponseCallback需要在使用TransportClient的sendRpc方法時指定或實現。

8.2、發送獲取塊請求

         fetchChunk的實現見代碼清單38。

代碼清單38         fetchChunk的實現

  public void fetchChunk(
      long streamId,
      final int chunkIndex,
      final ChunkReceivedCallback callback) {
    final long startTime = System.currentTimeMillis();
    if (logger.isDebugEnabled()) {
      logger.debug("Sending fetch chunk request {} to {}", chunkIndex, getRemoteAddress(channel));
    }

    final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);// 創建StreamChunkId
    // 添加StreamChunkId與ChunkReceivedCallback之間的對應關系
    handler.addFetchRequest(streamChunkId, callback);
    // 發送塊請求
    channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(
      new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
          if (future.isSuccess()) {
            long timeTaken = System.currentTimeMillis() - startTime;
            if (logger.isTraceEnabled()) {
              logger.trace("Sending request {} to {} took {} ms", streamChunkId,
                getRemoteAddress(channel), timeTaken);
            }
          } else {
            String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
              getRemoteAddress(channel), future.cause());
            logger.error(errorMsg, future.cause());
            handler.removeFetchRequest(streamChunkId);
            channel.close();
            try {
              callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause()));
            } catch (Exception e) {
              logger.error("Uncaught exception in RPC response callback handler!", e);
            }
          }
        }
      });
  }

結合代碼清單38,我們知道fetchChunk方法的實現步驟如下:

1)    使用流的標記streamId和塊的索引chunkIndex創建StreamChunkId;

2)    調用addFetchRequest向handler(特別提醒下讀者這里的handler不是RpcHandler,而是通過TransportClient構造器傳入的TransportResponseHandler)添加StreamChunkId與回調類ChunkReceivedCallback的引用之間的關系。TransportResponseHandler的addFetchRequest方法(見代碼清單39)將更新最后一次請求的時間為當前系統時間,然后將StreamChunkId與ChunkReceivedCallback之間的映射加入到outstandingFetches緩存中。outstandingFetches專門用於緩存發出的塊請求信息。

代碼清單39         添加塊請求到緩存

  public void addFetchRequest(StreamChunkId streamChunkId, ChunkReceivedCallback callback) {
    updateTimeOfLastRequest();
    outstandingFetches.put(streamChunkId, callback);
  }

3)    調用Channel的writeAndFlush方法將塊請求發送出去,這和在代碼清單28中服務端調用的respond方法響應客戶端的一樣,都是使用Channel的writeAndFlush方法。當發送成功或者失敗時會回調ChannelFutureListener的operationComplete方法。如果發送成功,那么只會打印StreamChunkId、遠端地址及花費時間的日志,如果發送失敗,除了打印錯誤日志外,還要調用TransportResponseHandler的removeFetchRequest方法(見代碼清單40)將此次請求從outstandingFetches緩存中移除。

代碼清單40         從緩存中刪除RPC請求

  public void removeRpcRequest(long requestId) {

    outstandingRpcs.remove(requestId);

  }

請求發送成功后,客戶端將等待接收服務端的響應。根據圖3,返回的消息也會傳遞給TransportChannelHandler的channelRead方法(見代碼清單14),根據之前的分析,消息的分析將最后交給TransportResponseHandler的handle方法來處理。TransportResponseHandler的handle方法分別對圖5中的六種處理結果進行處理,由於服務端使用processFetchRequest方法(見代碼清單24)處理ChunkFetchRequest類型的消息后返回給客戶端的消息為ChunkFetchSuccess或ChunkFetchFailure,所以我們來看看客戶端的TransportResponseHandler的handle方法是如何處理ChunkFetchSuccess和ChunkFetchFailure,見代碼清單41。

代碼清單41         ChunkFetchSuccess和ChunkFetchFailure消息的處理

    if (message instanceof ChunkFetchSuccess) {
      ChunkFetchSuccess resp = (ChunkFetchSuccess) message;
      ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
      if (listener == null) {
        logger.warn("Ignoring response for block {} from {} since it is not outstanding",
          resp.streamChunkId, getRemoteAddress(channel));
        resp.body().release();
      } else {
        outstandingFetches.remove(resp.streamChunkId);
        listener.onSuccess(resp.streamChunkId.chunkIndex, resp.body());
        resp.body().release();
      }
    } else if (message instanceof ChunkFetchFailure) {
      ChunkFetchFailure resp = (ChunkFetchFailure) message;
      ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
      if (listener == null) {
        logger.warn("Ignoring response for block {} from {} ({}) since it is not outstanding",
          resp.streamChunkId, getRemoteAddress(channel), resp.errorString);
      } else {
        outstandingFetches.remove(resp.streamChunkId);
        listener.onFailure(resp.streamChunkId.chunkIndex, new ChunkFetchFailureException(
          "Failure while fetching " + resp.streamChunkId + ": " + resp.errorString));
      }
    }

從代碼清單41看到,處理ChunkFetchSuccess的邏輯為:

  1. 使用ChunkFetchSuccess對應的StreamChunkId,從outstandingFetches緩存中獲取注冊的ChunkReceivedCallback,此處的ChunkReceivedCallback即為代碼清單38中傳遞給fetchChunk方法的ChunkReceivedCallback;
  2. 移除outstandingFetches緩存中StreamChunkId和ChunkReceivedCallback的注冊信息;
  3. 調用ChunkReceivedCallback的onSuccess方法,處理成功響應后的具體邏輯。這里的ChunkReceivedCallback需要各個使用TransportClient的fetchChunk方法的場景中分別實現;
  4. 最后釋放ChunkFetchSuccess的body,回收資源。

處理ChunkFetchFailure的邏輯為:

  1. 使用ChunkFetchFailure對應的StreamChunkId,從outstandingFetches緩存中獲取注冊的ChunkReceivedCallback,此處的ChunkReceivedCallback即為代碼清單38中傳遞給fetchChunk方法的ChunkReceivedCallback;
  2. 移除outstandingFetches緩存中StreamChunkId和ChunkReceivedCallback的注冊信息;
  3. 調用ChunkReceivedCallback的onFailure方法,處理失敗響應后的具體邏輯。這里的ChunkReceivedCallback需要各個使用TransportClient的fetchChunk方法的場景中分別實現。

         在詳細介紹了TransportClient和TransportResponseHandler之后,對於客戶端我們就可以擴展圖3,把TransportResponseHandler及TransportClient的處理流程增加進來,如圖8所示。

圖8       客戶端請求、響應流程圖

圖8中的序號①表示調用TransportResponseHandler的addRpcRequest方法(或addFetchRequest方法)將更新最后一次請求的時間為當前系統時間,然后將requestId與RpcResponseCallback之間的映射加入到outstandingRpcs緩存中(或將StreamChunkId與ChunkReceivedCallback之間的映射加入到outstandingFetches緩存中)。②表示調用Channel的writeAndFlush方法將RPC請求發送出去。圖中的虛線表示當TransportResponseHandler處理RpcResponse和RpcFailure時將從outstandingRpcs緩存中獲取此請求對應的RpcResponseCallback(或處理ChunkFetchSuccess和ChunkFetchFailure時將從outstandingFetches緩存中獲取StreamChunkId對應的ChunkReceivedCallback),並執行回調。此外,TransportClientBootstrap將可能存在於圖8中任何兩個組件的箭頭連線中間。

 

關於《Spark內核設計的藝術 架構設計與實現》

經過近一年的准備,基於Spark2.1.0版本的《Spark內核設計的藝術 架構設計與實現》一書現已出版發行,圖書如圖:

Spark內核設計的藝術

 

紙質版售賣鏈接如下:

京東:https://item.jd.com/12302500.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM