Selector 實現原理


概述

Selector是NIO中實現I/O多路復用的關鍵類。Selector實現了通過一個線程管理多個Channel,從而管理多個網絡連接的目的。

Channel代表這一個網絡連接通道,我們可以將Channel注冊到Selector中以實現Selector對其的管理。一個Channel可以注冊到多個不同的Selector中。

當Channel注冊到Selector后會返回一個SelectionKey對象,該SelectionKey對象則代表這這個Channel和它注冊的Selector間的關系。並且SelectionKey中維護着兩個很重要的屬性:interestOps、readyOps

interestOps是我們希望Selector監聽Channel的哪些事件。我們將我們感興趣的事件設置到該字段,這樣在selection操作時,當發現該Channel有我們所感興趣的事件發生時,就會將我們感興趣的事件再設置到readyOps中,這樣我們就能得知是哪些事件發生了以做相應處理。

Selector的中的重要屬性

Selector中維護3個特別重要的SelectionKey集合,分別是

  • keys:所有注冊到Selector的Channel所表示的SelectionKey都會存在於該集合中。keys元素的添加會在Channel注冊到Selector時發生。
  • selectedKeys:該集合中的每個SelectionKey都是其對應的Channel在上一次操作selection期間被檢查到至少有一種SelectionKey中所感興趣的操作已經准備好被處理。該集合是keys的一個子集。
  • cancelledKeys:執行了取消操作的SelectionKey會被放入到該集合中。該集合是keys的一個子集。

下面的源碼解析會說明上面3個集合的用處。

Selector 源碼解析

下面我們通過一段對Selector的使用流程講解來進一步深入其實現原理。
首先先來段Selector最簡單的使用片段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ServerSocketChannel serverChannel = ServerSocketChannel.open();
         serverChannel.configureBlocking( false );
         int port = 5566 ;         
         serverChannel.socket().bind( new InetSocketAddress(port));
         Selector selector = Selector.open();
         serverChannel.register(selector, SelectionKey.OP_ACCEPT);
         while ( true ){
             int n = selector.select();
             if (n > 0 ) {
                 Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                 while (iter.hasNext()) {
                     SelectionKey selectionKey = iter.next();
                     ......
                     iter.remove();
                 }
             }
         }

1、Selector的構建

SocketChannel、ServerSocketChannel和Selector的實例初始化都通過SelectorProvider類實現。

ServerSocketChannel.open();

1
2
3
public static ServerSocketChannel open() throws IOException {
     return SelectorProvider.provider().openServerSocketChannel();
}

SocketChannel.open();

1
2
3
public static SocketChannel open() throws IOException {
     return SelectorProvider.provider().openSocketChannel();
}

Selector.open();

1
2
3
public static Selector open() throws IOException {
     return SelectorProvider.provider().openSelector();
}

我們來進一步的了解下SelectorProvider.provider()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static SelectorProvider provider() {
         synchronized (lock) {
             if (provider != null )
                 return provider;
             return AccessController.doPrivileged(
                 new PrivilegedAction<>() {
                     public SelectorProvider run() {
                             if (loadProviderFromProperty())
                                 return provider;
                             if (loadProviderAsService())
                                 return provider;
                             provider = sun.nio.ch.DefaultSelectorProvider.create();
                             return provider;
                         }
                     });
         }
     }

① 如果配置了“java.nio.channels.spi.SelectorProvider”屬性,則通過該屬性值load對應的SelectorProvider對象,如果構建失敗則拋異常。
② 如果provider類已經安裝在了對系統類加載程序可見的jar包中,並且該jar包的源碼目錄META-INF/services包含有一個java.nio.channels.spi.SelectorProvider提供類配置文件,則取文件中第一個類名進行load以構建對應的SelectorProvider對象,如果構建失敗則拋異常。
③ 如果上面兩種情況都不存在,則返回系統默認的SelectorProvider,即,sun.nio.ch.DefaultSelectorProvider.create();
④ 隨后在調用該方法,即SelectorProvider.provider()。則返回第一次調用的結果。

不同系統對應着不同的sun.nio.ch.DefaultSelectorProvider

這里我們看linux下面的sun.nio.ch.DefaultSelectorProvider

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class DefaultSelectorProvider {
 
     /**
      * Prevent instantiation.
      */
     private DefaultSelectorProvider() { }
 
     /**
      * Returns the default SelectorProvider.
      */
     public static SelectorProvider create() {
         return new sun.nio.ch.EPollSelectorProvider();
     }
 
}

可以看見,linux系統下sun.nio.ch.DefaultSelectorProvider.create(); 會生成一個sun.nio.ch.EPollSelectorProvider類型的SelectorProvider,這里對應於linux系統的epoll

接下來看下 selector.open():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
      * Opens a selector.
      *
      * <p> The new selector is created by invoking the {@link
      * java.nio.channels.spi.SelectorProvider#openSelector openSelector} method
      * of the system-wide default {@link
      * java.nio.channels.spi.SelectorProvider} object.  </p>
      *
      * @return  A new selector
      *
      * @throws  IOException
      *          If an I/O error occurs
      */
     public static Selector open() throws IOException {
         return SelectorProvider.provider().openSelector();
     }

在得到sun.nio.ch.EPollSelectorProvider后調用openSelector()方法構建Selector,這里會構建一個EPollSelectorImpl對象。

EPollSelectorImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
class EPollSelectorImpl
     extends SelectorImpl
{
 
     // File descriptors used for interrupt
     protected int fd0;
     protected int fd1;
 
     // The poll object
     EPollArrayWrapper pollWrapper;
 
     // Maps from file descriptors to keys
     private Map<Integer,SelectionKeyImpl> fdToKey;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
EPollSelectorImpl(SelectorProvider sp) throws IOException {
         super (sp);
         long pipeFds = IOUtil.makePipe( false );
         fd0 = ( int ) (pipeFds >>> 32 );
         fd1 = ( int ) pipeFds;
         try {
             pollWrapper = new EPollArrayWrapper();
             pollWrapper.initInterrupt(fd0, fd1);
             fdToKey = new HashMap<>();
         } catch (Throwable t) {
             try {
                 FileDispatcherImpl.closeIntFD(fd0);
             } catch (IOException ioe0) {
                 t.addSuppressed(ioe0);
             }
             try {
                 FileDispatcherImpl.closeIntFD(fd1);
             } catch (IOException ioe1) {
                 t.addSuppressed(ioe1);
             }
             throw t;
         }
     }

EPollSelectorImpl構造函數完成:

① EPollArrayWrapper的構建,EpollArrayWapper將Linux的epoll相關系統調用封裝成了native方法供EpollSelectorImpl使用。
② 通過EPollArrayWrapper向epoll注冊中斷事件

1
2
3
4
5
void initInterrupt( int fd0, int fd1) {
         outgoingInterruptFD = fd1;
         incomingInterruptFD = fd0;
         epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
     }

③ fdToKey:構建文件描述符-SelectionKeyImpl映射表,所有注冊到selector的channel對應的SelectionKey和與之對應的文件描述符都會放入到該映射表中。

EPollArrayWrapper

EPollArrayWrapper完成了對epoll文件描述符的構建,以及對linux系統的epoll指令操縱的封裝。維護每次selection操作的結果,即epoll_wait結果的epoll_event數組。
EPollArrayWrapper操縱了一個linux系統下epoll_event結構的本地數組。

1
2
3
4
5
6
7
8
9
10
11
* typedef union epoll_data {
*     void *ptr;
*     int fd;
*     __uint32_t u32;
*     __uint64_t u64;
*  } epoll_data_t;
*
* struct epoll_event {
*     __uint32_t events;
*     epoll_data_t data;
* };

epoll_event的數據成員(epoll_data_t data)包含有與通過epoll_ctl將文件描述符注冊到epoll時設置的數據相同的數據。這里data.fd為我們注冊的文件描述符。這樣我們在處理事件的時候持有有效的文件描述符了。

EPollArrayWrapper將Linux的epoll相關系統調用封裝成了native方法供EpollSelectorImpl使用。

1
2
3
4
private native int epollCreate();
     private native void epollCtl( int epfd, int opcode, int fd, int events);
     private native int epollWait( long pollAddress, int numfds, long timeout,
                                  int epfd) throws IOException;

上述三個native方法就對應Linux下epoll相關的三個系統調用

1
2
3
4
5
6
7
8
// The fd of the epoll driver
     private final int epfd;
 
      // The epoll_event array for results from epoll_wait
     private final AllocatedNativeObject pollArray;
 
     // Base address of the epoll_event array
     private final long pollArrayAddress;
1
2
3
// 用於存儲已經注冊的文件描述符和其注冊等待改變的事件的關聯關系。在epoll_wait操作就是要檢測這里文件描述法注冊的事件是否有發生。
     private final byte [] eventsLow = new byte [MAX_UPDATE_ARRAY_SIZE];
     private final Map<Integer,Byte> eventsHigh = new HashMap<>();
1
2
3
4
5
6
7
8
9
EPollArrayWrapper() throws IOException {
         // creates the epoll file descriptor
         epfd = epollCreate();
 
         // the epoll_event array passed to epoll_wait
         int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
         pollArray = new AllocatedNativeObject(allocationSize, true );
         pollArrayAddress = pollArray.address();
     }

EPoolArrayWrapper構造函數,創建了epoll文件描述符。構建了一個用於存放epoll_wait返回結果的epoll_event數組。

ServerSocketChannel的構建

ServerSocketChannel.open();

返回ServerSocketChannelImpl對象,構建linux系統下ServerSocket的文件描述符。

1
2
3
4
5
6
// Our file descriptor
     private final FileDescriptor fd;
 
     // fd value needed for dev/poll. This value will remain valid
     // even after the value in the file descriptor object has been set to -1
     private int fdVal;
1
2
3
4
5
6
ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
         super (sp);
         this .fd =  Net.serverSocket( true );
         this .fdVal = IOUtil.fdVal(fd);
         this .state = ST_INUSE;
     }

將ServerSocketChannel注冊到Selector

serverChannel.register(selector, SelectionKey.OP_ACCEPT);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public final SelectionKey register(Selector sel, int ops,
                                        Object att)
         throws ClosedChannelException
     {
         synchronized (regLock) {
             if (!isOpen())
                 throw new ClosedChannelException();
             if ((ops & ~validOps()) != 0 )
                 throw new IllegalArgumentException();
             if (blocking)
                 throw new IllegalBlockingModeException();
             SelectionKey k = findKey(sel);
             if (k != null ) {
                 k.interestOps(ops);
                 k.attach(att);
             }
             if (k == null ) {
                 // New registration
                 synchronized (keyLock) {
                     if (!isOpen())
                         throw new ClosedChannelException();
                     k = ((AbstractSelector)sel).register( this , ops, att);
                     addKey(k);
                 }
             }
             return k;
         }
     }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected final SelectionKey register(AbstractSelectableChannel ch,
                                           int ops,
                                           Object attachment)
     {
         if (!(ch instanceof SelChImpl))
             throw new IllegalSelectorException();
         SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this );
         k.attach(attachment);
         synchronized (publicKeys) {
             implRegister(k);
         }
         k.interestOps(ops);
         return k;
     }

① 構建代表channel和selector間關系的SelectionKey對象
② implRegister(k)將channel注冊到epoll中
③ k.interestOps(int) 完成下面兩個操作:
a) 會將注冊的感興趣的事件和其對應的文件描述存儲到EPollArrayWrapper對象的eventsLow或eventsHigh中,這是給底層實現epoll_wait時使用的。
b) 同時該操作還會將設置SelectionKey的interestOps字段,這是給我們程序員獲取使用的。

EPollSelectorImpl. implRegister

1
2
3
4
5
6
7
8
9
protected void implRegister(SelectionKeyImpl ski) {
         if (closed)
             throw new ClosedSelectorException();
         SelChImpl ch = ski.channel;
         int fd = Integer.valueOf(ch.getFDVal());
         fdToKey.put(fd, ski);
         pollWrapper.add(fd);
         keys.add(ski);
     }

① 將channel對應的fd(文件描述符)和對應的SelectionKeyImpl放到fdToKey映射表中。
② 將channel對應的fd(文件描述符)添加到EPollArrayWrapper中,並強制初始化fd的事件為0 ( 強制初始更新事件為0,因為該事件可能存在於之前被取消過的注冊中。)
③ 將selectionKey放入到keys集合中。

Selection操作

selection操作有3中類型:

① select():該方法會一直阻塞直到至少一個channel被選擇(即,該channel注冊的事件發生了)為止,除非當前線程發生中斷或者selector的wakeup方法被調用。
② select(long time):該方法和select()類似,該方法也會導致阻塞直到至少一個channel被選擇(即,該channel注冊的事件發生了)為止,除非下面3種情況任意一種發生:a) 設置的超時時間到達;b) 當前線程發生中斷;c) selector的wakeup方法被調用
③ selectNow():該方法不會發生阻塞,如果沒有一個channel被選擇也會立即返回。

我們主要來看看select()的實現 :int n = selector.select();

1
2
3
public int select() throws IOException {
     return select( 0 );
}

最終會調用到EPollSelectorImpl的doSelect

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
protected int doSelect( long timeout) throws IOException {
         if (closed)
             throw new ClosedSelectorException();
         processDeregisterQueue();
         try {
             begin();
             pollWrapper.poll(timeout);
         } finally {
             end();
         }
         processDeregisterQueue();
         int numKeysUpdated = updateSelectedKeys();
         if (pollWrapper.interrupted()) {
             // Clear the wakeup pipe
             pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0 );
             synchronized (interruptLock) {
                 pollWrapper.clearInterrupted();
                 IOUtil.drain(fd0);
                 interruptTriggered = false ;
             }
         }
         return numKeysUpdated;
     }

① 先處理注銷的selectionKey隊列
② 進行底層的epoll_wait操作
③ 再次對注銷的selectionKey隊列進行處理
④ 更新被選擇的selectionKey

先來看processDeregisterQueue():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void processDeregisterQueue() throws IOException {
         Set var1 = this .cancelledKeys();
         synchronized (var1) {
             if (!var1.isEmpty()) {
                 Iterator var3 = var1.iterator();
 
                 while (var3.hasNext()) {
                     SelectionKeyImpl var4 = (SelectionKeyImpl)var3.next();
 
                     try {
                         this .implDereg(var4);
                     } catch (SocketException var12) {
                         IOException var6 = new IOException( "Error deregistering key" );
                         var6.initCause(var12);
                         throw var6;
                     } finally {
                         var3.remove();
                     }
                 }
             }
 
         }
     }

從cancelledKeys集合中依次取出注銷的SelectionKey,執行注銷操作,將處理后的SelectionKey從cancelledKeys集合中移除。執行processDeregisterQueue()后cancelledKeys集合會為空。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected void implDereg(SelectionKeyImpl ski) throws IOException {
         assert (ski.getIndex() >= 0 );
         SelChImpl ch = ski.channel;
         int fd = ch.getFDVal();
         fdToKey.remove(Integer.valueOf(fd));
         pollWrapper.remove(fd);
         ski.setIndex(- 1 );
         keys.remove(ski);
         selectedKeys.remove(ski);
         deregister((AbstractSelectionKey)ski);
         SelectableChannel selch = ski.channel();
         if (!selch.isOpen() && !selch.isRegistered())
             ((SelChImpl)selch).kill();
     }

注銷會完成下面的操作:
① 將已經注銷的selectionKey從fdToKey( 文件描述與SelectionKeyImpl的映射表 )中移除
② 將selectionKey所代表的channel的文件描述符從EPollArrayWrapper中移除
③ 將selectionKey從keys集合中移除,這樣下次selector.select()就不會再將該selectionKey注冊到epoll中監聽
④ 也會將selectionKey從對應的channel中注銷
⑤ 最后如果對應的channel已經關閉並且沒有注冊其他的selector了,則將該channel關閉

完成上面的的操作后,注銷的SelectionKey就不會出現先在keys、selectedKeys以及cancelKeys這3個集合中的任何一個。

接着我們來看EPollArrayWrapper.poll(timeout):

1
2
3
4
5
6
7
8
9
10
11
12
int poll( long timeout) throws IOException {
         updateRegistrations();
         updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
         for ( int i= 0 ; i<updated; i++) {
             if (getDescriptor(i) == incomingInterruptFD) {
                 interruptedIndex = i;
                 interrupted = true ;
                 break ;
             }
         }
         return updated;
     }

updateRegistrations()方法會將已經注冊到該selector的事件(eventsLow或eventsHigh)通過調用epollCtl(epfd, opcode, fd, events); 注冊到linux系統中。

這里epollWait就會調用linux底層的epoll_wait方法,並返回在epoll_wait期間有事件觸發的entry的個數

再看updateSelectedKeys():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private int updateSelectedKeys() {
         int entries = pollWrapper.updated;
         int numKeysUpdated = 0 ;
         for ( int i= 0 ; i<entries; i++) {
             int nextFD = pollWrapper.getDescriptor(i);
             SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
             // ski is null in the case of an interrupt
             if (ski != null ) {
                 int rOps = pollWrapper.getEventOps(i);
                 if (selectedKeys.contains(ski)) {
                     if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
                         numKeysUpdated++;
                     }
                 } else {
                     ski.channel.translateAndSetReadyOps(rOps, ski);
                     if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0 ) {
                         selectedKeys.add(ski);
                         numKeysUpdated++;
                     }
                 }
             }
         }
         return numKeysUpdated;
     }

該方法會從通過EPollArrayWrapper pollWrapper 以及 fdToKey( 構建文件描述符-SelectorKeyImpl映射表 )來獲取有事件觸發的SelectionKeyImpl對象,然后將SelectionKeyImpl放到selectedKey集合( 有事件觸發的selectionKey集合,可以通過selector.selectedKeys()方法獲得 )中,即selectedKeys。並重新設置SelectionKeyImpl中相關的readyOps值。

但是,這里要注意兩點:

① 如果SelectionKeyImpl已經存在於selectedKeys集合中,並且發現觸發的事件已經存在於readyOps中了,則不會使numKeysUpdated++;這樣會使得我們無法得知該事件的變化。

上面這點說明了為什么我們要在每次從selectedKey中獲取到Selectionkey后,將其從selectedKey集合移除,就是為了當有事件觸發使selectionKey能正確到放入selectedKey集合中,並正確的通知給調用者。

② 如果發現channel所發生I/O事件不是當前SelectionKey所感興趣,則不會將SelectionKeyImpl放入selectedKeys集合中,也不會使numKeysUpdated++

epoll原理

epoll是Linux下的一種IO多路復用技術,可以非常高效的處理數以百萬計的socket句柄。
先看看使用c封裝的3個epoll系統調用:

  • int epoll_create(int size)

epoll_create建立一個epoll對象。參數size是內核保證能夠正確處理的最大句柄數,多於這個最大數時內核可不保證效果。

  • int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

epoll_ctl可以操作epoll_create創建的epoll,如將socket句柄加入到epoll中讓其監控,或把epoll正在監控的某個socket句柄移出epoll。

  • int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout)

epoll_wait在調用時,在給定的timeout時間內,所監控的句柄中有事件發生時,就返回用戶態的進程。

大概看看epoll內部是怎么實現的:

  1. epoll初始化時,會向內核注冊一個文件系統,用於存儲被監控的句柄文件,調用epoll_create時,會在這個文件系統中創建一個file節點。同時epoll會開辟自己的內核高速緩存區,以紅黑樹的結構保存句柄,以支持快速的查找、插入、刪除。還會再建立一個list鏈表,用於存儲准備就緒的事件。
  2. 當執行epoll_ctl時,除了把socket句柄放到epoll文件系統里file對象對應的紅黑樹上之外,還會給內核中斷處理程序注冊一個回調函數,告訴內核,如果這個句柄的中斷到了,就把它放到准備就緒list鏈表里。所以,當一個socket上有數據到了,內核在把網卡上的數據copy到內核中后,就把socket插入到就緒鏈表里。
  3. 當epoll_wait調用時,僅僅觀察就緒鏈表里有沒有數據,如果有數據就返回,否則就sleep,超時時立刻返回。

epoll的兩種工作模式:

  • LT:level-trigger,水平觸發模式,只要某個socket處於readable/writable狀態,無論什么時候進行epoll_wait都會返回該socket。
  • ET:edge-trigger,邊緣觸發模式,只有某個socket從unreadable變為readable或從unwritable變為writable時,epoll_wait才會返回該socket。

socket讀數據

socket寫數據

最后順便說下在Linux系統中JDK NIO使用的是 LT ,而Netty epoll使用的是 ET。

后記

因為本人對計算機系統組成以及C語言等知識比較欠缺,因為文中相關知識點的表示也相當“膚淺”,如有不對不妥的地方望讀者指出。同時我也會繼續加強對該方面知識點的學習~

參考

  • http://www.jianshu.com/p/0d497fe5484a
  • http://remcarpediem.com/2017/04/02/Netty源碼-三-I-O模型和Java-NIO底層原理/
  • 聖思園netty課程
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM