NIO、AIO、BIO是個啥?


一、簡介

NIO

一種同步非阻塞的I/O。

AIO

異步非阻塞I/O。

BIO

同步阻塞IO操作。

二、名詞解釋

阻塞和非阻塞

當線程執行阻塞操作時,是只能等待,而不能執行其他事情的。
非阻賽是不需要等待,直接返回,繼續執行下一個操作。

同步和異步

同步異步是運行機制,當我們進行同步操作時,后續的任務是等待當前調用返回,才會進行下一步。
異步則相反,其他任務不需要等待當前調用返回,通常依靠事件、回調等機制來實現任務間次序關系。

IO分類

按操作數據分為:字節流(Reader、Writer)和字符流(InputStream、OutputStream)
按流向分:輸入流(Reader、InputStream)和輸出流(Writer、OutputStream)

三、實戰演示

BIO

服務端代碼:

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
/**
 * @ProjectName: onereader
 * @Package: com.onereader.webblog.common.bio
 * @ClassName: BIOServer
 * @Author: onereader
 * @Description: ${description}
 * @Date: 2019/9/1 14:30
 * @Version: 1.0
 */
public class BIOServer {

    ServerSocket server;
    //服務器
    public BIOServer(int port){
        try {
            //把Socket服務端啟動
            server = new ServerSocket(port);
            System.out.println("BIO服務已啟動,監聽端口是:" + port);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 開始監聽,並處理邏輯
     * @throws IOException
     */
    public void listener() throws IOException{
        //死循環監聽
        while(true){
            //等待客戶端連接,阻塞方法
            Socket client = server.accept();
            //獲取輸入流
            InputStream is = client.getInputStream();
            //定義數組,接收字節流
            byte [] buff = new byte[1024];
            int len = is.read(buff);
            //只要一直有數據寫入,len就會一直大於0
            if(len > 0){
                String msg = new String(buff,0,len);
                System.out.println("收到" + msg);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        new BIOServer(9999).listener();
    }
}

客戶端代碼:

import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
/**
 * @ProjectName: onereader
 * @Package: com.onereader.webblog.common.bio
 * @ClassName: BIOClient
 * @Author: onereader
 * @Description: ${description}
 * @Date: 2019/9/1 14:31
 * @Version: 1.0
 */
public class BIOClient {

    public static void main(String[] args){
        int count = 10;
        //計數器,模擬10個線程
        final CountDownLatch latch = new CountDownLatch(count);
        for(int i = 0 ; i < count; i ++){
            new Thread(){
                @Override
                public void run() {
                    try{
                        //等待,保證線程一個一個創建連接
                        latch.await();
                        //創建socket,連接服務端
                        Socket client = new Socket("localhost", 9999);
                        //獲取輸出流
                        OutputStream os = client.getOutputStream();
                        //獲取當前線程名
                        String name = "客戶端線程:"+Thread.currentThread().getName();
                        //發送到服務端
                        os.write(name.getBytes());
                        //關閉輸入流
                        os.close();
                        //關閉socket連接
                        client.close();

                    }catch(Exception e){

                    }
                }

            }.start();
            //計數器減1
            latch.countDown();
        }
    }
}

結果:

收到客戶端線程:Thread-1
收到客戶端線程:Thread-4
收到客戶端線程:Thread-0
收到客戶端線程:Thread-3
收到客戶端線程:Thread-2
收到客戶端線程:Thread-6
收到客戶端線程:Thread-8
收到客戶端線程:Thread-7
收到客戶端線程:Thread-5
收到客戶端線程:Thread-9

簡單來說明以下,啟動服務端后,服務端就會一直處於阻塞等待狀態,等待客戶端連接后,才能繼續執行讀取客戶端發送的信息,然后在進入循環阻塞,等待新的連接處理新的信息數據。上面,我們創建了10個線程模擬是個客戶端連接服務端發送信息,服務端在循環阻塞方式,接收到客戶端傳過來的信息。

NIO

NIO的三個主要組成部分:

Channel(通道)、Buffer(緩沖區)、Selector(選擇器)

NIO執行流程:

當客戶端在使用NIO連接到服務端,先會打開Channel(通道),然后把數據寫入到Buffer(緩沖區),並把Buffer數據通過通道發送,最后向選擇器中注冊一個事件。
服務端則會先打開Channel(通道),然后監聽選擇器中的事件,如果發現有事件后,就從渠道中獲取Buffer,然后從Buffer中讀取數據。如果沒有,則一直循環監聽下去。

Channel(通道)

Channel是一個對象,可以通過它讀取和寫入數據。
Channel是雙向的,既可以讀又可以寫。
Channel可以進行異步的讀寫。
Channel的讀寫必須通過buffer對象。

在Java NIO中的Channel主要有如下幾種類型:
FileChannel:從文件讀取數據的
DatagramChannel:讀寫UDP網絡協議數據
SocketChannel:讀寫TCP網絡協議數據
ServerSocketChannel:可以監聽TCP連接

Buffer(緩存區)

Buffer是一個對象,它包含一些要寫入或者讀到Stream對象的。應用程序不能直接對 Channel 進行讀寫操作,而必須通過 Buffer 來進行,即 Channel 是通過 Buffer 來讀寫數據的。

在NIO中,所有的數據都是用Buffer處理的,它是NIO讀寫數據的中轉池。Buffer實質上是一個數組,通常是一個字節數據,但也可以是其他類型的數組。但一個緩沖區不僅僅是一個數組,重要的是它提供了對數據的結構化訪問,而且還可以跟蹤系統的讀寫進程。

Buffer屬性介紹:

容量(Capacity):緩沖區能夠容納的數據元素的最大數量。這一個容量在緩沖區創建時被設定,並且永遠不能改變。
上界(Limit):緩沖區的第一個不能被讀或寫的元素。或者說,緩沖區中現存元素的計數。
位置(Position):下一個要被讀或寫的元素的索引。位置會自動由相應的 get( )和 put( )函數更新。
標記(Mark):下一個要被讀或寫的元素的索引。位置會自動由相應的 get( )和 put( )函數更新。

讀寫數據步驟:

1.寫入數據到 Buffer;
2.調用 flip() 方法;
3.從 Buffer 中讀取數據;
4.調用 clear() 方法或者 compact() 方法。

當向 Buffer 寫入數據時,Buffer 會記錄下寫了多少數據。一旦要讀取數據,需要通過 flip() 方法將 Buffer 從寫模式切換到讀模式。在讀模式下,可以讀取之前寫入到 Buffer 的所有數據。

一旦讀完了所有的數據,就需要清空緩沖區,讓它可以再次被寫入。有兩種方式能清空緩沖區:調用 clear() 或 compact() 方法。clear() 方法會清空整個緩沖區。compact() 方法只會清除已經讀過的數據。任何未讀的數據都被移到緩沖區的起始處,新寫入的數據將放到緩沖區未讀數據的后面。

Buffer主要有如下幾種:
ByteBuffer
CharBuffer
DoubleBuffer
FloatBuffer
IntBuffer
LongBuffer
ShortBuffer

Selector(選擇器)

無論客戶端還是服務端都可以向Selector中注冊事件,然后監聽事件,最后根據不同的事件做不同的處理。

下面看下代碼
客戶端:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
/**
 * @ProjectName: onereader
 * @Package: com.onereader.webblog.common.nio
 * @ClassName: NIOClient
 * @Author: onereader
 * @Description: ${description}
 * @Date: 2019/9/1 13:41
 * @Version: 1.0
 */
public class NIOClient {
    private final InetSocketAddress serverAdrress = new InetSocketAddress("localhost", 9999);
    private Selector selector = null;
    private SocketChannel client = null;
    private Charset charset = Charset.forName("UTF-8");

    public NIOClient() throws IOException{
        //1.連接遠程主機的IP和端口
        client = SocketChannel.open(serverAdrress);
        client.configureBlocking(false);

        //打開選擇器,注冊讀事件
        selector = Selector.open();
        client.register(selector, SelectionKey.OP_READ);
    }

    public void session(){
        //開辟一個新線程從服務器端讀數據
        new Reader().start();
        //開辟一個新線程往服務器端寫數據
        new Writer().start();
    }

    /**
     * 寫數據線程
     */
    private class Writer extends Thread{

        @Override
        public void run() {
            try{
                //在主線程中 從鍵盤讀取數據輸入到服務器端
                Scanner scan = new Scanner(System.in);
                while(scan.hasNextLine()){
                    String line = scan.nextLine();
                    if("".equals(line)){
                        //不允許發空消息
                        continue;
                    }
                    //當前渠道是共用的,發送當前輸入數據
                    client.write(charset.encode(line));
                }
                scan.close();
            }catch(Exception e){

            }
        }

    }

    /**
     * 讀數據線程
     */
    private class Reader extends Thread {
        @Override
        public void run() {
            try {
                //循環檢測
                while(true) {
                    int readyChannels = selector.select();
                    if(readyChannels == 0){
                        continue;
                    }
                    //獲取selected所有的事件
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();
                    Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
                    while(keyIterator.hasNext()) {
                        SelectionKey key = (SelectionKey) keyIterator.next();
                        keyIterator.remove();
                        process(key);
                    }
                }
            }
            catch (IOException io){

            }
        }

        /**
         * 根據事件的不同,做不同的處理
         * @param key
         * @throws IOException
         */
        private void process(SelectionKey key) throws IOException {
            //讀就緒事件
            if(key.isReadable()){
                //通過key找到對應的通道
                SocketChannel sc = (SocketChannel)key.channel();
                //創建緩存區
                ByteBuffer buff = ByteBuffer.allocate(1024);
                String content = "";
                //讀數據
                while(sc.read(buff) > 0){
                    buff.flip();
                    content += charset.decode(buff);
                }
                //打印內容
                System.out.println(content);
                //設置當前為讀就緒
                key.interestOps(SelectionKey.OP_READ);
            }
        }
    }
    
    public static void main(String[] args) throws IOException {
        new NIOClient().session();
    }
}

服務端:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;

/**
 * @ProjectName: onereader
 * @Package: com.onereader.webblog.common.nio
 * @ClassName: NIOServer
 * @Author: onereader
 * @Description: ${description}
 * @Date: 2019/9/1 13:23
 * @Version: 1.0
 */
public class NIOServer {

    private int port = 9999;
    private Charset charset = Charset.forName("UTF-8");
    private Selector selector = null;

    public NIOServer(int port) throws IOException{

        this.port = port;
        //1.打開通道
        ServerSocketChannel server = ServerSocketChannel.open();
        //設置服務端口
        server.bind(new InetSocketAddress(this.port));
        server.configureBlocking(false);
        //2.打開選擇器
        selector = Selector.open();
        //注冊等待事件
        server.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("服務已啟動,監聽端口是:" + this.port);
    }

    /**
     *  監聽事件
     * @throws IOException
     */
    public void listener() throws IOException{
        //死循環,這里不會阻塞
        while(true) {
            //1.在輪詢獲取待處理的事件
            int wait = selector.select();
            System.out.println("當前等待處理的事件:"+wait+"個");
            if(wait == 0){
                //如果沒有可處理的事件,則跳過
                continue;
            }
            //獲取所有待處理的事件
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = keys.iterator();
            //遍歷
            while(iterator.hasNext()) {
                SelectionKey key = (SelectionKey) iterator.next();
                //處理前,關閉選在擇器中的事件
                iterator.remove();
                //處理事件
                process(key);
                System.out.println("事件Readable:"+key.isReadable());
                System.out.println("事件Acceptable:"+key.isAcceptable());
            }
        }

    }

    /**
     * 根據事件類型,做處理
     * @param key
     * @throws IOException
     */
    public void process(SelectionKey key) throws IOException {
        //連接就緒
        if(key.isAcceptable()){
            //獲取通道
            ServerSocketChannel server = (ServerSocketChannel)key.channel();
            //進入服務端等待
            SocketChannel client = server.accept();
            //非阻塞模式
            client.configureBlocking(false);
            //注冊選擇器,並設置為讀取模式,收到一個連接請求,
            // 然后起一個SocketChannel,並注冊到selector上,
            // 之后這個連接的數據,就由這個SocketChannel處理
            client.register(selector, SelectionKey.OP_READ);
            //將此對應的channel設置為准備接受其他客戶端請求
            key.interestOps(SelectionKey.OP_ACCEPT);
            client.write(charset.encode("來自服務端的慰問"));
        }
        //讀就緒
        if(key.isReadable()){
            //返回該SelectionKey對應的 Channel,其中有數據需要讀取
            SocketChannel client = (SocketChannel)key.channel();

            //往緩沖區讀數據
            ByteBuffer buff = ByteBuffer.allocate(1024);
            StringBuilder content = new StringBuilder();
            try{
                while(client.read(buff) > 0){
                    buff.flip();
                    content.append(charset.decode(buff));
                }
                System.out.println("接收到客戶端:"+content.toString());
                //將此對應的channel設置為准備下一次接受數據
                key.interestOps(SelectionKey.OP_READ);
            }catch (IOException io){
                key.cancel();
                if(key.channel() != null){
                    key.channel().close();
                }
            }

        }
    }

    public static void main(String[] args) throws IOException {
        new NIOServer(9999).listener();
    }


}

結果:

客戶端:

來自服務端的慰問
請輸入要發送服務端的信息:
你好呀

服務端:

服務已啟動,監聽端口是:9999
當前等待處理的事件:1個
事件Readable:false
事件Acceptable:true
當前等待處理的事件:1個
接收到客戶端:你好呀
事件Readable:true
事件Acceptable:false

這里配合上面的基礎知識介紹,然后看代碼就能明白。整個過程了~

AIO

客戶端:

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
/**
 * @ProjectName: onereader
 * @Package: com.onereader.webblog.common.aio
 * @ClassName: AIOClient
 * @Author: onereader
 * @Description: ${description}
 * @Date: 2019/9/1 14:03
 * @Version: 1.0
 */
public class AIOClient {

    private final AsynchronousSocketChannel client ;

    public AIOClient() throws Exception{

        client = AsynchronousSocketChannel.open();
    }

    public void connect(String host,int port)throws Exception{

        //連接服務端
        client.connect(new InetSocketAddress(host,port),null,new CompletionHandler<Void,Void>() {

            /**
             * 成功操作
             * @param result
             * @param attachment
             */
            @Override
            public void completed(Void result, Void attachment) {
                try {
                    client.write(ByteBuffer.wrap(("客戶端線程:" + Thread.currentThread().getName()+"請求服務端").getBytes())).get();
                     System.out.println("已發送至服務器");
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }

            /**
             * 失敗操作
             * @param exc
             * @param attachment
             */
            @Override
            public void failed(Throwable exc, Void attachment) {
                exc.printStackTrace();
            }
        });

        //讀取數據
        final ByteBuffer bb = ByteBuffer.allocate(1024);
        client.read(bb, null, new CompletionHandler<Integer,Object>(){

                    /**
                     * 成功操作
                     * @param result
                     * @param attachment
                     */
                    @Override
                    public void completed(Integer result, Object attachment) {
                        System.out.println("獲取反饋結果:" + new String(bb.array()));
                    }

                    /**
                     * 失敗操作
                     * @param exc
                     * @param attachment
                     */
                    @Override
                    public void failed(Throwable exc, Object attachment) {
                        exc.printStackTrace();
                    }
                }
        );


        try {
            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException ex) {
            System.out.println(ex);
        }

    }

    public static void main(String args[])throws Exception{
        int count = 10;
        final CountDownLatch latch = new CountDownLatch(count);

        for (int i = 0; i < count; i ++) {
            latch.countDown();
            new Thread(){
                @Override
                public void run(){
                    try{
                        latch.await();
                        new AIOClient().connect("localhost",9999);
                    }catch(Exception e){

                    }
                }
            }.start();
        }

        Thread.sleep(1000 * 60 * 10);
    }


}

服務端:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

import static java.util.concurrent.Executors.*;

/**
 * @ProjectName: onereader
 * @Package: com.onereader.webblog.common.aio
 * @ClassName: AIOServer
 * @Author: onereader
 * @Description: ${description}
 * @Date: 2019/9/1 14:04
 * @Version: 1.0
 */
public class AIOServer {


    private final int port;

    public static void main(String args[]) {
        int port = 9999;
        new AIOServer(port);
    }

    /**
     * 注冊一個端口,用來給客戶端連接
     * @param port
     */
    public AIOServer(int port) {
        this.port = port;
        listen();
    }

    //偵聽方法
    private void listen() {
        try {
            //線程緩沖池,為了體現異步
            ExecutorService executorService = newCachedThreadPool();
            //給線程池初始化一個線程
            AsynchronousChannelGroup threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);

            //Asynchronous異步
            final AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(threadGroup);

            //啟動監聽
            server.bind(new InetSocketAddress(port));
            System.out.println("服務已啟動,監聽端口" + port);

            final Map<String,Integer> count = new ConcurrentHashMap<String, Integer>();
            count.put("count", 0);
            //開始等待客戶端連接
            //實現一個CompletionHandler 的接口,匿名的實現類
            server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
                //緩存區
                final ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
                //實現IO操作完成的方法
                @Override
                public void completed(AsynchronousSocketChannel result, Object attachment) {
                    count.put("count", count.get("count") + 1);
                    System.out.println(count.get("count"));
                    try {
                        //清空緩存標記
                        buffer.clear();
                        //讀取緩存內容
                        result.read(buffer).get();
                        //寫模式轉換成讀模式
                        buffer.flip();
                        result.write(buffer);
                        buffer.flip();
                    } catch (Exception e) {
                        System.out.println(e.toString());
                    } finally {
                        try {
                            result.close();
                            server.accept(null, this);
                        } catch (Exception e) {
                            System.out.println(e.toString());
                        }
                    }
                }

                //實現IO操作失敗的方法
                @Override
                public void failed(Throwable exc, Object attachment) {
                    System.out.println("IO操作是失敗: " + exc);
                }
            });
            try {
                Thread.sleep(Integer.MAX_VALUE);
            } catch (InterruptedException ex) {
                System.out.println(ex);
            }
        } catch (IOException e) {
            System.out.println(e);
        }
    }


}

結果:

客戶端

已發送至服務器
已發送至服務器
已發送至服務器
已發送至服務器
已發送至服務器
已發送至服務器
已發送至服務器
已發送至服務器
已發送至服務器
已發送至服務器
獲取反饋結果:客戶端線程:Thread-22請求服務端
獲取反饋結果:客戶端線程:Thread-21請求服務端
獲取反饋結果:客戶端線程:Thread-20請求服務端
獲取反饋結果:客戶端線程:Thread-19請求服務端
獲取反饋結果:客戶端線程:Thread-18請求服務端
獲取反饋結果:客戶端線程:Thread-17請求服務端
獲取反饋結果:客戶端線程:Thread-16請求服務端
獲取反饋結果:客戶端線程:Thread-15請求服務端
獲取反饋結果:客戶端線程:Thread-14請求服務端
獲取反饋結果:客戶端線程:Thread-13請求服務端                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      

服務端

服務已啟動,監聽端口9999
1
2
3
4
5
6
7
8
9
10

總結

看看代碼實現,是不是比看一堆文字要舒服多了,我這里也不想過多熬訴了,大家自己看代碼吧~
以上就是我本次總結的,說實話看了很多大佬寫的東西。實在是難以通過文字就明白這個東西是咋回事,最后還是親手實踐操作以下,才能理解。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM