NIO非阻塞網絡編程原理


NIO非阻塞網絡編程原理

1、NIO基本介紹

  1. Java NIO 全稱 java non-blocking IO,是指 JDK 提供的新 API。從 JDK1.4 開始,Java 提供了一系列改進的
    輸入/輸出的新特性,被統稱為 NIO(即 New IO),是同步非阻塞的。
  2. NIO 相關類都被放在 java.nio 包及子包下,並且對原 java.io 包中的很多類進行改寫。【基本案例】
  3. NIO 有三大核心部分:Channel( 通道) Buffer( 緩沖區), Selector( 選擇器)
  4. NIO 是 是 區 面向緩沖區 ,或者面向塊編程。數據讀取到一個它稍后處理的緩沖區,需要時可在緩沖區中前后移動,這就增加了處理過程中的靈活性,使用它可以提供非阻塞式的高伸縮性網絡。
  5. Java NIO 的非阻塞模式,使一個線程從某通道發送請求或者讀取數據,但是它僅能得到目前可用的數據,如果目前沒有數據可用時,就什么都不會獲取,而不是保持線程阻塞,所以直至數據變的可以讀取之前,該線程可以繼續做其他的事情。 非阻塞寫也是如此,一個線程請求寫入一些數據到某通道,但不需要等待它完全寫入,這個線程同時可以去做別的事情。
  6. 通俗理解:NIO 是可以做到用一個線程來處理多個操作的。假設有 10000 個請求過來,根據實際情況,可以分配50 或者 100 個線程來處理。不像之前的阻塞 IO 那樣,非得分配 10000 個。
  7. HTTP2.0 使用了多路復用的技術,做到同一個連接並發處理多個請求,而且並發請求的數量比 HTTP1.1 大了好幾個數量級。

2、NIO與BIO的比較

  1. BIO 以流的方式處理數據,而 NIO 以塊的方式處理數據,塊 I/O 的效率比流 I/O 高很多
  2. BIO 是阻塞的,NIO 則是非阻塞的
  3. BIO 基於字節流和字符流進行操作,而 NIO 基於 Channel(通道)和 Buffer(緩沖區)進行操作,數據總是從通道
    讀取到緩沖區中,或者從緩沖區寫入到通道中。Selector(選擇器)用於監聽多個通道的事件(比如:連接請求,數據到達等),因此使用單個線程就可以監聽多個客戶端通道

3、NIO三大核心原理示意圖

NIO中的三個核心分別是Selector、Channel、Buffer,他們之間的關系如下圖:

三大核心組件介紹:

  1. 每個 channel 都會對應一個 Buffer。
  2. Selector 對應一個線程, 一個線程對應channel(連接)。
  3. 該圖反應了有三個 channel 注冊到 該 selector //程序。
  4. 程序切換到哪個 channel 是有事件決定的, Event 就是一個重要的概念。
  5. Selector 會根據不同的事件,在各個通道上切換。
  6. Buffer 就是一個內存塊 , 底層是一個數組。
  7. 數據的讀取寫入是通過 Buffer, 這個和 BIO , BIO 中要么是輸入流,或者是輸出流, 不能雙向,但是 NIO的Buffer 是可以讀也可以寫, 需要 flip 方法切換channel 是雙向的, 可以返回底層操作系統的情況, 比如 Linux , 底層的操作系統通道就是雙向的.

4、緩沖區(Buffer)

基本介紹

緩沖區(Buffer):緩沖區本質上是一個 可以讀寫數據的內存塊,可以理解成是一個 容器對象( 含數組),該對象提供了一組方法,可以更輕松地使用內存塊,,緩沖區對象內置了一些機制,能夠跟蹤和記錄緩沖區的狀態變化情況。Channel 提供從文件、網絡讀取數據的渠道,但是讀取或寫入的數據都必須經由 Buffer。

Buffer 類及其子類

1)在 NIO 中,Buffer 是一個頂層父類,它是一個抽象類, 類的層級關系圖:

2)Buffer 類定義了所有的緩沖區都具有的四個屬性來提供關於其所包含的數據元素的信息:

private int mark = -1;
private int position = 0;
private int limit;
private int capacity;

屬性解釋:

屬性 含義
mark 標記作用,buffer.position(0).mark()進行標記,buffer.reset();就會回到剛剛的標記位置
position 下一個要被讀取或者要被寫入位置的索引,每次讀寫之后都會自動變換位置
limit 極限,例如容量是10的buffer,但limit設置為5的話,沒法對5后面的數據進行操作
capacity buffer容量

ByteBuffer

Buffer類中一些常用方法

public abstract class Buffer {
    //JDK1.4時,引入的api
    public final int capacity( )//返回此緩沖區的容量
    public final int position( )//返回此緩沖區的位置
    public final Buffer position (int newPositio)//設置此緩沖區的位置
    public final int limit( )//返回此緩沖區的限制
    public final Buffer limit (int newLimit)//設置此緩沖區的限制
    public final Buffer mark( )//在此緩沖區的位置設置標記
    public final Buffer reset( )//將此緩沖區的位置重置為以前標記的位置
    public final Buffer clear( )//清除此緩沖區, 即將各個標記恢復到初始狀態,但是數據並沒有真正擦除, 后面操作會覆蓋
    public final Buffer flip( )//反轉此緩沖區
    public final Buffer rewind( )//重繞此緩沖區
    public final int remaining( )//返回當前位置與限制之間的元素數
    public final boolean hasRemaining( )//告知在當前位置和限制之間是否有元素
    public abstract boolean isReadOnly( );//告知此緩沖區是否為只讀緩沖區
    //JDK1.6時引入的api
    public abstract boolean hasArray();//告知此緩沖區是否具有可訪問的底層實現數組
    public abstract Object array();//返回此緩沖區的底層實現數組
    public abstract int arrayOffset();//返回此緩沖區的底層實現數組中第一個緩沖區元素的偏移量
    public abstract boolean isDirect();//告知此緩沖區是否為直接緩沖區
}

5、通道(Channel)

基本介紹

1)、NIO 的通道類似於流,但有些區別如下:

通道可以同時進行讀寫,而流只能讀或者只能寫
通道可以實現異步讀寫數據
通道可以從緩沖讀數據,也可以寫數據到緩沖:

2)、BIO 中的 stream 是單向的,例如 FileInputStream 對象只能進行讀取數據的操作,而 NIO 中的通道(Channel)
是雙向的,可以讀操作,也可以寫操作。

3)、 Channel 在 NIO 中是一個接口 public interface Channel extends Closeable{}
4)、 常 用 的 Channel 類 有 : FileChannel 、 DatagramChannel 、 ServerSocketChannel 和 SocketChannel 。ServerSocketChanne 類似 ServerSocket , SocketChannel 類似 Socket

  1. 、FileChannel 用於文件的數據讀寫,DatagramChannel 用於 UDP 的數據讀寫,ServerSocketChannel 和
    SocketChannel 用於 TCP 的數據讀寫。

6)、 圖示

6、Channel基本介紹

FileChannel 類

FileChannel 主要用來對本地文件進行 IO 操作,常見的方法有

  1. public int read(ByteBuffer dst) ,從通道讀取數據並放到緩沖區中

  2. public int write(ByteBuffer src) ,把緩沖區的數據寫到通道中

  3. public long transferFrom(ReadableByteChannel src, long position, long count),從目標通道中復制數據到當前通道

  4. public long transferTo(long position, long count, WritableByteChannel target),把數據從當前通道復制給目標通道

關於 Buffer 和 Channel 的注意事項和細節

ByteBuffer 支持類型化的 put 和 get, put 放入的是什么數據類型,get 就應該使用相應的數據類型來取出,否
則可能有 BufferUnderflowException 異常。

7、Selector(選擇器)

基本介紹

1)Java 的 NIO,用非阻塞的 IO 方式。可以用一個線程,處理多個的客戶端連接,就會使用到 Selector(選擇器)

  1. Selector 能夠檢測多個注冊的通道上是否有事件發生(注意:多個 Channel 以事件的方式可以注冊到同一個
    Selector),如果有事件發生,便獲取事件然后針對每個事件進行相應的處理。這樣就可以只用一個單線程去管
    理多個通道,也就是管理多個連接和請求。

  2. 只有在 連接/通道 真正有讀寫事件發生時,才會進行讀寫,就大大地減少了系統開銷,並且不必為每個連接都
    創建一個線程,不用去維護多個線程

  3. 避免了多線程之間的上下文切換導致的開銷

Selector 示意圖和特點說明

  1. Netty 的 IO 線程 NioEventLoop 聚合了 Selector(選擇器,也叫多路復用器),可以同時並發處理成百上千個客
    戶端連接。
  2. 當線程從某客戶端 Socket 通道進行讀寫數據時,若沒有數據可用時,該線程可以進行其他任務。
  3. 線程通常將非阻塞 IO 的空閑時間用於在其他通道上執行 IO 操作,所以單獨的線程可以管理多個輸入和輸出
    通道。
  4. 由於讀寫操作都是非阻塞的,這就可以充分提升 IO 線程的運行效率,避免由於頻繁 I/O 阻塞導致的線程掛
    起。
  5. 一個 I/O 線程可以並發處理 N 個客戶端連接和讀寫操作,這從根本上解決了傳統同步阻塞 I/O 一連接一線
    程模型,架構的性能、彈性伸縮能力和可靠性都得到了極大的提升

Selector 類相關方法

public abstract class Selector implements Closeable { 
public static Selector open();//得到一個選擇器對象
public int select(long timeout);//監控所有注冊的通道,當其中有 IO 操作可以進行時,將
對應的 SelectionKey 加入到內部集合中並返回,參數用來設置超時時間
public Set<SelectionKey> selectedKeys();//從內部集合中得到所有的 SelectionKey	
}

注意事項

  1. NIO 中的 ServerSocketChannel 功能類似 ServerSocket,SocketChannel 功能類似 Socket

  2. selector 相關方法說明

selector.select()//阻塞
selector.select(1000);//阻塞 1000 毫秒,在 1000 毫秒后返回
selector.wakeup();//喚醒 selector
selector.selectNow();//不阻塞,立馬返還
3.8 NIO 非阻塞 網絡編程原理分析圖

8、NIO非阻塞網絡編程原理分析圖

NIO 非阻塞 網絡編程相關的(Selector、SelectionKey、ServerScoketChannel 和 SocketChannel) 關系梳理圖

關系圖:

對上圖的說明:

  1. 當客戶端連接時,會通過 ServerSocketChannel 得到 SocketChannel

  2. Selector 進行監聽 select 方法, 返回有事件發生的通道的個數

  3. 將 socketChannel 注冊到 Selector 上, register(Selector sel, int ops), 一個 selector 上可以注冊多個 SocketChannel

  4. 注冊后返回一個 SelectionKey, 會和該 Selector 關聯(集合)

  5. 進一步得到各個 SelectionKey (有事件發生)

  6. 在通過 SelectionKey 反向獲取 SocketChannel , 方法 channel()

  7. 可以通過 得到的 channel , 完成業務處理

9、NIO非阻塞網絡編程快速入門

  1. 編寫一個 NIO 入門案例,實現服務器端和客戶端之間的數據簡單通訊(非阻塞)
  2. 目的:理解 NIO 非阻塞網絡編程機制

服務端:

public class NIOServer {
    public static void main(String[] args) throws Exception {
        //打開服務器的ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //創建並且打開Selector
        Selector selector = Selector.open();
        //將我們的Socket綁定一個端口,便於客戶端連接
        serverSocketChannel.socket().bind(new InetSocketAddress(6666));
        //設置為非阻塞
        serverSocketChannel.configureBlocking(false);
        //將ServerSocketChannel注冊到selector上
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("當前注冊的SelectionKey數量:"+selector.keys().size());
        while(true){
            //一直監聽,每秒刷新一次,等待客戶端的連接
            if (selector.select(1000) == 0){
                System.out.println("服務器等待了1s,無連接");
                continue;
            }
            //通過selector得到selectionKey
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            //打印selectionKey數量
            System.out.println("SelectionKey數量:"+selectionKeys.size());
            //通過迭代器遍歷所有的selectionKey
            Iterator<SelectionKey> selectionKeyIterator = selectionKeys.iterator();
            while(selectionKeyIterator.hasNext()){
                //拿到當前的key
                SelectionKey selectionKey = selectionKeyIterator.next();
                if (selectionKey.isAcceptable()){
                    //當確定有selectionKey時,說明必有socketChannel,Server接收后得到SocketChannel
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    System.out.println("客戶端連接成功,生成一個socketChannel,哈希值:"+socketChannel.hashCode());
                    //將SocketChannel設置為非阻塞
                    socketChannel.configureBlocking(false);
                    //將其注冊到Selector上
                    socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                    System.out.println("客戶端連接后 ,注冊的selectionkey 數量=" + selector.keys().size());
                }
                if (selectionKey.isReadable()) {
                    SocketChannel channel = (SocketChannel) selectionKey.channel();
                    ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment();
                    channel.read(byteBuffer);
                    //打印一下接到的buffer
                    System.out.println(byteBuffer.toString());
                }
                //手動從集合中移動當前的selectionKey, 防止重復操作
                selectionKeyIterator.remove();
            }
        }
    }
}

客戶端:

public class NIOClient {
    public static void main(String[] args) throws Exception{
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
        if (!socketChannel.connect(inetSocketAddress)) {
            while(!socketChannel.finishConnect()){
                System.out.println("連接需要時間,可以去做其他事情");
            }
        }
        //...如果連接成功,就發送數據
        String str = "hello, Courage";
        //Wraps a byte array into a buffer
        ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
        //發送數據,將 buffer 數據寫入 channel
        socketChannel.write(buffer);
        System.in.read();
    }
}

10、SelectionKey

簡單介紹:

SelectionKey,表示 Selector 和網絡通道的注冊關系, 共四種:
int OP_ACCEPT:有新的網絡連接可以 accept,值為 16
int OP_CONNECT:代表連接已經建立,值為 8
int OP_READ:代表讀操作,值為 1
int OP_WRITE:代表寫操作,值為 4
源碼中:

public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;

SelectionKey 相關方法

public abstract class SelectionKey {
	public abstract Selector selector();//得到與之關聯的 Selector 對象
 	public abstract SelectableChannel channel();//得到與之關聯的通道
	public final Object attachment();//得到與之關聯的共享數據
 	public abstract SelectionKey interestOps(int ops);//設置或改變監聽事件
 	public final boolean isAcceptable();//是否可以 accept
	public final boolean isReadable();//是否可以讀
 	public final boolean isWritable();//是否可以寫
}

11、ServerSocketChannel

基本介紹:

NIO中的 ServerSocketChannel 是一個可以監聽新進來的TCP連接的通道, 就像標准IO中的ServerSocket一樣。

相關方法:

public abstract class ServerSocketChannel extends AbstractSelectableChannel  implements NetworkChannel{
    public static ServerSocketChannel open()//得到一個 ServerSocketChannel 通道
    public final ServerSocketChannel bind(SocketAddress local)//設置服務器端端口號
    public final SelectableChannel configureBlocking(boolean block)//設置阻塞或非阻塞模式,															取值 false 表示采用非阻塞模式
    public SocketChannel accept()//接受一個連接,返回代表這個連接的通道對象
    public final SelectionKey register(Selector sel, int ops)//注冊一個選擇器並設置監聽事件
}

12、SocketChannel

基本介紹

SocketChannel,網絡 IO 通道,具體負責進行讀寫操作。NIO 把緩沖區的數據寫入通道,或者把通道里的數
據讀到緩沖區。

相關方法

public abstract class SocketChannel extends
    AbstractSelectableChannel 
    implements ByteChannel, 
    ScatteringByteChannel, 
    GatheringByteChannel,
    NetworkChannel
{
    public static SocketChannel open();//得到一個 SocketChannel 通道
    public final SelectableChannel configureBlocking(boolean block);//設置阻塞或非阻塞模式,取值 false 表示采用非阻塞模式
    public boolean connect(SocketAddress remote);//連接服務器
    public boolean finishConnect();//如果上面的方法連接失敗,接下來就要通過該方法完成連接操作
    public int write(ByteBuffer src);//往通道里寫數據
    public int read(ByteBuffer dst);//從通道里讀數據
    public final SelectionKey register(Selector sel, int ops, Object att);//注冊一個選擇器並設置監聽事件,最后一個參數可以設置共享數據
    public final void close();//關閉通道
}

13、NIO網絡編程應用實例-群聊系統

編寫一個 NIO 群聊系統,實現服務器端和客戶端之間的數據簡單通訊(非阻塞)
2) 實現多人群聊
3) 服務器端:可以監測用戶上線,離線,並實現消息轉發功能
4) 客戶端:通過 channel 可以無阻塞發送消息給其它所有用戶,同時可以接受其它用戶發送的消息(有服務器轉發
得到)
5) 目的:進一步理解 NIO 非阻塞網絡編程機制

服務端以及客戶端

public class GroupChatServer {
    //定義基本屬性
    private Selector selector;
    private ServerSocketChannel listenChannel;
    private static final int PORT = 6667;
    /**
     * 構造器,初始化工作
     * **/
    public GroupChatServer(){
        try {
            //得到選擇器
            selector = Selector.open();
            //得到ServerSocketChannel
            listenChannel= ServerSocketChannel.open();
            //綁定端口
            listenChannel.socket().bind(new InetSocketAddress(PORT));
            //設置非阻塞模式
            listenChannel.configureBlocking(false);
            //將這個channel注冊到selector
            listenChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    //監聽
    public void listen(){
        System.out.println("監聽程序:"+Thread.currentThread().getName());
            try {
                while(true){
                int count = selector.select();
                    if (count > 0) {
                        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                        while(iterator.hasNext()){
                            SelectionKey key = iterator.next();
                            //監聽到accept
                            if(key.isAcceptable()) {
                                SocketChannel sc = listenChannel.accept();
                                sc.configureBlocking(false);
                                //將該 sc 注冊到seletor
                                sc.register(selector, SelectionKey.OP_READ);
                                //提示
                                System.out.println(sc.getRemoteAddress() + " 上線 ");
                            }
                            if(key.isReadable()) { //通道發送read事件,即通道是可讀的狀態
                                //處理讀 (專門寫方法..)
                                readData(key);
                            }
                            //當前的key 刪除,防止重復處理
                            iterator.remove();
                        }
                    }else{
                        System.out.println("等待......");
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
    }
    //讀取客戶端消息
    private void readData(SelectionKey key) {
        //取到關聯的channle
        SocketChannel channel = null;
        try {
            //得到channel
            channel = (SocketChannel) key.channel();
            //創建buffer
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int count = channel.read(buffer);
            //根據count的值做處理
            if(count > 0) {
                //把緩存區的數據轉成字符串
                String msg = new String(buffer.array());
                //輸出該消息
                System.out.println("form 客戶端: " + msg);
                //向其它的客戶端轉發消息(去掉自己), 專門寫一個方法來處理
                sendInfoToOtherClients(msg, channel);
            }
        }catch (IOException e) {
            try {
                System.out.println(channel.getRemoteAddress() + " 離線了..");
                //取消注冊
                key.cancel();
                //關閉通道
                channel.close();
            }catch (IOException e2) {
                e2.printStackTrace();;
            }
        }
    }
    //轉發消息給其它客戶(通道)
    private void sendInfoToOtherClients(String msg, SocketChannel self ) throws  IOException{
        System.out.println("服務器轉發消息中...");
        System.out.println("服務器轉發數據給客戶端線程: " + Thread.currentThread().getName());
        //遍歷 所有注冊到selector 上的 SocketChannel,並排除 self
        for(SelectionKey key: selector.keys()) {
            //通過 key  取出對應的 SocketChannel
            Channel targetChannel = key.channel();
            //排除自己
            if(targetChannel instanceof  SocketChannel && targetChannel != self) {
                //轉型
                SocketChannel dest = (SocketChannel)targetChannel;
                //將msg 存儲到buffer
                ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
                //將buffer 的數據寫入 通道
                dest.write(buffer);
            }
        }
    }
    public static void main(String[] args) {
        //創建服務器對象
        GroupChatServer groupChatServer = new GroupChatServer();
        groupChatServer.listen();
    }
}
//可以寫一個Handler
class MyHandler {
    public void readData() {
    }
    public void sendInfoToOtherClients(){
    }
}
package com.courage.groupchat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
public class GroupChatClient {
    //定義相關的屬性
    private final String HOST = "127.0.0.1"; // 服務器的ip
    private final int PORT = 6667; //服務器端口
    private Selector selector;
    private SocketChannel socketChannel;
    private String username;
    //構造器, 完成初始化工作
    public GroupChatClient() throws IOException {
        selector = Selector.open();
        //連接服務器
        socketChannel = socketChannel.open(new InetSocketAddress("127.0.0.1", PORT));
        //設置非阻塞
        socketChannel.configureBlocking(false);
        //將channel 注冊到selector
        socketChannel.register(selector, SelectionKey.OP_READ);
        //得到username
        username = socketChannel.getLocalAddress().toString().substring(1);
        System.out.println(username + " is ok...");
    }
    //向服務器發送消息
    public void sendInfo(String info) {
        info = username + " 說:" + info;
        try {
            socketChannel.write(ByteBuffer.wrap(info.getBytes()));
        }catch (IOException e) {
            e.printStackTrace();
        }
    }
    //讀取從服務器端回復的消息
    public void readInfo() {
        try {
            int readChannels = selector.select();
            if(readChannels > 0) {//有可以用的通道
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    if(key.isReadable()) {
                        //得到相關的通道
                        SocketChannel sc = (SocketChannel) key.channel();
                        //得到一個Buffer
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        //讀取
                        sc.read(buffer);
                        //把讀到的緩沖區的數據轉成字符串
                        String msg = new String(buffer.array());
                        System.out.println(msg.trim());
                    }
                }
                iterator.remove(); //刪除當前的selectionKey, 防止重復操作
            } else {
                //System.out.println("沒有可以用的通道...");
            }
        }catch (Exception e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) throws Exception {
        //啟動我們客戶端
        GroupChatClient chatClient = new GroupChatClient();
        //啟動一個線程, 每個3秒,讀取從服務器發送數據
        new Thread() {
            public void run() {
                while (true) {
                    chatClient.readInfo();
                    try {
                        Thread.currentThread().sleep(3000);
                    }catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();
        //發送數據給服務器端
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()) {
            String s = scanner.nextLine();
            chatClient.sendInfo(s);
        }
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM