DatagramChannel和SocketChannel都實現定義讀寫功能,ServerSocketChannel不實現,只負責監聽傳入的連接,並建立新的SocketChannel,本身不傳輸數據。
Socket通道被實例化時都會創建一個對等的socket,通過此方式創建的socket都會有關聯的通道,通過getChannel()獲取。
繼承於 SelectableChannel,所以socket可以在非阻塞模式下運行:
Readiness Selection:就緒選擇,查詢通道的機制,該機制可以判斷通道是否准備好執行下一個目標操作(讀,寫...),其價值在於潛在的大量通道可以同時進行就緒檢查,真正的就緒選擇需要由操作系統來做,處理IO請求,並通知各個線程數據准備情況。
Selector選擇器:提供了這種抽象(抽象接口),是的Java代碼能夠以可移植的方式,請求底層操作系統提供這種服務。
Selector選擇器類:管理着一個被注冊的通道集合的信息和他們的狀態,通道和選擇器是一起被注冊的,並且使用選擇器來更新通道狀態。
一個通道可以被注冊到多個選擇器上,但在每個選擇器上,只能注冊一次。
SelectionKey選擇鍵:封裝了通道和選擇器的注冊關系,選擇鍵被SelectableChannel.register()返回並提供標識這種注冊關系的標記。
通道在被注冊到選擇器之前必須設置為noblocking模式,正常狀態。
chanel.register(selector, keystate):通道注冊選擇器。
selector.select():阻塞操作,直到某一個channel的keystate發生。
selectionKey.cancel(),取消注冊關系。
通道關閉,相關的注冊鍵會自動取消,選擇器關閉,則所有注冊到該選擇器的通道都將被注銷,並且相關的鍵會立刻失效。
selectionkey包含兩個以整數型式進行編碼的比特掩碼,一個用於指示那些通道和選擇器組合所關心的操作,另一個表示通道准備好要執行的操作。當前的interest集合可以通過調用見對象的interestOps()方法來獲取,且永遠不會被選擇器改變,但可以調用interestOps()方法,傳入一個新的比特碼來改變。
readyOpts()獲取相關通道的已就緒的操作,ready集合是interest集合的子集,表示從上次調用select()之后已經就緒的操作。如下:
if((key.readOps() & SelctionKey.OP_READ) != 0){
buffer.clear();
key.channel().read(buffer);
do()....
}
附加參數:attach()
SelectionKey key = SelectableChannel.register(Selector, SelectionKey.OP_XXX, paramObj);
等價:
SelectionKey key = SelectableChannel.register(Selector, SelectionKey.OP_XXX);
key.attach(paramObj);
SelectionKey 多線程應用同步問題。
選擇器:
Selector上的已注冊鍵集合中,會存在失效鍵、null,keys()返回,不可修改。
已選擇鍵集合,selectedKeys()返回,已經准備好的鍵集合,可能為空。
核心:選擇過程,是對select(),poll(),epoll()等本地調用(native call)或者類似的操作系統的本地調用的包裝(抽象),期間,將執行以下過程:
-
已取消的鍵的集合將會被檢查,如果非空,則會被從其它兩個集合(已注冊、已選擇)移除,相關通道將會被注銷,鍵被清空。
-
已注冊鍵的集合的鍵的interest集合將會被檢查,就緒條件確定,底層操作系統對通道所關心的操作的就緒狀態進行檢查,如果沒有,則阻塞當前(超時值)。
對於已經就緒的通道執行:
a. 如果通道的鍵未在已選擇的鍵集合中,那么鍵的reay集合將會被清空,然后設置當前准備好的比特掩碼。
b. 如果通道的鍵已在已選擇的鍵集合中,鍵的ready集合更新。不再就緒的狀態不會被清除。
-
select返回的是從上一次select()調用之后進入就緒狀態的通道數量,之前的調用中已經就緒的,並且本次調用中仍然就緒的不會被計入。
使用內部已取消的鍵的集合來延遲注銷,防止線程在取消鍵時阻塞及與正在進行的選擇操作沖突的優化,
三種形式的select: select(), select(timeout),selectNow()(非阻塞,立刻返回當前狀況)。
調用 Selector 對象的 wakeup( )方法將使得選擇器上的第一個還沒有返回的選擇操作立即回。如果當前沒有在進行中的選擇,那么下一次對 select( )方法的一種形式的調用將立即返回。后續的選擇操作將正常進行。在選擇操作之間多次調用 wakeup( )方法與調用它一次沒有什么不同。有時這種延遲的喚醒行為並不是您想要的。您可能只想喚醒一個睡眠中的線程,而使得后續的
選擇繼續正常地進行。您可以通過在調用 wakeup( )方法后調用 selectNow( )方法來繞過這個問題。
通常的做法是在選擇器上調用一次 select 操作(這將更新已選擇的鍵的集合),然后遍歷 selectKeys( )方法返回的鍵的集合。在按順序進行檢查每個鍵的過程中,相關的通道也根據鍵的就緒集合進行處理。然后鍵將從已選擇的鍵的集合中被移除(通過在 Iterator對象上調用 remove( )方法),然后檢查下一個鍵。完成后,通過再次調用 select( )方法重復這個循環。如下:
package org.windwant.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Created by windwant on 2016/10/27. */ public class SocketChannelOpt { private static final String HOST = "localhost"; private static final int PORT = 8888; private static ExecutorService read = Executors.newFixedThreadPool(5); private static ExecutorService write = Executors.newFixedThreadPool(5); public static void main(String[] args){ ServerSocketChannel serverSocketChannel = null; ServerSocket serverSocket = null; Selector selector = null; try { serverSocketChannel = ServerSocketChannel.open();//工廠方法創建ServerSocketChannel serverSocket = serverSocketChannel.socket(); //獲取channel對應的ServerSocket serverSocket.bind(new InetSocketAddress(HOST, PORT)); //綁定地址 serverSocketChannel.configureBlocking(false); //設置ServerSocketChannel非阻塞模式 selector = Selector.open();//工廠方法創建Selector serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//通道注冊選擇器,接受連接就緒狀態。 while (true){//循環檢查 if(selector.select() == 0){//阻塞檢查,當有就緒狀態發生,返回鍵集合 continue; } Iterator<SelectionKey> it = selector.selectedKeys().iterator(); //獲取就緒鍵遍歷對象。 while (it.hasNext()){ SelectionKey selectionKey = it.next(); //處理就緒狀態 if (selectionKey.isAcceptable()){ ServerSocketChannel schannel = (ServerSocketChannel) selectionKey.channel();//只負責監聽,阻塞,管理,不發送、接收數據 SocketChannel socketChannel = schannel.accept();//就緒后的操作,剛到達的socket句柄 if(null == socketChannel){ continue; } socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); //告知選擇器關心的通道,准備好讀數據 }else if(selectionKey.isReadable()){ SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(4*1024); StringBuilder result = new StringBuilder(); while (socketChannel.read(byteBuffer) > 0){//確保讀完 byteBuffer.flip(); result.append(new String(byteBuffer.array())); byteBuffer.clear();//每次清空 對應上面flip() } System.out.println("server receive: " + result.toString()); socketChannel.register(selector, SelectionKey.OP_WRITE); }else if(selectionKey.isWritable()){ SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); String sendStr = "server send data: " + Math.random(); ByteBuffer send = ByteBuffer.wrap(sendStr.getBytes()); while (send.hasRemaining()){ socketChannel.write(send); } socketChannel.register(selector, SelectionKey.OP_READ); System.out.println(sendStr); } it.remove(); } } } catch (IOException e) { e.printStackTrace(); } } }
Selector多線程執行,同步需求。
一個線程監控通道的就緒狀態,一個線程池處理業務需求。線程池也可以擴展為不同的業務處理線程池,如日志、業務、心跳。
package org.windwant.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 線程處理讀取,寫出 * Created by windwant on 2016/10/27. */ public class TSocketChannelOpt { private static final String HOST = "localhost"; private static final int PORT = 8888; private static ExecutorService read = Executors.newFixedThreadPool(5); private static ExecutorService write = Executors.newFixedThreadPool(5); public static void main(String[] args){ ServerSocketChannel serverSocketChannel = null; ServerSocket serverSocket = null; Selector selector = null; try { serverSocketChannel = ServerSocketChannel.open();//工廠方法創建ServerSocketChannel serverSocket = serverSocketChannel.socket(); //獲取channel對應的ServerSocket serverSocket.bind(new InetSocketAddress(HOST, PORT)); //綁定地址 serverSocketChannel.configureBlocking(false); //設置ServerSocketChannel非阻塞模式 selector = Selector.open();//工廠方法創建Selector serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//通道注冊選擇器,接受連接就緒狀態。 while (true){//循環檢查 if(selector.select() == 0){//阻塞檢查,當有就緒狀態發生,返回鍵集合 continue; } Iterator<SelectionKey> it = selector.selectedKeys().iterator(); //獲取就緒鍵遍歷對象。 while (it.hasNext()){ SelectionKey selectionKey = it.next(); it.remove(); //處理就緒狀態 if (selectionKey.isAcceptable()){ ServerSocketChannel schannel = (ServerSocketChannel) selectionKey.channel();//只負責監聽,阻塞,管理,不發送、接收數據 SocketChannel socketChannel = schannel.accept();//就緒后的操作,剛到達的socket句柄 if(null == socketChannel){ continue; } socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); //告知選擇器關心的通道,准備好讀數據 }else if(selectionKey.isReadable()){ SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); read.execute(new MyReadRunnable(socketChannel)); // SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); // ByteBuffer byteBuffer = ByteBuffer.allocate(4*1024); // // StringBuilder result = new StringBuilder(); // while (socketChannel.read(byteBuffer) > 0){//確保讀完 // byteBuffer.flip(); // result.append(new String(byteBuffer.array())); // byteBuffer.clear();//每次清空 對應上面flip() // } // // System.out.println("server receive: " + result.toString()); socketChannel.register(selector, SelectionKey.OP_WRITE); }else if(selectionKey.isWritable()){ SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); write.execute(new MyWriteRunnable(socketChannel)); // String sendStr = "server send data: " + Math.random(); // ByteBuffer send = ByteBuffer.wrap(sendStr.getBytes()); // while (send.hasRemaining()){ // socketChannel.write(send); // } // System.out.println(sendStr); socketChannel.register(selector, SelectionKey.OP_READ); } } } } catch (IOException e) { e.printStackTrace(); } } static class MyReadRunnable implements Runnable { private SocketChannel channel; public MyReadRunnable(SocketChannel channel){ this.channel = channel; } @Override public synchronized void run() { ByteBuffer byteBuffer = ByteBuffer.allocate(4*1024); StringBuilder result = new StringBuilder(); try { while (channel.read(byteBuffer) > 0){//確保讀完 byteBuffer.flip(); result.append(new String(byteBuffer.array())); byteBuffer.clear();//每次清空 對應上面flip() } System.out.println("server receive: " + result.toString()); } catch (IOException e) { e.printStackTrace(); } } } static class MyWriteRunnable implements Runnable { private SocketChannel channel; public MyWriteRunnable(SocketChannel channel){ this.channel = channel; } @Override public void run() { String sendStr = "server send data: " + Math.random(); ByteBuffer send = ByteBuffer.wrap(sendStr.getBytes()); try { while (send.hasRemaining()) { channel.write(send); } System.out.println(sendStr); }catch (Exception e){ e.printStackTrace(); } } } }