前言
接下來我們要分析的是攔截器鏈上的最后一個攔截器CallServerInterceptor
,它用於寫入請求與獲取響應,這里不需要再調用攔截器責任鏈的proceed
方法,CallServerInterceptor
在intercept
方法中將自己的工作做完后,就直接將響應返回給上一攔截器。
CallServerInterceptor::intercept
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.exchange!!
val request = realChain.request
val requestBody = request.body
val sentRequestMillis = System.currentTimeMillis()
// 寫入請求頭
exchange.writeRequestHeaders(request)
var invokeStartEvent = true
var responseBuilder: Response.Builder? = null
// 如果請求方法允許攜帶請求體(非GET請求,非HEAD請求),並且請求體不為空
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
// 當客戶端需要發送一個體積可能很大的消息體時,就會將Expect: 100-continue寫入請求頭,然后發送
// 不帶請求體的請求給服務端,然后服務端開始檢查請求消息頭,如果服務端滿足條件就會返回一個狀態碼
// 100的回復告知客戶端繼續發送消息體,否則返回一個狀態碼417(Expectation Failed)的回復來告知
// 客戶端不滿足期待條件,別發送請求體。
// 若客戶端包含Expect: 100-continue請求頭,則進入下面的If語句。
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
exchange.flushRequest()
// 構建Response.Builder,當response狀態碼為100時,返回null
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
exchange.responseHeadersStart()
invokeStartEvent = false
}
// 客戶端不包含"Expect: 100-continue",或者,客戶端包含"Expect: 100-continue"並且服務端
// 滿足期待條件
if (responseBuilder == null) {
// 請求體是否為雙工的,默認情況下返回false
if (requestBody.isDuplex()) {
// Prepare a duplex body so that the application can send a request body later.
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
requestBody.writeTo(bufferedRequestBody)
} else {
// Write the request body if the "Expect: 100-continue" expectation was met.
// 如果滿足Expect: 100-continue,並且請求體不是雙工的,則寫入請求體
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
} else {
// 服務器未達到"Except: 100-continue"期望,不發送請求體
exchange.noRequestBody()
// 若連接不是多路復用的
if (!exchange.connection.isMultiplexed) {
exchange.noNewExchangesOnConnection()
}
}
} else {
// 不發送請求體
exchange.noRequestBody()
}
// 默認下,requestBody.isDuplex()返回false,進入If語句
if (requestBody == null || !requestBody.isDuplex()) {
// 結束請求處理
exchange.finishRequest()
}
if (responseBuilder == null) {
// 讀取響應頭
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
invokeStartEvent = false
}
}
// 構建響應,包含:原始請求、握手信息、發送時間、接收時間
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
var code = response.code
if (code == 100) {
// Server sent a 100-continue even though we did not request one. Try again to read the actual
// response status.
// 即使我們沒有請求,服務端也返回了狀態碼為100的響應,再次嘗試去讀實際的響應狀態
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
}
response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
code = response.code
}
exchange.responseHeadersEnd(response)
// forWebSocket默認為false,code為101表示切換協議
response = if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response.newBuilder()
.body(EMPTY_RESPONSE)
.build()
} else {
// 讀取響應體
response.newBuilder()
.body(exchange.openResponseBody(response))
.build()
}
// 是否要關閉TCP連接
if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
"close".equals(response.header("Connection"), ignoreCase = true)) {
exchange.noNewExchangesOnConnection()
}
if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
throw ProtocolException(
"HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
}
// 將響應返回給上一個攔截器
return response
}
該方法的大致流程如下:
- 寫入請求頭(
Exchange.writeRequestHeaders
) - 寫入請求體(
Exchange.createRequestBody & requestBody.writeTo
) - 讀取響應頭(
Exchange.readResponseHeaders
) - 讀取響應體(
Exchange.openResponseBody
)
可以看出,該攔截器主要是利用了Exchange
的各種方法,完成請求和響應的寫與讀,在Exchange
里面,實際的I/O
操作交由ExchangeCodec
處理,ExchangeCodec
是一個接口,它有Http1ExchangeCodec
和Http2ExchangeCodec
兩個實現類,分別對應HTTP/1.x
、HTTP/2
協議版本。
ExchangeCodec
ExchangeCodec
是一個接口,負責對Http
請求進行編碼和對Http
響應進行解碼。有如下的變量與方法

Exchange::writeRequestHeaders
調用了ExchangeCodec
的writeRequestHeaders
方法;
@Throws(IOException::class)
fun writeRequestHeaders(request: Request) {
try {
eventListener.requestHeadersStart(call)
codec.writeRequestHeaders(request)
eventListener.requestHeadersEnd(call, request)
} catch (e: IOException) {
eventListener.requestFailed(call, e)
trackFailure(e)
throw e
}
}
Exchange::createRequestBody
調用了ExchangeCodec
的createRequestBody
方法;
@Throws(IOException::class)
fun createRequestBody(request: Request, duplex: Boolean): Sink {
this.isDuplex = duplex
val contentLength = request.body!!.contentLength()
eventListener.requestBodyStart(call)
val rawRequestBody = codec.createRequestBody(request, contentLength)
return RequestBodySink(rawRequestBody, contentLength)
}
Exchange::readResponseHeaders
調用了ExchangeCodec
的readResponseHeaders
方法
@Throws(IOException::class)
fun readResponseHeaders(expectContinue: Boolean): Response.Builder? {
try {
val result = codec.readResponseHeaders(expectContinue)
result?.initExchange(this)
return result
} catch (e: IOException) {
eventListener.responseFailed(call, e)
trackFailure(e)
throw e
}
}
Exchange::openResponseBody
調用了ExchangeCodec
的openResponseBodySource
方法
@Throws(IOException::class)
fun openResponseBody(response: Response): ResponseBody {
try {
val contentType = response.header("Content-Type")
val contentLength = codec.reportedContentLength(response)
val rawSource = codec.openResponseBodySource(response)
val source = ResponseBodySource(rawSource, contentLength)
return RealResponseBody(contentType, contentLength, source.buffer())
} catch (e: IOException) {
eventListener.responseFailed(call, e)
trackFailure(e)
throw e
}
}
Http1ExchangeCodec
writeRequestHeaders
override fun writeRequestHeaders(request: Request) {
// 拼接請求行,包含 method、url、HTTP/1.1
val requestLine = RequestLine.get(request, connection.route().proxy.type())
// 寫入請求Header和請求行
writeRequest(request.headers, requestLine)
}
該方法先是拼接請求行,接着寫入請求Header
和請求行。
Http1ExchangeCodec::writeRequest
如下
fun writeRequest(headers: Headers, requestLine: String) {
check(state == STATE_IDLE) { "state: $state" }
// 寫入請求行
sink.writeUtf8(requestLine).writeUtf8("\r\n")
// 寫入請求Header
for (i in 0 until headers.size) {
sink.writeUtf8(headers.name(i))
.writeUtf8(": ")
.writeUtf8(headers.value(i))
.writeUtf8("\r\n")
}
sink.writeUtf8("\r\n")
state = STATE_OPEN_REQUEST_BODY
}
createRequestBody
Http1ExchangeCodec::createRequestBody
:
override fun createRequestBody(request: Request, contentLength: Long): Sink {
return when {
request.body != null && request.body.isDuplex() -> throw ProtocolException(
"Duplex connections are not supported for HTTP/1")
// 分塊傳輸
request.isChunked -> newChunkedSink() // Stream a request body of unknown length.
// 請求體的長度已確定
contentLength != -1L -> newKnownLengthSink() // Stream a request body of a known length.
else -> // Stream a request body of a known length.
throw IllegalStateException(
"Cannot stream a request body without chunked encoding or a known content length!")
}
}
若要分塊傳輸,則調用newChunkedSink
方法獲取輸出流;
若不使用分塊傳輸,請求體的長度已確定,則調用newKnownLengthSink
方法獲取輸出流。
newChunkedSink
Http1ExchangeCodec::newChunkedSink
:
private fun newChunkedSink(): Sink {
// 檢查狀態
check(state == STATE_OPEN_REQUEST_BODY) { "state: $state" }
state = STATE_WRITING_REQUEST_BODY
return ChunkedSink()
}
這里創建了一個ChunkedSink
並返回,ChunkedSink
是Http1ExchangeCodec
的普通內部類,且實現了Sink
接口
/**
* An HTTP body with alternating chunk sizes and chunk bodies. It is the caller's
* responsibility to buffer chunks; typically by using a buffered sink with this sink.
*/
// ChunkedSink含有兩個信息:分塊的數據長度 和 分塊的具體數據
private inner class ChunkedSink : Sink {
...
override fun write(source: Buffer, byteCount: Long) {
check(!closed) { "closed" }
if (byteCount == 0L) return
sink.writeHexadecimalUnsignedLong(byteCount)
sink.writeUtf8("\r\n")
sink.write(source, byteCount)
sink.writeUtf8("\r\n")
}
...
}
在write
方法中,首先寫入了十六進制的數據大小,接着寫入了數據。
newKnownLengthSink
Http1ExchangeCodec::newKnownLengthSink
:
private fun newKnownLengthSink(): Sink {
// 檢查狀態
check(state == STATE_OPEN_REQUEST_BODY) { "state: $state" }
state = STATE_WRITING_REQUEST_BODY
return KnownLengthSink()
}
這里創建了一個KnownLengthSink
並返回,KnownLengthSink
是Http1ExchangeCodec
的普通內部類,且實現了Sink
接口
/** An HTTP request body. */
private inner class KnownLengthSink : Sink {
...
override fun write(source: Buffer, byteCount: Long) {
check(!closed) { "closed" }
checkOffsetAndCount(source.size, 0, byteCount)
sink.write(source, byteCount)
}
...
}
在KnownLengthSink
的write
方法中,完成了對數據的寫入。
readResponseHeaders
Http1ExchangeCodec::readResponseHeaders
如下:
override fun readResponseHeaders(expectContinue: Boolean): Response.Builder? {
check(state == STATE_OPEN_REQUEST_BODY || state == STATE_READ_RESPONSE_HEADERS) {
"state: $state"
}
try {
// 讀取並解析響應行內容
val statusLine = StatusLine.parse(headersReader.readLine())
// 根據響應行和響應頭,構建Response.Builder
val responseBuilder = Response.Builder()
.protocol(statusLine.protocol)
.code(statusLine.code)
.message(statusLine.message)
.headers(headersReader.readHeaders())
return when {
// 若expectContinue為true且響應碼為100,返回null
// (若用戶的請求頭包含Expect: 100-continue,則返回的響應碼有可能為100)
expectContinue && statusLine.code == HTTP_CONTINUE -> {
null
}
statusLine.code == HTTP_CONTINUE -> {
state = STATE_READ_RESPONSE_HEADERS
responseBuilder
}
else -> {
state = STATE_OPEN_RESPONSE_BODY
responseBuilder
}
}
} catch (e: EOFException) {
// Provide more context if the server ends the stream before sending a response.
val address = connection.route().address.url.redact()
throw IOException("unexpected end of stream on $address", e)
}
}
該方法主要做的事情
- 調用
readLine
方法讀取響應行信息,並且調用StatusLine.parse
方法將其解析為StatusLine
對象 - 根據響應行和響應頭,構建
Response.Builder
openResponseBodySource
Http1ExchangeCodec::openResponseBodySource
如下
override fun openResponseBodySource(response: Response): Source {
return when {
// 根據響應行和響應頭,判斷是否有響應體
!response.promisesBody() -> newFixedLengthSource(0)
// 判斷是否使用分塊傳輸
response.isChunked -> newChunkedSource(response.request.url)
else -> {
// 響應體長度
val contentLength = response.headersContentLength()
if (contentLength != -1L) {
// 長度已知
newFixedLengthSource(contentLength)
} else {
// 長度未知
newUnknownLengthSource()
}
}
}
}
這里根據不同的情況構造不同的輸入流,分別有newFixedLengthSource
、newChunkedSource
、newUnknownLengthSource
。
newChunkedSource
private fun newChunkedSource(url: HttpUrl): Source {
check(state == STATE_OPEN_RESPONSE_BODY) { "state: $state" }
state = STATE_READING_RESPONSE_BODY
return ChunkedSource(url)
}
返回了一個ChunkedSource
對象,該對象如下
/** An HTTP body with alternating chunk sizes and chunk bodies. */
private inner class ChunkedSource(private val url: HttpUrl) :
AbstractSource() {
...
override fun read(sink: Buffer, byteCount: Long): Long {
...
if (bytesRemainingInChunk == 0L || bytesRemainingInChunk == NO_CHUNK_YET) {
// 讀取分塊數據大小
readChunkSize()
if (!hasMoreChunks) return -1
}
// 調用父類的read方法,讀取分塊數據內容
val read = super.read(sink, minOf(byteCount, bytesRemainingInChunk))
...
return read
}
private fun readChunkSize() {
...
}
...
}
newFixedLengthSource
private fun newFixedLengthSource(length: Long): Source {
check(state == STATE_OPEN_RESPONSE_BODY) { "state: $state" }
state = STATE_READING_RESPONSE_BODY
return FixedLengthSource(length)
}
返回了一個FixedLengthSource
對象,該對象如下
/** An HTTP body with a fixed length specified in advance. */
private inner class FixedLengthSource(private var bytesRemaining: Long) :
AbstractSource() {
...
override fun read(sink: Buffer, byteCount: Long): Long {
...
// 調用父類的read方法讀取數據
val read = super.read(sink, minOf(bytesRemaining, byteCount))
...
return read
}
...
}
HTTP/2 流量控制
HTTP/2利用流來實現多路復用,這引入了對TCP連接的使用爭奪,會造成流被阻塞。流量控制方案確保在同一連接上的多個流之間不會造成破壞性的干擾。流量控制會用於各個獨立的流,也會用於整個連接。
HTTP/2"流"的流量控制的目標是:在不改變協議的情況下允許使用多種流量控制算法。
HTTP/2的流量控制具有以下特征:
- 流量控制是特定於一個連接的。每種類型的流量控制都是在單獨的一跳的兩個端點之間的,並不是在整個端到端的路徑上的。(這里的一跳指的是HTTP連接的一跳,而不是IP路由的一跳)
- 流量控制是基於WINDOW_UPDATE幀的。接收方公布自己打算在每個流以及整個連接上分別接收多少字節。這是一個以信用為基礎的方案。
- 流量控制是有方向的,由接收者全面控制。接收方可以為每個流和整個連接設置任意的窗口大小。發送方必須尊重接收方設置的流量控制限制。客戶方、服務端和中間代理作為接收方時都獨立地公布各自的流量控制窗口,作為發送方時都遵守對端的流量控制設置。
- 無論是新流還是整個連接,流量控制窗口的初始值是65535字節。
- 幀的類型決定了流量控制是否適用於幀。目前,只有DATA幀服從流量控制,所有其它類型的幀並不消耗流量控制窗口的空間。這保證了重要的控制幀不會被流量控制阻塞。
- 流量控制不能被禁用。
- HTTP/2只定義了WINDOW_UPDATE幀的格式和語義,並沒有規定接收方如何決定何時發送幀、發送什么樣的值,也沒有規定發送方如何選擇發送包。具體實現可以選擇任何滿足需求的算法。
發送端持有一個流量控制窗口(window),初始值為65536,發送端每發送一個DATA幀,就會把window遞減,遞減量為DATA幀的大小,如果window為0則不能發送任何幀。接收端可以發送WINDOW_UPDATE幀給發送端,發送端以幀內的Window Size Increment作為增量,加到window上。
在TCP中已經有流量控制了,為什么在HTTP/2中還需要進行流量控制?
答:在HTTP/2中利用流來實現多路復用,在一個TCP連接中有多條流用於傳輸數據,在TCP中的流量控制是針對於整個連接的,而HTTP/2中的流量控制既針對各個獨立的流,也針對整個TCP連接。HTTP/2對每條流都進行了流量控制,確保某個流不會阻塞其他流。
Http2ExchangeCodec
writeRequestHeaders
override fun writeRequestHeaders(request: Request) {
if (stream != null) return
val hasRequestBody = request.body != null
// 獲取存放Header的List
val requestHeaders = http2HeadersList(request)
// 獲取Http2Stream對象
stream = http2Connection.newStream(requestHeaders, hasRequestBody)
// We may have been asked to cancel while creating the new stream and sending the request
// headers, but there was still no stream to close.
if (canceled) {
stream!!.closeLater(ErrorCode.CANCEL)
throw IOException("Canceled")
}
stream!!.readTimeout().timeout(chain.readTimeoutMillis.toLong(), TimeUnit.MILLISECONDS)
stream!!.writeTimeout().timeout(chain.writeTimeoutMillis.toLong(), TimeUnit.MILLISECONDS)
}
該方法主要做了兩件事情:
- 將請求頭放入
List
集合中 - 通過剛剛創建的請求頭集合初始化並獲取一個
Http2Stream
對象
http2HeadersList
方法如下
fun http2HeadersList(request: Request): List<Header> {
val headers = request.headers
val result = ArrayList<Header>(headers.size + 4)
result.add(Header(TARGET_METHOD, request.method))
result.add(Header(TARGET_PATH, RequestLine.requestPath(request.url)))
val host = request.header("Host")
if (host != null) {
result.add(Header(TARGET_AUTHORITY, host)) // Optional.
}
result.add(Header(TARGET_SCHEME, request.url.scheme))
for (i in 0 until headers.size) {
// header names must be lowercase.
val name = headers.name(i).toLowerCase(Locale.US)
if (name !in HTTP_2_SKIPPED_REQUEST_HEADERS ||
name == TE && headers.value(i) == "trailers") {
result.add(Header(name, headers.value(i)))
}
}
return result
}
該方法將request.headers
轉化為了ArrayList<Header>
,並且添加了method
、path
、authority
、scheme
的Header
。
Http2Connection::newStream
如下
/**
* Returns a new locally-initiated stream.
*
* @param out true to create an output stream that we can use to send data to the
* remote peer.Corresponds to `FLAG_FIN`.
*/
// 返回一個本地初始化的流
@Throws(IOException::class)
fun newStream(
requestHeaders: List<Header>,
out: Boolean
): Http2Stream {
return newStream(0, requestHeaders, out)
}
@Throws(IOException::class)
private fun newStream(
associatedStreamId: Int,
requestHeaders: List<Header>,
out: Boolean
): Http2Stream {
val outFinished = !out
val inFinished = false
val flushHeaders: Boolean
val stream: Http2Stream
val streamId: Int
synchronized(writer) {
synchronized(this) {
...
// streamId 以2遞增
streamId = nextStreamId
nextStreamId += 2
// 初始化一個新的Http2Stream
stream = Http2Stream(streamId, this, outFinished, inFinished, null)
...
}
if (associatedStreamId == 0) {
// 寫入請求頭
writer.headers(outFinished, streamId, requestHeaders)
} else {
require(!client) { "client streams shouldn't have associated stream IDs" }
// HTTP/2 has a PUSH_PROMISE frame.
// 發送PUSH_PROMISE幀
writer.pushPromise(associatedStreamId, streamId, requestHeaders)
}
}
...
return stream
}
在該方法中,首先計算當前請求對應的streamId
,然后初始化一個Http2Stream
對象。因為傳入的associatedStreamId
為0,所以會調用writer.header
寫入請求頭(否則會調用writer.pushPromise
方法發送PUSH_PROMISE
幀)
寫入請求頭的Http2Writer::headers
方法如下:
@Synchronized @Throws(IOException::class)
fun headers(
outFinished: Boolean,
streamId: Int,
headerBlock: List<Header>
) {
if (closed) throw IOException("closed")
hpackWriter.writeHeaders(headerBlock)
val byteCount = hpackBuffer.size
val length = minOf(maxFrameSize.toLong(), byteCount)
var flags = if (byteCount == length) FLAG_END_HEADERS else 0
if (outFinished) flags = flags or FLAG_END_STREAM
frameHeader(
streamId = streamId,
length = length.toInt(),
type = TYPE_HEADERS,
flags = flags
)
sink.write(hpackBuffer, length)
if (byteCount > length) writeContinuationFrames(streamId, byteCount - length)
}
這里調用了 hpackWriter.writeHeaders
對 Header
進行了 HPACK
壓縮編碼,然后調用 frameHeader
方法寫入幀頭,再將壓縮編碼后的 Header
數據寫入輸出流sink
中。(在 HTTP/2 中會對 Header 的信息進行 HPACK 壓縮編碼)
createRequestBody
Http2ExchangeCodec::createRequestBody
:
override fun createRequestBody(request: Request, contentLength: Long): Sink {
return stream!!.getSink()
}
stream
在前面「寫入請求頭writeRequestHeaders
」中被創建,這里得到了stream
的輸出流sink
。
Http2Stream::getSink
:
fun getSink(): Sink {
synchronized(this) {
check(hasResponseHeaders || isLocallyInitiated) {
"reply before requesting the sink"
}
}
return sink
}
sink
變量:
internal val sink = FramingSink(
finished = outFinished
)
FramingSink
是Http2Stream
的一個普通內部類,實現了Sink
接口:
/*A sink that writes outgoing data frames of a stream. This class is not thread safe.*/
internal inner class FramingSink(
/** True if either side has cleanly shut down this stream. We shall send no more bytes. */
var finished: Boolean = false
) : Sink {
// 數據緩沖區
private val sendBuffer = Buffer()
/** Trailers to send at the end of the stream. */
var trailers: Headers? = null
var closed: Boolean = false
@Throws(IOException::class)
override fun write(source: Buffer, byteCount: Long) {
this@Http2Stream.assertThreadDoesntHoldLock()
// 將數據寫入緩沖區
sendBuffer.write(source, byteCount)
while (sendBuffer.size >= EMIT_BUFFER_SIZE) {
// 發出數據幀
emitFrame(false)
}
}
/**
* Emit a single data frame to the connection. The frame's size be limited by this stream's
* write window. This method will block until the write window is nonempty.
*/
// 向連接發出單個數據幀。幀的大小受此流的寫窗口的限制。此方法將阻塞,直到寫窗口非空為止
@Throws(IOException::class)
private fun emitFrame(outFinishedOnLastFrame: Boolean) {
val toWrite: Long
val outFinished: Boolean
synchronized(this@Http2Stream) {
writeTimeout.enter()
try {
// 如果應用程序已發送的字節數 >= 允許發送的字節總數,阻塞,等待WINDOW_UPDATE幀
// 更新流量窗口大小
while (writeBytesTotal >= writeBytesMaximum &&
!finished &&
!closed &&
errorCode == null) {
waitForIo() // Wait until we receive a WINDOW_UPDATE for this stream.
}
} finally {
writeTimeout.exitAndThrowIfTimedOut()
}
checkOutNotClosed() // Kick out if the stream was reset or closed while waiting.
// 計算能夠寫入的字節數
toWrite = minOf(writeBytesMaximum - writeBytesTotal, sendBuffer.size)
writeBytesTotal += toWrite
outFinished = outFinishedOnLastFrame && toWrite == sendBuffer.size && errorCode == null
}
writeTimeout.enter()
try {
// 寫入數據
connection.writeData(id, outFinished, sendBuffer, toWrite)
} finally {
writeTimeout.exitAndThrowIfTimedOut()
}
}
...
}
FramingSink
的write
方法首先將數據寫入緩沖區中,如果緩沖區的數據大小大於等於EMIT_BUFFER_SIZE
,就會調用emitFrame
方法發出數據幀。
在emitFrame
方法中,出現了兩個變量:
-
writeBytesTotal
: 應用程序已發送的字節數 -
writeBytesMaximum
:stream
流量窗口的大小
在emitFrame
方法中,如果應用程序已發送的字節數超過了流量窗口的大小,就會阻塞,等待接收方(此時接收方是服務端)發送WINDOW_UPDATE
幀,且writeBytesTotal < writeBytesMaximum
才會發送數據幀。發送數據幀的時候,會先調用minOf(writeBytesMaximum - writeBytesTotal, sendBuffer.size)
計算能夠發送的數據大小,它會取「寫窗口的剩余空間」和「寫緩沖區大小」的最小值,然后調用connection.writeData
寫入數據,其中connection
是一個Http2Connection
對象。
Http2Connection::writeData
如下:
/**
* ...(省略)
* Zero [byteCount] writes are not subject to flow control and will not block.
* The only use case for zero [byteCount] is closing a flushed output stream.
*/
// ...(省略)零字節計數寫入不受流控制,也不會阻塞,byteCount為0的唯一用例是關閉一個刷新的輸出流
@Throws(IOException::class)
fun writeData(
streamId: Int,
outFinished: Boolean,
buffer: Buffer?,
byteCount: Long
) {
// Empty data frames are not flow-controlled.
// 空數據幀不受流控制
if (byteCount == 0L) {
writer.data(outFinished, streamId, buffer, 0)
return
}
var byteCount = byteCount
while (byteCount > 0L) {
var toWrite: Int
synchronized(this@Http2Connection) {
try {
// 當前連接不允許發送更多數據,進入阻塞狀態,直到連接的流量控制窗口更新
while (writeBytesTotal >= writeBytesMaximum) {
// Before blocking, confirm that the stream we're writing is still open. It's possible
// that the stream has since been closed (such as if this write timed out.)
if (!streams.containsKey(streamId)) {
throw IOException("stream closed")
}
this@Http2Connection.wait() // Wait until we receive a WINDOW_UPDATE.
}
} catch (e: InterruptedException) {
Thread.currentThread().interrupt() // Retain interrupted status.
throw InterruptedIOException()
}
// 計算能夠寫入的數據大小
toWrite = minOf(byteCount, writeBytesMaximum - writeBytesTotal).toInt()
toWrite = minOf(toWrite, writer.maxDataLength())
// 更新寫入的總數據大小
writeBytesTotal += toWrite.toLong()
}
// 減去將要寫入的數據大小
byteCount -= toWrite.toLong()
// 寫入數據
writer.data(outFinished && byteCount == 0L, streamId, buffer, toWrite)
}
}
在writeData
方法中,主要工作如下:
- 判斷當前連接是否允許發送數據(
writeBytesTotal < writeBytesMaximum
),若不允許則進入阻塞狀態直到收到WINDOW_UPDATE
幀更新連接的流量窗口。 - 計算可發送數據大小(不超過剩余窗口大小以及每個幀的限制大小),調用
writer.data
方法寫入數據。 - 若當前還有需要發送的數據,重復1、2步驟。
Http2Writer::data
方法如下
/**
* ...
*
* @param source the buffer to draw bytes from. May be null if byteCount is 0.
* @param byteCount must be between 0 and the minimum of `source.length` and [maxDataLength].
*/
// source:緩存的字節,如果byteCount為0,source可能為null
// byteCount:值必須在[0,min(source.length,maxDataLength)]之間
@Synchronized @Throws(IOException::class)
fun data(outFinished: Boolean, streamId: Int, source: Buffer?, byteCount: Int) {
if (closed) throw IOException("closed")
var flags = FLAG_NONE
if (outFinished) flags = flags or FLAG_END_STREAM
dataFrame(streamId, flags, source, byteCount)
}
data
方法主要是調用了Http2Writer::dataFrame
方法:
@Throws(IOException::class)
fun dataFrame(streamId: Int, flags: Int, buffer: Buffer?, byteCount: Int) {
frameHeader(
streamId = streamId,
length = byteCount,
type = TYPE_DATA,
flags = flags
)
if (byteCount > 0) {
sink.write(buffer!!, byteCount.toLong())
}
}
先調用frameHeader
方法寫入幀頭,然后再寫入相應的數據。
readResponseHeaders
Http2ExchangeCodec::readResponseHeaders
如下
override fun readResponseHeaders(expectContinue: Boolean): Response.Builder? {
// 獲取響應的Header
val headers = stream!!.takeHeaders()
// 構建一個Response.Builder對象
val responseBuilder = readHttp2HeadersList(headers, protocol)
// 若expectContinue為true,且響應碼為100,返回null,否則返回responseBuilder
return if (expectContinue && responseBuilder.code == HTTP_CONTINUE) {
null
} else {
responseBuilder
}
}
該方法首先獲取了響應的Header
,然后構建了一個Response.Builder
對象,接着根據情況,返回null
,或者responseBuilder
。我們在CallServerInterceptor::intercept
提到過,若用戶的請求頭包含Expect: 100-continue
,則返回的響應碼有可能為100
。
Http2Stream::takeHeaders
如下
@Synchronized @Throws(IOException::class)
fun takeHeaders(): Headers {
readTimeout.enter()
try {
// 若當前接收響應Header的隊列是空的,則阻塞
// headersQueue的類型是ArrayDeque<Headers>
while (headersQueue.isEmpty() && errorCode == null) {
waitForIo()
}
} finally {
readTimeout.exitAndThrowIfTimedOut()
}
// 取隊列的第一個Headers信息返回
if (headersQueue.isNotEmpty()) {
return headersQueue.removeFirst()
}
throw errorException ?: StreamResetException(errorCode!!)
}
headersQueue
的類型是ArrayDeque<Headers>
,表示響應Header
。如果headersQueue
里面沒有元素,就阻塞,否則取其第一個元素返回。
構建Response.Builder
對象的Http2ExchangeCodec::readHttp2HeadersList
方法如下:
/** Returns headers for a name value block containing an HTTP/2 response. */
fun readHttp2HeadersList(headerBlock: Headers, protocol: Protocol): Response.Builder {
var statusLine: StatusLine? = null
val headersBuilder = Headers.Builder()
// 遍歷Headers的信息
for (i in 0 until headerBlock.size) {
val name = headerBlock.name(i)
val value = headerBlock.value(i)
// 提取狀態行信息
if (name == RESPONSE_STATUS_UTF8) {
statusLine = StatusLine.parse("HTTP/1.1 $value")
// 過濾響應頭,HTTP_2_SKIPPED_RESPONSE_HEADERS包括:CONNECTION,HOST,KEEP_ALIVE,
// PROXY_CONNECTION,TE,TRANSFER_ENCODING,ENCODING,UPGRADE
} else if (name !in HTTP_2_SKIPPED_RESPONSE_HEADERS) {
headersBuilder.addLenient(name, value)
}
}
if (statusLine == null) throw ProtocolException("Expected ':status' header not present")
// 構建Response.Builder對象
return Response.Builder()
.protocol(protocol)
.code(statusLine.code)
.message(statusLine.message)
.headers(headersBuilder.build())
}
該方法主要是對Headers
對象進行遍歷,遍歷的過程中,會提取狀態行信息,並且符合要求的響應頭記錄下來,最后構建一個Response.Builder
對象。
openResponseBodySource
Http2ExchangeCodec::openResponseBodySource
如下:
override fun openResponseBodySource(response: Response): Source {
return stream!!.source
}
這里返回了Http2Stream
的source
對象,Http2Stream
的source
對象定義如下:
internal val source = FramingSource(
maxByteCount = connection.okHttpSettings.initialWindowSize.toLong(),
finished = inFinished
)
這里返回了一個FramingSource
對象,FramingSource
是Http2Stream
的普通內部類,實現了okio
的Source
接口:
// 該Source用於讀取流上即將到來的數據幀
inner class FramingSource internal constructor(
/** Maximum number of bytes to buffer before reporting a flow control error. */
private val maxByteCount: Long,
internal var finished: Boolean
) : Source {
// 接收數據緩沖區
val receiveBuffer = Buffer()
// 讀取數據緩沖區
val readBuffer = Buffer()
var trailers: Headers? = null
/** True if the caller has closed this stream. */
internal var closed: Boolean = false
@Throws(IOException::class)
override fun read(sink: Buffer, byteCount: Long): Long {
require(byteCount >= 0L) { "byteCount < 0: $byteCount" }
while (true) {
var tryAgain = false
var readBytesDelivered = -1L
var errorExceptionToDeliver: IOException? = null
// 1. Decide what to do in a synchronized block.
synchronized(this@Http2Stream) {
readTimeout.enter()
try {
if (errorCode != null) {
// Prepare to deliver an error.
errorExceptionToDeliver = errorException ?: StreamResetException(errorCode!!)
}
if (closed) {
throw IOException("stream closed")
} else if (readBuffer.size > 0L) {
// Prepare to read bytes. Start by moving them to the caller's buffer.
// 讀取數據
readBytesDelivered = readBuffer.read(sink, minOf(byteCount, readBuffer.size))
readBytesTotal += readBytesDelivered
// 未被WINDOW_UPDATE幀確認的字節總數
val unacknowledgedBytesRead = readBytesTotal - readBytesAcknowledged
if (errorExceptionToDeliver == null &&
unacknowledgedBytesRead >= connection.okHttpSettings.initialWindowSize / 2) {
// Flow control: notify the peer that we're ready for more data! Only
// send a WINDOW_UPDATE if the stream isn't in error.
// 流控制:通知對方我們已經准備好接收更多數據!只在流沒有錯誤的情況下發送
// WINDOW_UPDATE幀。
connection.writeWindowUpdateLater(id, unacknowledgedBytesRead)
readBytesAcknowledged = readBytesTotal
}
} else if (!finished && errorExceptionToDeliver == null) {
// Nothing to do. Wait until that changes then try again.
// 沒有數據可讀,阻塞,被喚醒之后再次嘗試讀取數據
waitForIo()
tryAgain = true
}
} finally {
readTimeout.exitAndThrowIfTimedOut()
}
}
// 2. Do it outside of the synchronized block and timeout.
// 是否要再次嘗試
if (tryAgain) {
continue
}
if (readBytesDelivered != -1L) {
// Update connection.unacknowledgedBytesRead outside the synchronized block.
updateConnectionFlowControl(readBytesDelivered)
return readBytesDelivered
}
if (errorExceptionToDeliver != null) {.
throw errorExceptionToDeliver!!
}
return -1L // This source is exhausted.
}
}
private fun updateConnectionFlowControl(read: Long) {
...
}
@Throws(IOException::class)
internal fun receive(source: BufferedSource, byteCount: Long) {
...
}
...
}
在FramingSource
的read
方法中,若有數據可讀,則調用readBuffer.read
方法讀取數據,若沒有數據可讀,則阻塞,被喚醒之后再次嘗試讀取數據。
當接收方(此時接收方是客戶端)接收的數據大小超過流量控制窗口大小,就調用connection.writeWindowUpdateLater
方法通知對方增加窗口大小,增加的大小為"未被WINDOW_UPDATE
幀確認的字節總數"。
Http2Connection::writeWindowUpdateLater
如下
internal fun writeWindowUpdateLater(
streamId: Int,
unacknowledgedBytesRead: Long
) {
writerQueue.execute("$connectionName[$streamId] windowUpdate") {
try {
writer.windowUpdate(streamId, unacknowledgedBytesRead)
} catch (e: IOException) {
failConnection(e)
}
}
}
這里異步發送一個WINDOW_UPDATE
幀,Http2Writer::windowUpdate
如下:
/**
* Inform peer that an additional `windowSizeIncrement` bytes can be sent on
* `streamId`, or the connection if `streamId` is zero.
*/
@Synchronized @Throws(IOException::class)
fun windowUpdate(streamId: Int, windowSizeIncrement: Long) {
if (closed) throw IOException("closed")
require(windowSizeIncrement != 0L && windowSizeIncrement <= 0x7fffffffL) {
"windowSizeIncrement == 0 || windowSizeIncrement > 0x7fffffffL: $windowSizeIncrement"
}
frameHeader(
streamId = streamId,
length = 4,
type = TYPE_WINDOW_UPDATE,
flags = FLAG_NONE
)
sink.writeInt(windowSizeIncrement.toInt())
sink.flush()
}
先寫入WINDOW_UPDATE
的幀頭,然后再寫入窗口增大數值。
總結
在 HTTP/1.x
的請求寫入和響應讀取中,主要是將普通請求、響應及 chunked
特性下的請求、響應進行了不同的處理。
在 HTTP/2
的請求寫入和響應讀取中,對流量控制進行了很好的支持,每個stream
以及connection
都會記錄當前發送數據大小以及流量窗口大小,通過窗口大小對寫入的數據大小進行了限制,客戶端和服務端在HTTP/2
的流量控制中都可以作為接收方,當請求寫入的時候,服務端是接收方,當讀取響應的時候,客戶端是接收方。此外,在傳輸Header
的時候,也使用了HTTP/2
的特性,利用HPACK
對 Header
進行壓縮編碼。