淺談NIO和Epoll的實現原理


什么是NIO

  NIO又叫New/Non-blocking IO,這個概念基本人人都聽過,但是不一定每個人都懂他它的運行的原理。

  這里我們來探討這個問題,先用一個例子解釋一下BIO到底阻塞了哪里。

/**
 * 這是一個單線程BIOServer
 * @author endless
 * @create 2020-03-23
 */
public class BioServerDemo {

  public static void main(String[] args) throws IOException {
    // 創建ServerSocket,並綁定端口
    ServerSocket serverSocket = new ServerSocket(9999);
    System.out.println("服務啟動成功");
    while (true) {
 Socket socket = serverSocket.accept(); System.out.println("連接成功"); System.out.println("准備接收數據"); byte[] bytes = new byte[1024]; socket.getInputStream().read(bytes); System.out.println("接收到了數據:" + new String(bytes));
    }
  }
}

/**
 * BIO client
 *
 * @author endless
 * @create 2020-03-23
 */
public class BioClientDemo {

  public static void main(String[] args) throws IOException {
    // 連接Server
    Socket socket = new Socket("127.0.0.1", 9999);
    System.out.println("連接成功");
    Scanner scanner = new Scanner(System.in);
    // 循環等待輸入消息
    while (true) {
      String str = scanner.next();
      // 約定退出口令
      if ("exit".equalsIgnoreCase(str)) {
        socket.close();
        System.exit(0);
      }
      socket.getOutputStream().write(str.getBytes());
      socket.getOutputStream().flush();
    }
  }
}

先運行Server

淺談NIO和Epoll的實現原理

  命令行打印服務啟動成功,此時並無客戶端連接,所以連接成功並未打印,說明程序被阻塞在了serverSocket.accept()方法

  此時運行Client,Server打印日志連接成功和准備接收數據,此時Client尚未發送數據,Server被阻塞在

socket.getInputStream().read(bytes)上,因此其他客戶端無法進行連接。

淺談NIO和Epoll的實現原理

  在Client輸入Hello回車,此時Server打印接收到了數據:Hello,說明客戶端的連接發送過來數據了,此時服務端線程才解阻塞,在這

個情況下,這個Server沒有辦法處理並發,同時期只能處理一個連接。

  那么BIO是如何實現並發呢?答案也很明顯,就是使用多線程,我們對Server進行一些小改動。

/**
 * 這是一個BIOServer
 * @author endless
 * @create 2020-03-23
 */
public class BioServerDemo {
  public static void main(String[] args) throws IOException {

    ServerSocket serverSocket = new ServerSocket(9999);
    System.out.println("服務啟動成功");
    while (true) {
      Socket socket = serverSocket.accept();
      new Thread(()->{
        System.out.println("連接成功");
        System.out.println("准備接收數據");
      	byte[] bytes = new byte[1024];
        try {
          socket.getInputStream().read(bytes);
        } catch (IOException e) {
          e.printStackTrace();
        }
        System.out.println("接收到了數據:" + new String(bytes));
      }).start();
    }

  }

}

  使用子線程來對接收到的Socket進行處理,這樣每個連接都被阻塞在單獨的線程上,就可以實現並發訪問Server。

  總結:BIO的阻塞有兩個地方:accept()和read(),並且BIO的並發只能通過多線程。

  但是這里會有一個問題,就是如果絕大部分的連接都沒有進行數據傳輸,只是建立了連接,這樣就會產生很多無效的線程,而線程又

是非常寶貴的稀缺資源,這樣就會白白損失很多的性能,這也是BIO最大的性能瓶頸。

  那能不能只用一個線程就能實現並發並且處理全部的連接呢?是否能設計一個Api,讓accept和read不再阻塞,使用一個線程就能處

並發連接呢?答案是肯定的,這里就要用到我們的今天的主角NIO了。

  NIO在JDK中被封裝在了一個新的類中,我們先來寫一個例子,這個例子實現了使用單線程來處理多連接。

 
/**
 * NIO Server Demo
 *
 * @author endless
 * @create 2020-03-23
 */
public class NioServerDemo {

  // 保存客戶端連接
  static List<SocketChannel> channelList = new ArrayList<>();

  public static void main(String[] args) throws IOException, InterruptedException {

    // 創建NIO ServerSocketChannel
    ServerSocketChannel serverSocket = ServerSocketChannel.open();
    serverSocket.bind(new InetSocketAddress(9998));
    // 設置ServerSocketChannel為非阻塞
    serverSocket.configureBlocking(false);
    System.out.println("服務啟動成功");

    while (true) {
      SocketChannel socketChannel = serverSocket.accept();
      if (socketChannel != null) { // 如果有客戶端進行連接
        System.out.println("連接成功");
        // 設置SocketChannel為非阻塞
        socketChannel.configureBlocking(false);
        // 保存客戶端連接在List中
        channelList.add(socketChannel);
      }
      // 遍歷連接進行數據讀取
      Iterator<SocketChannel> iterator = channelList.iterator();
      while (iterator.hasNext()) {
        SocketChannel o = iterator.next();
        ByteBuffer byteBuffer = ByteBuffer.allocate(128);
        int read = o.read(byteBuffer);
        // 如果有數據,把數據打印出來
        if (read > 0) {
          System.out.println("接收到消息:" + new String(byteBuffer.array()));
        } else if (read == -1) { // 如果客戶端斷開,把socket從集合中去掉
          iterator.remove();
          System.out.println("客戶端斷開連接");
        }
      }
    }
  }
}

客戶端可以復用之前的BIO客戶端

運行NIOServer,Server啟動完畢后運行兩個Client,各發送一條消息進行測試

淺談NIO和Epoll的實現原理

 

淺談NIO和Epoll的實現原理

 

淺談NIO和Epoll的實現原理

控制台顯示兩個連接成功,並且接收到了來自兩個客戶端的消息,表明Server可以使用單線程處理並發連接,這個Api的原理是什么呢?我們來進一步探究一下。

我們沿着源碼一路往下找,會找到一個無法下載源碼的文件

淺談NIO和Epoll的實現原理

  這里可以看得出,在Windows系統中,編譯后的代碼顯示直接返回了WindowsSelectorProvider對象,很顯然,這個對象是在和windows系統核心中的select方法交互,但是Linux中是不是也是這樣呢,我們需要下載一個Linux版本的OpenJDK源碼來探究一下。下載

OpenJDK源碼

  下載后解壓,Linux源碼分布在\openjdk\jdk\src\share和\openjdk\jdk\src\solaris目錄中。

淺談NIO和Epoll的實現原理

 

  在上圖中可以看出,JDK在不同的系統中采用了不同的實現方案,這里使用的是EPollSelectorProvider,說明在Linux中,使用的是EPoll來實現的。

  我們先來看看serverSocket.configureBlocking(false);到底是如何工作的,沿着源碼往下找,發現一個本地方法。

淺談NIO和Epoll的實現原理

 

  這個本地方法的源碼可以在OpenJDK中找到,如下圖

淺談NIO和Epoll的實現原理

 

  上圖紅框中的函數就是這個本地方法調用的底層C語言的方法,前面的前綴是根據JNI調用規則添加的,我們知道,在C當中是可以直接調用操作系統的Api的,這個方法調用了fcntl這個命令,把傳進來的blocking參數設置到了文件描述符上(文件描述符可以看作是一個對象,Linux中一切皆文件,類似於高級語言中的一切皆對象,任何的數據流轉都要通過一個文件描述符來操作)。

接着看看serverSocket.accept()是如何實現

淺談NIO和Epoll的實現原理

 

上圖可以看到,accept方法調用了一個native方法accept0

淺談NIO和Epoll的實現原理

 

這個accept0方法的描述簡單翻譯一下,就是接受一個新的連接,把給定的文件描述符引用設置為新的Socket,並將isaa[0]設置為套接字的遠程地址,成功返回1,不成功返回IOStatus.UNAVAILABLE or IOStatus.INTERRUPTED。

繼續探究一下本地方法的源碼

淺談NIO和Epoll的實現原理

 

這里調用了操作系統的accept方法,想知道這個方法文檔的同學可以在Linux環境中使用man命令來查看

淺談NIO和Epoll的實現原理

 

man命令可以查看Linux詳盡的文檔,2表示第二章:系統調用指令,最后加上你想查的指令方法即可

淺談NIO和Epoll的實現原理

 

  這里主要看一下返回值,accept返回一個非負數作為socket的描述符,如果失敗返回-1。通過這個文件描述符,Java就可以通過native方法調用操作系統的API來在Socket中進行數據的傳輸。我們的代碼用一個List保存了接收到的Socket,相當於保存了Socket的文件描述符,通過一個線程輪詢這些文件描述符即可實現數據通信和處理新連接,這樣就節約了大量的線程資源,但是大家想一想這種模型還有什么缺陷。

……

  是的,如果連接數太多的話,還是會有大量的無效遍歷,例如10000個連接中只有1000個連接時有數據的,但是由於其他9000個連接並沒有斷開,我們還是要每次輪詢都遍歷一萬次,相當於有十分之九的遍歷都是無效的,這顯然不是一個讓人很滿意的狀態。

  總結:NIO的非阻塞是由操作系統來完成的,SocketChannel與文件描述符一一對應,通過遍歷文件描述符來讀取數據。

什么是多路復用器

  上面的例子還不是Java NIO的完全體,僅僅是將原來的同步阻塞IO優化成了同步非阻塞IO,既然還是同步的,就意味着我們每次遍

歷,還是需要對每個Socket進行一次read操作來檢查是不是有數據過來,都會調用系統內核的read指令,只不過是把阻塞變成了非阻

塞,如果無用連接很多的話,那么絕大部分的read指令都是無意義的,這就會占用很多的CPU時間。

  Linux有select、poll和epoll三個解決方案來實現多路復用,其中的select和poll,有點類似於上面的NIOServerDemo程序,他會把

所有的文件描述符記錄在一個數組中,通過在系統內核中遍歷來篩選出有數據過來的Socket,只不過是從應用程序中遍歷改成了在內核中

遍歷,本質還是一樣的。

  Epoll則使用了事件機制,在復用器中注冊了一個回調事件,當Socket中有數據過來的時候調用,通知用戶處理信息,這樣就不需要對

全部的文件描述符進行輪訓了,這就是Epoll對NIO進行的改進。

  我們來探究一下在Java中是如何用Epoll實現NIO事件機制的,先對上面的NIOServerDemo再進行改進。

/**
 * NIO Selector Server Demo
 *
 * @author  endless
 * @create 2020-03-23
 */
public class NioSelectorServerDemo {

  public static void main(String[] args) throws IOException, InterruptedException {

    // 創建NIO ServerSocketChannel
    ServerSocketChannel serverSocket = ServerSocketChannel.open();
    serverSocket.socket().bind(new InetSocketAddress(9998));
    // 設置ServerSocketChannel為非阻塞
    serverSocket.configureBlocking(false);
    // 打開Selector處理Channel,即創建epoll
    Selector selector = Selector.open();
    // 將ServerSocket注冊到selector用來接收連接
    serverSocket.register(selector, SelectionKey.OP_ACCEPT);
    System.out.println("服務啟動成功");

    while (true) {

      // 阻塞等待需要處理的事件發生
      selector.select();

      // 獲取selector中注冊的全部事件的 SelectionKey 實例
      Set<SelectionKey> selectionKeys = selector.selectedKeys();
      Iterator<SelectionKey> iterator = selectionKeys.iterator();

      // 遍歷SelectionKey對事件進行處理
      while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        // 如果是OP_ACCEPT事件,則進行連接獲取和事件注冊
        if (key.isAcceptable()) {
          ServerSocketChannel server = (ServerSocketChannel) key.channel();
          SocketChannel socketChannel = server.accept();
          socketChannel.configureBlocking(false);
          // 這里只注冊了讀事件,如果需要給客戶端發送數據可以注冊寫事件
          socketChannel.register(selector, SelectionKey.OP_READ);
          System.out.println("客戶端連接成功");
        }

        // 如果是OP_READ事件,則進行讀取和打印
        if (key.isReadable()) {
          SocketChannel socketChannel = (SocketChannel) key.channel();
          ByteBuffer byteBuffer = ByteBuffer.allocate(128);
          int read = socketChannel.read(byteBuffer);
          // 如果有數據,把數據打印出來
          if (read > 0) {
            System.out.println("接收到消息:" + new String(byteBuffer.array()));
          } else if (read == -1) { // 如果客戶端斷開連接,關閉Socket
            System.out.println("客戶端斷開連接");
            socketChannel.close();
          }
        }
      }

    }

  }

}

 

 

這段代碼需要關注的點有以下幾個方法:

  1. Selector.open()
  2. socketChannel.register()
  3. selector.select()

接下來就來看看這三個方法究竟做了什么。

Select.open()

首先調用了SelectorProvider的openSelector()方法,這個方法返回一個EPollSelectorImpl實例

EPollSelectorProvider.java

    public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}

EPollSelectorImpl的構造方法中new了一個EPollArrayWrapper實例

EPollSelectorImpl.java

    EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
// 創建管道,用long類型返回管道的兩個文件描述符。讀端以高32位的形式返回,寫端以低32位的形式返回。
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
// 創建Epoll文件描述符,並且創建映射數組記錄事件
pollWrapper = new EPollArrayWrapper();
// 初始化中斷文件描述符,把新創建的Epoll注冊到管道的讀文件描述符上
pollWrapper.initInterrupt(fd0, fd1);
fdToKey = new HashMap<>();
}

EPollArrayWrapper的構造方法中創建了Epoll實例

EPollArrayWrapper.java

    EPollArrayWrapper() throws IOException {
// 創建了Epoll實例,並將它的事件數組地址記錄下來方便操作
epfd = epollCreate();

// the epoll_event array passed to epoll_wait
int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();

// eventHigh needed when using file descriptors > 64k
if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
eventsHigh = new HashMap<>();
}

...

private native int epollCreate();

Native方法epollCreate()的源碼,調用了內核的epoll_create指令,創建並獲取了一個文件描述符。

epoll有三個指令,epoll_create、epoll_ctl、epoll_wait,都可以在Linux環境中使用man命令來查看詳細文檔。

EPollArrayWrapper.c

JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{
/*
* epoll_create expects a size as a hint to the kernel about how to
* dimension internal structures. We can't predict the size in advance.
*/
int epfd = epoll_create(256);
if (epfd < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
}
return epfd;
}
socketChannel.register()

注冊事件時其實並沒有對Epoll進行事件添加,而是在只是把它加入了待添加的容器。

public final SelectionKey register(Selector sel, int ops,
Object att)
throws ClosedChannelException
{
synchronized (regLock) {
if (!isOpen())
throw new ClosedChannelException();
if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
if (blocking)
throw new IllegalBlockingModeException();
// 判斷是否存在事件的key,存在的話就更新,不存在就新建
// 最終都會走到 EPollArrayWrapper#setUpdateEvents方法
SelectionKey k = findKey(sel);
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
}
return k;
}
}

AbstractSelector.register調用實現的子類方法implRegister

EPollSelectorImpl.java

    protected void implRegister(SelectionKeyImpl ski) {
if (closed)
throw new ClosedSelectorException();
SelChImpl ch = ski.channel;
int fd = Integer.valueOf(ch.getFDVal());
fdToKey.put(fd, ski);
pollWrapper.add(fd);
keys.add(ski);
}

調用setUpdateEvents寫入待更新事件容器

EPollArrayWrapper.java


/**
* Add a file descriptor
*/
void add(int fd) {
// 強制初始update events為0,因為他可能會被之前注冊kill掉
synchronized (updateLock) {
assert !registered.get(fd);
setUpdateEvents(fd, (byte)0, true);
}
}

/**
* 設置文件描述符的待更新事件到 eventsHigh 中。
*/
private void setUpdateEvents(int fd, byte events, boolean force) {
if (fd < MAX_UPDATE_ARRAY_SIZE) {
if ((eventsLow[fd] != KILLED) || force) {
eventsLow[fd] = events;
}
} else {
Integer key = Integer.valueOf(fd);
if (!isEventsHighKilled(key) || force) {
eventsHigh.put(key, Byte.valueOf(events));
}
}
}
selector.select()

調用selector.select()時,會將剛才注冊的待更新事件綁定到文件描述符上,然后進入阻塞狀態等待事件回調。

EPollSelectorImpl.java

    protected int doSelect(long timeout) throws IOException {
if (closed)
throw new ClosedSelectorException();
processDeregisterQueue();
try {
begin();
pollWrapper.poll(timeout);
} finally {
end();
}
processDeregisterQueue();
int numKeysUpdated = updateSelectedKeys();
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}

EPollArrayWrapper.java


int poll(long timeout) throws IOException {
// 將上一步待更新的時間進行注冊
updateRegistrations();
// 進入阻塞狀態,等待事件發生
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}


/**
* Update the pending registrations.
*/
private void updateRegistrations() {
synchronized (updateLock) {
int j = 0;
while (j < updateCount) {
int fd = updateDescriptors[j];
short events = getUpdateEvents(fd);
boolean isRegistered = registered.get(fd);
int opcode = 0;

if (events != KILLED) {
if (isRegistered) {
opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
} else {
opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
}
if (opcode != 0) {
// 調用native方法進行事件綁定
epollCtl(epfd, opcode, fd, events);
if (opcode == EPOLL_CTL_ADD) {
registered.set(fd);
} else if (opcode == EPOLL_CTL_DEL) {
registered.clear(fd);
}
}
}
j++;
}
updateCount = 0;
}
}

總結:到此為止,我們已經粗略的把整個NIO調用流程都梳理了一遍,Java調用了操作系統的Api來創建Socket,獲取到Socket的文件描述符,再創建一個Selector對象,對應操作系統的EPoll描述符,將獲取到的Socket連接的文件描述符的事件綁定到Selector對應的EPoll文件描述符上,進行事件的異步通知,這樣就實現了使用一條線程,並且不需要太多的無效的遍歷,將事件處理交給了操作系統內核,大大提高了效率。

EPoll指令詳解

epoll_create

int epoll_create(int size);

創建一個epoll實例,並返回一個非負數作為文件描述符,用於對epoll接口的所有后續調用。參數size代表可能會容納size個描述符,但size不是一個最大值,只是提示操作系統它的數量級,現在這個參數基本上已經棄用了。

epoll_ctl

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

使用文件描述符epfd引用的epoll實例,對目標文件描述符fd執行op操作。

參數epfd表示epoll對應的文件描述符,參數fd表示socket對應的文件描述符。

參數op有以下幾個值:EPOLL_CTL_ADD:注冊新的fd到epfd中,並關聯時間eventEPOLL_CTL_MOD:修改已經注冊的fd的監聽事件;EPOLL_CTL_DEL:從epfd中移除fd,並且忽略掉綁定的event,這時event可以為null;

參數event是一個結構體。


struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};

typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;

events有很多可選值,這里只舉例最常見的幾個:

EPOLLIN :表示對應的文件描述符是可讀的(close也會發送消息);EPOLLOUT:表示對應的文件描述符是可寫的;EPOLLERR:表示對應的文件描述符發生了錯誤;

成功則返回0,失敗返回-1

epoll_wait

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

等待文件描述符epfd上的事件。

epfd是Epoll對應的文件描述符,events表示調用者所有可用事件的集合,maxevents表示最多等到多少個事件就返回,timeout是超時時間。

操作系統的IO還涉及到零拷貝和直接內存兩部分的知識,也是操作系統提高性能的利器,將在以后的文章中進行探討。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM