Netty源碼—三、select


NioEventLoop功能

前面channel已經准備好了,可以接收來自客戶端的請求了,NioEventLoop作為一個線程池,只有一個線程,但是有一個queue存儲了待執行的task,由於只有一個線程,所以run方法是死循環,除非線程池shutdown。

這個run方法的主要作用:

  1. 執行selector.select,監聽IO事件,並處理IO事件
  2. 由於NioEventLoop兼有線程池的功能,執行線程池中任務
// io.netty.channel.nio.NioEventLoop#run
protected void run() {
    // loop,循環處理IO事件或者處理線程池中的task任務
    for (;;) {
        try {
            // 判斷接下來是是執行select還是直接處理IO事件和執行隊列中的task
            // hasTask判斷當前線程的queue中是否還有待執行的任務
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    // NioEventLoop默認不會有這種狀態
                    continue;
                case SelectStrategy.SELECT:
                    // 說明當前queue中沒有task待執行
                    select(wakenUp.getAndSet(false));
					// 喚醒epoll_wait
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                    // fall through
                default:
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            // 這個比例是處理IO事件所需的時間和花費在處理task時間的比例
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                // 如果比例是100,表示每次都處理完IO事件后,執行所有的task
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    // 保證能執行所有的task
                    runAllTasks();
                }
            } else {
                // 記錄處理IO事件開始的時間
                final long ioStartTime = System.nanoTime();
                try {
                    // 處理IO事件
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    // 當前時間減去處理IO事件開始的時間就是處理IO事件花費的時間
                    final long ioTime = System.nanoTime() - ioStartTime;
                    // 執行task的時間taskTime就是ioTime * (100 - ioRatio) / ioRatio
                    // 如果taskTime時間到了還有未執行的task,runAllTasks也會返回
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            // 如果已經shutdown則關閉所有資源
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

// io.netty.channel.DefaultSelectStrategy#calculateStrategy
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
    // 如果還有task待執行則先執行selectNow,selectNow是立即返回的,不是阻塞等待
    // 如果沒有待執行的task則執行select,有可能是阻塞等待IO事件
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}

// io.netty.channel.nio.NioEventLoop#selectNowSupplier
private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        // epoll_wait的參數timeout可以指定超時時間,selectNow傳入的參數是0,也就是不超時等待立即返回
        return selectNow();
    }
};

select過程

epoll模型中最重要的一部分來了,Java把epoll_wait封裝成了一個selector,可以理解為多路復用選擇器,所以在調用selector.select過程中最后都是通過epoll_wait實現的,下面先看看SelectorImpl的兩個select方法

public int select(long timeout)
    throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("Negative timeout");
    // timeout = 0,傳遞給epoll_wait的參數是-1,表示阻塞等待
    // timeout > 0,表示超時等待timeout時間后返回
    return lockAndDoSelect((timeout == 0) ? -1 : timeout);
}

// 調用epoll_wait阻塞等待
public int select() throws IOException {
    return select(0);
}

// 調用epoll_wait立即返回
public int selectNow() throws IOException {
    return lockAndDoSelect(0);
}

上面三個select方法都調用了lockAndDoSelect,只是timeout參數不同,其實最后就是調用epoll_wait參數不同,epoll_wait有一個timeout參數,表示超時時間

  • -1:阻塞
  • 0:立即返回,非阻塞
  • 大於0:指定微秒
// sun.nio.ch.EPollSelectorImpl#doSelect
protected int doSelect(long timeout) throws IOException {
    if (closed)
        throw new ClosedSelectorException();
    // 省略中間代碼...
    // 開始poll,這里的pollWrapper是EPollArrayWrapper
    pollWrapper.poll(timeout);
    // 省略中間代碼...

    int numKeysUpdated = updateSelectedKeys();
    // 如果epoll_wait是因為wakeup pipe解除阻塞返回
    if (pollWrapper.interrupted()) {
        // Clear the wakeup pipe
        // 清除中斷文件描述符接收到的IO事件
        pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
        synchronized (interruptLock) {
            pollWrapper.clearInterrupted();
            // 讀取完管道中的數據
            IOUtil.drain(fd0);
            interruptTriggered = false;
        }
    }
    return numKeysUpdated;
}

int poll(long timeout) throws IOException {
    // 這里會向epoll注冊每個socket需要監聽的事件
    updateRegistrations();
    // 調用epollWait,這是一個native方法
    updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
    for (int i=0; i<updated; i++) {
        if (getDescriptor(i) == incomingInterruptFD) {
            interruptedIndex = i;
            interrupted = true;
            break;
        }
    }
    return updated;
}

看看epollWait的native實現

// jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,
                                            jlong address, jint numfds,
                                            jlong timeout, jint epfd)
{
    struct epoll_event *events = jlong_to_ptr(address);
    int res;

    if (timeout <= 0) {           /* Indefinite or no wait */
        // epoll_wait參數的含義是
        // epfd,創建的epoll句柄
        // events是一個結構體指針,如果有IO事件發生,linux會將事件放在這個結構體中返回
        // numfds是上面指針指向的結構體的個數,也就是最多能接收的IO事件的個數
        // timeout是超時時間
        RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
    } else {                      /* Bounded wait; bounded restarts */
        res = iepoll(epfd, events, numfds, timeout);
    }

    if (res < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
    }
    return res;
}

從native實現上可以看出最終調用了epoll_wait(在timeout >= 0),接着看看epoll_wait的里一個參數events的來源。在上一篇文章里面我們說了channel注冊的時候會將自己需要監聽的事件類型保存在sun.nio.ch.EPollArrayWrapper#eventsLow中,而上面EPollArrayWrapper#poll中又調用了updateSelectedKeys來注冊每個socket監聽的事件

// sun.nio.ch.EPollArrayWrapper#getUpdateEvents
// 獲取需要監聽的文件描述符對應的事件
private byte getUpdateEvents(int fd) {
    // 如果沒有超出預定義的數組大小則直接從數組中獲取
    if (fd < MAX_UPDATE_ARRAY_SIZE) {
        return eventsLow[fd];
    } else {
        // 超出預訂單數組大小的部分從map中獲取
        Byte result = eventsHigh.get(Integer.valueOf(fd));
        // result should never be null
        return result.byteValue();
    }
}

// sun.nio.ch.EPollArrayWrapper#updateRegistrations
// 這個方法是在epoll_wait前把需要監聽的文件描述符及其需要監聽的事件注冊到epoll上
private void updateRegistrations() {
    synchronized (updateLock) {
        int j = 0;
        // 每調用一次setInterest,updateCount加1
        while (j < updateCount) {
            // 需要監聽的文件描述符
            int fd = updateDescriptors[j];
            // 需要監聽的事件,比如channel注冊之后的事件是
            short events = getUpdateEvents(fd);
            boolean isRegistered = registered.get(fd);
            int opcode = 0;

            if (events != KILLED) {
                if (isRegistered) {
                    opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
                } else {
                    opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
                }
                if (opcode != 0) {
                    // 調用epollCtl來add、update或者delete對應文件描述符堅挺的事件
                    epollCtl(epfd, opcode, fd, events);
                    if (opcode == EPOLL_CTL_ADD) {
                        registered.set(fd);
                    } else if (opcode == EPOLL_CTL_DEL) {
                        registered.clear(fd);
                    }
                }
            }
            j++;
        }
        updateCount = 0;
    }
}


上面epollCtl又是一個native方法

// jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,
                                           jint opcode, jint fd, jint events)
{
    struct epoll_event event;
    int res;

    event.events = events;
    event.data.fd = fd;

    // opcode,EPOLL_CTL_ADD(注冊新的fd到epfd), EPOLL_CTL_MOD(修改已經注冊的fd的監聽事件), EPOLL_CTL_DEL(從epfd刪除一個fd);
    // fd,需要監聽的socket對應的文件描述符
    // event,該文件描述符監聽的事件
    RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);
	// 省略中間代碼...
}

關於select過程中的中斷說明

這里的中斷是什么?

這里中斷並不是操作系統層面的中斷,只是中斷epoll_wait。由於epoll_wait可能會阻塞等待IO事件(timeout = -1),這里的中斷就是指中斷epoll_wait,即時返回。也就是讓select即時返回

這里的中斷是怎么實現的?

由於epoll_wait處在等待的情況下的時候,如果有文件描述符上有事件發生,epoll_wait就會返回,所以基本思路就是在epoll監控的文件描述符上產生IO事件,具體實現原理就是使用管道創建兩個文件描述符fd0,fd1(EPollSelectorImpl),fd0用來作為讀描述符,fd1作為寫描述符,然后將讀描述符注冊到epoll上,如果向fd1寫內容,epoll發現fd0有IO事件就會返回,起到了讓epoll_wait及時返回的作用。

什么時候會中斷

  1. 調用Thread.interrupt()
  2. selector關閉的時候
  3. 可以直接調用sun.nio.ch.EPollSelectorImpl#wakeup,這是一個public方法

中斷的方法是sun.nio.ch.EPollArrayWrapper#interrupt(),這個方法會調用一個native方法

// jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_interrupt(JNIEnv *env, jobject this, jint fd)
{
    int fakebuf[1];
    fakebuf[0] = 1;
    // 傳入的文件描述符是sun.nio.ch.EPollArrayWrapper#outgoingInterruptFD,也就是創建的pipe的寫文件描述符fd1,向pipe的fd1寫入一個字節的1
    if (write(fd, fakebuf, 1) < 0) {
        JNU_ThrowIOExceptionWithLastError(env,"write to interrupt fd failed");
    }
}

這個時候epoll_wait就收到中斷文件描述符sun.nio.ch.EPollArrayWrapper#incomingInterruptFD,也就是創建的pipe的讀文件描述符上有IO事件產生,epoll_wait可以返回。

所以調用到方法EPollArrayWrapper#interrupt()就可以中斷文件描述符,而方法EPollSelectorImpl#wakeup調用了EPollArrayWrapper#interrupt()。

那么為什么調用Thread.interrupt()的時候也會中斷epoll_wait呢?

因為在Thread.interrupt()方法中

public void interrupt() {
    if (this != Thread.currentThread())
        checkAccess();

    synchronized (blockerLock) {
        Interruptible b = blocker;
        if (b != null) {
            interrupt0();           // Just to set the interrupt flag
            // 上面設置完中斷標志位后,會調用當前線程的blocker的interrupt方法
            b.interrupt(this);
            return;
        }
    }
    interrupt0();
}

而在sun.nio.ch.EPollSelectorImpl#doSelect方法中,開始poll之前會調用begin方法

protected final void begin() {
    if (interruptor == null) {
		// 新建一個interruptor
        interruptor = new Interruptible() {
            public void interrupt(Thread ignore) {
                // 此時Thread.interrupt()中的blocker就是這個匿名內部類,也就是調用的這個interrupt方法
                AbstractSelector.this.wakeup();
            }};
    }
    // 設置當前線程的interruptor
    AbstractInterruptibleChannel.blockedOn(interruptor);
    Thread me = Thread.currentThread();
    if (me.isInterrupted())
        interruptor.interrupt(me);
}

所以Thread.interrupt()會調用到EPollSelectorImpl#wakeup方法,也就可以起到中斷select的作用。

什么時候清除中斷標志?

可以不止一次的中斷select,為了實現這個功能,每次在中斷之后斗湖清除相關的中斷標志。在sun.nio.ch.EPollSelectorImpl#doSelect方法中pollWrapper.poll完成之后

int poll(long timeout) throws IOException {
    updateRegistrations();
    updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
    for (int i=0; i<updated; i++) {
        // epollWait返回之后會判斷有沒有中斷文件描述符
        if (getDescriptor(i) == incomingInterruptFD) {
            // 設置中斷的文件描述符處於返回的pollArray的index
            interruptedIndex = i;
            interrupted = true;
            break;
        }
    }
    return updated;
}
protected int doSelect(long timeout) throws IOException {
    // 省略中間代碼...
        pollWrapper.poll(timeout);
	// // 省略中間代碼...
    // 如果
    if (pollWrapper.interrupted()) {
        // Clear the wakeup pipe
        // 將返回的中斷文件描述符的IO事件清空,也就是用戶不會讀取到用做中斷的文件描述符
        pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
        synchronized (interruptLock) {
            // sun.nio.ch.EPollArrayWrapper#interrupted設置為false,可再次中斷
            pollWrapper.clearInterrupted();
            // 讀取用以中斷的文件描述符上所有的數據
            IOUtil.drain(fd0);
            interruptTriggered = false;
        }
    }
    return numKeysUpdated;
}

總結

到目前為止已經看清了Java對應linux中epoll相關api的封裝

// 創建epoll文件描述符
// 對應到Java就是創建selector
int epoll_create(int size);

// 打開一個網絡通訊端口,也就是創建一個socket,創建返回一個文件描述符
// 對應到Java就是創建一個socketChannel
int socket(int domain, int type, int protocol);

// 將socket對應的文件描述符和ip:port綁定在一起
// 對應於Java中綁定ip:port
int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);

// 表明socket對應的文件描述符處於監聽狀態,並且最多允許有backlog個客戶端處於連接等待狀態
// 對應於Java中bind中調用listen方法
int listen(int sockfd, int backlog);

// 控制某個文件描述符上的事件:add、update、delete事件
// 對應於Java中調用select過程中添加每個channel關注的事件
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

// 等待監控的所有描述符有事件發生
// 對應於Java中select的時候等待有IO事件發生
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM