1 多線程環境下selector使用
1-1 為什么需要多線程優化?
單線程配合selector選擇器雖然能夠管理多個channel的事件,但仍存在以下缺點:
缺點1:多核 cpu被白白浪費
缺點2:某個事件耗費時間比較長會影響其他事件的處理。
- 單線程處理多個事件適合每個事件的處理事件比較短的情況
補充::Redis采用單線程處理,如果某個操作時間較長,會影響其他操作,所以redis單個操作時間復雜度不能太高。
1-2 多線程架構模型

總體設計上基於分工思想分為二個部分,分別是boss模塊和worker模塊(通常是一個boss線程配合多個work線程):
boss模塊(只負責接待):多線程機制(每個線程都有一個selector),專門用於處理客戶端的連接事件。
worker模塊(只負責讀寫):多個worker,每個worker實際上是一個線程配合一個selector,worker專門負責數據的讀寫操作。
- 通常線程的數目與CPU的核心數目是一致的。
1-3 多線程環境下利用selector進行網絡通信
1-3-1 多線程環境下無法獲取可讀事件的原因
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
@Slf4j
public class Server7 {
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
/*boss selecor專門用於處理accept event*/
Selector boss = Selector.open();
SelectionKey bossKey = ssc.register(boss,0,null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
// 1 創建固定數量的worker並初始化
Worker worker = new Worker("worker-0");
worker.register();
while(true){
boss.select();
Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
while(iter.hasNext()){
SelectionKey key = iter.next();
iter.remove();;
if(key.isAcceptable()){
log.debug("accept event happen!");
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.debug("before register...{}",sc.getRemoteAddress());
// 2. 關聯selector
sc.register(worker.selector,SelectionKey.OP_READ,null);
log.debug("after register...{}",sc.getRemoteAddress());
}
}
}
}
// 只有內部類能夠定義為static
static class Worker implements Runnable{
private Thread thread;
private Selector selector;
private String name;
private volatile boolean start = false;
public Worker(String name){this.name = name;}
// 初始化線程和selector
public void register() throws IOException {
if(!start){ // 利用start保證這段代碼只會被執行一次。
selector = Selector.open(); // open返回:SelectorProvider.provider().openSelector()
thread = new Thread( this,name);
thread.start();
start = true;
}
}
@Override
public void run() {
while(true){
try{
log.debug("begin select!");
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while(iter.hasNext()){
SelectionKey key = iter.next();
iter.remove();
/*這里實際讀寫需要考慮消息邊界,寫的數據規模過大的問題,以及連接的正常/異常關閉問題詳見單線程版本設計*/
if(key.isReadable()){
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
channel.read(buffer);
buffer.flip();
printBytebuffer(buffer);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
static void printBytebuffer(ByteBuffer tmp){ // 注意:傳入的bytebuffer必須時寫模式
System.out.println(StandardCharsets.UTF_8.decode(tmp).toString());
}
}
客戶端建立連接並發送數據后的執行結果
14:44:05.969 [worker-0] DEBUG Server.Server7 - begin select!
14:44:14.910 [boss] DEBUG Server.Server7 - accept event happen!
14:44:14.911 [boss] DEBUG Server.Server7 - before register.../127.0.0.1:14363 // 無法獲取可讀事件
問題:服務端的boss模塊的selector能夠處理accept事件,但是work模塊去無法獲取可讀事件。
原因分析
- 主要原因在於worker線程執行了select方法后,主線程中register方法就無法生效。造成selector沒有監控讀寫事件(線程的異步性引發問題)
代碼段1:主線程讓worker的selector監控讀寫通道(主線程執行該方法!!!)
sc.register(worker.selector,SelectionKey.OP_READ,null);
代碼段2:worker線程的run方法內部select方法(worker0線程執方法!!)
public void run() {
while(true){
try{
selector.select();
1-3-2 利用任務隊列與wakeup解決可讀事件無法獲取的問題
解決思路:讓worker線程執行注冊任務,通過任務隊列的方法將任務對象傳遞給worker線程。
服務端代碼
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
@Slf4j
public class Server8 {
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
SelectionKey bossKey = ssc.register(boss,0,null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
Worker worker = new Worker("worker-0");
while(true){
boss.select();
Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
while(iter.hasNext()){
SelectionKey key = iter.next();
iter.remove();
if(key.isAcceptable()){
log.debug("accept event happen!");
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
worker.register(sc); // 首次調用啟動線程並注冊,后去調用僅僅注冊
}
}
}
}
// 只有內部類能夠定義為static
static class Worker implements Runnable{
private Thread thread;
private Selector selector;
private String name;
private volatile boolean start = false;
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
public Worker(String name){this.name = name;}
// 初始化線程和selector
/*======改進1:boss所在線程在accept事件發生后調用該方法,將Runable對象放入消息隊列 ==============================*/
public void register(SocketChannel sc) throws IOException {
if(!start){ // 利用start確保worker線程只有一個
selector = Selector.open(); // open返回:SelectorProvider.provider().openSelector()
thread = new Thread( this,name);
thread.start();
start = true;
}
// 向隊列中添加注冊任務(runable任務),當worker線程運行時從這個隊列獲取任務並執行
// 確保channel的注冊在select之前。
queue.add(()->{
try{
sc.register(selector,SelectionKey.OP_READ,null);
selector.selectNow();
} catch (IOException e) {
e.printStackTrace();
}
});
/*
讓后面的第一次selection操作不再阻塞。
Causes the first selection operation that has not yet returned to return immediately.
*/
selector.wakeup(); // 這個方法調用讓select方法立刻返回一次,確保注冊的完成
log.debug("Wake up for to register new read/write channel for the selector!");
}
@Override
public void run() {
while(true){
try{
selector.select();
/*======改進1:將Runable對象從消息隊列取出完成注冊==============================*/
Runnable task = queue.poll();
if(task != null){
task.run(); // 執行了sc.register(selector,SelectionKey.OP_READ,null)
log.debug("Register successfully!");
}
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while(iter.hasNext()){
SelectionKey key = iter.next();
/*這里實際讀寫需要考慮消息邊界,寫的數據規模過大的問題,詳見單線程版本設計*/
/*對於可讀事件,需要考慮三種情況:
* 1)正常的可讀事件 2)客戶端異常的關閉(需要處理異常) 3)客戶端正常管理,可讀的字節數為0,必須進行cancel操作。
* 忽視第二種情況會造成服務器程序宕機。忽視第三種情況會造成服務器的陷入死循環狀態
* */
if(key.isReadable()){
try{
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
int read = channel.read(buffer);
if(read == -1){
key.cancel();
channel.close();
}else{
buffer.flip();
printBytebuffer(buffer);
}
}catch (IOException e){
e.printStackTrace();
}
}
iter.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
static void printBytebuffer(ByteBuffer tmp){ // 注意:傳入的bytebuffer必須時寫模式
System.out.println(StandardCharsets.UTF_8.decode(tmp).toString());
}
}
注意點:
- wakeup方法調用后能夠讓selector.select()立刻返回一次。
- 利用ConcurrentLinkedQueue讓boss線程將runable對象傳遞給work線程,從而讓worker線程實現select與regiser。
Java中關於可寫事件的注意點:有三種情況會觸發可讀事件
1)正常的可讀事件 2)客戶端異常的關閉(需要處理異常) 3)客戶端正常關閉,可讀的字節數為0,必須進行cancel操作(否則key不會從事件集合中移除)。
這三種情況的處理統一的模板如下:
while(iter.hasNext()){
SelectionKey key = iter.next();
/*這里實際讀寫需要考慮消息邊界,寫的數據規模過大的問題,詳見單線程版本設計*/
/*對於可讀事件,需要考慮三種情況:
* 1)正常的可讀事件 2)客戶端異常的關閉(需要處理異常) 3)客戶端正常管理,可讀的字節數為0,必須進行cancel操作。
* 忽視第二種情況會造成服務器程序宕機。忽視第三種情況會造成服務器的陷入死循環狀態
* */
if(key.isReadable()){
try{
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
int read = channel.read(buffer);
if(read == -1){
key.cancel();
channel.close();
}else{
buffer.flip();
printBytebuffer(buffer);
}
}catch (IOException e){
e.printStackTrace();
}
}
iter.remove();
}
1-4 完整的多線程環境下selector的使用
程序功能:
1)定義一個boss,其selector專門去監控客戶端的accept事件。
2)定義實現runable接口的worker類,其selector專門用於監控客戶端的讀寫事件。
3)使用單個boss多個worker處理客戶端連接。
- worker的數量通常根據結合cpu核心數設置
4)采用round-robin(輪詢)機制讓多個work均勻的監控客戶端連接的讀寫事件。
- 多線程環境下采用原子整數實現
服務端實現代碼
import lombok.extern.slf4j.Slf4j;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class Server9 {
public static void main(String[] args) throws IOException {
new BossEventLoop().register();
}
@Slf4j
static class BossEventLoop implements Runnable {
/*============================01 重要的屬性=============================================*/
/*創建一個boss以及多個work*/
private Selector boss;
private WorkerEventLoop[] workers;
private volatile boolean start = false;
/*創建計數器,當有連接建立的時候,輪詢可用的worker,將channel綁定到空閑worker的selector上*/
AtomicInteger index = new AtomicInteger();
/*============================02 用於初始化boss線程用於監聽客戶端的連接事件===========================*/
public void register() throws IOException {
if (!start) {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8080));
ssc.configureBlocking(false);
boss = Selector.open();
SelectionKey ssckey = ssc.register(boss, 0, null);
ssckey.interestOps(SelectionKey.OP_ACCEPT);
workers = initEventLoops();
new Thread(this, "boss").start();
log.debug("boss start...");
start = true;
}
}
/*============================03 用於初始化boss線程用於監聽客戶端的連接事件===========================*/
public WorkerEventLoop[] initEventLoops() {
// Runtime.getRuntime().availableProcessors()可以獲取CPU的當前核心數,
// 該方法存在bug,就是docker環境下無法獲得所分配的CPU核心
// EventLoop[] eventLoops = new EventLoop[Runtime.getRuntime().availableProcessors()];
WorkerEventLoop[] workerEventLoops = new WorkerEventLoop[2];
for (int i = 0; i < workerEventLoops.length; i++) {
workerEventLoops[i] = new WorkerEventLoop(i);
}
return workerEventLoops;
}
/*============================04 實際運行的代碼:基本流程就是boss的selector監聽accept事件==============
當有新的連接,按照輪詢的分配方式分配一個worker,這個worker的selector專門用於監聽這個連接的讀寫事件。
這里采用輪詢的策略:是為了保證每個worker監控的連接數目是均勻的。 ===========================*/
@Override
public void run() {
while (true) {
try {
boss.select();
Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
SocketChannel sc = c.accept();
sc.configureBlocking(false);
log.debug("{} connected", sc.getRemoteAddress());
workers[index.getAndIncrement() % workers.length].register(sc);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/*=============================04 處理讀寫事件的worker類的定義============================== */
@Slf4j
static class WorkerEventLoop implements Runnable {
private Selector worker;
private volatile boolean start = false;
private int index;
private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<>();
public WorkerEventLoop(int index) {
this.index = index;
}
public void register(SocketChannel sc) throws IOException {
if (!start) {
worker = Selector.open();
new Thread(this, "worker-" + index).start();
start = true;
}
tasks.add(() -> {
try {
SelectionKey sckey = sc.register(worker, 0, null);
sckey.interestOps(SelectionKey.OP_READ);
worker.selectNow();
} catch (IOException e) {
e.printStackTrace();
}
});
worker.wakeup();
}
@Override
public void run() {
while (true) {
try {
worker.select();
Runnable task = tasks.poll();
if (task != null) {
task.run();
}
Set<SelectionKey> keys = worker.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(128);
try {
int read = sc.read(buffer);
if (read == -1) {
key.cancel();
sc.close();
} else {
buffer.flip();
log.debug("{} message:", sc.getRemoteAddress());
printBytebuffer(buffer);
}
} catch (IOException e) {
e.printStackTrace();
key.cancel();
sc.close();
}
}
iter.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
static void printBytebuffer(ByteBuffer tmp){ // 注意:傳入的bytebuffer必須時寫模式
System.out.println(StandardCharsets.UTF_8.decode(tmp).toString());
}
}
2 NIO與BIO的基礎概念
2-1 Java中stream與channel的區別
定義
stream(流):Classes to support functional-style operations on streams of elements, such as map-reduce transformations on collections.
channel(通道):A channel represents an open connection to an entity such as a hardware device, a file, a network socket, or a program component that is capable of performing one or more distinct I/O operations, for example reading or writing
A channel is either open or closed. A channel is open upon creation, and once closed it remains closed. Once a channel is closed, any attempt to invoke an I/O operation upon it will cause a ClosedChannelException to be thrown. Whether or not a channel is open may be tested by invoking its isOpen method.Channels are, in general, intended to be safe for multithreaded access as described in the specifications of the interfaces and classes that extend and implement this interface.
總結:流是更為抽象的概念,泛指實體的流,而channel表示對具體事事務(硬件設備/文件/網絡套接字)的連接,
不同點:
- stream 不會自動緩沖數據,channel 會利用系統提供的發送緩沖區、接收緩沖區(更為底層)
- stream 僅支持阻塞 API,channel 同時支持阻塞、非阻塞 API,網絡 channel 可配合 selector 實現多路復用(文件channel沒有多路復用這個說法)
共同點:
- 二者均為全雙工,即讀寫可以同時進行
2-2 IO 模型
2-2-1 從read方法分析阻塞IO/非阻塞IO

當調用一次 channel.read 或 stream.read 后,會切換至操作系統內核態來完成真正數據讀取,而讀取又分為兩個階段,分別為:
- 等待數據階段
- 復制數據階段
總結:讀取操作必須需要操作系統的支持, Java的讀取需要操作系統的支持。
等待階段通常由操作系統切換到內核態,然后從硬件中獲取數據到內存中的這段時間。
| 名稱 | 調用 | 結合數據讀取2個階段進行區分 |
|---|---|---|
| 阻塞IO | read | 阻塞IO會讓用戶線程在等待數據階段與復制數據階段都停止運行(阻塞) |
| 非阻塞IO | read | 非阻塞IO在等待數據階段不被阻塞(沒有數據立刻返回),在復制數據階段線程還是阻塞的。 |


總結:可以看到阻塞IO模型與多路復用IO模型從圖中表現十分類似,在等待數據與復制數據階段都會阻塞,這二者的主要區別如下圖所示:

可以看到selector與阻塞IO的最大區別:
- selector對於多種類型的事件只要任意類型事件就緒會返回,有多個事件,則會返回多個事件。
- 阻塞IO只能逐個處理accept/read/write等不同類型的事件。
2-2-2 同步異步的角度看待IO模型
- 同步:線程自己去獲取結果(一個線程)
- 異步:線程自己不去獲取結果,而是由其它線程送結果(至少兩個線程)
同步阻塞、同步非阻塞、同步多路復用、異步阻塞(沒有此情況)、異步非阻塞(從網絡IO模型理解這些概念)
| 名稱/IO模型 | 同步/異步的區分 | 阻塞/非阻塞區分 | |
|---|---|---|---|
| 同步阻塞(阻塞IO) | 調用read方法自己獲取結果,只不過等待的過程阻塞用戶線程 | 等待數據階段與復制數據階段都停止運行(阻塞) | |
| 同步非阻塞(非阻塞IO) | 調用read方法自己獲取結果,只不過等待的過程不阻塞用戶線程等待數據階段與復制數據階段都停止運行(阻塞) | 等待數據階段不會阻塞但復制數據階段會阻塞 | |
| 同步多路復用(IO多路復用) | 調用read方法自己獲取結果,有事件發生返回結果處理事件,其余情況阻塞 | 等待數據階段與復制數據階段都停止運行(阻塞) | |
| 異步非阻塞(異步IO) | 調用方法但自己不獲取結果,結果是其他線程通過調用回調函數送了過來 | 等待數據階段與復制數據階段都不阻塞 |
注意:沒有異步阻塞這種說法!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
- 異步已經是讓其他線程去送結果了,根本沒必要去阻塞。

上圖中是異步IO,關鍵點:
1)用戶線程提供回調方法參數給其他線程。
2)其他線程在滿足條件調用回調函數將結果傳輸給用戶線程。
2-3 IO模型中的零拷貝問題
2-3-1 方式1:傳統IO模型的工作流程中數據拷貝次數(結合下圖)

File f = new File("helloword/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");
byte[] buf = new byte[(int)f.length()];
file.read(buf);
Socket socket = ...;
socket.getOutputStream().write(buf);
情景:從磁盤中文件讀取內容然后通過網絡套接字給發送出去?
數據拷貝次數(4次):磁盤=>內核緩沖去(DMA,內核態),內核緩沖區=>用戶緩沖區(cpu,用戶態),用戶緩沖區=>socket緩沖區(cpu,用戶態),socket緩沖區=>網卡(DMA,內核態)
-
read 方法調用后,要從 java 程序的用戶態切換至內核態,去調用操作系統(Kernel)的讀能力,將數據從磁盤讀入內核緩沖區。這期間用戶線程阻塞,操作系統使用 DMA(Direct Memory Access)來實現文件讀,不會使用 cpu。
DMA 也可以理解為硬件單元,用來解放 cpu 完成文件 IO
-
從內核態切換回用戶態,將數據從內核緩沖區讀入用戶緩沖區(即 byte[] buf),這期間 cpu 會參與拷貝,無法利用 DMA
-
調用 write 方法,這時將數據從用戶緩沖區(byte[] buf)寫入 socket 緩沖區,cpu 會參與拷貝
-
接下來要向網卡寫數據,再次從用戶態切換至內核態,調用操作系統的寫能力,使用 DMA 將 socket 緩沖區的數據寫入網卡,不使用 cpu
2-3-2 方式2:直接內存的優化

通過分配直接內存將內核緩沖區與用戶緩沖區合並到一起,減少了文件數據讀取時的一次拷貝。
- 數據拷貝了3次
- 發生2次內核態與用戶態的區分
2-3-3 方式3:進一步優化

- java 調用 transferTo 方法后,要從 java 程序的用戶態切換至內核態,使用 DMA將數據讀入內核緩沖區,不會使用 cpu
- 數據從內核緩沖區傳輸到 socket 緩沖區,cpu 會參與拷貝
- 最后使用 DMA 將 socket 緩沖區的數據寫入網卡,不會使用 cpu
可以看到
- 只發生了一次用戶態與內核態的切換
- 數據拷貝了 3 次
2-3-4 方式4:再進一步優化(硬件優化)

- java 調用 transferTo 方法后,要從 java 程序的用戶態切換至內核態,使用 DMA將數據讀入內核緩沖區,不會使用 cpu
- 只會將一些 offset 和 length 信息拷入 socket 緩沖區,幾乎無消耗
- 使用 DMA 將 內核緩沖區的數據寫入網卡,不會使用 cpu(數據的拷貝不會經過socket緩沖區)
可以看到
- 一次用戶態與內核態的切換
- 數據拷貝了 2 次。
2-3-5 四種方式總結
| 數據拷貝次數 | 內核態與用戶態切換次數 | 特點 | |
|---|---|---|---|
| 方式1:傳統IO模型的工作流程 | 4 | 2 | 磁盤->內核緩沖區->用戶緩沖區->socket緩沖區->網卡 |
| 方式2:采用JJava內存優化工作流程 | 3 | 2 | 磁盤-->直接內存->socket緩沖區->網卡 |
| 方式3 | 3 | 1 | 磁盤-->內核緩沖區->socket緩沖區->網卡 |
| 方式4 | 2 | 1 | 磁盤-->內核緩沖區->網卡 |
總結:
1)方式3與方式4都屬於零拷貝,數據沒有放入到用戶緩沖區,最大的特點就是內核態與用戶態只需要進行一次(節約上下文開銷)。
2)方式4與方式3有各自的適用場景,方式4比方式3少拷貝一次數據。
2-3-6 零拷貝的總結
零拷貝:不是真正無拷貝,而是在不會拷貝重復數據到 jvm 內存中(用戶緩沖區)(方式3與方式4都是屬於零拷貝)
零拷貝的優點:
- 更少的用戶態與內核態的切換
- 不利用 cpu 計算,減少 cpu 緩存偽共享
- 零拷貝適合頻繁的,小文件傳輸
2-4 異步IO介紹
- Netty5廢棄,linux支持的不好,windows支持的比較好,了解即可
AIO 用來解決數據復制階段的阻塞問題
- 同步意味着,在進行讀寫操作時,線程需要等待結果,還是相當於閑置
- 異步意味着,在進行讀寫操作時,線程不必等待結果,而是將來由操作系統來通過回調方式由另外的線程來獲得結果
異步模型需要底層操作系統(Kernel)提供支持
- Windows 系統通過 IOCP 實現了真正的異步 IO
- Linux 系統異步 IO 在 2.6 版本引入,但其底層實現還是用多路復用模擬了異步 IO,性能沒有優勢
💡 守護線程
默認文件 AIO 使用的線程都是守護線程,所以最后要執行 System.in.read() 以避免守護線程意外結束
