NIO-WindowsSelectorImpl源碼分析
目錄
NIO-概覽
NIO-Buffer
NIO-Channel
NIO-Channel接口分析
NIO-SocketChannel源碼分析
NIO-FileChannel源碼分析
NIO-Selector源碼分析
NIO-WindowsSelectorImpl源碼分析
NIO-EPollSelectorIpml源碼分析
前言
本來是想學習Netty的,但是Netty是一個NIO框架,因此在學習netty之前,還是先梳理一下NIO的知識。通過剖析源碼理解NIO的設計原理。
本系列文章針對的是JDK1.8.0.161的源碼。
上一篇文章對Selector
的功能和創建過程進行了分析,本篇對Windows下的WindowsSelectorImpl
源碼實現進行詳細講解。
初始化WindowsSelectorProvider
上一篇文章提到,若沒有進行配置時,默認通過sun.nio.ch.DefaultSelectorProvider.create()
創建SelectorProvider
。
Windows下的代碼路徑在jdk\src\windows\classes\sun\nio\ch\DefaultSelectorProvider.java
。在其內部通過實際是創建了一個WindowsSelectorProvider)
。
創建WindowsSelectorImpl
WindowsSelectorProvider
是用於創建WindowsSelectorImpl
的。
Selector.Open()->
SelectorProvider.provider()->
sun.nio.ch.DefaultSelectorProvider.create()->
new WindowsSelectorImpl(this)->
WindowsSelectorProvider.openSelector()
public class WindowsSelectorProvider extends SelectorProviderImpl {
public AbstractSelector openSelector() throws IOException {
return new WindowsSelectorImpl(this);
}
}
WindowsSelectorImpl結構
在詳細講解WindowsSelectorImpl
源碼之前,先了解WindowsSelectorImpl
的大致代碼結構。
在其內部有幾個主要的數據結構和屬性。
名稱 | 作用 |
---|---|
SelectionKeyImpl[] channelArray | 存放注冊的SelectionKey |
PollArrayWrapper pollWrapper | 底層的本機輪詢數組包裝對象,用於存放Socket文件描述符和事件掩碼 |
List<SelectThread> threads | 輔助線程,多個線程有助於提高高並發時的性能 |
Pipe wakeupPipe | 用於喚醒輔助線程 |
FdMap fdMap | 保存文件描述符和SelectionKey的映射關系 |
SubSelector subSelector | 調用JNI的poll和處理就緒的SelectionKey |
StartLock startLock | 新增的輔助線程使用該鎖等待主線程的開始信號 |
FinishLock finishLock | 主線程用該鎖等待所有輔助線程執行完畢 |
SelectionKeyImpl
用於存放Channel
,Selector
以及存放Channel注冊時的事件掩碼。
- 在注冊的時候會創建
SelectionKeyImpl
- 將
SelectionKeyImpl
加入到SelectionKeyImpl[] channelArray
- 將文件句柄和
SelectionKeyImpl
的對應關系加入到FdMap fdMap
- 將key的文件描述符保存到
PollArrayWrapper pollWrapper
中。
PollArrayWrapper
PollArrayWrapper
用於存放文件描述符的文件描述符和事件掩碼的native數組。相關的文件描述符的結構如下圖:
其中每項的結構如下:
名稱 | 大小 | 說明 |
---|---|---|
SOCKET fd | 4字節 | 存放Socket文件句柄 |
short events | 2字節 | 等待的事件掩碼 |
short reevents | 2字節 | 實際發生的事件掩碼,暫時美有用到 |
如上所示,每項為8字節,即為SIZE_POLLFD
的值,目前NIO實際只用前兩個字段。
class PollArrayWrapper {
private AllocatedNativeObject pollArray; // The fd array
long pollArrayAddress; // pollArrayAddress
static short SIZE_POLLFD = 8; // sizeof pollfd struct
private int size; // Size of the pollArray
PollArrayWrapper(int newSize) {
int allocationSize = newSize * SIZE_POLLFD;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
this.size = newSize;
}
...
}
在 PollArrayWrapper
內部使用 AllocatedNativeObject
對象創建的堆外(native)內存對象。
將數組的首地址保存到pollArrayAddress
中,在調用Poll
的時候需要傳遞該參數給JNI。
PollArrayWrapper
暴露了讀寫FD和Event的方法供WindowsSelectorImpl
使用。
void putDescriptor(int i, int fd) {
pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
}
void putEventOps(int i, int event) {
pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
}
int getEventOps(int i) {
return pollArray.getShort(SIZE_POLLFD * i + EVENT_OFFSET);
}
int getDescriptor(int i) {
return pollArray.getInt(SIZE_POLLFD * i + FD_OFFSET);
}
SelectThread
由於select最大一次性獲取1024個文件描述符。因此為了提高poll的性能
WindowsSelectorImpl
底層 通過引入多個輔助線程的方式實現多線程poll以提高高並發時的性能問題。 我們先看一下注冊的邏輯
protected void implRegister(SelectionKeyImpl ski) {
synchronized (closeLock) {
if (pollWrapper == null)
throw new ClosedSelectorException();
//判斷是否需要擴容隊列以及添加輔助線程
growIfNeeded();
//保存到緩存中
channelArray[totalChannels] = ski;
//保存在數組中的位置
ski.setIndex(totalChannels);
//保存文件描述符和SelectionKeyImpl的映射關系到FDMap
fdMap.put(ski);
//保存到keys中
keys.add(ski);
//保存文件描述符和事件到native數組中
pollWrapper.addEntry(totalChannels, ski);
totalChannels++;
}
}
在注冊之前會先會判斷當前注冊的Channel
數量 是否達到需要啟動輔助線程的閾值。如果達到閾值則需要擴容pollWrapper
數組,同時還要 將wakeupSourceFd
加入到擴容后的第一個位置 (具體作用下面會講解)。
private void growIfNeeded() {
if (channelArray.length == totalChannels) {
//channel數組已滿,擴容兩倍
int newSize = totalChannels * 2; // Make a larger array
SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);
channelArray = temp;
//文件描述符數組擴容
pollWrapper.grow(newSize);
}
//達到最大文件描述符數量時添加輔助線程
if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed
//將喚醒的文件描述符加入到擴容后的第一個位置。
pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
totalChannels++;
//添加線程數
threadsCount++;
}
}
擴容PollArrayWrapper
pollWrapper.grow(newSize);
void grow(int newSize) {
//創建新的數組
PollArrayWrapper temp = new PollArrayWrapper(newSize);
for (int i = 0; i < size; i++)
//將原來的數組的內容存放到新的數組中
replaceEntry(this, i, temp, i);
//釋放原來的數組
pollArray.free();
//更新引用
pollArray = temp.pollArray;
//更新大小
this.size = temp.size;
//更新地址
pollArrayAddress = pollArray.address();
}
擴容完成時,需要添加一個輔助線程以並行的處理所有文件描述符。主線程處理前1024個文件描述符,第二個輔助線程處理1025到2048的文件描述符,以此類推。 這樣使得主線程調用poll的時候,通過多線程並行執行一次性獲取到所有的已就緒的文件描述符,從而提高在高並發時的poll的性能。
每1024個PollFD的第一個句柄都要設置為wakeupSourceFd
,因此在擴容的時候也需要將新的位置的第一個設置為wakeupSourceFd
,該線程的目的是為了喚醒輔助線程 。當多個線程阻塞在Poll
,若此時主線程已經處理完成,則需要等待所有輔助線程完成,通過向wakeupSourceFd
發送信號以激活Poll
不在阻塞。
現在我們知道了windows下poll多線程的使用方法,因為多線程poll還需要其他的數據結構支持同步,具體的多線程執行邏輯我們下面再討論。
FdMap
FDMap只是為了保存文件描述符句柄和SelectionKey
的關系,前面我們提到了PollFD的數據結構包含了文件描述符句柄信息,因此我們可以通過文件描述符句柄從FdMap中獲取到對應的SelectionKey
。
private final static class FdMap extends HashMap<Integer, MapEntry> {
static final long serialVersionUID = 0L;
private MapEntry get(int desc) {
return get(new Integer(desc));
}
private MapEntry put(SelectionKeyImpl ski) {
return put(new Integer(ski.channel.getFDVal()), new MapEntry(ski));
}
private MapEntry remove(SelectionKeyImpl ski) {
Integer fd = new Integer(ski.channel.getFDVal());
MapEntry x = get(fd);
if ((x != null) && (x.ski.channel == ski.channel))
return remove(fd);
return null;
}
}
SubSelector
SubSelector
封裝了調用JNI poll的邏輯,以及獲取就緒SelectionKey
的方法。
主線程和每一個子線程都有一個SubSelector
,其內存保存了poll獲取到的可讀文件描述符,可寫文件描述符以及異常的文件描述符。這樣每個線程就有自己單獨的就緒文件描述符數組。
private final int pollArrayIndex;
private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
pollArrayIndex
記錄了當前SubSelector
的序號,在調用poll的時候,需要將文件描述符數組的地址傳遞給JNI中,由於我們有多個線程一起調用poll,且每個線程處理1024個Channel
。通過序號和數組的地址計算當前SubSelector
所負責哪些通道。
private int poll() throws IOException{ // poll for the main thread
return poll0(pollWrapper.pollArrayAddress,
Math.min(totalChannels, MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
private int poll(int index) throws IOException {
// poll for helper threads
return poll0(pollWrapper.pollArrayAddress +
(pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
Math.min(MAX_SELECTABLE_FDS,
totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
private native int poll0(long pollAddress, int numfds,
int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
在主線程調用poll之后,會獲取到已就緒的文件描述符(包含可讀、可寫、異常)。通過調用processSelectedKeys
將就緒的文件描述符對應的SelectorKey
加入到selectedKeys
中。這樣我們外部就可以調用到所有就緒的SelectorKey
進行遍歷處理。
private int processSelectedKeys(long updateCount) {
int numKeysUpdated = 0;
numKeysUpdated += processFDSet(updateCount, readFds,
Net.POLLIN,
false);
numKeysUpdated += processFDSet(updateCount, writeFds,
Net.POLLCONN |
Net.POLLOUT,
false);
numKeysUpdated += processFDSet(updateCount, exceptFds,
Net.POLLIN |
Net.POLLCONN |
Net.POLLOUT,
true);
return numKeysUpdated;
}
可讀文件描述符,可寫文件描述符以及異常文件描述符的處理邏輯都是一樣的,調用processFDSet
處理更新SelectorKey
的就緒事件。這里會傳入文件描述符的數組。需要注意的是文件描述符第一個元素是數組的長度。
private int processFDSet(long updateCount, int[] fds, int rOps, boolean isExceptFds)
{
int numKeysUpdated = 0;
//1. 遍歷文件描述符數組
for (int i = 1; i <= fds[0]; i++) {
//獲取文件描述符句柄值
int desc = fds[i];
//2. 判斷當前文件描述符是否是用於喚醒的文件描述
if (desc == wakeupSourceFd) {
synchronized (interruptLock) {
interruptTriggered = true;
}
continue;
}
//3. 獲取文件描述符句柄對應的SelectionKey的映射值
MapEntry me = fdMap.get(desc);
// 4. 若為空,則表示已經被取消。
if (me == null)
continue;
SelectionKeyImpl sk = me.ski;
// 5. 丟棄OOD數據(緊急數據)
if (isExceptFds &&
(sk.channel() instanceof SocketChannelImpl) &&
discardUrgentData(desc))
{
continue;
}
//6. 判斷key是否已經就緒,若已就緒,則將當前操作累加到原來的操作上,比如原來寫事件就緒,現在讀事件就緒,就需要更新該key讀寫就緒
if (selectedKeys.contains(sk)) { // Key in selected set
//clearedCount 和 updateCount用於避免同一個key的事件設置多次,因為同一個文件描述符可能在可讀文件描述符數組也可能在異常文件描述符數組中。
if (me.clearedCount != updateCount) {
if (sk.channel.translateAndSetReadyOps(rOps, sk) &&
(me.updateCount != updateCount)) {
me.updateCount = updateCount;
numKeysUpdated++;
}
} else { // The readyOps have been set; now add
if (sk.channel.translateAndUpdateReadyOps(rOps, sk) &&
(me.updateCount != updateCount)) {
me.updateCount = updateCount;
numKeysUpdated++;
}
}
me.clearedCount = updateCount;
} else { // Key is not in selected set yet
//key原來未就緒,將key加入selectedKeys中
if (me.clearedCount != updateCount) {
sk.channel.translateAndSetReadyOps(rOps, sk);
if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
selectedKeys.add(sk);
me.updateCount = updateCount;
numKeysUpdated++;
}
} else { // The readyOps have been set; now add
sk.channel.translateAndUpdateReadyOps(rOps, sk);
if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
selectedKeys.add(sk);
me.updateCount = updateCount;
numKeysUpdated++;
}
}
me.clearedCount = updateCount;
}
}
return numKeysUpdated;
}
- 首先忽略
wakeupSourceFd
,前面說了該文件描述符用於喚醒。 - 過濾fdMap不存在的文件描述符,這些文件描述符已經被取消了。
- 忽略OOB(緊急)數據,這些數據需要調用
discardUrgentData
讀取並忽略。 - 根據key是否在SelectorKeys中決定是設置事件掩碼還是更新事件掩碼。
多線程Poll
現在大部分數據結構都已經介紹了,在談論Pipe、StartLock和FinishLock之前,是時候引入多線程Poll功能了,在談論多線程時,會對上述三個數據結構和功能進行詳細說明。
首先我們先看一下創建WindowsSelectorImpl
做了什么
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
pollWrapper = new PollArrayWrapper(INIT_CAP);
wakeupPipe = Pipe.open();
wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
// Disable the Nagle algorithm so that the wakeup is more immediate
SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
(sink.sc).socket().setTcpNoDelay(true);
wakeupSinkFd = ((SelChImpl)sink).getFDVal();
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
- 首先創建了一個默認8個長度(8*8字節)的文件描述符數組
PollArrayWrapper
- 創建一個Pipe,Pipe我們之前討論過是一個單向通訊管道。
- 獲取Pipe的源端和目標端的文件描述符句柄,該句柄用於激活線程。
- 將
wakeupSourceFd
存到PollArrayWapper
每1024個元素的第一個位置。使得每個線程都能被wakeupSourceFd
喚醒。
由於select最大支持1024個句柄,這里第一個文件描述符是wakeupSourceFd
,所以一個線程實際最多並發處理1023個socket文件描述符。
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
void addWakeupSocket(int fdVal, int index) {
putDescriptor(index, fdVal);
putEventOps(index, Net.POLLIN);
}
現在我們看一下doSelect邏輯
protected int doSelect(long timeout) throws IOException {
if (channelArray == null)
throw new ClosedSelectorException();
this.timeout = timeout; // set selector timeout
//1. 刪除取消的key
processDeregisterQueue();
if (interruptTriggered) {
resetWakeupSocket();
return 0;
}
//2. 調整線程數 ,等待運行
adjustThreadsCount();
//3. 設置輔助線程數
finishLock.reset();
//4. 開始運行新增的輔助線程
startLock.startThreads();
try {
begin();
try {
//5. 獲取就緒文件描述符
subSelector.poll();
} catch (IOException e) {
finishLock.setException(e); // Save this exception
}
//6. 等待所有輔助線程完成
if (threads.size() > 0)
finishLock.waitForHelperThreads();
} finally {
end();
}
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
finishLock.checkForException();
//7. 再次檢查刪除取消的key
processDeregisterQueue();
//8. 將就緒的key加入到selectedKeys中
int updated = updateSelectedKeys();
// 完成,重置喚醒標記下次在運行。
resetWakeupSocket();
return updated;
}
- 刪除取消key,當channel關閉時,對應的Key會被取消,被取消的key會加入到
cancelledKeys
中。
protected final void implCloseChannel() throws IOException {
implCloseSelectableChannel();
synchronized (keyLock) {
int count = (keys == null) ? 0 : keys.length;
for (int i = 0; i < count; i++) {
SelectionKey k = keys[i];
if (k != null)
k.cancel();
}
}
}
public final void cancel() {
...
((AbstractSelector)selector()).cancel(this);
...
}
void cancel(SelectionKey k) { // package-private
synchronized (cancelledKeys) {
cancelledKeys.add(k);
}
}
調用processDeregisterQueue
進行注銷。
processDeregisterQueue();
//遍歷所有已取消的key,取消他們
void processDeregisterQueue() throws IOException {
// Precondition: Synchronized on this, keys, and selectedKeys
Set<SelectionKey> cks = cancelledKeys();
synchronized (cks) {
if (!cks.isEmpty()) {
//遍歷所有key
Iterator<SelectionKey> i = cks.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
try {
//注銷key
implDereg(ski);
} catch (SocketException se) {
throw new IOException("Error deregistering key", se);
} finally {
i.remove();
}
}
}
}
}
protected void implDereg(SelectionKeyImpl ski) throws IOException{
int i = ski.getIndex();
assert (i >= 0);
synchronized (closeLock) {
if (i != totalChannels - 1) {
// 把最后一個通道復制到取消key所在的位置。
SelectionKeyImpl endChannel = channelArray[totalChannels-1];
channelArray[i] = endChannel;
endChannel.setIndex(i);
pollWrapper.replaceEntry(pollWrapper, totalChannels - 1,
pollWrapper, i);
}
ski.setIndex(-1);
}
//將最后一個通道清空。
channelArray[totalChannels - 1] = null;
totalChannels--;
//判斷是否需要減少一個輔助線程。
if ( totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
totalChannels--;
threadsCount--; // The last thread has become redundant.
}
//清除對應的緩存。
fdMap.remove(ski); // Remove the key from fdMap, keys and selectedKeys
keys.remove(ski);
selectedKeys.remove(ski);
//設置key無效
deregister(ski);
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
//關閉文件描述符
((SelChImpl)selch).kill();
}
//將所有key都設置為無效
protected final void deregister(AbstractSelectionKey key) {
((AbstractSelectableChannel)key.channel()).removeKey(key);
}
void removeKey(SelectionKey k) { // package-private
synchronized (keyLock) {
for (int i = 0; i < keys.length; i++)
if (keys[i] == k) {
keys[i] = null;
keyCount--;
}
//將key設置為無效
((AbstractSelectionKey)k).invalidate();
}
}
- 取消時首先會將該Key的文件描述符的PollFD項從pollWrapper中移除。
- 將key從
channelArray
中刪除。 - 若總的注冊通道數達到了減小線程的閾值,則減小一個線程。
- 清理
fdMap
、keys
、selectedKeys
數據緩存。 - 設置key無效
- 關閉文件描述符
((SelChImpl)selch).kill();
是在各個Channel中實現的,以SocketChannel為例,最終會調用nd.close(fd);
關閉對應的文件描述符
- 調整輔助線程數
private void adjustThreadsCount() {
//當線程大於實際線程,創建更多線程
if (threadsCount > threads.size()) {
// More threads needed. Start more threads.
for (int i = threads.size(); i < threadsCount; i++) {
SelectThread newThread = new SelectThread(i);
threads.add(newThread);
//設置為守護線程
newThread.setDaemon(true);
newThread.start();
}
} else if (threadsCount < threads.size()) {
// 當線程小於實際線程,移除線程。
for (int i = threads.size() - 1 ; i >= threadsCount; i--)
threads.remove(i).makeZombie();
}
}
在創建新的線程時,會記錄上一次運行的數量保存到lastRun
變量中
private SelectThread(int i) {
this.index = i;
this.subSelector = new SubSelector(i);
//make sure we wait for next round of poll
this.lastRun = startLock.runsCounter;
}
當線程啟動時會等待主線程激活
public void run() {
while (true) { // poll loop
//等待主線程信號激活
if (startLock.waitForStart(this))
return;
// call poll()
try {
subSelector.poll(index);
} catch (IOException e) {
// Save this exception and let other threads finish.
finishLock.setException(e);
}
// 通知主線程完成.
finishLock.threadFinished();
}
}
通過startLock
等待主線程的開始信號。若當前線程是新啟動的線程,則runsCounter == thread.lastRun
為真,此時新的線程需要等待主線程調用啟動。
startLock.waitForStart(this)
private synchronized boolean waitForStart(SelectThread thread) {
while (true) {
while (runsCounter == thread.lastRun) {
try {
startLock.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (thread.isZombie()) { // redundant thread
return true; // will cause run() to exit.
} else {
thread.lastRun = runsCounter; // update lastRun
return false; // will cause run() to poll.
}
}
}
}
- 設置輔助線程數量
記錄當前輔助線程數量,下次新增的輔助線程需要等待主線程通知啟動。
finishLock.reset();
private void reset() {
threadsToFinish = threads.size(); // helper threads
}
- 開始運行新增的輔助線程
startLock.startThreads();
private synchronized void startThreads() {
runsCounter++; // next run
notifyAll(); // 通知所有輔助線程繼續執行,
}
- 獲取已就緒的文件描述符
subSelector.poll();
//主線程調用
private int poll() throws IOException{
return poll0(pollWrapper.pollArrayAddress,
Math.min(totalChannels, MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
//輔助線程調用
private int poll(int index) throws IOException {
return poll0(pollWrapper.pollArrayAddress +
(pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
Math.min(MAX_SELECTABLE_FDS,
totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
輔助線程和主線程調用的區別就是存放PollFD的位置變化,每個線程會有1024個PollFD(8B)的位置存放PollFD。這樣使得多個線程的數據內存分離互不影響。
下面看一下JNI的poll0
做了什么處理。下面羅略了主要的邏輯
typedef struct {
jint fd;
jshort events;
} pollfd;
Java_sun_nio_ch_WindowsSelectorImpl_00024SubSelector_poll0(JNIEnv *env, jobject this,
jlong pollAddress, jint numfds,
jintArray returnReadFds, jintArray returnWriteFds,
jintArray returnExceptFds, jlong timeout)
{
DWORD result = 0;
pollfd *fds = (pollfd *) pollAddress;
int i;
FD_SET readfds, writefds, exceptfds;
struct timeval timevalue, *tv;
static struct timeval zerotime = {0, 0};
...
/* Call select */
if ((result = select(0 , &readfds, &writefds, &exceptfds, tv))
== SOCKET_ERROR) {
//當出現錯誤時,變量每個socket獲取它的就緒狀態
FD_SET errreadfds, errwritefds, errexceptfds;
...
for (i = 0; i < numfds; i++) {
errreadfds.fd_count = 0;
errwritefds.fd_count = 0;
if (fds[i].events & POLLIN) {
errreadfds.fd_array[0] = fds[i].fd;
errreadfds.fd_count = 1;
}
if (fds[i].events & (POLLOUT | POLLCONN))
{
errwritefds.fd_array[0] = fds[i].fd;
errwritefds.fd_count = 1;
}
errexceptfds.fd_array[0] = fds[i].fd;
errexceptfds.fd_count = 1;
//遍歷每個socket,探測它的狀態
/* call select on the i-th socket */
if (select(0, &errreadfds, &errwritefds, &errexceptfds, &zerotime)
== SOCKET_ERROR) {
/* This socket causes an error. Add it to exceptfds set */
exceptfds.fd_array[exceptfds.fd_count] = fds[i].fd;
exceptfds.fd_count++;
} else {
...
}
}
}
}
/* Return selected sockets. */
/* Each Java array consists of sockets count followed by sockets list */
...
(*env)->SetIntArrayRegion(env, returnReadFds, 0,
readfds.fd_count + 1, (jint *)&readfds);
(*env)->SetIntArrayRegion(env, returnWriteFds, 0,
writefds.fd_count + 1, (jint *)&writefds);
(*env)->SetIntArrayRegion(env, returnExceptFds, 0,
exceptfds.fd_count + 1, (jint *)&exceptfds);
return 0;
}
- 首先會通過
pollfd *fds = (pollfd *) pollAddress;
將pollAddress的地址轉換為polldf的數組結構。
這里會自動內存對齊,pollfd一共只有6個字節,第一個是int類型的文件描述符句柄,第二個是short類型的等待事件掩碼值。第二個short后會填充2B,因此每個pollFD是8B。而實際后面2字節用於存放實際發生事件的事件掩碼。
- 通過調用Win32API的select執行實際的操作獲取就緒的文件描述符。當socket收到OOB(緊急)數據時,會產生異常。此時需要遍歷所有文件描述符,以確定是哪個socket接收到OOB數據。從而正常處理。上面也提到過OOB數據會通過調用
discardUrgentData
進行清理。
JNIEXPORT jboolean JNICALL
Java_sun_nio_ch_WindowsSelectorImpl_discardUrgentData(JNIEnv* env, jobject this,
jint s)
{
char data[8];
jboolean discarded = JNI_FALSE;
int n;
do {
//讀取MSG_OOB數據
n = recv(s, (char*)&data, sizeof(data), MSG_OOB);
if (n > 0) {
//讀取到設置標記為true
discarded = JNI_TRUE;
}
} while (n > 0);
return discarded;
}
如果timeval為{0,0},則select()立即返回,這可用於探詢所選套接口的狀態。如果處於這種狀態,則select()調用可認為是非阻塞的,且一切適用於非阻塞調用的假設都適用於它。
-
當獲取到所有的就緒的文件描述符時,需要保存到返回結果中,同時讀寫和異常的返回結果的數組第一個為就緒的長度值。
-
等待所有輔助線程完成,當主線程完成時會立即調用
wakeup
向wakeupSourceFd
發生數據以觸發輔助線程喚醒。輔助線程喚醒后也會調用wakeup
一次。當輔助線程都被喚醒后就會通知主線程。
if (threads.size() > 0)
finishLock.waitForHelperThreads();
private synchronized void waitForHelperThreads() {
if (threadsToFinish == threads.size()) {
// no helper threads finished yet. Wakeup them up.
wakeup();
}
while (threadsToFinish != 0) {
try {
finishLock.wait();
} catch (InterruptedException e) {
// Interrupted - set interrupted state.
Thread.currentThread().interrupt();
}
}
}
private synchronized void threadFinished() {
if (threadsToFinish == threads.size()) { // finished poll() first
// if finished first, wakeup others
wakeup();
}
threadsToFinish--;
if (threadsToFinish == 0) // all helper threads finished poll().
notify(); // notify the main thread
}
若輔助線接收到數據,則它需要調用wakeup
來喚醒其他輔助線程,這樣使得主線程火輔助線程至少能調用一次wakeup
激活其他輔助線程。wakeup
內部會調用setWakeupSocket
向wakeupSourceFd
發生一個信號。
public Selector wakeup() {
synchronized (interruptLock) {
if (!interruptTriggered) {
setWakeupSocket();
interruptTriggered = true;
}
}
return this;
}
//發生一個字節數據喚醒wakeupsocket
Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this,
jint scoutFd)
{
/* Write one byte into the pipe */
const char byte = 1;
send(scoutFd, &byte, 1, 0);
}
當主線被激活時,需要調用resetWakeupSocket
將wakeupSourceFd
的數據讀取出來。
private void resetWakeupSocket() {
synchronized (interruptLock) {
if (interruptTriggered == false)
return;
resetWakeupSocket0(wakeupSourceFd);
interruptTriggered = false;
}
}
//讀取wakeupsocket的數據。
Java_sun_nio_ch_WindowsSelectorImpl_resetWakeupSocket0(JNIEnv *env, jclass this,
jint scinFd)
{
char bytes[WAKEUP_SOCKET_BUF_SIZE];
long bytesToRead;
/* 獲取數據大小 */
ioctlsocket (scinFd, FIONREAD, &bytesToRead);
if (bytesToRead == 0) {
return;
}
/* 從緩沖區讀取所有數據 */
if (bytesToRead > WAKEUP_SOCKET_BUF_SIZE) {
char* buf = (char*)malloc(bytesToRead);
recv(scinFd, buf, bytesToRead, 0);
free(buf);
} else {
recv(scinFd, bytes, WAKEUP_SOCKET_BUF_SIZE, 0);
}
}
ioctlsocket()是一個計算機函數,功能是控制套接口的模式。可用於任一狀態的任一套接口。它用於獲取與套接口相關的操作參數,而與具體協議或通訊子系統無關。第二個參數時對socket的操作命令
- 再次調用刪除取消的key
- 將就緒的key加入到selectKeys中,有多個線程會將所有線程的就緒key加入到selectKeys中。
int updated = updateSelectedKeys();
private int updateSelectedKeys() {
updateCount++;
int numKeysUpdated = 0;
numKeysUpdated += subSelector.processSelectedKeys(updateCount);
for (SelectThread t: threads) {
numKeysUpdated += t.subSelector.processSelectedKeys(updateCount);
}
return numKeysUpdated;
}
若key首次被加入,則會調用translateAndSetReadyOps
,若key已經在selectKeys中,則會調用translateAndUpdateReadyOps
。這兩個方法都是調用translateReadyOps
,translateReadyOps
操作會將已就緒的操作保存。
public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
return translateReadyOps(ops, sk.nioReadyOps(), sk);
}
public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
return translateReadyOps(ops, 0, sk);
}
關閉WindowsSelectorImpl
關閉WindowsSelectorImpl時會將所有注冊的通道一同關閉
protected void implClose() throws IOException {
synchronized (closeLock) {
if (channelArray != null) {
if (pollWrapper != null) {
// prevent further wakeup
synchronized (interruptLock) {
interruptTriggered = true;
}
wakeupPipe.sink().close();
wakeupPipe.source().close();
//關閉所有channel
for(int i = 1; i < totalChannels; i++) { // Deregister channels
if (i % MAX_SELECTABLE_FDS != 0) { // skip wakeupEvent
deregister(channelArray[i]);
SelectableChannel selch = channelArray[i].channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
}
}
//釋放數據
pollWrapper.free();
pollWrapper = null;
selectedKeys = null;
channelArray = null;
//釋放輔助線程
for (SelectThread t: threads)
t.makeZombie();
//喚醒輔助線程使其退出。
startLock.startThreads();
}
}
}
}
總結
本文對WindowsSelectorImpl
的代碼實現進行詳細解析。下一篇將對Linux下的EpollSelectorImpl
的實現繼續講解。
相關文獻
微信掃一掃二維碼關注訂閱號傑哥技術分享
出處:https://www.cnblogs.com/Jack-Blog/p/12375678.html
作者:傑哥很忙
本文使用「CC BY 4.0」創作共享協議。歡迎轉載,請在明顯位置給出出處及鏈接。