概述
Java NIO非堵塞技術實際是采取反應器模式,或者說是觀察者(observer)模式為我們監察I/O端口,如果有內容進來,會自動通知我們,這樣,我們就不必開啟多個線程死等,從外界看,實現了流暢的I/O讀寫,不堵塞了。
同步和異步區別 : 有無通知(是否輪詢)
堵塞和非堵塞區別 : 操作結果是否等待(是否馬上又返回值),只是設計方式的不同。
NIO 有一個主要的類Selector,這個類似一個觀察者,只要我們把需要探知的SocketChannel告訴Selector,我們接着做別的事情,當有事件發生時,他會通知我們,傳回一組SelectionKey,我們讀取這些Key,就會獲得我們剛剛注冊過的SocketChannel,然后,我們從這個Channel中讀取數據,接着我們可以處理這些數據。
反應器模式與觀察者模式在某些方面極為相似:當一個主體發生改變時,所有依屬體都得到通知。不過,觀察者模式與單個事件源關聯,而反應器模式則與多個事件源關聯 。
一般模型
我們想象以下情形:長途客車在路途上,有人上車有人下車,但是乘客總是希望能夠在客車上得到休息。
傳統的做法是:每隔一段時間(或每一個站),司機或售票員對每一個乘客詢問是否下車。
反應器模式做法是:汽車是乘客訪問的主體(Reactor),乘客上車后,到售票員(acceptor)處登記,之后乘客便可以休息睡覺去了,當到達乘客所要到達的目的地后,售票員將其喚醒即可。
import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.util.Iterator; import java.util.Set; /** * 反應器模式 用於解決多用戶訪問並發問題 * * 舉個例子:餐廳服務問題 * * 傳統線程池做法:來一個客人(請求)去一個服務員(線程) * 反應器模式做法:當客人點菜的時候,服務員就可以去招呼其他客人了,等客人點好了菜,直接招呼一聲:服務員 * * @author linxcool */ public class Reactor implements Runnable { public final Selector selector; public final ServerSocketChannel serverSocketChannel; public Reactor(int port) throws IOException { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); InetSocketAddress inetSocketAddress = new InetSocketAddress( InetAddress.getLocalHost(), port); serverSocketChannel.socket().bind(inetSocketAddress); serverSocketChannel.configureBlocking(false); // 向selector注冊該channel SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 利用selectionKey的attache功能綁定Acceptor 如果有事情,觸發Acceptor selectionKey.attach(new Acceptor(this)); } @Override public void run() { try { while (!Thread.interrupted()) { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectionKeys.iterator(); // Selector如果發現channel有OP_ACCEPT或READ事件發生,下列遍歷就會進行。 while (it.hasNext()) { // 來一個事件 第一次觸發一個accepter線程 // 以后觸發SocketReadHandler SelectionKey selectionKey = it.next(); dispatch(selectionKey); selectionKeys.clear(); } } } catch (IOException e) { e.printStackTrace(); } } /** * 運行Acceptor或SocketReadHandler * * @param key */ void dispatch(SelectionKey key) { Runnable r = (Runnable) (key.attachment()); if (r != null) { r.run(); } } }
import java.io.IOException; import java.nio.channels.SocketChannel; public class Acceptor implements Runnable { private Reactor reactor; public Acceptor(Reactor reactor) { this.reactor = reactor; } @Override public void run() { try { SocketChannel socketChannel = reactor.serverSocketChannel.accept(); if (socketChannel != null)// 調用Handler來處理channel new SocketReadHandler(reactor.selector, socketChannel); } catch (IOException e) { e.printStackTrace(); } } }
public class SocketReadHandler implements Runnable { private SocketChannel socketChannel; public SocketReadHandler(Selector selector, SocketChannel socketChannel) throws IOException { this.socketChannel = socketChannel; socketChannel.configureBlocking(false); SelectionKey selectionKey = socketChannel.register(selector, 0); // 將SelectionKey綁定為本Handler 下一步有事件觸發時,將調用本類的run方法。 // 參看dispatch(SelectionKey key) selectionKey.attach(this); // 同時將SelectionKey標記為可讀,以便讀取。 selectionKey.interestOps(SelectionKey.OP_READ); selector.wakeup(); } /** * 處理讀取數據 */ @Override public void run() { ByteBuffer inputBuffer = ByteBuffer.allocate(1024); inputBuffer.clear(); try { socketChannel.read(inputBuffer); // 激活線程池 處理這些request // requestHandle(new Request(socket,btt)); } catch (IOException e) { e.printStackTrace(); } } }