okio:定義簡短高效


 

歡迎關注公眾號,第一時間獲取最新文章:

 

本篇目錄

 

一、前言

okio是大名鼎鼎的square公司開發出來的,其是okhttp的底層io操作庫,既然已經有java原生的io庫為什么還要自己費盡開發一套呢?java原生的io操作存在很多問題,比如讀寫阻塞,內存管理並不高效,體系臃腫,api調用不精簡,以上我個人認為okio改進最大的地方是內存管理方面,比如我們拷貝數據java原生io數據轉移大體過程如下:


而okio中過程如下:

少了一個中間數據拷貝的過程,這樣效率會提升很多,並且okio中數據緩存的處理更是精心設計的,我覺得這部分才是其精華所在:okio將數據(Buffer)采用雙向循環鏈表的方式組織在一起,並且鏈表每個結點數據存儲在一個個數組(Segment)中,結構如下:

這樣的存儲結構有很多好處,拷貝數據我們可以直接移動指針而不像原生io那樣需要一個個字節拷貝過去,這樣會大大提高數據轉移的效率。

再來簡要看一下API的使用簡潔性

向file中寫入數據,原生io實現如下:

 1public static void writeTest(File file) {
 2    try {
 3        FileOutputStream fos = new FileOutputStream(file);
 4        OutputStream os = new BufferedOutputStream(fos);
 5        DataOutputStream dos = new DataOutputStream(os);
 6        dos.writeUTF("write string by utf-8.\n");
 7        dos.writeInt(1234);
 8        dos.flush();
 9        fos.close();
10    } catch (Exception e) {
11        e.printStackTrace();
12    }
13}

用okio實現:

1 public static void writeTest(File file) {
2    try {
3        Okio.buffer(Okio.sink(file))
4            .writeUtf8("write string by utf-8.\n")
5            .writeInt(1234).close();
6    } catch (Exception e) {
7        e.printStackTrace();
8    }
9 }

以上demo很好的體現了okio api的簡潔性。

通過以上比較你應該能感受到okio的強大之處,但是也要明白一點okio也是完全基於原生InputStream與OutputStream來進行封裝的,並沒有完全舍棄原生io,可以理解為對原生io的封裝擴展,重點優化了緩存部分,至於緩存部分后續分析完源碼你會有更深入的理解。

另外okio提供數據訪問的超時機制,訪問資源可以控制時間。

okio的源碼比較簡短,建議有時間好好閱讀一下。

二、頂級接口Source與Sink

Source與Sink是Okio中的輸入流接口和輸出流接口,對應原生IO的InputStream和OutputStream。

先看下Source源碼:

1 public interface Source extends Closeable {
2
3  long read(Buffer sink, long byteCount) throws IOException;
4
5  Timeout timeout();
6
7  @Override void close() throws IOException;
8 }

很簡單就定義了幾個方法,讀數據到sink中以及關閉資源的方法,至於timeout方法暫時先不用管,后面提超時機制的時候會分析。

Sink源碼:

 1 public interface Sink extends Closeable, Flushable {
 2
 3  void write(Buffer source, long byteCount) throws IOException;
 4
 5  @Override void flush() throws IOException;
 6
 7  Timeout timeout();
 8
 9  @Override void close() throws IOException;
10 }

同樣比較簡單,沒什么好說的,自己看一下就可以了。

三、BufferedSource與BufferedSink

BufferedSource與BufferedSink同樣是兩個接口類,分別繼承Source與Sink接口,BufferedSource與BufferedSink是具有緩存功能的接口,各自維護了一個buffer,同時提供了很多實用的api調用接口,平時我們使用也主要是調用這兩個類中定義的方法。

BufferedSink類:

 1 public interface BufferedSink extends Sink {
 2  /** Returns this sink's internal buffer. */
 3  Buffer buffer();
 4
 5  BufferedSink write(ByteString byteString) throws IOException;
 6
 7  BufferedSink write(byte[] source) throws IOException;
 8
 9  BufferedSink write(byte[] source, int offset, int byteCount) throws IOException;
10
11  long writeAll(Source source) throws IOException;
12
13  BufferedSink write(Source source, long byteCount) throws IOException;
14
15  BufferedSink writeUtf8(String string) throws IOException;
16
17  BufferedSink writeUtf8(String string, int beginIndex, int endIndex) throws IOException;
18
19  /** Encodes {@code codePoint} in UTF-8 and writes it to this sink. */
20  BufferedSink writeUtf8CodePoint(int codePoint) throws IOException;
21
22  /** Encodes {@code string} in {@code charset} and writes it to this sink. */
23  BufferedSink writeString(String string, Charset charset) throws IOException;
24
25  BufferedSink writeString(String string, int beginIndex, int endIndex, Charset charset)
26      throws IOException;
27
28  /** Writes a byte to this sink. */
29  BufferedSink writeByte(int b) throws IOException;
30
31  BufferedSink writeShort(int s) throws IOException;
32
33  BufferedSink writeShortLe(int s) throws IOException;
34
35  BufferedSink writeInt(int i) throws IOException;
36
37  BufferedSink writeIntLe(int i) throws IOException;
38
39  BufferedSink writeLong(long v) throws IOException;
40
41  BufferedSink writeLongLe(long v) throws IOException;
42
43  BufferedSink writeDecimalLong(long v) throws IOException;
44
45  BufferedSink writeHexadecimalUnsignedLong(long v) throws IOException;
46
47  @Override void flush() throws IOException;
48
49  BufferedSink emit() throws IOException;
50
51  BufferedSink emitCompleteSegments() throws IOException;
52
53  /** Returns an output stream that writes to this sink. */
54  OutputStream outputStream();
55 }

就是定義了一些寫方便的方法,其中emit()與flush()方法剛接觸同學可能有些生疏,去看下源碼中注釋就明白了,其余都比較簡單了,不熟悉可以看下注釋,老外寫代碼挺注重注釋的~

BufferedSource類源碼這里只看一部分了,與BufferedSink對應:

1 public interface BufferedSource extends Source {
 2
 3  /** Returns this source's internal buffer. */
 4  Buffer buffer();
 5
 6  /**
 7   * Returns when the buffer contains at least {@code byteCount} bytes. Throws an
 8   * {@link java.io.EOFException} if the source is exhausted before the required bytes can be read.
 9   */
10  void require(long byteCount) throws IOException;
11
12  /**
13   * Returns true when the buffer contains at least {@code byteCount} bytes, expanding it as
14   * necessary. Returns false if the source is exhausted before the requested bytes can be read.
15   */
16  boolean request(long byteCount) throws IOException;
17
18  /** Removes a byte from this source and returns it. */
19  byte readByte() throws IOException;
20
21  short readShort() throws IOException;
22
23  short readShortLe() throws IOException;
24
25  long readLong() throws IOException;
26
27  /** Removes all bytes bytes from this and returns them as a byte string. */
28  ByteString readByteString() throws IOException;
29
30  /** Removes {@code byteCount} bytes from this and returns them as a byte array. */
31  byte[] readByteArray(long byteCount) throws IOException;
32
33  int read(byte[] sink) throws IOException;
34
35  void readFully(byte[] sink) throws IOException;
36
37  int read(byte[] sink, int offset, int byteCount) throws IOException;
38
39  long readAll(Sink sink) throws IOException;
40
41  String readUtf8() throws IOException;
42
43  String readUtf8Line() throws IOException;
44
45  /** Returns an input stream that reads from this source. */
46  InputStream inputStream();
47 }

這里只是列出了部分定義的方法,大體看一下就可以了,就是各種讀的方法。

四、 RealBufferedSink 和 RealBufferedSource

上面提到的都是接口類,具體的實現類分別是RealBufferedSink和 RealBufferedSource ,其實這兩個類也不算具體實現類,只是Buffer類的代理類,具體功能都在Buffer類里面實現的。

RealBufferedSink類部分源碼:

1 final class RealBufferedSink implements BufferedSink {
 2  public final Buffer buffer = new Buffer();
 3  public final Sink sink;
 4  boolean closed;
 5
 6  RealBufferedSink(Sink sink) {
 7    if (sink == null) throw new NullPointerException("sink == null");
 8    this.sink = sink;
 9  }
10
11  @Override public Buffer buffer() {
12    return buffer;
13  }
14
15  @Override public void write(Buffer source, long byteCount)
16      throws IOException {
17    if (closed) throw new IllegalStateException("closed");
18    //調用buffer的write方法
19    buffer.write(source, byteCount);
20    emitCompleteSegments();
21  }
22
23  @Override public BufferedSink write(ByteString byteString) throws IOException {
24    if (closed) throw new IllegalStateException("closed");
25     //調用buffer的write方法
26    buffer.write(byteString);
27    return emitCompleteSegments();
28  }
29
30  @Override public BufferedSink writeUtf8(String string) throws IOException {
31    if (closed) throw new IllegalStateException("closed");
32    //調用buffer的writeUtf8方法
33    buffer.writeUtf8(string);
34    return emitCompleteSegments();
35  }
36  。。。。。。。。
37
38   @Override public BufferedSink emitCompleteSegments() throws IOException {
39    if (closed) throw new IllegalStateException("closed");
40    long byteCount = buffer.completeSegmentByteCount();
41    //將緩存中的數據寫出到流中
42    if (byteCount > 0) sink.write(buffer, byteCount);
43    return this;
44  }
45
46  @Override public BufferedSink emit() throws IOException {
47    if (closed) throw new IllegalStateException("closed");
48    long byteCount = buffer.size();
49    //將緩存中的數據寫出到流中
50    if (byteCount > 0) sink.write(buffer, byteCount);
51    return this;
52  }
53
54  @Override public void flush() throws IOException {
55    if (closed) throw new IllegalStateException("closed");
56    if (buffer.size > 0) {
57     //先寫出數據
58      sink.write(buffer, buffer.size);
59    }
60    sink.flush();
61  }
62
63  @Override public void close() throws IOException {
64    if (closed) return;
65
66    // Emit buffered data to the underlying sink. If this fails, we still need
67    // to close the sink; otherwise we risk leaking resources.
68    Throwable thrown = null;
69    try {
70      if (buffer.size > 0) {
71          //先寫出數據
72        sink.write(buffer, buffer.size);
73      }
74    } catch (Throwable e) {
75      thrown = e;
76    }
77
78    try {
79      //關閉流
80      sink.close();
81    } catch (Throwable e) {
82      if (thrown == null) thrown = e;
83    }
84    closed = true;
85
86    if (thrown != null) Util.sneakyRethrow(thrown);
87  }
88
89  @Override public Timeout timeout() {
90    return sink.timeout();
91  }
92
93  @Override public String toString() {
94    return "buffer(" + sink + ")";
95  }
96 }

看到了吧,RealBufferedSink實現BufferedSink接口,內部維護了一個Buffer,操作基本都是由Buffer來完成的,寫數據首先將數據寫到Buffer中,然后調用emitCompleteSegments方法將數據寫到流中。

RealBufferedSource 類部分源碼

1 final class RealBufferedSource implements BufferedSource {
 2  public final Buffer buffer = new Buffer();//緩存的buffer
 3  public final Source source;//數據源,其實就是InputStream
 4  boolean closed;
 5
 6  RealBufferedSource(Source source) {
 7    if (source == null) throw new NullPointerException("source == null");
 8    this.source = source;
 9  }
10
11  @Override public Buffer buffer() {
12    return buffer;
13  }
14  //讀數據到輸出流sink中
15  @Override public long read(Buffer sink, long byteCount) throws IOException {
16    if (sink == null) throw new IllegalArgumentException("sink == null");
17    if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
18    if (closed) throw new IllegalStateException("closed");
19    //檢查緩存中是否有數據
20    if (buffer.size == 0) {
21     //緩存中沒有數據則先讀取 Segment.SIZE數量數據到buffer緩存中
22      long read = source.read(buffer, Segment.SIZE);
23      if (read == -1) return -1;
24    }
25    //buffer可能沒有byteCount數量數據,這里檢查一下
26    long toRead = Math.min(byteCount, buffer.size);
27    return buffer.read(sink, toRead);
28  }
29    //申請byteCount數量數據到緩存中
30    @Override public void require(long byteCount) throws IOException {
31    if (!request(byteCount)) throw new EOFException();
32  }
33
34  @Override public boolean request(long byteCount) throws IOException {
35    if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
36    if (closed) throw new IllegalStateException("closed");
37    //申請數據到緩存中:緩存中的數據
38    while (buffer.size < byteCount) {//緩存中沒有足夠數據,則從數據源source讀取數據到buffer中
39      if (source.read(buffer, Segment.SIZE) == -1) return false;
40    }
41    return true;
42  }
43
44  @Override public byte readByte() throws IOException {
45    require(1);//先申請數據到緩存中,然后在讀出來
46    return buffer.readByte();
47  }
48
49  @Override public ByteString readByteString() throws IOException {
50    buffer.writeAll(source);//將數據源source中數據全部讀取到buffer中
51    return buffer.readByteString();//buffer中讀取全部數據
52  }
53
54  。。。。
55
56  @Override public void close() throws IOException {
57    if (closed) return;
58    closed = true;
59    source.close();
60    buffer.clear();
61  }
62
63  @Override public Timeout timeout() {
64    return source.timeout();
65  }
66
67  @Override public String toString() {
68    return "buffer(" + source + ")";
69  }
70 }

RealBufferedSource 實現BufferedSource 接口,內部同樣維護了一個Buffer,讀數據大體流程都是先將數據從數據源source讀取到緩存buffer中,然后再從buffer讀取就完了。

看完上述大體流程應該明白Buffer緩存類是okio中的核心了,其實個人看完okio源碼覺得其余方面都不用太關心,okio嫌棄的就是原生io的緩存機制有點“low”,所以這部分才是重點,至於其余TimeOut超時機制都是小功能了,下面我們一起看看okio的緩存機制。

OKIO中的緩存機制

先來簡單看一下Buffer類:

1 public final class Buffer implements BufferedSource, BufferedSink, Cloneable {
 2  。。。。
 3  Segment head;
 4  long size;
 5
 6  public Buffer() {
 7  }
 8
 9  @Override public Buffer buffer() {
10    return this;
11  }
12
13。。。。。
14
15  /** Write {@code byteCount} bytes from this to {@code out}. */
16  public Buffer writeTo(OutputStream out, long byteCount) throws IOException {
17    if (out == null) throw new IllegalArgumentException("out == null");
18    checkOffsetAndCount(size, 0, byteCount);
19
20    Segment s = head;
21    while (byteCount > 0) {
22      int toCopy = (int) Math.min(byteCount, s.limit - s.pos);
23      out.write(s.data, s.pos, toCopy);
24
25      s.pos += toCopy;
26      size -= toCopy;
27      byteCount -= toCopy;
28
29      if (s.pos == s.limit) {
30        Segment toRecycle = s;
31        head = s = toRecycle.pop();
32        SegmentPool.recycle(toRecycle);
33      }
34    }
35    return this;
36  }
37 }

這里我只是列出了一部分Buffer類源碼可以看到類中用到了Segment與SegmentPool,在開篇中已經說過okio的緩存結構,這里再看一下:


Buffer類內部維護了一個Segment構成的雙向循環鏈表,okio將緩存切成一個個很小的片段,每個片段就是Segment,我們寫數據或者讀數據都是操作的Segment中維護的一個個數組,而SegmentPool維護被回收的Segment,這樣創建Segment的時候從SegmentPool取就可以了,有緩存直接用緩存的,沒有再新創建Segment。

五、 Segment解析

我們先看一下Segment類源碼

Segment中的變量:

 1  //每一個Segment所包含最大數據量
 2  static final int SIZE = 8192;
 3
 4  //分享數據的時候會用到,后續會介紹
 5  static final int SHARE_MINIMUM = 1024;
 6 //盛放數據的數組
 7  final byte[] data;
 8
 9  //data數組中第一個可讀的位置
10  int pos;
11
12  //data中第一個可寫的位置
13  int limit;
14
15  //分享數據相關,如果有別的Segment使用同樣的data[]則為true
16  //如果我們將自己的數據分享給了別的Segment則置為true
17  boolean shared;
18
19  //當前Segment擁有data[]並且能寫入數據則為true
20  boolean owner;
21
22  //前一個Segment
23  Segment next;
24
25  //后一個Segment
26  Segment prev;

都比較簡單,分享數據相關的字段先放一下,后面會詳細說明,這里要明白pos與limit含義,Segment中data[]數據整體說明如下:


所以Segment中數據量計算方式為:limit-pos

Segment中構造函數

 1 //此方式創建Segment,data數據是自己創建的,不是分享而來的,所以owner為true,shared為false
 2  Segment() {
 3    this.data = new byte[SIZE];
 4    this.owner = true;
 5    this.shared = false;
 6  }
 7   //此方式創建Segment,data數據是別的Segment分享而來的,所以owner為false,shared為true
 8  Segment(Segment shareFrom) {
 9    this(shareFrom.data, shareFrom.pos, shareFrom.limit);
10    shareFrom.shared = true;//別的Segment分享了自己的數據,同樣標記shared為true
11  }
12   //此方式創建Segment,data數據是外部傳入的,所以owner為false,shared為true
13  Segment(byte[] data, int pos, int limit) {
14    this.data = data;
15    this.pos = pos;
16    this.limit = limit;
17    this.owner = false;
18    this.shared = true;
19  }

通過以上構造函數我們應該明白如果一個Segment的數據分享給了別的Segment或者自己的數據是別的Segment分享而來的,那么shared都會標記為true,表示分享了數據或者數據被分享而來的。

這個分享是什么鳥意思呢?到這里記住有這個玩意就可以了,后面會用的。

接下來看一下Segment中的方法

pop方法:

1  public Segment pop() {
2    Segment result = next != this ? next : null;
3    prev.next = next;
4    next.prev = prev;
5    next = null;
6    prev = null;
7    return result;
8  }

pop方法很簡單就是將當前segment結點從循環鏈表中彈出並返回下一個segment,如果你對鏈表增刪不熟悉自己一定畫一下,源碼中很多這種操作,都是很簡單的,這里不過多解釋。

push方法

1  public Segment push(Segment segment) {
2    segment.prev = this;
3    segment.next = next;
4    next.prev = segment;
5    next = segment;
6    return segment;
7  }

push方法就是將segment加入到當前segment的后面,並返回加入的segment,鏈表的操作如果看不出來就自己畫一下。

split分割方法

 1  //將一個Segment分割成兩個Segment,byteCount為分割出去多少數據
 2  public Segment split(int byteCount) {
 3    if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
 4    Segment prefix;
 5    //一下這段注釋很重要,可以說是時間空間的平衡
 6    // We have two competing performance goals://我們有兩個目標
 7    //  - Avoid copying data. We accomplish this by sharing segments.//為了避免拷貝數據,我們采用分享數據的方法
 8    //  - Avoid short shared segments. These are bad for performance because they are readonly and
 9    //    may lead to long chains of short segments.//分享后數據只能只讀,不能再寫入數據,
10    // To balance these goals we only share segments when the copy will be large.
11    //綜上,okio分割數據時為了性能的考慮,這里只有在涉及大量數據拷貝的時候才會采用分享數據的策略,而不是拷貝數據,分享數據就是創建一個新的Segment,然后將當前Segment數據分享byteCount數量給新的Segmen
12    if (byteCount >= SHARE_MINIMUM) {//分割數據量大於SHARE_MINIMUM約定的數量okio則認為是大數據量
13      //采用分享數據的方式,創建新的Segment,而不是拷貝數據消耗CPU,空間換時間
14      prefix = new Segment(this);
15    } else {
16     //數據量小則直接拷貝數據就可以了,消耗不了多少CPU性能
17      prefix = SegmentPool.take();
18      System.arraycopy(data, pos, prefix.data, 0, byteCount);
19    }
20    //新Segment的寫位置limit為pos加上byteCount
21    prefix.limit = prefix.pos + byteCount;
22    //當前Segment中data的讀位置往后移byteCount
23    pos += byteCount;
24    prev.push(prefix);
25    return prefix;
26  }

上面已經給了詳細注釋,這里用圖畫一下分割完Segment變化:


這里要明白一點,分割之后兩個Segment都引用的同一個data[],只是數據的讀寫位置索引發生了改變,正是兩個Segment都引用了同一個data[],所以data一旦被分享則不允許再寫入數據,data被分享也就是多個Segment引用了同一個data,如果還允許寫那肯定就亂了,就不能很好的控制data中的數據了,所以只能讀。

那這個split分割操作有什么用呢?okio中Buffer類的write(Buffer source, long byteCount)方法有一段注釋如下:

1    // Splitting segments
2    //
3    // Occasionally we write only part of a source buffer to a sink buffer. For
4    // example, given a sink [51%, 91%], we may want to write the first 30% of
5    // a source [92%, 82%] to it. To simplify, we first transform the source to
6    // an equivalent buffer [30%, 62%, 82%] and then move the head segment,
7    // yielding sink [51%, 91%, 30%] and source [62%, 82%].

