Nio編程模型總結


終於,這兩天的考試熬過去了, 興致沖沖的來整理筆記來, 這篇博客是我近幾天的NIO印象筆記匯總,記錄了對Selector及Selector的重要參數的理解,對Channel的理解,常見的Channel,對NIO事件驅動的編程模型的理解,NIO與傳統IO的對比,NIO的TCP/IP編程的實踐.

Channel

什么是Channel

這個概念絕對是一級概念,Channel是一個管道,用於連接字節緩沖區和另一端的實體, 這個字節緩沖區就是ByteBuffer, 另一端的實體可以是一個File 或者是 Socket ;

基於IO的網絡編程, 數據的交互借助於InputStream或者是OutputStream, 而Channel可以理解成對Stream的又一層封裝;在這種編程模型中 服務端想和客戶端進行交互,就需要從服務端自己的ServerSocketChannel中獲取前來連接的客戶端的SocketChannel,並把他注冊關聯上感性趣的事件到自己的Selector選擇器上, 這樣一旦客戶端把Buffer中的數據推送進channel, 服務端就可以感知,進而處理

常用的Chanenl

img

  • 文件通道: FileChannel
  • 套接字通道
    • 服務端: ServerSocketChannel
    • 客戶端: SocketChannel
  • 數據包通道: DataGramSocket

Channel 與 Stream

Channel是NIO編程模型中一大組件,它類似IO中的Stream,但是兩者也有本質的區別;

為什么說是類似呢? 看下面的兩段代碼, 需求是磁盤上的文件進行讀寫

在IO編程中,我們第一步可能要像下面這樣獲取輸入流,按字節把磁盤上的數據讀取到程序中,再進行下一步操作

FileInputStream fileInputStream = new FileInputStream("123.txt");

在NIO編程中,目標是需要先獲取通道,再基於Channel進行讀寫

FileInputStream fileInputStream = new FileInputStream("123.txt");
FileChannel channel = fileInputStream.channel();

對用戶來說,在IO / NIO 中這兩種都直接關聯這磁盤上的數據文件,數據的讀寫首先都是獲取Stream和Channel,所以說他們相似;

但是: 對於Stream來說,所有的Stream都是單向的,對我們的程序來說,Stream要么只能是從里面獲取數據的輸入流,要么是往里面輸入數據的輸出流,因為InputStream和outputStream都是抽象類,在java中是不支持多繼承的, 而通道不同,他是雙向的,對一個通道可讀可寫

怎么理解 Channel可以是雙向的?

如上圖,凡是同時實現了readable,writeable接口的類,都雙向的通道. 下面是典型的例子

SocketChannel
在NIO網絡編程中,服務端可以通過ServerSocketChannel獲取客戶端的SocketChannel
這個SocketChannel可以read() 客戶端的消息存入Buffer, 往客戶端 write()buffer里的內容
socketChannel1.read(byteBuffer);
socketChannel1.write(byteBuffer);

對於一個channel,我們既能從中獲取數據,也能往外read數據

基於channel的文件拷貝方式和傳統的IO拷貝的競速

效率最低的按字節拷貝

public static  void text4() throws IOException {
   
        FileInputStream    fis = new FileInputStream("123.txt");
        FileOutputStream   fos = new FileOutputStream("output123.txt");
    int read=0;
    long start =0;
    System.out.println("開始: ... ");
    while((read=fis.read())!=-1){
        fos.write(read);
    }
    System.out.println("耗時: "+(System.currentTimeMillis()-start) );
    fis.close();
    fos.close();
}

一個3901KB的文件的拷貝,在我的機器上跑出了 1561097384707 的好成績; 實屬無奈,擦點以為編譯器卡死


以NIO,channel+buffer的模型,拷貝文件

try (
    FileInputStream  fis = new FileInputStream("123.txt");
    FileOutputStream   fos = new FileOutputStream("output123.txt");
){
    //1.獲取通道
    FileChannel   inChannel = fis.getChannel();
    FileChannel   outChannel = fos.getChannel();

    //2.分配指定大小的緩沖區
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    long start = System.currentTimeMillis();
    //3.將通道中的數據緩沖區中
    while (inChannel.read(buffer) != -1) {
        buffer.flip();//切換成讀數據模式
        //4.將緩沖區中的數據寫入通道中
        outChannel.write(buffer);
        buffer.clear();//清空緩沖區
    }
    System.out.println("總耗時:" + (System.currentTimeMillis() - start));
} catch (Exception e) {
    e.printStackTrace();
}

速度明顯提升 大約平均耗時 110


NIO+零拷貝 復制文件

  // 直接獲取通道
    FileChannel inChannel2 = FileChannel.open(Paths.get("123.txt"), StandardOpenOption.READ);
    FileChannel outChannel2 = FileChannel.open(Paths.get("output123.txt"), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);
    //內存映射文件
    MappedByteBuffer inMappedBuf = inChannel2.map(FileChannel.MapMode.READ_ONLY, 0, inChannel2.size());
    MappedByteBuffer outMappedBuf = outChannel2.map(FileChannel.MapMode.READ_WRITE, 0, inChannel2.size());
    //直接對緩沖區進行數據讀寫操作
    byte[] dst = new byte[inMappedBuf.limit()];
    long start = System.currentTimeMillis();
    inMappedBuf.get(dst);
    outMappedBuf.put(dst);
    System.out.println("耗費的時間為:" + ( System.currentTimeMillis() - start));

    inChannel2.close();
    outChannel2.close();

或者

/*
     * 通道之間的數據傳輸(直接緩沖區)
     */
    FileChannel inChannel3 = FileChannel.open(Paths.get("123.txt"), StandardOpenOption.READ);
    FileChannel outChannel3 = FileChannel.open(Paths.get("output123.txt"), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);
    long start = System.currentTimeMillis();
    inChannel3.transferTo(0, inChannel3.size(), outChannel3);
    System.out.println("耗時: "+(System.currentTimeMillis()-start) );

    //等價於
    // outChannel3.transferFrom(inChannel3, 0, inChannel3.size());

    inChannel3.close();
    outChannel3.close();

零拷貝僅需要耗時 6 就可以完成


NIO的非阻塞與IO的阻塞

什么是阻塞? 舉個例子, 如果有一天我碰到了不會的作業題,於是我給老師發了條短息請教咋做, 這時,假如我進入了阻塞模式,我就會一直瞅着手機,別的也不干,就等着老師回信息, 假如我進入了非阻塞的模式,發完短信后跳過這個題,去做別的題

常見的阻塞比如, 鍵盤錄入, Socket的accept()以及IO的read write, 全部會卡在那行代碼直到執行完畢才會往下執行, 這種風格的好處是顯而易見的, 及其容易的進行順序編程

但是在NIO中,channel的read,write可以是阻塞的,也可以是非阻塞的,這取決於channel是否阻塞, 一般在進行網絡編程時,要搭配上selector選擇器,一起用, 同時channel我們也會設置成非阻塞的, 想想也不能讓服務器的讀寫阻塞住,因為它可不是面對一兩個用戶,我們需要它可以一遍一遍的正常流水運行

在客戶端,connect方法不再是阻塞的,和服務端進行數據交互之前,java提供了檢查機確保連接百分百健康, 如果服務端沒有接受連接,客戶端是是沒辦法進一步操作的

if (selectionKey.isConnectable()) {
// 強轉成 有連接事件發生的Channel
client = (SocketChannel) selectionKey.channel();
// 完成連接
if (client.isConnectionPending()) {
client.finishConnect();

從通道中的read和write方法也不是阻塞的,即刻返回,可以讓服務端的業務代碼很流暢的執行完,再接受新的請求,處理新請求

Selector

Selector選擇器NIO的第三個組件,三者的關系圖如上所示

什么是selector? 作用是什么?

selector是選擇器的意思, 和它直接關聯的組件是Channel, 沒錯,它的作用就是不斷的輪詢綁定在他身上的所有channel. 一旦有通道發生了它感興趣的事件,接着處理此事件

selector維護了什么?

無論是服務端的Selector 還是客戶端的Selector 它都維護了三個Set集合 , 里面封裝的是 SelectionKey, 他是channel注冊進Selector的產物,一般是使用它反向獲取channel

  1. key set
  • 他是一個全集,每當channel通過register方法注冊進選擇器時,於此同時也會把包含自己信息的key添加到這個全集中來 注冊的信息就會以SelectionKey的封裝形式保存在這個集合中, 選擇器每次輪詢的channel,就是這里面的channel
  1. selected key
  • 感興趣的key的集合, 舉個例子, 通道1注冊進選擇器時,告訴選擇器,我可能會給你發信息,你得盯着我,讀我給你的信息, 於是選擇器對通道1感性趣的事件是 read, 那么在選擇器輪詢channel時, 一旦通道1出現了write操作,就會被選擇器感知,開始read

  • 每次遍歷selected key時我們會執行這行代碼: Set<SelectionKey> selectionKeys = selector.selectedKeys(); 它的意思是,我們取出了 選擇器的感性事件的set集合,只要程序還在運行,只要選擇器一旦被open(),除非我們手動的close() 否則選擇器對象就不會被釋放,所以它的感興趣的set集合是不會被自動會收到,於是我們就得收到的把處理過的感興趣的事件對應的SelectionKey移除出這個set集合,不然下一次輪詢時,這個事件還會再一次被處理,並且無限制的處理下去

  • key有且僅有兩種方式從 selected-key-set 中剔除 1. 通過Set的remove()方法, 2.通過迭代器的remove()方法

  1. cannelled key
  • 取消的key的集合,代表原來感興趣的事件,現在不感興趣了. 下一次輪詢,進行select() 本集合中的SelectionKey會從key set中移除, 意味着它所關聯的channel將會被選擇器丟棄掉,不再進行監聽
  • 關閉channel 或者是調用了cancel()方法都會將key添加到cannelled key 集合中
  • 使用場景: 一般會在客戶端主動斷開連接的時候使用它.

selector的select()方法

select(long); // 設置超時時間

selectNow(); // 立即返回,不阻塞

select(); 阻塞輪詢

select()過程的細節:

  • 第一步, cannelled-key中的每一個元素會從全集key set中剔除,表示這些可以關聯的通道不會被注冊
  • 第二步操作系統幫我們輪詢每一個通道是否有選擇器感性趣的事情發生
    • 對於一條准備就緒的channel(發生事件通道),他至少會發生下面兩件事之一:
      • 它的key會被添加進selected-key-set中,來標識它將被選中,進而處理
      • 如果它的key,已經存在於這個集合中了,下一步就是它的 read-operation將被更新
  • 第三步: 如果在輪詢時發現了有任何key被放置在了cannelled-key-set中,重復第一步,不再注冊它關聯的通道

romove key 和 cannel key 的區別

前者是把key從selected key set集合,也就是被選中的集合中剔除出去,表示當前的事件已經處理完了

后者是表示,把key從全集中剔除出去, 表示想要廢棄這個key關聯的channel

selector的創建

他是根據不同操作系統提供的不同的Provider使用provide()創建出來的

NIO編程模型


如上圖, 在NIO網絡編程模式中,不再是傳統的多線程編程模型,當有新的客戶端的連接到來,不再重新開辟新的線程去跑本次連接,而是統一,一條線程處理所有的連接, 而一次連接本質上就是一個Channel, NIO網絡編程模型是基於事件驅動型的; 即,有了提前約定好的事件發生,接着處理事件,沒有事件發生,選擇器就一直輪詢 下面解釋上圖的流程

  1. 服務端創建代表服務端的Channel,綁定好端口,設置成非阻塞的通道 並且初始化選擇器,然后開始輪詢綁定在自己身上的通道,此時的通道只有一個ServerSocketChannel,而選擇器只關心ServerSocketChannel上發生的OP_ACCEPT事件,而又沒有客戶端來鏈接 所以他被阻塞在了select()
System.out.println("Server...");
// 獲取服務端的SerSokcetChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// todo 一定要把他配置成 非阻塞的
serverSocketChannel.configureBlocking(false);

// 從通道中獲取 服務端的對象
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(8899));

// 創建選擇器
Selector selector = Selector.open();
// 把通到注冊到 選擇器上
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

  while (true) {
            // 阻塞式等待 channel上有事件發生
            int select = selector.select();
  1. 客戶端 創建代表自己的SocketChannel, 創建選擇器,把自己感興趣的事件注冊在上面,如下代碼, 初始化自己,SocketChannel, 把客戶端的通道注冊進選擇器,並告訴選擇器SocketChannel的感興趣事件是OP_CONNECT連接事件; 當執行到下面的socketChannel.connect(new InetSocketAddress("localhost", 8899)); 連接的請求就已經發送出去了,也就是說,如果沒有意外,執行完這一行代碼,服務端的select()方法已經返回了, 但是客戶端的connect()是非阻塞的,立即返回,故在客戶端依然會繼續執行, 進而判斷一下是否是真的連接上了
// 獲取客戶端的通道
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);

Selector selector = Selector.open();
// 把客戶端的通道注冊進選擇器
socketChannel.register(selector, SelectionKey.OP_CONNECT);
// todo 連接客戶端, 執行完這行代碼后, 服務端就能就收到通知!!!
socketChannel.connect(new InetSocketAddress("localhost", 8899));

while (true) {
    int number = selector.select(); // 選擇器阻塞式的 等待 Channel上發生它關心的事件
    System.out.println(" 發生了感興趣的事件: " + number);
    Set<SelectionKey> keySet = selector.selectedKeys();
// 驗證
    for (SelectionKey selectionKey : keySet) {
        SocketChannel client = null;
if (selectionKey.isConnectable()) {
    // 強轉成 有連接事件發生的Channel
    client = (SocketChannel) selectionKey.channel();
    // 完成連接
    if (client.isConnectionPending()) {
        client.finishConnect();
        ByteBuffer byteBuffer = ByteBuffer.allocate(512);
        byteBuffer.put((LocalDate.now() + "連接成功").getBytes());
        byteBuffer.flip();
        client.write(byteBuffer);
  1. 對於服務端,輪詢了這么久,終於有連接進來了,於是進一步處理, 判斷如果當前的連接是請求建立連接的話,就去建立連接, 對於服務端來說,建立連接就是得讓服務端記住客戶端, 客戶端是誰呢?SocketChanel, 怎么獲取呢? serverSocketChannel1.accept(); 怎么建立連接呢? 實際上就是把當前的客戶端的channel注冊在服務端的選擇器上,並告訴它自己關心的事件啥, 當然一開始建立連接時, 服務端肯定首先要做的就是監聽客戶端發送過來的數據,於是 綁定上感興趣的事件是read, 並且不要忘了,每次遍歷感興趣的key的集合時,都要及時的把當前的key剔除
selectionKeys.forEach(selectionKey -> {
    SocketChannel socketChannel = null;
    String sendKey = null;
    try {
        if (selectionKey.isAcceptable()) {
            // 1. 用戶請求建立連接, 根據SelectionKey 獲取服務端的通道
            // todo 當前的這個SelecttionKey 是有 ServerSocketChannel 和 selector 聯系生成的, 因此我們 強制轉換回 ServerSocketChannel
            ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) selectionKey.channel();

            // todo  !!!!!!!  這是重點, 這里的accept是非阻塞的 !!!!!!!!
            // 根據服務的 通道  獲取到客戶端的通道
            socketChannel = serverSocketChannel1.accept();
            System.out.println("socketChannel.class: " + socketChannel.getClass());
            // todo 配置成非阻塞的
            socketChannel.configureBlocking(false);

            // todo 新獲取的通道 注冊進選擇器
            socketChannel.register(selector, SelectionKey.OP_READ);

            // 保存客戶端的信息
            String key = "[ " + UUID.randomUUID().toString() + " ]";
            clientMap.put(key, socketChannel);
            // todo   把 擁有當前事件SelectionKey 剔除
  1. 對於客戶端,如果它想往服務端發送鍵盤錄入的內容時,獲取鍵盤錄入對象是免不了的事, 但是這對象會阻塞,於是客戶端不得不開啟一條新的線程運行讀取鍵盤錄入,讓自己具有鍵盤錄入的功能,同時又不會被阻塞, 如果客戶端想要接受服務端推送回來的數據怎么辦呢? 於是我們就得告訴客戶端的選擇器,添加一個感興趣的事件,read, 這樣,一旦服務端有數據推送過來的,客戶端的選擇器就會感知到這個事件,並且這個事件的selectionKey是可讀的,這樣一個比較完善的客戶端就ok了
executorService.submit(() -> {
    while (true) {
        try {
            // 清空上面的緩存
            byteBuffer.clear();
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
            String msg = bufferedReader.readLine();
            byteBuffer.put(msg.getBytes());
            byteBuffer.flip();
            finalClient.write(byteBuffer);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
});
}

// 上面的代碼是發生了 請求連接事件
// todo 給客戶端注冊一個讀取客戶端返回數據的事件
client.register(selector, SelectionKey.OP_READ);

  1. 服務端在建立連接時,就給客戶端的通道綁定了感興趣的事件是read, 於是當客戶端往channel中write數據了,服務端就會來到下面的代碼塊, 如果是群聊的話, 我們就得知道,往哪些用戶轉發信息, 於是我們提前構造了map,這個map存放就是一個一個和服務的channel建立連接的SocketChannel; 只需要遍歷map, 往里面的chanel,write數據即可
 else if (selectionKey.isReadable()) {
    System.out.println("readable...");
    // 獲取客戶端的通道
    socketChannel = (SocketChannel) selectionKey.channel();
    System.out.println("當前的客戶端 通道實例: socketChannel == " + socketChannel);
    // 獲取當前 是哪個客戶端發起的信息
    ByteBuffer byteBuffer = ByteBuffer.allocate(512);
    // 讀取客戶端發送的消息
    while (true) {// todo todo todo  很重要的一點!!!  read方法是非阻塞的, 很可能還有沒讀取到數據就返回了
        int read = socketChannel.read(byteBuffer);
        System.out.println("read == : " + read);
        if (read <= 0) {
            break;
        }
    }
    // 往其他客戶端寫
    byteBuffer.flip();
    Charset charset = Charset.forName("utf-8");
    String msg = String.valueOf(charset.decode(byteBuffer).array());
    // Buffer轉字符串
    System.out.println("收到客戶端: " + socketChannel + "  發送的消息: " + msg);
    // 遍歷map
    for (Map.Entry<String, SocketChannel> map : clientMap.entrySet()) {
        if (socketChannel == map.getValue()) {
            sendKey = map.getKey();
        }
    }
    // todo 轉發給全部的客戶端發送
    for (Map.Entry<String, SocketChannel> map : clientMap.entrySet()) {
        SocketChannel socketChannel1 = map.getValue();
        ByteBuffer byteBuffer1 = ByteBuffer.allocate(512);
        // 把信息放進 byteBuffer1中
        String message = msg + " : " + sendKey;
        byteBuffer1.put(message.getBytes());
        byteBuffer.flip();
        socketChannel1.write(byteBuffer);
    }
  1. 客戶端斷開了怎么辦呢? 在一台電腦上,手動將一個客戶端停掉,服務端會運行到selectionKey.isReadable() 並且進入這個if塊, 當它嘗試從里面讀取的時候,就發現這個連接已經壞掉了,於是報錯,強制斷開連接, 因為還要繼續輪詢,全集key set 中依然保存着當前的客戶端的channel, 所以會一直報錯下去, 怎么辦呢? 如下
//  selectionKey.cancel();  常規

try {
    // 這樣也能取消這個鍵
    socketChannel.close();
} catch (IOException e1) {
    e1.printStackTrace();
}

// 當然我們現在還要多一步,  因為他還在我們的map里面  不然一會發消息的時候,會出錯
// todo 移除出map 中失效的 channel
// todo 遍歷map
for (Map.Entry<String, SocketChannel> map : clientMap.entrySet()) {
    if (socketChannel == map.getValue()) {
        sendKey = map.getKey();
    }
}
clientMap.remove(sendKey, socketChannel);


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM