Java IO學習筆記七:多路復用從單線程到多線程


作者:Grey

原文地址:Java IO學習筆記七:多路復用從單線程到多線程

前面提到的多路復用的服務端代碼中, 我們在處理讀數據的同時,也處理了寫事件:

    public void readHandler(SelectionKey key) {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        buffer.clear();
        int read;
        try {
            while (true) {
                read = client.read(buffer);
                if (read > 0) {
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        client.write(buffer);
                    }
                    buffer.clear();
                } else if (read == 0) {
                    break;
                } else {
                    client.close();
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

為了權責清晰一些,我們分開了兩個事件處理:

                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iter = selectionKeys.iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();
                        if (key.isAcceptable()) {
                            acceptHandler(key);
                        } else if (key.isReadable()) {
                            // 處理讀數據
                            readHandler(key);
                        } else if (key.isWritable()) {
                            // 處理寫數據
                            writeHandler(key);
                        }
                    }

一個負責寫,一個負責讀

讀的事件處理, 如下代碼

    public void readHandler(SelectionKey key) {
        System.out.println("read handler.....");
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        buffer.clear();
        int read = 0;
        try {
            while (true) {
                read = client.read(buffer);
                if (read > 0) {
                    client.register(key.selector(), SelectionKey.OP_WRITE, buffer);
                } else if (read == 0) {
                    break;
                } else {
                    client.close();
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

其中read > 0 即從客戶端讀取到了數據,我們才注冊一個寫事件:

client.register(key.selector(), SelectionKey.OP_WRITE, buffer);

其他事件不注冊寫事件。(PS:只要send-queue沒有滿,就可以注冊寫事件)

寫事件的處理邏輯如下:

    private void writeHandler(SelectionKey key) {
        System.out.println("write handler...");
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        buffer.flip();
        while (buffer.hasRemaining()) {
            try {
                client.write(buffer);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        buffer.clear();
        key.cancel();
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

寫完后,調用key.cancel()取消注冊,並關閉客戶端。

測試一下,運行SocketMultiplexingV2.java

並通過一個客戶端連接進來:

nc 192.168.205.1 9090

客戶端發送一些內容:

nc 192.168.205.1 9090
asdfasdfasf
asdfasdfasf

可以正常接收到數據。

考慮有一個fd執行耗時,在一個線性里會阻塞后續FD的處理,同時,考慮資源利用,充分利用cpu核數。

我們來實現一個基於多線程的多路復用模型。

將N個FD分組(這里的FD就是Socket連接),每一組一個selector,將一個selector壓到一個線程上(最好的線程數量是: cpu核數或者cpu核數*2)

每個selector中的fd是線性執行的。假設有100w個連接,如果有四個線程,那么每個線程處理25w個。

分組的FD和處理這堆FD的Selector我們封裝到一個數據結構中,假設叫:SelectorThread,其成員變量至少有如下:

Selector selector = null;
// 存Selector對應要處理的FD隊列
LinkedBlockingQueue<Channel> lbq = new LinkedBlockingQueue<>();

由於其處理是線性的,且我們要開很多個線程來處理,所以SelectorThread本身是一個線程類(實現Runnable接口)

public class SelectorThread implements Runnable {
...

}

run方法中,我們就可以把之前單線程處理Selector的常規操作代碼移植過來:

....
while (true) {
....
 if (selector.select() > 0) {
       Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
       while (iter.hasNext()) {
            SelectionKey key = iter.next();
            iter.remove();
            if (key.isAcceptable()) {
                acceptHandler(key);
            } else if (key.isReadable()) {
                 readHandler(key);
            } else if (key.isWritable()) {
            }
     }
  }
....
}
....

SelectorThread設計好以后,我們需要一個可以組織SelectorThread的類,假設叫SelectorThreadGroup,這個類的主要職責就是安排哪些FD由哪些Selector來接管,這個類里面持有兩個SelectorThread數組,一個用於分配服務端,一個用於分配每次客戶端的Socket請求。

// 服務端,可以啟動多個服務端
SelectorThread[] bosses;
// 客戶端的Socket請求
SelectorThread[] workers;

SelectorThreadGroup構造器中初始化這兩個數組

    SelectorThreadGroup(int bossNum, int workerNum) {
        bosses = new SelectorThread[bossNum];
        workers = new SelectorThread[workerNum];
        for (int i = 0; i < bossNum; i++) {
            bosses[i] = new SelectorThread(this);
            new Thread(bosses[i]).start();
        }
        for (int i = 0; i < workerNum; i++) {
            workers[i] = new SelectorThread(this);
            new Thread(workers[i]).start();
        }
    }

以下代碼是針對每次的請求,如何分配Selector:

...
    public void nextSelector(Channel c) {
        try {
            SelectorThread st;
            if (c instanceof ServerSocketChannel) {
                st = nextBoss();
                st.lbq.put(c);
                st.setWorker(workerGroup);
            } else {
                st = nextWork();
                st.lbq.add(c);
            }
            st.selector.wakeup();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private SelectorThread nextBoss() {
        int index = xid.incrementAndGet() % bosses.length;
        return bosses[index];
    }

    private SelectorThread nextWork() {
        int index = xid.incrementAndGet() % workers.length;  //動用worker的線程分配
        return workers[index];
    }

...

這里要區分兩類Channel,一類是ServerSocketChannel,即我們每次啟動的服務端,另外一類就是連接服務端的Socket請求,這兩類最好是分到不同的SelectorThread中的隊列中去。分配的算法是朴素的輪詢算法(除以數組長度取模)

這樣我們主函數只需要和SelectorThreadGroup交互即可:


public class Startup {

    public static void main(String[] args) {
  // 開辟了三個SelectorThread給服務端,開辟了三個SelectorThread給客戶端去接收Socket
        SelectorThreadGroup group = new SelectorThreadGroup(3,3);
        group.bind(9999);
        group.bind(8888);
        group.bind(6666);
        group.bind(7777);
    }
}

啟動Startup
開啟一個客戶端,請求服務端,測試一下:

[root@io io]# nc 192.168.205.1 7777
sdfasdfs
sdfasdfs

客戶端請求的數據可以返回,服務端可以監聽到客戶端的請求:

Thread-1 register listen
Thread-0 register listen
Thread-2 register listen
Thread-1 register listen
Thread-1   acceptHandler......
Thread-5 register client: /192.168.205.138:44152

因為我們開了四個端口的監聽,但是我們只設置了三個服務端SelectorThread,所以可以看到Thread-1監聽了兩個服務端。

新接入的客戶端連接是從Thread-5開始的,不會和前面的Thread-0Thread-1Thread-2沖突。

再次來一個新的客戶端連接

[root@io io]# nc 192.168.205.1 8888
sdfasdfas
sdfasdfas

輸入一些內容,依然可以得到服務端的響應

服務端這邊日志顯示:

Thread-3 register client: /192.168.205.138:33262
Thread-3 read......

顯示是Thread-3捕獲了新的連接,也不會和前面的Thread-0Thread-1Thread-2沖突。

源碼:Github


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM