從BIO到Netty的演變
前言
計算機網絡可以說是每個學計算機的都繞不過去的一道坎。計算機網絡到底有多么重要,你走到大學圖書館的計算機部分,翻開那些什么《從零開始:黑客XXX》,《黑客攻防從入門到放棄》等書籍,基本第一部分都是在談論網絡。你去一些X客論壇,上面的教程帖也基本都是從網絡部分開始的。
相信每一位科班出身的,都學習過《計算機網絡》這樣書籍, 上過這樣的課程。當然教師資源如何,我這里就不談論,那一定又會引出一頓苦水。但是學習完這樣的課程,我們還是對計算機網絡感到十分迷茫。這時候的我們可以背下網絡七層模型,網絡五層模型等,了解局域網,IP等基本地概念,但是計算機網絡對於我們來說,還是一個十分空盪盪的名詞。
為了更好地了解網絡(絕對不是因為那時候很迷黑客的緣故),我決定參加高級網絡工程師的考試。通過網絡工程師的我對計算機網絡有了更為深入的理解,開始將自己的計算機網絡體系從概念上勾連起來。也許我可以看懂其中的一些路由規則,甚至看懂一些路由分發的論文。但是我依舊只是站在理論的角度,去理解計算機網絡。
到了工作的時候,開始了解Socket編程,開始參與各種實際生產環境的編程。這個時候的我開始對網絡有了雖然簡單,但是十分真實的接觸。計算機網絡不再只是停留在書本中的概念,而是我用以實現業務目標的切實手段。
隨着工作中開始負責物聯網項目的建設,我對網絡中的各種協議開始有了自己的認識,甚至可以自己實現網絡協議規范的代碼落地。於此同時,由於對網絡交互的性能要求,我不再只是會使用BIO網絡編程,我開始使用NIO網絡編程。
為了自己的知識儲備,也是為了滿足自己的好奇心,我查找了許多的資料,也報了許多課程,去學習網絡編程。而我正好周六完成了軟考的又一次考試,所以接下來有一定空閑時間的我,接下來會繼續整理我的知識,並將它寫成博客。
這篇博客的主要內容就是按照演變的順序,寫下BIO->NIO->Reactor->Netty這樣的四個里程碑。這也是大佬們推薦的計算機網絡編程的學習路線。不過這次只是給個整體的認識以及demo,更為深入的原理探究,會放在后面。
BIO
介紹
幾乎每個人都是BIO開始的計算機網絡編程,而其中大部分也永遠地留在了這個計算機網絡編程的模型。
優點
- 理解簡單
- 實現簡單
- 要求較低
缺點
- 性能低
- 瓶頸低
- 擴展難
代碼示例(BIO下TCP)
這里給出一些簡單的demo,供大家認識。
BIO_Client
package tech.jarry.learning.netease;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Inet4Address;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.Charset;
import java.util.Scanner;
/**
* @Description:
* @Author: jarry
*/
public class BIOClient {
private static final Charset charset = Charset.forName("utf-8");
public static void main(String[] args) throws IOException {
Socket socket = new Socket();
// Socket socket = new Socket("localhost", 8080);
// 我還以為可以的。但是貌似上面的8080表示目標端口,而下面的8080表示源端口(發送端口)
// socket.bind(new InetSocketAddress("localhost", 8080));
// 后來才去確定,.bind是用於綁定源信息,而.connect是用於綁定目標信息
socket.connect(new InetSocketAddress(Inet4Address.getLocalHost(), 8080));
OutputStream outputStream = socket.getOutputStream();
Scanner scanner = new Scanner(System.in);
System.out.println("please input: ");
String msg = scanner.nextLine();
outputStream.write(msg.getBytes(charset));
scanner.close();
outputStream.close();
socket.close();
}
}
BIO_ServerV1
package tech.jarry.learning.netease;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @Description: BIO模型中Server端的簡單實現
* @Author: jarry
*/
public class BIOServer {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(8080));
System.out.println("server has started");
while (!serverSocket.isClosed()) {
Socket requestClient = serverSocket.accept();
System.out.println("server get a connection: " + requestClient.toString());
InputStream requestInputStream = requestClient.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(requestInputStream));
String msg;
while ((msg = reader.readLine()) != null) {
if (msg.length() == 0) {
break;
}
System.out.println(msg);
}
System.out.println("server has receive a message from: " + requestClient.toString());
requestInputStream.close();
requestClient.close();
}
serverSocket.close();
}
}
BIO_ServerV2
package tech.jarry.learning.netease;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @Description: 直接對原有代碼BIOServer進行暴力修改,將其阻塞部分,通過多線程實現異步處理
* @Author: jarry
*/
public class BIOServer1 {
private static ExecutorService executorService = Executors.newCachedThreadPool();
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(8080));
System.out.println("server has started");
while (!serverSocket.isClosed()) {
Socket requestClient = serverSocket.accept();
System.out.println("server get a connection: " + requestClient.toString());
executorService.submit(new Runnable() {
@Override
public void run() {
InputStream requestInputStream = null;
try {
requestInputStream = requestClient.getInputStream();
} catch (IOException e) {
e.printStackTrace();
}
BufferedReader reader = new BufferedReader(new InputStreamReader(requestInputStream));
String msg = null;
while (true) {
try {
if (!((msg = reader.readLine()) != null)) {
break;
}
} catch (IOException e) {
e.printStackTrace();
}
if (msg.length() == 0) {
break;
}
System.out.println(msg);
}
System.out.println("server has receive a message from: " + requestClient.toString());
try {
requestInputStream.close();
requestClient.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
serverSocket.close();
}
/**
* 運行結果分析:
* server has started
* server get a connection: Socket[addr=/10.0.75.1,port=64042,localport=8080]
* server get a connection: Socket[addr=/10.0.75.1,port=64052,localport=8080]
* server get a connection: Socket[addr=/10.0.75.1,port=64061,localport=8080]
* 123
* server has receive a message from: Socket[addr=/10.0.75.1,port=64042,localport=8080]
* 456
* server has receive a message from: Socket[addr=/10.0.75.1,port=64052,localport=8080]
* 789
* server has receive a message from: Socket[addr=/10.0.75.1,port=64061,localport=8080]
*/
}
BIO_ServerV3
package tech.jarry.learning.netease;
import java.io.*;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @Description: 直接對原有代碼BIOServer進行暴力修改,增加了其http格式的返回,確保瀏覽器可以正常訪問
* @Author: jarry
*/
public class BIOServer2 {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(8080));
System.out.println("server has started");
while (!serverSocket.isClosed()) {
Socket requestClient = serverSocket.accept();
System.out.println("server get a connection: " + requestClient.toString());
InputStream requestInputStream = requestClient.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(requestInputStream));
String msg;
while ((msg = reader.readLine()) != null) {
if (msg.length() == 0) {
break;
}
System.out.println(msg);
}
System.out.println("server has receive a message from: " + requestClient.toString());
// 返回數據,並確保可以被http協議理解
OutputStream outputStream = requestClient.getOutputStream();
outputStream.write("HTTP/1.1 200 OK\r\r".getBytes("utf-8"));
outputStream.write("Content-Length: 11\r\n\r\n".getBytes("utf-8"));
outputStream.write("Hello World".getBytes("utf-8"));
outputStream.flush();
requestInputStream.close();
outputStream.close();
requestClient.close();
}
serverSocket.close();
}
/**
* 運行結果分析:
*/
// server has started
// server get a connection: Socket[addr=/0:0:0:0:0:0:0:1,port=63008,localport=8080]
// GET / HTTP/1.1
// Host: localhost:8080
// Connection: keep-alive
// Cache-Control: max-age=0
// Upgrade-Insecure-Requests: 1
// User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36
// Sec-Fetch-Mode: navigate
// Sec-Fetch-User: ?1
// Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3
// Sec-Fetch-Site: none
// Accept-Encoding: gzip, deflate, br
// Accept-Language: en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7
// Cookie: Webstorm-c7a2b5a2=b5e53f87-54cc-41d5-a21f-c7be3056dfe8; centcontrol_login_token=09E8A6B6888CB0B7A9F89AB3DB5FAFE4
// server has receive a message from: Socket[addr=/0:0:0:0:0:0:0:1,port=63008,localport=8080]
// server get a connection: Socket[addr=/0:0:0:0:0:0:0:1,port=63009,localport=8080]
// GET /favicon.ico HTTP/1.1
// Host: localhost:8080
// Connection: keep-alive
// Pragma: no-cache
// Cache-Control: no-cache
// Sec-Fetch-Mode: no-cors
// User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.90 Safari/537.36
// Accept: image/webp,image/apng,image/*,*/*;q=0.8
// Sec-Fetch-Site: same-origin
// Referer: http://localhost:8080/
// Accept-Encoding: gzip, deflate, br
// Accept-Language: en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7
// Cookie: Webstorm-c7a2b5a2=b5e53f87-54cc-41d5-a21f-c7be3056dfe8; centcontrol_login_token=09E8A6B6888CB0B7A9F89AB3DB5FAFE4
// server has receive a message from: Socket[addr=/0:0:0:0:0:0:0:1,port=63009,localport=8080]
}
上面的代碼是一套的,可以進行Server與Client的通信,功能較為簡單。
所以這里再給一個,可以進行通信的版本。簡單的業務場景可以直接修改,應用。
BIO2_Client
package self;
import java.io.*;
import java.net.*;
/**
* @Description:
* @Author: jarry
*/
public class Client {
public static void main(String[] args) throws IOException {
Socket socket = new Socket();
socket.setSoTimeout(2000);
socket.connect(new InetSocketAddress(Inet4Address.getLocalHost(),2000),2000);
System.out.println("client startup");
dealMsg(socket);
socket.close();
}
private static void dealMsg(Socket clientSocket) throws IOException {
// 1.獲取鍵盤輸入流
InputStream systemInputStream = System.in;
// 2.將systemInputStream轉化為具有緩存功能的字符輸入流BufferedReader
BufferedReader systemBufferedReader = new BufferedReader(new InputStreamReader(systemInputStream));
// 3.獲取Socket輸入流
InputStream socketInputStream = clientSocket.getInputStream();
// 4.將socketInputStream轉換為具有緩存能力的字符輸入流
BufferedReader socketBufferedReader = new BufferedReader(new InputStreamReader(socketInputStream));
// 5.獲取Socket輸出流
OutputStream socketOutputStream = clientSocket.getOutputStream();
// 6.將socketOutputStream轉換為打印流(用於發送String)
PrintStream socketPrintStream = new PrintStream(socketOutputStream);
// 用於確立連接狀態的標識符
boolean flag = true;
// 7.利用循環,client與server進行交互
do {
// 從鍵盤等系統輸入流獲取輸入字符串
String str = systemBufferedReader.readLine();
// 將str寫入到socketClient的打印流(本質是輸出流)。socketClient的輸出流是連接Server的,用於向Server發送數據的
socketPrintStream.println(str);
// 從Server獲得回寫(Server的回寫,一定會發送到socketClient的輸入流中(輸入的“入”是指入socketClient)
String echo = socketBufferedReader.readLine();
// 建立一個用於關閉的方式
if ("bye".equalsIgnoreCase(echo)){
flag = false;
}else{
// 在控制台打印server的echo
System.out.println("server echo:"+echo);
}
}while (flag);
// 8.退出交互,需要關閉與Server連接的兩個資源(輸入與輸出) 考慮一下lombok的@Cleanup
socketBufferedReader.close();
socketPrintStream.close();
}
}
BIO2_Server
package self;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @Description:
* @Author: jarry
*/
public class Server {
public static void main(String[] args) throws IOException {
// 建立Server的Socket,服務端不需要設置IP,以及Port
// IP采用本地IP
ServerSocket serverSocket = new ServerSocket(2000);
System.out.println("server startup");
// 通過循環,對client的請求進行監聽
while (true){
// 獲得client的請求
Socket clientRequest = serverSocket.accept();
// 異步處理client的請求
ClientHandler clientHandler = new ClientHandler(clientRequest);
clientHandler.start();
}
}
private static class ClientHandler extends Thread {
Socket socketClient = null;
boolean flag = true;
ClientHandler(Socket socketClient){
this.socketClient = socketClient;
}
@Override
public void run() {
super.run();
// 構建系統輸入流
InputStream systemInputStream = System.in;
// 將系統輸入流轉換為字符輸入流
BufferedReader systemBufferedReader = new BufferedReader(new InputStreamReader(systemInputStream));
try {
// 構建socketClient的輸入流(即客戶端中,寫入client輸出流的數據)
InputStream clientInputStream = socketClient.getInputStream();
// 將client的輸入流轉為具有存儲能力的BufferedReader
BufferedReader clientBufferedReader = new BufferedReader(new InputStreamReader(clientInputStream));
// 構建socketClient的輸出流(用於發送數據,即客戶端中,從client輸入流讀取的數據)
OutputStream clientOutputStream = socketClient.getOutputStream();
// 將client的輸出流轉換為打印流,便於輸出數據
PrintStream clientPrintStream = new PrintStream(clientOutputStream);
// 通過循環,與客戶端進行交互
do {
// 讀取從客戶端發送來的數據,即讀取socketClient的輸入流轉化的BufferedReader
String str = clientBufferedReader.readLine();
if ("bye".equalsIgnoreCase(str)){
flag = false;
clientPrintStream.println("connect interrupt");
}else{
System.out.println(str);
// 發送回寫數據,即將回寫數據寫入socketClient的輸出流(客戶端的輸入流會獲取相關數據)
clientPrintStream.println(str.length());
}
// 從系統輸入中獲取想要發送的數據
String servStr = systemBufferedReader.readLine();
// 發送到客戶端
clientPrintStream.println(servStr);
}while (flag);
// 同樣的,關閉連接資源
clientBufferedReader.close();
clientPrintStream.close();
} catch (IOException e) {
e.printStackTrace();
}finally {
// 無論發生什么,最后都要關閉socket連接
try {
socketClient.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
為了使得代碼結構更有優雅,並且為了更好地處理字符編碼問題(demo中保留了各種數據類型的處理方式)。我們將上述版本更新一下。
BIO2_ClientV2
package example;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Inet4Address;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.nio.ByteBuffer;
public class Client {
// 連接到遠程服務器的遠程端口
private static final int PORT = 20000;
// 本地端口
private static final int LOCAL_PORT = 20001;
public static void main(String[] args) throws IOException {
// 創建Socket的操作,可以選擇不同的創建方式
Socket socket = createSocket();
// Socket初始化操作
initSocket(socket);
// 鏈接到本地20000端口,超時時間3秒,超過則拋出超時異常
socket.connect(new InetSocketAddress(Inet4Address.getLocalHost(), PORT), 3000);
System.out.println("已發起服務器連接,並進入后續流程~");
System.out.println("客戶端信息:" + socket.getLocalAddress() + " P:" + socket.getLocalPort());
System.out.println("服務器信息:" + socket.getInetAddress() + " P:" + socket.getPort());
try {
// 發送接收數據
todo(socket);
} catch (Exception e) {
System.out.println("異常關閉");
}
// 釋放資源
socket.close();
System.out.println("客戶端已退出~");
}
/**
* 創建Socket
* @return
* @throws IOException
*/
private static Socket createSocket() throws IOException {
/*
// 無代理模式,等效於空構造函數
Socket socket = new Socket(Proxy.NO_PROXY);
// 新建一份具有HTTP代理的套接字,傳輸數據將通過www.baidu.com:8080端口轉發
Proxy proxy = new Proxy(Proxy.Type.HTTP,
new InetSocketAddress(Inet4Address.getByName("www.baidu.com"), 8800));
socket = new Socket(proxy);
// 新建一個套接字,並且直接鏈接到本地20000的服務器上
socket = new Socket("localhost", PORT);
// 新建一個套接字,並且直接鏈接到本地20000的服務器上
socket = new Socket(Inet4Address.getLocalHost(), PORT);
// 新建一個套接字,並且直接鏈接到本地20000的服務器上,並且綁定到本地20001端口上
socket = new Socket("localhost", PORT, Inet4Address.getLocalHost(), LOCAL_PORT);
socket = new Socket(Inet4Address.getLocalHost(), PORT, Inet4Address.getLocalHost(), LOCAL_PORT);
*/
// 推薦無參構造,因為其它(上面)的構造方法都是包含構造,設參,以及connect操作。而socket一旦connect后,設置參數的操作就無效了。不便於靈活使用
Socket socket = new Socket();
// 綁定到本地20001端口
socket.bind(new InetSocketAddress(Inet4Address.getLocalHost(), LOCAL_PORT));
return socket;
}
private static void initSocket(Socket socket) throws SocketException {
// 設置讀取超時時間為2秒
socket.setSoTimeout(2000);
// 是否復用未完全關閉的Socket地址,對於指定bind操作后的套接字有效(正常Socket關閉后,對應端口在兩分鍾內將不再復用。而這個設置將可以直接使用對應空置端口)
socket.setReuseAddress(true);
// 是否開啟Nagle算法(開啟后,兩點:第一,會對收到的每次數據進行ACK,另一端只有在接收到對應ACK,才會繼續發送數據。第二,如果有數據堆積,會一次將所有堆積數據發出去(畢竟這種模式有數據堆積是正常的)
// 開啟后,更為嚴謹,嚴格,安全(默認開啟)
socket.setTcpNoDelay(true);
// 是否需要在長時無數據響應時發送確認數據(類似心跳包),時間大約為2小時
socket.setKeepAlive(true);
// 對於close關閉操作行為進行怎樣的處理;默認為false,0
// false、0:默認情況,關閉時立即返回,底層系統接管輸出流,將緩沖區內的數據發送完成
// true、0:關閉時立即返回,緩沖區數據拋棄,直接發送RST結束命令到對方,並無需經過2MSL等待
// true、200:關閉時最長阻塞200毫秒,隨后按第二情況處理
socket.setSoLinger(true, 20);
// 是否讓緊急數據內斂,默認false;緊急數據通過 socket.sendUrgentData(1);發送
// 只有設置為true,才會暴露到上層(邏輯層)
socket.setOOBInline(true);
// 設置接收發送緩沖器大小
socket.setReceiveBufferSize(64 * 1024 * 1024);
socket.setSendBufferSize(64 * 1024 * 1024);
// 設置性能參數:短鏈接,延遲,帶寬的相對重要性(權重)
socket.setPerformancePreferences(1, 1, 0);
}
private static void todo(Socket client) throws IOException {
// 得到Socket輸出流
OutputStream outputStream = client.getOutputStream();
// 得到Socket輸入流
InputStream inputStream = client.getInputStream();
byte[] buffer = new byte[256];
ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);
// 等同於上兩行代碼(ByteBuffer是NIO提供的一個工具,allocate就是分配內存地址,ByteBuffer處理的是byte)
// ByteBuffer byteBuffer = ByteBuffer.allocate(256);
// 嘗試各種數據傳輸,發出
// byte
byteBuffer.put((byte) 126);
// char
char c = 'a';
byteBuffer.putChar(c);
// int
int i = 2323123;
byteBuffer.putInt(i);
// bool
boolean b = true;
byteBuffer.put(b ? (byte) 1 : (byte) 0);
// Long
long l = 298789739;
byteBuffer.putLong(l);
// float
float f = 12.345f;
byteBuffer.putFloat(f);
// double
double d = 13.31241248782973;
byteBuffer.putDouble(d);
// String
String str = "Hello你好!";
byteBuffer.put(str.getBytes());
// 發送到服務器(長度等於index+1)
outputStream.write(buffer, 0, byteBuffer.position() + 1);
// 接收服務器返回
int read = inputStream.read(buffer);
System.out.println("收到數量:" + read);
// 資源釋放
outputStream.close();
inputStream.close();
}
/**
* 擴展-MSL
* MSL是Maximum Segment Lifetime的英文縮寫,可譯為“最長報文段壽命”,
* 它是任何報文在網絡上存在的最長的最長時間,超過這個時間報文將被丟棄。
* 我們都知道IP頭部中有個TTL字段,TTL是time to live的縮寫,可譯為“生存時間”,
* 這個生存時間是由源主機設置設置初始值但不是但不是存在的具體時間,而是一個IP數據報可以經過的最大路由數,每經過一個路由器,它的值就減1,
* 當此值為0則數據報被丟棄,同時發送ICMP報文通知源主機。
* RFC793中規定MSL為2分鍾,但這完全是從工程上來考慮,對於現在的網絡,MSL=2分鍾可能太長了一些。
* 因此TCP允許不同的實現可根據具體情況使用更小的MSL值。TTL與MSL是有關系的但不是簡單的相等關系,MSL要大於TTL。
*/
}
BIO2_ServerV2
package example;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Inet4Address;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
public class Server {
private static final int PORT = 20000;
public static void main(String[] args) throws IOException {
ServerSocket server = createServerSocket();
initServerSocket(server);
// 綁定到本地端口上 backlog標識等待隊列中等待數量(超出,則在對應的客戶端觸發異常)
server.bind(new InetSocketAddress(Inet4Address.getLocalHost(), PORT), 50);
System.out.println("服務器准備就緒~");
System.out.println("服務器信息:" + server.getInetAddress() + " P:" + server.getLocalPort());
// 等待客戶端連接
for (; ; ) {
// 得到客戶端
Socket client = server.accept();
// 客戶端構建異步線程
ClientHandler clientHandler = new ClientHandler(client);
// 啟動線程
clientHandler.start();
}
}
private static ServerSocket createServerSocket() throws IOException {
// 創建基礎的ServerSocket
ServerSocket serverSocket = new ServerSocket();
// 綁定到本地端口20000上,並且設置當前可允許等待鏈接的隊列為50個
//server.bind(new InetSocketAddress(Inet4Address.getLocalHost(), PORT), 50);
//serverSocket = new ServerSocket(PORT);
// 等效於上面的方案,隊列設置為50個
//serverSocket = new ServerSocket(PORT, 50);
// 與上面等同
// serverSocket = new ServerSocket(PORT, 50, Inet4Address.getLocalHost());
return serverSocket;
}
private static void initServerSocket(ServerSocket serverSocket) throws IOException {
// 是否復用未完全關閉的地址端口
serverSocket.setReuseAddress(true);
// 等效Socket#setReceiveBufferSize(針對的是accept()接收到的clientSocket。畢竟在accept時就已經接收到了一定的數據了)
serverSocket.setReceiveBufferSize(64 * 1024 * 1024);
// 設置serverSocket#accept超時時間
// serverSocket.setSoTimeout(2000);
// 設置性能參數:短鏈接,延遲,帶寬的相對重要性(針對的是accept()接收到的clientSocket)
serverSocket.setPerformancePreferences(1, 1, 1);
}
/**
* 客戶端消息處理
*/
private static class ClientHandler extends Thread {
private Socket socket;
ClientHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
super.run();
System.out.println("新客戶端連接:" + socket.getInetAddress() +
" P:" + socket.getPort());
try {
// 得到套接字流
OutputStream outputStream = socket.getOutputStream();
InputStream inputStream = socket.getInputStream();
byte[] buffer = new byte[256];
int readCount = inputStream.read(buffer);
ByteBuffer byteBuffer = ByteBuffer.wrap(buffer, 0, readCount);
// 按客戶端發送的順序讀取
// byte
byte be = byteBuffer.get();
// char
char c = byteBuffer.getChar();
// int
int i = byteBuffer.getInt();
// bool
boolean b = byteBuffer.get() == 1;
// Long
long l = byteBuffer.getLong();
// float
float f = byteBuffer.getFloat();
// double
double d = byteBuffer.getDouble();
// String
int pos = byteBuffer.position();
String str = new String(buffer, pos, readCount - pos - 1);
System.out.println("收到數量:" + readCount + " 數據:"
+ be + "\n"
+ c + "\n"
+ i + "\n"
+ b + "\n"
+ l + "\n"
+ f + "\n"
+ d + "\n"
+ str + "\n");
outputStream.write(buffer, 0, readCount);
outputStream.close();
inputStream.close();
} catch (Exception e) {
System.out.println("連接異常斷開");
} finally {
// 連接關閉
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
System.out.println("客戶端已退出:" + socket.getInetAddress() +
" P:" + socket.getPort());
}
}
}
BIO2_Tool
這里的tool,表明了兩點:如何實現int與byte之間的轉換,可以自定義實現數據的轉換
package example;
/**
* 過渡一下,簡述int與byte之間的轉換。
* 進而明確各種數據類型與byte之間的轉化。
* 最終引申出NIO包下的ByteBuffer工具,實現不同數據類型與byte類型的相互轉換
*/
public class Tools {
public static int byteArrayToInt(byte[] b) {
return b[3] & 0xFF |
(b[2] & 0xFF) << 8 |
(b[1] & 0xFF) << 16 |
(b[0] & 0xFF) << 24;
}
public static byte[] intToByteArray(int a) {
return new byte[]{
(byte) ((a >> 24) & 0xFF),
(byte) ((a >> 16) & 0xFF),
(byte) ((a >> 8) & 0xFF),
(byte) (a & 0xFF)
};
}
}
代碼示例擴展(BIO下UDP)
由於實際工作中UDP使用得比較少,所以這里只給出了BIO中UDP的使用。不過也基本滿足了UDP的使用入門了,可以實現局域網搜索(起碼對我目前的工作來說是夠用了)。至於UDP用於音視頻數據傳輸,得讀者自己尋找,或者等我了解之后,更新。
BIO_UDP_Searcher
package self;
import java.io.IOException;
import java.net.*;
/**
* @Description:
* @Author: jarry
*/
public class UDPSearcher {
public static void main(String[] args) throws IOException {
System.out.println("UDPSearcher started.");
// 構建UDP的Socket(由於是searcher,即數據的率先發送者,所以可以不用指定port,用於監聽)
DatagramSocket datagramSocket = new DatagramSocket();
// 構建請求消息的實體(包含目標ip及port)
String requestMsg = "just a joke.";
byte[] requestBytes = requestMsg.getBytes();
DatagramPacket requestPacket = new DatagramPacket(requestBytes, requestBytes.length);
requestPacket.setAddress(Inet4Address.getLocalHost());
requestPacket.setPort(20000);
// 發送請求數據
System.out.println("UDPSearcher has send msg.");
datagramSocket.send(requestPacket);
// 接收回送數據
byte[] buf = new byte[512];
DatagramPacket receivePacket = new DatagramPacket(buf,buf.length);
datagramSocket.receive(receivePacket);
String sourceIp = receivePacket.getAddress().getHostAddress();
int sourcePort = receivePacket.getPort();
int dataLength = receivePacket.getLength();
String receiveData = new String(receivePacket.getData(),0,receivePacket.getData().length);
// 顯示接收到的數據
System.out.println("UDPSearcher has received data with source:"+sourceIp+":"+sourcePort+" with length "+dataLength+". data:"+receiveData);
// 由於是demo,所以不用循環,就此結束
System.out.println("UDPSearcher finished.");
datagramSocket.close();
}
}
BIO_UDP_Provider
package self;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;
/**
* @Description:
* @Author: jarry
*/
public class UDPProvider {
public static void main(String[] args) throws IOException {
System.out.println("UDPProvider started.");
// 新建DatagramSocekt,並設定在本機20000端口監聽,並接收消息
DatagramSocket datagramSocket = new DatagramSocket(20000);
// 新建DatagramPacket實體
byte[] buf = new byte[512];
DatagramPacket datagramPacket = new DatagramPacket(buf,buf.length);
// 接收數據
datagramSocket.receive(datagramPacket);
// 處理接受到的數據
String sourceIp = datagramPacket.getAddress().getHostAddress();
int sourcePort = datagramPacket.getPort();
String data = new String(datagramPacket.getData(),0,datagramPacket.getLength());
// 顯示接收到的數據
System.out.println("UDPProvider has received data with source:"+sourceIp+":"+sourcePort+" with length "+data.length()+". data:"+data);
// 准備發送回送數據
String responseData = "UDPProvider has received data with length:"+data.length();
byte[] responseBytes = responseData.getBytes();
// 構建回送數據實體(別玩了,設置目標ip與port)
DatagramPacket responsePacket = new DatagramPacket(responseBytes, responseBytes.length
,datagramPacket.getAddress(),datagramPacket.getPort());
// 發送回送數據
System.out.println("UDPProvider has sended data.");
datagramSocket.send(responsePacket);
// 由於是demo,所以不用循環,就此結束
System.out.println("UDPProvider finished.");
datagramSocket.close();
}
}
為了網絡監聽的clear,以及權限問題,需要對上述代碼進行一次升級。
BIO_UDP2_MessageCreator
package self.v2;
/**
* @Description: 自定義通信數據格式(這可能是最簡單的應用層協議了)
* @Author: jarry
*/
public class MessageCreator {
private static final String SN_HEADER = "收到暗號,我是(SN):";
private static final String PORT_HEADER = "發送暗號,請回電端口(PORT):";
public static String buildWithPort(int port){
return PORT_HEADER + port;
}
public static int parsePort(String data){
if (data.startsWith(PORT_HEADER)){
return Integer.parseInt(data.substring(PORT_HEADER.length()));
}
return -1;
}
public static String buildWithSN(String sn){
return SN_HEADER + sn;
}
public static String parseSN(String data){
if (data.startsWith(SN_HEADER)){
return data.substring(SN_HEADER.length());
}
return null;
}
}
BIO_UDP2_Searcher
package self.v2;
import java.io.IOException;
import java.net.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* @Description:
* @Author: jarry
*/
public class UDPSearcher {
// 監聽端口號
private static final int LISTEN_PORT = 30000;
public static void main(String[] args) throws IOException, InterruptedException {
System.out.println("UDPSearcher Started");
Listener listener = listen();
sendBroadcast();
// 讀取任意鍵盤信息后退出
System.in.read();
List<Device> devices = listener.getDevicesAndClose();
for (Device device : devices) {
System.out.println("Device:"+device.toString());
}
// 完成
System.out.println("UDPSearcher Finished");
}
private static Listener listen() throws InterruptedException {
System.out.println("UDPSearcher start listen.");
CountDownLatch countDownLatch = new CountDownLatch(1);
Listener listener = new Listener(LISTEN_PORT, countDownLatch);
listener.start();
countDownLatch.await();
return listener;
}
/**
* 用於發送廣播消息
* @throws IOException
*/
private static void sendBroadcast() throws IOException {
System.out.println("UDPSearcher sendBroadcast started.");
// 作為一個搜索者(發送請求),無需指定一個端口,由系統自動分配
DatagramSocket datagramSocket = new DatagramSocket();
// 構建一份請求數據
String requestData = MessageCreator.buildWithPort(LISTEN_PORT);
byte[] requestDataBytes = requestData.getBytes();
// 構建發送數據實體
DatagramPacket requestPacket = new DatagramPacket(requestDataBytes, requestDataBytes.length);
// 設置目標地址(采用廣播地址)
requestPacket.setAddress(Inet4Address.getByName("255.255.255.255"));
requestPacket.setPort(20000);
// 發送構建好的消息
datagramSocket.send(requestPacket);
System.out.println("start send data.");
// 發送結束
System.out.println("UDPSearcher sendBroadcast finished.");
datagramSocket.close();
}
private static class Device {
final int port;
final String ip;
final String sn;
public Device(int port, String ip, String sn) {
this.port = port;
this.ip = ip;
this.sn = sn;
}
@Override
public String toString() {
return "Device{" +
"port=" + port +
", ip='" + ip + '\'' +
", sn='" + sn + '\'' +
'}';
}
}
private static class Listener extends Thread{
private final int listenPort;
private final CountDownLatch countDownLatch;
private final List<Device> devices = new ArrayList<Device>();
private boolean done = false;
private DatagramSocket ds = null;
public Listener(int listenPort, CountDownLatch countDownLatch){
super();
this.listenPort = listenPort;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
super.run();
// 通知已啟動
countDownLatch.countDown();
// 開始實際數據監聽部分
try {
// 監聽回送端口
ds = new DatagramSocket(listenPort);
while (!done){
// 接收消息的實體
final byte[] buf = new byte[512];
DatagramPacket receivePack = new DatagramPacket(buf, buf.length);
// 開始接收數據
ds.receive(receivePack);
// 打印接收到的信息
String ip = receivePack.getAddress().getHostAddress();
int port = receivePack.getPort();
int dataLength = receivePack.getLength();
String data = new String(receivePack.getData(),0,dataLength);
System.out.println("UDPSearcher receive form ip:" + ip
+ "\tport:" + port + "\tdata:" + data);
String sn = MessageCreator.parseSN(data);
if (sn != null){
Device device = new Device(port, ip ,sn);
devices.add(device);
}
}
}catch (Exception e){
}finally {
close();
}
System.out.println("UDPSearcher listner finished");
}
private void close(){
if (ds != null){
ds.close();
ds = null;
}
}
List<Device> getDevicesAndClose(){
done = true;
close();
return devices;
}
}
}
BIO_UDP_Provider
package self.v2; /**
* @Description:
* @Author: jarry
*/
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.util.UUID;
/**
* UDP 提供者, 用於提供UDP服務
*/
public class UDPProvider {
public static void main(String[] args) throws IOException {
String sn = UUID.randomUUID().toString();
Provider provider = new Provider(sn);
provider.start();
// 讀取任意字符,退出
System.in.read();
provider.exit();
}
private static class Provider extends Thread {
private final String sn;
private boolean done = false;
private DatagramSocket datagramSocket = null;
public Provider(String sn){
super();
this.sn = sn;
}
@Override
public void run() {
super.run();
System.out.println("UDPProvider started.");
try {
// 作為一個接收者(接受請求),需要指定一個端口用來接收消息
datagramSocket = new DatagramSocket(20000);
// 通過一個循環,不斷監聽,接收數據
while (true) {
// 接收消息的實體
final byte[] buf = new byte[512];
DatagramPacket receivePack = new DatagramPacket(buf, buf.length);
// 開始接收數據
datagramSocket.receive(receivePack);
// 打印接收到的信息
String ip = receivePack.getAddress().getHostAddress();
int port = receivePack.getPort();
int dataLength = receivePack.getLength();
String data = new String(receivePack.getData(), 0, dataLength);
System.out.println("UDPProvider receive form ip:" + ip
+ "\tport:" + port + "\tdata:" + data);
// 獲得目標端口
int responsePort = MessageCreator.parsePort(data);
if (responsePort != -1){
// 構建一份回送數據
String responseData = MessageCreator.buildWithSN(sn);
byte[] reponseDataBytes = responseData.getBytes();
// 直接根據發送者,構建回送數據實體
DatagramPacket responsePacket = new DatagramPacket(reponseDataBytes,
reponseDataBytes.length,
receivePack.getAddress(),
// 采用指定的端口,而不是解析獲得的來源端口(來源端口不一定就是監聽端口,這是有些時候為了簡化而已)
responsePort);
// 發送構建好的回送消息
datagramSocket.send(responsePacket);
System.out.println("start send data.");
}
}
}catch (Exception ignore){
}finally {
close();
}
// 發送結束
System.out.println("UDPProvider finished.");
}
/**
* 對外提供結束方法
*/
void exit(){
done = true;
close();
}
/**
* 本地關閉DatagramSocket的方法
*/
private void close(){
if (datagramSocket != null){
datagramSocket.close();
datagramSocket = null;
}
}
}
}
NIO
介紹
在了解BIO之后,我們可以很明顯地發現其中的問題,那就是BIO模型中,每一個Client的請求發送到Server端后,Server端通過accept接收請求后,必須創建一個clientSocket來進行通信。並且這個通信是阻塞的,一方面,新的clientSocket無法進入(單線程嘛),另一方面,clientSocket是通過流的方式進行通信,而流的通信方式是阻塞的(即沒有獲得數據是,必須在那里等待)。這兩個問題,前者可以如demo中那樣,創建一個線程池來解決,而后者是沒法解決的。而這樣一個多線程+BIO也是很多開發人員的選擇,因為這樣的實現也十分簡單,並且可以滿足一定的需求了。
但是,回過頭想一想,上述的解決方案,存在一個問題。那就是系統並發量受限於線程池的線程數。如果請求只有幾百的並發,那么上述的解決方案沒有任何問題。但是任何一個稍有規模的業務場景都不會只有幾百的並發。那么如果不對技術進行升級,只有兩個辦法。一個升級硬件配置,尤其是內存(因為線程是非常消耗內存的),另一方面將連接按照一定的邏輯維度進行拆分(比如按照業務場景)。
我曾經和我的Boss談話時,提到這么一句話:技術的非常熟練,不如技術升級帶來的價值高(因為我們公司有一個去年畢業的開發,非常抗拒學習新技術。雖然基礎的CRUD挺熟練的,但是效率真的太低了。一個簡單的條件查詢就說要十五個工作日。如果他會使用函數式編程,配合MP,也許就一個小時吧。有空可以出個效率編程的專題,感覺很有價值)。
所以,在BIO越加疲軟的時候(當然有的業務場景BIO性能貌似並不比NIO低多少,但是投入差別有些大),終於NIO面世。
NIO借助事件監聽機制,提供非阻塞式的高伸縮性網絡。當然,有興趣的可以深挖,相關概念還是很多的,比如它與linux的IO模型的關系,這些都可以很好地幫助大家擴展視野(畢竟視野決定了高度)。
NIO有三大支柱,分別是:ByteBuffer,Channel,Selector(詳見:Java NIO:Buffer、Channel 和 Selector)。
-
ByteBuffer:就是一個數據實體,其中提供了許多數據轉換的方法。如在BIO的demo中就用到了
-
Channel:參考網絡通信的channel,所有的 NIO 操作始於通道,通道是數據來源或數據寫入的目的地。這降低了BIO入門時對流認識的痛苦(一會輸入流,一會輸出流,流還需要進行轉換),並且也有利於提高開發效率。
-
Selector:多路復用器(雖然有人稱之為選擇器,但是更為精准的說法時多路復用器),實現了一個線程管理多個Channel,也是NIO事件驅動機制的基礎。
當然上述的這些,也不是必須的,我可以只有Channel,ByteBuffer的數據轉換可以自己實現,而Selector,可以通過多線程的方式去達到類似的功能效果(性能當然時沒法比的了)。但是只有三者齊聚,才能最大發揮NIO的性能。
優點
- 性能好
- 性價比高
- 不需要理解流
- 性能瓶頸更高
缺點
- 需要理解NIO的模型(相對於BIO,NIO的模型更為抽象)
- 需要理解NIO的事件驅動機制
- NIO的三大支柱的理解需要一定的時間
代碼示例
這里給出一些簡單的demo,供大家認識。
NIO_Client
package tech.jarry.learning.netease;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;
/**
* @Description: NIO模型下的TCP客戶端實現
* @Author: jarry
*/
public class NIOClient {
public static void main(String[] args) throws IOException {
// 獲得一個SocektChannel
SocketChannel socketChannel = SocketChannel.open();
// 設置SocketChannel為非阻塞模式
socketChannel.configureBlocking(false);
// 設置SocketChannel的連接配置
socketChannel.connect(new InetSocketAddress(Inet4Address.getLocalHost(), 8080));
// 通過循環,不斷連接。跳出循環,表示連接建立成功
while (!socketChannel.finishConnect()){
// 如果沒有成功建立連接,就一直阻塞當前線程(.yield()會令當前線程“謙讓”出CPU資源)
Thread.yield();
}
// 發送外部輸入的數據
Scanner scanner = new Scanner(System.in);
System.out.println("please input:");
String msg = scanner.nextLine();
// ByteBuffer.wrap()會直接調用HeapByteBuffer。故一方面其會自己完成內存分配。另一方面,其分配的內存是非直接內存(非heap堆)
ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());
// ByteBuffer.hasRemaining()用於判斷對應ByteBuffer是否還有剩余數據(實現:return position < limit;)
while (byteBuffer.hasRemaining()){
socketChannel.write(byteBuffer);
}
// 讀取響應
System.out.println("receive echoResponse from server");
// 設置緩沖區大小為1024
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
// 判斷條件:是否開啟,是否讀取到數據
//TODO 我認為這里的實現十分粗糙,是不可以置於生產環境的,具體還需要后面再看看(即使是過渡demo,也可以思考一下嘛)
while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1){
// 長連接情況下,需要手動判斷數據有沒有讀取結束 (此處做一個簡單的判斷: 超過0字節就認為請求結束了)
if (requestBuffer.position() > 0) {
break;
}
}
requestBuffer.flip();
// byte[] content = new byte[requestBuffer.limit()];
// // .get()方法只會返回byte類型(猜測是當前標記位的數據)
// requestBuffer.get(content);
// System.out.println(new String(content));
// ByteBuffer提供了大量的基本類型轉換的方法,可以直接拿來使用
System.out.println(new String(requestBuffer.array()));
scanner.close();
socketChannel.close();
}
}
NIO_ServerV1
package tech.jarry.learning.netease;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
/**
* @Description: 直接根據BIOServer進行轉變的。所以整體的邏輯與BIOServer類似
* @Author: jarry
*/
public class NIOServer {
public static void main(String[] args) throws IOException {
// 創建網絡服務端
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
//TODO .socket().bind()與.bind()的區別不清楚
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
System.out.println("server has started");
// 通過循環,不斷獲取監聽不同客戶端發來的連接請求
while (true){
// 由於NIO是非阻塞,故返回值是完全可能是null的
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null){
System.out.println("server has connect a new client: "+socketChannel.getRemoteAddress().toString());
socketChannel.configureBlocking(false);
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1){
if (requestBuffer.position() > 0){
break;
}
}
if (requestBuffer.position() == 0){
// 如果沒有數據,就不再進行后續處理,而是進入下一個循環
continue;
}
requestBuffer.flip();
System.out.println("server receive a message: "+new String(requestBuffer.array()));
System.out.println("server receive a message from: "+socketChannel.getRemoteAddress());
// 響應結果 200
String response = "HTTP/1.1 200 OK\r\n" +
"Content-Length: 12\r\n\r\n" +
"Hello World!";
ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes());
while (responseBuffer.hasRemaining()){
socketChannel.write(responseBuffer);
}
}
}
}
}
NIO_ServerV2
package tech.jarry.learning.netease;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
* @Description: 與BIOServer同樣的,NIOServer也無法同時連接多個客戶端
* V1版本這里,依舊如BIOServer1那樣,通過輪詢實現多個客戶端處理(不過BIO由於是阻塞的,所以采用多線程。而NIO是非阻塞的,所以采用一個全局列表來進行處理)
* @Author: jarry
*/
public class NIOServerV1 {
private static List<SocketChannel> socketChannelList = new ArrayList<>();
public static void main(String[] args) throws IOException {
// 創建網絡服務端
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
//TODO .socket().bind()與.bind()的區別不清楚
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
System.out.println("server has started");
// 通過循環,不斷獲取監聽不同客戶端發來的連接請求
while (true) {
// 由於NIO是非阻塞,故返回值是完全可能是null的
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
// 如果有新的連接接入,就打印日志,並將對應的SocektChannel置入全局隊列中
System.out.println("server has connect a new client: " + socketChannel.getRemoteAddress().toString());
socketChannel.configureBlocking(false);
socketChannelList.add(socketChannel);
} else {
// 如果沒有新的連接接入,就對現有連接的數據進行處理,如果處理完了就從列表中刪除對應SocketChannel
Iterator<SocketChannel> socketChannelIterator = socketChannelList.iterator();
while (socketChannelIterator.hasNext()){
SocketChannel clientSocketChannel = socketChannelIterator.next();
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
// 新增:如果當前channel的數據長度為0,表示這個通道沒有數據需要處理,那就過會兒處理
if (clientSocketChannel.read(requestBuffer) == 0){
// 進入下一個循環,即處理下一個channel
continue;
}
while (clientSocketChannel.isOpen() && clientSocketChannel.read(requestBuffer) != -1) {
if (requestBuffer.position() > 0) {
break;
}
}
if (requestBuffer.position() == 0) {
// 如果沒有數據,就不再進行后續處理,而是進入下一個循環
continue;
}
requestBuffer.flip();
System.out.println("server receive a message: " + new String(requestBuffer.array()));
System.out.println("server receive a message from: " + clientSocketChannel.getRemoteAddress());
// 響應結果 200
String response = "HTTP/1.1 200 OK\r\n" +
"Content-Length: 12\r\n\r\n" +
"Hello World!";
ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes());
while (responseBuffer.hasRemaining()) {
clientSocketChannel.write(responseBuffer);
}
// 新增:如果運行到這里,說明返回的數據已經返回了
// 我認為,如果是長連接的話,這里的處理應當更加嚴密(當然這只是一個過渡demo版本)
socketChannelIterator.remove();
// 我認為,應當進行close等資源釋放操作。並且應該先remove(),再close
clientSocketChannel.close();
}
}
}
}
}
NIO_ServerV3
package tech.jarry.learning.netease.again;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* @Description: 這個版本,充分利用了NIO的第三個支柱-Selector,完成事件驅動的轉型
* 注意,上個版本使用循環,就類似自旋(自旋相對比較底層,小),雖然解決了BIO的每個client占據一個線程的資源消耗(主要是內存),但是加大了CPU的消耗(CPU要不斷進行循環,判斷,即使是無效的操作)
* NIO通過Selector,建立事件驅動模型,來解決這一問題。即只有當特定的事件(如連接建立完成)發生,才會進行對應的事件處理(從而避免了CPU的無效消耗,提高效率)
* 私語:很多Javaer一直停留在初級層次(網絡編程只能百度,使用BIO),就是無法突破事件驅動模型這種抽象層次更高的高層思想
* @Description: 為了更好地學習與理解Netty,基礎的NIO再過一遍
* @Author: jarry
*/
public class NIOServerV2 {
public static void main(String[] args) throws IOException {
// 1.創建並配置ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
// 2.創建Selector,並完成SelectionKey的注冊,並完成初始化監聽
// Selector在非阻塞的基礎上,實現了一個線程管理多個Channel(也就常說的“多路復用”)
// 那可不可以理解為一個selector管理多個channel,監聽多個channel(后續代碼中,除了server外,還有client們都注冊到了這個selector中)
Selector selector = Selector.open();
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, serverSocketChannel);
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
System.out.println("server start success ");
// 3.開始循環處理各個事件
while (true) {
// 1.通過.select()阻塞當前線程,直到有注冊的selectionKey觸發(觸發是,會將對應的selectionKey復制到selected set中)
selector.select();
// 2.獲取觸發的selectionKey集合
Set<SelectionKey> selectionKeySet = selector.selectedKeys();
// 3.遍歷處理觸發的selectionKey集合
Iterator<SelectionKey> iterator = selectionKeySet.iterator();
while (iterator.hasNext()){
// 1.獲得觸發的selectionKey
SelectionKey selectedKey = iterator.next();
// 2.從集合中移除正在處理的selectionKey(單線程也可以在處理完后移除,但多線程中就可能出現同一selectionKey被多個線程處理)
iterator.remove();
// 3.根據iteration觸發的事件類型,進行對應處理(這里demo為了簡單一些,就只處理accept與read事件類型)
if (selectedKey.isAcceptable()){
// 如果selectedKey觸發的是accept事件類型,即serverSocketChannel通過accept獲得了一個客戶端連接
// 1.獲得服務端ServerSocketChannel(即之前監聽accept事件時,放入attachment的可選對象,便於后續處理)
ServerSocketChannel server = (ServerSocketChannel)selectedKey.attachment();
// 2.獲得客戶端SocketChannel(利用剛剛獲得的server,與觸發的.accept()方法),便於后續操作
SocketChannel client = server.accept();
// 3.配置客戶端SocketChannel(畢竟SocketChannel也是默認配置阻塞的)
client.configureBlocking(false);
// 4.注冊新的事件(既然已經連接成功,那么開始注冊如read等新事件,便於后續監聽)
// 也可以采取類似初始化階段那樣兩句代碼完成,但是這里不需要(也可以說時表現一個新的處理方法)
client.register(selector, SelectionKey.OP_READ, client);
// 5.日志打印
System.out.println("server has connect a new client: "+ client.getRemoteAddress());
}
if (selectedKey.isReadable()){
// 如果selectedKey觸發的是可讀事件類型,即當前selectionKey對應的channel可以進行讀操作(但不代表就一定有數據可以讀)
// 1.獲得客戶端SocketChannel(即之前監聽事件處理時,注冊read事件時置入的attachment對象)
SocketChannel client = (SocketChannel)selectedKey.attachment();
// 2.新建一個ByteBuffer用於緩沖數據(或者說,用來盛放數據)
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
// 3.判斷對應client是否處於open狀態,對應channel內是否有可讀數據(如果不滿足就跳過該循環)
// 原本我在想我都已經移除了對應的key,這里又沒有處理數據,那下一次不就沒有對應key了。
// 但實際是我移除的是.selectedKeys()選出來的key(是復制體),下次觸發read事件,還會有對應key被selectedKeys選出來的。
while (client.isOpen() && client.read(requestBuffer) != -1){
// 達到這里,說明對應channel中是有對應數據的
// 開始讀取數據
if (requestBuffer.position() > 0){
// 這里為了簡化處理,就設定為一旦讀取了數據就算讀取完畢
// 注意:讀取的操作在loop的判斷條件中,client.read(requestBuffer)
//TODO_FINISH 疑問:既然這里設置的是>0就break,那為什么實際操作中,數據(字符串)是讀完了呢
// 答案:while循環條件的read就是完成了當前緩沖區數據的讀取。
//而循環體中的if在生產環境可能更多是進行(編解碼的沾包拆包處理等)。
break;
}
}
// 4.如果requestBuffer為空,即沒有讀取到數據,那就跳出本次selectionKey的處理
if (requestBuffer.position() == 0){
continue;
}
// 5.到達這里說明requestBuffer.position()不為0,即bytebBuffer不為空,即讀取到了數據,那么就處理數據
// 5.1 將requestBuffer從寫模式轉為讀模式
requestBuffer.flip();
// 5.2 業務處理:將brequestBuffer中的數據打印出來(切記,只有.allocate()分配的非直接內存的ByteBuffer才可以.array())
System.out.println(new String(requestBuffer.array()));
System.out.println("server has receive a message from: "+client.getRemoteAddress());
// 6.返回響應
// 6.1 模擬一下http協議的響應,便於瀏覽器解析(響應結果 200)
String response = "HTTP/1.1 200 OK\r\n" +
"Content-Length: 11\r\n\r\n" +
"Hello World";
// 6.2 通過ByteBuffer.wrap()將數據置入響應的ByteBuffer
ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes());
// 6.2 將響應的ByteBuffer寫入到客戶端Socket中(底層會自動將該數據發送過去,額,好吧。實際是交由操作系統底層處理)
while (responseBuffer.hasRemaining()) {
client.write(responseBuffer);
}
}
}
//TODO_FINISHED 不理解這句的目的是什么,這是一個類似.select()的非阻塞式方法。
// epoll空論的一種解決方案,但是無法根本解決問題,最好還是如Netty那樣refresh解決
selector.selectNow();
}
}
}
Reactor模型
介紹
在了解NIO之后,估計很多人都太嘆服於它的設計,它的效率,它的性能。NIO由於其模型,充分發揮了單線程的性能,但是單線程往往就意味着性能瓶頸(如單線程是無法發揮多核CPU的性能的)。
所以,如何實現NIO的多線程成為了網絡編程的發展方向。
當然如果只是為了將NIO與多線程結合起來,其實並不是十分困難。但是如何有機結合(既充分發揮多線程性能,又不產生過多的性能浪費),並確保可擴展性才是真正的關鍵。
這個時候Doug Lea這個Java大神(真的佩服),發表了一篇文章Scalable IO in Java,提出了解決問題,甚至都給出了demo。
首先是基本的Reactor線程模型設計:
在Reactor基本線程模型中,Doug Lea將NIO進行accept操作的部分提取出來,通過一個單一線程acceptor(也就是當前線程)實現client的accept信號的監聽,並進行分發(進行后續事件的注冊)。
而當監聽到read等事件后,通過dispatch將相關事件處理分發到線程池TreadPool中,交由worker thread進行具體業務處理。
當然這樣的線程模型,其擴展性依舊無法滿足需求,其性能瓶頸,會卡在acceptor線程上。所以Doug Lea進而提出了multiple Reactors
其設計是將原先的基本Reactor線程模型的Reactor拆分為mainReactor與subReactor,中間通過acceptor連接,從而降低原先基本Reactor線程模型中acceptor的壓力。
優點
- 優秀的可擴展性
- 更高的性能瓶頸
缺點
- 需要NIO的理解基礎
- 需要理解Reactor線程模型
- 代碼實現較為復雜(相較原先的NIO與BIO)
代碼示例
這里給出一些簡單的demo,供大家認識。
NIO_Client(和之前的NIO_Client沒有任何區別)
package tech.jarry.learning.netease;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;
/**
* @Description: NIO模型下的TCP客戶端實現
* @Author: jarry
*/
public class NIOClient {
public static void main(String[] args) throws IOException {
// 獲得一個SocektChannel
SocketChannel socketChannel = SocketChannel.open();
// 設置SocketChannel為非阻塞模式
socketChannel.configureBlocking(false);
// 設置SocketChannel的連接配置
socketChannel.connect(new InetSocketAddress(Inet4Address.getLocalHost(), 8080));
// 通過循環,不斷連接。跳出循環,表示連接建立成功
while (!socketChannel.finishConnect()){
// 如果沒有成功建立連接,就一直阻塞當前線程(.yield()會令當前線程“謙讓”出CPU資源)
Thread.yield();
}
// 發送外部輸入的數據
Scanner scanner = new Scanner(System.in);
System.out.println("please input:");
String msg = scanner.nextLine();
// ByteBuffer.wrap()會直接調用HeapByteBuffer。故一方面其會自己完成內存分配。另一方面,其分配的內存是非直接內存(非heap堆)
ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());
// ByteBuffer.hasRemaining()用於判斷對應ByteBuffer是否還有剩余數據(實現:return position < limit;)
while (byteBuffer.hasRemaining()){
socketChannel.write(byteBuffer);
}
// 讀取響應
System.out.println("receive echoResponse from server");
// 設置緩沖區大小為1024
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
// 判斷條件:是否開啟,是否讀取到數據
//TODO 我認為這里的實現十分粗糙,是不可以置於生產環境的,具體還需要后面再看看(即使是過渡demo,也可以思考一下嘛)
while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1){
// 長連接情況下,需要手動判斷數據有沒有讀取結束 (此處做一個簡單的判斷: 超過0字節就認為請求結束了)
if (requestBuffer.position() > 0) {
break;
}
}
requestBuffer.flip();
// byte[] content = new byte[requestBuffer.limit()];
// // .get()方法只會返回byte類型(猜測是當前標記位的數據)
// requestBuffer.get(content);
// System.out.println(new String(content));
// ByteBuffer提供了大量的基本類型轉換的方法,可以直接拿來使用
System.out.println(new String(requestBuffer.array()));
scanner.close();
socketChannel.close();
}
}
NIO_ServerV4_ReactorV1
package tech.jarry.learning.netease;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Description: 根據Doug Lea大神的多路復用Reactor線程模型,進行編碼,學習Reactor設計模式在網絡編程的重要體現
* 注意:NIOServerV2作為一個demo已經不錯了。但是仍然存在致命的性能瓶頸(其實很明顯,整個網絡編程就靠一個線程實現全部工作,肯定不行,起碼沒法充分發揮多核CPU的能力)
* 故將服務端常用的部分分為accept,read,bussinessDeal三個部分(第三部分,本demo就不深入了)
* @Author: jarry
*/
public class NIOServerV3 {
// 處理業務操作的線程
private static ExecutorService workPool = Executors.newCachedThreadPool();
/**
* 封裝了Selector.select()等事件的輪詢的共用代碼
*/
abstract class ReactorThread extends Thread {
Selector selector;
LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
/**
* Selector監聽到有事件后,調用這個方法(不過具體實現,需要基類自己實現)
* @param channel
*/
public abstract void handler(SelectableChannel channel) throws Exception;
private ReactorThread() throws IOException {
selector = Selector.open();
}
// 用於判斷線程運行狀態
volatile boolean running = false;
@Override
public void run() {
// 輪詢Selector事件
while (running) {
try {
// 執行隊列中的任務
Runnable task;
while ((task = taskQueue.poll()) != null) {
task.run();
}
selector.select(1000);
// 獲取查詢結果
Set<SelectionKey> selected = selector.selectedKeys();
// 遍歷查詢結果
Iterator<SelectionKey> iter = selected.iterator();
while (iter.hasNext()) {
// 被封裝的查詢結果
SelectionKey key = iter.next();
iter.remove();
int readyOps = key.readyOps();
// 關注 Read 和 Accept兩個事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
try {
SelectableChannel channel = (SelectableChannel) key.attachment();
channel.configureBlocking(false);
handler(channel);
if (!channel.isOpen()) {
key.cancel(); // 如果關閉了,就取消這個KEY的訂閱
}
} catch (Exception ex) {
key.cancel(); // 如果有異常,就取消這個KEY的訂閱
}
}
}
selector.selectNow();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private SelectionKey register(SelectableChannel channel) throws Exception {
// 為什么register要以任務提交的形式,讓reactor線程去處理?
// 因為線程在執行channel注冊到selector的過程中,會和調用selector.select()方法的線程爭用同一把鎖
// 而select()方法是在eventLoop中通過while循環調用的,爭搶的可能性很高,為了讓register能更快的執行,就放到同一個線程來處理
FutureTask<SelectionKey> futureTask = new FutureTask<>(() -> channel.register(selector, 0, channel));
taskQueue.add(futureTask);
return futureTask.get();
}
private void doStart() {
if (!running) {
running = true;
start();
}
}
}
// 0. 創建ServerSocketChannel
private ServerSocketChannel serverSocketChannel;
// 1、創建多個線程 - accept處理reactor線程 (accept線程)
private ReactorThread[] mainReactorThreads = new ReactorThread[1];
// 2、創建多個線程 - io處理reactor線程 (I/O線程)
private ReactorThread[] subReactorThreads = new ReactorThread[8];
/**
* 初始化線程組
*/
private void initGroup() throws IOException {
// 創建IO線程,負責處理客戶端連接以后socketChannel的IO讀寫
for (int i = 0; i < subReactorThreads.length; i++) {
subReactorThreads[i] = new ReactorThread() {
@Override
public void handler(SelectableChannel channel) throws IOException {
// work線程只負責處理IO處理,不處理accept事件
SocketChannel ch = (SocketChannel) channel;
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
while (ch.isOpen() && ch.read(requestBuffer) != -1) {
// 長連接情況下,需要手動判斷數據有沒有讀取結束 (此處做一個簡單的判斷: 超過0字節就認為請求結束了)
if (requestBuffer.position() > 0) {
break;
}
}
if (requestBuffer.position() == 0) {
return; // 如果沒數據了, 則不繼續后面的處理
}
requestBuffer.flip();
byte[] content = new byte[requestBuffer.limit()];
requestBuffer.get(content);
System.out.println(new String(content));
System.out.println(Thread.currentThread().getName() + "收到數據,來自:" + ch.getRemoteAddress());
// TODO 業務操作 數據庫、接口...
workPool.submit(() -> {
});
// 響應結果 200
String response = "HTTP/1.1 200 OK\r\n" +
"Content-Length: 11\r\n\r\n" +
"Hello World";
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
while (buffer.hasRemaining()) {
ch.write(buffer);
}
}
};
}
// 創建mainReactor線程, 只負責處理serverSocketChannel
for (int i = 0; i < mainReactorThreads.length; i++) {
mainReactorThreads[i] = new ReactorThread() {
AtomicInteger incr = new AtomicInteger(0);
@Override
public void handler(SelectableChannel channel) throws Exception {
// 只做請求分發,不做具體的數據讀取
ServerSocketChannel ch = (ServerSocketChannel) channel;
SocketChannel socketChannel = ch.accept();
socketChannel.configureBlocking(false);
// 收到連接建立的通知之后,分發給I/O線程繼續去讀取數據
int index = incr.getAndIncrement() % subReactorThreads.length;
ReactorThread workEventLoop = subReactorThreads[index];
workEventLoop.doStart();
SelectionKey selectionKey = workEventLoop.register(socketChannel);
selectionKey.interestOps(SelectionKey.OP_READ);
System.out.println(Thread.currentThread().getName() + "收到新連接 : " + socketChannel.getRemoteAddress());
}
};
}
}
/**
* 初始化channel,並且綁定一個eventLoop線程
*
* @throws IOException IO異常
*/
private void initAndRegister() throws Exception {
// 1、 創建ServerSocketChannel
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
// 2、 將serverSocketChannel注冊到selector
int index = new Random().nextInt(mainReactorThreads.length);
mainReactorThreads[index].doStart();
SelectionKey selectionKey = mainReactorThreads[index].register(serverSocketChannel);
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
}
/**
* 綁定端口
*
* @throws IOException IO異常
*/
private void bind() throws IOException {
// 1、 正式綁定端口,對外服務
serverSocketChannel.bind(new InetSocketAddress(8080));
System.out.println("啟動完成,端口8080");
}
public static void main(String[] args) throws Exception {
NIOServerV3 nioServerV3 = new NIOServerV3();
nioServerV3.initGroup(); // 1、 創建main和sub兩組線程
nioServerV3.initAndRegister(); // 2、 創建serverSocketChannel,注冊到mainReactor線程上的selector上
nioServerV3.bind(); // 3、 為serverSocketChannel綁定端口
}
}
NIO_ServerV4_ReactorV2
為了更好的理解Reactor線程模型,我將之前的Reactor代碼,按照我的代碼習慣,做了一些調整。
這個部分理解是有一些困難的,推薦多看幾遍,如我這樣手擼兩邊,第二遍可以根據自己的習慣,進行代碼結構的調整。
package tech.jarry.learning.netease.again;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Description: 為了更好地學習與理解Netty,結合Reactor線程模型的NIO再過一遍,並做出一定的結構調整
* @Author: jarry
*/
public class NIOServerV3 {
// 工作線程池,其中工作線程用於完成實際工作(如計算,編解碼等工作)
private static ExecutorService workerPool = Executors.newCachedThreadPool();
// 全局變量ServerSocketChannel,記錄服務端的Channel
private ServerSocketChannel serverSocketChannel;
// 創建mainReactors線程組
private MainReactorThread[] mainReactorThreads = new MainReactorThread[1];
// 創建subReactors線程組
private SubReactorThread[] subReactorThreads = new SubReactorThread[8];
private abstract class AbstractReactorThread extends Thread {
// 創建Selector,用於建立Channel事件監聽
protected Selector selector;
// 用於標記線程運行狀態
private volatile boolean running = false;
// 創建任務隊列,用於多線程處理工作
private LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
/**
* 通過懶加載方式,實例化Selector
*/
public AbstractReactorThread() throws IOException {
selector = Selector.open();
}
@Override
/**
* 重寫run方法,完成ReactorThread的公共代碼邏輯
*/
public void run() {
while (running){
// 1.通過一個巧妙的方式,遍歷處理taskQueue中的所有task
Runnable task;
while ((task = taskQueue.poll()) != null){
task.run();
}
// 2.通過.select()阻塞當前線程,直到有注冊的selectionKey觸發(之所以等待1000ms,應該是為了令上面的task執行完成)
try {
selector.select(1000L);
} catch (IOException e) {
e.printStackTrace();
}
// 3.接下來的操作類似,遍歷處理各種監聽到的事件
Set<SelectionKey> selectionKeySet = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeySet.iterator();
while (iterator.hasNext()){
SelectionKey selectedKey = iterator.next();
iterator.remove();
// 獲得事件類型的編號
int readyOps = selectedKey.readyOps();
// 通過位運算等方式,快速判斷readyOps是否與對應事件類型編號符合(這里作為demo只關注accept與read事件)
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// 根據V2的編程了解,.attachment返回的極可能是服務端的ServerSocketChannel,也可能是客戶端的SocketChannel,故采用他們共同的父類SelectableChannel
SelectableChannel channel = (SelectableChannel)selectedKey.attachment();
try {
// 老規矩,將channel設置為非阻塞式的
channel.configureBlocking(false);
// 將channel交給抽象方法reactorHandler解決,(具體實現由各自子類去實現)
//TODO_FINISH 話說,如何確定哪個子類解決哪個問題
// 答案:抽象類不會實例化成對象
// 這里的reactorHandler都是由對應子類調用的。MainReactorThread只有在注冊時調用,並且是直接置入taskQueue,第二次不會到這里
reactorHandler(channel);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
// 判斷channel是否關閉
if (!channel.isOpen()){
// 如果channel已經關閉,那么其上的SelectionKey就可以取消訂閱了
selectedKey.cancel();
}
}
}
//TODO 這個還是看不懂
try {
selector.selectNow();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 根據提交的channel,進行注冊處理(畢竟調用這個方法的,也只有此類,與衍生類了)
* @param channel
* @return
*/
protected SelectionKey register(SelectableChannel channel) throws ExecutionException, InterruptedException {
// 為什么register要以任務提交的形式,讓reactor線程去處理?
// 因為線程在執行channel注冊到selector的過程中,會和調用selector.select()方法的線程爭用同一把鎖
// 而select()方法是在eventLoop中通過while循環調用的,爭搶的可能性很高,為了讓register能更快的執行,就放到同一個線程來處理
// 這里無論是解決方案,還是register與select通用一把synchronized鎖,都蠻令人驚嘆的(雖然我不大理解為什么register要與select公用一邊鎖)
// select -> SelectorImpl.lockAndDoSelect 該方法的執行內容采用了synchronized(this)鎖
// register -> SelectorImpl.register 該方法的執行內容采用了synchronized(this.publicKeys)鎖 (果然這個比較復雜,主要synchronized鎖太多了)
FutureTask<SelectionKey> futureTask = new FutureTask<>(() ->
channel.register(selector, 0, channel)
);
taskQueue.add(futureTask);
return futureTask.get();
}
/**
* 執行啟動操作(其實外部可以判斷線程狀態的,但是這里running表示的線程狀態,與規定的線程狀態不同)
*/
protected void doStart(){
if (!running){
running = true;
start();
}
}
/**
* mainReactor與subReactor的handler處理邏輯是不同的,交由子類實現
*/
protected abstract void reactorHandler(SelectableChannel channel) throws IOException, ExecutionException, InterruptedException;
}
/**
* mainReactor的實現類,實現了父類的reactorHandler方法。主要完成accept的監聽與處理,並進行事件分發操作
*/
public class MainReactorThread extends AbstractReactorThread {
AtomicInteger atomicInteger = new AtomicInteger(0);
/**
* 通過懶加載方式,實例化Selector
*/
public MainReactorThread() throws IOException {
}
@Override
protected void reactorHandler(SelectableChannel channel) throws IOException, ExecutionException, InterruptedException {
// 獲得服務端ServerSocketChannel
ServerSocketChannel server = (ServerSocketChannel) channel;
// 獲得客戶端SocketChannel
SocketChannel client = server.accept();
// 設置客戶端SocketChannel為非阻塞模式
client.configureBlocking(false);
// // 設置新的事件監聽
// client.register(selector, SelectionKey.OP_READ, client);
// 不再由當前線程完成read事件的注冊,畢竟當前線程只完成accept事件處理,與事件分發
// 故調用專門寫的一個私有方法,進行注冊
doRegister(client);
// 打印日志
System.out.println("server has connect a new client: "+client.getRemoteAddress());
}
/**
* Reactor線程模型下,MainReactor將read事件的注冊下放到SubReactor
* @param client 需要進行事件(這里只處理read事件)注冊的client
*/
private void doRegister(SocketChannel client) throws ExecutionException, InterruptedException {
// 通過輪詢的方式(也可以自定義,或擴展開),將事件(非Accept事件,如read事件)交給subReactor線程池中的線程處理
int index = atomicInteger.getAndIncrement() % subReactorThreads.length;
// 獲取subReactorThread對象,又稱workEventLoop對象(為了更好地對接Netty中的EventLoop
SubReactorThread workEventLoop = subReactorThreads[index];
// 調用workEventLoop的doStart()方法,啟動工作線程(如果之前已有事件啟動了,就不會再啟動了)
workEventLoop.doStart();
// 完成事件的注冊工作(AbstractReactorThread中的注冊,默認監聽事件編碼為0。
SelectionKey selectionKey = workEventLoop.register(client);
// 設置監聽事件的編碼(這樣的分離,有助於不同子類的實現,更加友好)
selectionKey.interestOps(SelectionKey.OP_READ);
}
}
/**
* subReactor的實現類,實現了父類的reactorHandler方法。主要完成非accept事件(這里demo特指read)的監聽與處理,包括打印,計算,響應等
*/
public class SubReactorThread extends AbstractReactorThread {
/**
* 通過懶加載方式,實例化Selector
*/
public SubReactorThread() throws IOException {
}
@Override
/**
* 完成非accept事件(這里特指read)事件的處理(打印與響應)
*/
protected void reactorHandler(SelectableChannel channel) throws IOException {
// 獲得客戶端SocketChannel
SocketChannel client = (SocketChannel) channel;
// 創建ByteBuffer作為緩沖區
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
// 嘗試讀取數據
while (client.isOpen() && (client.read(requestBuffer)) != -1){
// 這里進行簡單判斷與處理
if (requestBuffer.position() > 0){
break;
}
}
// 判斷requestBuffer大小
if (requestBuffer.position() == 0){
// 如果沒有數據,就不需要進行接下來的處理了
return;
}
// 將requestBuffer由寫模式轉為讀模式
requestBuffer.flip();
// TODO 業務操作 數據庫、接口...
workerPool.submit(() -> {
// 如:打印請求數據
System.out.println("server get a message: "+new String(requestBuffer.array()));
});
// 打印日志
System.out.println("server get a mesage from client: "+client.getRemoteAddress());
// 發送響應
String response = "HTTP/1.1 200 OK\r\n" +
"Content-Length: 11\r\n\r\n" +
"Hello World";
ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes());
while (responseBuffer.hasRemaining()){
client.write(responseBuffer);
}
}
}
/**
* Reactor線程模型的初始化
* @throws IOException
* @throws ExecutionException
* @throws InterruptedException
*/
public void init() throws IOException, ExecutionException, InterruptedException {
initGroup();
initMain();
}
/**
* 進行服務端,端口綁定
* @param port
* @throws IOException
*/
public void bind(int port) throws IOException {
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
System.out.println("server bind success");
System.out.println("server start");
}
/**
* 實例化兩個Reactor線程組
* @throws IOException
*/
private void initGroup() throws IOException {
for (int i = 0; i < mainReactorThreads.length; i++) {
mainReactorThreads[i] = new MainReactorThread();
}
for (int i = 0; i < subReactorThreads.length; i++) {
subReactorThreads[i] = new SubReactorThread();
}
}
/**
* 初始化一個MainReactorThread,來進行工作
* @throws IOException
* @throws ExecutionException
* @throws InterruptedException
*/
private void initMain() throws IOException, ExecutionException, InterruptedException {
//TODO_FINISHED 話說,這里的mainReactorThreads只有一個線程,MainReactorThread可能多個線程嘛?還是說一個端口-》一個ServerSocketChannel-》一個MainReactorThread?
// 參照Netty的bossGroup的NioEventLoopGroup
// 初始化並配置serverSocketChannel
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
// 從mainReactorThreads中挑選一個MainReactorThread
int index = new Random().nextInt(mainReactorThreads.length);
// 啟動挑選出來的mainReactorThread
mainReactorThreads[index].doStart();
// 通過挑選出來的mainReactorThread線程對服務端serverSocketChannel進行注冊
SelectionKey selectionKey = mainReactorThreads[index].register(serverSocketChannel);
// 設定監聽的事件編碼(Accept事件監聽)
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
}
public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
NIOServerV3 nioServerV3 = new NIOServerV3();
nioServerV3.init();
nioServerV3.bind(8080);
}
}
Netty
介紹
說實話,NIO優秀的網絡編程模型,結合Doug Lea提出的Reactor線程模型已經為現有的網絡編程提出了一個幾乎無可挑剔的解決方案。
從技術上來說,這個方案已經沒有什么問題了。唯一的問題就是之前提到的缺點:
- 前置需求多(需要很熟悉NIO與Reactor);
- 編碼比較復雜(可以看到實際落地代碼,差不多200行代碼只是用於實現基本結構支持)。
那么,有沒有更好的解決方案呢?有的,那就Netty框架。
前面提到200行代碼只是實現了基本結構支持,那么這200行的代碼就可以提取成為一個公共的通用模塊。
Netty框架做出了優秀的封裝(如NioEventLoopGroup,ByteBuf,ServerBootstrap等等),而且解決了NIO的epoll空輪詢等問題,更是提供了諸多的工具類,提供便利。
Netty的架構分為三個部分:
- ServerBootstrap:引導器,引導使用者,建立EventLoop;
- ChannelInitializer:初始器,建立ChannelPipeline,通過責任鏈,管理處理器;
- ChannelHandler:處理器,處理各種業務邏輯,如編解碼,協議切換,加解密等。
從官方的結構圖,可以看出其中包含三大模塊:
- 支持Socket等多種傳輸方式;
- 提供了多種協議的編解碼實現;
- 核心設計包含事件處理模型,API的使用,ByteBuffer的增強ByteBuf。
優點
- 優秀的可擴展性
- 極高的性能瓶頸(Netty實現百萬連接不要太簡單,客戶端+服務端配置只需要內存達到8G即可)
- 不需要理解NIO,Reactor
- 代碼實現簡單
缺點
- 需要了解Netty的使用(但是依靠demo,入門還是比較簡單)
- Netty易學難精(入門,熟練比較簡單,但是精通比較難):
- 缺乏前置理論,源碼很難深入理解;
- Netty涉及一些設計模式,如責任鏈,單例,策略,觀察者等;
- Netty獨有的ByteBuf,及其零拷貝等概念;
- Netty涉及諸多工具類,如Http編解碼工具類等。
代碼示例
這里給出一個簡單的demo(根據官方提供的echoServerDemo,增加了一些注釋),供大家認識。
Netty_Echo_Server
package tech.jarry.learning.netease.example;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
* Echoes back any received data from a client.
*/
public final class EchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8080"));
public static void main(String[] args) throws Exception {
// Configure the server.
// 創建EventLoopGroup accept線程組 NioEventLoop
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 創建EventLoopGroup I/O線程組
EventLoopGroup workerGroup2 = new NioEventLoopGroup(1);
try {
// 服務端啟動引導工具類
ServerBootstrap b = new ServerBootstrap();
// 配置服務端處理的reactor線程組以及服務端的其他配置
b
// 設置兩個線程組(Reactor線程模型中的mainReactorThreads與subReactorThreads)。說白了就是兩個線程池
.group(bossGroup, workerGroup2)
// 設置采用的channel類型(NioServerSocketChannel對應ServerSocketChannel,其它類似),底層實現用的反射
/**
* ChannelFactory 的 newChannel() 方法什么時候會被調用就可以了。
* 對於 NioSocketChannel,由於它充當客戶端的功能,它的創建時機在 connect(…) 的時候;
* 對於 NioServerSocketChannel 來說,它充當服務端功能,它的創建時機在綁定端口 bind(…) 的時候。
*/
.channel(NioServerSocketChannel.class)
//TODO 只看到是用於配置,詳細還不了解
.option(ChannelOption.SO_BACKLOG, 100)
// 設置handler,這里設置了Netty提供的日志ChannelHandler(並采用了Debug級別)
.handler(new LoggingHandler(LogLevel.DEBUG))
// 設置childHandler 這里可以通過ChannelInitializer實例,來放入多個ChannelHandler(需要重寫其中的.initChannel()方法)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// 通過SocketChannel獲得ChannelPipeline
ChannelPipeline p = ch.pipeline();
// 在ChannelPipeline后面添加新的ChannelHandler
/**
* 每個 Channel 內部都有一個 pipeline,pipeline 由多個 handler 組成,
* handler 之間的順序是很重要的,因為 IO 事件將按照順序順次經過 pipeline 上的 handler,
* 這樣每個 handler 可以專注於做一點點小事,由多個 handler 組合來完成一些復雜的邏輯。
*
* Inbound 和 Outbound。在 Netty 中,IO 事件被分為 Inbound 事件和 Outbound 事件。
* 例如:
* 1. pipeline.addLast(new StringDecoder());
* 2. pipeline.addLast(new StringEncoder());
* 3. pipeline.addLast(new BizHandler());
*/
p.addLast(new EchoServerHandler());
}
});
// 通過bind啟動服務(Netty的ChannelFuture繼承自JDK的Future,只不過多了一些方法
ChannelFuture f = b
// 服務端用於綁定端口(其中還涉及Java的channel等操作)
.bind(PORT)
// ChannelFuture新增的.sync()區別於新增的.await()
/**
* sync() 和 await() 用於阻塞等待
* sync() 內部會先調用 await() 方法,等 await() 方法返回后,會檢查下這個任務是否失敗,如果失敗,重新將導致失敗的異常拋出來。
* 也就是說,如果使用 await(),任務拋出異常后,await() 方法會返回,但是不會拋出異常,而 sync() 方法返回的同時會拋出異常。
*/
.sync();
// 阻塞主線程,知道網絡服務被關閉
f
// 用於獲得封裝在Netty的ChannelFuture內的Channel(Java的),便於進行下一步操作
.channel()
// 當Netty的ChannelFuture被關閉時,返回ChannelFuture
.closeFuture()
// 同上
.sync();
} finally {
// 關閉線程組
bossGroup.shutdownGracefully();
workerGroup2.shutdownGracefully();
}
}
}
從上面的代碼,可以看到,利用建造者模式,通過一個固定的模板,Netty就可以實現一個簡單的EchoServer了。
使用的時候,一般業務的處理只是在修改其中pipeline的handler。
通過Netty,我們幾乎只需要關注業務方面的,並且利用Netty的Pipeline,可以很輕松的編排handler。
總結
技術演變
網絡模型其實是開發人員思維的體現。而網絡模型的演變其實就是開發人員思維的演變。
思維演變
這里我通過一個例子,來展現網絡模型演變中思維的演變。
BIO
客戶(Client)到公司(Server)辦理業務,會找到前台的小姐姐(acceptor),由小姐姐引導着,處理各類業務。
但是,當存在復數位客戶時,就需要在前台排隊,等待前台小姐姐招待好最前面的客戶。
為了解決這個排隊問題,Boss打算招聘多位小姐姐(線程池)。但是客戶的到來有時多,有時少,所以前台小姐姐也是有時候很空閑,有時候很繁忙。
NIO
由於小姐姐引導客戶處理業務時,客戶經常由於填表(業務處理中數據庫操作等)等操作,導致小姐姐在一旁等待。而這時候,前台還有客戶在等待。
為了解決這個問題,Boss在前台放置了一個鈴鐺(accept注冊)。
當有新的客戶到來時,小姐姐就會給他一個新的鈴鐺(read等事件注冊),用於在處理業務時呼喚她。
Reactor(Basic Reactor)
隨着客戶處理業務的上升,Boss招收了多位工作人員(worker thread),用於引導客戶處理業務。
而小姐姐(Acceptor)只負責招待客戶(只處理accept事件),交給客戶新的鈴鐺(read等事件注冊)。客戶通過新的鈴鐺找到工作人員,處理業務。
Reactor(Multiple Reactor)
隨着業務規模的進一步提升,一位前台小姐姐已經忙不過來了。
Boss就在前台安排了多位小姐姐,招待與引導客戶。
Netty
看到Boss的成功,許多人都爭相模仿,但是由於了解不深刻,總是出現各種問題。
Boss希望通過這一成功經驗,幫助到其它人,所以制作了一款工作流軟件(Netty)。
其他人只需要下載這個軟件,然后按照其中的簡單引導,進行設置,就可以輕松使用。
問題解決
解決問題一般都是三個步驟:
- 先通過一些比較簡單,比較暴力的方式解決問題;
- 再根據問題解決過程中的一些思想火花,以及一些資料,優化解決方案;
- 然后根據需要,視情況看,是否執行這一步。查閱各種資料,請教各位前輩,在原來的基礎上,再進一步優化;
- 最后,在無法提升的情況下,考慮將這個解決方案,做成一個工具,一個框架,去幫助其他人。
就像我在leetcode上,一般第一步,都是先按照自己最直觀的想法,先嘗試解決問題,然后再考慮優化,最后思考是否可以通用化,工具化等。
留言
這次只是簡單提一些網絡編程的演變過程,也算是幫助大家入門吧。
后續會進行深入的原理分析與總結,並會寫成博客,發布出來的。