Java源碼解析——Java IO包


一、基礎知識:

1. Java IO一般包含兩個部分:1)java.io包中阻塞型IO;2)java.nio包中的非阻塞型IO,通常稱為New IO。這里只考慮到java.io包中堵塞型IO;

2. Java.io包簡單地分類。

  2.1 Java的IO主要包含三個部分:

  1)流式部分――IO的主體部分;

  2)非流式部分――主要包含一些輔助流式部分的類,如:File類、RandomAccessFile類和FileDescriptor等類;

  3)文件讀取部分的與安全相關的類,如:SerializablePermission類。以及與本地操作系統相關的文件系統的類,如:FileSystem類和Win32FileSystem類和WinNTFileSystem類。

  2.2 流式部分可以概括:

  1)字節流(Byte Stream)和字符流(Char Stream)的對應;

  2)輸入輸出的對應。

  3)從字節流到字符流的橋梁。對應於輸入和輸出為InputStreamReaderOutputStreamWriter

  2.3 流的具體類中又可以具體分為:

       1)介質流(Media Stream或者稱為原始流Raw Stream)――主要指一些基本的流,他們主要是從具體的介質上,如:文件、內存緩沖區(Byte數組、Char數組、StringBuffer對象)等,讀取數據

       2)過濾流(Filter Stream)――主要指所有FilterInputStream/FilterOutputStream和FilterReader/FilterWriter的子類,主要是對其包裝的類進行某些特定的處理,如:緩存等。

  2.4 節點流和處理流

  1)節點流是FileInputStream、ByteArrayInputStream這些直接從某個地方獲取流的類;

       2)處理流則是BufferedInputStream這種可以裝飾節點流,來實現特定功能的類。因此,節點流可以理解為裝飾者模式中的被裝飾者,處理流則是裝飾者

 

類圖可以參考博客:https://blog.csdn.net/u013063153/article/category/6399747

 

二、類的分析

 

1、輸入字節流

 

  IO中輸入字節流的繼承圖:

-InputStream
    -ByteArrayInputStream         //將內存中的Byte數組適配為一個InputStream
    -FileInputStream          //最基本的文件輸入流。主要用於從文件中讀取信息
    -FilterInputStream         //給其它被裝飾對象提供額外功能的抽象類
        -BufferedInputStream     //使用該對象阻止每次讀取一個字節都會頻繁操作IO。將字節讀取一個緩存區,從緩存區讀取。
        -DataInputStream        //使用它可以讀出基本數據類型
        -LineNumberInputStream    //跟蹤輸入流中的行號。可以得到和設置行號。
        -PushbackInputStream      //可以在讀取最后一個byte 后將其放回到緩存中。
    -ObjectInputStream
    -PipedInputStream           //在流中實現了管道的概念讀取PipedOutputStream寫入的數據。
    -SequenceInputStream        //將2個或者多個InputStream 對象轉變為一個InputStream
    -StringBufferInputStream     //將內存中的字符串適配為一個InputStream(廢棄)

  1)InputStream是抽象類,是所有字節輸入流的超類。

  2)ByteArrayInputStream、StringBufferInputStream、FileInputStream是三種基本的介質流,它們分別將Byte數組、StringBuffer、和本地文件中讀取數據。PipedInputStream是從與其它線程共用的管道中讀取數據;

  3)ObjectInputStream和所有FilterInputStream的子類都是裝飾流(裝飾器模式的主角)。

  4)FileInputStream 文件輸入流,用於讀取本地文件中的字節數據。

 

2. IO中的輸出字節流

 

 IO中輸出字節流的繼承圖:

-OutputStream
    -ByteArrayOutputStream      //在內存中創建一個buffer。所有寫入此流中的數據都被放入到此buffer中。
    -FileOutputStream         //將信息寫入文件中。
    -FilterOutputStream        //實現OutputStream裝飾器功能的抽象類。    
        -BufferedOutputStream     //使用該對象阻止每次讀取一個字節都會頻繁操作IO。將字節讀取一個緩存區,從緩存區讀取。
        -DataOutputStream       //使用它可以寫入基本數據類型。        
        -PrintStream          //產生具有格式的輸出信息。(一般地在java程序中DataOutputStream用於數據的存儲,即J2EE中持久層完成的功能,PrintStream完成顯示的功能,類似於J2EE中表現層的功能)
        -BufferedOutputStream    //使用它可以避免頻繁地向IO寫入數據,數據一般都寫入一個緩存區,在調用flush方法后會清空緩存、一次完成數據的寫入。    
        -PipedOutputStream      //任何寫入此對象的信息都被放入對應PipedInputStream 對象的緩存中,從而完成線程的通信,實現了“管道”的概念。

  1)OutputStream是所有的輸出字節流的父類,它是一個抽象類。

  2)ByteArrayOutputStream、FileOutputStream是兩種基本的介質流,它們分別向Byte數組、和本地文件中寫入數據。PipedOutputStream是向與其它線程共用的管道中寫入數據。

  3)ObjectOutputStream和所有FilterOutputStream的子類都是裝飾流。

 

3. 字節流的輸入與輸出的對應

  1)LineNumberInputStream主要完成從流中讀取數據時,會得到相應的行號,至於什么時候分行、在哪里分行是由改類主動確定的,並不是在原始中有這樣一個行號。在輸出部分沒有對應的部分,我們完全可以自己建立一個LineNumberOutputStream,在最初寫入時會有一個基准的行號,以后每次遇到換行時會在下一行添加一個行號,看起來也是可以的。

  2)PushbackInputStream的功能是查看最后一個字節,不滿意就放入緩沖區。主要用在編譯器的語法、詞法分析部分。輸出部分的BufferedOutputStream幾乎實現相近的功能。

  3)SequenceInputStream可以認為是一個工具類,將兩個或者多個輸入流當成一個輸入流依次讀取。完全可以從IO包中去除,還完全不影響IO包的結構,卻讓其更“純潔”――純潔的Decorator模式。

  4)PrintStream也可以認為是一個輔助工具。主要可以向其他輸出流,或者FileInputStream寫入數據,本身內部實現還是帶緩沖的。本質上是對其它流的綜合運用的一個工具而已。一樣可以踢出IO包!System.out和System.out就是PrintStream的實例!

  5)ObjectInputStream/ObjectOutputStream和DataInputStream/DataOutputStream主要是要求寫對象/數據和讀對象/數據的次序要保持一致,否則輕則不能得到正確的數據,重則拋出異常;

  6)PipedInputStream/PipedOutputStream在創建時一般就一起創建,調用它們的讀寫方法時會檢查對方是否存在,或者關閉。

 

4. 輸入字符流

IO中輸入字符流的繼承圖:

-Reader
    -BufferedReader
        -LineNumberReader
    -CharArrayReader
    -FilterReader
        -PushbackReader
    -InputStreamReader
        -FileReader
    -PipedReader
    -StringReader

  1)Reader是所有的輸入字符流的父類,它是一個抽象類。

  2)CharReader、StringReader是兩種基本的介質流,它們分別將Char數組、String中讀取數據。PipedReader是從與其它線程共用的管道中讀取數據。

  3)BufferedReader很明顯就是一個裝飾器,它和其子類負責裝飾其它Reader對象。

  4)FilterReader是所有自定義具體裝飾流的父類,其子類PushbackReader對Reader對象進行裝飾,會增加一個行號。

  5)InputStreamReader是字節流向字符流轉化的橋梁,它使用指定的 charset 讀取字節並將其解碼為字符。它使用的字符集可以由名稱指定或顯式給定,或者可以接受平台默認的字符集。其構造方法的默認參數為InputStream 對象。

使用方法:

InputStreamReader(InputStream in),
InputStreamReader(Inputstreamin, charset cs)

為了達到最高效率,InputStreamReader通常用法為:

BufferedReader in = new BufferedReader(newInputStreamReader(System.in));

BufferedReader:緩沖輸入流,包裝其他字符輸入流,提高讀取效率,從字符輸入流中讀取文本,緩沖各個字符,從而實現字符、數組和行的高效讀取。Reader的讀取操作開銷大,為提高效率使用BufferedReader包裝其他Reader(如FileReader和InputStreamReader)

 

5. 輸出字符流:

IO中輸出字符流的繼承圖:

-Writer
    -BufferedWriter
    -CharArrayWriter
    -FilterWriter
    -OutputStreamWriter
        -FileWriter
    -PipedWriter
    -PrintWriter
    -StringWriter

  1)Writer是所有的輸出字符流的父類,它是一個抽象類。

  2)CharArrayWriter、StringWriter是兩種基本的介質流,它們分別向Char數組、String中寫入數據。PipedWriter是向與其它線程共用的管道中寫入數據。

  3) BufferedWriter是一個裝飾器為Writer提供緩沖功能。

  4)PrintWriter和PrintStream極其類似,功能和使用也非常相似。

  5)OutputStreamWriter:是字符流通向字節流的橋梁,它使用指定的 charset 讀取字符並將其解碼為字節。它使用的字符集可以由名稱指定或顯式給定,或者可以接受平台默認的字符集。

使用方法:

其構造方法的默認參數為OutputStream 對象,

OutputStreamReader(InputStream in),
OutputStreamReader(Inputstream in, charset cs)

為了達到最高效率,OutputStreamReader通常用法為:

BufferedWriter out = new BufferedWriter(newOutputStreamWriter(System.in));

BufferedWriter:緩沖輸出流,包裝其他字符輸出流,提高讀取效率,將文本寫入字符輸出流,緩沖各個字符,從而提供單個字符、數組和字符串的高效寫入Writer的讀取操作開銷大,為提高效率使用BufferedWriter包裝其他Writer(如FileWriter和InputStreamWriter)

 

6. 序列化與反序列化:ObjectInputStream,ObjectOutputStream

JAVA提出序列化是為了將對象在ObjectOutputStream:對象輸出流,它的writeObject(Object obj)方法可以對參數指定的obj對象進行序列化,把得到的字節序列寫到一個目標輸出流中。

序列化過程:

File file = new File(“path”);
OutputStream fos = new FileOutputStream(file);
ObjectOutputStream oos = new ObjectOutputStream(“fos”);將輸出流對象輸出到file對象中。
oos.writeObject(Object obj);
oos.flush();
oos.close();

ObjectInputStream:對象輸入流,它的readObject()方法可以序列化文件進行反序列化,把字節序列文件轉化為對象。

反序列化過程:

File file = new File(“path”)
InputStream fis = new FileInputStream(file);
ObjectInputStream ois = new ObjectInputStream(fis);
Class obj = (class)ois.readObject();
Ois.close();

實現序列化的兩種方式:

  1)類實現Serializable接口,類只有實現了serializable接口,ObjectOutputstream才會去將類的對象序列化,否則會拋出NotSerializableException異常

  2)類繼承Externalizable類。

 

三、主要源碼實現:

1.InputStream:

public abstract class InputStream implements Closeable {
    private static final int SKIP_BUFFER_SIZE = 2048;  //用於skip方法,和skipBuffer相關
    private static byte[] skipBuffer;    // skipBuffer is initialized in skip(long), if needed.
    
    //從輸入流中讀取下一個字節,
    //正常返回0-255,到達文件的末尾返回-1
    //在流中還有數據,但是沒有讀到時該方法會阻塞(block)
    //Java IO和New IO的區別就是阻塞流和非阻塞流
    //抽象方法!不同的子類不同的實現!
    public abstract int read() throws IOException;  
    
    //將流中的數據讀入放在byte數組的第off個位置先后的len個位置中
    //放回值為放入字節的個數。
    //這個方法在利用抽象方法read,某種意義上簡單的Templete模式。
    public int read(byte b[], int off, int len) throws IOException {
        //檢查輸入是否正常。一般情況下,檢查輸入是方法設計的第一步
        if (b == null) {    
            throw new NullPointerException();
        } else if (off < 0 || len < 0 || len > b.length - off) {
             throw new IndexOutOfBoundsException();
        } else if (len == 0) {
             return 0;
        }        
        //讀取下一個字節
        int c = read();
        //到達文件的末端返回-1
        if (c == -1) {    return -1;   }
        //放回的字節downcast                           
        b[off] = (byte)c;
        //已經讀取了一個字節                                                   
        int i = 1;                                                                        
        try {
            //最多讀取len個字節,所以要循環len次
            for (; i < len ; i++) {
                //每次循環從流中讀取一個字節
                //由於read方法阻塞,
                //所以read(byte[],int,int)也會阻塞
                c = read();
                //到達末尾,理所當然放回-1                                       
                if (c == -1) {            break;           } 
                //讀到就放入byte數組中
                b[off + i] = (byte)c;
            }
        } catch (IOException ee) {     }
        return i;
    }

     //利用上面的方法read(byte[] b)
    public int read(byte b[]) throws IOException {
        return read(b, 0, b.length);
     }                          
    //方法內部使用的、表示要跳過的字節數目,
     public long skip(long n) throws IOException {
        long remaining = n;    
        int nr;
        if (skipBuffer == null)
        //初始化一個跳轉的緩存
        skipBuffer = new byte[SKIP_BUFFER_SIZE];                   
        //本地化的跳轉緩存
        byte[] localSkipBuffer = skipBuffer;          
        //檢查輸入參數,應該放在方法的開始                            
        if (n <= 0) {    return 0;      }                             
        //一共要跳過n個,每次跳過部分,循環       
        while (remaining > 0) {                                      
            nr = read(localSkipBuffer, 0, (int) Math.min(SKIP_BUFFER_SIZE, remaining));
            //利用上面的read(byte[],int,int)方法盡量讀取n個字節  
            //讀到流的末端,則返回
            if (nr < 0) {  break;    }
            //沒有完全讀到需要的,則繼續循環
            remaining -= nr;                                       
        }       
        return n - remaining;//返回時要么全部讀完,要么因為到達文件末端,讀取了部分
    }
    //查詢流中還有多少可以讀取的字節
    //該方法不會block。在java中抽象類方法的實現一般有以下幾種方式:
    //1.拋出異常(java.util);2.“弱”實現。像上面這種。子類在必要的時候覆蓋它。
    //3.“空”實現。
    public int available() throws IOException {           
        return 0;
    }
    //關閉當前流、同時釋放與此流相關的資源
    //關閉當前流、同時釋放與此流相關的資源
    public void close() throws IOException {}
    //markSupport可以查詢當前流是否支持mark
    public synchronized void mark(int readlimit) {}
    //對mark過的流進行復位。只有當流支持mark時才可以使用此方法。
    public synchronized void reset() throws IOException {

                   throw new IOException("mark/reset not supported");

}
//查詢是否支持mark
    //絕大部分不支持,因此提供默認實現,返回false。子類有需要可以覆蓋。
    public boolean markSupported() {           
        return false;
    }
}

 

2.FilterInputStream

 這是字節輸入流部分裝飾器模式的核心。是在裝飾器模式中的Decorator對象,主要完成對其它流裝飾的基本功能:

public class FilterInputStream extends InputStream {
    //裝飾器的代碼特征:被裝飾的對象一般是裝飾器的成員變量
    protected volatile InputStream in; //將要被裝飾的字節輸入流
    protected FilterInputStream(InputStream in) {   //通過構造方法傳入此被裝飾的流
        this.in = in;
     }
    //下面這些方法,完成最小的裝飾――0裝飾,只是調用被裝飾流的方法而已
    public int read() throws IOException {
        return in.read();
    }
    public int read(byte b[]) throws IOException {
        return read(b, 0, b.length);
     }
    public int read(byte b[], int off, int len) throws IOException {
        return in.read(b, off, len);
     }
    public long skip(long n) throws IOException {
        return in.skip(n);
    }
    public int available() throws IOException {
        return in.available();
    }
    public void close() throws IOException {
        in.close();
    }
    public synchronized void mark(int readlimit) {
        in.mark(readlimit);
     }
    public synchronized void reset() throws IOException {
        in.reset();
    }
    public boolean markSupported() {
        return in.markSupported();
    }
}

ByteArray到ByteArrayInputStream的適配:

ByteArrayInputStream內部有一個byte類型的buffer。很典型的適配器模式的應用――將byte數組適配流的接口。

public class ByteArrayInputStream extends InputStream {
    protected byte buf[];                //內部的buffer,一般通過構造器輸入
    protected int pos;                   //當前位置的cursor。從0至byte數組的長度。
    //byte[pos]就是read方法讀取的字節
    protected int mark = 0;           //mark的位置。
    protected int count;              //流中字節的數目。

