Java NIO: Source Code Analysis of the Selector Mechanism (Repost)


I never understood how a pipe wakes up a selector, so I went through the JDK source (downloaded from OpenJDK) and put together the notes below:

The walkthrough uses the demo that ships with Java NIO: OperationServer.java and OperationClient.java (see attachment).

The core code on the server side:

public void initSelector() {
        try {
            selector = SelectorProvider.provider().openSelector();
            this.serverChannel1 = ServerSocketChannel.open();
            serverChannel1.configureBlocking(false);
            InetSocketAddress isa = new InetSocketAddress("localhost", this.port1);
            serverChannel1.socket().bind(isa);
            serverChannel1.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
}

Starting from the top, let's first see what SelectorProvider.provider() does:

public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction<SelectorProvider>() {
                    public SelectorProvider run() {
                            if (loadProviderFromProperty())
                                return provider;
                            if (loadProviderAsService())
                                return provider;
                            provider = sun.nio.ch.DefaultSelectorProvider.create();
                            return provider;
                        }
                    });
        }
    }

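Here, provider = sun.nio.ch.DefaultSelectorProvider.create(); returns a different implementation class depending on the operating system; on Windows it returns a WindowsSelectorProvider. The check

if (provider != null)
    return provider;

ensures that the whole server program uses only one WindowsSelectorProvider object.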
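The singleton behaviour can be observed from application code. A minimal sketch (the class name ProviderSingletonCheck is made up for illustration; the printed provider class depends on the platform and JDK version):

```java
import java.nio.channels.spi.SelectorProvider;

final class ProviderSingletonCheck {
    // Returns true when two successive calls yield the very same provider object.
    static boolean sameProvider() {
        SelectorProvider first = SelectorProvider.provider();
        SelectorProvider second = SelectorProvider.provider();
        return first == second;
    }

    public static void main(String[] args) {
        System.out.println("provider class: "
                + SelectorProvider.provider().getClass().getName());
        System.out.println("same instance:  " + sameProvider());
    }
}
```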

Next, WindowsSelectorProvider.openSelector():

public AbstractSelector openSelector() throws IOException {
        return new WindowsSelectorImpl(this);
    }
The WindowsSelectorImpl(SelectorProvider) constructor:
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
        super(sp);
        pollWrapper = new PollArrayWrapper(INIT_CAP);
        wakeupPipe = Pipe.open();
        wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();

        // Disable the Nagle algorithm so that the wakeup is more immediate
        SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
        (sink.sc).socket().setTcpNoDelay(true);
        wakeupSinkFd = ((SelChImpl)sink).getFDVal();

        pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
    }

Pipe.open() is the key here. Its call chain is:

public static Pipe open() throws IOException {
        return SelectorProvider.provider().openPipe();
}
In SelectorProvider:
public Pipe openPipe() throws IOException {
        return new PipeImpl(this);
}

 

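Next, how new PipeImpl() works. Note that the listing below is the Unix version of PipeImpl, which is built on the pipe() system call; the Windows version builds the pipe from a pair of connected loopback sockets, which is why the selector code above can call setTcpNoDelay() on the sink: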

PipeImpl(SelectorProvider sp) {
        long pipeFds = IOUtil.makePipe(true);
        int readFd = (int) (pipeFds >>> 32);
        int writeFd = (int) pipeFds;
        FileDescriptor sourcefd = new FileDescriptor();
        IOUtil.setfdVal(sourcefd, readFd);
        source = new SourceChannelImpl(sp, sourcefd);
        FileDescriptor sinkfd = new FileDescriptor();
        IOUtil.setfdVal(sinkfd, writeFd);
        sink = new SinkChannelImpl(sp, sinkfd);
 }

 

 

IOUtil.makePipe(true) is a native method:

    /**
     * Returns two file descriptors for a pipe encoded in a long.
     * The read end of the pipe is returned in the high 32 bits,
     * while the write end is returned in the low 32 bits.
     */
    static native long makePipe(boolean blocking);

The native implementation:

 

JNIEXPORT jlong JNICALL
Java_sun_nio_ch_IOUtil_makePipe(JNIEnv *env, jobject this, jboolean blocking)
{
    int fd[2];

    if (pipe(fd) < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "Pipe failed");
        return 0;
    }
    if (blocking == JNI_FALSE) {
        if ((configureBlocking(fd[0], JNI_FALSE) < 0)
            || (configureBlocking(fd[1], JNI_FALSE) < 0)) {
            JNU_ThrowIOExceptionWithLastError(env, "Configure blocking failed");
            close(fd[0]);
            close(fd[1]);
            return 0;
        }
    }
    return ((jlong) fd[0] << 32) | (jlong) fd[1];
}
static int
configureBlocking(int fd, jboolean blocking)
{
    int flags = fcntl(fd, F_GETFL);
    int newflags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);

    return (flags == newflags) ? 0 : fcntl(fd, F_SETFL, newflags);
}

 

As the Javadoc comment on makePipe() says, the high 32 bits hold the file descriptor (FD) of the pipe's read end and the low 32 bits hold the FD of the write end. That is why the PipeImpl constructor shifts and casts the return value of makePipe() to recover the two descriptors.
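The packing and unpacking can be sketched in plain Java. The class and method names below are hypothetical; the mask on the low half is an addition that keeps a negative int from sign-extending into the high half, whereas the native code relies on descriptors being non-negative:

```java
final class PipeFdCodec {
    // Read end goes into the high 32 bits, write end into the low 32 bits.
    static long pack(int readFd, int writeFd) {
        return ((long) readFd << 32) | (writeFd & 0xFFFFFFFFL);
    }

    // Mirrors "(int) (pipeFds >>> 32)" in the PipeImpl constructor.
    static int readEnd(long packed) {
        return (int) (packed >>> 32);
    }

    // Mirrors "(int) pipeFds": the cast keeps only the low 32 bits.
    static int writeEnd(long packed) {
        return (int) packed;
    }

    public static void main(String[] args) {
        long packed = pack(5, 6);
        System.out.println(Long.toHexString(packed));                   // 500000006
        System.out.println(readEnd(packed) + " " + writeEnd(packed));   // 5 6
    }
}
```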

 

pollWrapper.addWakeupSocket(wakeupSourceFd, 0);

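This line puts the FD of the pipe's read end (wakeupSourceFd) into pollWrapper. As shown later, this is what makes the selector's wakeup() work: a byte written to the write end makes the read end readable, and the poll returns.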
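The same idea can be reproduced with the public API: register a pipe's read end with a selector, write one byte to the write end, and select() reports the read end as ready. This is only a user-level analogue of what WindowsSelectorImpl does internally, and the class name PipeWakeupSketch is made up for illustration:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

final class PipeWakeupSketch {
    // Registers the pipe's read end, writes one byte to the write end,
    // and returns how many keys select() reports as ready.
    static int readyAfterOneByte() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);              // required before register()
                source.register(selector, SelectionKey.OP_READ);

                sink.write(ByteBuffer.wrap(new byte[] {1}));  // the "wakeup" byte

                int ready = selector.select(5000);            // bounded wait, just in case
                source.read(ByteBuffer.allocate(1));          // drain it, in the spirit of resetWakeupSocket()
                return ready;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("ready keys: " + readyAfterOneByte());
    }
}
```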

 

The implementation of ServerSocketChannel.open():

public static ServerSocketChannel open() throws IOException {
        return SelectorProvider.provider().openServerSocketChannel();
}
In SelectorProvider:
public ServerSocketChannel openServerSocketChannel() throws IOException {
        return new ServerSocketChannelImpl(this);
}

 

So the ServerSocketChannelImpl that gets created also holds a reference to the provider (the WindowsSelectorProvider), passed in as this.

ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
        super(sp);
        this.fd =  Net.serverSocket(true);    // open a socket and return its FD
        this.fdVal = IOUtil.fdVal(fd);
        this.state = ST_INUSE;
}

 

Then serverChannel1.register(selector, SelectionKey.OP_ACCEPT); binds the selector and the channel together, that is, it binds the FD created when the ServerSocketChannel was constructed to the selector.
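From the caller's point of view, the binding is represented by the SelectionKey that register() returns. A small sketch, assuming a free ephemeral port on the loopback interface (the class name RegisterDemo is hypothetical):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class RegisterDemo {
    // Returns true when the key ties exactly this channel to exactly this selector.
    static boolean keyLinksChannelAndSelector() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Port 0 lets the operating system pick a free port.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            SelectionKey key = server.register(selector, SelectionKey.OP_ACCEPT);
            return key.channel() == server
                    && key.selector() == selector
                    && key.interestOps() == SelectionKey.OP_ACCEPT
                    && selector.keys().contains(key);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("key links channel and selector: " + keyLinksChannelAndSelector());
    }
}
```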

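At this point the server has finished starting up. The main objects created are:

WindowsSelectorProvider: a singleton

WindowsSelectorImpl, which contains:

    pollWrapper: holds the FDs registered with the selector, including the FD of the pipe's read end and the FD used by the ServerSocketChannel

    wakeupPipe: the pipe (really just two FDs, one for reading and one for writing)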

 

Now on to run() in the server:

selector.select(); mainly ends up in this method of WindowsSelectorImpl:

 

protected int doSelect(long timeout) throws IOException {
        if (channelArray == null)
            throw new ClosedSelectorException();
        this.timeout = timeout; // set selector timeout
        processDeregisterQueue();
        if (interruptTriggered) {
            resetWakeupSocket();
            return 0;
        }
        // Calculate number of helper threads needed for poll. If necessary
        // threads are created here and start waiting on startLock
        adjustThreadsCount();
        finishLock.reset(); // reset finishLock
        // Wakeup helper threads, waiting on startLock, so they start polling.
        // Redundant threads will exit here after wakeup.
        startLock.startThreads();
        // do polling in the main thread. Main thread is responsible for
        // first MAX_SELECTABLE_FDS entries in pollArray.
        try {
            begin();
            try {
                subSelector.poll();
            } catch (IOException e) {
                finishLock.setException(e); // Save this exception
            }
            // Main thread is out of poll(). Wakeup others and wait for them
            if (threads.size() > 0)
                finishLock.waitForHelperThreads();
          } finally {
              end();
          }
        // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
        finishLock.checkForException();
        processDeregisterQueue();
        int updated = updateSelectedKeys();
        // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
        resetWakeupSocket();
        return updated;
    }

 

subSelector.poll() is the core: it polls the FDs stored in pollWrapper. It is implemented by calling the native method poll0:

private int poll() throws IOException{ // poll for the main thread
            return poll0(pollWrapper.pollArrayAddress,
                         Math.min(totalChannels, MAX_SELECTABLE_FDS),
                         readFds, writeFds, exceptFds, timeout);
        }
private native int poll0(long pollAddress, int numfds,
             int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
        // These arrays will hold result of native select().
        // The first element of each array is the number of selected sockets.
        // Other elements are file descriptors of selected sockets.
        private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];   // FDs that are ready for reading
        private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];  // FDs that are ready for writing
        private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1]; // FDs with an exceptional condition
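The layout described in the comment (element 0 is a count, the rest are descriptors) can be decoded as follows. This is an illustrative sketch only; FdResultArray and selectedFds are made-up names, not part of the JDK:

```java
import java.util.Arrays;

final class FdResultArray {
    // Returns the descriptors that follow the leading count element.
    static int[] selectedFds(int[] result) {
        int count = result[0];
        return Arrays.copyOfRange(result, 1, 1 + count);
    }

    public static void main(String[] args) {
        // Two selected sockets (7 and 9); the trailing slots are unused.
        int[] readFds = {2, 7, 9, 0, 0};
        System.out.println(Arrays.toString(selectedFds(readFds))); // [7, 9]
    }
}
```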

 

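poll0() watches the FDs in pollWrapper for I/O activity and blocks until an event occurs. For example, because pollWrapper holds the FD of the ServerSocketChannel, poll0() returns as soon as a client socket connects to the server socket. And because pollWrapper also holds the FD of the pipe's read end, writing a byte to the pipe's write end makes poll0() return as well. If neither happens, poll0() keeps blocking, which means selector.select() keeps blocking; if either one happens, selector.select() returns. That is why run() in OperationServer wraps the call in while (true) {: once the selector has received and handled an event, it goes back to waiting in poll().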
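The first case can be reproduced in a few lines: a client connects, and select() returns with the server channel's key marked acceptable. This is a sketch under the assumption that loopback networking is available; it runs a single select() instead of the demo's while (true) loop, and the class name AcceptReadinessDemo is hypothetical:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class AcceptReadinessDemo {
    // Returns the number of ready keys reported after one client has connected.
    static int readyKeysAfterConnect() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.register(selector, SelectionKey.OP_ACCEPT);

            // A blocking connect from the same process plays the role of the client.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                int ready = selector.select(5000);   // bounded wait instead of select()
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isAcceptable()) {
                        try (SocketChannel accepted = server.accept()) {
                            System.out.println("accepted: " + (accepted != null));
                        }
                    }
                }
                selector.selectedKeys().clear();     // the selector never clears this set itself
                return ready;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("ready keys: " + readyKeysAfterConnect());
    }
}
```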

Now look at WindowsSelectorImpl.wakeup():

 

public Selector wakeup() {
        synchronized (interruptLock) {
            if (!interruptTriggered) {
                setWakeupSocket();
                interruptTriggered = true;
            }
        }
        return this;
    }
// Sets Windows wakeup socket to a signaled state.
    private void setWakeupSocket() {
        setWakeupSocket0(wakeupSinkFd);
    }
private native void setWakeupSocket0(int wakeupSinkFd);
JNIEXPORT void JNICALL
Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this,
                                                jint scoutFd)
{
    /* Write one byte into the pipe */
    const char byte = 1;
    send(scoutFd, &byte, 1, 0);
}

So wakeup() sends a single byte with the value 1 through the pipe's write end, via send(scoutFd, &byte, 1, 0), to wake up poll(). Whenever needed, you can call selector.wakeup() to wake up the selector.
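The effect is easy to observe from application code: a select() with no registered channels would otherwise block until its timeout, but a wakeup() from another thread makes it return early with zero ready keys. A minimal sketch (WakeupDemo and the 200 ms delay are arbitrary choices for illustration):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

final class WakeupDemo {
    // Returns true when select() came back early with no ready keys.
    static boolean wakeupUnblocksSelect() {
        try (Selector selector = Selector.open()) {
            Thread waker = new Thread(() -> {
                try {
                    Thread.sleep(200);               // give the main thread time to block
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                selector.wakeup();
            });
            long start = System.nanoTime();
            waker.start();
            int ready = selector.select(10_000);     // would wait 10 s without the wakeup
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            waker.join();
            return ready == 0 && elapsedMs < 5_000;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("wakeup unblocked select: " + wakeupUnblocksSelect());
    }
}
```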

Original post: http://goon.iteye.com/blog/1775421

 

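Addendum: the DefaultSelectorProvider implementation on Linux. As the code shows, if the kernel version is >= 2.6 the concrete SelectorProvider is EPollSelectorProvider; otherwise it falls back to the default PollSelectorProvider.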

//sun.nio.ch.DefaultSelectorProvider

public static SelectorProvider create() {
    PrivilegedAction pa = new GetPropertyAction("os.name");
    String osname = (String) AccessController.doPrivileged(pa);
    if ("SunOS".equals(osname)) {
        return new sun.nio.ch.DevPollSelectorProvider();
    }
 
    // use EPollSelectorProvider for Linux kernels >= 2.6
    if ("Linux".equals(osname)) {
        pa = new GetPropertyAction("os.version");
        String osversion = (String) AccessController.doPrivileged(pa);
        String[] vers = osversion.split("\\.", 0);
        if (vers.length >= 2) {
            try {
                int major = Integer.parseInt(vers[0]);
                int minor = Integer.parseInt(vers[1]);
                if (major > 2 || (major == 2 && minor >= 6)) {
                    return new sun.nio.ch.EPollSelectorProvider();
                }
            } catch (NumberFormatException x) {
                // format not recognized
            }
        }
    }
 
    return new sun.nio.ch.PollSelectorProvider();
}
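The version test can be isolated into a small helper to see how different os.version strings are classified. The parsing below copies the logic of create(); the class name KernelVersionCheck and the method name supportsEpoll are made up for illustration:

```java
final class KernelVersionCheck {
    // True when the version string denotes a kernel >= 2.6, using the same
    // parsing as DefaultSelectorProvider.create().
    static boolean supportsEpoll(String osVersion) {
        String[] vers = osVersion.split("\\.", 0);
        if (vers.length >= 2) {
            try {
                int major = Integer.parseInt(vers[0]);
                int minor = Integer.parseInt(vers[1]);
                return major > 2 || (major == 2 && minor >= 6);
            } catch (NumberFormatException x) {
                // format not recognized: fall through to the default
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(supportsEpoll("2.6.32"));             // true
        System.out.println(supportsEpoll("2.4.20"));             // false
        System.out.println(supportsEpoll("5.15.0-91-generic"));  // true
    }
}
```

Only the first two components are parsed, so a suffix such as "-91-generic" in the third component does not affect the result.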

 

