Java高伸縮性IO處理
在Doug Lea大神的經典NIO框架文章《Scalable IO in Java》中,具體闡述了如何把Reactor模式和Java NIO整合起來,一步步理論結合Java代碼實踐去構建了一套高伸縮性的網絡處理框架雛形,從當今的流行NIO框架(Netty、Main、Grizzly)中無不看到其本質均與該文章所述架構不謀而合(或者也可以說其實是借鑒並以現代化的方式實現了Doug Lea的思想吧),這里總結《Scalable IO in Java》中的要點並記錄下自己實現多Reactor的過程中遇到的坑
網絡服務的基本結構
當今網絡上的各種基於TCP/IP的應用服務,其對1次請求的處理過程的本質流程結構均為
- 從底層IO讀取字節請求
- 把讀取后的字節請求進行解碼成為自己的業務請求對象
- 把解碼后的業務請求對象進行業務處理
- 把處理后的響應編碼為底層IO可寫入的字節響應
- 利用底層IO返回(發出)編碼后的字節響應
整體的流程如上述5步所示,但具體每步驟所使用到的一些技術手段不一樣:例如解碼協議是自定義的還是使用業界流行的?是文本協議還是二進制協議?處理過程就結合具體業務進行處理等
一般典型的網絡服務設計如下圖所示:
可見其對每一個請求都新產生一個線程來進行處理,缺點就是線程的創建是消耗不小的系統資源的,且最關鍵的是如果並發訪問突然激增到一定程度,那響應就會大打折扣,甚至由於系統資源不足導致系統崩潰。。。
這里給出自己的Java實現代碼如下,比較簡單,就是處理每個請求都new一個Thread
Server
public class Server implements Runnable{
private final int port;
private ServerSocket serverSocket;
public Server(int port){
this.port = port;
try {
this.serverSocket = new ServerSocket(port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run(){
try {
while (!Thread.interrupted()) {
new Thread(new Handler(serverSocket.accept())).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
public int getPort() {
return port;
}
public static void main(String[] args) {
new Thread(new Server(9001)).start();
}
}
Handle
public class Handler implements Runnable{
private final Socket clientSocket;
public Handler(Socket clientSocket){
this.clientSocket = clientSocket;
}
@Override
public void run() {
int readSize;
byte[] readBuf = new byte[BUF_SIZE];
try {
InputStream in = clientSocket.getInputStream();
OutputStream out = clientSocket.getOutputStream();
while ((readSize = in.read(readBuf)) != -1) {
out.write(readBuf, 0, readSize);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
高伸縮性系統的目標
基於“每請求每線程”的缺點比較明顯且不可接受(嚴重時系統崩潰),Doug Lea大神提出了構建高伸縮性系統的目標:
- 在激增請求的負載下至少優雅退化吧,即可以相應慢點,但別崩潰(無響應)呀
- 然后自動的增加處理所需資源(例如CPU、內存、磁盤、帶寬)來漸進改善上一步中響應緩慢的問題
然后對於系統整體的可用性和性能也提出了一些目標:
- 低延時,其實就是盡量高響應
- 能夠滿足最大峰值的處理請求,即在訪問量突增時不至於宕機
- 可調控的服務處理,例如請求較多時可多加入一些服務處理線程
最后總結了針對設計高伸縮性系統的一個至理名言:
分而治之通常都是構建任何高伸縮性系統的最佳解決方案!
NIO框架的分而治之和事件驅動設計
針對NIO框架的分而治之是把處理過程拆分封裝成小的任務——每個任務可以單獨無阻塞的進行業務處理;每個任務在其可以立即執行處理的時候就立即執行,即把原來IO阻塞部分交由NIO框架去管理處理,真正的任務只無差別/冪等的處理真正的業務;NIO框架則把IO事件當做觸發器去回調相關的任務去執行。
值得慶幸的在java.nio包中有對實現上述的NIO框架處理機制的支持:
- 非阻塞(Non-blocking)的讀取和寫入
- 分發(dispatch)IO事件到與其對應的任務並執行處理
這一切看起來都很類似Swing/AWT事件驅動設計:
而實際的Swing/AWT事件驅動本質上是多生產者/單一消費者模式,即有多個產生事件的地方(各種交互GUI),但是處理事件卻只在一個地方(AWT/Event線程從事件隊列獲取事件一個個處理)。
Reactor模式和NIO
同Swing/AWT事件驅動設計類似,Reactor模式也是多生產者/單一消費者模式,多個IO(讀 /寫)事件,但是處理IO事件卻只在單一的EventLoop(事件循環)線程中分發給對應的任務處理器處理。基本的Reactor模式(單線程版)如下所示:
下面來看下基於NIO的Reactor模式和Swing/AWT事件驅動設計的相似對比
類名稱 | 作用 | 對應Swing/AWT |
---|---|---|
Reactor反應器 | EventLoop及時響應相對於的讀/寫IO事件 | AWT中的單例事件分發線程 |
分發到對應Handler處理器上進行業務處理 | ||
Handlers處理器 | 處理非阻塞讀/寫IO事件所對應的業務邏輯 | AWT中的ActionListeners處理器 |
事件綁定和處理 | 管理IO讀/寫事件到對應處理器的綁定 | AWT中的addActionListener綁定 |
然后再看下Java NIO中對實現Reactor提供了哪些支持
類名稱 | 作用 |
---|---|
Channels | 連接支持非阻塞IO的讀/寫的通道 |
例如磁盤文件、網絡Socket等都有對應的非阻塞IO的通道類 | |
Buffers | Channels通道直接用來進行讀/寫操作的類數組對象 |
Selectors | 能知道哪些Channels通道集合存在IO事件 |
SelectionKeys | 提供IO事件狀態信息和IO事件綁定功能的類 |
![]() |
Reactor模式的多線程設計
單線程版Reactor模式是最基本的實現,其核心就是單線程Reactor的EventLoop在不斷處理被Selector檢測到的IO事件,但缺點也顯而易見:
- 隨着客戶端的連接數目的增加,如果業務的處理也需要消耗不小時間的話,那僅僅單次的EventLoop循環都會消耗不少時間才能進入下一次循環,導致IO事件阻塞在Selector里不能被及時輪詢處理到
- 而且隨着多核CPU的爆發,當擁有多核機器時,應當適當利用多線程能力來分擔本來是單線程的Rector,以去應對更多的客戶端連接,否則依舊是單線程Rector的話,豈不是浪費了多核這個潮流強項了?
Worker Threads
針對第1條缺點引入了Worker Threads(工人線程,消費線程,即有一群工人老早就做好准備處理即將到來任務了)——線程池;理由是Reactor的EventLoop輪詢應當快速響應IO觸發事件,而不應當消耗在本應該是任務處理器處理的業務上:
從上圖可以看到其實就是在單線程Reactor的基礎上把非IO相關的業務處理部分(decode、computer和encode)拆出來封裝成為一個單獨的任務(Runnable/命令模式),如此一來在線程池中就能立即進行計算處理了
Multiple Reactor Threads
針對第2條Multiple Reactor Threads,即多個Reactor線程;理由是隨着客戶端連接越來越多,單個Reactor線程處理IO能力會達到飽和狀態,在多核機器上看到的現象是只有一個核心利用率較高,其他核心是閑置的,所以應當適當利用多核優勢,擴展成匹配CPU核數的多個Reactor,達到分擔IO負載的目的:
如上圖所示,多Reactor根據職責划分為1個mainReactor和多個subReactors,mainReactor主要負責接收客戶端連接,因為TCP初始需要經歷3次握手才能確認連接,這個連接過程的消耗在客戶端較多時其開銷是不小的,單獨使用mainReactor處理保證了其他已經連接上的客戶端在subReactors中不受其影響,從而快速響應處理業務,以此分攤負載並提高系統整體系能
代碼實現
《Scalable IO in Java》文章中也已經給出示例代碼了,基本的Reactor模式的實現直接照搬代碼,自己再寫點NIO的讀/寫部分以及process部分即可,所以這里主要把如何實現多Reactor/Selector以及所遇到的坑說一下
多Reactor/多Selector
Reactor的實現依賴於NIO的Selector,是Selector去輪詢Channel的,所以其實在單線程版Reactor中Reactor有一個Selector,同理既然是多Reactor,那么還是每個Reactor都有自己的Selector和EventLoop輪詢。
區別在於:mainReactor的Selector感興趣的是ACCEPT操作,而subReactors感興趣的先是READ然后才是WRITE,然后WRITE完畢后感興趣的是READ然后再是WRITE。。。如此反復,必須要先READ是為了避免多線程中IO重疊問題,所以需要在代碼中區分Reactor是不是mainReactor。
Reactor
public abstract class Reactor extends Thread{
protected final int port;
protected final ServerSocketChannel serverChannel;
protected final boolean isMainReactor;
protected final boolean useMultipleReactors;
protected final long timeout;
protected Selector selector;
public Reactor(int port, ServerSocketChannel serverChannel, boolean isMainReactor, boolean useMultipleReactors, long timeout){
this.port = port;
this.serverChannel = serverChannel;
this.isMainReactor = isMainReactor;
this.useMultipleReactors = useMultipleReactors;
this.timeout = timeout;
}
@Override
public void run(){
try {
init();
while(!Thread.interrupted()){
//不可以使用阻塞的select方式,否則accept后subReactor的selector在register的時候會一直阻塞
//但是修改為帶有超時的select或者selectNow后,subReactor的selector在register就不會阻塞了
//最終選擇了帶有超時的select是因為使用selectNow的無限循環會導致CPU飆高特別快
//並且如果使用阻塞的select方式,還需要知道在哪里調用wakeup,否則會一直阻塞,使用非阻塞方式就不需要wakeup了
//selector.select();
//if(selector.selectNow() > 0){
if(selector.select(timeout) > 0){
log(selector+" isMainReactor="+isMainReactor+" select...");
Iterator<SelectionKey> keyIt = selector.selectedKeys().iterator();
while(keyIt.hasNext()){
SelectionKey key = keyIt.next();
dispatch(key);
keyIt.remove();
}
}
}
log(getClass().getSimpleName()+" end on "+port+" ..."+"\n");
} catch (IOException e) {
e.printStackTrace();
}
}
private void init() throws IOException{
selector = Selector.open();
log(selector+" isMainReactor="+isMainReactor);
if(isMainReactor){
//serverChannel = ServerSocketChannel.open();
serverChannel.socket().bind(new InetSocketAddress(port));
serverChannel.configureBlocking(false);
SelectionKey key = serverChannel.register(selector, SelectionKey.OP_ACCEPT);
key.attach(newAcceptor(selector));
log(getClass().getSimpleName()+" start on "+port+" ..."+"\n");
}else{
}
//如果使用阻塞的select方式,且開啟下面的代碼的話,相當於開啟了多個reactor池,而不是mainReactor和subReactor的關系了
//SelectionKey key = serverChannel.register(selector, SelectionKey.OP_ACCEPT);
//key.attach(newAcceptor(selector, serverChannel));
}
public abstract Acceptor newAcceptor(Selector selector);
/** * 事件和事件處理器的綁定 * <ul> * <li>管理IO讀/寫事件到事件處理器的一一對應的綁定</li> * </ul> */
private void dispatch(SelectionKey key){
Runnable r = (Runnable)key.attachment();
if(r != null){
r.run();
}
}
}
- Reactor中的init方法里的isMainReactor字段即是用來判斷是否該Reactor是否為mainReactor的,如果是mainReactor的話,則注冊感興趣的為ACCEPT事件,並且添加Acceptor附件
- 然后run方法里面的while循環即是EventLoop輪詢了,需要注意的是這里有坑:別使用阻塞的select方法,因為該方法會導致accept后subReactor的selector在register的時候會一直阻塞;也別使用非阻塞的selecNow方法,因為selectNow在無限循環下即使沒有IO事件,也會使CPU飆到100%;所以最終選擇使用帶有超時的select(timeout)方法
Acceptor
public abstract class Acceptor extends Thread {
protected final Selector selector;
protected final ServerSocketChannel serverChannel;
protected final boolean useMultipleReactors;
public Acceptor(Selector selector, ServerSocketChannel serverChannel, boolean useMultipleReactors){
this.selector = selector;
this.serverChannel = serverChannel;
this.useMultipleReactors = useMultipleReactors;
}
@Override
public void run() {
log(selector+" accept...");
try {
SocketChannel clientChannel = serverChannel.accept();
if(clientChannel != null){
log(selector+" clientChannel not null...");
//如果使用阻塞的select方式,且目的是開啟了多個reactor池,而不是mainReactor和subReactor的關系的話,
//則下面就不是nextSubSelector().selector,而是改為傳遞當前實例的selector對象即可
handle(useMultipleReactors ? nextSubReactor().selector : selector, clientChannel);
}else{
log(selector+" clientChannel is null...");
}
} catch (IOException e) {
e.printStackTrace();
}
}
/** * 在每個具體的Handler下調用run方法是為了令其從connecting狀態變為reading狀態, * 和原pdf版本下的做法是一樣的,只不過原pdf版本是在構造函數直接修改設置了感興趣為read事件 */
public abstract void handle(Selector selector, SocketChannel clientSocket);
}
Acceptor是被mainReactor當做ACCPET的附屬對象,所以當有連接接收過來了,就使用handle方法處理,handle方法的Selector參數即可傳遞subReactors的Selector,然后先對READ感興趣即可。
Handler
public abstract class Handler extends Thread {
private enum State{
CONNECTING(0),
READING(SelectionKey.OP_READ),
PROCESSING(2),
WRITING(SelectionKey.OP_WRITE);
private final int opBit;
private State(int operateBit){
opBit = operateBit;
}
}
private State state;
protected final SocketChannel clientChannel;
protected final SelectionKey key;
protected final ByteBuffer readBuf;
protected final StringBuilder readData = new StringBuilder();
protected ByteBuffer writeBuf;
public Handler(Selector selector, SocketChannel clientChannel){
this.state = State.CONNECTING;
SelectionKey key = null;
try {
clientChannel.configureBlocking(false);
//這里在使用subSelector的時候會阻塞,為什么?是因為使用了阻塞的select方法,非阻塞的才可以
//但如果使用reactor池的話,那是因為需要serverChannel注冊selector的accept事件!?必須對應上才可以通過,否則阻塞
key = clientChannel.register(selector, this.state.opBit);
key.attach(this);
} catch (Exception e) {
e.printStackTrace();
}
this.clientChannel = clientChannel;
this.key = key;
this.readBuf = ByteBuffer.allocate(byteBufferSize());
log(selector+" connect success...");
}
@Override
public void run() {
switch (state) {
case CONNECTING:
connect();
break;
case READING:
readAndProcess();
break;
case WRITING:
write();
break;
default:
err("\nUnsupported State: "+state+" ! overlap processing with IO...");
}
}
private void connect() {
interestOps(State.READING);
}
/** * But harder to overlap processing with IO<br/> * Best when can first read all input a buffer<br/> * <br> * That why we used synchronized on read method!<br/> * Just to protected read buffer And handler state...<br/> * <br> * 其實就是害怕重疊IO和工作線程處理不一致:例如Reactor單線程讀某個key的IO完畢后立馬開啟工作線程的處理, * 緊接着Reactor單線程處理第二個IO key的時候發現還是之前的那個key的讀IO事件,但是之前同一個key的處理還未完成, * 不等待之前的處理完成的話,就會出現多個線程同時訪問修改Handler里面數據的情況,導致出錯, * 但是最好先把數據都全部讀入buffer中就可以規避了!? * * <p>此處的synchronized同步是為了防止state狀態以及讀寫buffer在多線程訪問中出現讀臟數據, * Debug調試的時候同時訪問一個SelectionKey有2個線程: * <br>1、Reactor單線程 * <br>2、讀數據完畢后多線程處理的話,線程池里面執行processAndHandOff的線程 * <br> * 不能單一使用volatile或者原子變量的原因是因為該方法為復合操作(check and act) */
private synchronized void readAndProcess(){
doRead();
doProcess();
}
private void doRead(){
int readSize;
try {
while((readSize = clientChannel.read(readBuf)) > 0){
readData.append(new String(Arrays.copyOfRange(readBuf.array(), 0, readSize)));
readBuf.clear();
}
if(readSize == -1){
disconnect();
return;
}
} catch (IOException e) {
e.printStackTrace();
disconnect();
}
log("readed from client:"+readData+", "+readData.length());
}
private void doProcess(){
if(readIsComplete()){
state = State.PROCESSING;
processAndInterestWrite();
}
}
/** * 處理過程可能是比較耗時的,所以可考慮將其交由線程池處理,處理完畢后才注冊感興趣的write事件<p> * 然而正是由於交由線程池處理所以可能造成重疊IO的多線程處理的狀態問題,最好能一次性全部讀入buffer,否則考慮同步狀態處理問題 */
private void processAndInterestWrite(){
Processor processor = new Processor();
if(useThreadPool){
execute(processor);
}else{
processor.run();
}
}
private final class Processor implements Runnable{
@Override
public void run() {
processAndHandOff();
}
}
private synchronized void processAndHandOff(){
if(process()){
interestOps(State.WRITING);
}
}
//TODO 修改為復用output,即當output容量不足的時候就反復write,而不是每次都使用wrap來new一個新的
public boolean process(){
log("process readData="+readData.toString());
if(isQuit()){
disconnect();
return false;
}
writeBuf = ByteBuffer.wrap(readData.toString().getBytes());
readData.delete(0, readData.length());
return true;
}
private void write(){
try {
do{
clientChannel.write(writeBuf);
}while(!writeIsComplete());
} catch (IOException e) {
e.printStackTrace();
disconnect();
}
String writeData = new String(Arrays.copyOf(writeBuf.array(), writeBuf.array().length));
log("writed to client:"+writeData+", "+writeData.length());
interestOps(State.READING);
}
/** * 事件和事件處理器的綁定 * <ul> * <li>類似AWT中的addActionListener添加監聽器/觀察者</li> * </ul> * 不需要重置key的附件(key.attach)是因為key一直綁定使用的是當前this實例, * 在Reactor dispatch的時候如果是接受(accept)該附件就是Acceptor實例, * 否則就是綁定到該key的同一個Handler實例 */
private void interestOps(State state){
this.state = state;
key.interestOps(state.opBit);
}
public boolean isQuit(){
return false;
}
private void disconnect(){
try {
clientChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
log("\nclient Address=【"+clientAddress(clientChannel)+"】 had already closed!!! ");
}
private static SocketAddress clientAddress(SocketChannel clientChannel){
return clientChannel.socket().getRemoteSocketAddress();
}
public abstract int byteBufferSize();
public abstract boolean readIsComplete();
public abstract boolean writeIsComplete();
}
具體做的事情就像前面說的,先對READ感興趣,然后是狀態機的判斷和處理,注意的地方使用了synchronized同步避免IO重疊並起到了保護狀態機的作用,注釋上也已經做出描述了。
其中有些方法是abstract是因為想自己寫一個類NIO框架,達到根據應用場景的不同可以自行實現所需要的方法,當前僅僅寫了個Echo(回顯)和Enter(回車作為結束符)顯示消息的例子,具體代碼已經放到本人GitHub上:scalableIO