JAVA NIO工作原理及代碼示例


簡介:本文主要介紹了JAVA NIO中的Buffer, Channel, Selector的工作原理以及使用它們的若干注意事項,最后是利用它們實現服務器和客戶端通信的代碼實例。

歡迎探討,如有錯誤敬請指正

如需轉載,請注明出處 http://www.cnblogs.com/nullzx/


1. ByteBuffer

1.1直接緩沖區和非直接緩沖區

下面是創建ByteBuffer對象的幾種方式

static ByteBuffer

allocate(int capacity)

static ByteBuffer

allocateDirect(int capacity)

static ByteBuffer

wrap(byte[] array)

static ByteBuffer

wrap(byte[] array, int offset, int length)

allocate方式創建的ByteBuffer對象我們稱之為非直接緩沖區,這個ByteBuffer對象(和對象包含的緩沖數組)都位於JVM的堆區。wrap方式和allocate方式創建的ByteBuffer沒有本質區別,都創建的是非直接緩沖區。

allocateDirect方法創建的ByteBuffer我們稱之為直接緩沖區,此時ByteBuffer對象本身在堆區,而緩沖數組位於非堆區, ByteBuffer對象內部存儲了這個非堆緩沖數組的地址。在非堆區的緩沖數組可以通過JNI(內部還是系統調用)方式進行IO操作,JNI不受gc影響,機器碼執行速度也比較快,同時還避免了JVM堆區與操作系統內核緩沖區的數據拷貝,所以IO速度比非直接緩沖區快。然而allocateDirect方式創建ByteBuffer對象花費的時間和回收該對象花費的時間比較多,所以這個方法適用於創建那些需要重復使用的緩沖區對象。

 

1.2重要屬性和方法

ByteBuffer對象三個重要屬性 position, limitcapacity。其中capacity表示了緩沖區的總容量,始終保持不變,初始時候position 等於 0 , limit 等於 capacity

1) put向緩沖區放入數據

abstract ByteBuffer

put(byte b)

ByteBuffer

put(byte[] src)

ByteBuffer

put(byte[] src, int offset, int length)

調用put方法前,limit應該等於capacity,如果不等於,幾乎可以肯定我們對緩沖區的操作有誤。在put方法中0到position-1的區域表示有效數據,position到limit之間區域表示空閑區域。put方法會從position的當前位置放入數據,每放入一個數據position增加1,當position等於limit(即空閑區域使用完)時還繼續放入數據就會拋出BufferUnderflowException異常

2)get從緩沖區讀取數據

abstract byte

get()

ByteBuffer

get(byte[] dst)

ByteBuffer

get(byte[] dst, int offset, int length)

在get方法中, 0到position-1的區域表示已讀數據,position到limit之間的區域表示未讀取的數據。每讀取一個數據position增加1,當position等於limit時繼續讀取數據就會拋出BufferUnderflowException異常。

2)flip 將寫模式轉換成讀模式

    public final Buffer flip() {
        limit = position;
        position = 0;
        mark = -1;
        return this;
    }

3)clear清空緩沖區,將讀模式轉換寫模式

    public final Buffer clear() {
        position = 0;
        limit = capacity;
        mark = -1;
        return this;
    }

4)compact保留未讀取的數據,將讀模式轉換寫模式

    public ByteBuffer compact() {

        int pos = position();
        int lim = limit();
        assert (pos <= lim);
        int rem = (pos <= lim ? lim - pos : 0);

        unsafe.copyMemory(ix(pos), ix(0), (long)rem << 0);
        position(rem);
        limit(capacity());
        discardMark();
        return this;

    }

5mark保存當前position的位置到mark變量

    public final Buffer mark() {
        mark = position;
        return this;
    }

6rest將position置為mark變量中的值

    public final Buffer reset() {
        int m = mark;
        if (m < 0)
            throw new InvalidMarkException();
        position = m;
        return this;
    }

mark方法和rest方法聯合使用可實現從指定位置的重讀。

 

7rewind從頭開始重讀

    public final Buffer rewind() {
        position = 0;
        mark = -1;
        return this;
    }

 

ByteBuffer對象使用時又很多需要注意的地方,自認為這個API設計的不是很友好。比如一定不能連續兩次調用flip和compact方法,flip方法調用以后不能再調用put方法,等等。要避免這些錯誤,只能在使用ByteBuffer前弄清楚當前緩沖區中0到position-1以及position到limit中數據表示的含義,這才是避免bug的根本辦法。

從上面的介紹中我們可以看出,ByteBuffer對象既可以讀,也可以寫。除非我們能保證在讀操作一次性使用完ByteBuffer對象中的所有數據,並且保證寫入ByteBuffer對象向中的內容全部寫入完成,否則同時用於讀寫的ByteBuffer對象會造成數據的混亂和錯誤。一般來說,我們都會創建兩個ByteBuffer對象向,一個用於接收數據,另一個用於發送數據。

 

1.3其它方法

ByteBuffer是面向字節的,為方便基本數據類型的讀取,ByteBuffer中還提供getInt,putInt,getFloat,putFloat等方法,這些方法方便我們在緩沖區存取單個基本數據類型。如果需要從基本數據類型數組中寫入到ByteBuffer中,或者從ByteBuffer中讀取到基本數據類型的數組中,那么我們可以通過已創建好的ByteBuffer對象的asXxxBuffer方法創建基本數據類型的Buffer。

abstract CharBuffer

asCharBuffer()

abstract DoubleBuffer

asDoubleBuffer()

abstract FloatBuffer

asFloatBuffer()

abstract IntBuffer

asIntBuffer()

abstract LongBuffer

asLongBuffer()

假設有如下代碼

IntBuffer intBufferObj = byteBufferObj.asIntBuffer();

此時intBufferObj和byteBufferObj對象共享底層的數組。但是比較坑爹的是兩個buffer的position,limit是獨立的,這樣極易產生bug,需要引起我們注意。

 

1.4 ByteBuffer的編碼和解碼

數據傳輸中我們使用的是ByteBuffer對象作為緩沖區,如果在通道兩端我們通信的內容是文本數據,這就涉及到ByteBuffer與CharBuffer的轉換。我們可以使用Charset類實現這個轉換的功能。

1)解碼示例

ByteBuffer byteBuffer = ByteBuffer.allocate(128);
byteBuffer.put(new byte[]{-26, -120, -111, -25, -120, -79, -28, -67, -96});
byteBuffer.flip();

/*對獲取utf8的編解碼器*/
Charset utf8 = Charset.forName("UTF-8");
CharBuffer charBuffer = utf8.decode(byteBuffer);/*對bytebuffer中的內容解碼*/

/*array()返回的就是內部的數組引用,編碼以后的有效長度是0~limit*/
char[] charArr = Arrays.copyOf(charBuffer.array(), charBuffer.limit());
System.out.println(charArr); /*運行結果:我愛你*/

 

2)編碼示例

CharBuffer charBuffer = CharBuffer.allocate(128);
charBuffer.append("我愛你");
charBuffer.flip();

/*對獲取utf8的編解碼器*/
Charset utf8 = Charset.forName("UTF-8");
ByteBuffer byteBuffer = utf8.encode(charBuffer); /*對charbuffer中的內容解碼*/

/*array()返回的就是內部的數組引用,編碼以后的有效長度是0~limit*/
byte[] bytes = Arrays.copyOf(byteBuffer.array(), byteBuffer.limit());
System.out.println(Arrays.toString(bytes));
/*運行結果:[-26, -120, -111, -25, -120, -79, -28, -67, -96] */

 

我們還可以通過代碼中的utf8編解碼器分別獲取編碼器對象和解碼器對象

CharsetEncoder utf8Encoder = utf8.newEncoder();
CharsetDecoder utf8Decoder = utf8.newDecoder();

然后通過下面編碼器和解碼器提供的方法進行編解碼,其中一些方法可以使ByteBuffer和CharBuffer對象循環使用,不必每次都產生一個新的對象。

解碼器方法

CharBuffer

decode(ByteBuffer in)

Convenience method that decodes the remaining content of a single input byte buffer into a newly-allocated character buffer.

CoderResult

decode(ByteBuffer in, CharBuffer out, boolean endOfInput)

Decodes as many bytes as possible from the given input buffer, writing the results to the given output buffer.

protected abstract CoderResult

decodeLoop(ByteBuffer in, CharBuffer out)

Decodes one or more bytes into one or more characters.

編碼器方法

ByteBuffer

encode(CharBuffer in)

Convenience method that encodes the remaining content of a single input character buffer into a newly-allocated byte buffer.

CoderResult

encode(CharBuffer in, ByteBuffer out, boolean endOfInput)

Encodes as many characters as possible from the given input buffer, writing the results to the given output buffer.

protected abstract CoderResult

encodeLoop(CharBuffer in, ByteBuffer out)

Encodes one or more characters into one or more bytes.

注意encode和decode方法都會改變源buffer中的position的位置,這點也是容易產生bug的方法。

 

2. Channel

針對四種不同的應用場景,有四種不同類型的Channel對象。

類型

應用場景

是否阻塞

FileChannel

文件

阻塞

DatagramChannel

UDP協議

阻塞或非阻塞

SocketChannel

TCP協議

阻塞或非阻塞

ServerSocketChannel

用於TCP服務器端的監聽和鏈接

阻塞或非阻塞

Channel對象的創建都是通過調用內部的open靜態方法實現的,此方法是線程安全的。不論哪種類型的Channel對象,都有read(要理解為從通道中讀取,寫入緩沖區中)和write(要理解為從緩沖區中讀取數據,寫入到通道中)方法,而且read和write方法都只針對ByteBuffer對象。

當我們要獲取由通道傳輸過來的數據時,先調用channel.read(byteBufferObj)方法,這個方法在內部調用了byteBufferObj對象的put方法,將通道中的數據寫入緩沖區中。當我們要獲取由通道傳輸來的數據時,調用byteBufferObj.flip(),然后調用byteBufferObj的get方法獲取通道傳過來的數據,最后調用clear或compact方法轉換成寫模式,為下次channel.read做准備。

當我們要向通道發送數據時,先調channel.write(byteBufferObj)方法,這個方法內部調用了byteBufferObj的get方法獲取數據,然后將數據寫入通道中。當寫入完成后調用clear或compact方法轉換成寫模式,為下次channel.write寫入緩沖區取做准備。

 

2.1 FileChannel

在文件通道中readwrite方法都是阻塞的,對於read方法,除非遇到文件結束,否則會把緩沖區的剩余空間讀滿再返回。對於write方法,會一次性把緩沖區中的內容全部寫入到文件中才會返回。

下面的代碼展示了FileChannel的功能,首先向文本文件中寫入utf8格式的中英文混合字符,然后再讀取出來。讀寫過程中都涉及到編解碼問題。

package nioDemo;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
package nioDemo;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.nio.file.Paths;

public class FileChannelDemo {
	
	public static void main(String[] args){ 
		
		/*創建文件,向文件中寫入數據*/
		try {
			
			/*如果文件不存在,創建該文件,文件后綴是不是文本文件不重要*/
			File file = new File("E:/noi_utf8.data");
			if(!file.exists()){
				file.createNewFile();
			}
			
			/*根據文件輸出流創建與這個文件相關的通道*/
			FileOutputStream fos = new FileOutputStream(file);
			FileChannel fc = fos.getChannel();
			
			/*創建ByteBuffer對象, position = 0, limit = 64*/
			ByteBuffer bb = ByteBuffer.allocate(64);
			
			/*向ByteBuffer中放入字符串UTF-8的字節, position = 17, limit = 64*/
			bb.put("Hello,World 123 \n".getBytes("UTF-8"));
			
			/*flip方法  position = 0, limit = 17*/
			bb.flip();
			
			/*write方法使得ByteBuffer的position到 limit中的元素寫入通道中*/
			fc.write(bb);
			
			/*clear方法使得position = 0, limit = 64*/
			bb.clear();
			
			/*下面的代碼同理*/
			bb.put("你好,世界 456".getBytes("UTF-8"));
			bb.flip();
			
			fc.write(bb);
			bb.clear();
			
			fos.close();
			fc.close();
			
		} catch (FileNotFoundException e) {
			
		} catch (IOException e) {
			System.out.println(e);
		}
		
		
		/*從剛才的文件中讀取字符序列*/
		try {
			
			/*通過Path對象創建文件通道*/
			Path path = Paths.get("E:/noi_utf8.data");
			FileChannel fc = FileChannel.open(path);
			
			ByteBuffer bb = ByteBuffer.allocate((int) fc.size()+1);
			
			Charset utf8 = Charset.forName("UTF-8");
			
			/*阻塞模式,讀取完成才能返回*/
			fc.read(bb);
			
			bb.flip();
			CharBuffer cb = utf8.decode(bb);
			System.out.print(cb.toString());
			bb.clear();
			

			fc.close();
			
		} catch (IOException e) {
			e.printStackTrace();
		}
		
	}
}

 

2.2 ServerSocketChannel

服務器端用於創建TCP連接的通道,只能對accept事件感興趣。accept方法會返回一個已和客戶端連接好的SocketChannel通道,它才服務器是真正傳輸數據的通道。

 

2.3 SocketChannel

TCP客戶端和TCP服務器端都用它來傳輸數據。

客戶端必須調用connect方法去連接服務器。在非阻塞通模式中,該方法將當前通道加入到選擇器的已注冊集合中,然后通過異步方式進行創建TCP連接,然后該方法立刻返回。注意調用該方法后並不表示已經創建好了TCP連接,如果這個方法返回false,稍后必須調用finishConnect方法來完成客戶端到服務器的tcp連接。在阻塞方式中,connect方法會阻塞直到創建好了TCP連接。

finishConnect在非阻塞模式中僅僅是返回連接的狀態。返回true時,表示連接創建好了。在阻塞模式下,直接調用方法connect即可完成連接,不需要使用finishConnect。

非阻塞模式下,讀寫操作要配合選擇器一起使用。在阻塞模式下,創建好TCP連接后就可以直接對通道進行讀寫操作。

 

2.4 DatagramChannel

connect方法僅用於客戶端到服務器端的連接,連接的作用僅僅是避免每次發送和接受數據時的安全檢查,提高發送和接受數據的效率,而不是像TCP連接那樣表示握手的意思。客戶端通道只有調用了connect方法后,才能使用read和write方法讀寫數據。

客戶端也可以不事先調用connet方法,而直接使用receive方法和send方法來實現數據的收發。

abstract SocketAddress

receive(ByteBuffer dst)

abstract int

send(ByteBuffer src, SocketAddress target)

 

2.5 服務器端DatagramChannel和SocketChannel的區別

對於服務器端DatagramChannel(UDP)和SocketChannel(TCP)有明顯的區別,對於TCP連接,服務器端每創建一個連接就對應一個通道(不同的客戶端ip:port地址對應一個通道),而服務器端UDP的連接始終只有一個通道,所有客戶端發送過來的報文都存放於同一個緩沖區中,這顯然會降低服務器端的效率,好在DatagramChannel對象是線程安全的,可以用多個線程讀寫同一個UDP通道。

服務器端為什么只有一個通道呢?我猜想因為UDP是無狀態的,不知道什么時客戶端會發送數據,什么時候數據又發送完成,所以服務器端沒有辦法為每個客戶端創建一個通道,就算服務器端根據客戶端ip:port為每個客戶端創建了通道,服務器端也不知道什么時候該釋放這個通道,這就造成了資源的浪費。

 

4. Selector

Selector類表示選擇器,通過這個類的對象可以選取已就緒的通道和這個通道感興趣的事件。通過靜態open方法創建。

 

4.1注冊

通道可以通過它的register方法,將通道注冊到選擇器上。

SelectionKey

register(Selector sel, int ops)

Registers this channel with the given selector, returning a selection key.

abstract SelectionKey

register(Selector sel, int ops, Object att)

Registers this channel with the given selector, returning a selection key.

這個該方法會返回一個SeletctKey對象,但在這里我們通常忽略這個返回值。SeletctionKey對象內部包含了這個注冊的通道和這個通道感興趣的事件(ops參數),以及附帶的對象(由att參數傳遞),這個附帶的對象通常就是和這個通道相關的讀寫緩沖區。

 

4.2通道的選擇與取消

abstract int

select()

Selects a set of keys whose corresponding channels are ready for I/O operations.

abstract int

select(long timeout)

Selects a set of keys whose corresponding channels are ready for I/O operations.

abstract int

selectNow()

Selects a set of keys whose corresponding channels are ready for I/O operations.

三個方法的返回值都表示就緒通道的數量。

select()方法是個阻塞方法,有通道就緒才會返回。

select(long timeout),最多阻塞timeout毫秒,即使沒有通道就緒也會返回,若超時返回,則當前線程中斷標志位被設置。若阻塞時間內有通道就緒,就提前返回。

seletor.selectNow(),非阻塞方法。

一個seletor對象內部維護了三個集合。

1)已注冊集合:表示了所有已注冊通道的SelectionKey對象。

2)就緒集合:表示了所有已就緒通道的SelectionKey對象。

3)取消集合:表示了所有需要取消注冊關系的通道的SelectionKey對象。

SelectionKey的cancel方法用於取消通道和選擇器的注冊關系,這個方法只是把表示當前通道的SelectionKey放入取消集合中,下次調用select方法時才會真正取消注冊關系。

select方法每次會從已注冊的通道集合中刪除所有已取消的通道的SelectionKey,然后清空已取消的通道集合,最后從更新過的已注冊通道集合中選出就緒的通道,放入已就緒的集合中。每次調用select方法,會向已就緒的集合中放入已就緒通道的SelectionKey對象,調用selectedKeys 方法就會返回這個已就緒通道集合的引用。當我們處理完一個已就緒通道,該通道對應的SelectionKey對象仍然位於已就緒的集合中,這就要求我們處理一個已就緒的通道后就必須手動從已就緒的集合中刪除它,否則下次調用selectedKeys時,已處理過的通道還存在於這個集合中,導致線程空轉。這里也是極易產生bug的。

 

4.3通道的寫方法注意事項

1)寫方法什么時候就緒?

寫操作的就緒條件為socket底層寫緩沖區有空閑空間,此時並不代表我們這時有(或者需要將)數據寫入通道。而底層寫緩沖區絕大部分時間都是有空閑空間的,所以當你注冊寫事件后,寫操作基本一直是就緒的。這就導致只要有一個通道對寫事件感興趣,select方法幾乎總是立刻返回的,但是實際上我們可能沒有數據可寫的,所以使得調用select方法的線程總是空轉。對於客戶端發送一些數據,客戶端返回一些數據的模型,我們可以在讀事件完成后,再設置通道對寫事件感興趣,寫操作完成后再取消該通道對寫事件的興趣,這樣就可以避免上述問題。

 

2)如何正確的發送數據

while(writeBuffer.hasRemaining()){
    channel.write(writeBuffer);
}

 

上面發送數據的通常用的代碼,當網絡狀況良好的情況下,這段代碼能正常工作。 現在我們考慮一種極端情況,服務器端寫事件就緒,我們向底層的寫緩沖區寫入一些數據后,服務器端到客戶端的鏈路出現問題,服務器端沒能把數據發送出去,此時底層的寫緩沖區一直處於滿的狀態,假設writeBuffer中仍然還有沒發送完的數據就會導致while循環空轉,浪費CPU資源,同時也妨礙這個selector管理的其它通道的讀寫。

為了解決個問題,我們應該使用下面的方法發送數據

int len = 0;
while(writeBuffer.hasRemaining()){
	len = sc.write(writeBuffer);
	/*說明底層的socket寫緩沖已滿*/
	if(len == 0){
		break;
	}
}

 

5. 代碼示例

下面這個類,后面的代碼都會用到,它只是兩個緩沖區的包裝

package nioDemo;

import java.nio.ByteBuffer;

/*自定義Buffer類中包含讀緩沖區和寫緩沖區,用於注冊通道時的附加對象*/
public class Buffers {

	ByteBuffer readBuffer;
	ByteBuffer writeBuffer;
	
	public Buffers(int readCapacity, int writeCapacity){
		readBuffer = ByteBuffer.allocate(readCapacity);
		writeBuffer = ByteBuffer.allocate(writeCapacity);
	}
	
	public ByteBuffer getReadBuffer(){
		return readBuffer;
	}
	
	public ByteBuffer gerWriteBuffer(){
		return writeBuffer;
	}
}

 

5.1 TCP非阻塞示例

1)服務器端代碼

package nioDemo;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;


/*服務器端,:接收客戶端發送過來的數據並顯示,
 *服務器把上接收到的數據加上"echo from service:"再發送回去*/
public class ServiceSocketChannelDemo {
	
	public static class TCPEchoServer implements Runnable{
		
		/*服務器地址*/
		private InetSocketAddress localAddress;
		
		public TCPEchoServer(int port) throws IOException{
			this.localAddress = new InetSocketAddress(port);
		}
		
		
		@Override
		public void run(){
			
			Charset utf8 = Charset.forName("UTF-8");
			
			ServerSocketChannel ssc = null;
			Selector selector = null;
			
			Random rnd = new Random();
			
			try {
				/*創建選擇器*/
				selector = Selector.open();
				
				/*創建服務器通道*/
				ssc = ServerSocketChannel.open();
				ssc.configureBlocking(false);
				
				/*設置監聽服務器的端口,設置最大連接緩沖數為100*/
				ssc.bind(localAddress, 100);
				
				/*服務器通道只能對tcp鏈接事件感興趣*/
				ssc.register(selector, SelectionKey.OP_ACCEPT);
				
			} catch (IOException e1) {
				System.out.println("server start failed");
				return;
			} 
			
			System.out.println("server start with address : " + localAddress);
			
			/*服務器線程被中斷后會退出*/
			try{
				
				while(!Thread.currentThread().isInterrupted()){
					
					int n = selector.select();
					if(n == 0){
						continue;
					}

					Set<SelectionKey> keySet = selector.selectedKeys();
					Iterator<SelectionKey> it = keySet.iterator();
					SelectionKey key = null;
					
					while(it.hasNext()){
							
						key = it.next();
						/*防止下次select方法返回已處理過的通道*/
						it.remove();
						
						/*若發現異常,說明客戶端連接出現問題,但服務器要保持正常*/
						try{
							/*ssc通道只能對鏈接事件感興趣*/
							if(key.isAcceptable()){
								
								/*accept方法會返回一個普通通道,
								     每個通道在內核中都對應一個socket緩沖區*/
								SocketChannel sc = ssc.accept();
								sc.configureBlocking(false);
								
								/*向選擇器注冊這個通道和普通通道感興趣的事件,同時提供這個新通道相關的緩沖區*/
								int interestSet = SelectionKey.OP_READ; 							
								sc.register(selector, interestSet, new Buffers(256, 256));
								
								System.out.println("accept from " + sc.getRemoteAddress());
							}
							
							
							/*(普通)通道感興趣讀事件且有數據可讀*/
							if(key.isReadable()){
								
								/*通過SelectionKey獲取通道對應的緩沖區*/
								Buffers  buffers = (Buffers)key.attachment();
								ByteBuffer readBuffer = buffers.getReadBuffer();
								ByteBuffer writeBuffer = buffers.gerWriteBuffer();
								
								/*通過SelectionKey獲取對應的通道*/
								SocketChannel sc = (SocketChannel) key.channel();
								
								/*從底層socket讀緩沖區中讀入數據*/
								sc.read(readBuffer);
								readBuffer.flip();
								
								/*解碼顯示,客戶端發送來的信息*/
								CharBuffer cb = utf8.decode(readBuffer);
								System.out.println(cb.array());
					
								readBuffer.rewind();

								
								/*准備好向客戶端發送的信息*/
								/*先寫入"echo:",再寫入收到的信息*/
								writeBuffer.put("echo from service:".getBytes("UTF-8"));
								writeBuffer.put(readBuffer);
								
								readBuffer.clear();
								
								/*設置通道寫事件*/
								key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
																
							}
							
							/*通道感興趣寫事件且底層緩沖區有空閑*/
							if(key.isWritable()){
								
								Buffers  buffers = (Buffers)key.attachment();
								
								ByteBuffer writeBuffer = buffers.gerWriteBuffer();
								writeBuffer.flip();
								
								SocketChannel sc = (SocketChannel) key.channel();
								
								int len = 0;
								while(writeBuffer.hasRemaining()){
									len = sc.write(writeBuffer);
									/*說明底層的socket寫緩沖已滿*/
									if(len == 0){
										break;
									}
								}
								
								writeBuffer.compact();
								
								/*說明數據全部寫入到底層的socket寫緩沖區*/
								if(len != 0){
									/*取消通道的寫事件*/
									key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
								}
								
							}
						}catch(IOException e){
							System.out.println("service encounter client error");
							/*若客戶端連接出現異常,從Seletcor中移除這個key*/
							key.cancel();
							key.channel().close();
						}

					}
						
					Thread.sleep(rnd.nextInt(500));
				}
				
			}catch(InterruptedException e){
				System.out.println("serverThread is interrupted");
			} catch (IOException e1) {
				System.out.println("serverThread selecotr error");
			}finally{
				try{
					selector.close();
				}catch(IOException e){
					System.out.println("selector close failed");
				}finally{
					System.out.println("server close");
				}
			}

		}
	}
	
	public static void main(String[] args) throws InterruptedException, IOException{
		Thread thread = new Thread(new TCPEchoServer(8080));
		thread.start();
		Thread.sleep(100000);
		/*結束服務器線程*/
		thread.interrupt();
	}
	
}

 

2)客戶端程序

package nioDemo;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;

/*客戶端:客戶端每隔1~2秒自動向服務器發送數據,接收服務器接收到數據並顯示*/
public class ClientSocketChannelDemo {
	
	public static class TCPEchoClient implements Runnable{
		
		/*客戶端線程名*/
		private String name;
		private Random rnd = new Random();
		
		/*服務器的ip地址+端口port*/
		private InetSocketAddress remoteAddress;
		
		public TCPEchoClient(String name, InetSocketAddress remoteAddress){
			this.name = name;
			this.remoteAddress = remoteAddress;
		}
		
		@Override
		public void run(){
			
			/*創建解碼器*/
			Charset utf8 = Charset.forName("UTF-8");
			
			Selector selector;
			
			try {
				
				/*創建TCP通道*/
				SocketChannel sc = SocketChannel.open();
				
				/*設置通道為非阻塞*/
				sc.configureBlocking(false);
				
				/*創建選擇器*/
				selector = Selector.open();
				
				/*注冊感興趣事件*/
				int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
				
				/*向選擇器注冊通道*/
				sc.register(selector, interestSet, new Buffers(256, 256));
				
				/*向服務器發起連接,一個通道代表一條tcp鏈接*/
				sc.connect(remoteAddress);
				
				/*等待三次握手完成*/
				while(!sc.finishConnect()){
					;
				}

				System.out.println(name + " " + "finished connection");
				
			} catch (IOException e) {
				System.out.println("client connect failed");
				return;
			}
			
			/*與服務器斷開或線程被中斷則結束線程*/
			try{

				int i = 1;
				while(!Thread.currentThread().isInterrupted()){
					
					/*阻塞等待*/
					selector.select();
					
					/*Set中的每個key代表一個通道*/
					Set<SelectionKey> keySet = selector.selectedKeys();
					Iterator<SelectionKey> it = keySet.iterator();
					
					/*遍歷每個已就緒的通道,處理這個通道已就緒的事件*/
					while(it.hasNext()){
						
						SelectionKey key = it.next();
						/*防止下次select方法返回已處理過的通道*/
						it.remove();
						
						/*通過SelectionKey獲取對應的通道*/
						Buffers  buffers = (Buffers)key.attachment();
						ByteBuffer readBuffer = buffers.getReadBuffer();
						ByteBuffer writeBuffer = buffers.gerWriteBuffer();
						
						/*通過SelectionKey獲取通道對應的緩沖區*/
						SocketChannel sc = (SocketChannel) key.channel();
						
						/*表示底層socket的讀緩沖區有數據可讀*/
						if(key.isReadable()){
							/*從socket的讀緩沖區讀取到程序定義的緩沖區中*/
							sc.read(readBuffer);
							readBuffer.flip();
							/*字節到utf8解碼*/
							CharBuffer cb = utf8.decode(readBuffer);
							/*顯示接收到由服務器發送的信息*/
							System.out.println(cb.array());
							readBuffer.clear();
						}
						
						/*socket的寫緩沖區可寫*/
						if(key.isWritable()){
							writeBuffer.put((name + "  " + i).getBytes("UTF-8"));
							writeBuffer.flip();
							/*將程序定義的緩沖區中的內容寫入到socket的寫緩沖區中*/
							sc.write(writeBuffer);
							writeBuffer.clear();
							i++;
						}
					}
					
					Thread.sleep(1000 + rnd.nextInt(1000));
				}
			
			}catch(InterruptedException e){
				System.out.println(name + " is interrupted");
			}catch(IOException e){
				System.out.println(name + " encounter a connect error");
			}finally{
				try {
					selector.close();
				} catch (IOException e1) {
					System.out.println(name + " close selector failed");
				}finally{
					System.out.println(name + "  closed");
				}
			}
		}
		
	}
	
	public static void main(String[] args) throws InterruptedException{
		
		InetSocketAddress remoteAddress = new InetSocketAddress("192.168.1.100", 8080);
		
		Thread ta = new Thread(new TCPEchoClient("thread a", remoteAddress));
		Thread tb = new Thread(new TCPEchoClient("thread b", remoteAddress));
		Thread tc = new Thread(new TCPEchoClient("thread c", remoteAddress));
		Thread td = new Thread(new TCPEchoClient("thread d", remoteAddress));
		
		ta.start();
		tb.start();
		tc.start();
		
		Thread.sleep(5000);

		/*結束客戶端a*/
		ta.interrupt();
		
		/*開始客戶端d*/
		td.start();
	}
}

 

5.2 UDP示例

客戶端非阻塞模式,服務器端阻塞模式

1)服務器端代碼(服務器端只有一個通道,對應一個讀緩沖區,一個寫緩沖區,所以使用非阻塞方式容易發生數據混亂)

package nioDemo;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.charset.Charset;

public class ServiceDatagramChannelDemo {
	
	public static class UDPEchoService implements Runnable{
		
		private int port;
		
		public UDPEchoService(int port){
			this.port = port;
		}
		
		@Override
		public void run(){
			
			ByteBuffer readBuffer = ByteBuffer.allocate(256);
			ByteBuffer writeBuffer = ByteBuffer.allocate(256);

			DatagramChannel dc = null;
			
			try{
				
				/*服務器端使用默認的阻塞IO的方式*/
				dc = DatagramChannel.open();
				dc.bind(new InetSocketAddress(port));
				
				System.out.println("service start");
				while(!Thread.currentThread().isInterrupted()){
					
					try{
						
						/*先讀取客戶端發送的消息,直到讀取到消息才會返回*/
						/*只能調用receive方法,因為不知道哪個地址給服務器發信息,沒法實現調用connect方法*/
						/*dc是阻塞的,所以receive方法要等到接收到數據才返回*/
						SocketAddress clientAddress = dc.receive(readBuffer);
						readBuffer.flip();
						CharBuffer charBuffer = Charset.defaultCharset().decode(readBuffer);
						System.out.println(charBuffer.array());
						
						/*調用send方法向客戶端發送的消息,
						 *dc是阻塞的,所以直到send方法把數據全部寫入到socket緩沖區才返回*/
						writeBuffer.put("echo : ".getBytes());
						readBuffer.rewind();
						writeBuffer.put(readBuffer);
						writeBuffer.flip();
						dc.send(writeBuffer, clientAddress);
						
						readBuffer.clear();
						writeBuffer.clear();
						
					}catch(IOException e){
						System.out.println("receive from or send to client failed");
					}
				}
			}catch(IOException e){
				System.out.println("server error");
			}finally{
				try {
					if(dc != null){
						dc.close();
					}
				} catch (IOException e) {

				}
			}
		}
	}
	
	public static void main(String[] args) throws IOException{
		new Thread(new UDPEchoService(8080)).start();
	}
}

 

2)客戶端代碼

package nioDemo;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.Charset;
import java.util.Iterator;

public class ClientDatagramChannelDemo {
	
	public static class UDPEchoClient implements Runnable{
		
		private String name;
		private InetSocketAddress serviceAddress;
		
		public UDPEchoClient(String name, InetSocketAddress serviceAddress){
			this.name = name;
			this.serviceAddress = serviceAddress;
		}
		
		@Override
		public void run(){
			DatagramChannel dc = null;
			try{
				
				/*每個實際上可以創建多個通道連接同一個服務器地址,
				我們這里為了演示方便,只創建了一個通道*/
				dc = DatagramChannel.open();
				
				/*客戶端采用非阻塞模式*/
				dc.configureBlocking(false);

				/*這里的連接不是指TCP的握手連接,因為UDP協議本身不需要連接,
				 *這里連接的意思大概是提前向操作系統申請好本地端口號,以及高速操作系統要發送的目的
				 *連接后的UDP通道可以提高發送的效率,還可以調用read和write方法接收和發送數據
				 *未連接的UDP通道只能調用receive和send方法接收和發送數據*/
				dc.connect(serviceAddress);
				
				Selector selector = Selector.open();
				int interest = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
				dc.register(selector, interest, new Buffers(256, 256));
				
				int i = 0;
				while(!Thread.currentThread().isInterrupted()){
					
					selector.select();
					
					Iterator<SelectionKey> it = selector.selectedKeys().iterator();
					while(it.hasNext()){
						
						SelectionKey key = it.next();
						it.remove();
						
						Buffers buffers = (Buffers)key.attachment();
						
						ByteBuffer readBuffer = buffers.getReadBuffer();
						ByteBuffer writeBuffer = buffers.gerWriteBuffer();
						
						try{
							
							if(key.isReadable()){
								dc.read(readBuffer);
								readBuffer.flip();
								CharBuffer charBuffer = Charset.defaultCharset().decode(readBuffer);
								System.out.println(charBuffer.array());
								readBuffer.clear();
							}
							
							if(key.isWritable()){
								writeBuffer.put((name + (i++)).getBytes());
								writeBuffer.flip();
								dc.write(writeBuffer);
								writeBuffer.clear();
								
								Thread.sleep(500);
							}
						
						}catch(IOException e){
							key.cancel();
							key.channel().close();
						}
					}
				}
			}catch(InterruptedException e){
				System.out.println(name + "interrupted");
			} catch (IOException e) {
				System.out.println(name + "encounter connect error");
			} finally{
				try {
					dc.close();
				} catch (IOException e) {
					System.out.println(name + "encounter close error");
				}finally{
					System.out.println(name + "closed");
				}
			}
		}
	}
	
	public static void main(String[] args){
		
		InetSocketAddress serviceAddress = new InetSocketAddress("192.168.1.100", 8080);
		
		UDPEchoClient clientA = new UDPEchoClient("thread a ", serviceAddress);
		UDPEchoClient clientB = new UDPEchoClient("thread b ", serviceAddress);
		UDPEchoClient clientC = new UDPEchoClient("thread c ", serviceAddress);
		
		new Thread(clientA).start();
		new Thread(clientB).start();
		new Thread(clientC).start();
		
	}
}

 

6. 參考內容

[1] 堆外內存之 DirectByteBuffer 詳解

[2] SocketChannel---各種注意點

[3] JDK 8 API 文檔


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM