IO


網絡IO

1. 網絡IO

1.1 什么是IO流以及IO流的作用

I/O實際上是Input和Output,也就是輸入和輸出。而流其實是一種抽象的概念,它表示的是數據的無結構化傳遞。會被當成無結構的字節序列或字符序列。流可以當作是磁盤與內存之間的一個管道。

1.2 IO流的分類

在Java中I/O流操作很多,但是核心體系實際上就只有File(文件流)、InputStream(字節輸入流)、OutputStream(字節輸出流)、Reader(字符輸入流)、Writer(字符輸出流)。

image-20220301161506677

  • 字節流:操作的數據單元是8位的字節。InputStream、OutputStream作為抽象基類。可以處理所有的數據文件。
  • 字符流:操作的數據單元是字符。以Writer、Reader作為抽象基類。只限於處理文本的數據文件。
  • 訪問管道處理流,是用來去完成管道的讀寫操作,用於線程間的通訊
  • 訪問數組處理流,是針對內存的操作
  • 緩沖流是提供一個緩沖區,對於緩沖區的一個處理流,避免每次與磁盤的交互,提高輸入輸出的一個效率
  • 對象流,主要用在序列化這個機制上,將一個對象序列化后轉換成一個可存儲可傳輸的對象,傳輸時用到的流。
  • 轉換流:將字符流轉換成字節流
  • 打印流

image-20220301162730551

2. IO流的數據來源及操作的API

  • 硬盤
  • 內存
  • 鍵盤
  • 網絡

2.1 File類簡介

File類是Java中為文件進行創建、刪除、重命名、移動等操作而設計的一個類

  • File(File parent, String child):根據parent抽象路徑名和child路徑名字符串創建一個新的File實例。
  • File(String pathname):將指定路徑名轉化為抽象路徑名創建一個新的File實例。
  • File(String parent, String child):根據parent路徑名和child路徑名創建一個File實例。
  • File(URI uri):指定URI轉化為抽象路徑名。

2.2 基於文件的輸入輸出流

public static void main(String[] args) {
    File file = new File("D:\\appdata\\IODemo\\Capture001.png");
    try (
        FileOutputStream fileOutputStream = new FileOutputStream("D:\\appdata\\IODemo\\Capture002.png");
        FileInputStream fileInputStream = new FileInputStream(file)) { // 1.7之后,將流寫入try()中,代碼執行完畢后,會自動關閉流
        int len = 0;
        byte[] buffer = new byte[1024];
        long start = System.currentTimeMillis();
        while ((len = fileInputStream.read(buffer)) != -1) {
            fileOutputStream.write(buffer, 0, len);
        }
        long end = System.currentTimeMillis();
        System.out.println((end - start) / 1000);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

流一定要關閉,否則當前線程沒執行完會一直使其被進程占用。

try (FileReader reader = new FileReader("/appdata/IODemo/IODemo");
     FileWriter writer = new FileWriter("/appdata/IODemo/IODemo.txt")) {
    int i = 0;
    char[] chars = new char[1];
    while ((i = reader.read(chars)) != -1) {
        writer.write(new String(chars, 0, i));
    }
} catch (Exception e) {
    e.printStackTrace();
}

2.3 緩沖流

緩沖流是帶緩沖區的處理流,他會提供一個緩沖區,緩沖區的作用主要目的是:避免每次和硬盤打交道,能夠提高輸入/輸出的執行效率。

BufferedInputStream

private static int DEFAULT_BUFFER_SIZE = 8192; // 默認8Kb的緩沖區
private static int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8; // 最大緩沖區大小
// 每次讀取的8Kb size的字節會存儲在buf[]數組中
//每次調用read()方法時,會首先去嘗試從這個數組中讀取,如果讀取失敗,會從數據源(磁盤上)去讀取
protected volatile byte buf[];

// 兩種構造方法最終調用該方法,帶int參數的會覆蓋默認的8Kb size
public BufferedInputStream(InputStream in, int size) {
    super(in);
    if (size <= 0) {
        throw new IllegalArgumentException("Buffer size <= 0");
    }
    buf = new byte[size];
}

其實緩沖流原理上是幫我們封裝了8Kb大小的數據,先從磁盤讀8Kb到我們內存,后由我們自己去操作這8Kb的數據,當處理完8Kb緩沖區沒有了,再加載數據到緩沖區,再讀到內存去處理。當我們用普通流去處理文件,將buffer[]設置的稍微大一點,一樣可以達到提高效率的結果。

public static void main(String[] args) {

    try (BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream("/appdata/IODemo/IODemo"));
         BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(new FileOutputStream("/appdata/IODemo/IODemo.txt"))) {
        int len = 0;
        byte[] bytes = new byte[1024];
        while ((len = bufferedInputStream.read(bytes)) != -1) {
            // System.out.println(new String(bytes, 0, len));
            bufferedOutputStream.write(bytes, 0, len);
            bufferedOutputStream.flush();
        }
    } catch (IOException e) {
        e.printStackTrace();
    }

}

將創建InputStream寫入到try()中,可以幫我們實現close()關閉流的操作,這個close中包含了buffred的flush操作,如果沒有關閉流,又沒有手動flush(),將會丟失數據。

public void close() throws IOException {
    try (OutputStream ostream = out) {
        flush();
    }
}
try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream("/appdata/IODemo/IODemo"), StandardCharsets.UTF_8))) {
    String str;
    while ((str = reader.readLine()) != null) {
        System.out.println(str);
    }
} catch (Exception e) {
    e.printStackTrace();
}

2.4 轉換流

try (InputStream inputStream = new FileInputStream("/appdata/IODemo/IODemo");
     InputStreamReader reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8)) {
    char[] chars = new char[1024];
    int i;
    while ((i = reader.read(chars)) != -1) {
        System.out.println(new String(chars, 0, i));
    }
} catch (Exception e) {
    e.printStackTrace();
}

在這個轉換流中,時可以指定字符集編碼的。

2.5 對象流

關於序列化和反序列化這個問題,我在18年參加工作的時候,遇到過一個項目,之后就再沒有用過了。當時架構還是分布式dubbo+zookeeper,但是傳輸報文竟然用到這個我是沒想到的。

什么是序列化和反序列化?

  • 序列化是把對象的狀態信息轉化為可存儲或傳輸的形式的過程,也就是把對象轉化為字節序列的過程成為對象的序列化
  • 反序列化是序列化的逆向過程,把字節數組反序列化為對象。
public class UserSerializable implements Serializable {

    private static final long serialVersionUID = 8160464260217334369L;

    private String name;

    private int age;

    public void setName(String name) {
        this.name = name;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "UserSerializable{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }

    public static void main(String[] args) {
        UserSerializable user = new UserSerializable();
        user.setAge(26);
        user.setName("Elian");
        String fileName = "/appdata/IODemo/User";
        try (FileInputStream fileInputStream = new FileInputStream(fileName);
             FileOutputStream fileOutputStream = new FileOutputStream(fileName);
             ObjectOutputStream outputStream = new ObjectOutputStream(fileOutputStream);
             ObjectInputStream objectInputStream = new ObjectInputStream(fileInputStream)
        ) {
            outputStream.writeObject(user);
            outputStream.flush();
            UserSerializable newUser = (UserSerializable) objectInputStream.readObject();
            System.out.println(newUser);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3. 本地IO和網絡IO

3.0 本地I/O操作實例

public class NIOFirstDemo {
    public static void main(String[] args) {
        bio();
        bufferBio();
        nio();
        mmap();
        zeroCopy();
    }

    private static void bio() {
        try (FileInputStream bioInputStream = new FileInputStream("/appdata/IODemo/jdk api 1.8_google.CHM");
             FileOutputStream bioOutputStream = new FileOutputStream("/appdata/IODemo/jdk_bio.CHM")) {
            // bio實現copy
            long bioStart = System.currentTimeMillis();
            int len = 0;
            byte[] buffer = new byte[1024];
            while ((len = bioInputStream.read(buffer)) != -1) {
                bioOutputStream.write(buffer, 0, len);
            }
            bioOutputStream.flush();
            System.out.println(System.currentTimeMillis() - bioStart);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void bufferBio() {
        try (BufferedInputStream bioInputStream = new BufferedInputStream(new FileInputStream("/appdata/IODemo/jdk api 1.8_google.CHM"));
             BufferedOutputStream bioOutputStream = new BufferedOutputStream(new FileOutputStream("/appdata/IODemo/jdk_bufferBio.CHM"))) {
            // bio實現copy
            long bioStart = System.currentTimeMillis();
            int len = 0;
            byte[] buffer = new byte[1024];
            while ((len = bioInputStream.read(buffer)) != -1) {
                bioOutputStream.write(buffer, 0, len);
            }
            bioOutputStream.flush();
            System.out.println(System.currentTimeMillis() - bioStart);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void nio() {
        try (FileChannel inChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk api 1.8_google.CHM"), StandardOpenOption.READ);
             FileChannel outChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk_nio.CHM"),
                     StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);) {
            // nio 實現copy
            long nioStart = System.currentTimeMillis();
            int len = 0;
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            while ((len = inChannel.read(buffer)) != -1) {
                buffer.flip();
                outChannel.write(buffer);
                buffer.clear();
            }
            System.out.println(System.currentTimeMillis() - nioStart);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 依然將用戶空間的
    private static void mmap() {
        try (FileChannel inChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk api 1.8_google.CHM"), StandardOpenOption.READ);
             FileChannel outChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdb_mmap.CHM"),
                     StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);) {
            long mmapStart = System.currentTimeMillis();
            MappedByteBuffer inMappedBuffer = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size());
            MappedByteBuffer outMappedBuffer = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());
            byte[] bytes = new byte[inMappedBuffer.limit()];
            inMappedBuffer.get(bytes);
            outMappedBuffer.put(bytes);
            System.out.println("mmap:" + (System.currentTimeMillis() - mmapStart));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void zeroCopy() {
        try(FileChannel inChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk api 1.8_google.CHM"), StandardOpenOption.READ);
            FileChannel outChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk_zeroCopy.CHM"),
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);) {
            long zeroCopyStart = System.currentTimeMillis();
            inChannel.transferTo(0, inChannel.size(), outChannel);
            System.out.println(System.currentTimeMillis() - zeroCopyStart);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

實驗順序(速度由快到慢排序)

zeroCopy(零拷貝) > mmap(內存映射) > bufferedInputStream > bio(基於channle) ~= nio

zerCopy無需將文件映射到內存,mmap會將buffer讀進內存,關於Buffer繼續往下看4.2。

3.1 Socket和ServerSocket

// 服務端
final int DEFAULT_PORT = 9090;
try (ServerSocket serverSocket = new ServerSocket(DEFAULT_PORT)) {
    Socket socket = serverSocket.accept();// 阻塞操作,等待客戶端的連接
    System.out.println("Client port:" + socket.getPort() + " has been connected!");
    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
    String str = bufferedReader.readLine();
    System.out.println("Client Content:" + str);
    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8));
    writer.write(str + "\n"); // 如果不換行,客戶端會一直等待讀取完
    writer.flush();
    bufferedReader.close();
    writer.close();
} catch (Exception e) {
    e.printStackTrace();
}

try (Socket socket = new Socket("localhost", 9090)) {
    OutputStream outputStream = socket.getOutputStream();
    outputStream.write("Hello Elian\n".getBytes(StandardCharsets.UTF_8)); // 不換行服務端會一直等待讀取完,進入阻塞
    BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    System.out.println(reader.readLine());
} catch (Exception e) {
    e.printStackTrace();
}

3.2 網絡通訊協議分析

image-20220302125049178

image-20220302125308839

客戶端是怎樣找到目標服務的呢?

客戶端發起請求的時候,在不同的層去增加不同的協議頭,在數據鏈路層組裝目標機器的Mac地址,這個地址是通過ARP協議,我們已知目標的IP,需要獲得目標的Mac地址,會發送一個廣播消息,會在網段內去詢問這個IP是誰,目標地址會發送自己Mac地址給到當前這個發送端,就可以去組裝目標的Mac地址。那么在數據發送過程中,進入IP廣播后,某個網卡就會發現,對應Mac的網卡就會把數據包收進來。

3.3 網絡通信原理

本地磁盤IO通信:

image-20220302142120134

網絡磁盤通信:

image-20220302142142094

兩者不同在於:本地磁盤要通過DMA(直接存儲訪問器)將磁盤上的內容讀取到內核空間緩沖區,再從內核空間緩沖區讀到用戶空間緩沖區進行操作。而網絡IO是通過網卡中的緩沖區讀取到系統內核緩沖區,如果應用進程一直沒有調用socket的read()方法讀取數據將數據copy到用戶緩沖區,數據會一直被緩存在內核緩沖區里面。

理解阻塞過程

image-20220302143625641

accept()每次只能接收一個並處理一個socket,這樣只能等上一個socket處理完才能繼續處理下一個請求。BIO每次阻塞兩個位置,第一個阻塞位置是accept過程,另一個阻塞過程是I/O流讀寫的過程。

解決辦法:通過線程池進行處理。

public static void main(String[] args) {
    final int DEFAULT_PORT = 9090;
    try (ServerSocket serverSocket = new ServerSocket(DEFAULT_PORT)) {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        while (true) {
            Socket socket = serverSocket.accept();// 阻塞操作,等待客戶端的連接
            executorService.submit(new ServerSocketThread(socket));
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

public class ServerSocketThread implements Runnable {

    private Socket socket;

    public ServerSocketThread(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        System.out.println("Client port:" + socket.getPort() + " has been connected");
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            BufferedWriter writer=new BufferedWriter((new OutputStreamWriter(socket.getOutputStream())))){
            String clinetStr = reader.readLine();
            System.out.println("Client resived message: " + clinetStr);
            Thread.sleep(15000);
            writer.write("OK.\n");
        } catch(Exception e){
            e.printStackTrace();
        }
    }
}

現在還有一個缺點:

線程數取決於計算機本身的線程數,但是線程數設置太大,又會造成線程之間切換造成的資源消耗。

3.4 手寫RPCDemo

RPC(Remote Procedure Call) 遠程過程調用,是一種通過網絡從計算機程序上請求服務,而不需要了解底層網絡技術的協議。一般用來實現部署在不同機器上的系統之間的方法調用,使得程序能夠像訪問本地系統資源一樣,通過網絡傳輸去訪問遠端系統資源。

image-20220302164640587

// 1. 公共類
// 接口
public interface IHelloWorld {
    String sayHello(String content);
}

// Request
public class RpcRequest implements Serializable {
    
    private static final long serialVersionUID = -7922155162004878476L;
    
    private String className;
    private String methodName;

    private Object[] parameters;
    private Class[]  types;

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Object[] getParameters() {
        return parameters;
    }

    public void setParameters(Object[] parameters) {
        this.parameters = parameters;
    }

    public Class[] getTypes() {
        return types;
    }

    public void setTypes(Class[] types) {
        this.types = types;
    }
}

// 2. provider
// impl
public class HelloWorldImpl implements IHelloWorld {
    @Override
    public String sayHello(String content) {
        return "Hello " + content;
    }
}

// Server
public class RpcProxyServer {
    private final ExecutorService executorService = Executors.newCachedThreadPool();

    public void publisher (int port) {
        try (ServerSocket server = new ServerSocket(port)) {
            while (true) {
                final Socket socket = server.accept();
                executorService.execute(new ProcessorHandler(socket));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class ProcessorHandler implements Runnable {
    private final Socket socket;
    public ProcessorHandler(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
        ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream())) {
            RpcRequest request = (RpcRequest)objectInputStream.readObject();
            Object object = invoke(request);
            objectOutputStream.writeObject(object);
            objectOutputStream.flush();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private Object invoke(RpcRequest request) throws Exception {
        Class<?> clazz = Class.forName(request.getClassName());
        Method method = clazz.getMethod(request.getMethodName(), request.getTypes());
        if (request.getClassName().substring(request.getClassName().lastIndexOf('.') + 1).equals("IHelloWorld"))
            return method.invoke(new HelloWorldImpl(), request.getParameters());
        else
            return null;
    }
}

// 3.consumer
// client
public class App 
{
    public static void main( String[] args )
    {
        RpcProxyClient client = new RpcProxyClient();
        IHelloWorld iHelloWorld = client.clientProxy(IHelloWorld.class, "localhost", 9090);

        System.out.println(iHelloWorld.sayHello("Elian"));
    }
}

// Client
public class RpcProxyClient {
    public <T> T clientProxy(final Class<T> interfaceCls, final String host, final int port) {
        return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(), new Class<?>[]{interfaceCls}, new RemoteInvocationHandler(host, port));
    }
}
// 動態代理類
public class RemoteInvocationHandler implements InvocationHandler {

    private String host;
    private int port;

    public RemoteInvocationHandler(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RpcRequest request = new RpcRequest();
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParameters(args);
        request.setTypes(method.getParameterTypes());

        RpcNetTransport rpcNetTransport = new RpcNetTransport(host, port);
        Object object = rpcNetTransport.send(request);
        return object;
    }
}
// reader讀取返回報文
public class RpcNetTransport {
    private String host;
    private int port;

    public RpcNetTransport(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public Object send( RpcRequest request ) {
        try (Socket socket = new Socket(host, port);
             ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
             ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream())) {
            objectOutputStream.writeObject(request);
            objectOutputStream.flush();
            return objectInputStream.readObject();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}

3.4 NIO

五種IO模型

阻塞IO,非阻塞IO,IO復用,信號驅動IO,異步IO,無論哪種IO,都是為了能夠提高服務端能夠並行處理的連接數量。

  1. 阻塞IO

    image-20220302231856640

應用進程調用accept()時,觸發系統把數據從網卡緩沖區復制到內核空間,再從內核空間復制到用戶空間,如果這個過程中,數據沒有准備好,到數據返回或發生錯誤返回之前,用戶進程一直處於阻塞狀態,這個就是阻塞IO。

  1. 非阻塞IO

image-20220302232932498

非阻塞是指用戶進程調用accept()后,如果數據沒有准備好,會返回一個EWOULDBLOCK狀態,並創建一個線程出來不斷輪詢返回結果。由此可見會增加CPU的消耗。

  1. IO復用

image-20220302233336221

  • select/poll

單個進程可以同時處理多個客戶端的網絡IO鏈接,我們可以把所有鏈接過來的客戶端注冊到select/poll復用器上,用一個線程或者進程來調用這個select/poll,調用這個select的時候會阻塞,阻塞的時候,內核會去監視所有select/poll所負責的socket,當其中一個socket准備好的時候,那么這個select/poll就會返回,如果再次調用這個select的時候,就會把數據從內核拷貝到用戶空間。

select/poll模型最大的缺點是,他只能線性的輪詢1024個鏈接,當然這1024個鏈接只有少數處於活躍狀態,會導致網絡的延遲。jdk1.5之前的NIO是使用這種模型。

這種模型處理的情況是:多個不同的監聽,而且只是提高了並發連接數,並不是提高單個線程處理性能。連接數少的情況下,不一定比BIO效率更高。

  • epoll

對select/poll進行的優化:

  • 對單個進程所打開的連接數沒有限制;
  • 利用每個文件描述符fd上的callback函數來實現異步回調,不需要輪詢了;
  • mmap,可以通過內核和用戶空間映射同一塊內存來減少內存復制。

image-20220302235611165

image-20220304180148962

  1. 信號驅動

image-20220303000101291

  1. 異步IO

image-20220303001110759

總結:

  1. BIO是指accept()過程和讀寫過程會被阻塞,每個線程只能同時處理一個鏈接,這個時候線程是不能做別的事情的,我們通過將獲取的Socket丟進線程池,來解決能夠處理下個監聽到的Socket的能力。如果連接數量足夠多,這時候性能就會下降,會有其他連接在等待被accept()到,並把它獲取的socket丟進線程池。
  2. NIO是一種非阻塞IO,當線程在某個復用器通道讀取數據沒有讀取到,可以進行其他事情的處理,不需要等待連接。

4. 深入分析NIO

4.1 NIO的新特性

image-20220303003151530

相比較老的IO來說,所有操作都是基於Channel和Buffer來說的,可以將Channel看成是InputStream/OutputStream,應用程序與磁盤/網絡緩沖區之間的一個通道,而所有數據操作都是通過緩沖區來實現的。

4.2 核心組件

通道(Channel):Java NIO數據來源,可以是網絡,也可以是本地磁盤

緩沖區(Buffer):數據讀寫的中轉區

選擇器(selectors):異步IO的核心類,可以實現異步非阻塞IO,一個selectors可以管理多個通道Channel

  • Channle

FileChannle:從文件中讀取數據

DatagramChannel:通過UDP協議讀寫網絡中的數據

SocketChannel:通過TCP協議讀寫網絡中的數據

ServerSocketChannel:監聽一個TCP連接,對於每一個新的客戶端連接都會創建一個SocketChannel

  • Buffer

緩沖區本質上是一塊可以寫入的數據,以及從中讀取數據的內存,實際上也是一個byte[]數據,只是在NIO中被封裝成了NIO Buffer對象,並提供了一組方法來訪問這個內存塊,要理解buffer的工作原理,需要知道幾個屬性:

private int position = 0; // 下一個位置
private int limit;	// 
private int capacity; // 容量,buffer數組初始化的最大容量
private int mark; // 標記
  • 讀:position=0; limit = capacity = [size];當要添加的數據byte[].lenth > limit - position時都可以成功。

  • flip():limit=position; position=0,防止多余數據的寫出

  • 寫:position遍歷到limit的過程

  • get():有4個重載方法,get()獲取一個單字節,get(int) 獲取特定位置的字節,get(byte[]) ,get(byte[], int, int)獲取一段字節

  • put():有5個重載,put(byte),put(int, byte),put(ByteBuffer),put(byte[], int, int),put(byte[])

  • 堆內內存:由JVM控制的內存,堆外內存不數據JVM運行時內存,而是用的系統內存,但GC會觸發回收。ByteBuffer有兩個子類:HeapByteBuffer和DirectByteBuffer

    • HeapByteBuffer:JVM堆內存

    • DirectByteBuffer:堆外本地內存

    • MappedByteBuffer:mmap的內存映射,讀寫性能極高

      • MappedByteBuffer將文件直接映射到內存。可以映射整個文件,如果文件比較大的話可以考慮分段進行映射,只要指定文件的感興趣部分就可以。

      • 由於MappedByteBuffer申請的是直接內存,因此不受Minor GC控制,只能在發生Full GC時才能被回收,因此Java提供了DirectByteBuffer類來改善這一情況。它是MappedByteBuffer類的子類,同時它實現了DirectBuffer接口,維護一個Cleaner對象來完成內存回收。因此它既可以通過Full GC來回收內存,也可以調用clean()方法來進行回收

      • FileChannel提供了map方法來把文件映射為內存對象:

        MappedByteBuffer outMappedBuffer = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());
        

使用堆外內存的原因

  1. 對垃圾回收停頓的改善。因為full gc時,垃圾收集器會對所有分配的堆內內存進行掃描,垃圾收集對Java應用造成的影響,跟堆的大小是成正比的。過大的堆會影響Java應用的性能。如果使用堆外內存的話,堆外內存是直接受操作系統管理。這樣做的結果就是能保持一個較小的JVM堆內存,以減少垃圾收集對應用的影響。(full gc時會觸發堆外空閑內存的回收。)

  2. 減少了數據從JVM拷貝到native堆的次數,在某些場景下可以提升程序I/O的性能。

  3. 可以突破JVM內存限制,操作更多的物理內存。

使用堆外內存的問題

  1. 堆外內存難以控制,如果內存泄漏,那么很難排查(VisualVM可以通過安裝插件來監控堆外內存)。

  2. 堆外內存只能通過序列化和反序列化來存儲,保存對象速度比堆內存慢,不適合存儲很復雜的對象。一般簡單的對象或者扁平化的比較適合。

  3. 直接內存的訪問速度(讀寫方面)會快於堆內存。在申請內存空間時,堆內存速度高於直接內存。

  4. 當直接內存不足時會觸發full gc,排查full gc的時候,一定要考慮。

ByteBuffer模型

初始

image-20220304225349332

read(), put()

position = n;
limit = capacity = 8;
mark = -1;

flip()

limit = position; // 用來設置限制
position = 0;
mark = -1;

mark()

mark = postion; // 標記

reset()

position = mark;

clear()實際上數據還在

position = 0;
limit = capacity;
mark = -1;

4.3 零拷貝

  • 正常情況下,將一個文件發送給另一台服務器,需要四次拷貝,首先用戶進程調用cpu,從用戶空間切換到內核空間,內核空間調用DMA,從硬盤/網卡copy數據到內核空間,然后cpu系統調用,從內核空間copy到用戶空間,然后再從用戶空間通過cpu系統調用將數據copy到內核空間,再從內核空間通過DMAcopy到網卡緩沖區/硬盤。

image-20220303151649071

  • 零拷貝:不要用戶空間了,省略了內核緩沖區拷貝到用戶緩沖區。(目前理解是只針對客戶端)

image-20220303151849935

Linux支持的零拷貝方式:

  • mmap內存映射:MappedByteBuffer channle.map();的方式
  • sendfile:transferTo/transferFrom
  • sendfile with DMA Scatter/Gather Copy
  • splice

server

public class ZeroCopyServer {
    public static void main(String[] args) {
        try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
             RandomAccessFile writeFile = new RandomAccessFile("/appdata/IODemo/Capture001_zerCopy.png", "rw");
             FileChannel fileChannel = writeFile.getChannel();
        ) {
            long start = System.currentTimeMillis();
            serverSocketChannel.bind(new InetSocketAddress(9090));
            SocketChannel socketChannel = serverSocketChannel.accept();
            ByteBuffer buffer = ByteBuffer.allocate(8192);
            int i = 0;
            int j = 0;
            /*while ((i = socketChannel.read(buffer)) != -1) {
                buffer.flip();
                fileChannel.map(FileChannel.MapMode.READ_WRITE, j, i ).put(buffer);
                buffer.clear();
                j += i;
            }*/ // 2527ms mmap()方式
            while ((i = socketChannel.read(buffer)) != -1) {
                buffer.flip();
                fileChannel.write(buffer);
                buffer.clear();
                j += i;
            } // 4462ms 普通寫
            System.out.println("傳輸大小:" + j + ";時間:" + (System.currentTimeMillis() - start));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

client

public class ZeroCopyClient {
    public static void main(String[] args) {
        try (SocketChannel socketChannel = SocketChannel.open();
             FileChannel fileChannel = FileChannel.open(Paths.get("/appdata/IODemo/Capture001.png"))) {
            socketChannel.connect(new InetSocketAddress("localhost", 9090));
            int position = 0;
            long size=fileChannel.size();
            while (size > 0) {
                long transfer = fileChannel.transferTo(position, fileChannel.size(), socketChannel); // 零拷貝,只從File Copy到緩沖區
                position += transfer;
                size -= transfer;
            }
            System.out.println("上傳文件大小:" + position);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

4.4 Selector

Selector(選擇器,多路復用器)是Java NIO中能夠檢測一到多個NIO通道,是否為諸如讀寫事件做好准備的組件。這樣,一個單獨的線程可以管理多個channel,從而管理多個網絡連接。

image-20220303163415475

服務端處理過程:

  1. Selector.open()開啟一個多路復用器,將ServerSocketChannel注冊到selector上,這個ServerSocketServer必然不能是阻塞的,一個Channel會以4種狀態注冊到selector上:

    • SelectionKey.OP_ACCEPT:可接收
    • SelectionKey.OP_CONNECT:可連接
    • SelectionKey.OP_READ:可讀
    • SelectionKey.OP_WRITE:可寫
  2. 通過Selector的select()方法可以阻塞selection的操作,當通道中有已准備好進行I/O操作的SelectionKey,會返回這些准備好的SelectionKey的個數,下面是select()的重載方法:

    • int select():阻塞到至少有一個通道在你注冊的事件上就緒了。

    • int select(long timeout):和select()一樣,但最長阻塞時間為timeout毫秒。

    • int selectNow():非阻塞,只要有通道就緒就立刻返回。

    select()方法返回的int值表示有多少通道已經就緒,是自上次調用select()方法后有多少通道變成就緒狀態。之前在select()調用時進入就緒的通道不會在本次調用中被記入,而在前一次select()調用進入就緒但現在已經不在處於就緒的通道也不會被記入。例如:首次調用select()方法,如果有一個通道變成就緒狀態,返回了1,若再次調用select()方法,如果另一個通道就緒了,它會再次返回1。

  3. 當selector.select()返回了准備好的連接數量后,可以通過selector.selectedKeys()獲取所有已就緒的Channel的描述符selectedKey,這個selectKyes中包含了它所對應的selector和channel,並且能獲取到當前這個selectedKey對應的channel的狀態(key.isAcceptable()等)

  4. 如果當前selectedKey描述的是一個isAcceptable(),可以從當前selectedKey中將其對應的ServerSocketChannel也就是我們最初注冊進來的channel獲取出來,並建立accept()監聽,進入阻塞(其實已經不用阻塞了,肯定是個准備好的channel,拿到SocketChannel后,將其設置為非阻塞,通過SelectionKey.OP_READ狀態注冊到selector中去,最后將其在selectKeys中移除。

    注冊事件狀態時,可用 | 連接,比如SelectionKey.OP_READ | SelectionKey.OP_ACCEPT

    image-20220304172521562

    // selector.open();
    private native int epollCreate();
    // serverSocketChannel/socketChannle.register(selector, SelectionKey.OP_ACCEPT)
    private native void epollCtl(int epfd, int opcode, int fd, int events);
    // selector.select()
    private native int epollwait(long pollAddress, int numfds, long timeout, int epfd) throws IOException;
    
  5. 再次進行selector.select(),這時會返回剛剛readable的SelectionKey,通過selector.selectionKeys()拿到后,判斷其狀態為isReadable(),就可以對其進行讀寫操作了,最后也要將其描述符移除掉

客戶端處理過程:

  1. 創建SocketChannel設置為非阻塞,通過SelectionKey.OP_CONNECT狀態注冊到一個selector上。
  2. 通過selector的select()方法阻塞selection操作。
  3. 然后通過selector.seletedKeys()獲取所有就緒的SelectionKey描述符。
  4. 如果當前描述符為isConnectable(),獲取當前描述符對應的channel,判斷當前channel是否已啟動連接操作,但是並沒有通過finishConnect()完成連接,如果為true,執行channel.finishConnect(),並將channel設置為非阻塞,寫數據后以SelectionKey.OP_READ狀態重新注冊到selector中,最后將其在selectKeys中移除。
  5. 再次進行selector.select()操作,如果在輪詢過程中獲得了isReadable()的描述符selectionKey,對其進行讀取,完成處理過程,最后也要將其描述符移除掉

server

public class NIOSelectorServer {
    public static void main(String[] args) {
        try (Selector selector = Selector.open();
             ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
            serverSocketChannel.configureBlocking(false); // 在多路復用器中,這個必須設置為非阻塞
            serverSocketChannel.bind(new InetSocketAddress(9090));
            // 監聽連接事件
            // 將serverSocketChannel注冊到selector上
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                // 參數可以帶時間:0:阻塞;有時間:設置一個超時時間
                selector.select(); // 阻塞所有注冊到多路復用器上的事件
                Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 對於連接的SocketChannel的selectKey的集合
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove(); // 避免重復處理
                    // socket兩種狀態:listen    通信R/W
                    if (selectionKey.isAcceptable()) {  // 是一個連接事件
                        acceptHandler(selectionKey);
                    } else if (selectionKey.isReadable()) { // 是一個讀事件
                        readHandler(selectionKey);
                    }
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public static void acceptHandler(SelectionKey key) {
        try {
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            SocketChannel client = ssc.accept(); // 目的是調用accept接收客戶端,例如fd7
            client.configureBlocking(false);
            ByteBuffer buffer = ByteBuffer.allocate(1024);

            client.register(key.selector(), SelectionKey.OP_READ, buffer);
            System.out.println("-------------------------------------------");
            System.out.println("新客戶端:" + client.getRemoteAddress());
            System.out.println("-------------------------------------------");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void readHandler(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        try {
            channel.read(buffer);
            buffer.flip();
            System.out.println("Client Info: "+new String(buffer.array()));
            buffer.clear();
            buffer.put("Hello Client, i'm Server".getBytes());
            buffer.flip();
            channel.write(buffer);
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

client

public class NIOSelectorClient {
    public static void main(String[] args) {
        try (Selector selector = Selector.open()) {
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            socketChannel.connect(new InetSocketAddress("localhost", 9090));
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            while (true) {
                selector.select();
                Set<SelectionKey> selectionKeySet = selector.selectedKeys();
                Iterator<SelectionKey> selectionKeyIterator = selectionKeySet.iterator();
                while (selectionKeyIterator.hasNext()) {
                    SelectionKey selectionKey = selectionKeyIterator.next();
                    selectionKeyIterator.remove();
                    if (selectionKey.isConnectable()) {
                        connectHandler(selector, selectionKey);
                    } else if (selectionKey.isReadable()) {
                        readHandler(selectionKey);
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void connectHandler(Selector selector, SelectionKey selectionKey) throws IOException {
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        if (channel.isConnectionPending()) {
            channel.finishConnect();
        }
        channel.configureBlocking(false);
        channel.write(ByteBuffer.wrap("Hello Server, I'm NIO Client".getBytes()));
        channel.register(selector, SelectionKey.OP_READ);
    }

    private static void readHandler(SelectionKey selectionKey) throws IOException {
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        channel.read(byteBuffer);
        byteBuffer.flip();
        System.out.println("client receive message: " + new String(byteBuffer.array()));
        channel.close();
    }
}

5. Reactor模式

  • 5.1 傳統阻塞式
accpet() + new Thread(() -> {

	// 業務處理

}).start();
  • 5.2 單reactor單線程處理
new SocketServerChannel().registror(selector, SelectionKey.OP_ACCEPT);

while(true) {

	selector.select();

	seletor.selectedKeys().iterator();

	while (iterator.hasnext()) {

		// 業務處理

	}

}
  • 5.3 單reactor多線程處理

在上面業務處理部分加入多線程。

  • 5.4 多reactor多線程處理

Netty,兩個Grop,一個處理accept,一個處理業務

回顧

IO

  • inputStream/outputStream, reader/writer
  • BufferedIutStream/BufferedReader(readLine()) 需要flush(), FileInputStream/FileReader
  • RandomAccessFile(File/路徑, "rw"),File,FileChannle.open(Paths, StrandardOpenOption)
  • Socket -> BIO + 多線程
  • NewIO(No Block IO),基於Buffer + Channel
  • ServerSocketChannle.open().bind().configurBlocking(false)
    • Selector.open()
    • serverSocketChannle.registor(selector, SelectionKey.ACCEPT)
    • selector.select()
    • selector.selectionKeys()
  • 阻塞(I/O阻塞,連接阻塞)
  • epool(多路[多個Channle注冊到selector上] 復用[一個或少量的線程])
  • Netty框架,后續維護
  • 零拷貝(Kafka, rocketMQ),內存映射(mmap)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM