java I/O
I/O模型
前置知識
- 什么是同步與異步?
- 什么是阻塞與非阻塞?
- 什么是阻塞I/O與非阻塞I/O?
- 什么是同步I/O與異步I/O?
什么是同步與異步?
同步是指多個任務一起執行時,任務必須逐個完成,一個任務執行時會導致其他任務和整個流程的暫時等待。
異步是指多任務同時執行,不會導致其他任務或者整個流程處於暫停狀態。
同步和異步的區別就是,執行多個任務時,一個任務的執行會不會導致其他任務的暫時暫停。
什么是阻塞與非阻塞?
阻塞:當一個任務執行時且執行的條件不滿足的時候會一直等待直到條件的滿足。
非阻塞:當一個任務執行且條件不滿足時會返回一個指示標志(告知條件不滿足),而不是一直等待。
阻塞與非阻塞的區別就是:當一個任務執行且條件不滿足時,是一直等待還是返回一個指示標志。
什么是阻塞I/O與非阻塞I/O?
i/o操作分為兩個步驟:
- 檢查數據是否就緒
- 內核將數據拷貝到線程中
阻塞I/O和非阻塞I/O區別就在第一步,阻塞I/O檢查到數據未就緒就會一直等待知道數據准備就緒。而非阻塞I/O遇到數據為就緒就返回一個指示標志,告訴線程數據沒有准備就緒。
什么是同步I/O與異步I/O?
同步I/O和異步I/O的區別就在於第二個步驟,同步I/O第二個步驟是線程完成的(會使當前線程阻塞,去完成I/O操作,把數據從內核拷貝到線程),而異步I/O第二個步驟是由內核完成的。
一個同步I/O操作會導致線程被阻塞,直到I/O操作完成。
而一個異步I/O操作不會導致發出請求的線程進入阻塞狀態。
5種I/O模型
阻塞I/O模型
線程提出I/O請求之后,數據如果沒有就緒那么線程就會交出CPU,進入阻塞狀態,數據准備完畢后再將數據從內核態復制給線程,線程才進入就緒狀態。

非阻塞I/O模型
線程發出I/O請求之后,數據沒有准備就緒,會返回一個指示標志發告訴線程數據沒有准備就緒,但是線程不會交出CPU,而是不斷地從用戶帶切換到內核狀態,不斷地詢問數據是否准備就緒。這樣會很浪費CPU時間。

I/O復用模型
I/O復用模型阻塞與select調用,select可以監聽多個連接,當一個連接中的數據就緒的時候,再調用read操作將數據從內核復制給線程。不一定所有的情況都是I/O復用模型比阻塞式I/O快,因為I/O復用模型要進行兩次系統調用,但是i/o復用模型的優勢在於同時監聽多個連接.


信號驅動I/O
在信號驅動IO模型中,當用戶線程發起一個IO請求操作,會給對應的socket注冊一個信號函數,然后用戶線程會繼續執行,當內核數據就緒時會發送一個信號給用戶線程,用戶線程接收到信號之后,便在信號函數中調用IO讀寫操作來進行實際的IO請求操作。

異步IO模型
異步IO模型才是最理想的IO模型,在異步IO模型中,當用戶線程發起read操作之后,立刻就可以開始去做其它的事。而另一方面,從內核的角度,當它受到一個asynchronous read之后,它會立刻返回,說明read請求已經成功發起了,因此不會對用戶線程產生任何block。然后,內核會等待數據准備完成,然后將數據拷貝到用戶線程,當這一切都完成之后,內核會給用戶線程發送一個信號,告訴它read操作完成了。也就說用戶線程完全不需要實際的整個IO操作是如何進行的,只需要先發起一個請求,當接收內核返回的成功信號時表示IO操作已經完成,可以直接去使用數據了。
也就說在異步IO模型中,IO操作的兩個階段都不會阻塞用戶線程,這兩個階段都是由內核自動完成,然后發送一個信號告知用戶線程操作已完成。用戶線程中不需要再次調用IO函數進行具體的讀寫。這點是和信號驅動模型有所不同的,在信號驅動模型中,當用戶線程接收到信號表示數據已經就緒,然后需要用戶線程調用IO函數進行實際的讀寫操作;而在異步IO模型中,收到信號表示IO操作已經完成,不需要再在用戶線程中調用iO函數進行實際的讀寫操作。異步IO是需要操作系統的底層支持,在Java 7中,提供了Asynchronous IO。

五種I/O模型的比較

由圖片可知。前面4種I/O模型 阻塞i/o 非阻塞i/o i/o復用 信號驅動i/o 都是同步的。
java_BIO
簡介
java的傳統IO都屬於BIO,也就是同步阻塞式I/O模型,在BIO的模式下,注定了一個線程對應一個Client,當線程知道數據未准備就緒的時線程就會進入阻塞式狀態,這就是阻塞式IO。
編程實現
需求:使用多線程實現一個線程對應一個客戶端請求
Server端實現
public class Server {
public static void main(String[] args) {
try {
ServerSocket ss = new ServerSocket(9999);
System.out.println("==服務端啟動==");
while (true) {
Socket socket = ss.accept();
//服務端創建一個單獨的線程來處理接受到的用戶連接請求
new ServerThreadReader(socket).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class ServerThreadReader extends Thread {
private Socket socket;
public ServerThreadReader(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
InputStream is = socket.getInputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String msg = null;
while ((msg = br.readLine()) != null) {
System.out.println("服務端接收到:" + msg);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
Client端實現
public class Client {
public static void main(String[] args) {
try {
Socket socket = new Socket("127.0.0.1", 9999);
OutputStream os = socket.getOutputStream();
PrintStream ps = new PrintStream(os, true);
Scanner scanner = new Scanner(System.in);
while (true) {
System.out.print("請輸入消息:");
String msg = scanner.nextLine();
ps.println(msg);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
編程結果:
客戶端發出socket連接,服務端接收到后,也會創建一個socket並會創建一個線程來處理這個socket,這樣客戶端和服務端線程就可以通過這兩個socket進行通信了。
BIO模式下的缺點
- 一個客戶端對應一個線程,當用戶請求連接后,什么都不做,也需要開啟一個線程來服務一個客戶,造成資源的浪費。
- 當客戶多的時候,服務器會創建很多的線程進行服務,這樣會使得無服務器壓力太大,造成線程棧溢出,服務器宕機等問題
- 線程之間頻繁的切換上下文影響了服務器的性能
線程池技術防止線程棧溢出
簡介
服務端接收到的用戶請求,把請求接收到的socket對象封裝成Runnable對象,會先把Runnable放到任務隊列中,如果線程池中有空閑線程則處理任務隊列中的任務,沒有空閑線程則任務隊列中的任務則需要等待。
編程實現
線程池
public class HandleSocketThreadPool {
//線程池
ExecutorService executorService;
public HandleSocketThreadPool(int maxPoolSize, int queueSize) {
/*
corePoolSize - 即使空閑時仍保留在池中的線程數
maximumPoolSize - 池中允許的最大線程數
keepAliveTime - 當線程數大於核心時,這是多余的空閑線程在終止之前等待新任務的最大時間。
unit - keepAliveTime參數的時間單位
workQueue - 在執行任務之前用於保存任務的隊列。 該隊列將僅保存execute方法提交的Runnable任務。
*/
this.executorService = new ThreadPoolExecutor(3,maxPoolSize,120, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(queueSize));
}
public void execute(Runnable target){
executorService.execute(target);
}
}
Server
public class Server {
public static void main(String[] args) {
try {
ServerSocket ss = new ServerSocket(9999);
HandleSocketThreadPool threadPool = new HandleSocketThreadPool(3, 4);
int count = 1;
while (count < 100) {
Socket socket = ss.accept();
System.out.println("第 " + count + " 個客戶端發起請求");
count++;
//使用線程池接收這個socket
/*
1. 先把socket包裝成一個Runnable對象
2. 把這個runnable對象交給線程池管理
*/
ReadClientRunnable runnable = new ReadClientRunnable(socket);
threadPool.execute(runnable);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class ReadClientRunnable implements Runnable{
private final Socket socket;
public ReadClientRunnable(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
InputStream is = socket.getInputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String msg = null;
while ((msg = br.readLine()) != null) {
System.out.println("服務端接收到:" + msg);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
總結
- 使用線程池技術處理用戶請求,即使用戶請求過多也不會導致線程棧溢出,但是本質沒變,同一時間還是一個線程處理一個請求。還是同步阻塞模型。
java_NIO
基礎知識
java實現NIO主要靠三個類:Buffer, Channel, Selector。Buffer是存放數據的地方,channel是數據傳輸通道不存放數據,通過與buffer交互(從buffer中讀取數據寫入通道或者寫入數據到buffer),selector是管理多個通道的。
Buffer
簡介
NIO中關鍵的Buffer有 CharBuffer, ByteBuffer, ShortBuffer, IntBuffer, LongBuffer, FloatBuffer, DoubleBuffer 對應七種基本數據類型。還有MappedByteBuffer, HeapByteBuffer, DirectByteBuffer等。
通過它們自己的靜態方法創建對象
static XxxBuffer allocate(int capacity) : 創建一個容量為capacity 的 XxxBuffer 對象
基本屬性
容量 (capacity) :作為一個內存塊,Buffer具有一定的固定大小,也稱為"容量",緩沖區容量不能為負,並且創建后不能更改。
限制 (limit):表示緩沖區中可以操作數據的大小(>=limit 的位置數據不能進行讀寫)。緩沖區的限制不能為負,並且不能大於其容量。 寫入模式,限制等於buffer的容量。讀取模式下,limit等於寫入的數據量。
位置 (position):下一個要讀取或寫入的數據的索引。緩沖區的位置不能為 負,並且不能大於其限制
標記 (mark)與重置 (reset):標記是一個索引,通過 Buffer 中的 mark() 方法 指定 Buffer 中一個特定的 position,之后可以通過調用 reset() 方法恢復到這 個 position.
**圖解屬性: **

常見方法
Buffer clear() 清空緩沖區並返回對緩沖區的引用
Buffer flip() 為 將緩沖區的界限設置為當前位置,並將當前位置充值為 0
int capacity() 返回 Buffer 的 capacity 大小
boolean hasRemaining() 判斷緩沖區中是否還有元素
int limit() 返回 Buffer 的界限(limit) 的位置
Buffer limit(int n) 將設置緩沖區界限為 n, 並返回一個具有新 limit 的緩沖區對象
Buffer mark() 對緩沖區設置標記
int position() 返回緩沖區的當前位置 position
Buffer position(int n) 將設置緩沖區的當前位置為 n , 並返回修改后的 Buffer 對象
int remaining() 返回 position 和 limit 之間的元素個數
Buffer reset() 將位置 position 轉到以前設置的 mark 所在的位置
Buffer rewind() 將位置設為為 0, 取消設置的 mark
clear和compact的區別:
clear會把position的位置設置為0,limit設置為capacity。但是原來的數據不會被清空,只是有新數據來的時候會被覆蓋。
compact是會保留未被讀取的數據,將未被讀取的數據往前挪,position指向未被讀取的最后一個數據的下一個位置。limit會被設置為capacity。
讀寫方法
Buffer 所有子類提供了兩個用於數據操作的方法:get()put() 方法
取獲取 Buffer中的數據
get() :讀取單個字節
get(byte[] dst):批量讀取多個字節到 dst 中
get(int index):讀取指定索引位置的字節(不會移動 position)
放到 入數據到 Buffer 中 中
put(byte b):將給定單個字節寫入緩沖區的當前位置
put(byte[] src):將 src 中的字節寫入緩沖區的當前位置
put(int index, byte b):將指定字節寫入緩沖區的索引位置(不會移動 position)
讀寫Buffer的時候遵循的規則
- 讀之前先使用 buffer.flip() 方法切換成讀模式
- 讀取之后調用 clear() 或者 compact()
Channel
簡介
Channel可以理解為數據傳輸的通道,Channel和IO中的Stream(流)是差不多一個等級的。只不過Stream是單向的,譬如:InputStream, OutputStream.而Channel是雙向的,既可以用來進行讀操作,又可以用來進行寫操作。
NIO中的Channel的主要實現有:
- FileChannel 通過字節流對象獲取
- DatagramChannel
- SocketChannel
- ServerSocketChannel
使用nio從客戶端傳入數據給服務端的過程如下
常用方法
int read(ByteBuffer dst) 從 從 Channel 到 中讀取數據到 ByteBuffer
long read(ByteBuffer[] dsts) 將 將 Channel 到 中的數據“分散”到 ByteBuffer[]
int write(ByteBuffer src) 將 將 ByteBuffer 到 中的數據寫入到 Channel
long write(ByteBuffer[] srcs) 將 將 ByteBuffer[] 到 中的數據“聚集”到 Channel
long position() 返回此通道的文件位置
FileChannel position(long p) 設置此通道的文件位置
long size() 返回此通道的文件的當前大小
FileChannel truncate(long s) 將此通道的文件截取為給定大小
void force(boolean metaData) 強制將所有對此通道的文件更新寫入到存儲設備中
案例
使用Channel和Buffer進行文件的復制
public void copyFile() {
try {
FileInputStream fis = new FileInputStream("src//com//yogurt//nio//b_nio_channel//壁紙.jpg");
FileOutputStream fos = new FileOutputStream("src//com//yogurt//nio//b_nio_channel//壁紙new.jpg");
FileChannel fisChannel = fis.getChannel();
FileChannel fosChannel = fos.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
while ((fisChannel.read(buffer)) != -1) {
//轉換成讀模式
buffer.flip();
fosChannel.write(buffer);
//這里一定要清空緩存區
buffer.clear();
}
fosChannel.close();
fisChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
selector
簡介
selector是單個線程可以處理多個Channel的關鍵,他會把channel和該channel感興趣的事件注入到selector中,調用select() 進入阻塞狀態,直到有channel感興趣的事件准備就緒就返回就緒事件的個數,然后線程進入就緒狀態,等待處理。
方法
通過靜態方法open()創建對象
Selector selector = Selector.open();
向選擇器注冊通道:SelectableChannel.register(Selector sel, int ops)
//1. 獲取通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
//2. 切換非阻塞模式
ssChannel.configureBlocking(false);
//3. 綁定連接
ssChannel.bind(new InetSocketAddress(9999));
//4. 獲取選擇器
Selector selector = Selector.open();
//5. 將通道注冊到選擇器上, 並且指定“監聽接收事件”
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
當調用 register(Selector sel, int ops) 將通道注冊選擇器時,選擇器對通道的監聽事件,需要通過第二個參數 ops 指定。可以監聽的事件類型(用 可使用 SelectionKey 的四個常量 表示):
- 讀 : SelectionKey.OP_READ (1)
- 寫 : SelectionKey.OP_WRITE (4)
- 連接 : SelectionKey.OP_CONNECT (8)
- 接收 : SelectionKey.OP_ACCEPT (16)
- 若注冊時不止監聽一個事件,則可以使用“位或”操作符連接。
int interestSet = SelectionKey.OP_READ|SelectionKey.OP_WRITE
案例演示流程
服務端流程
public class Server {
public static void main(String[] args) throws IOException {
//1. 創建ServerSocketChannel 用來綁定端口和接收客戶端的請求
ServerSocketChannel ssChannel = ServerSocketChannel.open();
//2. 配置成阻塞模式
ssChannel.configureBlocking(false);
//3. 綁定端口
ssChannel.bind(new InetSocketAddress(9999));
System.out.println("==服務端准備就緒==");
//4. 創建selector
Selector selector = Selector.open();
//5. 把ssChanel注冊到selector中,定義事件類型為接受事件
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
//6. 輪詢selector處理就緒事件
while (selector.select() > 0){
//獲取到所有准備好的事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
System.out.println("有新事件准備就緒了~~");
//輪詢就緒事件
while (iterator.hasNext()) {
//獲取就緒事件
SelectionKey selectionKey = iterator.next();
//判斷事件類型是否是接收
if (selectionKey.isAcceptable()) {
//通過ssChannel獲取客戶端的連接
SocketChannel channel = ssChannel.accept();
//轉換成非阻塞模式
channel.configureBlocking(false);
//把客戶端的連接注冊到selector中,注冊成讀操作
channel.register(selector,SelectionKey.OP_READ);
//判斷是否是客戶端發數據過來了
}else if (selectionKey.isReadable()){
//獲取和客戶端對應的channel
SocketChannel channel = (SocketChannel)selectionKey.channel();
//讀取數據
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len;
//channel向緩沖區寫入數據
while ((len = channel.read(buffer)) > 0) {
buffer.flip();
System.out.println(new String(buffer.array(), 0, len));
buffer.clear();
}
}
//清空事件,表示就緒事件都已經處理好了
iterator.remove();
}
}
}
}
客戶端流程
public class Client {
public static void main(String[] args) throws IOException {
//創建SocketChannel,會被selector輪詢到,然后被ssChanel.accept接收注冊進selector
SocketChannel channel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9999));
channel.configureBlocking(false);
//創建Buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String msg = scanner.nextLine();
buffer.put(("iandf say :" + msg).getBytes());
buffer.flip();
//從buffer中讀取數據到客戶端channel
channel.write(buffer);
buffer.clear();
}
}
}
組合案例~群聊
使用NIO實現群聊
public class Server {
private ServerSocketChannel ssChannel;
private Selector selector;
private final int PORT = 9999;
public Server() {
try {
//初始化
selector = Selector.open();
ssChannel = ServerSocketChannel.open();
//定義為非阻塞模式
ssChannel.configureBlocking(false);
//綁定端口號
ssChannel.bind(new InetSocketAddress(PORT));
//注冊感興趣的事件
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}
/*
需求:
監聽客戶端的請求
接收客戶端發過來的消息,轉發給所有其他的客戶端
*/
public static void main(String[] args) {
Server server = new Server();
server.listen();
}
private void listen() {
try {
System.out.println("監聽線程: " + Thread.currentThread().getName());
//讓selector進行輪詢
while (selector.select() > 0) {
//拿到所有的事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
//如果事件是客戶端有請求連接服務端
if (selectionKey.isAcceptable()) {
SocketChannel channel = ssChannel.accept();
channel.configureBlocking(false);
System.out.println(channel.getRemoteAddress() + " 上線 ");
channel.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
System.out.println("收到客戶端發送數據的事件");
handleRead(selectionKey);
}
}
//表示本次selector監聽到的事件都處理完成了
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 通過監聽到的事件拿到數據
* @param selectionKey 監聽到的事件
*/
private void handleRead(SelectionKey selectionKey) {
//拿到對應的channel
SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
//將channel中的數據讀入buffer
int len = channel.read(buffer);
if (len > 0) {
buffer.flip();
String msg = new String(buffer.array(), 0, len);
System.out.println(msg);
//發送消息給所有的客戶
sendMsgToAll(msg, channel);
buffer.clear();
}
} catch (IOException e) {
try {
System.out.println(channel.getRemoteAddress() + " 關閉了連接");
//取消這次事件
selectionKey.cancel();
//關閉通道
channel.close();
} catch (IOException ioException) {
ioException.printStackTrace();
}
e.printStackTrace();
}
}
/**
* 把 msg 發送給其他的客戶端
* @param msg 從當前通道里面獲取的消息
* @param self 當前通道
*/
private void sendMsgToAll(String msg, SocketChannel self) throws IOException {
System.out.println("准備發送該消息給所有用戶");
System.out.println(selector.keys().size());
for (SelectionKey selectedKey : selector.keys()) {
SelectableChannel channel = selectedKey.channel();
if (channel instanceof SocketChannel && channel != self) {
//從buffer中向通道里面寫出數據
SocketChannel dest = (SocketChannel) channel;
System.out.println(dest.getRemoteAddress());
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
dest.write(buffer);
}
}
}
}
public class Client {
private Selector selector;
private SocketChannel channel;
private final String ADDRESS = "127.0.0.1";
private final int PORT = 9999;
public Client() {
try {
selector = Selector.open();
//綁定服務端的地址和端口,發送請求
channel = SocketChannel.open();
channel.connect(new InetSocketAddress(ADDRESS, PORT));
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
Client client = new Client();
//開啟一個線程,接收服務端發過來的消息
new Thread(client::readInfo).start();
//從控制台讀取數據發送給服務端
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
//向buffer中寫入數據
try {
System.out.println(ByteBuffer.wrap((Thread.currentThread().getName() + msg).getBytes()));
//buffer寫出數據到channel
client.channel.write(ByteBuffer.wrap((client.channel.getLocalAddress().toString().substring(1) + " say: " + msg).getBytes()));
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void readInfo() {
try {
while (selector.select() > 0) {
System.out.println("有數據來了");
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isReadable()) {
SocketChannel channel = (SocketChannel) selectionKey.channel();
//申請緩沖區
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
buffer.flip();
//將channel里面的數據讀取到buffer
System.out.println(new String(buffer.array()));
}
}
iterator.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
NIO和BIO的思考
BIO的優點:代碼簡單,清晰,易於維護,適用於連接數目較小的場景。
BIO的缺點:一個線程對應一個連接,當數據未就緒時就會進入線程阻塞狀態,直到數據拷貝到線程,線程才會進入就緒狀態。這樣即使有的客戶連接進入服務器什么也不做就等於大大浪費了服務器的資源,且沒辦法適應於大規模的並發場景。隨着客戶數量的增多會發生線程棧的溢出,進而可能導致服務器宕機等。
NIO的優點:使用I/O復用模型,使一個線程就能同時處理多個連接,不會導致一個連接的數據未就緒就導致線程阻塞,內核會輪詢每一個連接,直到有感興趣的事件發生,內核會通知線程,線程進行系統調用進入阻塞狀態,知道數據從內核拷貝到線程,線程就進入就緒狀態。適用於多連接且I/O操作不耗時的場景
NIO的缺點:一個線程處理多個連接,當一個連接的I/O操作耗時很長的情況下,其他的連接可能遲遲都無法得到響應。而且NIO的代碼不易懂,難以維護和升級,可以使用已經封裝好的框架,比如Netty。