解釋一下:有時候我們需要將source buffer緩沖區數據部分寫入sink buffer緩沖區,比如,sink buffer緩沖區數據狀態為 [51%, 91%],source buffer緩沖區數據狀態為[92%, 82%] ,我們只想寫30%的數據到sink buffer緩沖區,這時我們首先將source buffer中的92%容量的Segment分割為30%與62%,然后將30%的Segment一次寫出去就可以了,這樣是不是就高效多了,我們不用一點點的寫出去,先分割然后一次性寫出去顯然效率高很多。

writeTo方法: 將Segment數據寫入到另一個Segment中去

1  /** Moves {@code byteCount} bytes from this segment to {@code sink}. */
 2  public void writeTo(Segment sink, int byteCount) {
 3    //首先判斷是否能寫入數據,分享的Segment則不能寫入數據,只能讀數據
 4    if (!sink.owner) throw new IllegalArgumentException();
 5    //首先判斷剩余空間能否容納byteCount數量的數據
 6    if (sink.limit + byteCount > SIZE) {//不能容納byteCount數量的數據,考慮向前移動數據為了容納byteCount數量數據
 7      // We can't fit byteCount bytes at the sink's current position. Shift sink first.
 8      //daata[]數據被分享了不能再移動其數據塊
 9      if (sink.shared) throw new IllegalArgumentException();
10      if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
11      //不拷貝數據,只是將原數據整體往前移動到開頭位置
12      System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
13      sink.limit -= sink.pos;//寫數據位置向前移動
14      sink.pos = 0;//重置讀數據位置為0
15    }
16    //到這里說明剩余空間放得下byteCount數量數據
17    System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
18    //拷貝完同樣移動讀寫數據位置
19    sink.limit += byteCount;
20    pos += byteCount;
21  }

writeTo方法大體邏輯:當我們將byteCount數據寫入一個Segment中的時候會檢查剩余可寫的數據塊能否盛放下byteCount數據,如果不能則考慮將已經存在的數據整體向前移動,如果還不能則拋出異常,如果可以則移動數據后將byteCount數據放入Segment中,再用圖來表示一下移動數據塊流程:


compact()方法

1  /**
 2   * 如果當前鏈表尾部Segment與其前一個Segment(也就是鏈表中頭部Segment)的數據均為占滿其整體容量的50%,則考慮壓縮這兩個Segment為一個Segment,這樣就可以節省空間了
 3   * Call this when the tail and its predecessor may both be less than half
 4   * full. This will copy data so that segments can be recycled.
 5   */
 6  public void compact() {
 7    if (prev == this) throw new IllegalStateException();
 8    //前面的Segment是不可寫的則不能壓縮數據:data[]不是自己擁有
 9    if (!prev.owner) return; // Cannot compact: prev isn't writable.
10    //當前Segment的數據量byteCount
11    int byteCount = limit - pos;
12    //前一個Segment的可用空間:如果data[]沒有被分享則從0開始到pos的空間也算在可用空間內,writeTo方法內部會移動數據塊到data[]開始的位置
13    int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
14    //要寫入的數據量byteCount大於可用空間則直接返回,表示盛放不下
15    if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
16    //當前Segment數據寫入前一個Segment中
17    writeTo(prev, byteCount);
18    //鏈表中斷開當前Segment
19    pop();
20    //回收當前Segment進緩存池,循環利用
21    SegmentPool.recycle(this);
22  }

compact()主要作用就是合並兩個Segment,節省內存,這里也是優化的作用,可見okio對很多方面做了優化。

六、 SegmentPool解析

接下來我們看下SegmentPool,也就是Segment的緩存池,SegmentPool內部維持一條單鏈表保存被回收的Segment,緩存池的大小限制為64KB,每個Segment大小最大為8KB,所以SegmentPool最多存儲8個Segment。

SegmentPool存儲結構為單向鏈表,結構如圖:


SegmentPool源碼解析:

 1 final class SegmentPool {
 2  //緩存池的大小限制為64KB
 3  static final long MAX_SIZE = 64 * 1024; // 64 KiB.
 4  //鏈表頭部指針
 5  static Segment next;
 6
 7  //已經存儲的緩存大小
 8  static long byteCount;
 9
10  private SegmentPool() {
11  }
12
13 //從SegmentPool獲取一個Segment
14  static Segment take() {
15    synchronized (SegmentPool.class) {
16      if (next != null) {//緩存池中有緩存的Segment
17        //一下就是單向鏈表刪除結點的邏輯,比較簡單
18        Segment result = next;
19        next = result.next;
20        result.next = null;
21        byteCount -= Segment.SIZE;
22        return result;
23      }
24    }
25    //如果緩存中沒有Segment則新建一個Segment返回
26    return new Segment(); 
27  }
28
29  //回收Segment進入SegmentPool
30  static void recycle(Segment segment) {
31    if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
32    //分享的Segment不能被回收
33    if (segment.shared) return; // This segment cannot be recycled.
34    synchronized (SegmentPool.class) {
35      //容量判斷
36      if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
37      byteCount += Segment.SIZE;
38      //將Segment加入單向鏈表中
39      segment.next = next;
40      segment.pos = segment.limit = 0;
41      next = segment;
42    }
43  }
44 }

SegmentPool 很簡單,內部維護一個回收的單向Segment的鏈表,方便復用,節省GC操作。

以上介紹了Segment與SegmentPool類,現在我們可以回頭看Buffer類了。

七、 Buffer類核心解析

Buffer類是讀寫操作的具體實現,實現了BufferedSource, BufferedSink接口。

Segment的split,compact操作是Buffer類中的write(Buffer source, long byteCount)方法,該方法把傳入的source Buffer的前byteCount個字節寫到調用該方法的Buffer中去,接下來我們仔細分析一下write(Buffer source, long byteCount)方法:

1public final class Buffer implements BufferedSource, BufferedSink, Cloneable {
  2
  3  Segment head;//Buffer類中雙向循環鏈表的頭結點
  4  long size;//Buffer中存儲的數據大小
  5
  6  public Buffer() {
  7  }
  8
  9  。。。。。。
 10  //將傳入的source Buffer中byteCount數量數據寫入調用此方法的Buffer中
 11  @Override public void write(Buffer source, long byteCount) {
 12    //以下英文注釋基本描述了該類的核心
 13    // Move bytes from the head of the source buffer to the tail of this buffer
 14    // while balancing two conflicting goals: don't waste CPU and don't waste
 15    // memory.
 16    //
 17    //
 18    // Don't waste CPU (ie. don't copy data around).
 19    //
 20    // Copying large amounts of data is expensive. Instead, we prefer to
 21    // reassign entire segments from one buffer to the other.
 22    //
 23    //
 24    // Don't waste memory.
 25    //
 26    // As an invariant, adjacent pairs of segments in a buffer should be at
 27    // least 50% full, except for the head segment and the tail segment.
 28    //
 29    // The head segment cannot maintain the invariant because the application is
 30    // consuming bytes from this segment, decreasing its level.
 31    //
 32    // The tail segment cannot maintain the invariant because the application is
 33    // producing bytes, which may require new nearly-empty tail segments to be
 34    // appended.
 35    //
 36    //
 37    // Moving segments between buffers
 38    //
 39    // When writing one buffer to another, we prefer to reassign entire segments
 40    // over copying bytes into their most compact form. Suppose we have a buffer
 41    // with these segment levels [91%, 61%]. If we append a buffer with a
 42    // single [72%] segment, that yields [91%, 61%, 72%]. No bytes are copied.
 43    //
 44    // Or suppose we have a buffer with these segment levels: [100%, 2%], and we
 45    // want to append it to a buffer with these segment levels [99%, 3%]. This
 46    // operation will yield the following segments: [100%, 2%, 99%, 3%]. That
 47    // is, we do not spend time copying bytes around to achieve more efficient
 48    // memory use like [100%, 100%, 4%].
 49    //
 50    // When combining buffers, we will compact adjacent buffers when their
 51    // combined level doesn't exceed 100%. For example, when we start with
 52    // [100%, 40%] and append [30%, 80%], the result is [100%, 70%, 80%].
 53    //
 54    //
 55    // Splitting segments
 56    //
 57    // Occasionally we write only part of a source buffer to a sink buffer. For
 58    // example, given a sink [51%, 91%], we may want to write the first 30% of
 59    // a source [92%, 82%] to it. To simplify, we first transform the source to
 60    // an equivalent buffer [30%, 62%, 82%] and then move the head segment,
 61    // yielding sink [51%, 91%, 30%] and source [62%, 82%].
 62
 63    if (source == null) throw new IllegalArgumentException("source == null");
 64    if (source == this) throw new IllegalArgumentException("source == this");
 65    checkOffsetAndCount(source.size, 0, byteCount);
 66
 67    while (byteCount > 0) {
 68      // Is a prefix of the source's head segment all that we need to move?
 69      //要寫的數據量byteCount 小於source 頭結點的數據量,也就是鏈表第一個Segment包含的數據量大於byteCount 
 70      if (byteCount < (source.head.limit - source.head.pos)) {
 71        //獲取鏈表尾部的結點Segment
 72        Segment tail = head != null ? head.prev : null;
 73        //尾部結點Segment可寫並且能夠盛放byteCount 數據
 74        if (tail != null && tail.owner
 75            && (byteCount + tail.limit - (tail.shared ? 0 : tail.pos) <= Segment.SIZE)) {
 76          // Our existing segments are sufficient. Move bytes from source's head to our tail.
 77          //直接寫入尾部結點Segment即可,Segment的writeTo方法上面已經分析
 78          source.head.writeTo(tail, (int) byteCount);
 79          //改變緩存Buffer中數據量
 80          source.size -= byteCount;
 81          size += byteCount;
 82          return;
 83        } else {
 84          // We're going to need another segment. Split the source's head
 85          // segment in two, then move the first of those two to this buffer.
 86          //尾部Segment不能盛放下byteCount數量數據,那就將source中頭結點Segment進行分割,split方法上面已經分析過
 87          source.head = source.head.split((int) byteCount);
 88        }
 89      }
 90
 91      // Remove the source's head segment and append it to our tail.
 92      //獲取source中的頭結點
 93      Segment segmentToMove = source.head;
 94      long movedByteCount = segmentToMove.limit - segmentToMove.pos;
 95      //將頭結點segmentToMove從原鏈表中彈出
 96      source.head = segmentToMove.pop();
 97      //檢查要加入的鏈表頭結點head是否為null
 98      if (head == null) {//head為null情況下插入鏈表
 99        head = segmentToMove;
100        head.next = head.prev = head;
101      } else {//head不為null
102        Segment tail = head.prev;
103        //將segmentToMove插入新的鏈表中
104        tail = tail.push(segmentToMove);
105        //掉用compact嘗試壓縮
106        tail.compact();
107      }
108      source.size -= movedByteCount;
109      size += movedByteCount;
110      byteCount -= movedByteCount;
111    }
112  }
113 }

write(Buffer source, long byteCount)描述了將一個Buffer數據寫入另一個Buffer中的核心邏輯,Buffer之間數據的轉移就是將一個Buffer從頭部數據開始寫入另一個Buffer的尾部,但是上述有個特別精巧的構思:如果目標Segment能夠容納下要寫入的數據則直接采用數組拷貝的方式,如果容納不下則先split拆分source頭結點Segment,然后整段移動到目標Buffer鏈表尾部,注意這里是移動也就是操作指針而不是數組拷貝,這樣就非常高效了,而不是一味地數組拷貝方式轉移數據,okio將數據分割成一小段一小段並且用鏈表連接起來也是為了這樣的操作來轉移數據,對數據的操作更加靈活高效。

我們再來看Buffer類中的read方法,相比於write方法,read方法就比較簡單了,平時使用中讀取字符串操作算是比較比較多的了,我們看下Buffer中readString方法:

1  @Override public String readString(long byteCount, Charset charset) throws EOFException {
 2    checkOffsetAndCount(size, 0, byteCount);
 3    if (charset == null) throw new IllegalArgumentException("charset == null");
 4    if (byteCount > Integer.MAX_VALUE) {
 5      throw new IllegalArgumentException("byteCount > Integer.MAX_VALUE: " + byteCount);
 6    }
 7    if (byteCount == 0) return "";
 8
 9    Segment s = head;
10    //如果緩存中head結點Segment存儲的數據小於byteCount ,則轉移調用readByteArray方法讀取
11    if (s.pos + byteCount > s.limit) {
12      // If the string spans multiple segments, delegate to readBytes().
13      return new String(readByteArray(byteCount), charset);
14    }
15   //緩存中head結點Segment存儲的數據大於等於byteCount,也就是能從head結點Segment讀取全部數據,直接讀取就可以了
16    String result = new String(s.data, s.pos, (int) byteCount, charset);
17    s.pos += byteCount;
18    size -= byteCount;
19    //讀取完當前Segment沒有數據了,那么就可以回收了
20    if (s.pos == s.limit) {
21      head = s.pop();
22      SegmentPool.recycle(s);
23    }
24
25    return result;
26  }

是不是很簡單,至於涉及的readByteArray自己看一下就可以了。

以上介紹了okio的緩存結構,其實最核心的就是Buffer類以及Segment類的操作,希望你能真正理解,在我們平時使用中接觸最多的是okio類,也就是對外暴露的api都定義在這個類中,我們簡要看一下。

八、 okio類解析

最開始我們介紹了一段寫操作的代碼:

1//向File中寫入數據
 2public static void writeTest(File file) {
 3    try {
 4        Okio.buffer(Okio.sink(file))
 5            .writeUtf8("write string by utf-8.\n")
 6            .writeInt(1234).close();
 7    } catch (Exception e) {
 8        e.printStackTrace();
 9    }
10}

拆分一下上述代碼:

 1//向File中寫入數據
 2public static void writeTest(File file) {
 3    try {
 4        Sink sink = Okio.sink(file);
 5        BufferedSink bufferedSink = Okio.buffer(sink );
 6        bufferedSink .writeUtf8("write string by utf-8.\n");
 7        bufferedSink .writeInt(1234);
 8        bufferedSink.close();
 9    } catch (Exception e) {
10        e.printStackTrace();
11    }
12}

Sink sink = Okio.sink(file)做了什么?

1  /** Returns a sink that writes to {@code file}. */
 2  public static Sink sink(File file) throws FileNotFoundException {
 3    if (file == null) throw new IllegalArgumentException("file == null");
 4    return sink(new FileOutputStream(file));
 5  }
 6
 7 /** Returns a sink that writes to {@code out}. */
 8  public static Sink sink(OutputStream out) {
 9    return sink(out, new Timeout());
10  }
11
12  private static Sink sink(final OutputStream out, final Timeout timeout) {
13    if (out == null) throw new IllegalArgumentException("out == null");
14    if (timeout == null) throw new IllegalArgumentException("timeout == null");
15
16    return new Sink() {
17      @Override public void write(Buffer source, long byteCount) throws IOException {
18        checkOffsetAndCount(source.size, 0, byteCount);
19        while (byteCount > 0) {
20          timeout.throwIfReached();
21          Segment head = source.head;
22          int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
23          out.write(head.data, head.pos, toCopy);
24
25          head.pos += toCopy;
26          byteCount -= toCopy;
27          source.size -= toCopy;
28
29          if (head.pos == head.limit) {
30            source.head = head.pop();
31            SegmentPool.recycle(head);
32          }
33        }
34      }
35
36      @Override public void flush() throws IOException {
37        out.flush();
38      }
39
40      @Override public void close() throws IOException {
41        out.close();
42      }
43
44      @Override public Timeout timeout() {
45        return timeout;
46      }
47
48      @Override public String toString() {
49        return "sink(" + out + ")";
50      }
51    };
52  }

最終就是調用sink(final OutputStream out, final Timeout timeout) 方法new了一個Sink對象,並且write,close等方法也都是調用OutputStream的相應方法,所以okio底層還是用的原生OutputStream輸出流,只是再次基礎上封裝了自己的緩存邏輯。

Okio.buffer(sink)做了什么?

1  /**
2   * Returns a new sink that buffers writes to {@code sink}. The returned sink
3   * will batch writes to {@code sink}. Use this wherever you write to a sink to
4   * get an ergonomic and efficient access to data.
5   */
6  public static BufferedSink buffer(Sink sink) {
7    return new RealBufferedSink(sink);
8  }

這就更簡單了,返回一個RealBufferedSink對象而已。

好了okio類就介紹到這里,至於source自己看一下就可以了,okio只是封裝了一些方便外部調用的api而已。

九、 GZIP壓縮解壓的實現

okio自帶GZIP壓縮以及解壓功能,具體實現由GzipSource與GzipSink完成:

GzipSink 實現Sink接口,是帶有壓縮功能的Sink,會將要寫出的數據壓縮之后再寫出,內部有CRC32對象負責將原生sink的數據進行Gzip壓縮,然后由DeflaterSink對象負責將壓縮后的數據寫出。

GzipSource 實現了Source接口,是帶有解壓功能的Source,由InflaterSource讀取壓縮的數據,然后CRC32解壓數據,得到原始的數據。

GZip壓縮在網絡通信中經常用來壓縮傳輸的數據以節省流量,okhttp的例子中對數據的壓縮就使用了okio中的GzipSink來實現數據的壓縮,官方demo中有一個類 RequestBodyCompression,向我們展示了如何實現 RequestBody 的 Gzip 壓縮:

1 public final class RequestBodyCompression {
 2
 3  /**
 4   * The Google API KEY for OkHttp recipes. If you're using Google APIs for anything other than
 5   * running these examples, please request your own client ID!
 6   * https://console.developers.google.com/project
 7   */
 8
 9  public static final String GOOGLE_API_KEY = "AIzaSyAx2WZYe0My0i-uGurpvraYJxO7XNbwiGs";
10  public static final MediaType MEDIA_TYPE_JSON = MediaType.get("application/json");
11
12  private final OkHttpClient client = new OkHttpClient.Builder()
13      .addInterceptor(new GzipRequestInterceptor())
14      .build();
15
16  private final Moshi moshi = new Moshi.Builder().build();
17
18  private final JsonAdapter<Map<String, String>> mapJsonAdapter = moshi.adapter(
19      Types.newParameterizedType(Map.class, String.class, String.class));
20
21  public void run() throws Exception {
22
23    Map<String, String> requestBody = new LinkedHashMap<>();
24
25    requestBody.put("longUrl", "https://publicobject.com/2014/12/04/html-formatting-javadocs/");
26
27    RequestBody jsonRequestBody = RequestBody.create(
28
29   MEDIA_TYPE_JSON, mapJsonAdapter.toJson(requestBody));
30
31    Request request = new Request.Builder()
32        .url("https://www.googleapis.com/urlshortener/v1/url?key=" + GOOGLE_API_KEY)
33        .post(jsonRequestBody)
34        .build();
35
36    try (Response response = client.newCall(request).execute()) {
37
38      if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
39      System.out.println(response.body().string());
40    }
41  }
42
43  public static void main(String... args) throws Exception {
44
45    new RequestBodyCompression().run();
46  }
47
48  /** This interceptor compresses the HTTP request body. Many webservers can't handle this! */
49
50  static class GzipRequestInterceptor implements Interceptor {
51
52    @Override public Response intercept(Chain chain) throws IOException {
53
54      Request originalRequest = chain.request();
55
56      if (originalRequest.body() == null || originalRequest.header("Content-Encoding") != null) {
57
58        return chain.proceed(originalRequest);
59      }
60
61      Request compressedRequest = originalRequest.newBuilder()
62          .header("Content-Encoding", "gzip")
63          .method(originalRequest.method(), gzip(originalRequest.body()))
64          .build();
65      return chain.proceed(compressedRequest);
66    }
67
68    private RequestBody gzip(final RequestBody body) {
69
70      return new RequestBody() {
71
72        @Override public MediaType contentType() {
73
74          return body.contentType();
75        }
76
77        @Override public long contentLength() {
78
79          return -1; // We don't know the compressed length in advance!
80        }
81        @Override public void writeTo(BufferedSink sink) throws IOException {
82         //GZIP壓縮實現
83          BufferedSink gzipSink = Okio.buffer(new GzipSink(sink));
84          body.writeTo(gzipSink);
85          gzipSink.close();
86        }
87      };
88    }
89  }
90 }

十、總結

以上介紹了okio中最核心的部分,okio中還有其余功能沒有介紹比如:超時機制,ByteString,實現生產者消費者功能的Pipe類,HashSink與HashSource等等,其實這些自己去看看就可以了,這些功能也都只是邊角的擴展而已,okio最核心的就是其緩存功能,希望你靜下心來好好研究一下,這里我不想說什么原生io多不堪,okio多么優秀等等,okio確實優秀但也只是對原生io的擴展:舍棄原生io的緩存機制,自己另起爐灶擼起袖子自己實現,同時給我們很多啟發在我們改造優化項目的時候不是把幾個方法合並一起就叫做優化了,更高級的做法就像okio一樣充分理解原生功能明白其缺點在其之上進行改造,這樣才最有意義。

很多同學初次接觸okio估計都是從okhttp開始知道的,作為okhttp底層的io庫,其高效的緩存也為上層okhttp的高效提供了很好的基礎,在我們誇贊okhttp的同時也要知道底層默默付出的okio啊,另外okio完全可以單獨使用,同時也建議項目中io操作使用okio。

好了,本篇到此為止,希望對你有用。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM