本章內容包括3個部分:BufferedInputStream介紹,BufferedInputStream源碼,以及BufferedInputStream使用示例。
轉載請注明出處:http://www.cnblogs.com/skywang12345/p/io_12.html
BufferedInputStream 介紹
BufferedInputStream 是緩沖輸入流。它繼承於FilterInputStream。
BufferedInputStream 的作用是為另一個輸入流添加一些功能,例如,提供“緩沖功能”以及支持“mark()標記”和“reset()重置方法”。
BufferedInputStream 本質上是通過一個內部緩沖區數組實現的。例如,在新建某輸入流對應的BufferedInputStream后,當我們通過read()讀取輸入流的數據時,BufferedInputStream會將該輸入流的數據分批的填入到緩沖區中。每當緩沖區中的數據被讀完之后,輸入流會再次填充數據緩沖區;如此反復,直到我們讀完輸入流數據位置。
BufferedInputStream 函數列表
BufferedInputStream(InputStream in) BufferedInputStream(InputStream in, int size) synchronized int available() void close() synchronized void mark(int readlimit) boolean markSupported() synchronized int read() synchronized int read(byte[] buffer, int offset, int byteCount) synchronized void reset() synchronized long skip(long byteCount)
BufferedInputStream 源碼分析(基於jdk1.7.40)
1 package java.io; 2 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; 3 4 public class BufferedInputStream extends FilterInputStream { 5 6 // 默認的緩沖大小是8192字節 7 // BufferedInputStream 會根據“緩沖區大小”來逐次的填充緩沖區; 8 // 即,BufferedInputStream填充緩沖區,用戶讀取緩沖區,讀完之后,BufferedInputStream會再次填充緩沖區。如此循環,直到讀完數據... 9 private static int defaultBufferSize = 8192; 10 11 // 緩沖數組 12 protected volatile byte buf[]; 13 14 // 緩存數組的原子更新器。 15 // 該成員變量與buf數組的volatile關鍵字共同組成了buf數組的原子更新功能實現, 16 // 即,在多線程中操作BufferedInputStream對象時,buf和bufUpdater都具有原子性(不同的線程訪問到的數據都是相同的) 17 private static final 18 AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater = 19 AtomicReferenceFieldUpdater.newUpdater 20 (BufferedInputStream.class, byte[].class, "buf"); 21 22 // 當前緩沖區的有效字節數。 23 // 注意,這里是指緩沖區的有效字節數,而不是輸入流中的有效字節數。 24 protected int count; 25 26 // 當前緩沖區的位置索引 27 // 注意,這里是指緩沖區的位置索引,而不是輸入流中的位置索引。 28 protected int pos; 29 30 // 當前緩沖區的標記位置 31 // markpos和reset()配合使用才有意義。操作步驟: 32 // (01) 通過mark() 函數,保存pos的值到markpos中。 33 // (02) 通過reset() 函數,會將pos的值重置為markpos。接着通過read()讀取數據時,就會從mark()保存的位置開始讀取。 34 protected int markpos = -1; 35 36 // marklimit是標記的最大值。 37 // 關於marklimit的原理,我們在后面的fill()函數分析中會詳細說明。這對理解BufferedInputStream相當重要。 38 protected int marklimit; 39 40 // 獲取輸入流 41 private InputStream getInIfOpen() throws IOException { 42 InputStream input = in; 43 if (input == null) 44 throw new IOException("Stream closed"); 45 return input; 46 } 47 48 // 獲取緩沖 49 private byte[] getBufIfOpen() throws IOException { 50 byte[] buffer = buf; 51 if (buffer == null) 52 throw new IOException("Stream closed"); 53 return buffer; 54 } 55 56 // 構造函數:新建一個緩沖區大小為8192的BufferedInputStream 57 public BufferedInputStream(InputStream in) { 58 this(in, defaultBufferSize); 59 } 60 61 // 構造函數:新建指定緩沖區大小的BufferedInputStream 62 public BufferedInputStream(InputStream in, int size) { 63 super(in); 64 if (size <= 0) { 65 throw new IllegalArgumentException("Buffer size <= 0"); 66 } 67 buf = new byte[size]; 68 } 69 70 // 從“輸入流”中讀取數據,並填充到緩沖區中。 71 // 后面會對該函數進行詳細說明! 72 private void fill() throws IOException { 73 byte[] buffer = getBufIfOpen(); 74 if (markpos < 0) 75 pos = 0; /* no mark: throw away the buffer */ 76 else if (pos >= buffer.length) /* no room left in buffer */ 77 if (markpos > 0) { /* can throw away early part of the buffer */ 78 int sz = pos - markpos; 79 System.arraycopy(buffer, markpos, buffer, 0, sz); 80 pos = sz; 81 markpos = 0; 82 } else if (buffer.length >= marklimit) { 83 markpos = -1; /* buffer got too big, invalidate mark */ 84 pos = 0; /* drop buffer contents */ 85 } else { /* grow buffer */ 86 int nsz = pos * 2; 87 if (nsz > marklimit) 88 nsz = marklimit; 89 byte nbuf[] = new byte[nsz]; 90 System.arraycopy(buffer, 0, nbuf, 0, pos); 91 if (!bufUpdater.compareAndSet(this, buffer, nbuf)) { 92 throw new IOException("Stream closed"); 93 } 94 buffer = nbuf; 95 } 96 count = pos; 97 int n = getInIfOpen().read(buffer, pos, buffer.length - pos); 98 if (n > 0) 99 count = n + pos; 100 } 101 102 // 讀取下一個字節 103 public synchronized int read() throws IOException { 104 // 若已經讀完緩沖區中的數據,則調用fill()從輸入流讀取下一部分數據來填充緩沖區 105 if (pos >= count) { 106 fill(); 107 if (pos >= count) 108 return -1; 109 } 110 // 從緩沖區中讀取指定的字節 111 return getBufIfOpen()[pos++] & 0xff; 112 } 113 114 // 將緩沖區中的數據寫入到字節數組b中。off是字節數組b的起始位置,len是寫入長度 115 private int read1(byte[] b, int off, int len) throws IOException { 116 int avail = count - pos; 117 if (avail <= 0) { 118 // 加速機制。 119 // 如果讀取的長度大於緩沖區的長度 並且沒有markpos, 120 // 則直接從原始輸入流中進行讀取,從而避免無謂的COPY(從原始輸入流至緩沖區,讀取緩沖區全部數據,清空緩沖區, 121 // 重新填入原始輸入流數據) 122 if (len >= getBufIfOpen().length && markpos < 0) { 123 return getInIfOpen().read(b, off, len); 124 } 125 // 若已經讀完緩沖區中的數據,則調用fill()從輸入流讀取下一部分數據來填充緩沖區 126 fill(); 127 avail = count - pos; 128 if (avail <= 0) return -1; 129 } 130 int cnt = (avail < len) ? avail : len; 131 System.arraycopy(getBufIfOpen(), pos, b, off, cnt); 132 pos += cnt; 133 return cnt; 134 } 135 136 // 將緩沖區中的數據寫入到字節數組b中。off是字節數組b的起始位置,len是寫入長度 137 public synchronized int read(byte b[], int off, int len) 138 throws IOException 139 { 140 getBufIfOpen(); // Check for closed stream 141 if ((off | len | (off + len) | (b.length - (off + len))) < 0) { 142 throw new IndexOutOfBoundsException(); 143 } else if (len == 0) { 144 return 0; 145 } 146 147 // 讀取到指定長度的數據才返回 148 int n = 0; 149 for (;;) { 150 int nread = read1(b, off + n, len - n); 151 if (nread <= 0) 152 return (n == 0) ? nread : n; 153 n += nread; 154 if (n >= len) 155 return n; 156 // if not closed but no bytes available, return 157 InputStream input = in; 158 if (input != null && input.available() <= 0) 159 return n; 160 } 161 } 162 163 // 忽略n個字節 164 public synchronized long skip(long n) throws IOException { 165 getBufIfOpen(); // Check for closed stream 166 if (n <= 0) { 167 return 0; 168 } 169 long avail = count - pos; 170 171 if (avail <= 0) { 172 // If no mark position set then don't keep in buffer 173 if (markpos <0) 174 return getInIfOpen().skip(n); 175 176 // Fill in buffer to save bytes for reset 177 fill(); 178 avail = count - pos; 179 if (avail <= 0) 180 return 0; 181 } 182 183 long skipped = (avail < n) ? avail : n; 184 pos += skipped; 185 return skipped; 186 } 187 188 // 下一個字節是否存可讀 189 public synchronized int available() throws IOException { 190 int n = count - pos; 191 int avail = getInIfOpen().available(); 192 return n > (Integer.MAX_VALUE - avail) 193 ? Integer.MAX_VALUE 194 : n + avail; 195 } 196 197 // 標記“緩沖區”中當前位置。 198 // readlimit是marklimit,關於marklimit的作用,參考后面的說明。 199 public synchronized void mark(int readlimit) { 200 marklimit = readlimit; 201 markpos = pos; 202 } 203 204 // 將“緩沖區”中當前位置重置到mark()所標記的位置 205 public synchronized void reset() throws IOException { 206 getBufIfOpen(); // Cause exception if closed 207 if (markpos < 0) 208 throw new IOException("Resetting to invalid mark"); 209 pos = markpos; 210 } 211 212 public boolean markSupported() { 213 return true; 214 } 215 216 // 關閉輸入流 217 public void close() throws IOException { 218 byte[] buffer; 219 while ( (buffer = buf) != null) { 220 if (bufUpdater.compareAndSet(this, buffer, null)) { 221 InputStream input = in; 222 in = null; 223 if (input != null) 224 input.close(); 225 return; 226 } 227 // Else retry in case a new buf was CASed in fill() 228 } 229 } 230 }
說明:
要想讀懂BufferedInputStream的源碼,就要先理解它的思想。BufferedInputStream的作用是為其它輸入流提供緩沖功能。創建BufferedInputStream時,我們會通過它的構造函數指定某個輸入流為參數。BufferedInputStream會將該輸入流數據分批讀取,每次讀取一部分到緩沖中;操作完緩沖中的這部分數據之后,再從輸入流中讀取下一部分的數據。
為什么需要緩沖呢?原因很簡單,效率問題!緩沖中的數據實際上是保存在內存中,而原始數據可能是保存在硬盤或NandFlash等存儲介質中;而我們知道,從內存中讀取數據的速度比從硬盤讀取數據的速度至少快10倍以上。
那干嘛不干脆一次性將全部數據都讀取到緩沖中呢?第一,讀取全部的數據所需要的時間可能會很長。第二,內存價格很貴,容量不像硬盤那么大。
下面,我就BufferedInputStream中最重要的函數fill()進行說明。其它的函數很容易理解,我就不詳細介紹了,大家可以參考源碼中的注釋進行理解。
fill() 源碼如下:
private void fill() throws IOException { byte[] buffer = getBufIfOpen(); if (markpos < 0) pos = 0; else if (pos >= buffer.length) { if (markpos > 0) { /* can throw away early part of the buffer */ int sz = pos - markpos; System.arraycopy(buffer, markpos, buffer, 0, sz); pos = sz; markpos = 0; } else if (buffer.length >= marklimit) { markpos = -1; /* buffer got too big, invalidate mark */ pos = 0; /* drop buffer contents */ } else { /* grow buffer */ int nsz = pos * 2; if (nsz > marklimit) nsz = marklimit; byte nbuf[] = new byte[nsz]; System.arraycopy(buffer, 0, nbuf, 0, pos); if (!bufUpdater.compareAndSet(this, buffer, nbuf)) { // Can't replace buf if there was an async close. // Note: This would need to be changed if fill() // is ever made accessible to multiple threads. // But for now, the only way CAS can fail is via close. // assert buf == null; throw new IOException("Stream closed"); } buffer = nbuf; } } count = pos; int n = getInIfOpen().read(buffer, pos, buffer.length - pos); if (n > 0) count = n + pos; }
根據fill()中的if...else...,下面我們將fill分為5種情況進行說明。
情況1:讀取完buffer中的數據,並且buffer沒有被標記
執行流程如下,
(01) read() 函數中調用 fill()
(02) fill() 中的 if (markpos < 0) ...
為了方便分析,我們將這種情況下fill()執行的操作等價於以下代碼:
private void fill() throws IOException { byte[] buffer = getBufIfOpen(); if (markpos < 0) pos = 0; count = pos; int n = getInIfOpen().read(buffer, pos, buffer.length - pos); if (n > 0) count = n + pos; }
說明:
這種情況發生的情況是 — — 輸入流中有很長的數據,我們每次從中讀取一部分數據到buffer中進行操作。每次當我們讀取完buffer中的數據之后,並且此時輸入流沒有被標記;那么,就接着從輸入流中讀取下一部分的數據到buffer中。
其中,判斷是否讀完buffer中的數據,是通過 if (pos >= count) 來判斷的;
判斷輸入流有沒有被標記,是通過 if (markpos < 0) 來判斷的。
理解這個思想之后,我們再對這種情況下的fill()的代碼進行分析,就特別容易理解了。
(01) if (markpos < 0) 它的作用是判斷“輸入流是否被標記”。若被標記,則markpos大於/等於0;否則markpos等於-1。
(02) 在這種情況下:通過getInIfOpen()獲取輸入流,然后接着從輸入流中讀取buffer.length個字節到buffer中。
(03) count = n + pos; 這是根據從輸入流中讀取的實際數據的多少,來更新buffer中數據的實際大小。
情況2:讀取完buffer中的數據,buffer的標記位置>0,並且buffer中沒有多余的空間
執行流程如下,
(01) read() 函數中調用 fill()
(02) fill() 中的 else if (pos >= buffer.length) ...
(03) fill() 中的 if (markpos > 0) ...
為了方便分析,我們將這種情況下fill()執行的操作等價於以下代碼:
private void fill() throws IOException { byte[] buffer = getBufIfOpen(); if (markpos >= 0 && pos >= buffer.length) { if (markpos > 0) { int sz = pos - markpos; System.arraycopy(buffer, markpos, buffer, 0, sz); pos = sz; markpos = 0; } } count = pos; int n = getInIfOpen().read(buffer, pos, buffer.length - pos); if (n > 0) count = n + pos; }
說明:
這種情況發生的情況是 — — 輸入流中有很長的數據,我們每次從中讀取一部分數據到buffer中進行操作。當我們讀取完buffer中的數據之后,並且此時輸入流存在標記時;那么,就發生情況2。此時,我們要保留“被標記位置”到“buffer末尾”的數據,然后再從輸入流中讀取下一部分的數據到buffer中。
其中,判斷是否讀完buffer中的數據,是通過 if (pos >= count) 來判斷的;
判斷輸入流有沒有被標記,是通過 if (markpos < 0) 來判斷的。
判斷buffer中沒有多余的空間,是通過 if (pos >= buffer.length) 來判斷的。
理解這個思想之后,我們再對這種情況下的fill()代碼進行分析,就特別容易理解了。
(01) int sz = pos - markpos; 作用是“獲取‘被標記位置’到‘buffer末尾’”的數據長度。
(02) System.arraycopy(buffer, markpos, buffer, 0, sz); 作用是“將buffer中從markpos開始的數據”拷貝到buffer中(從位置0開始填充,填充長度是sz)。接着,將sz賦值給pos,即pos就是“被標記位置”到“buffer末尾”的數據長度。
(03) int n = getInIfOpen().read(buffer, pos, buffer.length - pos); 從輸入流中讀取出“buffer.length - pos”的數據,然后填充到buffer中。
(04) 通過第(02)和(03)步組合起來的buffer,就是包含了“原始buffer被標記位置到buffer末尾”的數據,也包含了“從輸入流中新讀取的數據”。
注意:執行過情況2之后,markpos的值由“大於0”變成了“等於0”!
情況3:讀取完buffer中的數據,buffer被標記位置=0,buffer中沒有多余的空間,並且buffer.length>=marklimit
執行流程如下,
(01) read() 函數中調用 fill()
(02) fill() 中的 else if (pos >= buffer.length) ...
(03) fill() 中的 else if (buffer.length >= marklimit) ...
為了方便分析,我們將這種情況下fill()執行的操作等價於以下代碼:
private void fill() throws IOException { byte[] buffer = getBufIfOpen(); if (markpos >= 0 && pos >= buffer.length) { if ( (markpos <= 0) && (buffer.length >= marklimit) ) { markpos = -1; /* buffer got too big, invalidate mark */ pos = 0; /* drop buffer contents */ } } count = pos; int n = getInIfOpen().read(buffer, pos, buffer.length - pos); if (n > 0) count = n + pos; }
說明:這種情況的處理非常簡單。首先,就是“取消標記”,即 markpos = -1;然后,設置初始化位置為0,即pos=0;最后,再從輸入流中讀取下一部分數據到buffer中。
情況4:讀取完buffer中的數據,buffer被標記位置=0,buffer中沒有多余的空間,並且buffer.length<marklimit
執行流程如下,
(01) read() 函數中調用 fill()
(02) fill() 中的 else if (pos >= buffer.length) ...
(03) fill() 中的 else { int nsz = pos * 2; ... }
為了方便分析,我們將這種情況下fill()執行的操作等價於以下代碼:
private void fill() throws IOException { byte[] buffer = getBufIfOpen(); if (markpos >= 0 && pos >= buffer.length) { if ( (markpos <= 0) && (buffer.length < marklimit) ) { int nsz = pos * 2; if (nsz > marklimit) nsz = marklimit; byte nbuf[] = new byte[nsz]; System.arraycopy(buffer, 0, nbuf, 0, pos); if (!bufUpdater.compareAndSet(this, buffer, nbuf)) { throw new IOException("Stream closed"); } buffer = nbuf; } } count = pos; int n = getInIfOpen().read(buffer, pos, buffer.length - pos); if (n > 0) count = n + pos; }
說明:
這種情況的處理非常簡單。
(01) 新建一個字節數組nbuf。nbuf的大小是“pos*2”和“marklimit”中較小的那個數。
int nsz = pos * 2; if (nsz > marklimit) nsz = marklimit; byte nbuf[] = new byte[nsz];
(02) 接着,將buffer中的數據拷貝到新數組nbuf中。通過System.arraycopy(buffer, 0, nbuf, 0, pos)
(03) 最后,從輸入流讀取部分新數據到buffer中。通過getInIfOpen().read(buffer, pos, buffer.length - pos);
注意:在這里,我們思考一個問題,“為什么需要marklimit,它的存在到底有什么意義?”我們結合“情況2”、“情況3”、“情況4”的情況來分析。
假設,marklimit是無限大的,而且我們設置了markpos。當我們從輸入流中每讀完一部分數據並讀取下一部分數據時,都需要保存markpos所標記的數據;這就意味着,我們需要不斷執行情況4中的操作,要將buffer的容量擴大……隨着讀取次數的增多,buffer會越來越大;這會導致我們占據的內存越來越大。所以,我們需要給出一個marklimit;當buffer>=marklimit時,就不再保存markpos的值了。
情況5:除了上面4種情況之外的情況
執行流程如下,
(01) read() 函數中調用 fill()
(02) fill() 中的 count = pos...
為了方便分析,我們將這種情況下fill()執行的操作等價於以下代碼:
private void fill() throws IOException { byte[] buffer = getBufIfOpen(); count = pos; int n = getInIfOpen().read(buffer, pos, buffer.length - pos); if (n > 0) count = n + pos; }
說明:這種情況的處理非常簡單。直接從輸入流讀取部分新數據到buffer中。
示例代碼
關於BufferedInputStream中API的詳細用法,參考示例代碼(BufferedInputStreamTest.java):
1 import java.io.BufferedInputStream; 2 import java.io.ByteArrayInputStream; 3 import java.io.File; 4 import java.io.InputStream; 5 import java.io.FileInputStream; 6 import java.io.IOException; 7 import java.io.FileNotFoundException; 8 import java.lang.SecurityException; 9 10 /** 11 * BufferedInputStream 測試程序 12 * 13 * @author skywang 14 */ 15 public class BufferedInputStreamTest { 16 17 private static final int LEN = 5; 18 19 public static void main(String[] args) { 20 testBufferedInputStream() ; 21 } 22 23 /** 24 * BufferedInputStream的API測試函數 25 */ 26 private static void testBufferedInputStream() { 27 28 // 創建BufferedInputStream字節流,內容是ArrayLetters數組 29 try { 30 File file = new File("bufferedinputstream.txt"); 31 InputStream in = 32 new BufferedInputStream( 33 new FileInputStream(file), 512); 34 35 // 從字節流中讀取5個字節。“abcde”,a對應0x61,b對應0x62,依次類推... 36 for (int i=0; i<LEN; i++) { 37 // 若能繼續讀取下一個字節,則讀取下一個字節 38 if (in.available() >= 0) { 39 // 讀取“字節流的下一個字節” 40 int tmp = in.read(); 41 System.out.printf("%d : 0x%s\n", i, Integer.toHexString(tmp)); 42 } 43 } 44 45 // 若“該字節流”不支持標記功能,則直接退出 46 if (!in.markSupported()) { 47 System.out.println("make not supported!"); 48 return ; 49 } 50 51 // 標記“當前索引位置”,即標記第6個位置的元素--“f” 52 // 1024對應marklimit 53 in.mark(1024); 54 55 // 跳過22個字節。 56 in.skip(22); 57 58 // 讀取5個字節 59 byte[] buf = new byte[LEN]; 60 in.read(buf, 0, LEN); 61 // 將buf轉換為String字符串。 62 String str1 = new String(buf); 63 System.out.printf("str1=%s\n", str1); 64 65 // 重置“輸入流的索引”為mark()所標記的位置,即重置到“f”處。 66 in.reset(); 67 // 從“重置后的字節流”中讀取5個字節到buf中。即讀取“fghij” 68 in.read(buf, 0, LEN); 69 // 將buf轉換為String字符串。 70 String str2 = new String(buf); 71 System.out.printf("str2=%s\n", str2); 72 73 in.close(); 74 } catch (FileNotFoundException e) { 75 e.printStackTrace(); 76 } catch (SecurityException e) { 77 e.printStackTrace(); 78 } catch (IOException e) { 79 e.printStackTrace(); 80 } 81 } 82 }
程序中讀取的bufferedinputstream.txt的內容如下:
abcdefghijklmnopqrstuvwxyz 0123456789 ABCDEFGHIJKLMNOPQRSTUVWXYZ
運行結果:
0 : 0x61
1 : 0x62
2 : 0x63
3 : 0x64
4 : 0x65
str1=01234
str2=fghij
更多內容
03. java io系列02之 ByteArrayInputStream的簡介,源碼分析和示例(包括InputStream)
04. java io系列03之 ByteArrayOutputStream的簡介,源碼分析和示例(包括OutputStream)
05. java io系列04之 管道(PipedOutputStream和PipedInputStream)的簡介,源碼分析和示例
06. java io系列05之 ObjectInputStream 和 ObjectOutputStream
07. java io系列06之 序列化總結(Serializable 和 Externalizable)
08. java io系列07之 FileInputStream和FileOutputStream
10. java io系列09之 FileDescriptor總結
11. java io系列10之 FilterInputStream
12. java io系列11之 FilterOutputStream
13. java io系列12之 BufferedInputStream(緩沖輸入流)的認知、源碼和示例