今天是2018年的第三天,真是時光飛逝,2017年的學習計划還沒有學習完成,因此繼續開始研究學習,那么上一節我們了解了NIO,那么這一節我們進一步來學習NIO相關的知識。那就是通道和緩沖區。Java NIO系統的核心在於:通道(Channel)和緩沖區(Buffer)。通道表示打開到 IO 設備(例如:文件、套接字)的連接。若需要使用 NIO 系統,需要獲取用於連接 IO 設備的通道以及用於容納數據的緩沖區。然后操作緩沖區,對數據進行處理。簡而言之, Channel 負責傳輸, Buffer 負責存儲。
一、緩沖區(Buffer)
緩沖區(Buffer):一個用於特定基本數據類型的容器。由 java.nio 包定義的,所有緩沖區都是 Buffer 抽象類的子類。Java NIO 中的 Buffer 主要用於與 NIO 通道進行交互,數據是從通道讀入緩沖區,從緩沖區寫入通道中的。
緩沖區對象本質上是一個數組,但它其實是一個特殊的數組,緩沖區對象內置了一些機制,能夠跟蹤和記錄緩沖區的狀態變化情況,如果我們使用get()方法從緩沖區獲取數據或者使用put()方法把數據寫入緩沖區,都會引起緩沖區狀態的變化。它可以保存多個相同類型的數據。根據數據類型不同(boolean 除外) ,有以下 Buffer 常用子類:
ByteBuffer
CharBuffer
ShortBuffer
IntBuffer
LongBuffer
FloatBuffer
DoubleBuffer
上述 Buffer 類 他們都采用相似的方法進行管理數據,只是各自管理的數據類型不同而已。都是通過如下方法獲取一個 Buffer對象:
static XxxBuffer allocate(int capacity) : 創建一個容量為 capacity 的 XxxBuffer 對象
在緩沖區中,最重要的屬性有下面三個,它們一起合作完成對緩沖區內部狀態的變化跟蹤:
position:指定了下一個將要被寫入或者讀取的元素索引,它的值由get()/put()方法自動更新,在新創建一個Buffer對象時,position被初始化為0。
limit:指定還有多少數據需要取出(在從緩沖區寫入通道時),或者還有多少空間可以放入數據(在從通道讀入緩沖區時)。
capacity:指定了可以存儲在緩沖區中的最大數據容量,實際上,它指定了底層數組的大小,或者至少是指定了准許我們使用的底層數組的容量。
另外:
標記 (mark)與重置 (reset): 標記是一個索引,通過 Buffer 中的 mark() 方法指定 Buffer 中一個特定的 position,之后可以通過調用 reset() 方法恢復到這個 position.
以上四個屬性值之間有一些相對大小的關系:0 <= position <= limit <= capacity。如果我們創建一個新的容量大小為10的ByteBuffer對象,在初始化的時候,position設置為0,limit和 capacity被設置為10,在以后使用ByteBuffer對象過程中,capacity的值不會再發生變化,而其它兩個個將會隨着使用而變化。四個屬性值分別如圖所示:
現在我們可以從通道中讀取一些數據到緩沖區中,注意從通道讀取數據,相當於往緩沖區中寫入數據。如果讀取4個自己的數據,則此時position的值為4,即下一個將要被寫入的字節索引為4,而limit仍然是10,如下圖所示:
下一步把讀取的數據寫入到輸出通道中,相當於從緩沖區中讀取數據,在此之前,必須調用flip()方法,該方法將會完成兩件事情:
1. 把limit設置為當前的position值
2. 把position設置為0
由於position被設置為0,所以可以保證在下一步輸出時讀取到的是緩沖區中的第一個字節,而limit被設置為當前的position,可以保證讀取的數據正好是之前寫入到緩沖區中的數據,如下圖所示:
現在調用get()方法從緩沖區中讀取數據寫入到輸出通道,這會導致position的增加而limit保持不變,但position不會超過limit的值,所以在讀取我們之前寫入到緩沖區中的4個自己之后,position和limit的值都為4,如下圖所示:
在從緩沖區中讀取數據完畢后,limit的值仍然保持在我們調用flip()方法時的值,調用clear()方法能夠把所有的狀態變化設置為初始化時的值,如下圖所示
Buffer 的常用方法:
方 法 | 描 述 |
Buffer clear() | 清空緩沖區並返回對緩沖區的引用 |
Buffer flip() | 將緩沖區的界限設置為當前位置,並將當前位置充值為 0 |
int capacity() | 返回 Buffer 的 capacity 大小 |
boolean hasRemaining() | 判斷緩沖區中是否還有元素 |
int limit() | 返回 Buffer 的界限(limit) 的位置 |
Buffer limit(int n) | 將設置緩沖區界限為 n, 並返回一個具有新 limit 的緩沖區對象 |
Buffer mark() | 對緩沖區設置標記 |
int position() | 返回緩沖區的當前位置 position |
Buffer position(int n) | 將設置緩沖區的當前位置為 n , 並返回修改后的 Buffer 對象 |
int remaining() | 返回 position 和 limit 之間的元素個數 |
Buffer reset() | 將位置 position 轉到以前設置的 mark 所在的位置 |
Buffer rewind() | 將位置設為為 0, 取消設置的 mark |
緩沖區的數據操作:
Buffer 所有子類提供了兩個用於數據操作的方法: get()與 put() 方法
獲取 Buffer 中的數據
get() :讀取單個字節
get(byte[] dst):批量讀取多個字節到 dst 中
get(int index):讀取指定索引位置的字節(不會移動 position)
放入數據到 Buffer 中
put(byte b):將給定單個字節寫入緩沖區的當前位置
put(byte[] src):將 src 中的字節寫入緩沖區的當前位置
put(int index, byte b):將指定字節寫入緩沖區的索引位置(不會移動 position)
直接與非直接緩沖區
字節緩沖區要么是直接的,要么是非直接的。如果為直接字節緩沖區,則 Java 虛擬機會盡最大努力直接在此緩沖區上執行本機 I/O 操作。也就是說,在每次調用基礎操作系統的一個本機 I/O 操作之前(或之后),虛擬機都會盡量避免將緩沖區的內容復制到中間緩沖區中(或從中間緩沖區中復制內容)。
直接字節緩沖區可以通過調用此類的 allocateDirect() 工廠方法來創建。此方法返回的緩沖區進行分配和取消分配所需成本通常高於非直接緩沖區。直接緩沖區的內容可以駐留在常規的垃圾回收堆之外,因此,它們對應用程序的內存需求量造成的影響可能並不明顯。所以,建議將直接緩沖區主要分配給那些易受基礎系統的本機 I/O 操作影響的大型、持久的緩沖區。一般情況下,最好僅在直接緩沖區能在程序性能方面帶來明顯好處時分配它們。直接字節緩沖區還可以通過 FileChannel 的 map() 方法 將文件區域直接映射到內存中來創建。該方法返回MappedByteBuffer 。 Java 平台的實現有助於通過 JNI 從本機代碼創建直接字節緩沖區。如果以上這些緩沖區中的某個緩沖區實例指的是不可訪問的內存區域,則試圖訪問該區域不會更改該緩沖區的內容,並且將會在訪問期間或稍后的某個時間導致拋出不確定的異常。
字節緩沖區是直接緩沖區還是非直接緩沖區可通過調用其 isDirect() 方法來確定。提供此方法是為了能夠在性能關鍵型代碼中執行顯式緩沖區管理。
非直接緩沖區
直接緩沖區
下面我們看下直接緩沖區的操作樣例和重點:
import java.nio.ByteBuffer; import org.junit.Test; /* * 一、緩沖區(Buffer):在 Java NIO 中負責數據的存取。緩沖區就是數組。用於存儲不同數據類型的數據 * * 根據數據類型不同(boolean 除外),提供了相應類型的緩沖區: * ByteBuffer * CharBuffer * ShortBuffer * IntBuffer * LongBuffer * FloatBuffer * DoubleBuffer * * 上述緩沖區的管理方式幾乎一致,通過 allocate() 獲取緩沖區 * * 二、緩沖區存取數據的兩個核心方法: * put() : 存入數據到緩沖區中 * get() : 獲取緩沖區中的數據 * * 三、緩沖區中的四個核心屬性: * capacity : 容量,表示緩沖區中最大存儲數據的容量。一旦聲明不能改變。 * limit : 界限,表示緩沖區中可以操作數據的大小。(limit 后數據不能進行讀寫) * position : 位置,表示緩沖區中正在操作數據的位置。 * * mark : 標記,表示記錄當前 position 的位置。可以通過 reset() 恢復到 mark 的位置 * * 0 <= mark <= position <= limit <= capacity * * 四、直接緩沖區與非直接緩沖區: * 非直接緩沖區:通過 allocate() 方法分配緩沖區,將緩沖區建立在 JVM 的內存中 * 直接緩沖區:通過 allocateDirect() 方法分配直接緩沖區,將緩沖區建立在物理內存中。可以提高效率 */ public class TestBuffer { @Test public void test3(){ //分配直接緩沖區 ByteBuffer buf = ByteBuffer.allocateDirect(1024); System.out.println(buf.isDirect()); } @Test public void test2(){ String str = "abcde"; ByteBuffer buf = ByteBuffer.allocate(1024); buf.put(str.getBytes()); buf.flip(); byte[] dst = new byte[buf.limit()]; buf.get(dst, 0, 2); System.out.println(new String(dst, 0, 2)); System.out.println(buf.position()); //mark() : 標記 buf.mark(); buf.get(dst, 2, 2); System.out.println(new String(dst, 2, 2)); System.out.println(buf.position()); //reset() : 恢復到 mark 的位置 buf.reset(); System.out.println(buf.position()); //判斷緩沖區中是否還有剩余數據 if(buf.hasRemaining()){ //獲取緩沖區中可以操作的數量 System.out.println(buf.remaining()); } } @Test public void test1(){ String str = "abcde"; //1. 分配一個指定大小的緩沖區 ByteBuffer buf = ByteBuffer.allocate(1024); System.out.println("-----------------allocate()----------------"); System.out.println(buf.position()); System.out.println(buf.limit()); System.out.println(buf.capacity()); //2. 利用 put() 存入數據到緩沖區中 buf.put(str.getBytes()); System.out.println("-----------------put()----------------"); System.out.println(buf.position()); System.out.println(buf.limit()); System.out.println(buf.capacity()); //3. 切換讀取數據模式 buf.flip(); System.out.println("-----------------flip()----------------"); System.out.println(buf.position()); System.out.println(buf.limit()); System.out.println(buf.capacity()); //4. 利用 get() 讀取緩沖區中的數據 byte[] dst = new byte[buf.limit()]; buf.get(dst); System.out.println(new String(dst, 0, dst.length)); System.out.println("-----------------get()----------------"); System.out.println(buf.position()); System.out.println(buf.limit()); System.out.println(buf.capacity()); //5. rewind() : 可重復讀 buf.rewind(); System.out.println("-----------------rewind()----------------"); System.out.println(buf.position()); System.out.println(buf.limit()); System.out.println(buf.capacity()); //6. clear() : 清空緩沖區. 但是緩沖區中的數據依然存在,但是處於“被遺忘”狀態 buf.clear(); System.out.println("-----------------clear()----------------"); System.out.println(buf.position()); System.out.println(buf.limit()); System.out.println(buf.capacity()); System.out.println((char)buf.get()); } }
二、通道(Channel)
通道(Channel):由 java.nio.channels 包定義的。 Channel 表示 IO 源與目標打開的連接。Channel 類似於傳統的“流”。只不過 Channel本身不能直接訪問數據, Channel 只能與Buffer 進行交互。
下面我們通過幾張圖來引入通道:
上面這張圖是指當准備從磁盤或內存中copy數據,進行IO操作的時候,需要建立IO 連接,那么這個時候所有的調度中心都在CPU上面,那么當有很多IO請求的時候,那么CPU都要直接參與調度,那么勢必會影響到CPU的執行效率,因為所有的IO從建立連接到傳入數據都要經過CPU的操作來完成,於是為了節省CPU的占用率,於是出現了下面的改進;
這種是CPU將權利釋放,只是進行審批流程,即就相當於我們現實生活工作中,領導不負責具體的工作,只負責簽字審批確認即可,那么就減少了IO對CPU的影響,從而提高了CPU的利用率,但這種還是會占用到CPU的時間消耗和利用率,因此為了完全不占用CPU,於是出現了專門負責IO的專門者,就是通道:
Java 為 Channel 接口提供的最主要實現類如下:
•FileChannel:用於讀取、寫入、映射和操作文件的通道。
•DatagramChannel:通過 UDP 讀寫網絡中的數據通道。
•SocketChannel:通過 TCP 讀寫網絡中的數據。
•ServerSocketChannel:可以監聽新進來的 TCP 連接,對每一個新進來的連接都會創建一個 SocketChannel。
獲取通道
獲取通道的一種方式是對支持通道的對象調用getChannel() 方法。支持通道的類如下:
FileInputStream
FileOutputStream
RandomAccessFile
DatagramSocket
Socket
ServerSocket
獲取通道的其他方式是使用 Files 類的靜態方法 newByteChannel() 獲取字節通道。或者通過通道的靜態方法 open() 打開並返回指定通道。
通道的數據傳輸
將 Buffer 中數據寫入 Channel
例如:
從 Channel 讀取數據到 Buffer
例如:
分散(Scatter)和聚集(Gather)
分散讀取(Scattering Reads)是指從 Channel 中讀取的數據“分散” 到多個 Buffer 中
注意:按照緩沖區的順序,從 Channel 中讀取的數據依次將 Buffer 填滿。
聚集寫入(Gathering Writes)是指將多個 Buffer 中的數據“聚集”到 Channel。
注意:按照緩沖區的順序,寫入 position 和 limit 之間的數據到 Channel 。
transferFrom()
將數據從源通道傳輸到其他 Channel 中:
transferTo()
將數據從源通道傳輸到其他 Channel 中:
FileChannel 的常用方法
方 法 | 描 述 |
int read(ByteBuffer dst) | 從 Channel 中讀取數據到 ByteBuffer |
long read(ByteBuffer[] dsts) | 將 Channel 中的數據“分散”到 ByteBuffer[] |
int write(ByteBuffer src) | 將 ByteBuffer 中的數據寫入到 Channel |
long write(ByteBuffer[] srcs) | 將 ByteBuffer[] 中的數據“聚集”到 Channel |
long position() | 返回此通道的文件位置 |
FileChannel position(long p) | 設置此通道的文件位置 |
long size() | 返回此通道的文件的當前大小 |
FileChannel truncate(long s) | 將此通道的文件截取為給定大小 |
void force(boolean metaData) | 強制將所有對此通道的文件更新寫入到存儲設備中 |
上述樣例代碼和核心:
import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.FileChannel.MapMode; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import java.nio.charset.CharsetEncoder; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import org.junit.Test; /* * 一、通道(Channel):用於源節點與目標節點的連接。在 Java NIO 中負責緩沖區中數據的傳輸。Channel 本身不存儲數據,因此需要配合緩沖區進行傳輸。 * * 二、通道的主要實現類 * java.nio.channels.Channel 接口: * |--FileChannel * |--SocketChannel * |--ServerSocketChannel * |--DatagramChannel * * 三、獲取通道 * 1. Java 針對支持通道的類提供了 getChannel() 方法 * 本地 IO: * FileInputStream/FileOutputStream * RandomAccessFile * * 網絡IO: * Socket * ServerSocket * DatagramSocket * * 2. 在 JDK 1.7 中的 NIO.2 針對各個通道提供了靜態方法 open() * 3. 在 JDK 1.7 中的 NIO.2 的 Files 工具類的 newByteChannel() * * 四、通道之間的數據傳輸 * transferFrom() * transferTo() * * 五、分散(Scatter)與聚集(Gather) * 分散讀取(Scattering Reads):將通道中的數據分散到多個緩沖區中 * 聚集寫入(Gathering Writes):將多個緩沖區中的數據聚集到通道中 * * 六、字符集:Charset * 編碼:字符串 -> 字節數組 * 解碼:字節數組 -> 字符串 * */ public class TestChannel { //字符集 @Test public void test6() throws IOException{ Charset cs1 = Charset.forName("GBK"); //獲取編碼器 CharsetEncoder ce = cs1.newEncoder(); //獲取解碼器 CharsetDecoder cd = cs1.newDecoder(); CharBuffer cBuf = CharBuffer.allocate(1024); cBuf.put("尚硅谷威武!"); cBuf.flip(); //編碼 ByteBuffer bBuf = ce.encode(cBuf); for (int i = 0; i < 12; i++) { System.out.println(bBuf.get()); } //解碼 bBuf.flip(); CharBuffer cBuf2 = cd.decode(bBuf); System.out.println(cBuf2.toString()); System.out.println("------------------------------------------------------"); Charset cs2 = Charset.forName("GBK"); bBuf.flip(); CharBuffer cBuf3 = cs2.decode(bBuf); System.out.println(cBuf3.toString()); } @Test public void test5(){ Map<String, Charset> map = Charset.availableCharsets(); Set<Entry<String, Charset>> set = map.entrySet(); for (Entry<String, Charset> entry : set) { System.out.println(entry.getKey() + "=" + entry.getValue()); } } //分散和聚集 @Test public void test4() throws IOException{ RandomAccessFile raf1 = new RandomAccessFile("1.txt", "rw"); //1. 獲取通道 FileChannel channel1 = raf1.getChannel(); //2. 分配指定大小的緩沖區 ByteBuffer buf1 = ByteBuffer.allocate(100); ByteBuffer buf2 = ByteBuffer.allocate(1024); //3. 分散讀取 ByteBuffer[] bufs = {buf1, buf2}; channel1.read(bufs); for (ByteBuffer byteBuffer : bufs) { byteBuffer.flip(); } System.out.println(new String(bufs[0].array(), 0, bufs[0].limit())); System.out.println("-----------------"); System.out.println(new String(bufs[1].array(), 0, bufs[1].limit())); //4. 聚集寫入 RandomAccessFile raf2 = new RandomAccessFile("2.txt", "rw"); FileChannel channel2 = raf2.getChannel(); channel2.write(bufs); } //通道之間的數據傳輸(直接緩沖區) @Test public void test3() throws IOException{ FileChannel inChannel = FileChannel.open(Paths.get("d:/1.mkv"), StandardOpenOption.READ); FileChannel outChannel = FileChannel.open(Paths.get("d:/2.mkv"), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE); // inChannel.transferTo(0, inChannel.size(), outChannel); outChannel.transferFrom(inChannel, 0, inChannel.size()); inChannel.close(); outChannel.close(); } //使用直接緩沖區完成文件的復制(內存映射文件) @Test public void test2() throws IOException{//2127-1902-1777 long start = System.currentTimeMillis(); FileChannel inChannel = FileChannel.open(Paths.get("d:/1.mkv"), StandardOpenOption.READ); FileChannel outChannel = FileChannel.open(Paths.get("d:/2.mkv"), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE); //內存映射文件 MappedByteBuffer inMappedBuf = inChannel.map(MapMode.READ_ONLY, 0, inChannel.size()); MappedByteBuffer outMappedBuf = outChannel.map(MapMode.READ_WRITE, 0, inChannel.size()); //直接對緩沖區進行數據的讀寫操作 byte[] dst = new byte[inMappedBuf.limit()]; inMappedBuf.get(dst); outMappedBuf.put(dst); inChannel.close(); outChannel.close(); long end = System.currentTimeMillis(); System.out.println("耗費時間為:" + (end - start)); } //利用通道完成文件的復制(非直接緩沖區) @Test public void test1(){//10874-10953 long start = System.currentTimeMillis(); FileInputStream fis = null; FileOutputStream fos = null; //①獲取通道 FileChannel inChannel = null; FileChannel outChannel = null; try { fis = new FileInputStream("d:/1.mkv"); fos = new FileOutputStream("d:/2.mkv"); inChannel = fis.getChannel(); outChannel = fos.getChannel(); //②分配指定大小的緩沖區 ByteBuffer buf = ByteBuffer.allocate(1024); //③將通道中的數據存入緩沖區中 while(inChannel.read(buf) != -1){ buf.flip(); //切換讀取數據的模式 //④將緩沖區中的數據寫入通道中 outChannel.write(buf); buf.clear(); //清空緩沖區 } } catch (IOException e) { e.printStackTrace(); } finally { if(outChannel != null){ try { outChannel.close(); } catch (IOException e) { e.printStackTrace(); } } if(inChannel != null){ try { inChannel.close(); } catch (IOException e) { e.printStackTrace(); } } if(fos != null){ try { fos.close(); } catch (IOException e) { e.printStackTrace(); } } if(fis != null){ try { fis.close(); } catch (IOException e) { e.printStackTrace(); } } } long end = System.currentTimeMillis(); System.out.println("耗費時間為:" + (end - start)); } }
參考資料:
《尚硅谷》視頻學習