NioEventLoop功能
前面channel已經准備好了,可以接收來自客戶端的請求了,NioEventLoop作為一個線程池,只有一個線程,但是有一個queue存儲了待執行的task,由於只有一個線程,所以run方法是死循環,除非線程池shutdown。
這個run方法的主要作用:
- 執行selector.select,監聽IO事件,並處理IO事件
- 由於NioEventLoop兼有線程池的功能,執行線程池中任務
// io.netty.channel.nio.NioEventLoop#run
protected void run() {
// loop,循環處理IO事件或者處理線程池中的task任務
for (;;) {
try {
// 判斷接下來是是執行select還是直接處理IO事件和執行隊列中的task
// hasTask判斷當前線程的queue中是否還有待執行的任務
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
// NioEventLoop默認不會有這種狀態
continue;
case SelectStrategy.SELECT:
// 說明當前queue中沒有task待執行
select(wakenUp.getAndSet(false));
// 喚醒epoll_wait
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
// 這個比例是處理IO事件所需的時間和花費在處理task時間的比例
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
// 如果比例是100,表示每次都處理完IO事件后,執行所有的task
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
// 保證能執行所有的task
runAllTasks();
}
} else {
// 記錄處理IO事件開始的時間
final long ioStartTime = System.nanoTime();
try {
// 處理IO事件
processSelectedKeys();
} finally {
// Ensure we always run tasks.
// 當前時間減去處理IO事件開始的時間就是處理IO事件花費的時間
final long ioTime = System.nanoTime() - ioStartTime;
// 執行task的時間taskTime就是ioTime * (100 - ioRatio) / ioRatio
// 如果taskTime時間到了還有未執行的task,runAllTasks也會返回
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
// 如果已經shutdown則關閉所有資源
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
// io.netty.channel.DefaultSelectStrategy#calculateStrategy
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
// 如果還有task待執行則先執行selectNow,selectNow是立即返回的,不是阻塞等待
// 如果沒有待執行的task則執行select,有可能是阻塞等待IO事件
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
// io.netty.channel.nio.NioEventLoop#selectNowSupplier
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
// epoll_wait的參數timeout可以指定超時時間,selectNow傳入的參數是0,也就是不超時等待立即返回
return selectNow();
}
};
select過程
epoll模型中最重要的一部分來了,Java把epoll_wait封裝成了一個selector,可以理解為多路復用選擇器,所以在調用selector.select過程中最后都是通過epoll_wait實現的,下面先看看SelectorImpl的兩個select方法
public int select(long timeout)
throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
// timeout = 0,傳遞給epoll_wait的參數是-1,表示阻塞等待
// timeout > 0,表示超時等待timeout時間后返回
return lockAndDoSelect((timeout == 0) ? -1 : timeout);
}
// 調用epoll_wait阻塞等待
public int select() throws IOException {
return select(0);
}
// 調用epoll_wait立即返回
public int selectNow() throws IOException {
return lockAndDoSelect(0);
}
上面三個select方法都調用了lockAndDoSelect,只是timeout參數不同,其實最后就是調用epoll_wait參數不同,epoll_wait有一個timeout參數,表示超時時間
- -1:阻塞
- 0:立即返回,非阻塞
- 大於0:指定微秒
// sun.nio.ch.EPollSelectorImpl#doSelect
protected int doSelect(long timeout) throws IOException {
if (closed)
throw new ClosedSelectorException();
// 省略中間代碼...
// 開始poll,這里的pollWrapper是EPollArrayWrapper
pollWrapper.poll(timeout);
// 省略中間代碼...
int numKeysUpdated = updateSelectedKeys();
// 如果epoll_wait是因為wakeup pipe解除阻塞返回
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
// 清除中斷文件描述符接收到的IO事件
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
// 讀取完管道中的數據
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
int poll(long timeout) throws IOException {
// 這里會向epoll注冊每個socket需要監聽的事件
updateRegistrations();
// 調用epollWait,這是一個native方法
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}
看看epollWait的native實現
// jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,
jlong address, jint numfds,
jlong timeout, jint epfd)
{
struct epoll_event *events = jlong_to_ptr(address);
int res;
if (timeout <= 0) { /* Indefinite or no wait */
// epoll_wait參數的含義是
// epfd,創建的epoll句柄
// events是一個結構體指針,如果有IO事件發生,linux會將事件放在這個結構體中返回
// numfds是上面指針指向的結構體的個數,也就是最多能接收的IO事件的個數
// timeout是超時時間
RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
} else { /* Bounded wait; bounded restarts */
res = iepoll(epfd, events, numfds, timeout);
}
if (res < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
}
return res;
}
從native實現上可以看出最終調用了epoll_wait(在timeout >= 0),接着看看epoll_wait的里一個參數events的來源。在上一篇文章里面我們說了channel注冊的時候會將自己需要監聽的事件類型保存在sun.nio.ch.EPollArrayWrapper#eventsLow中,而上面EPollArrayWrapper#poll中又調用了updateSelectedKeys來注冊每個socket監聽的事件
// sun.nio.ch.EPollArrayWrapper#getUpdateEvents
// 獲取需要監聽的文件描述符對應的事件
private byte getUpdateEvents(int fd) {
// 如果沒有超出預定義的數組大小則直接從數組中獲取
if (fd < MAX_UPDATE_ARRAY_SIZE) {
return eventsLow[fd];
} else {
// 超出預訂單數組大小的部分從map中獲取
Byte result = eventsHigh.get(Integer.valueOf(fd));
// result should never be null
return result.byteValue();
}
}
// sun.nio.ch.EPollArrayWrapper#updateRegistrations
// 這個方法是在epoll_wait前把需要監聽的文件描述符及其需要監聽的事件注冊到epoll上
private void updateRegistrations() {
synchronized (updateLock) {
int j = 0;
// 每調用一次setInterest,updateCount加1
while (j < updateCount) {
// 需要監聽的文件描述符
int fd = updateDescriptors[j];
// 需要監聽的事件,比如channel注冊之后的事件是
short events = getUpdateEvents(fd);
boolean isRegistered = registered.get(fd);
int opcode = 0;
if (events != KILLED) {
if (isRegistered) {
opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
} else {
opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
}
if (opcode != 0) {
// 調用epollCtl來add、update或者delete對應文件描述符堅挺的事件
epollCtl(epfd, opcode, fd, events);
if (opcode == EPOLL_CTL_ADD) {
registered.set(fd);
} else if (opcode == EPOLL_CTL_DEL) {
registered.clear(fd);
}
}
}
j++;
}
updateCount = 0;
}
}
上面epollCtl又是一個native方法
// jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,
jint opcode, jint fd, jint events)
{
struct epoll_event event;
int res;
event.events = events;
event.data.fd = fd;
// opcode,EPOLL_CTL_ADD(注冊新的fd到epfd), EPOLL_CTL_MOD(修改已經注冊的fd的監聽事件), EPOLL_CTL_DEL(從epfd刪除一個fd);
// fd,需要監聽的socket對應的文件描述符
// event,該文件描述符監聽的事件
RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);
// 省略中間代碼...
}
關於select過程中的中斷說明
這里的中斷是什么?
這里中斷並不是操作系統層面的中斷,只是中斷epoll_wait。由於epoll_wait可能會阻塞等待IO事件(timeout = -1),這里的中斷就是指中斷epoll_wait,即時返回。也就是讓select即時返回
這里的中斷是怎么實現的?
由於epoll_wait處在等待的情況下的時候,如果有文件描述符上有事件發生,epoll_wait就會返回,所以基本思路就是在epoll監控的文件描述符上產生IO事件,具體實現原理就是使用管道創建兩個文件描述符fd0,fd1(EPollSelectorImpl),fd0用來作為讀描述符,fd1作為寫描述符,然后將讀描述符注冊到epoll上,如果向fd1寫內容,epoll發現fd0有IO事件就會返回,起到了讓epoll_wait及時返回的作用。
什么時候會中斷
- 調用Thread.interrupt()
- selector關閉的時候
- 可以直接調用sun.nio.ch.EPollSelectorImpl#wakeup,這是一個public方法
中斷的方法是sun.nio.ch.EPollArrayWrapper#interrupt(),這個方法會調用一個native方法
// jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_interrupt(JNIEnv *env, jobject this, jint fd)
{
int fakebuf[1];
fakebuf[0] = 1;
// 傳入的文件描述符是sun.nio.ch.EPollArrayWrapper#outgoingInterruptFD,也就是創建的pipe的寫文件描述符fd1,向pipe的fd1寫入一個字節的1
if (write(fd, fakebuf, 1) < 0) {
JNU_ThrowIOExceptionWithLastError(env,"write to interrupt fd failed");
}
}
這個時候epoll_wait就收到中斷文件描述符sun.nio.ch.EPollArrayWrapper#incomingInterruptFD,也就是創建的pipe的讀文件描述符上有IO事件產生,epoll_wait可以返回。
所以調用到方法EPollArrayWrapper#interrupt()就可以中斷文件描述符,而方法EPollSelectorImpl#wakeup調用了EPollArrayWrapper#interrupt()。
那么為什么調用Thread.interrupt()的時候也會中斷epoll_wait呢?
因為在Thread.interrupt()方法中
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
// 上面設置完中斷標志位后,會調用當前線程的blocker的interrupt方法
b.interrupt(this);
return;
}
}
interrupt0();
}
而在sun.nio.ch.EPollSelectorImpl#doSelect方法中,開始poll之前會調用begin方法
protected final void begin() {
if (interruptor == null) {
// 新建一個interruptor
interruptor = new Interruptible() {
public void interrupt(Thread ignore) {
// 此時Thread.interrupt()中的blocker就是這個匿名內部類,也就是調用的這個interrupt方法
AbstractSelector.this.wakeup();
}};
}
// 設置當前線程的interruptor
AbstractInterruptibleChannel.blockedOn(interruptor);
Thread me = Thread.currentThread();
if (me.isInterrupted())
interruptor.interrupt(me);
}
所以Thread.interrupt()會調用到EPollSelectorImpl#wakeup方法,也就可以起到中斷select的作用。
什么時候清除中斷標志?
可以不止一次的中斷select,為了實現這個功能,每次在中斷之后斗湖清除相關的中斷標志。在sun.nio.ch.EPollSelectorImpl#doSelect方法中pollWrapper.poll完成之后
int poll(long timeout) throws IOException {
updateRegistrations();
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
for (int i=0; i<updated; i++) {
// epollWait返回之后會判斷有沒有中斷文件描述符
if (getDescriptor(i) == incomingInterruptFD) {
// 設置中斷的文件描述符處於返回的pollArray的index
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}
protected int doSelect(long timeout) throws IOException {
// 省略中間代碼...
pollWrapper.poll(timeout);
// // 省略中間代碼...
// 如果
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
// 將返回的中斷文件描述符的IO事件清空,也就是用戶不會讀取到用做中斷的文件描述符
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
// sun.nio.ch.EPollArrayWrapper#interrupted設置為false,可再次中斷
pollWrapper.clearInterrupted();
// 讀取用以中斷的文件描述符上所有的數據
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
總結
到目前為止已經看清了Java對應linux中epoll相關api的封裝
// 創建epoll文件描述符
// 對應到Java就是創建selector
int epoll_create(int size);
// 打開一個網絡通訊端口,也就是創建一個socket,創建返回一個文件描述符
// 對應到Java就是創建一個socketChannel
int socket(int domain, int type, int protocol);
// 將socket對應的文件描述符和ip:port綁定在一起
// 對應於Java中綁定ip:port
int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
// 表明socket對應的文件描述符處於監聽狀態,並且最多允許有backlog個客戶端處於連接等待狀態
// 對應於Java中bind中調用listen方法
int listen(int sockfd, int backlog);
// 控制某個文件描述符上的事件:add、update、delete事件
// 對應於Java中調用select過程中添加每個channel關注的事件
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// 等待監控的所有描述符有事件發生
// 對應於Java中select的時候等待有IO事件發生
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
