JAVA NIO學習三:NIO 的非阻塞式網絡通信


緊接着上一章,我們繼續來研究NIO,上一章中我們講了NIO 中最常見的操作即文件通道的操作,但實際上NIO的主要用途還是在於網絡通信,那么這個時候就會涉及到選擇器,這一章我們就會對其進行講解操作。

一、阻塞和非阻塞

傳統的 IO 流都是阻塞式的。也就是說,當一個線程調用 read() 或 write()時,該線程被阻塞,直到有一些數據被讀取或寫入,該線程在此期間不能執行其他任務。因此,在完成網絡通信進行 IO 操作時,由於線程會阻塞,所以服務器端必須為每個客戶端都提供一個獨立的線程進行處理,當服務器端需要處理大量客戶端時,性能急劇下降。
Java NIO 是非阻塞模式的。當線程從某通道進行讀寫數據時,若沒有數據可用時,該線程可以進行其他任務。線程通常將非阻塞 IO 的空閑時間用於在其他通道上執行 IO 操作,所以單獨的線程可以管理多個輸入和輸出通道。因此, NIO 可以讓服務器端使用一個或有限幾個線程來同時處理連接到服務器端的所有客戶端。

下面我看個例子來使用NIO 演示下阻塞式,即不采用選擇器情況下:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

import org.junit.Test;

/*
 * 一、使用 NIO 完成網絡通信的三個核心:
 * 
 * 1. 通道(Channel):負責連接
 *         
 *        java.nio.channels.Channel 接口:
 *             |--SelectableChannel
 *                 |--SocketChannel
 *                 |--ServerSocketChannel
 *                 |--DatagramChannel
 * 
 *                 |--Pipe.SinkChannel
 *                 |--Pipe.SourceChannel
 * 
 * 2. 緩沖區(Buffer):負責數據的存取
 * 
 * 3. 選擇器(Selector):是 SelectableChannel 的多路復用器。用於監控 SelectableChannel 的 IO 狀況
 * 
 */
public class TestBlockingNIO {

    //客戶端
    @Test
    public void client() throws IOException{
        //1. 獲取通道
        SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));
        
        FileChannel inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ);
        
        //2. 分配指定大小的緩沖區
        ByteBuffer buf = ByteBuffer.allocate(1024);
        
        //3. 讀取本地文件,並發送到服務端
        while(inChannel.read(buf) != -1){
            buf.flip();
            sChannel.write(buf);
            buf.clear();
        }
        
        //4. 關閉通道
        inChannel.close();
        sChannel.close();
    }
    
    //服務端
    @Test
    public void server() throws IOException{
        //1. 獲取通道
        ServerSocketChannel ssChannel = ServerSocketChannel.open();
        
        FileChannel outChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
        
        //2. 綁定連接
        ssChannel.bind(new InetSocketAddress(9898));
        
        //3. 獲取客戶端連接的通道
        SocketChannel sChannel = ssChannel.accept();
        
        //4. 分配指定大小的緩沖區
        ByteBuffer buf = ByteBuffer.allocate(1024);
        
        //5. 接收客戶端的數據,並保存到本地
        while(sChannel.read(buf) != -1){
            buf.flip();
            outChannel.write(buf);
            buf.clear();
        }
        
        //6. 關閉通道
        sChannel.close();
        outChannel.close();
        ssChannel.close();
        
    }
    
}

那么解決上面的方法,以前沒有選擇器的時候,對於阻塞情況,我們可以采用下面的方法:(發送完成,自動自己關閉告知已發送完成)

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

import org.junit.Test;

public class TestBlockingNIO2 {
    
    //客戶端
    @Test
    public void client() throws IOException{
        SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));
        
        FileChannel inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ);
        
        ByteBuffer buf = ByteBuffer.allocate(1024);
        
        while(inChannel.read(buf) != -1){
            buf.flip();
            sChannel.write(buf);
            buf.clear();
        }
        
        sChannel.shutdownOutput(); //接收服務端的反饋
        int len = 0;
        while((len = sChannel.read(buf)) != -1){
            buf.flip();
            System.out.println(new String(buf.array(), 0, len));
            buf.clear();
        }
        
        inChannel.close();
        sChannel.close();
    }
    
    //服務端
    @Test
    public void server() throws IOException{
        ServerSocketChannel ssChannel = ServerSocketChannel.open();
        
        FileChannel outChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
        
        ssChannel.bind(new InetSocketAddress(9898));
        
        SocketChannel sChannel = ssChannel.accept();
        
        ByteBuffer buf = ByteBuffer.allocate(1024);
        
        while(sChannel.read(buf) != -1){
            buf.flip();
            outChannel.write(buf);
            buf.clear();
        }
        
        //發送反饋給客戶端
        buf.put("服務端接收數據成功".getBytes());
        buf.flip();
        sChannel.write(buf);
        
        sChannel.close();
        outChannel.close();
        ssChannel.close();
    }

}

二、選擇器(Selector)

選擇器(Selector) 是 SelectableChannle 對象的多路復用器, Selector 可以同時監控多個 SelectableChannel 的 IO 狀況,也就是說,利用 Selector可使一個單獨的線程管理多個 Channel。 Selector 是非阻塞 IO 的核心。
SelectableChannle 的結構如下圖:

 選擇器(Selector)的應用

創建 Selector :通過調用 Selector.open() 方法創建一個 Selector

向選擇器注冊通道: SelectableChannel.register(Selector sel, int ops)

選擇器(Selector)的應用

當調用 register(Selector sel, int ops) 將通道注冊選擇器時,選擇器對通道的監聽事件,需要通過第二個參數 ops 指定。
可以監聽的事件類型(可使用 SelectionKey 的四個常量表示):
 讀 : SelectionKey.OP_READ (1)
 寫 : SelectionKey.OP_WRITE (4)
 連接 : SelectionKey.OP_CONNECT (8)
 接收 : SelectionKey.OP_ACCEPT (16)
若注冊時不止監聽一個事件,則可以使用“位或”操作符連接

例: 

SelectionKey 

SelectionKey: 表示 SelectableChannel 和 Selector 之間的注冊關系。每次向選擇器注冊通道時就會選擇一個事件(選擇鍵)。 選擇鍵包含兩個表示為整數值的操作集。操作集的每一位都表示該鍵的通道所支持的一類可選擇操作 。

方 法 描 述
int interestOps() 獲取感興趣事件集合
int readyOps() 獲取通道已經准備就緒的操作的集合
SelectableChannel channel() 獲取注冊通道
Selector selector() 返回選擇器
boolean isReadable() 檢測 Channal 中讀事件是否就緒
boolean isWritable() 檢測 Channal 中寫事件是否就緒
boolean isConnectable() 檢測 Channel 中連接是否就緒
boolean isAcceptable() 檢測 Channel 中接收是否就緒

Selector 的常用方法

方 法 描 述
Set<SelectionKey> keys() 所有的 SelectionKey 集合。代表注冊在該Selector上的Channel
selectedKeys() 被選擇的 SelectionKey 集合。返回此Selector的已選擇鍵集
int select() 監控所有注冊的Channel,當它們中間有需要處理的 IO 操作時,
該方法返回,並將對應得的 SelectionKey 加入被選擇的
SelectionKey 集合中,該方法返回這些 Channel 的數量。
int select(long timeout) 可以設置超時時長的 select() 操作
int selectNow() 執行一個立即返回的 select() 操作,該方法不會阻塞線程
Selector wakeup() 使一個還未返回的 select() 方法立即返回
void close() 關閉該選擇器

SocketChannel 

Java NIO中的SocketChannel是一個連接到TCP網絡套接字的通道。
操作步驟:
 打開 SocketChannel
 讀寫數據
 關閉 SocketChannel

Java NIO中的 ServerSocketChannel 是一個可以監聽新進來的TCP連接的通道,就像標准IO中的ServerSocket一樣
代碼樣例;

package com.atguigu.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Scanner;

import org.junit.Test;

/*
 * 一、使用 NIO 完成網絡通信的三個核心:
 * 
 * 1. 通道(Channel):負責連接
 *         
 *        java.nio.channels.Channel 接口:
 *             |--SelectableChannel
 *                 |--SocketChannel
 *                 |--ServerSocketChannel
 *                 |--DatagramChannel
 * 
 *                 |--Pipe.SinkChannel
 *                 |--Pipe.SourceChannel
 * 
 * 2. 緩沖區(Buffer):負責數據的存取
 * 
 * 3. 選擇器(Selector):是 SelectableChannel 的多路復用器。用於監控 SelectableChannel 的 IO 狀況
 * 
 */
public class TestNonBlockingNIO {
    
    //客戶端
    @Test
    public void client() throws IOException{
        //1. 獲取通道
        SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));
        
        //2. 切換非阻塞模式
        sChannel.configureBlocking(false);
        
        //3. 分配指定大小的緩沖區
        ByteBuffer buf = ByteBuffer.allocate(1024);
        
        //4. 發送數據給服務端
        Scanner scan = new Scanner(System.in);
        
        while(scan.hasNext()){
            String str = scan.next();
            buf.put((new Date().toString() + "\n" + str).getBytes());
            buf.flip();
            sChannel.write(buf);
            buf.clear();
        }
        
        //5. 關閉通道
        sChannel.close();
    }

    //服務端
    @Test
    public void server() throws IOException{
        //1. 獲取通道
        ServerSocketChannel ssChannel = ServerSocketChannel.open();
        
        //2. 切換非阻塞模式
        ssChannel.configureBlocking(false);
        
        //3. 綁定連接
        ssChannel.bind(new InetSocketAddress(9898));
        
        //4. 獲取選擇器
        Selector selector = Selector.open();
        
        //5. 將通道注冊到選擇器上, 並且指定“監聽接收事件”
        ssChannel.register(selector, SelectionKey.OP_ACCEPT);
        
        //6. 輪詢式的獲取選擇器上已經“准備就緒”的事件
        while(selector.select() > 0){
            
            //7. 獲取當前選擇器中所有注冊的“選擇鍵(已就緒的監聽事件)”
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            
            while(it.hasNext()){
                //8. 獲取准備“就緒”的是事件
                SelectionKey sk = it.next();
                
                //9. 判斷具體是什么事件准備就緒
                if(sk.isAcceptable()){
                    //10. 若“接收就緒”,獲取客戶端連接
                    SocketChannel sChannel = ssChannel.accept();
                    
                    //11. 切換非阻塞模式
                    sChannel.configureBlocking(false);
                    
                    //12. 將該通道注冊到選擇器上
                    sChannel.register(selector, SelectionKey.OP_READ);
                }else if(sk.isReadable()){
                    //13. 獲取當前選擇器上“讀就緒”狀態的通道
                    SocketChannel sChannel = (SocketChannel) sk.channel();
                    
                    //14. 讀取數據
                    ByteBuffer buf = ByteBuffer.allocate(1024);
                    
                    int len = 0;
                    while((len = sChannel.read(buf)) > 0 ){
                        buf.flip();
                        System.out.println(new String(buf.array(), 0, len));
                        buf.clear();
                    }
                }
                
                //15. 取消選擇鍵 SelectionKey
                it.remove();
            }
        }
    }
}

DatagramChannel

Java NIO中的DatagramChannel是一個能收發UDP包的通道。
 操作步驟:
 打開 DatagramChannel
 接收/發送數據

樣例;

package com.atguigu.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Date;
import java.util.Iterator;
import java.util.Scanner;

import org.junit.Test;

public class TestNonBlockingNIO2 {
    
    @Test
    public void send() throws IOException{
        DatagramChannel dc = DatagramChannel.open();
        
        dc.configureBlocking(false);
        
        ByteBuffer buf = ByteBuffer.allocate(1024);
        
        Scanner scan = new Scanner(System.in);
        
        while(scan.hasNext()){
            String str = scan.next();
            buf.put((new Date().toString() + ":\n" + str).getBytes());
            buf.flip();
            dc.send(buf, new InetSocketAddress("127.0.0.1", 9898));
            buf.clear();
        }
        
        dc.close();
    }
    
    @Test
    public void receive() throws IOException{
        DatagramChannel dc = DatagramChannel.open();
        
        dc.configureBlocking(false);
        
        dc.bind(new InetSocketAddress(9898));
        
        Selector selector = Selector.open();
        
        dc.register(selector, SelectionKey.OP_READ);
        
        while(selector.select() > 0){
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            
            while(it.hasNext()){
                SelectionKey sk = it.next();
                
                if(sk.isReadable()){
                    ByteBuffer buf = ByteBuffer.allocate(1024);
                    
                    dc.receive(buf);
                    buf.flip();
                    System.out.println(new String(buf.array(), 0, buf.limit()));
                    buf.clear();
                }
            }
            
            it.remove();
        }
    }

}

管道 (Pipe)

Java NIO 管道是2個線程之間的單向數據連接。Pipe有一個source通道和一個sink通道。數據會被寫到sink通道,從source通道讀取。

向管道寫數據

 

從管道讀取數據

從讀取管道的數據,需要訪問source通道。

調用source通道的read()方法來讀取數據

代碼樣例:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

import org.junit.Test;

public class TestPipe {

    @Test
    public void test1() throws IOException{
        //1. 獲取管道
        Pipe pipe = Pipe.open();
        
        //2. 將緩沖區中的數據寫入管道
        ByteBuffer buf = ByteBuffer.allocate(1024);
        
        Pipe.SinkChannel sinkChannel = pipe.sink();
        buf.put("通過單向管道發送數據".getBytes());
        buf.flip();
        sinkChannel.write(buf);
        
        //3. 讀取緩沖區中的數據
        Pipe.SourceChannel sourceChannel = pipe.source();
        buf.flip();
        int len = sourceChannel.read(buf);
        System.out.println(new String(buf.array(), 0, len));
        
        sourceChannel.close();
        sinkChannel.close();
    }
    
}

 

參考資料:

《尚硅谷》視頻


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM