JAVA中的NIO
標准的IO是基於字節流和字符流進行操作的,而JAVA中的NIO是基於Channel和Buffer進行操作的。
傳統IO
NIO
核心模塊
NIO主要有三個核心部分:Selector、Channel、Buffer
數據總是從Channel讀取到Buffer或者從Buffer寫入到Channel中。
Selector可以監聽多個Channel的多個事件。
傳統的IO與Channel的區別
1.傳統的IO是BIO的,而Channel是NIO的。
*當流調用了read()、write()方法后會一直阻塞線程直到數據被讀取或寫入完畢。
2.傳統IO流是單向的,而Channel是雙向的。
Channel
FileChannel:從文件中進行讀取
DatagramChannel:可以通過UDP協議在網絡中進行數據的傳輸
SocketChannel:可以通過TCP協議在網絡中進行數據的傳輸
ServerSocketChannel:可以作為一個服務器監聽連接
Channel通用API:
read(buffer):將數據從Channel讀取到Buffer中,讀取完畢返回-1。
read(buffer []):將數據從Channel讀取到多個Buffer中,僅當第一個Buffer被寫滿后往第二個Buffer中進行寫入。
write(buffer):將Buffer中的數據寫入到Channel中。
write(buffer[]):將多個Buffer中的數據寫入到Channel中,僅當第一個Buffer中的數據被讀取完畢后再從第二個Buffer中進行讀取。
register(selector,interest):將Channel注冊到Selector中,同時需要向Selector傳遞要監聽此Channel的事件類型(注冊到Selector中的Channel一定要非阻塞的)
configureBlocking(boolean):設置Channel是否為阻塞。
transferFrom(position,count,channel):將其他Channel中的數據傳輸到當前Channel中。
transferTo(position,count,channel):將當前Channel中的數據傳輸到其他Channel中。
SocketChannel API
open()靜態方法:創建SocketChannel。
connect(new InetSocketAddress(port))方法:連接服務器。
finishConnect()方法:判斷是否已經與服務器建立連接。
ServerSocketChannel API
open()靜態方法:創建ServerSocketChannel。
accept()方法:該方法會一直阻塞線程直到有新連接到達。
阻塞式與非阻塞式Channel
正常情況下Channel都是阻塞的,只有當調用了configureBlocking(false)方法時Channel才為非阻塞。
阻塞式Channel的connect()、accept()、read()、write()方法都會阻塞線程,直到處理完畢。
非阻塞式Channel的connect()、accept()、read()、write()方法都是異步的。
*當調用了非阻塞式Channel的connect()方法后,需要使用finishConnect()方法判斷是否已經與服務器建立連接。
*當調用了非阻塞式Channel的accept()方法后,需要根據方法的返回值是否為NULL判斷是否接收到新的連接。
*當調用了非阻塞式Channel的read()方法后,需要根據方法的返回值是否大於0判斷是否有讀取到數據。
*在使用非阻塞式Channel的write()方法時,需要借助while循環與hasRemaining()方法保證buffer中的內容被全部寫入。
*FileChannel一定是阻塞的。
示例
public void testFileChannel() throws IOException {
RandomAccessFile randomAccessFile = new RandomAccessFile(new File("F:\\筆記\\nginx.txt"), "rw");
FileChannel fileChannel = randomAccessFile.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(64);
int count = fileChannel.read(byteBuffer);
while (count != -1) {
byteBuffer.flip();
System.out.println(new String(Arrays.copyOfRange(byteBuffer.array(),0,byteBuffer.limit()),Charset.forName("UTF-8")));
byteBuffer.clear();
count = fileChannel.read(byteBuffer);
}
}
Buffer
Buffer是一塊可以進行讀寫操作的內存(順序存儲結構)
ByteBuffer:基於Byte類型進行存儲
CharBuffer:基於Char類型進行存儲
DoubleBuffer:基於Double類型進行存儲
FloatBuffer:基於Float類型進行存儲
IntBuffer:基於Int類型進行存儲
LongBuffer:基於Long類型進行存儲
ShortBuffer:基於Short類型進行存儲
Buffer的內部結構
1.capacity:表示buffer的容量
2.position:表示當前的位置(從0開始,最大值為capacity-1)
3.limit:在寫模式中表示可以寫入的個數(與capacity一樣),在讀模式中表示可以讀取的個數。
從寫模式轉換成讀模式
limit設置為position+1,position設置為0。
從讀模式轉換成寫模式
limit設置為capacity,position設置為0。
往Buffer中寫數據
1.將數據從Channel讀取到Buffer中。
2.使用Buffer的put()方法。
從Buffer中讀數據
1.將Buffer中的數據寫入到Channel中。
2.使用Buffer的get()方法
Buffer通用API:
allocate(size)靜態靜態:初始化一個Buffer。
flip():將buffer從寫模式轉換成讀模式。
array():將Buffer中的內容轉換成數組(不受limit控制)
get():獲取Buffer中的內容。
hasRemaining():判斷Buffer中是否還有未讀的元素(limit - (postion+1) )
rewind():將positon設置為0。
clear():將limit設置為capacity,position設置為0。
compact():將所有未讀的元素移動到Buffer的起始處,position指向最后一個未讀的元素的下一位,limit設置為capacity。
*clear()和compact()方法都可以理解成將Buffer從讀模式轉換成寫模式,區別在於compact()方法會保留未讀取的元素。
mark():在當前position處打一個標記。
reset():將position恢復到標記處。
Selector
Selector用於監聽多個Channel的多個事件(單線程)
Channel的事件類型
1.連接就緒:當SocketChannel、DatagramChannel成功與服務器建立連接時將會觸發連接就緒事件。
2.接收就緒:當有連接到達服務器時將會觸發接收就緒事件。
3.讀就緒:當SocketChannel、DatagramChannel有數據可讀時將會觸發讀就緒事件。
4.寫就緒:當SocketChannel、DatagramChannel可以進行數據寫入時將會觸發寫就緒事件。
SelectionKey
SelectionKey用於存儲Selector與Channel之間的相關信息。
SelectionKey中提供了四個常量分別代表Channel的事件類型。
SelectionKey.OP_CONNECT
SelectionKey.OP_ACCEPT
SelectionKey.OP_READ
SelectionKey.OP_WRITE
SelectableChannel提供的register(selector,interest)方法用於將Channel注冊到Selector中,同時需要向Selector傳遞要監聽此Channel的事件類型,當要監聽的事件類型不止一個時可以使用或運算,當將Channel注冊到Selector后會返回SelectionKey實例,用於存儲Selector與此Channel之間的相關信息。
SelectionKey API:
interestOps()方法:返回Selector監聽此Channel的事件類型。
readyOps()方法:返回此Channel目前就緒的事件。
isAcceptable():判斷Channel是否接收就緒。
isConnectable():判斷Channel是否連接就緒。
isReadable():判斷Channel是否讀就緒。
isWriteable():判斷Channel是否寫就緒。
channel():返回具體的Channel實例。
selector():返回Selector實例。
attach():往SelectionKey中添加一個附加對象。
attachment():返回保存在SelectionKey中的附加對象。
Selector API:
open()靜態方法:創建一個Selector。
select()方法:該方法會一直阻塞線程直到所監聽的Channel有事件就緒,返回就緒的Channel個數(只會返回新就緒的Channel個數)
selectedKeys()方法:返回就緒的Channel對應的SelectionKey。
*當Channel就緒的事件處理完畢后,需要手動刪除SelectionKey集合中該Channel對應的SelectionKey,當該Channel再次有事件就緒時會自動加入到Selectionkey集合中。
非阻塞式Channel與Selector
非阻塞式Channel一般與Selector配合使用
當Selector監聽到ServerSocketChannel接收就緒時,那么此時可以立即調用ServerSocketChannel的accept()方法獲取新連接。
當Selector監聽到SocketChannel讀就緒時,那么此時可以立即調用SocketChannel的read()方法進行數據的讀取。
非阻塞式服務器
/**
* @Author: Zhuang HaoTang
* @Date: 2019/10/26 16:35
* @Description:
*/
public class Server {
public void start() throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = createNIOServerSocketChannel();
System.out.println("start nio server and bind port 8888");
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
int ready = selector.select();
while (ready > 0) {
System.out.println("ready channel count " + ready);
Set<SelectionKey> selectionKeySet = selector.selectedKeys();
for (Iterator<SelectionKey> iterator = selectionKeySet.iterator(); iterator.hasNext(); ) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()) {
System.out.println("acceptable");
acceptHandler(selectionKey);
} else if (selectionKey.isReadable()) {
System.out.println("readable");
readHandler(selectionKey);
}
iterator.remove();
}
ready = selector.select();
}
}
private ServerSocketChannel createNIOServerSocketChannel() throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(InetAddress.getLocalHost(), 8888));
serverSocketChannel.configureBlocking(false);
return serverSocketChannel;
}
private void acceptHandler(SelectionKey selectionKey) throws IOException {
Selector selector = selectionKey.selector();
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("accept client connection " + socketChannel.getLocalAddress());
}
private void readHandler(SelectionKey selectionKey) throws IOException {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(100);
int num = socketChannel.read(byteBuffer);
if(num == -1){ // 連接已斷開
System.out.println("client "+socketChannel.getLocalAddress() + " disconnection");
socketChannel.close();
return;
}
byteBuffer.flip();
while (byteBuffer.hasRemaining()) {
byte b = byteBuffer.get();
System.out.println((char) b);
}
}
public static void main(String[] args) throws IOException {
Server server = new Server();
server.start();
}
}
*一個Channel不會同時有多個事件就緒,以事件為單位。
*當客戶端斷開連接,那么將會觸發讀就緒,並且channel的read()方法返回-1,表示連接已斷開,服務器應該要做出處理,關閉這個連接。
客戶端
/**
* @Auther: Zhuang HaoTang
* @Date: 2019/10/26 16:36
* @Description:
*/
public class Client {
public static void main(String[] args) throws IOException, InterruptedException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress(InetAddress.getLocalHost(),8888));
String message = "today is sunday";
ByteBuffer byteBuffer = ByteBuffer.allocate(message.getBytes().length);
byteBuffer.put(message.getBytes());
byteBuffer.flip();
socketChannel.write(byteBuffer);
Thread.sleep(5000);
}
}
運行結果
Reactor模式
Reactor有三種模式
1.Reactor單線程模式
2.Reactor多線程模式
3.主從Reactor多線程模式
*Reactor模式是在NIO下實現的。
Reactor單線程模式
1.單線程的事件分化器,同時這個線程需要處理接收、讀、寫就緒事件。
/**
* @Author: Zhuang HaoTang
* @Date: 2019/10/26 16:35
* @Description:
*/
public class ReactorSingleThreadServer {
private void start() throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = createNIOServerSocketChannel();
System.out.println("start nio server and bind port 8888");
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
int ready = selector.select();
while (ready > 0) {
System.out.println("ready channel count " + ready);
Set<SelectionKey> selectionKeySet = selector.selectedKeys();
for (Iterator<SelectionKey> iterator = selectionKeySet.iterator(); iterator.hasNext(); ) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()) {
System.out.println("acceptable");
acceptHandler(selectionKey);
} else if (selectionKey.isReadable()) {
System.out.println("readable");
readHandler(selectionKey);
}
iterator.remove();
}
ready = selector.select();
}
}
private ServerSocketChannel createNIOServerSocketChannel() throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(InetAddress.getLocalHost(), 8888));
serverSocketChannel.configureBlocking(false);
return serverSocketChannel;
}
private void acceptHandler(SelectionKey selectionKey) throws IOException {
Selector selector = selectionKey.selector();
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("accept client connection " + socketChannel.getLocalAddress());
}
private void readHandler(SelectionKey selectionKey) throws IOException {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(100);
int num = socketChannel.read(byteBuffer);
if (num == -1) {
System.out.println("client " + socketChannel.getLocalAddress() + " disconnection");
socketChannel.close();
return;
}
byteBuffer.flip();
while (byteBuffer.hasRemaining()) {
byte b = byteBuffer.get();
System.out.println((char) b);
}
}
public static void main(String[] args) throws IOException {
ReactorSingleThreadServer server = new ReactorSingleThreadServer();
server.start();
}
}
Reactor多線程模式
1.單線程的事件分發器。
2.具體事件類型的Handler線程池(針對讀寫就緒事件)
3.業務線程池。
/**
* @Author: Zhuang HaoTang
* @Date: 2019-10-28 17:00
* @Description:
*/
public class ReactorMultiThreadServer {
private ThreadPoolExecutor eventHandlerPool = new ThreadPoolExecutor(10, 50, 2, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(200), new ThreadPoolExecutor.CallerRunsPolicy());
private void start() throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = createNIOServerSocketChannel();
System.out.println("start nio server and bind port 8888");
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
selector.select();
for (;;) {
Set<SelectionKey> selectionKeySet = selector.selectedKeys();
for (Iterator<SelectionKey> iterator = selectionKeySet.iterator(); iterator.hasNext(); ) {
final SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()) {
System.out.println("acceptable");
acceptHandler(selectionKey); // 單線程同步處理接收就緒
} else if (selectionKey.isReadable()) {
System.out.println("readable");
eventHandlerPool.submit(new Runnable() {
@Override
public void run() {
readHandler(selectionKey);
}
});
}
iterator.remove();
}
selector.select();
}
}
private ServerSocketChannel createNIOServerSocketChannel() throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(InetAddress.getLocalHost(), 8888));
serverSocketChannel.configureBlocking(false);
return serverSocketChannel;
}
private void acceptHandler(SelectionKey selectionKey) throws IOException {
Selector selector = selectionKey.selector();
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("accept client connection " + socketChannel.getLocalAddress());
}
}
private void readHandler(SelectionKey selectionKey) {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(100);
try {
int num = socketChannel.read(byteBuffer);
if (num == -1) {
System.out.println("client " + socketChannel.getLocalAddress() + " disconnection");
socketChannel.close(); // 底層有些邏輯
return;
}
byteBuffer.flip();
while (byteBuffer.hasRemaining()) {
byte b = byteBuffer.get();
System.out.println((char) b);
}
} catch (Exception e) {
System.out.println("由於連接關閉導致並發線程讀取異常");
}
}
public static void main(String[] args) throws IOException {
ReactorMultiThreadServer reactorServer = new ReactorMultiThreadServer();
reactorServer.start();
}
}
主從Reactor多線程模式
1.使用兩個單線程的事件分發器。
第一個事件分發器只負責監聽ServerSocketChannel的接收就緒事件,同時ServerSocketChannel接收到的連接要注冊到第二個事件分發器中。
第二個事件分發器只負責監聽SocketChannel的讀、寫就緒事件。
2.具體事件類型的Handler線程池(針對讀寫就緒事件)
3.業務線程池。
/**
* @Author: Zhuang HaoTang
* @Date: 2019-10-28 17:00
* @Description:
*/
public class MainSubReactorMultiThreadServer {
private ThreadPoolExecutor eventHandlerPool = new ThreadPoolExecutor(10, 50, 2, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(200), new ThreadPoolExecutor.CallerRunsPolicy());
private void start() throws IOException {
final Selector mainSelector = Selector.open();
final Selector subSelector = Selector.open();
new Thread(new Runnable() {
@Override
public void run() {
try {
startMainSelector(mainSelector, subSelector);
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
startSubSelector(subSelector);
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
/**
* 第一個事件分發器,用於監聽ServerSocketChannel的接收就緒事件
*/
private void startMainSelector(Selector mainSelector, final Selector subSelector) throws IOException {
ServerSocketChannel serverSocketChannel = createNIOServerSocketChannel();
System.out.println("start nio server and bind port 8888");
serverSocketChannel.register(mainSelector, SelectionKey.OP_ACCEPT);
mainSelector.select();
for (; ; ) {
Set<SelectionKey> selectionKeySet = mainSelector.selectedKeys();
SelectionKey selectionKey = Iterables.getOnlyElement(selectionKeySet);
if (selectionKey.isAcceptable()) {
System.out.println("acceptable");
acceptHandler(selectionKey, subSelector); // 單線程同步處理接收就緒
selectionKeySet.clear();
}
mainSelector.select();
}
}
/**
* 第二個事件分發器,用於監聽SockChannel的讀寫就緒事件
*/
private void startSubSelector(Selector subSelector) throws IOException {
subSelector.select();
for (; ; ) {
Set<SelectionKey> selectionKeySet = subSelector.selectedKeys();
for (Iterator<SelectionKey> iterator = selectionKeySet.iterator(); iterator.hasNext(); ) {
final SelectionKey selectionKey = iterator.next();
if (selectionKey.isReadable()) {
System.out.println("readable");
eventHandlerPool.submit(new Runnable() {
@Override
public void run() {
readHandler(selectionKey);
}
});
iterator.remove();
}
}
subSelector.select();
}
}
private ServerSocketChannel createNIOServerSocketChannel() throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(InetAddress.getLocalHost(), 8888));
serverSocketChannel.configureBlocking(false);
return serverSocketChannel;
}
private void acceptHandler(SelectionKey selectionKey, Selector subSelector) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
socketChannel.configureBlocking(false);
subSelector.wakeup(); // 往Selector注冊Channel時,Selector要處於非阻塞狀態
socketChannel.register(subSelector, SelectionKey.OP_READ);
System.out.println("accept client connection " + socketChannel.getLocalAddress() + " and register to subSelector");
}
}
private void readHandler(SelectionKey selectionKey) {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(100);
try {
int num = socketChannel.read(byteBuffer);
if (num == -1) {
System.out.println("client " + socketChannel.getLocalAddress() + " disconnection");
socketChannel.close(); // 底層有些邏輯
return;
}
byteBuffer.flip();
while (byteBuffer.hasRemaining()) {
byte b = byteBuffer.get();
System.out.println((char) b);
}
} catch (Exception e) {
System.out.println("由於連接關閉導致並發線程讀取異常");
}
}
public static void main(String[] args) throws IOException {
MainSubReactorMultiThreadServer reactorServer = new MainSubReactorMultiThreadServer();
reactorServer.start();
}
}
通用客戶端
/**
* @Author: Zhuang HaoTang
* @Date: 2019/10/26 16:36
* @Description:
*/
public class Client {
public static void main(String[] args) throws IOException, InterruptedException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress(InetAddress.getLocalHost(), 8888));
String message = "today is sunday";
ByteBuffer byteBuffer = ByteBuffer.allocate(message.getBytes().length);
byteBuffer.put(message.getBytes());
byteBuffer.flip();
socketChannel.write(byteBuffer);
Thread.sleep(5000);
ByteBuffer byteBuffer1 = ByteBuffer.allocate("wo".getBytes().length).put("wo".getBytes());
byteBuffer1.flip();
socketChannel.write(byteBuffer1);
ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);
while (true) {
socketChannel.read(receiveBuffer);
receiveBuffer.flip();
while (receiveBuffer.hasRemaining()) {
System.out.println((char)receiveBuffer.get());
}
receiveBuffer.clear();
}
}
}
*主線程不需要等待具體事件類型的Handler處理完畢,直接異步返回,那么將會導致事件重復就緒,程序做出相應的控制即可。
*當channel有數據可讀時,將會觸發讀就緒,那么主線程將會不停的向線程池提交任務,直到某個線程讀取完畢,此時將會停止讀就緒,其他線程讀取到的個數為0。
*當客戶端斷開連接時,將會觸發讀就緒,那么主線程將會不停的向線程池提交任務,直到某個線程關閉連接,此時將會停止讀就緒
一般不會直接去使用JAVA NIO,只是通過JAVA NIO學習他的設計思想,如果要想搭建NIO服務器那么應該使用Netty等NIO框架。
關於BIO和NIO的選擇
BIO即同步並阻塞,線程會進入阻塞狀態,如果並發連接數只有幾百,那么創建幾百個線程去處理是沒有任何問題的,這種方式更加簡單高效。
但是如果並發連接數達到幾萬,那么顯然創建幾萬個線程去處理是不可行的,系統承受不了這個負荷,此時應該使用NIO,即同步非阻塞,利用更少的線程去做更多的事情。
JAVA NIO就是使用NIO(同步非阻塞),使用IO多路復用的Select模型。
*不管客戶端有多少個並發連接和請求,服務端總是可以利用更少的線程去處理(單線程事件分發器 和 具體事件類型的Handler線程池)