Java NIO教程 Selector


這次我們開講非阻塞I/O中的Selector,它需要配合非阻塞的TCP和UDP來使用。首先我們先簡單講一下TCP和UDP的非阻塞通道。

非阻塞I/O通道

在上代碼前我們先講解一些最基本的知識。TCP和UDP共對應着三種通道,分別是:SocketChannel、ServerSocketChannel、DatagramChannel 。它們都可以通過channel.open()方法來初始化;同時對於SocketChannel來說,當一個新連接到達ServerSocketChannel時,也會被創建(在代碼中會有說明)。而且它們使用結束后都需要被關閉。

首先讓我們來看看SocketChannel的基本操作

//通過open()打開SocketChannel
SocketChannel socketChannel = SocketChannel.open();
//綁定主機端口
socketChannel.connect(new InetSocketAddress("127.0.0.1", 18888));
//設置成非阻塞模式
socketChannel.configureBlocking(false);
while(! socketChannel.finishConnect() ){
	//做點其他事
}
// 利用SocketChannel進行數據操作

下面再來說說,如何用SocketChannel進行數據操作。它的數據讀寫和其他通道的讀寫方式是完全一致的,只是要注意的是,在非阻塞模式下,read()和write()沒有進行任何操作就返回了,所以要在循環中調用,並注意返回值。

ByteBuffer buf = ByteBuffer.allocate(48);
while(socketChannel.read(buf)!=-1) {
	buf.flip();
	while(buf.hasRemaining()) {
		socketChannel.write(buf);
	}		
	buf.clear();
}

SocketChannel相當於傳統I/O中的Socket,而ServerSocketChannel相當於ServerSocket;而且整體形式都是一致的,都是利用多路復用思想,在服務器端收到連接后,產生一個專門的Socket,與客戶端進行數據傳輸。具體形式就是"serverSocketChannel.accept()"在收到連接后,會返回一個SocketChannel,具體形式見代碼

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//綁定主機端口
serverSocketChannel.socket().bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false);
while (true) {
	//accept()在非阻塞模式中,若建立連接,則返回SocketChannel;否則返回null
	SocketChannel socketChannel = serverSocketChannel.accept();
	if (socketChannel != null) {
		// 利用SocketChannel進行數據操作
	}
}

而DatagramChannel則是跟DatagramPacket十分相似的,只不過數據包由當初的byte數組換成了現在的ByteBuffer

DatagramChannel channel = DatagramChannel.open();
//綁定主機端口
channel.socket().bind(new InetSocketAddress(9999));
channel.configureBlocking(false);
ByteBuffer buf = ByteBuffer.allocate(48);
/*
 * 1.因為UDP是無連接的網絡協議,所以不能像TCP那樣讀取和寫入,它是發送和接收數據包。
 * 2.receive()在非阻塞模式中,若沒有收到數據包,則返回null;
 * 		若收到了,則將內容寫入byteBuffer,將發送方的SocketAddress返回(其中包含IP和端口)
 * 3.如果Buffer容不下收到的數據,多出的數據將被丟棄
 */
while(channel.receive(buf)==null){
	//做點其他事
}
buf.flip();
//指定接收方的SocketAddress
channel.send(buf, new InetSocketAddress("127.0.0.1", 8888));

DatagramChannel還有一個特殊的地方,就是它可以“連接”到網絡中的特定地址的,十分類似於一個TCP連接。但由於UDP是無連接的,連接到特定地址並不會像TCP通道那樣創建一個真正的連接。而是鎖住DatagramChannel ,讓其只能從特定地址收發數據。想實現這種功能,編寫方式和TCP十分類似,就不寫了,去看文檔吧,講解的十分清楚。

Selector

現在開始進入我們今天的主題Selector

其實前言中已經簡單的講解過什么是Selector以及為什么要使用Selector了。這里就不再重復了(我猜你已經忘了,回去再看一眼吧),咱們還是從最基礎的創建開講。

Selector的創建是通過調用Selector.open()方法完成的(這部分都是用open()創建的)

Selector注冊

說完創建,就得說說如何讓Channel和Selector配合使用了?一句話:“將channel注冊到selector上”這個動作是通過SelectionKey channel.register(Selector sel,int ops,Object att)方法完成的。

這里要強調一點,就是調用register的channel必須是非阻塞的。這就將FileChannel排除在外(充話費送的就是不行)。

現在講解register()中每一個參數的含義。第一個參數,就是要將channel注冊到哪個Selector。第二個參數,它是一個“interest集合”,意思是在通過Selector監聽Channel時對什么事件感興趣,可以監聽四種不同類型的事件,分別是Connect、Accept、Read和Write;它們四個分別代表的含義是:

  • Connect(SelectionKey.OP_CONNECT):一個channel成功連接到另一個服務器——“連接就緒”
  • Accept(SelectionKey.OP_ACCEPT):一個ServerSocketchannel准備好接收新進入的連接——“接收就緒”
  • Read(SelectionKey.OP_READ):一個通道的可讀數據已准備好——“讀就緒”
  • Write(SelectionKey.OP_WRITE):一個通道的可寫數據已准備好——“寫就緒”

P.S:圓括號中的是要填在第二個參數ops位置上的int常量。我們把這四種叫做“感興趣事件”,后文會多次提到這個概念

如果你對不止一種事件感興趣,那么可以用“位或”操作符將常量連接起來,如下:
int ops = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

register()方法的第三個參數為附加對象,它可有可無,是一個Object對象,它可以作為每個通道的標識符,用以區別注冊在同一個Selector上的其他通道;也可以附加其他對象。

最后再來看看register()方法的返回值。返回值為SelectionKey對象,這是一個重要的對象,接下來我們就主要講解SelectionKey。

SelectionKey

當Selector發現某些channel中的感興趣事件發生了,就會返回相對應channel的SelectionKey對象。

SelectionKey對象包含着許多信息。比如所屬通道的channel對象,通過selectionKey.channel()方法就可以得到;還有通道的附加對象,通過selectionKey.attachment()方法就可以得到;還可以得到通道那個感興趣時間發生了通過下面四種方法獲得:

  • boolean selectionKey.isAcceptable()
  • boolean selectionKey.isConnectable()
  • boolean selectionKey.isReadable()
  • boolean selectionKey.isWritable()

還可以獲得更多信息,具體內容可以去看文檔

Selector.select()

之前的創建、注冊等准備都完成之后,就可以坐等准備好的數據到來了。這時候需要知道有多少個通道感興趣事件已經准備好了。這時候有下面三個方法幫你完成這項任務,分別是

  • int selector.select()
  • int selector.select(long timeout)
  • int selector.selectNow()

首先講一下這三個方法准確的作用,它們都是返回有多少個通道已經變成就緒狀態。它們的區別是:

  • select()是阻塞的,它會一直等到有通道准備就緒、
  • select(long timeout)也是阻塞的,它會一直等到有通道准備就緒或者已經超出給定的timeout時間並返回0。
  • selectNow()是非阻塞的,如果沒有通道就緒就直接返回0。

Selector.selectedKeys()

通過select()方法知道有若干個通道准備就緒,就可以調用下面的方法來返回相應若干個通道的selectedKey了
Set<SelectionKey> selectedKeys = selector.selectedKeys()
獲得selectedKeys后,你就可以進行相應的處理了。需要強調的是,每次處理完一個selectionKey之后需要將它在Set中刪除,這樣下次它准備好以后就可以再次添加到Set中來。

現在關於Selector的知識基本上就講解完了,讓我們在一個服務器端、客戶端收發字符串的例子中結束本次的講解吧。

客戶端

public class HansClient {
	// 定義檢測SocketChannel的Selector對象
	private Selector selector = null;
	// 客戶端SocketChannel
	private SocketChannel sc = null;

	public void init() throws IOException {
		selector = Selector.open();
		InetSocketAddress isa = new InetSocketAddress("127.0.0.1", 30000);
		// 調用open靜態方法創建連接到指定主機的SocketChannel
		sc = SocketChannel.open(isa);
		// 設置該sc以非阻塞方式工作
		sc.configureBlocking(false);
		// 將SocketChannel對象注冊到指定Selector
		sc.register(selector, SelectionKey.OP_READ);
		// 啟動讀取服務器端數據的線程
		new ClientThread().start();
		// 創建鍵盤輸入流
		Scanner scan = new Scanner(System.in);
		while (scan.hasNextLine()) {
			// 讀取鍵盤輸入
			String line = scan.nextLine();
			// 將鍵盤輸入的內容輸出到SocketChannel中
			sc.write(StandardCharsets.UTF_8.encode(line));
		}
	}

	// 定義讀取服務器數據的線程
	private class ClientThread extends Thread {
		public void run() {
			try {
				while (selector.select() > 0) {
					// 遍歷每個有可用IO操作Channel對應的SelectionKey
					for (SelectionKey sk : selector.selectedKeys()) {
						// 刪除正在處理的SelectionKey
						selector.selectedKeys().remove(sk);
						// 如果該SelectionKey對應的Channel中有可讀的數據
						if (sk.isReadable()) {
							// 使用NIO讀取Channel中的數據
							SocketChannel sc = (SocketChannel) sk.channel();
							ByteBuffer buff = ByteBuffer.allocate(1024);
							String content = "";
							while (sc.read(buff) > 0) {
								sc.read(buff);
								buff.flip();
								content += StandardCharsets.UTF_8.decode(buff);
							}
							// 打印輸出讀取的內容
							System.out.println("聊天信息:" + content);
						}
					}
				}
			} catch (IOException ex) {
				ex.printStackTrace();
			}
		}
	}

	public static void main(String[] args) throws IOException {
		new HansClient().init();
	}
}

服務器端

public class HansServer {
	// 用於檢測所有Channel狀態的Selector
	private Selector selector = null;

	public void init() throws IOException {
		selector = Selector.open();
		// 通過open方法來打開一個未綁定的ServerSocketChannel實例
		ServerSocketChannel server = ServerSocketChannel.open();
		InetSocketAddress isa = new InetSocketAddress("127.0.0.1", 30000);
		// 將該ServerSocketChannel綁定到指定IP地址
		server.socket().bind(isa);
		// 設置ServerSocket以非阻塞方式工作
		server.configureBlocking(false);
		// 將server注冊到指定Selector對象
		server.register(selector, SelectionKey.OP_ACCEPT);
		while (selector.select() > 0) {
			// 依次處理selector上的每個已選擇的SelectionKey
			for (SelectionKey sk : selector.selectedKeys()) {
				// 從selector上的已選擇Key集中刪除正在處理的SelectionKey
				selector.selectedKeys().remove(sk);
				// 如果sk對應的通道包含客戶端的連接請求
				if (sk.isAcceptable()) {
					// 調用accept方法接受連接,產生服務器端對應的SocketChannel
					SocketChannel sc = server.accept();
					// 設置采用非阻塞模式
					sc.configureBlocking(false);
					// 將該SocketChannel也注冊到selector
					sc.register(selector, SelectionKey.OP_READ);
				}
				// 如果sk對應的通道有數據需要讀取
				if (sk.isReadable()) {
					// 獲取該SelectionKey對應的Channel,該Channel中有可讀的數據
					SocketChannel sc = (SocketChannel) sk.channel();
					// 定義准備執行讀取數據的ByteBuffer
					ByteBuffer buff = ByteBuffer.allocate(1024);
					String content = "";
					// 開始讀取數據
					try {
						while (sc.read(buff) > 0) {
							buff.flip();
							content += StandardCharsets.UTF_8.decode(buff);
						}
						// 打印從該sk對應的Channel里讀取到的數據
						System.out.println("=====" + content);
					}
					// 如果捕捉到該sk對應的Channel出現了異常,即表明該Channel
					// 對應的Client出現了問題,所以從Selector中取消sk的注冊
					catch (IOException ex) {
						// 從Selector中刪除指定的SelectionKey
						sk.cancel();
						if (sk.channel() != null) {
							sk.channel().close();
						}
					}
					// 如果content的長度大於0,即聊天信息不為空
					if (content.length() > 0) {
						// 遍歷該selector里注冊的所有SelectKey
						for (SelectionKey key : selector.keys()) {
							// 獲取該key對應的Channel
							Channel targetChannel = key.channel();
							// 如果該channel是SocketChannel對象
							if (targetChannel instanceof SocketChannel) {
								// 將讀到的內容寫入該Channel中
								SocketChannel dest = (SocketChannel) targetChannel;
								dest.write(StandardCharsets.UTF_8.encode(content));
							}
						}
					}
				}
			}
		}
	}

	public static void main(String[] args) throws IOException {
		new HansServer().init();
	}
}

本次講解就到這里了,本系列的講解也就到這里了。如果你能看到這里我真的很開心。有任何事都可以與我討論。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM