【轉】反應器(Reactor)模式


概述

Java NIO非堵塞技術實際是采取反應器模式,或者說是觀察者(observer)模式為我們監察I/O端口,如果有內容進來,會自動通知我們,這樣,我們就不必開啟多個線程死等,從外界看,實現了流暢的I/O讀寫,不堵塞了。

同步和異步區別     : 有無通知(是否輪詢)
堵塞和非堵塞區別  : 操作結果是否等待(是否馬上又返回值),只是設計方式的不同。

NIO 有一個主要的類Selector,這個類似一個觀察者,只要我們把需要探知的SocketChannel告訴Selector,我們接着做別的事情,當有事件發生時,他會通知我們,傳回一組SelectionKey,我們讀取這些Key,就會獲得我們剛剛注冊過的SocketChannel,然后,我們從這個Channel中讀取數據,接着我們可以處理這些數據。

反應器模式與觀察者模式在某些方面極為相似:當一個主體發生改變時,所有依屬體都得到通知。不過,觀察者模式與單個事件源關聯,而反應器模式則與多個事件源關聯 。


一般模型

我們想象以下情形:長途客車在路途上,有人上車有人下車,但是乘客總是希望能夠在客車上得到休息。

傳統的做法是:每隔一段時間(或每一個站),司機或售票員對每一個乘客詢問是否下車。

反應器模式做法是:汽車是乘客訪問的主體(Reactor),乘客上車后,到售票員(acceptor)處登記,之后乘客便可以休息睡覺去了,當到達乘客所要到達的目的地后,售票員將其喚醒即可。

 

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * 反應器模式 用於解決多用戶訪問並發問題
 * 
 * 舉個例子:餐廳服務問題
 * 
 * 傳統線程池做法:來一個客人(請求)去一個服務員(線程)
 * 反應器模式做法:當客人點菜的時候,服務員就可以去招呼其他客人了,等客人點好了菜,直接招呼一聲:服務員
 * 
 * @author linxcool
 */
public class Reactor implements Runnable {
    public final Selector selector;
    public final ServerSocketChannel serverSocketChannel;

    public Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        InetSocketAddress inetSocketAddress = new InetSocketAddress(
                InetAddress.getLocalHost(), port);
        serverSocketChannel.socket().bind(inetSocketAddress);
        serverSocketChannel.configureBlocking(false);

        // 向selector注冊該channel
        SelectionKey selectionKey = serverSocketChannel.register(selector,
                SelectionKey.OP_ACCEPT);

        // 利用selectionKey的attache功能綁定Acceptor 如果有事情,觸發Acceptor
        selectionKey.attach(new Acceptor(this));
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectionKeys.iterator();
                // Selector如果發現channel有OP_ACCEPT或READ事件發生,下列遍歷就會進行。
                while (it.hasNext()) {
                    // 來一個事件 第一次觸發一個accepter線程
                    // 以后觸發SocketReadHandler
                    SelectionKey selectionKey = it.next();
                    dispatch(selectionKey);
                    selectionKeys.clear();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 運行Acceptor或SocketReadHandler
     * 
     * @param key
     */
    void dispatch(SelectionKey key) {
        Runnable r = (Runnable) (key.attachment());
        if (r != null) {
            r.run();
        }
    }

}
import java.io.IOException;
import java.nio.channels.SocketChannel;

public class Acceptor implements Runnable {
    private Reactor reactor;

    public Acceptor(Reactor reactor) {
        this.reactor = reactor;
    }

    @Override
    public void run() {
        try {
            SocketChannel socketChannel = reactor.serverSocketChannel.accept();
            if (socketChannel != null)// 調用Handler來處理channel
                new SocketReadHandler(reactor.selector, socketChannel);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
public class SocketReadHandler implements Runnable {
    private SocketChannel socketChannel;

    public SocketReadHandler(Selector selector, SocketChannel socketChannel)
            throws IOException {
        this.socketChannel = socketChannel;
        socketChannel.configureBlocking(false);

        SelectionKey selectionKey = socketChannel.register(selector, 0);

        // 將SelectionKey綁定為本Handler 下一步有事件觸發時,將調用本類的run方法。
        // 參看dispatch(SelectionKey key)
        selectionKey.attach(this);

        // 同時將SelectionKey標記為可讀,以便讀取。
        selectionKey.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }

    /**
     * 處理讀取數據
     */
    @Override
    public void run() {
        ByteBuffer inputBuffer = ByteBuffer.allocate(1024);
        inputBuffer.clear();
        try {
            socketChannel.read(inputBuffer);
            // 激活線程池 處理這些request
            // requestHandle(new Request(socket,btt));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM