Java NIO 與 基於reactor設計模式的事件處理模型


Java NIO非堵塞應用通常適用用在I/O讀寫等方面,我們知道,系統運行的性能瓶頸通常在I/O讀寫,包括對端口文件的操作上,過去,在打開一個I/O通道后,read()將一直等待在端口一邊讀取字節內容,如果沒有內容進來,read()也是傻傻的等,這會影響我們程序繼續做其他事情,那么改進做法就是開設線程,讓線程去等待,但是這樣做也是相當耗費資源的。

Java NIO非堵塞技術實際是采取Reactor模式,或者說是Observer模式為我們監察I/O端口,如果有內容進來,會自動通知我們,這樣,我們就不必開啟多個線程死等,從外界看,實現了流暢的I/O讀寫,不堵塞了。

Java NIO出現不只是一個技術性能的提高,你會發現網絡上到處在介紹它,因為它具有里程碑意義,從JDK1.4開始,Java開始提高性能相關的功能,從而使得Java在底層或者並行分布式計算等操作上已經可以和C或Perl等語言並駕齊驅。

如果你至今還是在懷疑Java的性能,說明你的思想和觀念已經完全落伍了,Java一兩年就應該用新的名詞來定義。從JDK1.5開始又要提供關於線程、並發等新性能的支持,Java應用在游戲等適時領域方面的機會已經成熟,Java在穩定自己中間件地位后,開始蠶食傳統C的領域。

NIO的基本原理:

  NIO 有一個主要的類Selector,這個類似一個觀察者,只要我們把需要探知的socketchannel告訴Selector,我們接着做別的事情,當有事件發生時,他會通知我們,傳回一組SelectionKey,我們讀取這些Key,就會獲得我們剛剛注冊過的socketchannel,然后,我們從這個Channel中讀取數據,放心,包准能夠讀到,接着我們可以處理這些數據。

  Selector內部原理實際是在做一個對所注冊的channel的輪詢訪問,不斷的輪詢(目前就這一個算法),一旦輪詢到一個channel有所注冊的事情發生,比如數據來了,他就會站起來報告,交出一把鑰匙,讓我們通過這把鑰匙來讀取這個channel的內容。

使用上:

我們結合代碼看看使用,在使用上,也在分兩個方向,一個是線程處理,一個是用非線程,后者比較簡單,看下面代碼:

package reactor.section3;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NBTest1 {
    
    

    private static void printKeyInfo(SelectionKey sk) {
        String s = new String();
        s += "Att: " + (sk.attachment() == null ? "no" : "yes");
        s += ", Read: " + sk.isReadable();
        s += ", Acpt: " + sk.isAcceptable();
        s += ", Cnct: " + sk.isConnectable();
        s += ", Wrt: " + sk.isWritable();
        s += ", Valid: " + sk.isValid();
        s += ", Ops: " + sk.interestOps();
        debug(s);
    }
    private static void debug(String s) {
        System.out.println(s);
    }
    public static void main(String args[]) {
        NBTest1 nbTest = new NBTest1();
        try {
            nbTest.startServer();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    
    private void startServer()  throws Exception{
        int channels = 0;
        int nKeys = 0;
        int currentSelector = 0;

        //使用Selector
        Selector selector = Selector.open();

        //建立Channel 並綁定到9000端口
        ServerSocketChannel ssc = ServerSocketChannel.open();
        InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(),9000); 
        ssc.socket().bind(address);

        //使設定non-blocking的方式。
        ssc.configureBlocking(false);

        //向Selector注冊Channel及我們有興趣的事件
        SelectionKey s = ssc.register(selector, SelectionKey.OP_ACCEPT);
        printKeyInfo(s);
        while(true) //不斷的輪詢
        {
            
            debug("NBTest: Starting select");
            //Selector通過select方法通知我們我們感興趣的事件發生了。
            nKeys = selector.select();
            //如果有我們注冊的事情發生了,它的傳回值就會大於0
            if(nKeys > 0){
                debug("NBTest: Number of keys after select operation: " +nKeys);
                
                Set selectedKeys = selector.selectedKeys();
                Iterator i = selectedKeys.iterator();
                while(i.hasNext()){
                    
                    s = (SelectionKey) i.next();
                    printKeyInfo(s);
                    debug("NBTest: Nr Keys in selector: " +selector.keys().size());
                    //一個key被處理完成后,就都被從就緒關鍵字(ready keys)列表中除去
                    i.remove();
                    if(s.isAcceptable()){
                        // 從channel()中取得我們剛剛注冊的channel。
                        Socket socket = ((ServerSocketChannel)s.channel()).accept().socket();
                        SocketChannel sc = socket.getChannel();
                        sc.configureBlocking(false);
                        sc.register(selector, SelectionKey.OP_READ |SelectionKey.OP_WRITE);
                        System.out.println(++channels);
                    }else{
                        debug("NBTest: Channel not acceptable");
                    }
                }
                
                
                
            }else{
                debug("NBTest: Select finished without any keys.");
            }

        }
        
    }
}

這是一個守候在端口9000的noblock server例子,如果我們編制一個客戶端程序,就可以對它進行互動操作,或者使用telnet 主機名 90000 可以鏈接上。

當前分布式計算 Web Services盛行天下,這些網絡服務的底層都離不開對socket的操作。他們都有一個共同的結構:
1. Read request
2. Decode request
3. Process service
4. Encode reply
5. Send reply

寫過大中型網絡服務器的朋友相信對事件處理模型(有時也叫事件觸發模型)不陌生。今天要講的Reactor就是在事件處理模型中用的比較多的一種設計模式。請大家先看下面的圖,有個初步的印象:

reactor模式類結構圖

在上圖中,可以看到主要有以下四種角色:

1. Reactor:
Reactor是Reactor模式中最為關鍵的角色,它是該模式最終向用戶提供接口的類。用戶可以向Reactor中注冊EventHandler(3),然后Reactor在“反應(react)”的時候,發現用戶注冊的fd上有事件發生,

就會回調用戶的事件處理函數。

2. SynchrousEventDemultiplexer:

SynchrousEventDemultiplexer也是Reactor中一個比較重要的角色,它是Reactor用來檢測用戶注冊的fd上發生的事件的利器,通過過Reactor得知了哪些fd上發什么了什么樣的事件,然后以些為依據,來多路分發事件,回調用戶的事件處理函數。

3. EventHandler:
EventHander是用戶和Reactor打交道的工具,用戶通過向Reactor注冊自己的EventHandler,可以告知Reactor在特定事件發生的時候該幫我做些什么。

4. ConcreteEventHandler:
ConcreteEventHandler是EventHandler的子類,EventHandler是Reactor所用來規定接口的基類,用戶自己的事件處理器都必須從EventHandler繼承。

【java reactor代碼】

Java的NIO為reactor模式提供了實現的基礎機制,它的Selector當發現某個channel有數據時,會通過SlectorKey來告知我們,在此我們實現事件和handler的綁定。

 

參考:java nio  http://www.jdon.com/concurrent/nio.pdf

http://www.cnblogs.com/xuekyo/archive/2013/01/20/2868547.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM