1. BIO
JDK5之前, JDK的IO模式只有BIO(同步阻塞)
問題: 因為阻塞的存在, 需對每個請求開啟一個線程. 過多的線程切換影響操作系統性能
解決: 使用線程池, 處理不過來的放入隊列, 再處理不過來的會觸發其他機制
問題: 超過線程池數量的請求需要等待
public class Client {
final static String ADDRESS = "127.0.0.1";
final static int PORT = 8765;
public static void main(String[] args) throws IOException {
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try {
socket = new Socket(ADDRESS, PORT);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true); // true自動flush
//向服務器端發送數據
out.println("來自客戶端的請求");
//從服務端接收數據
String response = in.readLine(); // 阻塞
System.out.println("Client獲取數據: " + response);
} catch (Exception e) {
e.printStackTrace();
} finally {
out.close();
in.close();
socket.close();
}
}
}
服務端1: 一個請求~一個線程
public class Server {
final static int PROT = 8765;
public static void main(String[] args) throws IOException {
ServerSocket server = null;
try {
server = new ServerSocket(PROT);
System.out.println("server start");
while(true){
Socket socket = server.accept(); //監聽 阻塞 , socket底層會新建線程處理與客戶端的三次握手
//建立線程處理獲取的 socket
new Thread(new ServerHandler(socket)).start();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
server.close();
}
}
}
class ServerHandler implements Runnable {
private Socket socket;
public ServerHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try {
in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
out = new PrintWriter(this.socket.getOutputStream(), true);
String body = null;
while (true) {
body = in.readLine(); // 阻塞
if (body == null)
break;
System.out.println("Server獲取的請求: " + body);
out.println("來自服務器的響應");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
out.close();
in.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
服務端2: 用線程池處理請求
public class Server {
final static int PORT = 8765;
public static void main(String[] args) throws IOException {
ServerSocket server = null;
try {
server = new ServerSocket(PORT);
System.out.println("server start");
HandlerExecutorPool executorPool = new HandlerExecutorPool(50, 1000);
while(true){
Socket socket = server.accept();
executorPool.execute(new ServerHandler(socket));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
server.close();
}
}
}
class HandlerExecutorPool {
private ExecutorService executor;
public HandlerExecutorPool(int maxPoolSize, int queueSize){
this.executor = new ThreadPoolExecutor( // 帶阻塞隊列的線程池
Runtime.getRuntime().availableProcessors(), // 初始線程數
maxPoolSize, // 線程數上限 如果要處理請求的Runnable對象裝滿了隊列, 則提高現有線程數
120L, // 如在120個時間顆粒內某線程是空閑的, 將被回收
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(queueSize) // 存放處理請求的Runnable對象
);
}
public void execute(Runnable task){
this.executor.execute(task);
}
}
class ServerHandler implements Runnable {
private Socket socket;
public ServerHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try {
in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
out = new PrintWriter(this.socket.getOutputStream(), true);
String body = null;
while (true) {
body = in.readLine();
if (body == null)
break;
System.out.println("Server獲取的請求: " + body); // 阻塞
out.println("來自服務器的響應");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
out.close();
in.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
2.NIO1.0
JDK5以后引入了NIO1.0(多路復用機制)
伴隨多路復用在程序中引入了如下概念:
Channel(通道):TCP連接的抽象,一個TCP連接對應多個Channel,這樣減少TCP的連接次數。
通道與BIO中socket類似
通道與BIO中的流類似, 不過channel是雙向的而流是單向的
channel有多種狀態位, 能被selector識別
Buffer(緩沖區):
緩沖區是一塊內存區域(數組), 在NIO中被包裝成Buffer對象. Buffer提供方法用來訪問該內存。
BIO中,數據存儲在流中,而NIO中,數據存儲在緩沖區中。
除了boolean的其他java七種基本類型都有相應的Buffer類. 最常使用的是ByteBuffer
Selector(多路復用器):負責輪詢所有注冊通道,根據通道狀態執行相關操作。狀態包括:Connect,Accept,Read,Write。
在"四種常用IO模型"里提過用select系統調用實現IO多路復用. 除select外Linux還提供了poll/epoll函數, 其中select/poll函數按順序掃描文件句柄是否就緒,支持的文件句柄數有限; 而epoll使用基於事件驅動方式替代順序掃描,性能更高, 對文件句柄數沒有數量限制. JDK的Selector使用了epoll, 只需要一個線程輪詢, 就可以接入大量的客戶端.
public class Client {
public static void main(String[] args) throws IOException {
SocketChannel sc = null;
ByteBuffer writeBuf = ByteBuffer.allocate(1024);
ByteBuffer readBuf = ByteBuffer.allocate(1024);
try {
//創建通道
sc = SocketChannel.open();
//進行連接
sc.connect(new InetSocketAddress("127.0.0.1", 8765));
// 下面步驟可以用selector輪詢代替
while(true){
//定義一個字節數組,然后使用系統錄入功能:
byte[] bytes1 = new byte[1024];
System.in.read(bytes1); //阻塞
//把數據放到緩沖區中
writeBuf.put(bytes1);
//對緩沖區進行復位
writeBuf.flip();
//寫出數據
sc.write(writeBuf);
//清空緩沖區
writeBuf.clear();
// 接收服務端響應
sc.read(readBuf);
readBuf.flip();
byte[] bytes2 = new byte[readBuf.remaining()];
readBuf.get(bytes2);
readBuf.clear();
String body = new String(bytes2);
System.out.println("Client獲取數據: " + body);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
sc.close();
}
}
}
通過改變Selector監聽Channel的狀態位, 控制與客戶端讀寫的先后順序
public class Server implements Runnable{
private Selector seletor;
private ByteBuffer readBuf = ByteBuffer.allocate(1024);
private ByteBuffer writeBuf = ByteBuffer.allocate(1024);
public Server(int port){
try {
//1 創建多路復用器selector
this.seletor = Selector.open();
//2 創建ServerSocket通道
ServerSocketChannel ssc = ServerSocketChannel.open();
//3 設置通道是否阻塞, 決定了通道了read/write/accept/connect方法是否阻塞
ssc.configureBlocking(false);
//4 設置通道地址
ssc.bind(new InetSocketAddress(port));
//5 將ServerSocket通道注冊到selector上, 指定監聽其accept事件
ssc.register(this.seletor, SelectionKey.OP_ACCEPT);
System.out.println("Server start");
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while(true){
try {
// select阻塞, 監聽相關事件
this.seletor.select();
// 解除阻塞, 返回選擇key, key含有通道, 狀態等信息
Iterator<SelectionKey> keysIter = this.seletor.selectedKeys().iterator();
// 進行遍歷
while(keysIter.hasNext()){
SelectionKey key = keysIter.next();
keysIter.remove();
if (key.isValid()) {
// 等待接收連接狀態
if (key.isAcceptable()) {
accept(key);
}
// 可讀狀態
if (key.isReadable()) {
read(key);
}
if (key.isWritable()) {
write(key);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void write(SelectionKey key) {
try {
// 獲取通道
SocketChannel sc = (SocketChannel) key.channel();
// 寫回給客戶端數據
writeBuf.put("來自服務器的響應".getBytes());
writeBuf.flip();
sc.write(writeBuf);
writeBuf.clear();
// 修改監聽的狀態位, 如果保持OP_WRITE會導致重復寫
key.interestOps(SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
private void read(SelectionKey key) {
try {
// 獲取通道
SocketChannel sc = (SocketChannel) key.channel();
// 讀取數據, 讀到buffer. 按程序運行順序, 這里sc是否設置為阻塞效果都一樣
int count = sc.read(this.readBuf); // readBuf寫時會改變position的值
if (count == -1) {
key.channel().close();
key.cancel(); //取消該通道在selector的注冊, 之后不會被select輪詢到
return;
}
// 有數據則進行讀取. 讀取前需要將position和limit進行復位
readBuf.flip();
// 根據緩沖區的數據長度創建相應大小的byte數組, 接收緩沖區的數據
byte[] bytes = new byte[this.readBuf.remaining()];
// 接收緩沖區數據
readBuf.get(bytes);
readBuf.clear();
String body = new String(bytes).trim();
System.out.println("Server獲取的請求: " + body);
// 如果保持OP_READ會導致重復讀
sc.register(this.seletor, SelectionKey.OP_WRITE);
} catch (IOException e) {
e.printStackTrace();
}
}
private void accept(SelectionKey key) {
try {
// 獲取服務通道
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
// 獲取客戶端通道.
SocketChannel sc = ssc.accept();
// 設置非阻塞模式
sc.configureBlocking(false);
// 將客戶端通道注冊到多路復用器上,指定監聽事件
sc.register(this.seletor, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new Thread(new Server(8765)).start();;
}
}
BIO客戶端與NIO服務端通信需注意的:
BIO服務端, 一次IO有明確的結束點, 客戶端再次read會返回-1
NIO服務端一次IO結束后, 沒有關閉通道, 它可能把通道從讀狀態轉為寫狀態. 於是selector不監聽讀了, 客戶端再次read什么都沒返回, 就會阻塞.
3.NIO2.0
JDK7引入了NIO2.0(即AIO)
NIO1.0中, IO過程沒有阻塞, 阻塞被轉移到了Selector輪詢上. Selector管理所有的Channel, 因此能把總阻塞時間縮到最短.
NIO2.0中, 供我們調用的IO API都是非阻塞的, 背后復雜的實現過程(肯定有阻塞)被轉移到了JDK底層和操作系統上. 我們的程序的IO調用可以做到立即返回.
同樣有Channel和Buffer, 但沒有Selector
public class Server {
//線程池
private ExecutorService executorService;
//異步通道線程組
private AsynchronousChannelGroup threadGroup;
//服務器通道
public AsynchronousServerSocketChannel assc;
public Server(int port){
try {
//創建一個線程池
executorService = Executors.newCachedThreadPool();
//使用線程池創建異步通道線程組, 該線程組在底層支持着我們的異步操作
threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
//使用 異步通道線程組 創建服務器通道
assc = AsynchronousServerSocketChannel.open(threadGroup);
//給通道綁定端口
assc.bind(new InetSocketAddress(port));
System.out.println("server start");
// 下面的accept不會阻塞 , 一個accept只能接收一個連接請求
// accept第一個參數: 被綁定到IO操作的關聯對象(子類), 第二個參數 CompletionHandler<AsynchronousSocketChannel, 關聯對象(父類)>, 操作成功后執行的回調句柄
// 如果接受了一個新的連接, 其結果AsynchronousSocketChannel會被綁定與assc通道到相同的AsynchronousChannelGroup
assc.accept(this, new ServerCompletionHandler());
// 這里為了避免程序結束, 異步通道線程組結束就不會執行回調了
Thread.sleep(Integer.MAX_VALUE);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new Server(8765);
}
}
//第一個參數: IO操作結果; 第二個參數: 被綁定到IO操作的關聯對象
public class ServerCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Server> {
// 以下兩個重載參數與CompletionHander的模板參數一致, 回調時被傳入IO結果和IO操作時設置的關聯對象
@Override
public void completed(AsynchronousSocketChannel asc, Server attachment) {
// 完成當前連接時, 首先, 為下一個客戶端能接入再次調用accept異步方法
attachment.assc.accept(attachment, this);
// 其次, 執行下一步的讀操作
read(asc);
}
@Override
public void failed(Throwable exc, Server attachment) {
exc.printStackTrace();
}
private void read(final AsynchronousSocketChannel asc) {
//讀取數據
ByteBuffer buf = ByteBuffer.allocate(1024);
// 第一個參數: 讀操作的Buffer, 第二個參數: IO關聯對象, 第三個參數:CompletionHandler<Integer, IO管理對象父類>
asc.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer resultSize, ByteBuffer attachment) {
//進行讀取之后,重置標識位
attachment.flip();
//獲得讀取的字節數
System.out.println("Server端" + "收到客戶端的數據長度為:" + resultSize);
//獲取讀取的數據
String resultData = new String(attachment.array()).trim();
System.out.println("Server端" + "收到客戶端的數據信息為:" + resultData);
String response = "From服務端To客戶端: 於" + new Date() + "收到了請求數據"+ resultData;
write(asc, response);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
}
private void write(AsynchronousSocketChannel asc, String response) {
try {
ByteBuffer buf = ByteBuffer.allocate(1024);
buf.put(response.getBytes());
buf.flip();
// 寫操作, 異步
Future<Integer> future = asc.write(buf);
// 阻塞等待結果
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
public class Client {
private AsynchronousSocketChannel asc ;
public Client() throws Exception {
asc = AsynchronousSocketChannel.open();
}
public void connect() throws InterruptedException, ExecutionException{
// get()阻塞
asc.connect(new InetSocketAddress("127.0.0.1", 8765)).get();
}
public void write(String request){
try {
// get()阻塞
asc.write(ByteBuffer.wrap(request.getBytes())).get();
read();
} catch (Exception e) {
e.printStackTrace();
}
}
private void read() throws IOException {
ByteBuffer buf = ByteBuffer.allocate(1024);
try {
// get()阻塞
asc.read(buf).get();
buf.flip();
byte[] respByte = new byte[buf.remaining()];
buf.get(respByte);
System.out.println(new String(respByte,"utf-8").trim());
// 關閉
asc.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
Client c1 = new Client();
Client c2 = new Client();
c1.connect();
c2.connect();
c1.write("aa");
c2.write("bbb");
}
}

