《Scalable IO in Java》譯文


《Scalable IO in Java》 是java.util.concurrent包的作者,大師Doug Lea關於分析與構建可伸縮的高性能IO服務的一篇經典文章,在文章中Doug Lea通過各個角度,循序漸進的梳理了服務開發中的相關問題,以及在解決問題的過程中服務模型的演變與進化,文章中基於Reactor反應器模式的幾種服務模型架構,也被Netty、Mina等大多數高性能IO服務框架所采用,因此閱讀這篇文章有助於你更深入了解Netty、Mina等服務框架的編程思想與設計模式。

下面是我對《Scalable IO in Java》原文核心內容的一個翻譯,原文連接:http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

一、網絡服務

在一般的網絡或分布式服務等應用程序中,大都具備一些相同的處理流程,例如:

① 讀取請求數據;

② 對請求數據進行解碼;

③ 對數據進行處理;

④ 對回復數據進行編碼;

⑤ 發送回復;

當然在實際應用中每一步的運行效率都是不同的,例如其中可能涉及到xml解析、文件傳輸、web頁面的加載、計算服務等不同功能。

1、傳統的服務設計模式

在一般的網絡服務當中都會為每一個連接的處理開啟一個新的線程,我們可以看下大致的示意圖:

 

 

 

每一個連接的處理都會對應分配一個新的線程,下面我們看一段經典的Server端Socket服務代碼:

class Server implements Runnable {
        public void run() {
            try {
                ServerSocket ss = new ServerSocket(PORT);
                while (!Thread.interrupted())
                    new Thread(new Handler(ss.accept())).start();
                // or, single-threaded, or a thread pool
            } catch (IOException ex) {
                /* ... */ }
        }

        static class Handler implements Runnable {
            final Socket socket;

            Handler(Socket s) {
                socket = s;
            }

            public void run() {
                try {
                    byte[] input = new byte[MAX_INPUT];
                    socket.getInputStream().read(input);
                    byte[] output = process(input);
                    socket.getOutputStream().write(output);
                } catch (IOException ex) {
                    /* ... */ }
            }

            private byte[] process(byte[] cmd) {
                /* ... */ }
        }
    }

2、構建高性能可伸縮的IO服務

在構建高性能可伸縮IO服務的過程中,我們希望達到以下的目標:

① 能夠在海量負載連接情況下優雅降級;

② 能夠隨着硬件資源的增加,性能持續改進;

③ 具備低延遲、高吞吐量、可調節的服務質量等特點;

而分發處理就是實現上述目標的一個最佳方式。

3、分發模式

分發模式具有以下幾個機制:

① 將一個完整處理過程分解為一個個細小的任務;

② 每個任務執行相關的動作且不產生阻塞;

③ 在任務執行狀態被觸發時才會去執行,例如只在有數據時才會觸發讀操作;

在一般的服務開發當中,IO事件通常被當做任務執行狀態的觸發器使用,在hander處理過程中主要針對的也就是IO事件;

 

java.nio包就很好的實現了上述的機制:

①  非阻塞的讀和寫

②  通過感知IO事件分發任務的執行

所以結合一系列基於事件驅動模式的設計,給高性能IO服務的架構與設計帶來豐富的可擴展性;

二、基於事件驅動模式的設計

基於事件驅動的架構設計通常比其他架構模型更加有效,因為可以節省一定的性能資源,事件驅動模式下通常不需要為每一個客戶端建立一個線程,這意味這更少的線程開銷,更少的上下文切換和更少的鎖互斥,但任務的調度可能會慢一些,而且通常實現的復雜度也會增加,相關功能必須分解成簡單的非阻塞操作,類似與GUI的事件驅動機制,當然也不可能把所有阻塞都消除掉,特別是GC, page faults(內存缺頁中斷)等。由於是基於事件驅動的,所以需要跟蹤服務的相關狀態(因為你需要知道什么時候事件會發生);

下圖是AWT中事件驅動設計的一個簡單示意圖,可以看到,在不同的架構設計中的基於事件驅動的IO操作使用的基本思路是一致的;

 

三、Reactor模式

Reactor也可以稱作反應器模式,它有以下幾個特點:

① Reactor模式中會通過分配適當的handler(處理程序)來響應IO事件,類似與AWT 事件處理線程;

② 每個handler執行非阻塞的操作,類似於AWT ActionListeners 事件監聽

③ 通過將handler綁定到事件進行管理,類似與AWT addActionListener 添加事件監聽;

1、單線程模式

下圖展示的就是單線程下基本的Reactor設計模式

 

 

首先我們明確下java.nio中相關的幾個概念:

Channels

支持非阻塞讀寫的socket連接;

Buffers

用於被Channels讀寫的字節數組對象

Selectors

用於判斷channle發生IO事件的選擇器

SelectionKeys

負責IO事件的狀態與綁定 

Ok,接下來我們一步步看下基於Reactor模式的服務端設計代碼示例:

第一步  Rector線程的初始化

class Reactor implements Runnable { 
    final Selector selector;
    final ServerSocketChannel serverSocket;
    Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        serverSocket.configureBlocking(false);
        SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); //注冊accept事件
        sk.attach(new Acceptor()); //調用Acceptor()為回調方法
    }
    
    public void run() { 
        try {
            while (!Thread.interrupted()) {//循環
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                while (it.hasNext())
                    dispatch((SelectionKey)(it.next()); //dispatch分發事件
                selected.clear();
            }
        } catch (IOException ex) { /* ... */ }
    }
    
    void dispatch(SelectionKey k) {
        Runnable r = (Runnable)(k.attachment()); //調用SelectionKey綁定的調用對象
        if (r != null)
            r.run();
    }
    
    // Acceptor 連接處理類
    class Acceptor implements Runnable { // inner
        public void run() {
            try {
                SocketChannel c = serverSocket.accept();
                if (c != null)
                new Handler(selector, c);
            }
            catch(IOException ex) { /* ... */ }
        }
    }
}

第二步 Handler處理類的初始化

final class Handler implements Runnable {
    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(MAXIN);
    ByteBuffer output = ByteBuffer.allocate(MAXOUT);
    static final int READING = 0, SENDING = 1;
    int state = READING;
    
    Handler(Selector sel, SocketChannel c) throws IOException {
        socket = c;
        c.configureBlocking(false);
        // Optionally try first read now
        sk = socket.register(sel, 0);
        sk.attach(this); //將Handler綁定到SelectionKey上
        sk.interestOps(SelectionKey.OP_READ);
        sel.wakeup();
    }
    boolean inputIsComplete() { /* ... */ }
    boolean outputIsComplete() { /* ... */ }
    void process() { /* ... */ }
    
    public void run() {
        try {
            if (state == READING) read();
            else if (state == SENDING) send();
        } catch (IOException ex) { /* ... */ }
    }
    
    void read() throws IOException {
        socket.read(input);
        if (inputIsComplete()) {
            process();
            state = SENDING;
            // Normally also do first write now
            sk.interestOps(SelectionKey.OP_WRITE);
        }
    }
    void send() throws IOException {
        socket.write(output);
        if (outputIsComplete()) sk.cancel(); 
    }
}

下面是基於GoF狀態對象模式對Handler類的一個優化實現,不需要再進行狀態的判斷。

class Handler { // ...
    public void run() { // initial state is reader
        socket.read(input);
        if (inputIsComplete()) {
            process();
            sk.attach(new Sender()); 
            sk.interest(SelectionKey.OP_WRITE);
            sk.selector().wakeup();
        }
    }
    class Sender implements Runnable {
        public void run(){ // ...
            socket.write(output);
            if (outputIsComplete()) sk.cancel();
        }
    }
}

2、多線程設計模式

在多處理器場景下,為實現服務的高性能我們可以有目的的采用多線程模式:

  1、增加Worker線程,專門用於處理非IO操作,因為通過上面的程序我們可以看到,反應器線程需要迅速觸發處理流程,而如果處理過程也就是process()方法產生阻塞會拖慢反應器線程的性能,所以我們需要把一些非IO操作交給Woker線程來做;

  2、拆分並增加反應器Reactor線程,一方面在壓力較大時可以飽和處理IO操作,提高處理能力;另一方面維持多個Reactor線程也可以做負載均衡使用;線程的數量可以根據程序本身是CPU密集型還是IO密集型操作來進行合理的分配;

2.1 多線程模式

Reactor多線程設計模式具備以下幾個特點:

① 通過卸載非IO操作來提升Reactor 線程的處理性能,這類似與POSA2 中Proactor的設計;

② 比將非IO操作重新設計為事件驅動的方式更簡單;

③ 但是很難與IO重疊處理,最好能在第一時間將所有輸入讀入緩沖區;(這里我理解的是最好一次性讀取緩沖區數據,方便異步非IO操作處理數據)

④ 可以通過線程池的方式對線程進行調優與控制,一般情況下需要的線程數量比客戶端數量少很多;

下面是Reactor多線程設計模式的一個示意圖與示例代碼(我們可以看到在這種模式中在Reactor線程的基礎上把非IO操作放在了Worker線程中執行):

    class Handler implements Runnable {
        // uses util.concurrent thread pool
        static PooledExecutor pool = new PooledExecutor(...);//聲明線程池
        static final int PROCESSING = 3;

        // ...
        synchronized void read() { // ...
            socket.read(input);
            if (inputIsComplete()) {
                state = PROCESSING;
                pool.execute(new Processer());//處理程序放在線程池中執行
            }
        }

        synchronized void processAndHandOff() {
            process();
            state = SENDING; // or rebind attachment
            sk.interest(SelectionKey.OP_WRITE);
        }

        class Processer implements Runnable {
            public void run() {
                processAndHandOff();
            }
        }
    }

當你把非IO操作放到線程池中運行時,你需要注意以下幾點問題:

① 任務之間的協調與控制,每個任務的啟動、執行、傳遞的速度是很快的,不容易協調與控制;

② 每個hander中dispatch的回調與狀態控制;

③ 不同線程之間緩沖區的線程安全問題;

④ 需要任務返回結果時,任務線程等待和喚醒狀態間的切換;

為解決上述問題可以使用PooledExecutor線程池框架,這是一個可控的任務線程池,主函數采用execute(Runnable r),它具備以下功能,可以很好的對池中的線程與任務進行控制與管理:

① 可設置線程池中最大與最小線程數;

② 按需要判斷線程的活動狀態,及時處理空閑線程;

③ 當執行任務數量超過線程池中線程數量時,有一系列的阻塞、限流的策略;

 2.2 基於多個反應器的多線程模式

這是對上面模式的進一步完善,使用反應器線程池,一方面根據實際情況用於匹配調節CPU處理與IO讀寫的效率,提高系統資源的利用率,另一方面在靜態或動態構造中每個反應器線程都包含對應的Selector,Thread,dispatchloop,下面是一個簡單的代碼示例與示意圖(Netty就是基於這個模式設計的,一個處理Accpet連接的mainReactor線程,多個處理IO事件的subReactor線程):

    Selector[] selectors; // Selector集合,每一個Selector 對應一個subReactor線程
    //mainReactor線程
    class Acceptor { // ...
        public synchronized void run() { 
            //...
            Socket connection = serverSocket.accept(); 
            if (connection != null)
              new Handler(selectors[next], connection); 
            if (++next == selectors.length)
                next = 0;
        }
    }

 

在服務的設計當中,我們還需要注意與java.nio包特性的結合:

一是注意線程安全,每個selectors 對應一個Reactor 線程,並將不同的處理程序綁定到不同的IO事件,在這里特別需要注意線程之間的同步;

二是java nio中文件傳輸的方式:

① Memory-mapped files 內存映射文件的方式,通過緩存區訪問文件;

② Direct buffers直接緩沖區的方式,在合適的情況下可以使用零拷貝傳輸,但同時這會帶來初始化與內存釋放的問題(需要池化與主動釋放);

 

以上就是對《Scalable IO in Java》中核心內容的譯文,限於本人各方面水平有限,本次翻譯也只是便於自己閱讀與理解,其中難免有翻譯與認知錯誤的地方,望請大家諒解,如果對這方面的內容感興趣還是建議大家去閱讀原文。

 

關注微信公眾號,查看更多技術文章。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM