一、基礎知識:
1. Java IO一般包含兩個部分:1)java.io包中阻塞型IO;2)java.nio包中的非阻塞型IO,通常稱為New IO。這里只考慮到java.io包中堵塞型IO;
2. Java.io包簡單地分類。
2.1 Java的IO主要包含三個部分:
1)流式部分――IO的主體部分;
2)非流式部分――主要包含一些輔助流式部分的類,如:File類、RandomAccessFile類和FileDescriptor等類;
3)文件讀取部分的與安全相關的類,如:SerializablePermission類。以及與本地操作系統相關的文件系統的類,如:FileSystem類和Win32FileSystem類和WinNTFileSystem類。
2.2 流式部分可以概括:
1)字節流(Byte Stream)和字符流(Char Stream)的對應;
2)輸入和輸出的對應。
3)從字節流到字符流的橋梁。對應於輸入和輸出為InputStreamReader和OutputStreamWriter。
2.3 流的具體類中又可以具體分為:
1)介質流(Media Stream或者稱為原始流Raw Stream)――主要指一些基本的流,他們主要是從具體的介質上,如:文件、內存緩沖區(Byte數組、Char數組、StringBuffer對象)等,讀取數據;
2)過濾流(Filter Stream)――主要指所有FilterInputStream/FilterOutputStream和FilterReader/FilterWriter的子類,主要是對其包裝的類進行某些特定的處理,如:緩存等。
2.4 節點流和處理流
1)節點流是FileInputStream、ByteArrayInputStream這些直接從某個地方獲取流的類;
2)處理流則是BufferedInputStream這種可以裝飾節點流,來實現特定功能的類。因此,節點流可以理解為裝飾者模式中的被裝飾者,處理流則是裝飾者。
類圖可以參考博客:https://blog.csdn.net/u013063153/article/category/6399747
二、類的分析
1、輸入字節流
IO中輸入字節流的繼承圖:
-InputStream -ByteArrayInputStream //將內存中的Byte數組適配為一個InputStream -FileInputStream //最基本的文件輸入流。主要用於從文件中讀取信息 -FilterInputStream //給其它被裝飾對象提供額外功能的抽象類 -BufferedInputStream //使用該對象阻止每次讀取一個字節都會頻繁操作IO。將字節讀取一個緩存區,從緩存區讀取。 -DataInputStream //使用它可以讀出基本數據類型 -LineNumberInputStream //跟蹤輸入流中的行號。可以得到和設置行號。 -PushbackInputStream //可以在讀取最后一個byte 后將其放回到緩存中。 -ObjectInputStream -PipedInputStream //在流中實現了管道的概念讀取PipedOutputStream寫入的數據。 -SequenceInputStream //將2個或者多個InputStream 對象轉變為一個InputStream -StringBufferInputStream //將內存中的字符串適配為一個InputStream(廢棄)
1)InputStream是抽象類,是所有字節輸入流的超類。
2)ByteArrayInputStream、StringBufferInputStream、FileInputStream是三種基本的介質流,它們分別將Byte數組、StringBuffer、和本地文件中讀取數據。PipedInputStream是從與其它線程共用的管道中讀取數據;
3)ObjectInputStream和所有FilterInputStream的子類都是裝飾流(裝飾器模式的主角)。
4)FileInputStream 文件輸入流,用於讀取本地文件中的字節數據。
2. IO中的輸出字節流
IO中輸出字節流的繼承圖:
-OutputStream -ByteArrayOutputStream //在內存中創建一個buffer。所有寫入此流中的數據都被放入到此buffer中。 -FileOutputStream //將信息寫入文件中。 -FilterOutputStream //實現OutputStream裝飾器功能的抽象類。 -BufferedOutputStream //使用該對象阻止每次讀取一個字節都會頻繁操作IO。將字節讀取一個緩存區,從緩存區讀取。 -DataOutputStream //使用它可以寫入基本數據類型。 -PrintStream //產生具有格式的輸出信息。(一般地在java程序中DataOutputStream用於數據的存儲,即J2EE中持久層完成的功能,PrintStream完成顯示的功能,類似於J2EE中表現層的功能) -BufferedOutputStream //使用它可以避免頻繁地向IO寫入數據,數據一般都寫入一個緩存區,在調用flush方法后會清空緩存、一次完成數據的寫入。 -PipedOutputStream //任何寫入此對象的信息都被放入對應PipedInputStream 對象的緩存中,從而完成線程的通信,實現了“管道”的概念。
1)OutputStream是所有的輸出字節流的父類,它是一個抽象類。
2)ByteArrayOutputStream、FileOutputStream是兩種基本的介質流,它們分別向Byte數組、和本地文件中寫入數據。PipedOutputStream是向與其它線程共用的管道中寫入數據。
3)ObjectOutputStream和所有FilterOutputStream的子類都是裝飾流。
3. 字節流的輸入與輸出的對應
1)LineNumberInputStream主要完成從流中讀取數據時,會得到相應的行號,至於什么時候分行、在哪里分行是由改類主動確定的,並不是在原始中有這樣一個行號。在輸出部分沒有對應的部分,我們完全可以自己建立一個LineNumberOutputStream,在最初寫入時會有一個基准的行號,以后每次遇到換行時會在下一行添加一個行號,看起來也是可以的。
2)PushbackInputStream的功能是查看最后一個字節,不滿意就放入緩沖區。主要用在編譯器的語法、詞法分析部分。輸出部分的BufferedOutputStream幾乎實現相近的功能。
3)SequenceInputStream可以認為是一個工具類,將兩個或者多個輸入流當成一個輸入流依次讀取。完全可以從IO包中去除,還完全不影響IO包的結構,卻讓其更“純潔”――純潔的Decorator模式。
4)PrintStream也可以認為是一個輔助工具。主要可以向其他輸出流,或者FileInputStream寫入數據,本身內部實現還是帶緩沖的。本質上是對其它流的綜合運用的一個工具而已。一樣可以踢出IO包!System.out和System.out就是PrintStream的實例!
5)ObjectInputStream/ObjectOutputStream和DataInputStream/DataOutputStream主要是要求寫對象/數據和讀對象/數據的次序要保持一致,否則輕則不能得到正確的數據,重則拋出異常;
6)PipedInputStream/PipedOutputStream在創建時一般就一起創建,調用它們的讀寫方法時會檢查對方是否存在,或者關閉。
4. 輸入字符流
IO中輸入字符流的繼承圖:
-Reader -BufferedReader -LineNumberReader -CharArrayReader -FilterReader -PushbackReader -InputStreamReader -FileReader -PipedReader -StringReader
1)Reader是所有的輸入字符流的父類,它是一個抽象類。
2)CharReader、StringReader是兩種基本的介質流,它們分別將Char數組、String中讀取數據。PipedReader是從與其它線程共用的管道中讀取數據。
3)BufferedReader很明顯就是一個裝飾器,它和其子類負責裝飾其它Reader對象。
4)FilterReader是所有自定義具體裝飾流的父類,其子類PushbackReader對Reader對象進行裝飾,會增加一個行號。
5)InputStreamReader是字節流向字符流轉化的橋梁,它使用指定的 charset 讀取字節並將其解碼為字符。它使用的字符集可以由名稱指定或顯式給定,或者可以接受平台默認的字符集。其構造方法的默認參數為InputStream 對象。
使用方法:
InputStreamReader(InputStream in),
InputStreamReader(Inputstreamin, charset cs)
為了達到最高效率,InputStreamReader通常用法為:
BufferedReader in = new BufferedReader(newInputStreamReader(System.in));
BufferedReader:緩沖輸入流,包裝其他字符輸入流,提高讀取效率,從字符輸入流中讀取文本,緩沖各個字符,從而實現字符、數組和行的高效讀取。Reader的讀取操作開銷大,為提高效率使用BufferedReader包裝其他Reader(如FileReader和InputStreamReader)
5. 輸出字符流:
IO中輸出字符流的繼承圖:
-Writer -BufferedWriter -CharArrayWriter -FilterWriter -OutputStreamWriter -FileWriter -PipedWriter -PrintWriter -StringWriter
1)Writer是所有的輸出字符流的父類,它是一個抽象類。
2)CharArrayWriter、StringWriter是兩種基本的介質流,它們分別向Char數組、String中寫入數據。PipedWriter是向與其它線程共用的管道中寫入數據。
3) BufferedWriter是一個裝飾器為Writer提供緩沖功能。
4)PrintWriter和PrintStream極其類似,功能和使用也非常相似。
5)OutputStreamWriter:是字符流通向字節流的橋梁,它使用指定的 charset 讀取字符並將其解碼為字節。它使用的字符集可以由名稱指定或顯式給定,或者可以接受平台默認的字符集。
使用方法:
其構造方法的默認參數為OutputStream 對象,
OutputStreamReader(InputStream in),
OutputStreamReader(Inputstream in, charset cs)
為了達到最高效率,OutputStreamReader通常用法為:
BufferedWriter out = new BufferedWriter(newOutputStreamWriter(System.in));
BufferedWriter:緩沖輸出流,包裝其他字符輸出流,提高讀取效率,將文本寫入字符輸出流,緩沖各個字符,從而提供單個字符、數組和字符串的高效寫入Writer的讀取操作開銷大,為提高效率使用BufferedWriter包裝其他Writer(如FileWriter和InputStreamWriter)
6. 序列化與反序列化:ObjectInputStream,ObjectOutputStream
JAVA提出序列化是為了將對象在ObjectOutputStream:對象輸出流,它的writeObject(Object obj)方法可以對參數指定的obj對象進行序列化,把得到的字節序列寫到一個目標輸出流中。
序列化過程:
File file = new File(“path”); OutputStream fos = new FileOutputStream(file); ObjectOutputStream oos = new ObjectOutputStream(“fos”);將輸出流對象輸出到file對象中。 oos.writeObject(Object obj); oos.flush(); oos.close();
ObjectInputStream:對象輸入流,它的readObject()方法可以序列化文件進行反序列化,把字節序列文件轉化為對象。
反序列化過程:
File file = new File(“path”) InputStream fis = new FileInputStream(file); ObjectInputStream ois = new ObjectInputStream(fis); Class obj = (class)ois.readObject(); Ois.close();
實現序列化的兩種方式:
1)類實現Serializable接口,類只有實現了serializable接口,ObjectOutputstream才會去將類的對象序列化,否則會拋出NotSerializableException異常
2)類繼承Externalizable類。
三、主要源碼實現:
1.InputStream:
public abstract class InputStream implements Closeable { private static final int SKIP_BUFFER_SIZE = 2048; //用於skip方法,和skipBuffer相關 private static byte[] skipBuffer; // skipBuffer is initialized in skip(long), if needed. //從輸入流中讀取下一個字節, //正常返回0-255,到達文件的末尾返回-1 //在流中還有數據,但是沒有讀到時該方法會阻塞(block) //Java IO和New IO的區別就是阻塞流和非阻塞流 //抽象方法!不同的子類不同的實現! public abstract int read() throws IOException; //將流中的數據讀入放在byte數組的第off個位置先后的len個位置中 //放回值為放入字節的個數。 //這個方法在利用抽象方法read,某種意義上簡單的Templete模式。 public int read(byte b[], int off, int len) throws IOException { //檢查輸入是否正常。一般情況下,檢查輸入是方法設計的第一步 if (b == null) { throw new NullPointerException(); } else if (off < 0 || len < 0 || len > b.length - off) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return 0; } //讀取下一個字節 int c = read(); //到達文件的末端返回-1 if (c == -1) { return -1; } //放回的字節downcast b[off] = (byte)c; //已經讀取了一個字節 int i = 1; try { //最多讀取len個字節,所以要循環len次 for (; i < len ; i++) { //每次循環從流中讀取一個字節 //由於read方法阻塞, //所以read(byte[],int,int)也會阻塞 c = read(); //到達末尾,理所當然放回-1 if (c == -1) { break; } //讀到就放入byte數組中 b[off + i] = (byte)c; } } catch (IOException ee) { } return i; } //利用上面的方法read(byte[] b) public int read(byte b[]) throws IOException { return read(b, 0, b.length); } //方法內部使用的、表示要跳過的字節數目, public long skip(long n) throws IOException { long remaining = n; int nr; if (skipBuffer == null) //初始化一個跳轉的緩存 skipBuffer = new byte[SKIP_BUFFER_SIZE]; //本地化的跳轉緩存 byte[] localSkipBuffer = skipBuffer; //檢查輸入參數,應該放在方法的開始 if (n <= 0) { return 0; } //一共要跳過n個,每次跳過部分,循環 while (remaining > 0) { nr = read(localSkipBuffer, 0, (int) Math.min(SKIP_BUFFER_SIZE, remaining)); //利用上面的read(byte[],int,int)方法盡量讀取n個字節 //讀到流的末端,則返回 if (nr < 0) { break; } //沒有完全讀到需要的,則繼續循環 remaining -= nr; } return n - remaining;//返回時要么全部讀完,要么因為到達文件末端,讀取了部分 } //查詢流中還有多少可以讀取的字節 //該方法不會block。在java中抽象類方法的實現一般有以下幾種方式: //1.拋出異常(java.util);2.“弱”實現。像上面這種。子類在必要的時候覆蓋它。 //3.“空”實現。 public int available() throws IOException { return 0; } //關閉當前流、同時釋放與此流相關的資源 //關閉當前流、同時釋放與此流相關的資源 public void close() throws IOException {} //markSupport可以查詢當前流是否支持mark public synchronized void mark(int readlimit) {} //對mark過的流進行復位。只有當流支持mark時才可以使用此方法。 public synchronized void reset() throws IOException { throw new IOException("mark/reset not supported"); } //查詢是否支持mark //絕大部分不支持,因此提供默認實現,返回false。子類有需要可以覆蓋。 public boolean markSupported() { return false; } }
2.FilterInputStream
這是字節輸入流部分裝飾器模式的核心。是在裝飾器模式中的Decorator對象,主要完成對其它流裝飾的基本功能:
public class FilterInputStream extends InputStream { //裝飾器的代碼特征:被裝飾的對象一般是裝飾器的成員變量 protected volatile InputStream in; //將要被裝飾的字節輸入流 protected FilterInputStream(InputStream in) { //通過構造方法傳入此被裝飾的流 this.in = in; } //下面這些方法,完成最小的裝飾――0裝飾,只是調用被裝飾流的方法而已 public int read() throws IOException { return in.read(); } public int read(byte b[]) throws IOException { return read(b, 0, b.length); } public int read(byte b[], int off, int len) throws IOException { return in.read(b, off, len); } public long skip(long n) throws IOException { return in.skip(n); } public int available() throws IOException { return in.available(); } public void close() throws IOException { in.close(); } public synchronized void mark(int readlimit) { in.mark(readlimit); } public synchronized void reset() throws IOException { in.reset(); } public boolean markSupported() { return in.markSupported(); } }
ByteArray到ByteArrayInputStream的適配:
ByteArrayInputStream內部有一個byte類型的buffer。很典型的適配器模式的應用――將byte數組適配流的接口。
public class ByteArrayInputStream extends InputStream { protected byte buf[]; //內部的buffer,一般通過構造器輸入 protected int pos; //當前位置的cursor。從0至byte數組的長度。 //byte[pos]就是read方法讀取的字節 protected int mark = 0; //mark的位置。 protected int count; //流中字節的數目。 //構造器,從一個byte[]創建一個ByteArrayInputStream public ByteArrayInputStream(byte buf[]) { //初始化流中的各個成員變量 this.buf = buf; this.pos = 0; this.count = buf.length; } //構造器 public ByteArrayInputStream(byte buf[], int offset, int length) { this.buf = buf; this.pos = offset; //與上面不同 this.count = Math.min(offset + length, buf.length); this.mark = offset; //與上面不同 } //從流中讀取下一個字節 public synchronized int read() { //返回下一個位置的字節//流中沒有數據則返回-1 return (pos < count) ? (buf[pos++] & 0xff) : -1; } // ByteArrayInputStream要覆蓋InputStream中可以看出其提供了該方法的實現 //某些時候,父類不能完全實現子類的功能,父類的實現一般比較通用。 //當子類有更有效的方法時,我們會覆蓋這些方法。 public synchronized int read(byte b[], int off, int len) { //首先檢查輸入參數的狀態是否正確 if(b==null){ throw new NullPointerException(); } else if (off < 0 || len < 0 || len > b.length - off) { throw new IndexOutOfBoundsException(); } if (pos >= count) { return -1; } if (pos + len > count) { len = count - pos; } if (len <= 0) { return 0; } //java中提供數據復制的方法 //出於速度的原因!他們都用到System.arraycopy方法 System.arraycopy(buf, pos, b, off, len); pos += len; return len; } //下面這個方法,在InputStream中也已經實現了。 //但是當時是通過將字節讀入一個buffer中實現的,好像效率低了一點。 //比InputStream中的方法簡單、高效 public synchronized long skip(long n) { //當前位置,可以跳躍的字節數目 if (pos + n > count) { n = count - pos; } //小於0,則不可以跳躍 if (n < 0) { return 0; } //跳躍后,當前位置變化 pos += n; return n; } //查詢流中還有多少字節沒有讀取。 public synchronized int available() { return count - pos; } //ByteArrayInputStream支持mark所以返回true public boolean markSupported() { return true; } //在流中當前位置mark。 public void mark(int readAheadLimit) { mark = pos; } //重置流。即回到mark的位置。 public synchronized void reset() { pos = mark; } //關閉ByteArrayInputStream不會產生任何動作。 public void close() throws IOException { } }
3.BufferedInputStream//該類主要完成對被包裝流,加上一個緩存的功能
public class BufferedInputStream extends FilterInputStream { private static int defaultBufferSize = 8192; //默認緩存的大小 protected volatile byte buf[]; //內部的緩存 protected int count; //buffer的大小 protected int pos; //buffer中cursor的位置 protected int markpos = -1; //mark的位置 protected int marklimit; //mark的范圍 //原子性更新。和一致性編程相關 private static final AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater = AtomicReferenceFieldUpdater.newUpdater (BufferedInputStream.class, byte[].class,"buf"); //檢查輸入流是否關閉,同時返回被包裝流 private InputStream getInIfOpen() throws IOException { InputStream input = in; if (input == null) throw new IOException("Stream closed"); return input; } //檢查buffer的狀態,同時返回緩存 private byte[] getBufIfOpen() throws IOException { byte[] buffer = buf; //不太可能發生的狀態 if (buffer == null) throw new IOException("Stream closed"); return buffer; } //構造器 public BufferedInputStream(InputStream in) { //指定默認長度的buffer this(in, defaultBufferSize); } //構造器 public BufferedInputStream(InputStream in, int size) { super(in); //檢查輸入參數 if(size<=0){ throw new IllegalArgumentException("Buffer size <= 0"); } //創建指定長度的buffer buf = new byte[size]; } //從流中讀取數據,填充如緩存中。 private void fill() throws IOException { //得到buffer byte[] buffer = getBufIfOpen(); if (markpos < 0) //mark位置小於0,此時pos為0 pos = 0; //pos大於buffer的長度 else if (pos >= buffer.length) if (markpos > 0) { int sz = pos - markpos;
System.arraycopy(buffer, markpos, buffer, 0, sz); pos = sz; markpos = 0; } else if (buffer.length >= marklimit) { //buffer的長度大於marklimit時,mark失效 markpos = -1; //丟棄buffer中的內容
pos = 0; }else{ //buffer的長度小於marklimit時對buffer擴容 int nsz = pos * 2; if (nsz > marklimit) nsz = marklimit;//擴容為原來的2倍,太大則為marklimit大小 byte nbuf[] = new byte[nsz]; //將buffer中的字節拷貝如擴容后的buf中 System.arraycopy(buffer, 0, nbuf, 0, pos); if (!bufUpdater.compareAndSet(this, buffer, nbuf)) { //在buffer在被操作時,不能取代此buffer throw new IOException("Stream closed"); } //將新buf賦值給buffer buffer = nbuf; } count = pos; int n = getInIfOpen().read(buffer, pos, buffer.length - pos); if (n > 0) count = n + pos; } //讀取下一個字節 public synchronized int read() throws IOException { //到達buffer的末端 if (pos >= count) { //就從流中讀取數據,填充buffer fill();
//讀過一次,沒有數據則返回-1 if (pos >= count) return -1; } //返回buffer中下一個位置的字節 return getBufIfOpen()[pos++] & 0xff; } //將數據從流中讀入buffer中 private int read1(byte[] b, int off, int len) throws IOException { int avail = count - pos; //buffer中還剩的可讀字符 //buffer中沒有可以讀取的數據時 if(avail<=0){ //將輸入流中的字節讀入b中 if (len >= getBufIfOpen().length && markpos < 0) { return getInIfOpen().read(b, off, len); } fill();//填充 avail = count - pos; if (avail <= 0) return -1; } //從流中讀取后,檢查可以讀取的數目 int cnt = (avail < len) ? avail : len; //將當前buffer中的字節放入b的末端 System.arraycopy(getBufIfOpen(), pos, b, off, cnt); pos += cnt; return cnt; } public synchronized int read(byte b[], int off, int len)throws IOException { getBufIfOpen();
// 檢查buffer是否open //檢查輸入參數是否正確 if ((off | len | (off + len) | (b.length - (off + len))) < 0) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return 0; } int n = 0; for (;;) { int nread = read1(b, off + n, len - n); if (nread <= 0) return (n == 0) ? nread : n; n += nread; if (n >= len) return n; InputStream input = in; if (input != null && input.available() <= 0) return n; } } public synchronized long skip(long n) throws IOException { // 檢查buffer是否關閉 getBufIfOpen(); //檢查輸入參數是否正確 if (n <= 0) { return 0; } //buffered中可以讀取字節的數目 long avail = count - pos; //可以讀取的小於0,則從流中讀取 if (avail <= 0) { //mark小於0,則mark在流中 if (markpos <0) return getInIfOpen().skip(n); // 從流中讀取數據,填充緩沖區。 fill(); //可以讀的取字節為buffer的容量減當前位置 avail = count - pos; if (avail <= 0) return 0; } long skipped = (avail < n) ? avail : n; pos += skipped;
//當前位置改變 return skipped; } //該方法不會block!返回流中可以讀取的字節的數目。 //該方法的返回值為緩存中的可讀字節數目加流中可讀字節數目的和 public synchronized int available() throws IOException { return getInIfOpen().available() + (count - pos); } //當前位置處為mark位置 public synchronized void mark(int readlimit) { marklimit = readlimit; markpos = pos; } public synchronized void reset() throws IOException { // 緩沖去關閉了,肯定就拋出異常!程序設計中經常的手段 getBufIfOpen(); if (markpos < 0) throw new IOException("Resetting to invalid mark"); pos = markpos; } //該流和ByteArrayInputStream一樣都支持mark public boolean markSupported() { return true; } //關閉當前流同時釋放相應的系統資源。 public void close() throws IOException { byte[] buffer; while ( (buffer = buf) != null) { if (bufUpdater.compareAndSet(this, buffer, null)) { InputStream input = in; in = null; if (input != null) input.close(); return; } // Else retry in case a new buf was CASed in fill() } } }
4.PipedOutputStream
PipedOutputStream一般必須和一個PipedInputStream連接。共同構成一個pipe。即必須連接輸入部分。
其原理為:PipedInputStream內部有一個Buffer, PipedInputStream可以使用InputStream的方法讀取其Buffer中的字節。PipedInputStream中Buffer中的字節是PipedOutputStream調用PipedInputStream的方法放入的。
public class PipedOutputStream extends OutputStream { //包含一個PipedInputStream private PipedInputStream sink; //帶有目的地的構造器 public PipedOutputStream(PipedInputStream snk)throws IOException { connect(snk); } //默認構造器,必須使用下面的connect方法連接 public PipedOutputStream() { } public synchronized void connect(PipedInputStream snk) throws IOException { //檢查輸入參數的正確性 if(snk==null){ throw new NullPointerException(); } else if (sink != null || snk.connected) { throw new IOException("Already connected"); } //一系列初始化工作 sink = snk; snk.in = -1; snk.out = 0; snk.connected = true; } //向流中寫入數據 public void write(int b) throws IOException { if (sink == null) { throw new IOException("Pipe not connected"); } //本質上是,調用PipedInputStream的receive方法接受此字節 sink.receive(b); } public void write(byte b[], int off, int len) throws IOException { //首先檢查輸入參數的正確性 if (sink == null) { throw new IOException("Pipe not connected"); } else if (b == null) { throw new NullPointerException(); } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return; } //調用PipedInputStream的receive方法接受 sink.receive(b, off, len); } //flush輸出流 public synchronized void flush() throws IOException { if (sink != null) { //本質是通知輸入流,可以讀取 synchronized (sink) { sink.notifyAll(); } } } //關閉流同時釋放相關資源 public void close() throws IOException { if (sink != null) { sink.receivedLast(); } } }
PipedInputStream
public class PipedInputStream extends InputStream { //標識有讀取方或寫入方關閉 boolean closedByWriter = false; volatile boolean closedByReader = false; //是否建立連接 boolean connected = false; //標識哪個線程 Thread readSide; Thread writeSide; //緩沖區的默認大小 protected static final int PIPE_SIZE = 1024; //緩沖區 protected byte buffer[] = new byte[PIPE_SIZE]; //下一個寫入字節的位置。0代表空,in==out代表滿 protected int in = -1; //下一個讀取字節的位置 protected int out = 0; //給定源的輸入流 public PipedInputStream(PipedOutputStream src) throws IOException { connect(src); } //默認構造器,下部一定要connect源 public PipedInputStream() { } //連接輸入源 public void connect(PipedOutputStream src) throws IOException { //調用源的connect方法連接當前對象 src.connect(this); } //只被PipedOuputStream調用 protected synchronized void receive(int b) throws IOException { //檢查狀態,寫入 checkStateForReceive(); //永遠是PipedOuputStream writeSide = Thread.currentThread(); //輸入和輸出相等,等待空間 if (in == out) awaitSpace(); if (in < 0) { in = 0; out = 0; } //放入buffer相應的位置 buffer[in++] = (byte)(b & 0xFF); //in為0表示buffer已空 if (in >= buffer.length) { in = 0; } } synchronized void receive(byte b[], int off, int len) throws IOException { checkStateForReceive(); //從PipedOutputStream可以看出 writeSide = Thread.currentThread(); int bytesToTransfer = len; while (bytesToTransfer > 0) { //滿了,會通知讀取的;空會通知寫入 if (in == out) awaitSpace(); int nextTransferAmount = 0; if (out < in) { nextTransferAmount = buffer.length - in; } else if (in < out) { if (in == -1) { in = out = 0; nextTransferAmount = buffer.length - in; } else { nextTransferAmount = out - in; } } if (nextTransferAmount > bytesToTransfer) nextTransferAmount = bytesToTransfer; assert(nextTransferAmount > 0); System.arraycopy(b, off, buffer, in, nextTransferAmount); bytesToTransfer -= nextTransferAmount; off += nextTransferAmount; in += nextTransferAmount; if (in >= buffer.length) { in = 0; } } } //檢查當前狀態,等待輸入 private void checkStateForReceive() throws IOException { if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByWriter || closedByReader) { throw new IOException("Pipe closed"); } else if (readSide != null && !readSide.isAlive()) { throw new IOException("Read end dead"); } } //Buffer已滿,等待一段時間 private void awaitSpace() throws IOException { //in==out表示滿了,沒有空間 while (in == out) { //檢查接受端的狀態 checkStateForReceive(); //通知讀取端 notifyAll(); try { wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } } //通知所有等待的線程()已經接受到最后的字節 synchronized void receivedLast() { closedByWriter = true; // notifyAll(); } public synchronized int read() throws IOException { //檢查一些內部狀態 if (!connected) { throw new IOException("Pipe not connected"); } else if (closedByReader) { throw new IOException("Pipe closed"); } else if (writeSide != null && !writeSide.isAlive()&& !closedByWriter && (in < 0)) { throw new IOException("Write end dead"); } //當前線程讀取 readSide = Thread.currentThread(); //重復兩次??? int trials = 2; while (in < 0) { //輸入斷關閉返回-1 if (closedByWriter) { return -1; } //狀態錯誤 if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) { throw new IOException("Pipe broken"); } notifyAll(); // 空了,通知寫入端可以寫入 try { wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } int ret = buffer[out++] & 0xFF; if (out >= buffer.length) { out = 0; } //沒有任何字節 if (in == out) { in = -1; } return ret; } public synchronized int read(byte b[], int off, int len) throws IOException { //檢查輸入參數的正確性 if (b == null) { throw new NullPointerException(); } else if (off < 0 || len < 0 || len > b.length - off) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return 0; } //讀取下一個 int c = read(); //已經到達末尾了,返回-1 if (c < 0) { return -1; } //放入外部buffer中 b[off] = (byte) c; //return-len int rlen = 1; //下一個in存在,且沒有到達len while ((in >= 0) && (--len > 0)) { //依次放入外部buffer b[off + rlen] = buffer[out++]; rlen++; //讀到buffer的末尾,返回頭部 if (out >= buffer.length) { out = 0; } //讀、寫位置一致時,表示沒有數據 if (in == out) { in = -1; } } //返回填充的長度 return rlen; } //返回還有多少字節可以讀取 public synchronized int available() throws IOException { //到達末端,沒有字節 if(in < 0) return 0; else if(in == out) //寫入的和讀出的一致,表示滿 return buffer.length; else if (in > out) //寫入的大於讀出 return in - out; else //寫入的小於讀出的 return in + buffer.length - out; } //關閉當前流,同時釋放與其相關的資源 public void close() throws IOException { //表示由輸入流關閉 closedByReader = true; //同步化當前對象,in為-1 synchronized (this) { in = -1; } } }