網絡IO
1. 網絡IO
1.1 什么是IO流以及IO流的作用
I/O實際上是Input和Output,也就是輸入和輸出。而流其實是一種抽象的概念,它表示的是數據的無結構化傳遞。會被當成無結構的字節序列或字符序列。流可以當作是磁盤與內存之間的一個管道。
1.2 IO流的分類
在Java中I/O流操作很多,但是核心體系實際上就只有File(文件流)、InputStream(字節輸入流)、OutputStream(字節輸出流)、Reader(字符輸入流)、Writer(字符輸出流)。

- 字節流:操作的數據單元是8位的字節。InputStream、OutputStream作為抽象基類。可以處理所有的數據文件。
- 字符流:操作的數據單元是字符。以Writer、Reader作為抽象基類。只限於處理文本的數據文件。
- 訪問管道處理流,是用來去完成管道的讀寫操作,用於線程間的通訊
- 訪問數組處理流,是針對內存的操作
- 緩沖流是提供一個緩沖區,對於緩沖區的一個處理流,避免每次與磁盤的交互,提高輸入輸出的一個效率
- 對象流,主要用在序列化這個機制上,將一個對象序列化后轉換成一個可存儲可傳輸的對象,傳輸時用到的流。
- 轉換流:將字符流轉換成字節流
- 打印流

2. IO流的數據來源及操作的API
- 硬盤
- 內存
- 鍵盤
- 網絡
2.1 File類簡介
File類是Java中為文件進行創建、刪除、重命名、移動等操作而設計的一個類
- File(File parent, String child):根據parent抽象路徑名和child路徑名字符串創建一個新的File實例。
- File(String pathname):將指定路徑名轉化為抽象路徑名創建一個新的File實例。
- File(String parent, String child):根據parent路徑名和child路徑名創建一個File實例。
- File(URI uri):指定URI轉化為抽象路徑名。
2.2 基於文件的輸入輸出流
public static void main(String[] args) {
File file = new File("D:\\appdata\\IODemo\\Capture001.png");
try (
FileOutputStream fileOutputStream = new FileOutputStream("D:\\appdata\\IODemo\\Capture002.png");
FileInputStream fileInputStream = new FileInputStream(file)) { // 1.7之后,將流寫入try()中,代碼執行完畢后,會自動關閉流
int len = 0;
byte[] buffer = new byte[1024];
long start = System.currentTimeMillis();
while ((len = fileInputStream.read(buffer)) != -1) {
fileOutputStream.write(buffer, 0, len);
}
long end = System.currentTimeMillis();
System.out.println((end - start) / 1000);
} catch (IOException e) {
e.printStackTrace();
}
}
流一定要關閉,否則當前線程沒執行完會一直使其被進程占用。
try (FileReader reader = new FileReader("/appdata/IODemo/IODemo");
FileWriter writer = new FileWriter("/appdata/IODemo/IODemo.txt")) {
int i = 0;
char[] chars = new char[1];
while ((i = reader.read(chars)) != -1) {
writer.write(new String(chars, 0, i));
}
} catch (Exception e) {
e.printStackTrace();
}
2.3 緩沖流
緩沖流是帶緩沖區的處理流,他會提供一個緩沖區,緩沖區的作用主要目的是:避免每次和硬盤打交道,能夠提高輸入/輸出的執行效率。
BufferedInputStream
private static int DEFAULT_BUFFER_SIZE = 8192; // 默認8Kb的緩沖區
private static int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8; // 最大緩沖區大小
// 每次讀取的8Kb size的字節會存儲在buf[]數組中
//每次調用read()方法時,會首先去嘗試從這個數組中讀取,如果讀取失敗,會從數據源(磁盤上)去讀取
protected volatile byte buf[];
// 兩種構造方法最終調用該方法,帶int參數的會覆蓋默認的8Kb size
public BufferedInputStream(InputStream in, int size) {
super(in);
if (size <= 0) {
throw new IllegalArgumentException("Buffer size <= 0");
}
buf = new byte[size];
}
其實緩沖流原理上是幫我們封裝了8Kb大小的數據,先從磁盤讀8Kb到我們內存,后由我們自己去操作這8Kb的數據,當處理完8Kb緩沖區沒有了,再加載數據到緩沖區,再讀到內存去處理。當我們用普通流去處理文件,將buffer[]設置的稍微大一點,一樣可以達到提高效率的結果。
public static void main(String[] args) {
try (BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream("/appdata/IODemo/IODemo"));
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(new FileOutputStream("/appdata/IODemo/IODemo.txt"))) {
int len = 0;
byte[] bytes = new byte[1024];
while ((len = bufferedInputStream.read(bytes)) != -1) {
// System.out.println(new String(bytes, 0, len));
bufferedOutputStream.write(bytes, 0, len);
bufferedOutputStream.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
}
將創建InputStream寫入到try()中,可以幫我們實現close()關閉流的操作,這個close中包含了buffred的flush操作,如果沒有關閉流,又沒有手動flush(),將會丟失數據。
public void close() throws IOException {
try (OutputStream ostream = out) {
flush();
}
}
try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream("/appdata/IODemo/IODemo"), StandardCharsets.UTF_8))) {
String str;
while ((str = reader.readLine()) != null) {
System.out.println(str);
}
} catch (Exception e) {
e.printStackTrace();
}
2.4 轉換流
try (InputStream inputStream = new FileInputStream("/appdata/IODemo/IODemo");
InputStreamReader reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8)) {
char[] chars = new char[1024];
int i;
while ((i = reader.read(chars)) != -1) {
System.out.println(new String(chars, 0, i));
}
} catch (Exception e) {
e.printStackTrace();
}
在這個轉換流中,時可以指定字符集編碼的。
2.5 對象流
關於序列化和反序列化這個問題,我在18年參加工作的時候,遇到過一個項目,之后就再沒有用過了。當時架構還是分布式dubbo+zookeeper,但是傳輸報文竟然用到這個我是沒想到的。
什么是序列化和反序列化?
- 序列化是把對象的狀態信息轉化為可存儲或傳輸的形式的過程,也就是把對象轉化為字節序列的過程成為對象的序列化
- 反序列化是序列化的逆向過程,把字節數組反序列化為對象。
public class UserSerializable implements Serializable {
private static final long serialVersionUID = 8160464260217334369L;
private String name;
private int age;
public void setName(String name) {
this.name = name;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "UserSerializable{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
public static void main(String[] args) {
UserSerializable user = new UserSerializable();
user.setAge(26);
user.setName("Elian");
String fileName = "/appdata/IODemo/User";
try (FileInputStream fileInputStream = new FileInputStream(fileName);
FileOutputStream fileOutputStream = new FileOutputStream(fileName);
ObjectOutputStream outputStream = new ObjectOutputStream(fileOutputStream);
ObjectInputStream objectInputStream = new ObjectInputStream(fileInputStream)
) {
outputStream.writeObject(user);
outputStream.flush();
UserSerializable newUser = (UserSerializable) objectInputStream.readObject();
System.out.println(newUser);
} catch (Exception e) {
e.printStackTrace();
}
}
}
3. 本地IO和網絡IO
3.0 本地I/O操作實例
public class NIOFirstDemo {
public static void main(String[] args) {
bio();
bufferBio();
nio();
mmap();
zeroCopy();
}
private static void bio() {
try (FileInputStream bioInputStream = new FileInputStream("/appdata/IODemo/jdk api 1.8_google.CHM");
FileOutputStream bioOutputStream = new FileOutputStream("/appdata/IODemo/jdk_bio.CHM")) {
// bio實現copy
long bioStart = System.currentTimeMillis();
int len = 0;
byte[] buffer = new byte[1024];
while ((len = bioInputStream.read(buffer)) != -1) {
bioOutputStream.write(buffer, 0, len);
}
bioOutputStream.flush();
System.out.println(System.currentTimeMillis() - bioStart);
} catch (Exception e) {
e.printStackTrace();
}
}
private static void bufferBio() {
try (BufferedInputStream bioInputStream = new BufferedInputStream(new FileInputStream("/appdata/IODemo/jdk api 1.8_google.CHM"));
BufferedOutputStream bioOutputStream = new BufferedOutputStream(new FileOutputStream("/appdata/IODemo/jdk_bufferBio.CHM"))) {
// bio實現copy
long bioStart = System.currentTimeMillis();
int len = 0;
byte[] buffer = new byte[1024];
while ((len = bioInputStream.read(buffer)) != -1) {
bioOutputStream.write(buffer, 0, len);
}
bioOutputStream.flush();
System.out.println(System.currentTimeMillis() - bioStart);
} catch (Exception e) {
e.printStackTrace();
}
}
private static void nio() {
try (FileChannel inChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk api 1.8_google.CHM"), StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk_nio.CHM"),
StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);) {
// nio 實現copy
long nioStart = System.currentTimeMillis();
int len = 0;
ByteBuffer buffer = ByteBuffer.allocate(1024);
while ((len = inChannel.read(buffer)) != -1) {
buffer.flip();
outChannel.write(buffer);
buffer.clear();
}
System.out.println(System.currentTimeMillis() - nioStart);
} catch (Exception e) {
e.printStackTrace();
}
}
// 依然將用戶空間的
private static void mmap() {
try (FileChannel inChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk api 1.8_google.CHM"), StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdb_mmap.CHM"),
StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);) {
long mmapStart = System.currentTimeMillis();
MappedByteBuffer inMappedBuffer = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size());
MappedByteBuffer outMappedBuffer = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());
byte[] bytes = new byte[inMappedBuffer.limit()];
inMappedBuffer.get(bytes);
outMappedBuffer.put(bytes);
System.out.println("mmap:" + (System.currentTimeMillis() - mmapStart));
} catch (Exception e) {
e.printStackTrace();
}
}
private static void zeroCopy() {
try(FileChannel inChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk api 1.8_google.CHM"), StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk_zeroCopy.CHM"),
StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);) {
long zeroCopyStart = System.currentTimeMillis();
inChannel.transferTo(0, inChannel.size(), outChannel);
System.out.println(System.currentTimeMillis() - zeroCopyStart);
} catch (Exception e) {
e.printStackTrace();
}
}
}
實驗順序(速度由快到慢排序)
zeroCopy(零拷貝) > mmap(內存映射) > bufferedInputStream > bio(基於channle) ~= nio
zerCopy無需將文件映射到內存,mmap會將buffer讀進內存,關於Buffer繼續往下看4.2。
3.1 Socket和ServerSocket
// 服務端
final int DEFAULT_PORT = 9090;
try (ServerSocket serverSocket = new ServerSocket(DEFAULT_PORT)) {
Socket socket = serverSocket.accept();// 阻塞操作,等待客戶端的連接
System.out.println("Client port:" + socket.getPort() + " has been connected!");
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
String str = bufferedReader.readLine();
System.out.println("Client Content:" + str);
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8));
writer.write(str + "\n"); // 如果不換行,客戶端會一直等待讀取完
writer.flush();
bufferedReader.close();
writer.close();
} catch (Exception e) {
e.printStackTrace();
}
try (Socket socket = new Socket("localhost", 9090)) {
OutputStream outputStream = socket.getOutputStream();
outputStream.write("Hello Elian\n".getBytes(StandardCharsets.UTF_8)); // 不換行服務端會一直等待讀取完,進入阻塞
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
System.out.println(reader.readLine());
} catch (Exception e) {
e.printStackTrace();
}
3.2 網絡通訊協議分析


客戶端是怎樣找到目標服務的呢?
客戶端發起請求的時候,在不同的層去增加不同的協議頭,在數據鏈路層組裝目標機器的Mac地址,這個地址是通過ARP協議,我們已知目標的IP,需要獲得目標的Mac地址,會發送一個廣播消息,會在網段內去詢問這個IP是誰,目標地址會發送自己Mac地址給到當前這個發送端,就可以去組裝目標的Mac地址。那么在數據發送過程中,進入IP廣播后,某個網卡就會發現,對應Mac的網卡就會把數據包收進來。
3.3 網絡通信原理
本地磁盤IO通信:

網絡磁盤通信:

兩者不同在於:本地磁盤要通過DMA(直接存儲訪問器)將磁盤上的內容讀取到內核空間緩沖區,再從內核空間緩沖區讀到用戶空間緩沖區進行操作。而網絡IO是通過網卡中的緩沖區讀取到系統內核緩沖區,如果應用進程一直沒有調用socket的read()方法讀取數據將數據copy到用戶緩沖區,數據會一直被緩存在內核緩沖區里面。
理解阻塞過程

accept()每次只能接收一個並處理一個socket,這樣只能等上一個socket處理完才能繼續處理下一個請求。BIO每次阻塞兩個位置,第一個阻塞位置是accept過程,另一個阻塞過程是I/O流讀寫的過程。
解決辦法:通過線程池進行處理。
public static void main(String[] args) {
final int DEFAULT_PORT = 9090;
try (ServerSocket serverSocket = new ServerSocket(DEFAULT_PORT)) {
ExecutorService executorService = Executors.newFixedThreadPool(4);
while (true) {
Socket socket = serverSocket.accept();// 阻塞操作,等待客戶端的連接
executorService.submit(new ServerSocketThread(socket));
}
} catch (Exception e) {
e.printStackTrace();
}
}
public class ServerSocketThread implements Runnable {
private Socket socket;
public ServerSocketThread(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
System.out.println("Client port:" + socket.getPort() + " has been connected");
try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
BufferedWriter writer=new BufferedWriter((new OutputStreamWriter(socket.getOutputStream())))){
String clinetStr = reader.readLine();
System.out.println("Client resived message: " + clinetStr);
Thread.sleep(15000);
writer.write("OK.\n");
} catch(Exception e){
e.printStackTrace();
}
}
}
現在還有一個缺點:
線程數取決於計算機本身的線程數,但是線程數設置太大,又會造成線程之間切換造成的資源消耗。
3.4 手寫RPCDemo
RPC(Remote Procedure Call) 遠程過程調用,是一種通過網絡從計算機程序上請求服務,而不需要了解底層網絡技術的協議。一般用來實現部署在不同機器上的系統之間的方法調用,使得程序能夠像訪問本地系統資源一樣,通過網絡傳輸去訪問遠端系統資源。

// 1. 公共類
// 接口
public interface IHelloWorld {
String sayHello(String content);
}
// Request
public class RpcRequest implements Serializable {
private static final long serialVersionUID = -7922155162004878476L;
private String className;
private String methodName;
private Object[] parameters;
private Class[] types;
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Object[] getParameters() {
return parameters;
}
public void setParameters(Object[] parameters) {
this.parameters = parameters;
}
public Class[] getTypes() {
return types;
}
public void setTypes(Class[] types) {
this.types = types;
}
}
// 2. provider
// impl
public class HelloWorldImpl implements IHelloWorld {
@Override
public String sayHello(String content) {
return "Hello " + content;
}
}
// Server
public class RpcProxyServer {
private final ExecutorService executorService = Executors.newCachedThreadPool();
public void publisher (int port) {
try (ServerSocket server = new ServerSocket(port)) {
while (true) {
final Socket socket = server.accept();
executorService.execute(new ProcessorHandler(socket));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class ProcessorHandler implements Runnable {
private final Socket socket;
public ProcessorHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream())) {
RpcRequest request = (RpcRequest)objectInputStream.readObject();
Object object = invoke(request);
objectOutputStream.writeObject(object);
objectOutputStream.flush();
} catch (Exception e) {
e.printStackTrace();
}
}
private Object invoke(RpcRequest request) throws Exception {
Class<?> clazz = Class.forName(request.getClassName());
Method method = clazz.getMethod(request.getMethodName(), request.getTypes());
if (request.getClassName().substring(request.getClassName().lastIndexOf('.') + 1).equals("IHelloWorld"))
return method.invoke(new HelloWorldImpl(), request.getParameters());
else
return null;
}
}
// 3.consumer
// client
public class App
{
public static void main( String[] args )
{
RpcProxyClient client = new RpcProxyClient();
IHelloWorld iHelloWorld = client.clientProxy(IHelloWorld.class, "localhost", 9090);
System.out.println(iHelloWorld.sayHello("Elian"));
}
}
// Client
public class RpcProxyClient {
public <T> T clientProxy(final Class<T> interfaceCls, final String host, final int port) {
return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(), new Class<?>[]{interfaceCls}, new RemoteInvocationHandler(host, port));
}
}
// 動態代理類
public class RemoteInvocationHandler implements InvocationHandler {
private String host;
private int port;
public RemoteInvocationHandler(String host, int port) {
this.host = host;
this.port = port;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcRequest request = new RpcRequest();
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameters(args);
request.setTypes(method.getParameterTypes());
RpcNetTransport rpcNetTransport = new RpcNetTransport(host, port);
Object object = rpcNetTransport.send(request);
return object;
}
}
// reader讀取返回報文
public class RpcNetTransport {
private String host;
private int port;
public RpcNetTransport(String host, int port) {
this.host = host;
this.port = port;
}
public Object send( RpcRequest request ) {
try (Socket socket = new Socket(host, port);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream())) {
objectOutputStream.writeObject(request);
objectOutputStream.flush();
return objectInputStream.readObject();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
3.4 NIO
五種IO模型
阻塞IO,非阻塞IO,IO復用,信號驅動IO,異步IO,無論哪種IO,都是為了能夠提高服務端能夠並行處理的連接數量。
-
阻塞IO

應用進程調用accept()時,觸發系統把數據從網卡緩沖區復制到內核空間,再從內核空間復制到用戶空間,如果這個過程中,數據沒有准備好,到數據返回或發生錯誤返回之前,用戶進程一直處於阻塞狀態,這個就是阻塞IO。
- 非阻塞IO

非阻塞是指用戶進程調用accept()后,如果數據沒有准備好,會返回一個EWOULDBLOCK狀態,並創建一個線程出來不斷輪詢返回結果。由此可見會增加CPU的消耗。
- IO復用

- select/poll
單個進程可以同時處理多個客戶端的網絡IO鏈接,我們可以把所有鏈接過來的客戶端注冊到select/poll復用器上,用一個線程或者進程來調用這個select/poll,調用這個select的時候會阻塞,阻塞的時候,內核會去監視所有select/poll所負責的socket,當其中一個socket准備好的時候,那么這個select/poll就會返回,如果再次調用這個select的時候,就會把數據從內核拷貝到用戶空間。
select/poll模型最大的缺點是,他只能線性的輪詢1024個鏈接,當然這1024個鏈接只有少數處於活躍狀態,會導致網絡的延遲。jdk1.5之前的NIO是使用這種模型。
這種模型處理的情況是:多個不同的監聽,而且只是提高了並發連接數,並不是提高單個線程處理性能。連接數少的情況下,不一定比BIO效率更高。
- epoll
對select/poll進行的優化:
- 對單個進程所打開的連接數沒有限制;
- 利用每個文件描述符fd上的callback函數來實現異步回調,不需要輪詢了;
- mmap,可以通過內核和用戶空間映射同一塊內存來減少內存復制。

- 信號驅動

- 異步IO

總結:
- BIO是指accept()過程和讀寫過程會被阻塞,每個線程只能同時處理一個鏈接,這個時候線程是不能做別的事情的,我們通過將獲取的Socket丟進線程池,來解決能夠處理下個監聽到的Socket的能力。如果連接數量足夠多,這時候性能就會下降,會有其他連接在等待被accept()到,並把它獲取的socket丟進線程池。
- NIO是一種非阻塞IO,當線程在某個復用器通道讀取數據沒有讀取到,可以進行其他事情的處理,不需要等待連接。
4. 深入分析NIO
4.1 NIO的新特性

相比較老的IO來說,所有操作都是基於Channel和Buffer來說的,可以將Channel看成是InputStream/OutputStream,應用程序與磁盤/網絡緩沖區之間的一個通道,而所有數據操作都是通過緩沖區來實現的。
4.2 核心組件
通道(Channel):Java NIO數據來源,可以是網絡,也可以是本地磁盤
緩沖區(Buffer):數據讀寫的中轉區
選擇器(selectors):異步IO的核心類,可以實現異步非阻塞IO,一個selectors可以管理多個通道Channel
- Channle
FileChannle:從文件中讀取數據
DatagramChannel:通過UDP協議讀寫網絡中的數據
SocketChannel:通過TCP協議讀寫網絡中的數據
ServerSocketChannel:監聽一個TCP連接,對於每一個新的客戶端連接都會創建一個SocketChannel
- Buffer
緩沖區本質上是一塊可以寫入的數據,以及從中讀取數據的內存,實際上也是一個byte[]數據,只是在NIO中被封裝成了NIO Buffer對象,並提供了一組方法來訪問這個內存塊,要理解buffer的工作原理,需要知道幾個屬性:
private int position = 0; // 下一個位置 private int limit; // private int capacity; // 容量,buffer數組初始化的最大容量 private int mark; // 標記
讀:position=0; limit = capacity = [size];當要添加的數據byte[].lenth > limit - position時都可以成功。
flip():limit=position; position=0,防止多余數據的寫出
寫:position遍歷到limit的過程
get():有4個重載方法,get()獲取一個單字節,get(int) 獲取特定位置的字節,get(byte[]) ,get(byte[], int, int)獲取一段字節
put():有5個重載,put(byte),put(int, byte),put(ByteBuffer),put(byte[], int, int),put(byte[])
堆內內存:由JVM控制的內存,堆外內存不數據JVM運行時內存,而是用的系統內存,但GC會觸發回收。ByteBuffer有兩個子類:HeapByteBuffer和DirectByteBuffer
HeapByteBuffer:JVM堆內存
DirectByteBuffer:堆外本地內存
MappedByteBuffer:mmap的內存映射,讀寫性能極高
MappedByteBuffer將文件直接映射到內存。可以映射整個文件,如果文件比較大的話可以考慮分段進行映射,只要指定文件的感興趣部分就可以。
由於MappedByteBuffer申請的是直接內存,因此不受Minor GC控制,只能在發生Full GC時才能被回收,因此Java提供了DirectByteBuffer類來改善這一情況。它是MappedByteBuffer類的子類,同時它實現了DirectBuffer接口,維護一個Cleaner對象來完成內存回收。因此它既可以通過Full GC來回收內存,也可以調用clean()方法來進行回收
FileChannel提供了map方法來把文件映射為內存對象:
MappedByteBuffer outMappedBuffer = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());
使用堆外內存的原因
對垃圾回收停頓的改善。因為full gc時,垃圾收集器會對所有分配的堆內內存進行掃描,垃圾收集對Java應用造成的影響,跟堆的大小是成正比的。過大的堆會影響Java應用的性能。如果使用堆外內存的話,堆外內存是直接受操作系統管理。這樣做的結果就是能保持一個較小的JVM堆內存,以減少垃圾收集對應用的影響。(full gc時會觸發堆外空閑內存的回收。)
減少了數據從JVM拷貝到native堆的次數,在某些場景下可以提升程序I/O的性能。
可以突破JVM內存限制,操作更多的物理內存。
使用堆外內存的問題
堆外內存難以控制,如果內存泄漏,那么很難排查(VisualVM可以通過安裝插件來監控堆外內存)。
堆外內存只能通過序列化和反序列化來存儲,保存對象速度比堆內存慢,不適合存儲很復雜的對象。一般簡單的對象或者扁平化的比較適合。
直接內存的訪問速度(讀寫方面)會快於堆內存。在申請內存空間時,堆內存速度高於直接內存。
當直接內存不足時會觸發full gc,排查full gc的時候,一定要考慮。
ByteBuffer模型
初始

read(), put()
position = n;
limit = capacity = 8;
mark = -1;
flip()
limit = position; // 用來設置限制
position = 0;
mark = -1;
mark()
mark = postion; // 標記
reset()
position = mark;
clear()實際上數據還在
position = 0;
limit = capacity;
mark = -1;
4.3 零拷貝
- 正常情況下,將一個文件發送給另一台服務器,需要四次拷貝,首先用戶進程調用cpu,從用戶空間切換到內核空間,內核空間調用DMA,從硬盤/網卡copy數據到內核空間,然后cpu系統調用,從內核空間copy到用戶空間,然后再從用戶空間通過cpu系統調用將數據copy到內核空間,再從內核空間通過DMAcopy到網卡緩沖區/硬盤。

- 零拷貝:不要用戶空間了,省略了內核緩沖區拷貝到用戶緩沖區。(目前理解是只針對客戶端)

Linux支持的零拷貝方式:
- mmap內存映射:MappedByteBuffer channle.map();的方式
- sendfile:transferTo/transferFrom
- sendfile with DMA Scatter/Gather Copy
- splice
server
public class ZeroCopyServer {
public static void main(String[] args) {
try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
RandomAccessFile writeFile = new RandomAccessFile("/appdata/IODemo/Capture001_zerCopy.png", "rw");
FileChannel fileChannel = writeFile.getChannel();
) {
long start = System.currentTimeMillis();
serverSocketChannel.bind(new InetSocketAddress(9090));
SocketChannel socketChannel = serverSocketChannel.accept();
ByteBuffer buffer = ByteBuffer.allocate(8192);
int i = 0;
int j = 0;
/*while ((i = socketChannel.read(buffer)) != -1) {
buffer.flip();
fileChannel.map(FileChannel.MapMode.READ_WRITE, j, i ).put(buffer);
buffer.clear();
j += i;
}*/ // 2527ms mmap()方式
while ((i = socketChannel.read(buffer)) != -1) {
buffer.flip();
fileChannel.write(buffer);
buffer.clear();
j += i;
} // 4462ms 普通寫
System.out.println("傳輸大小:" + j + ";時間:" + (System.currentTimeMillis() - start));
} catch (Exception e) {
e.printStackTrace();
}
}
}
client
public class ZeroCopyClient {
public static void main(String[] args) {
try (SocketChannel socketChannel = SocketChannel.open();
FileChannel fileChannel = FileChannel.open(Paths.get("/appdata/IODemo/Capture001.png"))) {
socketChannel.connect(new InetSocketAddress("localhost", 9090));
int position = 0;
long size=fileChannel.size();
while (size > 0) {
long transfer = fileChannel.transferTo(position, fileChannel.size(), socketChannel); // 零拷貝,只從File Copy到緩沖區
position += transfer;
size -= transfer;
}
System.out.println("上傳文件大小:" + position);
} catch (Exception e) {
e.printStackTrace();
}
}
}
4.4 Selector
Selector(選擇器,多路復用器)是Java NIO中能夠檢測一到多個NIO通道,是否為諸如讀寫事件做好准備的組件。這樣,一個單獨的線程可以管理多個channel,從而管理多個網絡連接。

服務端處理過程:
-
Selector.open()開啟一個多路復用器,將ServerSocketChannel注冊到selector上,這個ServerSocketServer必然不能是阻塞的,一個Channel會以4種狀態注冊到selector上:
- SelectionKey.OP_ACCEPT:可接收
- SelectionKey.OP_CONNECT:可連接
- SelectionKey.OP_READ:可讀
- SelectionKey.OP_WRITE:可寫
-
通過Selector的select()方法可以阻塞selection的操作,當通道中有已准備好進行I/O操作的SelectionKey,會返回這些准備好的SelectionKey的個數,下面是select()的重載方法:
-
int select():阻塞到至少有一個通道在你注冊的事件上就緒了。
-
int select(long timeout):和select()一樣,但最長阻塞時間為timeout毫秒。
-
int selectNow():非阻塞,只要有通道就緒就立刻返回。
select()方法返回的int值表示有多少通道已經就緒,是自上次調用select()方法后有多少通道變成就緒狀態。之前在select()調用時進入就緒的通道不會在本次調用中被記入,而在前一次select()調用進入就緒但現在已經不在處於就緒的通道也不會被記入。例如:首次調用select()方法,如果有一個通道變成就緒狀態,返回了1,若再次調用select()方法,如果另一個通道就緒了,它會再次返回1。
-
-
當selector.select()返回了准備好的連接數量后,可以通過selector.selectedKeys()獲取所有已就緒的Channel的描述符selectedKey,這個selectKyes中包含了它所對應的selector和channel,並且能獲取到當前這個selectedKey對應的channel的狀態(key.isAcceptable()等)
-
如果當前selectedKey描述的是一個isAcceptable(),可以從當前selectedKey中將其對應的ServerSocketChannel也就是我們最初注冊進來的channel獲取出來,並建立accept()監聽,進入阻塞(其實已經不用阻塞了,肯定是個准備好的channel,拿到SocketChannel后,將其設置為非阻塞,通過SelectionKey.OP_READ狀態注冊到selector中去,最后將其在selectKeys中移除。
注冊事件狀態時,可用 | 連接,比如SelectionKey.OP_READ | SelectionKey.OP_ACCEPT

// selector.open(); private native int epollCreate(); // serverSocketChannel/socketChannle.register(selector, SelectionKey.OP_ACCEPT) private native void epollCtl(int epfd, int opcode, int fd, int events); // selector.select() private native int epollwait(long pollAddress, int numfds, long timeout, int epfd) throws IOException; -
再次進行selector.select(),這時會返回剛剛readable的SelectionKey,通過selector.selectionKeys()拿到后,判斷其狀態為isReadable(),就可以對其進行讀寫操作了,最后也要將其描述符移除掉
客戶端處理過程:
- 創建SocketChannel設置為非阻塞,通過SelectionKey.OP_CONNECT狀態注冊到一個selector上。
- 通過selector的select()方法阻塞selection操作。
- 然后通過selector.seletedKeys()獲取所有就緒的SelectionKey描述符。
- 如果當前描述符為isConnectable(),獲取當前描述符對應的channel,判斷當前channel是否已啟動連接操作,但是並沒有通過finishConnect()完成連接,如果為true,執行channel.finishConnect(),並將channel設置為非阻塞,寫數據后以SelectionKey.OP_READ狀態重新注冊到selector中,最后將其在selectKeys中移除。
- 再次進行selector.select()操作,如果在輪詢過程中獲得了isReadable()的描述符selectionKey,對其進行讀取,完成處理過程,最后也要將其描述符移除掉
server
public class NIOSelectorServer {
public static void main(String[] args) {
try (Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
serverSocketChannel.configureBlocking(false); // 在多路復用器中,這個必須設置為非阻塞
serverSocketChannel.bind(new InetSocketAddress(9090));
// 監聽連接事件
// 將serverSocketChannel注冊到selector上
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 參數可以帶時間:0:阻塞;有時間:設置一個超時時間
selector.select(); // 阻塞所有注冊到多路復用器上的事件
Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 對於連接的SocketChannel的selectKey的集合
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove(); // 避免重復處理
// socket兩種狀態:listen 通信R/W
if (selectionKey.isAcceptable()) { // 是一個連接事件
acceptHandler(selectionKey);
} else if (selectionKey.isReadable()) { // 是一個讀事件
readHandler(selectionKey);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void acceptHandler(SelectionKey key) {
try {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel client = ssc.accept(); // 目的是調用accept接收客戶端,例如fd7
client.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
client.register(key.selector(), SelectionKey.OP_READ, buffer);
System.out.println("-------------------------------------------");
System.out.println("新客戶端:" + client.getRemoteAddress());
System.out.println("-------------------------------------------");
} catch (IOException e) {
e.printStackTrace();
}
}
public static void readHandler(SelectionKey key) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
try {
channel.read(buffer);
buffer.flip();
System.out.println("Client Info: "+new String(buffer.array()));
buffer.clear();
buffer.put("Hello Client, i'm Server".getBytes());
buffer.flip();
channel.write(buffer);
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
client
public class NIOSelectorClient {
public static void main(String[] args) {
try (Selector selector = Selector.open()) {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("localhost", 9090));
socketChannel.register(selector, SelectionKey.OP_CONNECT);
while (true) {
selector.select();
Set<SelectionKey> selectionKeySet = selector.selectedKeys();
Iterator<SelectionKey> selectionKeyIterator = selectionKeySet.iterator();
while (selectionKeyIterator.hasNext()) {
SelectionKey selectionKey = selectionKeyIterator.next();
selectionKeyIterator.remove();
if (selectionKey.isConnectable()) {
connectHandler(selector, selectionKey);
} else if (selectionKey.isReadable()) {
readHandler(selectionKey);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
private static void connectHandler(Selector selector, SelectionKey selectionKey) throws IOException {
SocketChannel channel = (SocketChannel) selectionKey.channel();
if (channel.isConnectionPending()) {
channel.finishConnect();
}
channel.configureBlocking(false);
channel.write(ByteBuffer.wrap("Hello Server, I'm NIO Client".getBytes()));
channel.register(selector, SelectionKey.OP_READ);
}
private static void readHandler(SelectionKey selectionKey) throws IOException {
SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
channel.read(byteBuffer);
byteBuffer.flip();
System.out.println("client receive message: " + new String(byteBuffer.array()));
channel.close();
}
}
5. Reactor模式
- 5.1 傳統阻塞式
accpet() + new Thread(() -> {
// 業務處理
}).start();
- 5.2 單reactor單線程處理
new SocketServerChannel().registror(selector, SelectionKey.OP_ACCEPT);
while(true) {
selector.select();
seletor.selectedKeys().iterator();
while (iterator.hasnext()) {
// 業務處理
}
}
- 5.3 單reactor多線程處理
在上面業務處理部分加入多線程。
- 5.4 多reactor多線程處理
Netty,兩個Grop,一個處理accept,一個處理業務
回顧
IO
- inputStream/outputStream, reader/writer
- BufferedIutStream/BufferedReader(readLine()) 需要flush(), FileInputStream/FileReader
- RandomAccessFile(File/路徑, "rw"),File,FileChannle.open(Paths, StrandardOpenOption)
- Socket -> BIO + 多線程
- NewIO(No Block IO),基於Buffer + Channel
- ServerSocketChannle.open().bind().configurBlocking(false)
- Selector.open()
- serverSocketChannle.registor(selector, SelectionKey.ACCEPT)
- selector.select()
- selector.selectionKeys()
- 阻塞(I/O阻塞,連接阻塞)
- epool(多路[多個Channle注冊到selector上] 復用[一個或少量的線程])
- Netty框架,后續維護
- 零拷貝(Kafka, rocketMQ),內存映射(mmap)

