JAVA RPC (十) nio服務端解析


源碼地址:https://gitee.com/a1234567891/koalas-rpc

 企業生產級百億日PV高可用可拓展的RPC框架。理論上並發數量接近服務器帶寬,客戶端采用thrift協議,服務端支持netty和thrift的TThreadedSelectorServer半同步半異步線程模型,支持動態擴容,服務上下線,權重動態,可用性配置,頁面流量統計,支持trace跟蹤等,天然接入cat支持數據大盤展示等,持續為個人以及中小型公司提供可靠的RPC框架技術方案

ServerSocketChannel簡單介紹:

上一篇文章我們講了netty server服務端的使用方式,對於netty來說對nio層進行了全方位的封裝,我們使用netty的使用可以當內部nio是黑盒處理即可,只需要處理netty的hander處理即可,但是koalas-rpc同時也實現了高性能的nio服務框架,給大家另外一種原生的選擇,下面我們來簡單看一下NIO相關的入門知識。

            this.serverSocketChannel = ServerSocketChannel.open();
            this.serverSocketChannel.configureBlocking(false);
            this.serverSocket_ = this.serverSocketChannel.socket();
            this.serverSocket_.setReuseAddress(true);
            this.serverSocket_.bind(bindAddr);

Java NIO中的ServerSocketChannel是一個可以監聽新進來的TCP連接的通道,就像標准的IIO中的ServerSocket一樣。ServerSocketChannel類在java.nio.channels包中。

Selector acceptSelector = SelectorProvider.provider().openSelector();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
try {
acceptSelector.select();

Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();
while (!stopped_ && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();

if (!key.isValid()) {
continue;
}

if (key.isAcceptable()) {
handleAccept();
} else {
LOGGER.warn("Unexpected state in select! " + key.interestOps());
}
}
} catch (IOException e) {
LOGGER.warn("Got an IOException while selecting!", e);
}
 

acceptSelector為服務端選擇,是Java NIO中能夠檢測一到多個NIO通道,並能夠知曉通道是否為諸如讀寫事件做好准備的組件。這樣,一個單獨的線程可以管理多個channel,從而管理多個網絡連接。

為什么使用Selector?

Selector允許單線程處理多個 Channel。如果你的應用打開了多個連接(通道),但每個連接的流量都很低,使用Selector就會很方便。例如,在一個聊天服務器中。

這是在一個單線程中使用一個Selector處理3個Channel的圖示:

 

 

僅用單個線程來處理多個Channels的好處是,只需要更少的線程來處理通道。事實上,可以只用一個線程處理所有的通道。對於操作系統來說,線程之間上下文切換的開銷很大,而且每個線程都要占用系統的一些資源(如內存)。因此,使用的線程越少越好。要使用Selector,得向Selector注冊Channel,然后調用它的select()方法。這個方法會一直阻塞到某個注冊的通道有事件就緒。一旦這個方法返回,線程就可以處理這些事件,事件的例子有如新連接進來,數據接收等。但是,需要記住,現代的操作系統和CPU在多任務方面表現的越來越好,所以多線程的開銷隨着時間的推移,變得越來越小了。實際上,如果一個CPU有多個內核,不使用多任務可能是在浪費CPU能力。不管怎么說,關於那種設計的討論應該放在另一篇不同的文章中。在這里,只要知道使用Selector能夠處理多個通道就足夠了。

注意register()方法的第二個參數。這是一個“interest集合”,意思是在通過Selector監聽Channel時對什么事件感興趣。可以監聽四種不同類型的事件:Connect,Accept,Read,Write

這四種事件用SelectionKey的四個常量來表示:SelectionKey.OP_CONNECT,SelectionKey.OP_ACCEPT,SelectionKey.OP_READ,SelectionKey.OP_WRITE,多個事件的監聽int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

回到我們的代碼中就會發現當服務端接收到client端的連接請求時 acceptSelector.select()阻塞可以獲取到執行權限。

if (key.isAcceptable()) {
      handleAccept();
    }

這段代碼的意思是可被連接,獲取到連接事件,用戶業務邏輯就可以在handleAccept中執行了。

 

SocketChannel簡單介紹

Java NIO中的SocketChannel是一個連接到TCP網絡套接字的通道(這句話是翻譯過來java api的英文注釋,比較雞肋,大家明白意思即可)。可以通過以下2種方式創建SocketChannel:

對於服務端來說:當客戶端連接服務端之后並且服務端獲取到了accept事件,這樣就可以獲取到SocketChannel對象。 例如

 SocketChannel socketChannel = serverSocketChannel.accept();

對於客戶端來說: 客戶端可以手動聲明一個SocketChannel對象,例如

1 SocketChannel socketChannel = SocketChannel.open();
2 
3 socketChannel.connect(new InetSocketAddress("192.168.3.1", 8080));

我們這次只討論nio的server端實現,先不考慮client端的nio實現,今后有時間也會為大家專門寫一篇關於client端關於nio的實現

serverSocketChannel對象就是我們上一小節中的ServerSocketChannel。同樣的socketChannel可以支持讀和寫的監聽

        clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ);

或者

        clientKey = accepted.registerSelector(selector, SelectionKey.OP_WRITE);

這樣當服務端的接收到寫事件或者讀事件后就會非常快速的響應數據流信息了,這里也是NIO速度比BIO速度快的關鍵,BIO通過用戶線程不斷的去輪訓內核中滑動接收窗口中的數據,效率較慢,而NIO是通過內核依賴IO多路復用的方式主動通知JVM,這樣吞吐速度會快很多,所以NIO是靠內核支持的。現在win,mac和linux都支持IO多路復用。介紹完了NIO的簡單知識,我們來看看KOALAS-RPC是怎么通過NIO來實現服務端的,由於NIO的細節知識過於繁雜,作者沒有辦法通過一篇文章來詳細說明,感興趣的小伙伴可以加群聯系作者溝通。

 

KOALAS-RPC的NIO SERVER實現:

koalas-rpc的nio server實現主要是在KoalasThreadedSelectorServer類中,我們先看一下連接線程和讀寫線程

private AcceptThread acceptThread;

  private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>();

元素聲明

@Override
  protected boolean startThreads() {
    try {
      for (int i = 0; i < args.selectorThreads; ++i) {
        selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread));
      }
      acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
        createSelectorThreadLoadBalancer(selectorThreads));
      stopped_ = false;
      for (SelectorThread thread : selectorThreads) {
        thread.start();
      }
      acceptThread.start();
      return true;
    } catch (IOException e) {
      LOGGER.error("Failed to start threads!", e);
      return false;
    }
  }

 

這里可以看到聲明了一個AcceptThread對象和多個selectorThreads對象,AcceptThread對象負責獲取client的連接事件,selectorThreads負責讀和寫事件,這里由於client端連接事件非常非常少,所以只需要單個線程就可以滿足需求了,但是讀和寫事件是非常頻繁的,所以這里用了多個線程去讀寫。我們看一下連接事件中干了些什么事情

 private void select() {
      try {
        acceptSelector.select();

        Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();
        while (!stopped_ && selectedKeys.hasNext()) {
          SelectionKey key = selectedKeys.next();
          selectedKeys.remove();

          if (!key.isValid()) {
            continue;
          }

          if (key.isAcceptable()) {
            handleAccept();
          } else {
            LOGGER.warn("Unexpected state in select! " + key.interestOps());
          }
        }
      } catch (IOException e) {
        LOGGER.warn("Got an IOException while selecting!", e);
      }
    }
private void handleAccept() {
      final TNonblockingTransport client = doAccept();
      if (client != null) {
        final SelectorThread targetThread = threadChooser.nextThread();

        if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {
          doAddAccept(targetThread, client);
        } else {
          try {
            invoker.submit(new Runnable() {
              public void run() {
                doAddAccept(targetThread, client);
              }
            });
          } catch (RejectedExecutionException rx) {
            LOGGER.warn("ExecutorService rejected accept registration!", rx);
            client.close();
          }
        }
      }
    }
 private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {
      if (!thread.addAcceptedConnection(client)) {
        client.close();
      }
    }

 

讀者結合源碼可以非常清晰的看到當AcceptThread獲取到連接事件時,獲取到讀寫的通道SocketChannel,並且將SocketChannel通道addAcceptedConnection方法傳給讀寫線程SelectorThread,接着往下看

public boolean addAcceptedConnection(TNonblockingTransport accepted) {
      try {
        acceptedQueue.put(accepted);
      } catch (InterruptedException e) {
        LOGGER.warn("Interrupted while adding accepted connection!", e);
        return false;
      }
      selector.wakeup();
      return true;
    }

把讀寫通道對象交給SelectorThread中的隊列,供讀寫線程去獲取。我們在看看讀寫線程中做了些什么事情:

public void run() {
      try {
        while (!stopped_) {
          select();
          processAcceptedConnections();
          processInterestChanges();
        }
        for (SelectionKey selectionKey : selector.keys()) {
          cleanupSelectionKey(selectionKey);
        }
      } catch (Throwable t) {
        LOGGER.error("run() exiting due to uncaught error", t);
      } finally {
        // This will wake up the accept thread and the other selector threads
        KoalasThreadedSelectorServer.this.stop();
      }
    }
private void select() {
      try {
        selector.select();

        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
        while (!stopped_ && selectedKeys.hasNext()) {
          SelectionKey key = selectedKeys.next();
          selectedKeys.remove();

          if (!key.isValid()) {
            cleanupSelectionKey(key);
            continue;
          }

          if (key.isReadable()) {
            handleRead(key);
          } else if (key.isWritable()) {
            handleWrite(key);
          } else {
            LOGGER.warn("Unexpected state in select! " + key.interestOps());
          }
        }
      } catch (IOException e) {
        LOGGER.warn("Got an IOException while selecting!", e);
      }
    }

 

private void processAcceptedConnections() {
      // Register accepted connections
      while (!stopped_) {
        TNonblockingTransport accepted = acceptedQueue.poll();
        if (accepted == null) {
          break;
        }
        registerAccepted(accepted);
      }
    }

 

private void registerAccepted(TNonblockingTransport accepted) {
      SelectionKey clientKey = null;
      try {
        clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ);

        FrameBuffer frameBuffer = new FrameBuffer(accepted, clientKey, SelectorThread.this,privateKey,publicKey,serviceName,tGenericProcessor,cat);
        clientKey.attach(frameBuffer);
      } catch (IOException e) {
        LOGGER.warn("Failed to register accepted connection to selector!", e);
        if (clientKey != null) {
          cleanupSelectionKey(clientKey);
        }
        accepted.close();
      }
    }

 

核心代碼說明,當連接線程將通道對象傳給讀寫線程時,讀寫線程獲取到了執行代碼的權限,然后從隊列中獲取到了連接通道對象,之后注冊讀的事件

clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ);

FrameBuffer frameBuffer = new FrameBuffer(accepted, clientKey, SelectorThread.this,privateKey,publicKey,serviceName,tGenericProcessor,cat);
clientKey.attach(frameBuffer);

並且聲明了一個FrameBuffer對象,之后的讀寫操作都包裝到FrameBuffer對象中,讀寫線程的核心讀寫代碼如下:

        if (key.isReadable()) {
            handleRead(key);
          } else if (key.isWritable()) {
            handleWrite(key);

當有可讀對象和可返回數據的時間通知后進行不同的業務邏輯處理,假設有client端連接之后發送了幾個字節的數據,那么key.isReadable()就會被觸發,會執行讀取字節流,拆包處理,和調用業業務方法等操作,調用用戶方法之后會返回結果,序列化之后寫入,這樣就會調用key.isWritable()中的方法去等待下次讀取連接,循環往復此操作。handleRead比較復雜我們簡單看一下實現

 public boolean read() {
            if (state_ == FrameBufferState.READING_FRAME_SIZE) {
                if (!internalRead ()) {
                    return false;
                }

                if (buffer_.remaining () == 0) {
                    int frameSize = buffer_.getInt ( 0 );
                    if (frameSize <= 0) {
                        LOGGER.error ( "Read an invalid frame size of " + frameSize
                                + ". Are you using TFramedTransport on the client side?" );
                        return false;
                    }

                    if (frameSize > MAX_READ_BUFFER_BYTES) {
                        LOGGER.error ( "Read a frame size of " + frameSize
                                + ", which is bigger than the maximum allowable buffer size for ALL connections." );
                        return false;
                    }

                    if (readBufferBytesAllocated.get () + frameSize > MAX_READ_BUFFER_BYTES) {
                        return true;
                    }

                    readBufferBytesAllocated.addAndGet ( frameSize );

                    buffer_ = ByteBuffer.allocate ( frameSize );

                    state_ = FrameBufferState.READING_FRAME;
                } else {

                    return true;
                }
            }

            if (state_ == FrameBufferState.READING_FRAME) {
                if (!internalRead ()) {
                    return false;
                }

                if (buffer_.remaining () == 0) {
                    selectionKey_.interestOps ( 0 );
                    state_ = FrameBufferState.READ_FRAME_COMPLETE;
                }

                return true;
            }

            LOGGER.error ( "Read was called but state is invalid (" + state_ + ")" );
            return false;
        }

 

首先讀取字節長度,然后在讀消息體,並且將數據保存在ByteBuffer對象中備用。然后通過buffer.isFrameFullyRead ()方法來判斷本次請求的字節流是否都讀完了,requestInvoke方法來調用用戶實現,通過handleWrite方法來將結果返回給client端對象。

 

結論:

 由於koalas-rpc是nio server主題設計比較復雜,一篇文章無法完全說清細節實現,但是大概的核心內容就是上面這些了,讀者對NIO比較感興趣的話可以通過讀源碼的方式來更深入的了解。

更多學習內容請加高級java QQ群:825199617,spring 源碼,spring mvc源碼,dubbo源碼,jdk源碼,ioc aop源碼分享等你來。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM