RPC: Thrift (Part 1)


A Simple Example

  The IDL file is shown below; for the full IDL syntax, see the official documentation at http://thrift.apache.org/docs/idl.

  Running the Thrift code-generation tool against this IDL produces two files: HelloService.java and ResultCommon.java (an example compiler invocation is shown after the IDL).

namespace java com.mytest.thrift

struct ResultCommon{
    1:i32      resultCode,
    2:string   desc
}

service HelloService{
    ResultCommon sayHello(1:string paramJson)
}
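
  For reference, the generated classes come from the Thrift compiler. Assuming the IDL above is saved as hello.thrift (an illustrative file name; the exact flags can vary slightly between Thrift versions), a typical invocation writes the Java sources into a gen-java directory:

thrift --gen java hello.thrift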

 

  The implementation of the generated business interface HelloService.Iface is as follows:

public class HelloHandler implements HelloService.Iface {
    private Logger logger = LoggerFactory.getLogger(HelloHandler.class);
    @Override
    public ResultCommon sayHello(String paramJson) throws TException {
        logger.info("receive request param : {}", paramJson);
        ResultCommon response = new ResultCommon();
        response.setDesc("Hello World!");
        return response;
    }
}

 

  Thrift RPC server implementation:

public class RpcServer {
    public static void main(String[] args) throws TTransportException {
        // Blocking synchronous I/O model
        TServerSocket tServerSocket = new TServerSocket(8090);
        HelloService.Processor<HelloService.Iface> processor =
                new HelloService.Processor<HelloService.Iface>(new HelloHandler());
        TThreadPoolServer.Args args1 = new TThreadPoolServer.Args(tServerSocket);
        args1.processor(processor);
        // Use the binary protocol as the message format
        args1.protocolFactory(new TBinaryProtocol.Factory());
        // Maximum and minimum number of worker threads in the pool
        args1.maxWorkerThreads(10);
        args1.minWorkerThreads(1);
        // Start the server
        TThreadPoolServer server = new TThreadPoolServer(args1);
        // serve() blocks here
        server.serve();
    }
}

 

  Thrift RPC client implementation:

public class RpcClient {
    public static void main(String[] args) throws TException {
        TSocket tSocket = new TSocket("127.0.0.1", 8090);
        tSocket.open();
        TProtocol tProtocol = new TBinaryProtocol(tSocket);
        HelloService.Client client = new HelloService.Client(tProtocol);
        String paramJson = "{\"wewe\":\"111\"}";
        ResultCommon resultCommon = client.sayHello(paramJson);
        System.out.println(resultCommon.getDesc());
        tSocket.close();
    }
}

 

  Notes: 1) The Thrift client and server must use the same I/O model; in the example above both use blocking synchronous I/O.

      2) The client and server must also use the same message format; in the example above both use the binary protocol TBinaryProtocol.
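
  For example, if the server is one of the non-blocking implementations described later (they all rely on TFramedTransport), the client must wrap its socket in a TFramedTransport as well, otherwise the two sides cannot parse each other's messages. A minimal sketch, reusing the HelloService from the example above, might look like this:

TSocket socket = new TSocket("127.0.0.1", 8090);
// Non-blocking servers (TNonblockingServer, THsHaServer, TThreadedSelectorServer)
// exchange frame-delimited messages, so the client needs TFramedTransport too.
TTransport transport = new TFramedTransport(socket);
transport.open();
TProtocol protocol = new TBinaryProtocol(transport);
HelloService.Client client = new HelloService.Client(protocol);
ResultCommon result = client.sayHello("{\"key\":\"111\"}");
System.out.println(result.getDesc());
transport.close();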

Thrift RPC in Detail

  The Thrift protocol stack is layered as follows:

 

   

    Underlying I/O module: performs the actual data transfer; it can be a socket, a file, a compressed data stream, and so on;

    TTransport: defines how messages travel between client and server, sending and receiving them as byte streams. Different TTransport subclasses carry the Thrift byte stream over different I/O modules; for example, TSocket handles socket transport and TFileTransport handles file transport;

    TProtocol: defines how messages are serialized, i.e. it converts between structured data (objects, structs, etc.) and byte-stream messages. On the client side it packs structured data into a byte-stream message; on the server side it extracts structured data back out of the byte stream. Different TProtocol subclasses implement different wire formats, e.g. TBinaryProtocol for the binary format (see the sketch after this list);

    TServer: accepts client requests and forwards them to the Processor. The TServer subclasses differ widely in implementation, and their performance differs greatly as well;

    Processor: handles client requests and returns responses, covering RPC dispatch, parameter parsing, and invocation of user-defined code. The Processor code is generated by Thrift from the IDL file; the user only implements the business logic behind the generated interface. The Processor is the point where the Thrift framework hands control over to user code;

    ServiceClient: sends RPC requests on the client side. Like the Processor, this code is generated by Thrift from the IDL file.
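
    Because these layers are pluggable, switching the wire format only requires choosing a different TProtocol on both sides. As a small sketch (TCompactProtocol ships with Thrift; args1 refers to the server Args object from the first example, and socket opening is omitted):

// Server side: only the protocol factory changes
args1.protocolFactory(new TCompactProtocol.Factory());

// Client side must use the matching protocol
TProtocol protocol = new TCompactProtocol(new TSocket("127.0.0.1", 8090));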

Implementation Principles of the Thrift Core Library

  TServer

    TServer is mainly responsible for accepting and forwarding client requests. Its class hierarchy consists of the implementations listed below.

      

    

 

    Thrift provides several TServer implementations; they use different threading and I/O models and suit different situations.

      TSimpleServer: single-threaded blocking-I/O server, mainly for testing;

      TThreadPoolServer: multi-threaded blocking-I/O server; the worker threads come from a java.util.concurrent ThreadPoolExecutor;

      AbstractNonblockingServer: abstract class providing the methods and helper classes shared by the non-blocking I/O servers;

      TNonblockingServer: single-threaded server based on multiplexed (non-blocking) I/O; it depends on TFramedTransport;

      THsHaServer: half-sync/half-async server; business-logic calls run on a thread pool; it also depends on TFramedTransport;

      TThreadedSelectorServer: half-sync/half-async server with multi-threaded I/O; it likewise depends on TFramedTransport. (A minimal non-blocking server setup is sketched right after this list.)
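
    As a rough illustration of the last three entries, a non-blocking server is built on a TNonblockingServerSocket and, because these servers expect frame-delimited messages, a TFramedTransport factory. A minimal sketch, reusing the HelloHandler from the first example (port and names are illustrative):

TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(8090);
TNonblockingServer.Args nbArgs = new TNonblockingServer.Args(serverSocket);
nbArgs.processor(new HelloService.Processor<HelloService.Iface>(new HelloHandler()));
nbArgs.protocolFactory(new TBinaryProtocol.Factory());
// AbstractNonblockingServerArgs installs a TFramedTransport.Factory by default,
// but being explicit makes the framing requirement obvious.
nbArgs.transportFactory(new TFramedTransport.Factory());
TServer server = new TNonblockingServer(nbArgs);
server.serve();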

    The implementation of each TServer is analyzed in detail below.

    TSimpleServer

      TSimpleServer handles only one connection at a time: it accepts a new connection only after the current client has closed its connection. Because it does all of its work with blocking I/O in a single thread, it can serve exactly one client connection, and every other client has to wait until it is accepted. TSimpleServer is therefore very inefficient and must not be used in production. Let's analyze the implementation through the source code.

public void serve() {
  stopped_ = false;
  try {
    // Start the listening socket
    serverTransport_.listen();
  } catch (TTransportException ttx) {
    LOGGER.error("Error occurred during listening.", ttx);
    return;
  }
  setServing(true);    // mark the server as serving
  // only one socket connection is handled at a time
  while (!stopped_) {
    TTransport client = null;
    TProcessor processor = null;
    TTransport inputTransport = null;
    TTransport outputTransport = null;
    TProtocol inputProtocol = null;
    TProtocol outputProtocol = null;
    try {
      client = serverTransport_.accept(); // accept a connection request; blocks until one arrives
      if (client != null) {
        processor = processorFactory_.getProcessor(client);
        inputTransport = inputTransportFactory_.getTransport(client);
        outputTransport = outputTransportFactory_.getTransport(client);
        inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
        outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
        // keep processing requests on this connection until process() returns false (connection closed or error)
        while (processor.process(inputProtocol, outputProtocol)) {}
      }
    } catch (TTransportException ttx) {
      // Client died, just move on
    } catch (TException tx) {
      if (!stopped_) {
        LOGGER.error("Thrift error occurred during processing of message.", tx);
      }
    } catch (Exception x) {
      if (!stopped_) {
        LOGGER.error("Error occurred during processing of message.", x);
      }
    }

    if (inputTransport != null) {
      inputTransport.close();
    }

    if (outputTransport != null) {
      outputTransport.close();
    }

  }
  setServing(false); 
}
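
      For completeness, standing up a TSimpleServer for local testing might look like this (a sketch reusing HelloHandler; the port is illustrative):

TServerTransport serverTransport = new TServerSocket(8091);
TSimpleServer.Args simpleArgs = new TSimpleServer.Args(serverTransport);
simpleArgs.processor(new HelloService.Processor<HelloService.Iface>(new HelloHandler()));
simpleArgs.protocolFactory(new TBinaryProtocol.Factory());
TServer server = new TSimpleServer(simpleArgs);
server.serve(); // blocks; serves one connection at a time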

 

      From the source, TSimpleServer's processing flow is: listen, accept a single connection, process requests on it in a loop until the client disconnects, then go back and accept the next connection.

    

      

     TThreadPoolServer

      TThreadPoolServer is also based on the blocking I/O model; unlike TSimpleServer, it uses a thread pool to raise throughput.

      The TThreadPoolServer constructor is shown below. It uses the JDK's ThreadPoolExecutor with a configurable maximum thread count (default Integer.MAX_VALUE) and minimum thread count (default 5). The pool's work queue is a SynchronousQueue: each put must wait for a matching take and vice versa, otherwise the operation blocks (a small illustration of the consequence follows the constructor).

  // Executor service for handling client connections
  private ExecutorService executorService_;
  // maximum time to wait when shutting down the server
  private final TimeUnit stopTimeoutUnit;
  private final long stopTimeoutVal;
  public TThreadPoolServer(Args args) {
    super(args);
    // synchronous hand-off queue: it has no capacity and every put must wait for a take; typically used to hand single elements between threads
    SynchronousQueue<Runnable> executorQueue =
      new SynchronousQueue<Runnable>();
    stopTimeoutUnit = args.stopTimeoutUnit;
    stopTimeoutVal = args.stopTimeoutVal;
    // initialize the thread pool
    executorService_ = new ThreadPoolExecutor(args.minWorkerThreads,
                                              args.maxWorkerThreads,
                                              60,
                                              TimeUnit.SECONDS,
                                              executorQueue);
  }
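
      A tiny standalone sketch (independent of Thrift) illustrates the consequence of this configuration: a SynchronousQueue does no buffering, so once every worker thread is busy a newly submitted task is rejected rather than queued. This is exactly why, in TThreadPoolServer, the maximum number of concurrent client connections equals the maximum pool size.

import java.util.concurrent.*;

public class SynchronousQueueDemo {
    public static void main(String[] args) {
        ExecutorService pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        Runnable longTask = () -> {
            try { Thread.sleep(5000); } catch (InterruptedException ignored) {}
        };
        pool.execute(longTask);     // picked up by worker thread 1
        pool.execute(longTask);     // pool grows to its maximum of 2 threads
        try {
            pool.execute(longTask); // no idle thread and no queue capacity -> rejected
        } catch (RejectedExecutionException e) {
            System.out.println("third task rejected: " + e);
        }
        pool.shutdown();
    }
}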

 

       Now look at TThreadPoolServer's serve() method. The main thread does nothing but accept connections; each accepted client connection is handed to a worker thread in the ThreadPoolExecutor, and the main thread immediately goes back to accepting the next client. Because the work queue is a SynchronousQueue, the maximum number of client connections TThreadPoolServer can sustain equals the number of threads in the pool; every client connection occupies one thread. Note that with a very large number of concurrent clients, the server-side thread count becomes very large, which can cause performance problems on the server.

  public void serve() {
    try {
      // start the listening socket
      serverTransport_.listen();
    } catch (TTransportException ttx) {
      LOGGER.error("Error occurred during listening.", ttx);
      return;
    }
    stopped_ = false;
    setServing(true);
    // loop until the server is stopped
    while (!stopped_) {
      int failureCount = 0;
      try {
        // accept a client connection (blocking); each accepted connection becomes a WorkerProcess submitted to the thread pool
        TTransport client = serverTransport_.accept();
        WorkerProcess wp = new WorkerProcess(client);
        executorService_.execute(wp);
      } catch (TTransportException ttx) {
        if (!stopped_) {
          ++failureCount;
          LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
        }
      }
    }
    // the server has been stopped; shut down the thread pool
    executorService_.shutdown();

    // Loop until awaitTermination finally does return without a interrupted
    // exception. If we don't do this, then we'll shut down prematurely. We want
    // to let the executorService clear it's task queue, closing client sockets
    // appropriately.
    // keep calling awaitTermination until it returns without interruption or the timeout expires, so the pool is not shut down prematurely and remaining client sockets get closed properly
    long timeoutMS = stopTimeoutUnit.toMillis(stopTimeoutVal);
    long now = System.currentTimeMillis();
    while (timeoutMS >= 0) {
      try {
        // awaitTermination blocks until all tasks finish after shutdown(), the timeout elapses, or the thread is interrupted
        executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
        break;
      } catch (InterruptedException ix) {
        // if interrupted, recompute the remaining timeout and keep waiting
        long newnow = System.currentTimeMillis();
        timeoutMS -= (newnow - now);
        now = newnow;
      }
    }
    setServing(false);
  }

 

       Finally, look at the WorkerProcess class, an inner class of TThreadPoolServer. Each WorkerProcess is bound to one client connection and handles requests on it until the connection closes; once it does, the worker thread returns to the pool.

  private class WorkerProcess implements Runnable {
    private TTransport client_;
    private WorkerProcess(TTransport client) {
      client_ = client;
    }
    public void run() {
      TProcessor processor = null;
      TTransport inputTransport = null;
      TTransport outputTransport = null;
      TProtocol inputProtocol = null;
      TProtocol outputProtocol = null;
      try {
        processor = processorFactory_.getProcessor(client_);
        inputTransport = inputTransportFactory_.getTransport(client_);
        outputTransport = outputTransportFactory_.getTransport(client_);
        inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
        outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
        // we check stopped_ first to make sure we're not supposed to be shutting
        // down. this is necessary for graceful shutdown.
        // keep processing requests on this connection until the server stops or the connection fails
        while (!stopped_ && processor.process(inputProtocol, outputProtocol)) {}
      } catch (TTransportException ttx) {
        // Assume the client died and continue silently
      } catch (TException tx) {
        LOGGER.error("Thrift error occurred during processing of message.", tx);
      } catch (Exception x) {
        LOGGER.error("Error occurred during processing of message.", x);
      }
      // close the input and output transports
      if (inputTransport != null) {
        inputTransport.close();
      }
      if (outputTransport != null) {
        outputTransport.close();
      }
    }
  }

 

       In summary, TThreadPoolServer's processing flow is: the main thread accepts connections and hands each one to a pooled worker thread, which serves that connection until the client disconnects.

      

 

    AbstractNonblockingServer

      AbstractNonblockingServer is the parent class of the non-blocking I/O TServers and provides their shared methods and helper classes; it is built on Java NIO (the multiplexed I/O model). The rough startup sequence in serve() is startThreads() -> startListening() -> setServing(true) -> waitForShutdown(); the details depend on the concrete subclass. Let's first look at the source to understand the mechanism.

public abstract class AbstractNonblockingServer extends TServer {
  protected final Logger LOGGER = LoggerFactory.getLogger(getClass().getName());

  public static abstract class AbstractNonblockingServerArgs<T extends AbstractNonblockingServerArgs<T>> extends AbstractServerArgs<T> {
    // maximum number of bytes allowed for the read buffers
    public long maxReadBufferBytes = Long.MAX_VALUE;
    // the constructor sets the parent class's inputTransportFactory_ / outputTransportFactory_ to framed transports
    public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) {
      super(transport);
      transportFactory(new TFramedTransport.Factory());
    }
  }
  private final long MAX_READ_BUFFER_BYTES;
  // number of bytes currently allocated to read buffers
  private final AtomicLong readBufferBytesAllocated = new AtomicLong(0);
  public AbstractNonblockingServer(AbstractNonblockingServerArgs args) {
    super(args);
    MAX_READ_BUFFER_BYTES = args.maxReadBufferBytes;
  }
  /**
   * Begin accepting connections and processing invocations.
   */
  public void serve() {
    // start any I/O threads
    if (!startThreads()) {
      return;
    }
    // start listening for client requests, or exit
    if (!startListening()) {
      return;
    }
    setServing(true);    // mark the server as serving
    // this will block while we serve
    waitForShutdown();    // blocks while serving; returns once the server stops
    setServing(false);    // mark the server as no longer serving
    // do a little cleanup
    stopListening();    // stop listening on the port
  }

  /**
   * Starts any threads required for serving.
   * 
   * @return true if everything went ok, false if threads could not be started.
   */
  protected abstract boolean startThreads(); // start the I/O threads; implemented by subclasses

  /**
   * A method that will block until when threads handling the serving have been
   * shut down.
   */
  protected abstract void waitForShutdown(); // blocks while serving, returns after the server stops; implemented by subclasses
  // start listening on the port
  protected boolean startListening() {
    try {
      serverTransport_.listen();
      return true;
    } catch (TTransportException ttx) {
      LOGGER.error("Failed to start listening on server socket!", ttx);
      return false;
    }
  }
  // stop listening on the port
  protected void stopListening() {
    serverTransport_.close();
  }

  /**
   * Perform an invocation. This method could behave several different ways -
   * invoke immediately inline, queue for separate execution, etc.
   * 
   * @return true if invocation was successfully requested, which is not a
   *         guarantee that invocation has completed. False if the request
   *         failed.
   */
  protected abstract boolean requestInvoke(FrameBuffer frameBuffer); // invoke the business logic for this frameBuffer; implemented by subclasses
}

 

      The inner class FrameBuffer of AbstractNonblockingServer is the core class through which the non-blocking TServers read and write data. A FrameBuffer moves through several states and behaves differently in each, so first look at the FrameBufferState enum.

  private enum FrameBufferState {
    // in the midst of reading the frame size off the wire
    READING_FRAME_SIZE,
    // reading the actual frame data now, but not all the way done yet
    READING_FRAME,
    // completely read the frame, so an invocation can now happen
    READ_FRAME_COMPLETE,
    // the invocation is complete; waiting to get switched to listening for write events
    AWAITING_REGISTER_WRITE,
    // started writing response data, not fully complete yet
    WRITING,
    // finished writing the response; another thread wants this framebuffer to go back to
    // reading; once re-registered for reads it returns to READING_FRAME_SIZE
    AWAITING_REGISTER_READ,
    // we want our transport and selection key invalidated in the selector thread;
    // a FrameBuffer ends up here when any state above hits an error, and the selector
    // loop will then close the connection
    AWAITING_CLOSE
  }

      When the client expects a response, the FrameBuffer state transitions are: READING_FRAME_SIZE -> READING_FRAME -> READ_FRAME_COMPLETE -> AWAITING_REGISTER_WRITE -> WRITING -> AWAITING_REGISTER_READ -> READING_FRAME_SIZE;

      When the client does not expect a response (a oneway call), the transitions are: READING_FRAME_SIZE -> READING_FRAME -> READ_FRAME_COMPLETE -> AWAITING_REGISTER_READ -> READING_FRAME_SIZE;

      If an exception occurs in any of these states, the FrameBuffer transitions to AWAITING_CLOSE.

      The FrameBuffer source is analyzed below. A FrameBuffer is bound to a SelectionKey; it reads data from the client, invokes the business logic, writes the result back to the client, and manages changes to the interest set of the SelectionKey it is bound to.

  protected class FrameBuffer {
    // the actual transport hooked up to the client.
    private final TNonblockingTransport trans_; // the connection to the client; the concrete implementation is TNonblockingSocket
    // the SelectionKey that corresponds to our transport
    private final SelectionKey selectionKey_; // the SelectionKey associated with this FrameBuffer
    // the SelectThread that owns the registration of our transport
    private final AbstractSelectThread selectThread_; // the select thread this FrameBuffer belongs to
    // where in the process of reading/writing are we?
    private FrameBufferState state_ = FrameBufferState.READING_FRAME_SIZE; // the current state of this FrameBuffer
    // the ByteBuffer we'll be using to write and read, depending on the state
    private ByteBuffer buffer_; // the Java NIO buffer used for both reading and writing
    private TByteArrayOutputStream response_; // the locally buffered result produced by the business logic

    public FrameBuffer(final TNonblockingTransport trans,
        final SelectionKey selectionKey,
        final AbstractSelectThread selectThread) {
      trans_ = trans;
      selectionKey_ = selectionKey;
      selectThread_ = selectThread;
      buffer_ = ByteBuffer.allocate(4); // TFramedTransport's frame-size header is 4 bytes, so allocate 4 bytes
    }

    /**
     * Give this FrameBuffer a chance to read. The selector loop should have
     * received a read event for this FrameBuffer.
     * 
     * @return true if the connection should live on, false if it should be
     *         closed
     */
    // performs one read: in READING_FRAME_SIZE it reads the frame size, in READING_FRAME it reads the frame data
    public boolean read() {
      if (state_ == FrameBufferState.READING_FRAME_SIZE) {
        // try to read the frame size completely 
        // read from trans_ into buffer_ (at most the buffer's remaining capacity)
        if (!internalRead()) {
          return false;
        }

        // if the frame size has been read completely, then prepare to read the
        // actual frame.
        // remaining() returns how much space is left in buffer_; 0 means the 4-byte buffer is full, i.e. the frame size has been fully read
        if (buffer_.remaining() == 0) {
          // pull out the frame size as an integer.
          int frameSize = buffer_.getInt(0); // decode the frame size as an int
          // validate the frame size
          if (frameSize <= 0) {
            LOGGER.error("Read an invalid frame size of " + frameSize
                + ". Are you using TFramedTransport on the client side?");
            return false;
          }
          // if this frame will always be too large for this server, log the
          // error and close the connection.
          if (frameSize > MAX_READ_BUFFER_BYTES) {
            LOGGER.error("Read a frame size of " + frameSize
                + ", which is bigger than the maximum allowable buffer size for ALL connections.");
            return false;
          }
          // if this frame will push us over the memory limit, then return.
          // with luck, more memory will free up the next time around.
          // this frame would exceed the total allocated read-buffer limit; return true and try again later
          if (readBufferBytesAllocated.get() + frameSize > MAX_READ_BUFFER_BYTES) {
            return true;
          }
          // increment the amount of memory allocated to read buffers by frameSize
          readBufferBytesAllocated.addAndGet(frameSize);
          // reallocate the readbuffer as a frame-sized buffer
          // the frame size passed validation, so reallocate buffer_ with frameSize bytes for the actual frame data
          buffer_ = ByteBuffer.allocate(frameSize);
          // then switch the state to READING_FRAME and go on to read the real data
          state_ = FrameBufferState.READING_FRAME;
        } else {
          // this skips the check of READING_FRAME state below, since we can't
          // possibly go on to that state if there's data left to be read at
          // this one.
          // buffer_ still has room, i.e. the frame size has not been fully read yet; return true and continue next time
          return true;
        }
      }

      // it is possible to fall through from the READING_FRAME_SIZE section
      // to READING_FRAME if there's already some frame data available once
      // READING_FRAME_SIZE is complete.

      if (state_ == FrameBufferState.READING_FRAME) {
        if (!internalRead()) {
          return false;
        }
        // since we're already in the select loop here for sure, we can just
        // modify our selection key directly.
        // buffer_ now has capacity frameSize; remaining() == 0 means the frame data has been fully read
        if (buffer_.remaining() == 0) {
          // get rid of the read select interests
          // deregister the read interest of this FrameBuffer's selectionKey_
          selectionKey_.interestOps(0);
          // and move to the READ_FRAME_COMPLETE state
          state_ = FrameBufferState.READ_FRAME_COMPLETE;
        }
        // the frame is not fully read yet; return true and continue next time
        return true;
      }
      // if we fall through to this point, then the state must be invalid.
      LOGGER.error("Read was called but state is invalid (" + state_ + ")");
      return false;
    }

    /**
     * Give this FrameBuffer a chance to write its output to the final client.
     */
    public boolean write() {
      if (state_ == FrameBufferState.WRITING) {
        try {
          // write the contents of buffer_ to the client via trans_
          if (trans_.write(buffer_) < 0) {
            return false;
          }
        } catch (IOException e) {
          LOGGER.warn("Got an IOException during write!", e);
          return false;
        }
        // we're done writing. now we need to switch back to reading.
        if (buffer_.remaining() == 0) {
          prepareRead(); // the write is complete; switch back to read mode
        }
        return true;
      }
      LOGGER.error("Write was called, but state is invalid (" + state_ + ")");
      return false;
    }

    /**
     * Give this FrameBuffer a chance to set its interest to write, once data
     * has come in. Adjusts selectionKey_'s interest set; called when the state is one of the AWAITING_ states.
     */
    public void changeSelectInterests() {
      if (state_ == FrameBufferState.AWAITING_REGISTER_WRITE) {
        // set the OP_WRITE interest
        selectionKey_.interestOps(SelectionKey.OP_WRITE);
        state_ = FrameBufferState.WRITING;
      } else if (state_ == FrameBufferState.AWAITING_REGISTER_READ) {
        prepareRead();
      } else if (state_ == FrameBufferState.AWAITING_CLOSE) {
        close();
        selectionKey_.cancel();
      } else {
        LOGGER.error("changeSelectInterest was called, but state is invalid (" + state_ + ")");
      }
    }

    /**
     * Shut the connection down.
     */
    public void close() {
      // if we're being closed due to an error, we might have allocated a
      // buffer that we need to subtract for our memory accounting.
      if (state_ == FrameBufferState.READING_FRAME || state_ == FrameBufferState.READ_FRAME_COMPLETE) {
        readBufferBytesAllocated.addAndGet(-buffer_.array().length);
      }
      trans_.close();
    }

    /**
     * Check if this FrameBuffer has a full frame read.
     */
    public boolean isFrameFullyRead() {
      return state_ == FrameBufferState.READ_FRAME_COMPLETE;
    }

    /**
     * After the processor has processed the invocation, whatever thread is
     * managing invocations should call this method on this FrameBuffer so we
     * know it's time to start trying to write again. Also, if it turns out that
     * there actually isn't any data in the response buffer, we'll skip trying
     * to write and instead go back to reading.
     */
    // prepare the response
    public void responseReady() {
      // the read buffer is definitely no longer in use, so we will decrement
      // our read buffer count. we do this here as well as in close because
      // we'd like to free this read memory up as quickly as possible for other
      // clients.
      // the invocation is finished, so release the read-buffer allocation
      readBufferBytesAllocated.addAndGet(-buffer_.array().length);

      if (response_.len() == 0) {
        // go straight to reading again. this was probably an oneway method
        // no response is needed; move straight to AWAITING_REGISTER_READ to get ready for the next read
        state_ = FrameBufferState.AWAITING_REGISTER_READ;
        buffer_ = null;
      } else {
        // wrap the response data in buffer_
        buffer_ = ByteBuffer.wrap(response_.get(), 0, response_.len());
        // set state that we're waiting to be switched to write. we do this
        // asynchronously through requestSelectInterestChange() because there is
        // a possibility that we're not in the main thread, and thus currently
        // blocked in select(). (this functionality is in place for the sake of
        // the HsHa server.)
        // move to AWAITING_REGISTER_WRITE, ready to write the response back
        state_ = FrameBufferState.AWAITING_REGISTER_WRITE;
      }
      // request that the selector's interest set be updated
      requestSelectInterestChange();
    }

    /**
     * Actually invoke the method signified by this FrameBuffer.
     * Invokes the business logic.
     */
    public void invoke() {
      TTransport inTrans = getInputTransport();
      TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans);
      TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport());

      try {
        // run the business logic
        processorFactory_.getProcessor(inTrans).process(inProt, outProt);
        // prepare the response
        responseReady();
        return;
      } catch (TException te) {
        LOGGER.warn("Exception while invoking!", te);
      } catch (Throwable t) {
        LOGGER.error("Unexpected throwable while invoking!", t);
      }
      // This will only be reached when there is a throwable.
      state_ = FrameBufferState.AWAITING_CLOSE;
      requestSelectInterestChange();
    }

    /**
     * Wrap the read buffer in a memory-based transport so a processor can read
     * the data it needs to handle an invocation.
     */
    private TTransport getInputTransport() {
      return new TMemoryInputTransport(buffer_.array());
    }

    /**
     * Get the transport that should be used by the invoker for responding.
     */
    private TTransport getOutputTransport() {
      response_ = new TByteArrayOutputStream();
      return outputTransportFactory_.getTransport(new TIOStreamTransport(response_));
    }

    /**
     * Perform a read into buffer.
     * Reads from trans_ into buffer_.
     * @return true if the read succeeded, false if there was an error or the
     *         connection closed.
     */
    private boolean internalRead() {
      try {
        if (trans_.read(buffer_) < 0) {
          return false;
        }
        return true;
      } catch (IOException e) {
        LOGGER.warn("Got an IOException in internalRead!", e);
        return false;
      }
    }

    /**
     * We're done writing, so reset our interest ops and change state
     * accordingly.
     */
    private void prepareRead() {
      // we can set our interest directly without using the queue because
      // we're in the select thread. Register interest in read events.
      selectionKey_.interestOps(SelectionKey.OP_READ);
      // get ready for another go-around
      buffer_ = ByteBuffer.allocate(4); // allocate a fresh 4-byte buffer for the next frame-size header
      state_ = FrameBufferState.READING_FRAME_SIZE; // and go back to READING_FRAME_SIZE
    }

    /**
     * When this FrameBuffer needs to change its select interests and execution
     * might not be in its select thread, then this method will make sure the
     * interest change gets done when the select thread wakes back up. When the
     * current thread is this FrameBuffer's select thread, then it just does the
     * interest change immediately.
     */
    private void requestSelectInterestChange() {
      if (Thread.currentThread() == this.selectThread_) {
        changeSelectInterests();
      } else {
        this.selectThread_.requestSelectInterestChange(this);
      }
    }
  }

 

      The AbstractSelectThread class is the selector thread that performs the non-blocking reads and writes. Its source is analyzed below:

  protected abstract class AbstractSelectThread extends Thread {
    protected final Selector selector;
    // List of FrameBuffers that want to change their selection interests.
    // when a FrameBuffer needs to change the events it has registered with the selector, it is added to this set
    protected final Set<FrameBuffer> selectInterestChanges = new HashSet<FrameBuffer>();
    public AbstractSelectThread() throws IOException {
      this.selector = SelectorProvider.provider().openSelector();
    }
    /**
     * If the selector is blocked, wake it up.
     */
    public void wakeupSelector() {
      selector.wakeup();
    }
    /**
     * Add FrameBuffer to the list of select interest changes and wake up the
     * selector if it's blocked. When the select() call exits, it'll give the
     * FrameBuffer a chance to change its interests.
     * Adds the frameBuffer to the selectInterestChanges set.
     */
    public void requestSelectInterestChange(FrameBuffer frameBuffer) {
      synchronized (selectInterestChanges) {
        selectInterestChanges.add(frameBuffer);
      }
      // wakeup the selector, if it's currently blocked.
      selector.wakeup();
    }
    /**
     * Check to see if there are any FrameBuffers that have switched their
     * interest type from read to write or vice versa.
     * Checks whether any FrameBuffers need their registered events changed.
     */
    protected void processInterestChanges() {
      synchronized (selectInterestChanges) {
        for (FrameBuffer fb : selectInterestChanges) {
          fb.changeSelectInterests();
        }
        selectInterestChanges.clear();
      }
    }
    /**
     * Do the work required to read from a readable client. If the frame is
     * fully read, then invoke the method call.
     * Reads client data; once a whole frame has been read, invokes the business logic.
     */
    protected void handleRead(SelectionKey key) {
      FrameBuffer buffer = (FrameBuffer) key.attachment();
      if (!buffer.read()) {
        // the read failed; clean up the connection
        cleanupSelectionKey(key);
        return;
      }
      // if the buffer's frame read is complete, invoke the method.
      if (buffer.isFrameFullyRead()) {
        if (!requestInvoke(buffer)) {
          // the invocation could not be scheduled; clean up the connection
          cleanupSelectionKey(key);
        }
      }
    }
    /**
     * Let a writable client get written, if there's data to be written.
     * Writes response data back to the client.
     */
    protected void handleWrite(SelectionKey key) {
      FrameBuffer buffer = (FrameBuffer) key.attachment();
      if (!buffer.write()) {
        // the write failed; clean up the connection
        cleanupSelectionKey(key);
      }
    }
    /**
     * Do connection-close cleanup on a given SelectionKey. 
     * Closes the connection.
     */
    protected void cleanupSelectionKey(SelectionKey key) {
      // remove the records from the two maps
      FrameBuffer buffer = (FrameBuffer) key.attachment();
      if (buffer != null) {
        // close the buffer
        buffer.close();
      }
      // cancel the selection key
      key.cancel();
    }
  }

 

      To summarize: AbstractNonblockingServer, FrameBuffer, and AbstractSelectThread are the three key classes behind the non-blocking TServers, and they interact as follows.

The AbstractSelectThread methods handleRead(SelectionKey key), processInterestChanges(), and handleWrite(SelectionKey key) are the entry points called by the subclasses. Walking through the flow of a single request:
1.1. When a subclass calls handleRead(SelectionKey key), read() is called on the FrameBuffer attached to that SelectionKey. A single read() may not consume the whole frame, so it can take several handleRead calls before the data is fully read; only then does the state become READ_FRAME_COMPLETE and isFrameFullyRead() pass.
1.2. Once the frame is fully read, the subclass's requestInvoke(buffer) method is called, which eventually calls back into FrameBuffer.invoke() to run the business logic.
1.3. After the business call finishes, the FrameBuffer moves to AWAITING_REGISTER_WRITE or AWAITING_REGISTER_READ and the selector's interest set has to be changed. requestSelectInterestChange() checks whether the current thread is the owning select thread because the non-blocking servers come in single-threaded and multi-threaded flavors; in the multi-threaded case the business logic runs on a pool thread, so the change is queued via AbstractSelectThread.requestSelectInterestChange(FrameBuffer frameBuffer) into the select thread's pending set.
2. When the subclass later calls processInterestChanges(), every FrameBuffer in the pending set has its registered events switched.
3. When the subclass calls handleWrite(SelectionKey key), write() is called on the FrameBuffer attached to that SelectionKey; as with read(), several calls may be needed before the response is fully written, after which the FrameBuffer returns to READING_FRAME_SIZE.
Note: if a read or write fails in handleRead or handleWrite, cleanupSelectionKey(SelectionKey key) is called to cancel the key and release the FrameBuffer's resources.
The figure and explanation are adapted from http://blog.csdn.net/chen7253886/article/details/53024848.

 

    TNonblockingServer

      TNonblockingServer is one implementation of the non-blocking AbstractNonblockingServer. It handles I/O events in a single thread: every socket is registered with a Selector, and one thread loops over that Selector, checking for and handling ready events. Like TSimpleServer it is single-threaded, but unlike the blocking TSimpleServer it can keep many client connections open at the same time. Let's look at the source.

public class TNonblockingServer extends AbstractNonblockingServer {
  private SelectAcceptThread selectAcceptThread_;
  // start selectAcceptThread_ to handle client connections and requests
  @Override
  protected boolean startThreads() {
    try {
      // a single SelectAcceptThread handles all I/O
      selectAcceptThread_ = new SelectAcceptThread((TNonblockingServerTransport)serverTransport_);
      stopped_ = false;
      selectAcceptThread_.start();
      return true;
    } catch (IOException e) {
      LOGGER.error("Failed to start selector thread!", e);
      return false;
    }
  }
  @Override
  protected void waitForShutdown() {
    joinSelector();
  }
  // block until selectAcceptThread_ exits
  protected void joinSelector() {
    try {
      selectAcceptThread_.join();
    } catch (InterruptedException e) {
      // for now, just silently ignore. technically this means we'll have less of
      // a graceful shutdown as a result.
    }
  }
  // stop the server
  @Override
  public void stop() {
    stopped_ = true;
    if (selectAcceptThread_ != null) {
      selectAcceptThread_.wakeupSelector();
    }
  }
  /**
   * Perform an invocation. This method could behave several different ways
   * - invoke immediately inline, queue for separate execution, etc.
   * Invoked from handleRead once a whole frame has been read; runs the business logic inline.
   */
  @Override
  protected boolean requestInvoke(FrameBuffer frameBuffer) {
    frameBuffer.invoke();
    return true;
  }
}

      The SelectAcceptThread thread class is where the I/O handling happens; it extends the abstract class AbstractSelectThread.

  /**
   * The thread that will be doing all the selecting, managing new connections
   * and those that still need to be read. 
   * This thread handles the I/O events; it extends AbstractSelectThread.
   */
  protected class SelectAcceptThread extends AbstractSelectThread {

    // The server transport on which new client transports will be accepted
    private final TNonblockingServerTransport serverTransport;

    /**
     * Set up the thread that will handle the non-blocking accepts, reads, and
     * writes.
     */
    public SelectAcceptThread(final TNonblockingServerTransport serverTransport)
    throws IOException {
      this.serverTransport = serverTransport;
      // register the server socket channel with the selector for SelectionKey.OP_ACCEPT
      serverTransport.registerSelector(selector);
    }

    public boolean isStopped() {
      return stopped_;
    }

    /**
     * The work loop. Handles both selecting (all IO operations) and managing
     * the selection preferences of all existing connections.
     */
    public void run() {
      // loop, checking the selector for ready events
      try {
        while (!stopped_) {
          // check for and handle I/O events
          select();
          // check whether any FrameBuffers need their interest set changed
          processInterestChanges();
        }
        // the server is shutting down; clean up all SelectionKeys
        for (SelectionKey selectionKey : selector.keys()) {
          cleanupSelectionKey(selectionKey);
        }
      } catch (Throwable t) {
        LOGGER.error("run() exiting due to uncaught error", t);
      } finally {
        stopped_ = true;
      }
    }
    /**
     * Select and process IO events appropriately:
     * If there are connections to be accepted, accept them.
     * If there are existing connections with data waiting to be read, read it,
     * buffering until a whole frame has been read.
     * If there are any pending responses, buffer them until their target client
     * is available, and then send the data.
     * Checks for and handles I/O events.
     */
    private void select() {
      try {
        // wait for I/O events; blocks until at least one operation is ready
        selector.select();
        // process the io events we received
        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
        while (!stopped_ && selectedKeys.hasNext()) {
          SelectionKey key = selectedKeys.next();
          selectedKeys.remove();
          // skip if not valid
          if (!key.isValid()) {
            // clean up the invalid SelectionKey
            cleanupSelectionKey(key);
            continue;
          }
          // if the key is marked Accept, then it has to be the server
          // transport. Handle each event type accordingly.
          if (key.isAcceptable()) {
            handleAccept();
          } else if (key.isReadable()) {
            // deal with reads: call AbstractSelectThread.handleRead
            handleRead(key);
          } else if (key.isWritable()) {
            // deal with writes: call AbstractSelectThread.handleWrite
            handleWrite(key); 
          } else {
            LOGGER.warn("Unexpected state in select! " + key.interestOps());
          }
        }
      } catch (IOException e) {
        LOGGER.warn("Got an IOException while selecting!", e);
      }
    }
    /**
     * Accept a new client connection.
     */
    private void handleAccept() throws IOException {
      SelectionKey clientKey = null;
      TNonblockingTransport client = null;
      try {
        // accept the connection and register it with the selector for OP_READ events
        // (in Java NIO, a SelectionKey is the handle that tracks a channel's registered events)
        client = (TNonblockingTransport)serverTransport.accept();
        clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
        // each client connection gets its own FrameBuffer
        FrameBuffer frameBuffer = new FrameBuffer(client, clientKey,
          SelectAcceptThread.this);
        // attach the frameBuffer to the SelectionKey so the channel's state can easily be retrieved later
        clientKey.attach(frameBuffer);
      } catch (TTransportException tte) {
        // something went wrong accepting.
        LOGGER.warn("Exception trying to accept!", tte);
        tte.printStackTrace();
        if (clientKey != null) cleanupSelectionKey(clientKey);
        if (client != null) client.close();
      }
    }
  }

 

      From the source, TNonblockingServer's processing flow is: the single SelectAcceptThread loops over its selector, accepting new connections, reading request frames, invoking the processor inline, and writing responses back.

      

      

    THsHaServer

      THsHaServer is a subclass of TNonblockingServer that overrides the requestInvoke() method. Whereas TNonblockingServer uses a single thread for both the selector and business-logic invocation, THsHaServer hands business-logic invocation to a thread pool, which is why it is called a half-sync/half-async server. Its source is correspondingly simple.

public class THsHaServer extends TNonblockingServer {
  private final ExecutorService invoker; // thread pool for business-logic invocation
  private final Args args;
  public THsHaServer(Args args) {
    super(args);
    // create a thread pool if none was supplied in the args
    invoker = args.executorService == null ? createInvokerPool(args) : args.executorService;
    this.args = args;
  }
  @Override
  protected void waitForShutdown() {
    joinSelector(); // blocks until the server is shut down
    gracefullyShutdownInvokerPool();
  }
  // creates the invoker thread pool
  protected static ExecutorService createInvokerPool(Args options) {
    int workerThreads = options.workerThreads;
    int stopTimeoutVal = options.stopTimeoutVal;
    TimeUnit stopTimeoutUnit = options.stopTimeoutUnit;
    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
    ExecutorService invoker = new ThreadPoolExecutor(workerThreads,
      workerThreads, stopTimeoutVal, stopTimeoutUnit, queue);
    return invoker;
  }
  // gracefully shut down the invoker pool
  protected void gracefullyShutdownInvokerPool() {
    invoker.shutdown();
    // Loop until awaitTermination finally does return without a interrupted
    // exception. If we don't do this, then we'll shut down prematurely. We want
    // to let the executorService clear it's task queue, closing client sockets
    // appropriately.
    long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal);
    long now = System.currentTimeMillis();
    while (timeoutMS >= 0) {
      try {
        invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
        break;
      } catch (InterruptedException ix) {
        long newnow = System.currentTimeMillis();
        timeoutMS -= (newnow - now);
        now = newnow;
      }
    }
  }
  // overridden invocation method: the business logic runs asynchronously on the thread pool
  @Override
  protected boolean requestInvoke(FrameBuffer frameBuffer) {
    try {
      Runnable invocation = getRunnable(frameBuffer);
      invoker.execute(invocation);
      return true;
    } catch (RejectedExecutionException rx) {
      LOGGER.warn("ExecutorService rejected execution!", rx);
      return false;
    }
  }
  protected Runnable getRunnable(FrameBuffer frameBuffer){
    return new Invocation(frameBuffer);
  }
}

 

      THsHaServer's processing flow is the same as TNonblockingServer's except that, once a frame is fully read, the invocation is submitted to the worker pool instead of running on the I/O thread.

      

       

    TThreadedSelectorServer

      TThreadedSelectorServer is another implementation of the non-blocking AbstractNonblockingServer and the most advanced form of TServer. Although THsHaServer uses a thread pool for business-logic invocation, all reads and writes are still performed by a single thread, so THsHaServer runs into trouble when large amounts of data have to be moved between client and server. TThreadedSelectorServer makes the reads and writes multi-threaded as well. Its threading model works as follows.

      

      The key points of this model:

        1) A single AcceptThread handles new client connections;

        2) Multiple SelectorThread threads handle the data reads and writes;

        3) A single load balancer, SelectorThreadLoadBalancer, decides which SelectorThread each connection accepted by the AcceptThread is handed to;

        4) An ExecutorService thread pool performs the asynchronous business-logic invocations. (A construction sketch follows this list.)
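
      As a rough usage sketch (the selectorThreads/workerThreads setters mirror the Args fields shown in the source below; exact names can differ between Thrift versions), a TThreadedSelectorServer might be wired up like this:

TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(8090);
TThreadedSelectorServer.Args ttsArgs = new TThreadedSelectorServer.Args(serverSocket);
ttsArgs.processor(new HelloService.Processor<HelloService.Iface>(new HelloHandler()));
ttsArgs.protocolFactory(new TBinaryProtocol.Factory());
ttsArgs.transportFactory(new TFramedTransport.Factory());
ttsArgs.selectorThreads(2);  // number of I/O (selector) threads
ttsArgs.workerThreads(10);   // size of the business-logic thread pool
TServer server = new TThreadedSelectorServer(ttsArgs);
server.serve();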

      On to the source. First, the parameters that TThreadedSelectorServer's Args class adds:

  public static class Args extends AbstractNonblockingServerArgs<Args> {
    public int selectorThreads = 2;    // number of SelectorThread threads
    // size of the business-logic thread pool; 0 means the business logic runs directly on the SelectorThread
    private int workerThreads = 5; 
    private int stopTimeoutVal = 60;
    private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
    private ExecutorService executorService = null; // business-logic thread pool
    private int acceptQueueSizePerThread = 4; // size of each SelectorThread's accept queue
    // policy for handling new client connections
    public static enum AcceptPolicy {
      // the accepted connection is registered via the thread pool; if the pool is full the
      // connection is closed immediately; the extra scheduling adds a little latency
      FAIR_ACCEPT,
      // accept as fast as possible, regardless of the thread pool's state
      FAST_ACCEPT
    }
    // fast accept is the default
    private AcceptPolicy acceptPolicy = AcceptPolicy.FAST_ACCEPT;
  }

 

      Next, the member variables of TThreadedSelectorServer and its implementations of the abstract methods of AbstractNonblockingServer.

public class TThreadedSelectorServer extends AbstractNonblockingServer {
  private volatile boolean stopped_ = true;
  private AcceptThread acceptThread; // thread that handles new client connections
  private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>(); // threads that handle the reads and writes
  private final ExecutorService invoker; // business-logic thread pool
  private final Args args;
  // constructor: initializes the server
  public TThreadedSelectorServer(Args args) {
    super(args);
    args.validate();
    invoker = args.executorService == null ? createDefaultExecutor(args) : args.executorService;
    this.args = args;
  }
  // start the acceptThread and the selectorThreads
  @Override
  protected boolean startThreads() {
    try {
      for (int i = 0; i < args.selectorThreads; ++i) {
        selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread));
      }
      acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
        createSelectorThreadLoadBalancer(selectorThreads));
      stopped_ = false;
      for (SelectorThread thread : selectorThreads) {
        thread.start();
      }
      acceptThread.start();
      return true;
    } catch (IOException e) {
      LOGGER.error("Failed to start threads!", e);
      return false;
    }
  }
  // wait for the server to shut down
  @Override
  protected void waitForShutdown() {
    try {
      joinThreads(); // wait for the accept and selector threads to stop
    } catch (InterruptedException e) {
      LOGGER.error("Interrupted while joining threads!", e);
    }
    // shut down the business-logic thread pool
    gracefullyShutdownInvokerPool();
  }
  protected void joinThreads() throws InterruptedException {
    // block until the accept and selector threads have all stopped
    acceptThread.join();
    for (SelectorThread thread : selectorThreads) {
      thread.join();
    }
  }
  // stop the server
  @Override
  public void stop() {
    stopped_ = true;
    stopListening(); // stop accepting new requests
    if (acceptThread != null) {
      // the acceptThread may be blocked in select(); wake it up
      acceptThread.wakeupSelector();
    }
    if (selectorThreads != null) {
      // the selectorThreads may be blocked in select(); wake them up
      for (SelectorThread thread : selectorThreads) {
        if (thread != null)
          thread.wakeupSelector();
      }
    }
  }

  protected void gracefullyShutdownInvokerPool() {
    invoker.shutdown();
    // Loop until awaitTermination finally does return without a interrupted
    // exception. If we don't do this, then we'll shut down prematurely. We want
    // to let the executorService clear it's task queue, closing client sockets
    // appropriately.
    long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal);
    long now = System.currentTimeMillis();
    while (timeoutMS >= 0) {
      try {
        invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
        break;
      } catch (InterruptedException ix) {
        long newnow = System.currentTimeMillis();
        timeoutMS -= (newnow - now);
        now = newnow;
      }
    }
  }
  // business-logic invocation; called from handleRead once a whole frame has been read
  @Override
  protected boolean requestInvoke(FrameBuffer frameBuffer) {
    Runnable invocation = getRunnable(frameBuffer);
    if (invoker != null) {
      // submit to the thread pool for execution
      try {
        invoker.execute(invocation);
        return true;
      } catch (RejectedExecutionException rx) {
        LOGGER.warn("ExecutorService rejected execution!", rx);
        return false;
      }
    } else {
      // no thread pool: run on the thread that called requestInvoke (the I/O thread)
      invocation.run();
      return true;
    }
  }
  protected Runnable getRunnable(FrameBuffer frameBuffer) {
    return new Invocation(frameBuffer);
  }

  protected static ExecutorService createDefaultExecutor(Args options) {
    return (options.workerThreads > 0) ? Executors.newFixedThreadPool(options.workerThreads) : null;
  }

  private static BlockingQueue<TNonblockingTransport> createDefaultAcceptQueue(int queueSize) {
    if (queueSize == 0) {
      return new LinkedBlockingQueue<TNonblockingTransport>(); // unbounded queue
    }
    return new ArrayBlockingQueue<TNonblockingTransport>(queueSize);
  }
}

      Finally, the source of the two core classes, AcceptThread and SelectorThread.

      AcceptThread accepts new client connection requests.

  protected class AcceptThread extends Thread {
    private final TNonblockingServerTransport serverTransport; // the server socket listening on the port
    private final Selector acceptSelector;
    private final SelectorThreadLoadBalancer threadChooser; // load balancer over the selector threads
    public AcceptThread(TNonblockingServerTransport serverTransport,
        SelectorThreadLoadBalancer threadChooser) throws IOException {
      this.serverTransport = serverTransport;
      this.threadChooser = threadChooser;
      // acceptSelector belongs exclusively to the AcceptThread and is used only for accepting new connections; do not confuse it with the selectors used for I/O
      this.acceptSelector = SelectorProvider.provider().openSelector();
      // register serverTransport with the selector for OP_ACCEPT events
      this.serverTransport.registerSelector(acceptSelector);
    }
    public void run() {
      try {
        // keep calling select() in a loop
        while (!stopped_) {
          select();
        }
      } catch (Throwable t) {
        LOGGER.error("run() exiting due to uncaught error", t);
      } finally {
        TThreadedSelectorServer.this.stop(); // call stop() to wake up the SelectorThread threads
      }
    }
    // wake up the acceptSelector
    public void wakeupSelector() {
      acceptSelector.wakeup();
    }
    private void select() {
      try {
        // wait for connect events.
        acceptSelector.select();
        // process the io events we received
        Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();
        while (!stopped_ && selectedKeys.hasNext()) {
          SelectionKey key = selectedKeys.next();
          selectedKeys.remove();
          // skip if not valid
          if (!key.isValid()) {
            continue;
          }
          // handle the connection request
          if (key.isAcceptable()) {
            handleAccept();
          } else {
            LOGGER.warn("Unexpected state in select! " + key.interestOps());
          }
        }
      } catch (IOException e) {
        LOGGER.warn("Got an IOException while selecting!", e);
      }
    }
    
    // handle a new connection request
    private void handleAccept() {
      final TNonblockingTransport client = doAccept(); // accept the new connection
      if (client != null) {
        // pick a selector thread
        final SelectorThread targetThread = threadChooser.nextThread();
        // with the FAST_ACCEPT policy (or no invoker pool), hand the client straight to the SelectorThread
        if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {
          doAddAccept(targetThread, client);
        } else {
          // with the FAIR_ACCEPT policy, submit the doAddAccept task to the thread pool
          try {
            invoker.submit(new Runnable() {
              public void run() {
                doAddAccept(targetThread, client);
              }
            });
          } catch (RejectedExecutionException rx) {
            LOGGER.warn("ExecutorService rejected accept registration!", rx);
            // the invoker pool rejected the task, so close the client connection
            client.close();
          }
        }
      }
    }
    // accept a new connection
    private TNonblockingTransport doAccept() {
      try {
        return (TNonblockingTransport) serverTransport.accept();
      } catch (TTransportException tte) {
        LOGGER.warn("Exception trying to accept!", tte);
        return null;
      }
    }
    // add the new connection to the SelectorThread's queue
    private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {
      if (!thread.addAcceptedConnection(client)) {
        client.close(); // if it could not be added, close the client
      }
    }
  }

 

       The SelectorThread threads are responsible for reading and writing data:

  protected class SelectorThread extends AbstractSelectThread {
    private final BlockingQueue<TNonblockingTransport> acceptedQueue; // blocking queue of accepted client connections
    public SelectorThread() throws IOException {
      this(new LinkedBlockingQueue<TNonblockingTransport>()); // defaults to an unbounded queue
    }
    public SelectorThread(int maxPendingAccepts) throws IOException {
      this(createDefaultAcceptQueue(maxPendingAccepts)); // bounded queue of the given size
    }
    public SelectorThread(BlockingQueue<TNonblockingTransport> acceptedQueue) throws IOException {
      this.acceptedQueue = acceptedQueue; // use the supplied queue
    }

    // add the connection to acceptedQueue; blocks if the queue is full
    public boolean addAcceptedConnection(TNonblockingTransport accepted) {
      try {
        acceptedQueue.put(accepted);
      } catch (InterruptedException e) {
        LOGGER.warn("Interrupted while adding accepted connection!", e);
        return false;
      }
      // if a thread is blocked in select(), wakeup() makes it return even when no channel is ready
      // waking the selector matters here: when the first connection is added, select() would otherwise keep blocking;
      // only after waking up can processAcceptedConnections() register the new connection with the selector,
      // so that the next select() call can detect the connection's ready events
      selector.wakeup();
      return true;
    }

    public void run() {
      try {
        while (!stopped_) {
          select(); // blocks if no channel is ready
          processAcceptedConnections(); // register newly accepted connections with the selector
          processInterestChanges(); // apply pending interest-set changes for existing connections
        }
        // clean-up when the server shuts down
        for (SelectionKey selectionKey : selector.keys()) {
          cleanupSelectionKey(selectionKey);
        }
      } catch (Throwable t) {
        LOGGER.error("run() exiting due to uncaught error", t);
      } finally {
        // This will wake up the accept thread and the other selector threads
        TThreadedSelectorServer.this.stop();
      }
    }

    /**
     * Select and process IO events appropriately: If there are existing
     * connections with data waiting to be read, read it, buffering until a
     * whole frame has been read. If there are any pending responses, buffer
     * them until their target client is available, and then send the data.
     */
    private void select() {
      try {
        // wait for io events.
        selector.select(); // each SelectorThread has its own selector
        // process the io events we received
        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
        while (!stopped_ && selectedKeys.hasNext()) {
          SelectionKey key = selectedKeys.next();
          selectedKeys.remove();
          // skip if not valid
          if (!key.isValid()) {
            cleanupSelectionKey(key);
            continue;
          }
          if (key.isReadable()) {
            // deal with reads
            handleRead(key);
          } else if (key.isWritable()) {
            // deal with writes
            handleWrite(key);
          } else {
            LOGGER.warn("Unexpected state in select! " + key.interestOps());
          }
        }
      } catch (IOException e) {
        LOGGER.warn("Got an IOException while selecting!", e);
      }
    }
    private void processAcceptedConnections() {
      // Register accepted connections
      while (!stopped_) {
        TNonblockingTransport accepted = acceptedQueue.poll();
        if (accepted == null) {
          break;
        }
        registerAccepted(accepted);
      }
    }
    // register the accepted connection with the selector for OP_READ and attach a new FrameBuffer to its SelectionKey
    private void registerAccepted(TNonblockingTransport accepted) {
      SelectionKey clientKey = null;
      try {
        clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ);
        FrameBuffer frameBuffer = new FrameBuffer(accepted, clientKey, SelectorThread.this);
        clientKey.attach(frameBuffer);
      } catch (IOException e) {
        LOGGER.warn("Failed to register accepted connection to selector!", e);
        if (clientKey != null) {
          cleanupSelectionKey(clientKey);
        }
        accepted.close();
      }
    }
  }

  Summary: comparison of the TServer implementations

  The table below compares the servers along four dimensions: blocking vs. non-blocking I/O, how connections are accepted, how I/O is handled, and how the business logic is invoked.

  Server                    Blocking I/O?   Accept          I/O                Business-logic invocation
  TSimpleServer             blocking        single thread   single thread      single thread
  TThreadPoolServer         blocking        single thread   worker thread      worker thread (pool)
  TNonblockingServer        non-blocking    single thread   single thread      single thread
  THsHaServer               non-blocking    single thread   single thread      thread pool
  TThreadedSelectorServer   non-blocking    single thread   multiple threads   thread pool

  TSimpleServer: a single thread performs every operation, so only one client connection can be served at a time; the next connection is accepted only after the current client disconnects. For testing only; never use it in production.

  TThreadPoolServer: a dedicated thread accepts connections; each accepted connection is handed to a worker thread in the ThreadPoolExecutor, and that worker stays bound to the connection until it closes, after which the thread returns to the pool. If the number of clients exceeds the maximum pool size, new requests block until a worker becomes available. Performance is high; suitable when the number of concurrent client connections is not too large.

  TNonblockingServer: non-blocking I/O lets a single thread monitor many connections, but all processing is done sequentially by the same thread that calls select(). Suitable when the business logic is simple and there are few clients; not suited to high concurrency, since the single thread limits throughput.

  THsHaServer: half-sync/half-async; a single thread handles accepting connections and all network I/O, while a separate worker pool processes the messages. As long as a worker thread is free, a message is processed immediately, so multiple messages are handled in parallel. Suitable when network I/O is not too heavy but business-logic invocation is demanding.

  TThreadedSelectorServer: half-sync/half-async; multiple threads handle network I/O and a thread pool handles business-logic invocation. When network I/O is the bottleneck, TThreadedSelectorServer performs better than THsHaServer. Suitable for I/O-heavy, logic-heavy, highly concurrent scenarios.

    In practice, production deployments usually choose between TThreadPoolServer and TThreadedSelectorServer. TThreadPoolServer's strengths are fast processing and short response times; its weakness is high resource consumption under heavy concurrency. TThreadedSelectorServer's strength is that it supports high concurrency; it is not as fast as TThreadPoolServer, but in most cases it is fast enough for business needs.

 

  This article covered basic Thrift RPC usage, an overview of the protocol stack, and the principles and source analysis of the TServer implementations. The next article will cover the other major parts of Thrift, such as TProtocol and TTransport.

 

 

References

  Thrift RPC詳解

  架構設計:系統間通信(12)——RPC實例Apache Thrift 中篇

  [原創](翻譯)Java版的各種Thrift server實現的比較

  多線程awaitTermination和shutdown的使用問題

  由淺入深了解Thrift(三)——Thrift server端的幾種工作模式分析

  Thrift源碼系列----5.AbstractNonblockingServer源碼

 