    //構造器,從一個byte[]創建一個ByteArrayInputStream
     public ByteArrayInputStream(byte buf[]) {
        //初始化流中的各個成員變量
        this.buf = buf;              
        this.pos = 0;
        this.count = buf.length;
     }
    //構造器
     public ByteArrayInputStream(byte buf[], int offset, int length) {                
        this.buf = buf;
        this.pos = offset; //與上面不同
        this.count = Math.min(offset + length, buf.length);
        this.mark = offset; //與上面不同
    }
    //從流中讀取下一個字節
     public synchronized int read() {           
        //返回下一個位置的字節//流中沒有數據則返回-1
        return (pos < count) ? (buf[pos++] & 0xff) : -1; 
    }
    // ByteArrayInputStream要覆蓋InputStream中可以看出其提供了該方法的實現
    //某些時候,父類不能完全實現子類的功能,父類的實現一般比較通用。
    //當子類有更有效的方法時,我們會覆蓋這些方法。
    public synchronized int read(byte b[], int off, int len) {
        //首先檢查輸入參數的狀態是否正確
        if(b==null){ 
            throw new NullPointerException();
        } else if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        }
        if (pos >= count) {             return -1;             }
        if (pos + len > count) {      len = count - pos;         }
        if (len <= 0) {           return 0;     }
        //java中提供數據復制的方法
        //出於速度的原因!他們都用到System.arraycopy方法
        System.arraycopy(buf, pos, b, off, len); 
        pos += len;
        return len;
    }
    //下面這個方法,在InputStream中也已經實現了。
    //但是當時是通過將字節讀入一個buffer中實現的,好像效率低了一點。
    //比InputStream中的方法簡單、高效
    public synchronized long skip(long n) {
        //當前位置,可以跳躍的字節數目
        if (pos + n > count) {    n = count - pos;       }       
        //小於0,則不可以跳躍
         if (n < 0) {       return 0;     }   
        //跳躍后,當前位置變化                                 
        pos += n;                                                                              
        return n;
    }   
    //查詢流中還有多少字節沒有讀取。                                 
public synchronized int available() {
    return count - pos;
    }
    //ByteArrayInputStream支持mark所以返回true
    public boolean markSupported() {                   

                   return true;

}  
    //在流中當前位置mark。      
    public void mark(int readAheadLimit) {            
        mark = pos;
     }
    //重置流。即回到mark的位置。
    public synchronized void reset() {
        pos = mark;
    }
    //關閉ByteArrayInputStream不會產生任何動作。
    public void close() throws IOException {   }
}

 

3.BufferedInputStream//該類主要完成對被包裝流,加上一個緩存的功能

public class BufferedInputStream extends FilterInputStream { private static int defaultBufferSize = 8192;    //默認緩存的大小
    protected volatile byte buf[];  //內部的緩存
    protected int count;     //buffer的大小
    protected int pos;      //buffer中cursor的位置
    protected int markpos = -1;    //mark的位置
    protected int marklimit;     //mark的范圍 //原子性更新。和一致性編程相關
    private static final AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater = AtomicReferenceFieldUpdater.newUpdater (BufferedInputStream.class, byte[].class,"buf"); //檢查輸入流是否關閉,同時返回被包裝流
     private InputStream getInIfOpen() throws IOException { InputStream input = in; if (input == null)    throw new IOException("Stream closed"); return input; } //檢查buffer的狀態,同時返回緩存
    private byte[] getBufIfOpen() throws IOException { byte[] buffer = buf; //不太可能發生的狀態
        if (buffer == null)   throw new IOException("Stream closed"); return buffer; } //構造器
public BufferedInputStream(InputStream in) { //指定默認長度的buffer
        this(in, defaultBufferSize); } //構造器
    public BufferedInputStream(InputStream in, int size) { super(in); //檢查輸入參數
        if(size<=0){ throw new IllegalArgumentException("Buffer size <= 0"); } //創建指定長度的buffer
        buf = new byte[size]; } //從流中讀取數據,填充如緩存中。
    private void fill() throws IOException { //得到buffer
        byte[] buffer = getBufIfOpen(); if (markpos < 0) //mark位置小於0,此時pos為0
            pos = 0; //pos大於buffer的長度 
            else if (pos >= buffer.length) if (markpos > 0) { int sz = pos - markpos;                                                           
          System.arraycopy(buffer, markpos, buffer, 0, sz); pos = sz; markpos = 0; } else if (buffer.length >= marklimit) { //buffer的長度大於marklimit時,mark失效 markpos = -1; //丟棄buffer中的內容
          pos = 0;
}else{ //buffer的長度小於marklimit時對buffer擴容 int nsz = pos * 2; if (nsz > marklimit) nsz = marklimit;//擴容為原來的2倍,太大則為marklimit大小 byte nbuf[] = new byte[nsz]; //將buffer中的字節拷貝如擴容后的buf中 System.arraycopy(buffer, 0, nbuf, 0, pos); if (!bufUpdater.compareAndSet(this, buffer, nbuf)) { //在buffer在被操作時,不能取代此buffer throw new IOException("Stream closed"); } //將新buf賦值給buffer buffer = nbuf; } count = pos; int n = getInIfOpen().read(buffer, pos, buffer.length - pos); if (n > 0) count = n + pos; } //讀取下一個字節 public synchronized int read() throws IOException { //到達buffer的末端 if (pos >= count) { //就從流中讀取數據,填充buffer fill();
       //讀過一次,沒有數據則返回-1 if (pos >= count) return -1; } //返回buffer中下一個位置的字節 return getBufIfOpen()[pos++] & 0xff; } //將數據從流中讀入buffer中 private int read1(byte[] b, int off, int len) throws IOException { int avail = count - pos; //buffer中還剩的可讀字符 //buffer中沒有可以讀取的數據時 if(avail<=0){ //將輸入流中的字節讀入b中 if (len >= getBufIfOpen().length && markpos < 0) { return getInIfOpen().read(b, off, len); } fill();//填充 avail = count - pos; if (avail <= 0) return -1; } //從流中讀取后,檢查可以讀取的數目 int cnt = (avail < len) ? avail : len; //將當前buffer中的字節放入b的末端 System.arraycopy(getBufIfOpen(), pos, b, off, cnt); pos += cnt; return cnt; } public synchronized int read(byte b[], int off, int len)throws IOException { getBufIfOpen();
     // 檢查buffer是否open //檢查輸入參數是否正確 if ((off | len | (off + len) | (b.length - (off + len))) < 0) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return 0; } int n = 0; for (;;) { int nread = read1(b, off + n, len - n); if (nread <= 0) return (n == 0) ? nread : n; n += nread; if (n >= len) return n; InputStream input = in; if (input != null && input.available() <= 0) return n; } } public synchronized long skip(long n) throws IOException { // 檢查buffer是否關閉 getBufIfOpen(); //檢查輸入參數是否正確 if (n <= 0) { return 0; } //buffered中可以讀取字節的數目 long avail = count - pos; //可以讀取的小於0,則從流中讀取 if (avail <= 0) { //mark小於0,則mark在流中 if (markpos <0) return getInIfOpen().skip(n); // 從流中讀取數據,填充緩沖區。 fill(); //可以讀的取字節為buffer的容量減當前位置 avail = count - pos; if (avail <= 0) return 0; } long skipped = (avail < n) ? avail : n; pos += skipped;
     //當前位置改變 return skipped; } //該方法不會block!返回流中可以讀取的字節的數目。 //該方法的返回值為緩存中的可讀字節數目加流中可讀字節數目的和 public synchronized int available() throws IOException { return getInIfOpen().available() + (count - pos); } //當前位置處為mark位置 public synchronized void mark(int readlimit) { marklimit = readlimit; markpos = pos; } public synchronized void reset() throws IOException { // 緩沖去關閉了,肯定就拋出異常!程序設計中經常的手段 getBufIfOpen(); if (markpos < 0) throw new IOException("Resetting to invalid mark"); pos = markpos; } //該流和ByteArrayInputStream一樣都支持mark public boolean markSupported() { return true; } //關閉當前流同時釋放相應的系統資源。 public void close() throws IOException { byte[] buffer; while ( (buffer = buf) != null) { if (bufUpdater.compareAndSet(this, buffer, null)) { InputStream input = in; in = null; if (input != null) input.close(); return; } // Else retry in case a new buf was CASed in fill() } } }

 

4.PipedOutputStream

 

PipedOutputStream一般必須和一個PipedInputStream連接。共同構成一個pipe。即必須連接輸入部分。

其原理為:PipedInputStream內部有一個Buffer, PipedInputStream可以使用InputStream的方法讀取其Buffer中的字節。PipedInputStream中Buffer中的字節是PipedOutputStream調用PipedInputStream的方法放入的。

public class PipedOutputStream extends OutputStream {
    //包含一個PipedInputStream
    private PipedInputStream sink;               
    //帶有目的地的構造器
    public PipedOutputStream(PipedInputStream snk)throws IOException {       
        connect(snk);
    }
    //默認構造器,必須使用下面的connect方法連接
    public PipedOutputStream() {  }
                          
    public synchronized void connect(PipedInputStream snk) throws IOException {
        //檢查輸入參數的正確性
        if(snk==null){                                                                    
            throw new NullPointerException();
        } else if (sink != null || snk.connected) {
            throw new IOException("Already connected");
        }
        //一系列初始化工作
        sink = snk;                                                                           
        snk.in = -1;
        snk.out = 0;
        snk.connected = true;

} 
//向流中寫入數據
    public void write(int b) throws IOException {                       
        if (sink == null) {    throw new IOException("Pipe not connected");      }
        //本質上是,調用PipedInputStream的receive方法接受此字節
        sink.receive(b);           
    }
    public void write(byte b[], int off, int len) throws IOException {
        //首先檢查輸入參數的正確性
        if (sink == null) {                                                                   
            throw new IOException("Pipe not connected");
        } else if (b == null) {
            throw new NullPointerException();
        } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return;
        }
        //調用PipedInputStream的receive方法接受
        sink.receive(b, off, len);                                                                 
    }
    //flush輸出流
    public synchronized void flush() throws IOException {                 
        if (sink != null) {
            //本質是通知輸入流,可以讀取
            synchronized (sink) {     sink.notifyAll();     } 
        }
    }
    //關閉流同時釋放相關資源
    public void close()  throws IOException {                         
        if (sink != null) {    sink.receivedLast();         }
    }
}

PipedInputStream

public class PipedInputStream extends InputStream {
    //標識有讀取方或寫入方關閉
    boolean closedByWriter = false;                                                             
    volatile boolean closedByReader = false;    
    //是否建立連接
    boolean connected = false;                                                                     
    //標識哪個線程
    Thread readSide;                                                                                             
    Thread writeSide;
    //緩沖區的默認大小
    protected static final int PIPE_SIZE = 1024;                         
    //緩沖區
    protected byte buffer[] = new byte[PIPE_SIZE];                 
    //下一個寫入字節的位置。0代表空,in==out代表滿
    protected int in = -1;               
    //下一個讀取字節的位置
protected int out = 0;    
          
    //給定源的輸入流
    public PipedInputStream(PipedOutputStream src) throws IOException {                
        connect(src);
    }
    //默認構造器,下部一定要connect源
    public PipedInputStream() {    }                                                
    //連接輸入源
    public void connect(PipedOutputStream src) throws IOException {               
        //調用源的connect方法連接當前對象
        src.connect(this);                                                                           
    }
    //只被PipedOuputStream調用
    protected synchronized void receive(int b) throws IOException {                  
        //檢查狀態,寫入
        checkStateForReceive();                                                                                 
        //永遠是PipedOuputStream
        writeSide = Thread.currentThread();                                                      
        //輸入和輸出相等,等待空間
        if (in == out)     awaitSpace();                                                           
        if (in < 0) {
            in = 0;
            out = 0;
        }
        //放入buffer相應的位置
        buffer[in++] = (byte)(b & 0xFF);                                                             
        //in為0表示buffer已空
        if (in >= buffer.length) {      in = 0;         }                                             
    }

    synchronized void receive(byte b[], int off, int len)  throws IOException {
        checkStateForReceive();
        //從PipedOutputStream可以看出
        writeSide = Thread.currentThread();                                   
        int bytesToTransfer = len;
        while (bytesToTransfer > 0) {
            //滿了,會通知讀取的;空會通知寫入
            if (in == out)    awaitSpace();                                
            int nextTransferAmount = 0;
            if (out < in) {
                nextTransferAmount = buffer.length - in;
            } else if (in < out) {
                if (in == -1) {
                in = out = 0;
                nextTransferAmount = buffer.length - in;
            } else {
                nextTransferAmount = out - in;
            }
        }
        if (nextTransferAmount > bytesToTransfer)  nextTransferAmount = bytesToTransfer;
        assert(nextTransferAmount > 0);
        System.arraycopy(b, off, buffer, in, nextTransferAmount);
        bytesToTransfer -= nextTransferAmount;
        off += nextTransferAmount;
        in += nextTransferAmount;
        if (in >= buffer.length) {     in = 0;      }
        }
    }
    //檢查當前狀態,等待輸入
    private void checkStateForReceive() throws IOException {                           
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByWriter || closedByReader) {
            throw new IOException("Pipe closed");
        } else if (readSide != null && !readSide.isAlive()) {
            throw new IOException("Read end dead");
        }
    }
    
    //Buffer已滿,等待一段時間
    private void awaitSpace() throws IOException {                                              
        //in==out表示滿了,沒有空間
        while (in == out) {                                                                                             
            //檢查接受端的狀態
            checkStateForReceive();                                                                       
            //通知讀取端
            notifyAll();                                                                                  
            try {
                wait(1000);
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();
            }
        }
    }

    //通知所有等待的線程()已經接受到最后的字節
    synchronized void receivedLast() {                  
        closedByWriter = true;                             //
        notifyAll();
    }

    public synchronized int read()  throws IOException {
        //檢查一些內部狀態
        if (!connected) {                                                                              
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
            throw new IOException("Pipe closed");
        } else if (writeSide != null && !writeSide.isAlive()&& !closedByWriter && (in < 0)) {
            throw new IOException("Write end dead");
        }
        //當前線程讀取
        readSide = Thread.currentThread();                                            
        //重復兩次???
          int trials = 2;                                                                               
        while (in < 0) {
        //輸入斷關閉返回-1
        if (closedByWriter) {              return -1;        }                 
            //狀態錯誤
            if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {         
                throw new IOException("Pipe broken");
            }
            notifyAll();        // 空了,通知寫入端可以寫入                                                                         try {
                wait(1000);
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();
            }
        }
        int ret = buffer[out++] & 0xFF;                                                                         if (out >= buffer.length) {             out = 0;                }
        //沒有任何字節
        if (in == out) {           in = -1;                 }                             
        return ret;
    }

    public synchronized int read(byte b[], int off, int len)  throws IOException {
        //檢查輸入參數的正確性
         if (b == null) {                                                                                 
            throw new NullPointerException();
        } else if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }
        //讀取下一個
         int c = read();                                                                                 
        //已經到達末尾了,返回-1
        if (c < 0) {    return -1;       }                                            
        //放入外部buffer中
        b[off] = (byte) c;                                                                    
        //return-len
        int rlen = 1;                                                                            
        //下一個in存在,且沒有到達len
        while ((in >= 0) && (--len > 0)) {                                          
            //依次放入外部buffer
            b[off + rlen] = buffer[out++];                                         
            rlen++;
            //讀到buffer的末尾,返回頭部
            if (out >= buffer.length) {         out = 0;           }        
            //讀、寫位置一致時,表示沒有數據
            if (in == out) {     in = -1;      }               
        }
        //返回填充的長度
        return rlen;                                                                            
    }
    //返回還有多少字節可以讀取
    public synchronized int available() throws IOException {             
        //到達末端,沒有字節
        if(in < 0)
            return 0;                                                                                         
        else if(in == out)
            //寫入的和讀出的一致,表示滿
            return buffer.length;                                                               
        else if (in > out)
            //寫入的大於讀出
            return in - out;                                                                                 
        else
            //寫入的小於讀出的
            return in + buffer.length - out;                                                
    }
    //關閉當前流,同時釋放與其相關的資源
    public void close()  throws IOException {                
        //表示由輸入流關閉
        closedByReader = true;                                             
        //同步化當前對象,in為-1
        synchronized (this) {     in = -1;    }        
    }
        
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM