JAVA NIO 簡單介紹


Version:0.9 StartHTML:-1 EndHTML:-1 StartFragment:00000099 EndFragment:00918492
一:為什么要使用NIO技術
      基本的Java套接字對於小規模系統可以很好地運行,但當涉及同時處理幾千甚至上萬個客戶端的服務器時,可能會產生一些問題。
      如果一個客戶端一個線程的方式去處理,則由於創建、維護和切換線程需要的系統開銷導致系統擴展性方面受到了很大限制;當然你也可以使用線程池,也可以節省一些開銷,也同時可以使用並行硬件的優勢,比如F5,網絡連接均衡服務器等等。但對於連接生存期比較長的協議來說,線程池的大小仍然限制了系統可以同時處理的客戶端數量。考慮一個在客戶端之間傳遞消息的即時消息服務器IM。客戶端必須不停地連接服務器以接收即時消息,因此線程池的大小限制了系統可以同時服務的客戶端總數。如果增加線程池的大小,將帶來更多的線程處理開銷,而不能提升系統的性能,因為在大部分的時間里客戶端是處於空閑狀態的。
如果這是所有問題,可能NIO還不是必須的。不幸的是,在使用線程的擴展性方面還涉及一些更加難把握的挑戰。其中一個挑戰就是程序員幾乎不能對什么時候哪個線程將獲得服務進行控制。你可以設置一個線程實例的優先級,但是這個優先級只是一種“建議”,下一個選擇執行的線程完全取決於具體實現。因此,如果程序員想要保證某些連接優先獲得服務,或想要制定一定的服務順序,線程可能就很難做到。
然而,有關線程的最重要的問題可能要保證數據的一致性,但很多客戶端之間共享一些狀態信息時,這就需要使用鎖機制或者其他互斥機制對依次訪問狀態進行嚴格的同步。否則,由於不同線程上的程序段交錯執行,他們之間會改掉其他線程說做的修改。
由於需要對共享狀態進行同步訪問,要同時考慮到多線程服務器的正確性和高效性就變得非常困難。使用鎖機制將增加更多的的系統調度和上下文切換開銷,而程序員對這些開銷又無法控制。由於其復雜性,一些程序員寧願繼續使用單線程方法。這類服務器只用一個線程來處理所有客戶端,但不是順序處理,而是一次全部處理。這種服務器不能為任何客戶端提供I/O操作的阻塞等待,而必須排他地使用非阻塞I/O。
在我們寫Socket服務器端的時候,肯定會用到ServerSocket類的accept方法,當在ServerSocket實例上調用 accept方法時,如果有一個新的連接來了,則accept方法會立即返回一個socket實例,否則該方法將一直阻塞直到有新的連接到來或計時器超時。假設我們用一個線程專門來處理連接的請求,也就是accept方法;不幸的是,我們會發現這種方法要求我們不斷地輪詢所有的I/O源,而這種“忙等” 方法又會引入很多系統開銷,因為程序要反復循環地連接I/O源,卻又發現什么都不用做。以下代碼就是一個典型的處理客戶端請求方式,循環一直在跑,除非有人把循環標志給修改了,server.accept()方法一直在阻塞直到有一個新的的連接過來,如果有新的連接過來這返回一個socket實例,並扔給連接管理器去處理,如果一直都沒有連接過來則一直阻塞在那里死等。
while(!bCanExit) {
try {
       //如果單在一個線程處理socket連接,該方法一直會阻塞,直到有新的連接過來
       Socketsocket = server.accept();
       Connection connection = newConnection(socket);
       connection.setClientId(Util.random32UUID());
       connectionManager.add(connection);
       if(logger.isInfoEnabled()){
             logger.info("有一個新的連接!");
       }
} catch(IOException e) {}
}
try {
server.close();
} catch(IOException e) {
logger.error("close serverSocketerror:", e);
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
客戶端代碼:
packageorg.zapldy.tcpip.nio;
 
importjava.net.InetSocketAddress;
importjava.net.SocketException;
importjava.nio.ByteBuffer;
importjava.nio.channels.SocketChannel;
 
public classTCPEchoClientNoblocking {
 
   public staticvoid main(String[] args) throwsException {
      String server = "127.0.0.1";
      byte[] data = "ABCDEFGHIJKLMNOPQRSTUVWXYZ".getBytes();
      int servPort = 8888;
 
      SocketChannel clntChan =SocketChannel.open();
      clntChan.configureBlocking(false);
      //我們通過持續調用finishConnect()方法來“輪詢”連接狀態,該方法在連接成功建立之前
       //一直返回false。打印操作顯示了在等待連接建立的過程中,程序還可以執行其他任務。不過
       //,這種忙等的方法非常浪費系統資源,這里這樣做只是為了演示該方法的使用。
      if (!clntChan.connect(newInetSocketAddress(server,servPort))) {
          while (!clntChan.finishConnect()) {
             System.out.print("=");//這里可以做其他事情
          }
      }
 
      ByteBuffer writeBuf =ByteBuffer.wrap(data);
      ByteBuffer readBuf =ByteBuffer.allocate(data.length);
 
      int totalBytesRcvd = 0;
      int bytesRcvd;
      while (totalBytesRcvd < data.length) {
          if (writeBuf.hasRemaining()) {
             clntChan.write(writeBuf);
          }
          if ((bytesRcvd =clntChan.read(readBuf)) == -1) {
             throw newSocketException("Connection closedprematurely");
          }
          totalBytesRcvd +=bytesRcvd;
          System.out.print("=");
      }
      System.out.println("Recieved: "
             + new String(readBuf.array(), 0,totalBytesRcvd));
      clntChan.close();
   }
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
   服務器端代碼(服務器還是用傳統的方式實現的,后面將再用NIO重寫,這里只是為了讀者能快速運行代碼):
packageorg.zapldy.tcpip;
 
importjava.io.IOException;
importjava.io.InputStream;
importjava.io.OutputStream;
importjava.net.ServerSocket;
importjava.net.Socket;
importjava.net.SocketAddress;
 
publicclass TCPEchoServer {
   private staticfinal int BUFSIZE = 32;
   public staticvoid main(String[] args) throwsIOException {
      int servPort = 8888;
      ServerSocket servSocket = newServerSocket(servPort);
 
      int recvMsgSize = 0;
      byte[] recvBuf = newbyte[BUFSIZE];
 
      while (true) {
          Socket clntSocket =servSocket.accept();// 該方法會阻塞
          SocketAddressclientAddress =
                               clntSocket.getRemoteSocketAddress();
          System.out.println("Handling client at " + clientAddress);
         InputStream in =clntSocket.getInputStream();
          OutputStream out =clntSocket.getOutputStream();
          while ((recvMsgSize = in.read(recvBuf))!= -1) {
            out.write(recvBuf, 0, recvMsgSize);
          }
          clntSocket.close();
      }
   }
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Selector
上面已經提到過,Selector類可以用於避免使用阻塞式客戶端中很浪費資源的“忙等”方法。例如,考慮一個IM服務器。像QQ或者旺旺這樣的,可能有幾萬甚至幾千萬個客戶端同時連接到了服務器,但在任何時刻都只是非常少量的消息
需要讀取和分發。這就需要一種方法阻塞等待,直到至少有一個信道可以進行I/O操作,並指出是哪個信道。NIO的選擇器就實現了這樣的功能。一個 Selector實例可以同時檢查一組信道的I/O狀態。用專業術語來說,選擇器就是一個多路開關選擇器,因為一個選擇器能夠管理多個信道上的I/O操作。然而如果用傳統的方式來處理這么多客戶端,使用的方法是循環地一個一個地去檢查所有的客戶端是否有I/O操作,如果當前客戶端有I/O操作,則可能把當前客戶端扔給一個線程池去處理,如果沒有I/O操作則進行下一個輪詢,當所有的客戶端都輪詢過了又接着從頭開始輪詢;這種方法是非常笨而且也非常浪費資源,因為大部分客戶端是沒有I/O操作,我們也要去檢查;而Selector就不一樣了,它在內部可以同時管理多個I/O,當一個信道有I/O操作的時候,他會通知Selector,Selector就是記住這個信道有I/O操作,並且知道是何種I/O操作,是讀呢?是寫呢?還是接受新的連接;所以如果使用Selector,它返回的結果只有兩種結果,一種是0,即在你調用的時刻沒有任何客戶端需要I/O操作,另一種結果是一組需要I/O操作的客戶端,這是你就根本不需要再檢查了,因為它返回給你的肯定是你想要的。這樣一種通知的方式比那種主動輪詢的方式要高效得多!
要使用選擇器(Selector),需要創建一個Selector實例(使用靜態工廠方法open())並將其注冊(register)到想要監控的信道上(注意,這要通過channel的方法實現,而不是使用selector的方法)。最后,調用選擇器的select()方法。該方法會阻塞等待,直到有一個或更多的信道准備好了I/O操作或等待超時。select()方法將返回可進行I/O操作的信道數量。現在,在一個單獨的線程中,通過調用 select()方法就能檢查多個信道是否准備好進行I/O操作。如果經過一段時間后仍然沒有信道准備好,select()方法就會返回0,並允許程序繼續執行其他任務。
下面來看一個例子。假設我們想要使用信道和選擇器來實現一個像上面一樣的回顯服務器,並不使用多線程和忙等。為了使不同協議都能方便地使用這個基本的服務模式,我們把信道中與具體協議相關的處理各種I/O操作(接收,讀,寫)分離出來。Protocol定義了通用 EchoSelectorServer類與特定協議之間的接口,包括三個方法,每個方法代表了一種I/O形式。當有信道准備好I/O操作時,服務器只需要調用相應的方法即可。
packageorg.zapldy.tcpip.nio;
 
importjava.io.IOException;
importjava.nio.channels.SelectionKey;
 
publicinterface Protocol {
   public voidhandleAccept(SelectionKey key) throwsIOException;
   public voidhandleRead(SelectionKey key) throwsIOException;
   public voidhandleWrite(SelectionKey key) throwsIOException;
}
下面是具體的實現(注意看注釋):
packageorg.zapldy.tcpip.nio;
 
importjava.io.IOException;
importjava.nio.ByteBuffer;
importjava.nio.channels.SelectionKey;
importjava.nio.channels.ServerSocketChannel;
importjava.nio.channels.SocketChannel;
 
publicclass EchoProtocol implementsProtocol {
 
   private intbufsize;//為每個客戶端信道創建的緩沖區大小
 
   public EchoProtocol(int bufsize) {
      this.bufsize = bufsize;
   }
 
 
   public voidhandleAccept(SelectionKeykey) throws IOException {
      //channel()方法返回注冊時用來創建的Channel,該Channel是一個ServerSocketChannel,
       //因為這是我們注冊的唯一一種支持accept操作的信道,
//accept()方法為傳入的連接返回一個SocketChannel實例。
      SocketChannel channel =((ServerSocketChannel)          key.channel()).accept();
      //這里無法注冊阻塞式信道,必須是非阻塞式的
      channel.configureBlocking(false);
      //可以通過SelectionKey類的selector()方法來獲取相應的Selector。
       //我們根據指定大小創建了一個新的ByteBuffer實例,
       //並將其作為參數傳遞給register()方法。它將作為附件,與regiter()方法所返回的
//SelectionKey實例相關聯。
      channel.register(key.selector(),SelectionKey.OP_READ, ByteBuffer
             .allocateDirect(bufsize));
   }
 
 
   public voidhandleRead(SelectionKey key) throws IOException {
      //根據其支持數據讀取操作可知,這是一個SocketChannel。
      SocketChannel channel =(SocketChannel) key.channel();
      //建立連接后,有一個ByteBuffer附件加到該SelectionKey實例上,這個附件里面的內容將
       //會在發送的時候用到,附件始終是附着這個長連接上
      ByteBuffer buf = (ByteBuffer)key.attachment();
      long bytesRead = channel.read(buf);
      //如果read()方法返回-1,則表示底層連接已經關閉,此時需要關閉信道。
       //關閉信道時,將從選擇器的各種集合中移除與該信道關聯的鍵。
      if (bytesRead == -1) {
          channel.close();
      } else if (bytesRead > 0) {
          //這里依然保留了信道的可讀操作,雖然緩沖區中可能已經沒有剩余空間了,
//因為下次還是要接受新的數據
          key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
      }
   }
 
   public voidhandleWrite(SelectionKey key) throws IOException {
      //附加到SelectionKey上的ByteBuffer包含了之前從信道中讀取的數據。
      ByteBuffer buf =(ByteBuffer)key.attachment();
      //該方法用來修改緩沖區的內部狀態,以指示write操作從什么地方獲取數據,及還剩多少數據
      buf.flip();
      SocketChannel channel =(SocketChannel)key.channel();//獲取信道
      channel.write(buf);//向信道中寫數據
      if(!buf.hasRemaining()){
          //如果沒有剩余數據可讀,則修改該鍵關聯的操作集,指示其只能進行讀操作了
          key.interestOps(SelectionKey.OP_READ);
      }
   //如果緩沖區中還有剩余數據,該操作將剩余數據移到緩沖區前端,以使下次迭代能讀入更多數據。
      buf.compact();
   }
}
下面是回顯服務器端代碼的實現,在服務器端創建一個選擇器,並將其與每個偵聽客戶端連接的套接字說對應的ServerSocketChannel注冊在一起。然后進行反復循環,調用select()方法,並調用相應的操作器對各種類型的I/O操作進行處理。
packageorg.zapldy.tcpip.nio;
 
importjava.io.IOException;
importjava.net.InetSocketAddress;
importjava.nio.channels.SelectionKey;
importjava.nio.channels.Selector;
importjava.nio.channels.ServerSocketChannel;
importjava.util.Iterator;
 
publicclass EchoSelectorServer {
   private staticfinal int BUFSIZE = 256;
   private staticfinal int TIMEOUT = 3000;
 
   private staticfinal int PORT = 8888;
 
   public staticvoid main(String[] args) throwsIOException{
      Selector selector = Selector.open();
 
      ServerSocketChannellistnChannel = ServerSocketChannel.open();
      listnChannel.socket().bind(newInetSocketAddress(PORT));
      //只有非阻塞信道才可以注冊選擇器,因此需要將其配置為適當的狀態
      listnChannel.configureBlocking(false);
      //在注冊過程中指出該信道可以進行“accept”操作
      listnChannel.register(selector,SelectionKey.OP_ACCEPT);
 
      Protocolprotocol = newEchoProtocol(BUFSIZE);
      while(true){
          if(selector.select(TIMEOUT) == 0){
             System.out.print("==");
             continue;
          }
 
                        Iterator<SelectionKey>keyIter =
selector.selectedKeys().iterator();
          while(keyIter.hasNext()){
             SelectionKey key =keyIter.next();
             if(key.isAcceptable()){
                 protocol.handleAccept(key);
             }
             if(key.isReadable()){
                 protocol.handleRead(key);
             }
             if(key.isWritable() &&key.isValid()){
                 protocol.handleWrite(key);
             }
             //由於select()操作只是向Selector所關聯的鍵集合中添加元素
               //因此,如果不移除每個處理過的鍵,
//它就會在下次調用select()方法時仍然保留在集合中
               //而且可能會有無用的操作來調用它。
             keyIter.remove();
          }
      }
   }
}
 
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
流(TCP)信道詳解
流信道有兩個變體:SocketChannel和ServerSocketChannel。像其對應的Socket一樣,SocketChannel是相互連接的終端進行通信的信道。
SocketChannel:創建,連接和關閉
   static SocketChannel open(SocketAddressremote)
   static SocketChannel open()
   boolean connect(SocketAddress remote)
   boolean isConnected()
   void close()
   boolean isOpen()
   Socket socket()
   調用SocketChannel的靜態工廠方法open()可以創建一個實例。open()方法的第一種形式以SocketAddress為參數,返回一個連接到指定服務器的SocketChannel實例。注意,該方法可能會無限期地阻塞下去。open()的無參數形式用於創建一個沒有連接的 SocketChannel實例,該實例可以通過調用connect()方法連接到指定終端。當使用完SocketChannel后,需要調用 close()方法將其關閉。有一點很重要,即每個SocketChannel實例都包裹了一個基本的Java Socket,並可以通過socket()方法對該Socket進行訪問。這就可以通過基本的Socket方法進行綁定、設置套接字選項等操作。
 
在創建並連接SocketChannel后,就可以調用該信道的讀寫方法進行I/O操作。
SocketChannel讀和寫
   int read(ByteBuffer dst)
   long read(ByteBuffer[] dsts)
   long read(ByteBuffer[] dsts, int offset,int length)
   int write(ByteBuffer src)
   int write(ByteBuffer[] srcs)
   int write(ByteBuffer[] srcs, intoffset, int length)
        讀操作的最基本形式以一個ByteBuffer為參數,並將讀取的數據填入該緩沖區所有剩余字節空間中。另一種形式以多個ByteBuffer為參數(ByteBuffer數組),並根據其在數組中的順序,將讀取的數據依次填入每個緩沖區的剩余字節空間中。這種方法稱為散射式讀,因為它將讀入的直接分散到了多個緩沖區中。需要注意重要的一點,散射式讀不一定會將所有緩沖區填滿,這些緩沖區的總空間大小只是一個上限。
      寫操作的最基本形式是以一個ByteBuffer為參數,並試圖將該緩沖區中剩余的字節寫入信道。另一種形式以一個ByteBuffer數組作為參數,並試圖將所有緩沖區中的剩余字節都寫入信道。這種方法稱為聚集式寫,因為它把多個緩沖區中的字節聚集起來,一起發送出去。
        與其對應的ServerSocket一樣,ServerSocketChannel是用來偵聽客戶端連接的信道。
ServerSocketChannel創建,接受和關閉
   static ServerSocketChannel open()
   ServerSocket socket()
   SocketChannel accept()
   void close()
   boolean isOpen()
   調用靜態工廠方法open()可以創建一個ServerSocketChannel實例。每個實例都包裹了一個ServerSocket實例,並可以通過 socket()方法對其訪問。正如前面的例子所表明的,必須通過底層的ServerSocket實例來實現綁定制定端口,設置套接字選項等操作。在創建了信道實例並綁定端口后,就可以調用accept()方法來准備接收客戶端的連接請求。連接成功則返回一個新的已連接的SocketChannel。在用完ServerSocketChannel后,需要調用close()方法將其關閉。
   如前文提到的那樣,阻塞式信道除了能夠(必須)與Buffer一起使用外,對於普通套接字來說幾乎沒有優點。因此,可能總是需要將其設置成非阻塞式的。
SocketChannel, ServerSocketChannel設置阻塞行為
   SelectableChannelconfigureBlocking(boolean block)
   boolean isBlocking()
   通過調用configureBlocking(false)可以將SocketChannel或ServerSocketChannel設置為非阻塞模式。configureBlocking()方法將返回一個SelectableChannel,它是SocketChannel和 ServerSocketChannel父類。
   考慮為SocketChannel設置連接的情況。如果傳給SocketChannel的工廠方法open()一個遠程地址,對該方法的調用則將阻塞等待,直到成功建立了連接。要避免這種情況,可以使用open()方法的無參數形式,配置信道為非阻塞模式,再調用connect()方法,制定遠程終端地址。如果在沒有阻塞的情況下連接已經建立,connect()方法返回true;否則需要有檢查套接字是否連接成功的方法。
SocketChannel測試連接性
   boolean finishConnect()
   boolean isConnected()
   boolean isConnectionPending()
   對於非阻塞SocketChannel來說,一旦已經發起連接,底層套接字可能既不是已經連接,又不是沒有連接,而是連接“正在進行”。由於底層協議的工作機制,套接字可能會在這個狀態一直保持下去。finishConnect()方法可以用來檢查在非阻塞套接字上試圖進行的連接狀態,還可以在阻塞套接字建立連接的過程中阻塞等待,直到連接成功建立。例如,你可能需要將信道配置成非阻塞模式,通過connect()方法發起連接,做完一些其他工作后,又將信道配置成阻塞模式,然后調用finishConnect()方法等待連接建立完成。或者可以讓信道保持在非阻塞模式,並反復調用 finishConnect()方法。如TCPEchoClientNoblocking類中所示。
      isConnected()用於檢查套接字是否已經建立了連接,從而避免在進行其他操作時拋出NotYetConnectedException異常(如在調用read()或write()時)。還可以使用isConnectedPending()方法來檢查是否有連接在該信道上發起。知道是否有連接發起是有必要的,因為如果沒有的話,finishConnect()方法將拋出NoConnectionPendingException異常。
5、Selector詳解
EchoSelectorServer示例中展示了Selector的基本用法。在此,我們將對其進行更加詳細的介紹。
Selector創建和關閉
   static Selector open()
   boolean isOpen()
   void close()
   調用Selector的open()工廠方法可以創建一個選擇器實例。選擇器的狀態是“打開”或是“關閉”的。創建時選擇器的狀態時打開的,並保持該狀態,直到調用close()方法通知系統其任務已經完成。可以調用isOpen()方法來檢查選擇器是否已經關閉。
1)在信道中注冊
   我們已經知道,每個選擇器都有一組與之關聯的信道,選擇器對這些信道上“感興趣的”I/O操作進行監聽。Selector與Channel之間的關聯由一個SelectionKey實例表示。(注意:一個信道可以注冊多個Selector實例,因此可以有多個關聯的SelectionKey實例)。 SelectionKey維護了一個信道上感興趣的操作類型信息,並將這些信息存放在一個int型的位圖中,該int型數據的每一位都有相應的含義。
   SelectionKey類中的常量定義了信道上可能感興趣的操作類型,每個這種常量都是只有一位設置為1的位掩碼。
SelectionKey興趣操作集
   static int OP_ACCEPT
   static int OP_CONNECT
   static int OP_READ
   static int OP_WRITE
   int interestOps()
   SelectionKey interestOps(int ops)
   通過對OP_ACCEPT,OP_CONNECT,OP_READ以及OP_WRITE中適當的常量進行按位OR,我們可以構造一個位向量來制定一組操作。例如,一個包含讀和寫的操作集可由表達式(OP_READ| OP_WRITE)來指定。不帶參數的interestOps()方法將返回一個int型位圖,該位圖中設置為1的每一位都指示了信道上需要監聽的一種操作。另一種方法以一個位圖為參數,指示了應該監聽信道上的哪些操作。重點提示:任何對key(信道)所關聯的興趣操作集的改變,都只在下次調用了 select()方法后才會生效。
 
SocketChannel, ServerSocketChannel注冊Selector
   SelectionKey register(Selectorsel, intops)
   SelectionKey register(Selectorsel, int ops, Object attachment)
   int validOps()
   boolean isRegistered()
   SelectionKey keyFor(Selector sel)
   調用信道的register()方法可以將一個選擇器注冊到該信道。在注冊過程中,通過存儲在int型數據中的位圖來指定該信道上的初始興趣操作集。 register()方法將返回一個代表了信道和給定選擇器之間的關聯的SelectionKey實例。validOps()方法用於返回一個指示了該信道上的有效I/O操作集的位圖。對於SocketChannel來說,有效操作包括讀、寫和連接。一個信道可能只與一個選擇器注冊一次,因此后續對 register()方法的調用只是簡單地更新該key所關聯的興趣操作集。使用isRegistered()方法可以檢查信道是否已經注冊了選擇器。 keyFor()方法與第一次調用register()方法返回的是同一個SelectionKey實例,除非該信道沒有注冊給定的選擇器。
以下代碼注冊了一個信道,支持讀寫操作:
SelectionKey key = clientChannel.register(selector,
                        SelectionKey.OP_READ| SelectionKey.OP_WRITE)
下圖展示了一個選擇器,其鍵集中包含了7個代表注冊信道的鍵:兩個在端口8888和8889上的服務器信道,以及從服務器信道創建的5個客戶端信道:
SelectionKey獲取和取消
   Selector selector()
   SelectableChannel channel()
   void cancel()
   鍵關聯的Selector實例和Channel實例可以分別使用該鍵的selector()和channel()方法獲得。cancel()方法用於(永久性地)注銷該鍵,並將其放入選擇器的注銷集中。在下一次調用select()方法時,這些鍵將從該選擇器的所有集中移除,其關聯的信道也將不再被監聽(除非它又重新注冊)。
 
2) 選取和識別准備就緒的信道
   在信道上注冊了選擇器,並由關聯的鍵指定了感興趣的I/O操作集后,我們就只需要坐下來等待I/O了。這要使用選擇器來完成。
Selector等待信道准備就緒
   int select()
   int select(long timeout)
   int selectNow()
   Selector wakeup()
   select()方法用於從已經注冊的信道中返回在感興趣的I/O操作集上准備就緒的信道總數。(例如,興趣操作集中包含OP_READ的信道有數據可讀,或包含OP_ACCEPT的信道有連接請求待接受。)以上三個select()方法的唯一區別在於它們的阻塞行為。無參數的select()方法會阻塞等待,直到至少有一個注冊信道中有感興趣的操作准備就緒,或有別的線程調用了該選擇器wakeup()方法(這種情況下select()方法將返回 0)。以超時時長作為參數的select方法也會阻塞等待,直到至少有一個信道准備就緒,或等待時間超過了指定的毫秒數(正數),或者有另一個線程調用其 wakeup()方法。selectNow()方法是一個非阻塞版本:它總數立即返回,如果沒有信道准備就緒,則返回0.wakeup()方法可以使用當前阻塞(也就是說在另一個線程中阻塞)的任何一種select()方法立即返回;如果當前沒有select方法阻塞,下一次調用者三種方法的任何一個都將立即返回。
      選擇之后,我們需要知道哪些信道准備好了特定的I/O操作。每個選擇器都維護了一個已選鍵集,與這些鍵關聯的信道都有即將發生的特定I/O操作。通過調用 selectedKey()方法可以訪問已選鍵集,該方法返回一組selectionKey。我們可以在這組鍵上進行迭代,分別處理等待在每個鍵關聯的信道上的I/O操作。
   Iterator<SelectionKey>keyIter =
selector.selectedKeys().iterator();
while(keyIter.hasNext()){
       SelectionKey key = keyIter.next();
       //...在這里處理該key所關聯的信道channel
       keyIter.remove();
      }
   }
 
Selector獲取鍵集
   Set<SelectionKey> keys()
   Set<SelectionKey>selectedKeys()
      以上方法返回選擇器的不同鍵集。keys()方法返回當前已注冊的所有鍵。返回的鍵集是不可修改的;任何對其進行修改的嘗試(如,調用其remove() 方法)都將拋出UnsupportedOperationException異常。selectedKeys()方法用於返回上次調用select()方法時,被“選中”的已准備好進行I/O操作的鍵。重要提示:selectedKeys()方法返回的鍵是可修改的,在實際上在兩次調用select()方法之間,都必須“手工”將清空。換句話說,select方法只會在已有的所選鍵集上添加鍵,它們不會創建新的鍵集。
   所有鍵集指示了哪些信道當前可以進行I/O操作。對於選中的每個信道,我們需要知道它們各自准備好的特定I/O操作。除了興趣操作集外,每個鍵還維護了一個即將進行的I/O操作集,稱為就緒操作集。
SelectionKey查找就緒的I/O操作
   int readyOps()
   boolean isAcceptable()
   boolean isConnectable()
   boolean isReadable()
   boolean isValid()
   boolean isWritable()
   對於給定的鍵,可以使用readyOps()方法或其他指示方法來確定興趣集中的哪些I/O操作可以執行。readyOps()方法以位圖的形式返回所有准備就緒的操作集。其他方法用於分別檢查各種操作是否可用。
例如,查看鍵關聯的信道上是否有正在等待的讀操作,可以使用以下代碼:
(key.readOps() &SelectionKey.OP_READ) != 0
key.isReadable()
   選擇器的已選鍵集中的鍵,以及每個鍵中准備就緒的操作,都是由select()方法來確定的。隨着時間的推進,這些信息可能會過時。其他線程可能會處理准備就緒的I/O操作。同時,鍵也不是永遠存在的。但其關聯的信道或選擇器關閉時,鍵也將失效。通過調用其cancel()方法可以顯示地將鍵設置為無效。調用其isValid()方法可以檢測一個鍵的有效性。無效的鍵將添加到選擇器的注銷集中,並在下次調用任意一種形式的select()方法和或者 close()方法時從鍵集中移除。(當然,從鍵集中移除鍵意味着與它關聯的信道也不再受監聽。)
 
3) 信道附件
   當一個信道准備好進行I/O操作時,通常還需要額外的信息來處理請求。例如,在前面的回顯協議中,但客戶端信道准備好寫操作時,就需要有數據可寫。當然,我們所需要的可寫數據是由之前同一信道上的讀操作收集的,但是在其可寫之前,這些數據存放在什么地方呢?另一個例子,如果一個消息一次傳來了多個字節,我們需要保存已接收的部分消息,直到整個消息接收完成。這兩種情況都需要維護每個信道的狀態信息。然而,我們非常幸運!SelectionKey通過使用附件使保存每個信道的狀態變得容易。
SelectionKey查找就緒的I/O操作
Object attach(Object ob)
Object attachment()
        每個鍵可以有一個附件,數據類型只能是Object類。附件可以在信道第一次調用register()方法時與之關聯,或者后來再使用attach()方法直接添加到鍵上。通過SelectionKey的attachment()方法可以訪問鍵的附件。
 
4) Selector小結
   總的來說,使用Selector的步驟如下:
1、 創建一個Selector實例。
2、將其注冊到各種信道,指定每個信道上感興趣的I/O操作。
3、 重復執行:
1) 調用一種select方法
2) 獲取選取的鍵列表
3) 對於已選鍵集中的每個鍵。
a. 獲取信道,並從鍵中獲取附件(如果合適的話)
b. 確定准備就緒的操作並執行。如果是accept操作,將接受的信道設置為非阻塞模式,並將其與選擇器注冊。
c. 如果需要,修改鍵的興趣操作集
d. 從已選鍵中移除鍵
如果選擇器告訴了你什么時候I/O操作准備就緒,你還需要非阻塞I/O嗎?答案是肯定的。信道在已選鍵集中的鍵並不能確保非阻塞I/O,因為調用了 select()方法后,鍵集信息可能會過時。另外,阻塞式寫操作會阻塞等待直到寫完所有字節,而就緒集中的OP_WRITE僅表示至少有一個字節可寫。實際上,只是非阻塞模式的信道才能與選擇器進行注冊:如果信道在阻塞模式,SelectableChannel類的register()方法將拋出 IllegalBlockingModeException異常。
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM