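OkHttp in Depth
Overview
OkHttp is a networking library with two main responsibilities:
- sending requests and receiving responses
- managing multiple requests
The version covered here is 4.9.0.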
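Sending requests and receiving responses
This process is U-shaped, as the flow chart in 拆輪子系列:拆 OkHttp shows:
The message is first processed step by step on the way down, and at the bottom it is sent to the server. Once the server's reply arrives, it is processed step by step on the way back up until it becomes a developer-friendly Response.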
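Request message
A request message has three parts:
- request line
- request headers
- request body
The request line is a brief description of the request; the request headers supplement the request line and constrain the request body; the request body is the data the client wants to send to the server.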
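Request line
The request line has three parts, separated by spaces:
- the HTTP method, typically GET, POST, HEAD, PUT, DELETE, OPTIONS, TRACE, or CONNECT;
- the request target, that is, the URL, for example https://www.baidu.com;
- the HTTP version, for example HTTP/1.1.
Put together (shown here with the full URL as the target; a request sent directly to the origin server normally carries only the path, such as /, and puts the host name in the Host header):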
GET https://www.baidu.com HTTP/1.1
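As a small illustration of the three-part layout, the sketch below builds and parses a request line. RequestLine and parseRequestLine are hypothetical names used only for this example; they are not OkHttp APIs, and the parser assumes the parts are separated by exactly one space.

```kotlin
// Hypothetical helper types for illustration; not OkHttp APIs.
data class RequestLine(val method: String, val target: String, val version: String) {
    // The three parts are joined by single spaces.
    fun format(): String = "$method $target $version"
}

fun parseRequestLine(line: String): RequestLine {
    val parts = line.split(" ")
    require(parts.size == 3) { "expected 'METHOD target VERSION' but was: $line" }
    return RequestLine(parts[0], parts[1], parts[2])
}
```

Calling parseRequestLine on the line above yields the method GET, the target https://www.baidu.com, and the version HTTP/1.1.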
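Request headers
Request headers have a simple structure: each one is a name-value pair, and headers are separated by a carriage return and line feed (CRLF). The sample below was captured from a response, but request headers use the same format: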
Bdpagetype: 2
Bdqid: 0xed9518d40011b00b
Cache-Control: private
Connection: keep-alive
Content-Encoding: gzip
Content-Type: text/html;charset=utf-8
Date: Sun, 21 Feb 2021 13:35:58 GMT
Expires: Sun, 21 Feb 2021 13:35:58 GMT
Server: BWS/1.1
Set-Cookie: BDSVRTM=42; path=/
Set-Cookie: BD_HOME=1; path=/
Set-Cookie: H_PS_PSSID=33425_33506_33403_33344_31253_26350; path=/; domain=.baidu.com
Strict-Transport-Security: max-age=172800
Traceid: 1613914558067414836217119616857332101131
X-Ua-Compatible: IE=Edge,chrome=1
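To make the name-value layout concrete, here is a minimal sketch that serializes and parses a header block. formatHeaders and parseHeaders are hypothetical helpers written for this example; OkHttp's own Headers class is far more thorough. A list of pairs is used instead of a map because a name such as Set-Cookie may appear more than once.

```kotlin
// Hypothetical helpers for illustration; not OkHttp APIs.
fun formatHeaders(headers: List<Pair<String, String>>): String =
    // Each header is written as "Name: value" and terminated by CRLF.
    headers.joinToString(separator = "") { (name, value) -> "$name: $value\r\n" }

fun parseHeaders(block: String): List<Pair<String, String>> =
    block.split("\r\n")
        .filter { it.isNotEmpty() }
        .map { line ->
            // Split at the first colon only; values such as dates contain colons too.
            val colon = line.indexOf(':')
            require(colon > 0) { "malformed header line: $line" }
            line.substring(0, colon) to line.substring(colon + 1).trim()
        }
```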
Request body
The request body is flexible and has no fixed format; it can be defined to suit the service.
Some formats are common, however:
- JSON
{
"rate": 0.0867625,
"displayable": true
}
The server can deserialize this format into a corresponding data class;
- Form (application/x-www-form-urlencoded)
name=tom&password=1234&realme=tomson
- Multipart form, which can carry binary data (multipart/form-data)
------WebKitFormBoundary7MA4YWxkTrZu0gW
Content-Disposition: form-data; name="url"
https://www.baidu.com/
------WebKitFormBoundary7MA4YWxkTrZu0gW
Content-Disposition: form-data; name="name"
waffle
------WebKitFormBoundary7MA4YWxkTrZu0gW
Content-Disposition: form-data; name="desk"; filename="桌子.jpg"
Content-Type: image/jpeg
...contents of 桌子.jpg...
Here ------WebKitFormBoundary7MA4YWxkTrZu0gW separates the individual fields.
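Of the formats above, the URL-encoded form is the easiest to produce by hand. The sketch below shows one way to do it with the JDK's URLEncoder; encodeForm is a hypothetical helper for this example (OkHttp itself provides FormBody for this purpose).

```kotlin
import java.net.URLEncoder

// Hypothetical helper for illustration; not an OkHttp API.
fun encodeForm(fields: List<Pair<String, String>>): String =
    // Encode each name and value, join them with '=', and join the fields with '&'.
    fields.joinToString("&") { (name, value) ->
        URLEncoder.encode(name, "UTF-8") + "=" + URLEncoder.encode(value, "UTF-8")
    }
```

Encoding matters as soon as a value contains a reserved character: an unescaped & inside a value would otherwise be read as a field separator.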
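Response message
A response message is laid out much like a request message; the only difference is the first line, which is a status line rather than a request line.
The status line has three parts: the HTTP version, the status code, and a textual description of the status code: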
HTTP/1.1 200 OK
For more on status codes, see this article.
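The three parts of the status line can be pulled apart with a few lines of code. This is a minimal sketch; StatusLine and parseStatusLine are hypothetical names defined here for illustration and are not the classes OkHttp uses internally.

```kotlin
// Hypothetical types for illustration only.
data class StatusLine(val version: String, val code: Int, val message: String)

fun parseStatusLine(line: String): StatusLine {
    // Split into at most three parts so that descriptions containing spaces stay intact.
    val parts = line.split(" ", limit = 3)
    require(parts.size >= 2) { "malformed status line: $line" }
    return StatusLine(parts[0], parts[1].toInt(), parts.getOrElse(2) { "" })
}
```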
Interceptors
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
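What each of these seven kinds of interceptor does is covered later. First, here is how they work together, that is, how a request travels through the U-shaped process.
The following code is where a request begins: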
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try {
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
The following code is the body of RealInterceptorChain::proceed:
override fun proceed(request: Request): Response {
check(index < interceptors.size)
calls++
if (exchange != null) {
check(exchange.finder.sameHostAndPort(request.url)) {
"network interceptor ${interceptors[index - 1]} must retain the same host and port"
}
check(calls == 1) {
"network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
}
}
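// index is the position of the interceptor that this chain should invoke,
// so next is the Chain that will invoke the interceptor after it
val next = copy(index = index + 1, request = request)
// Get the interceptor for the current step
val interceptor = interceptors[index]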
@Suppress("USELESS_ELVIS")
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
if (exchange != null) {
check(index + 1 >= interceptors.size || next.calls == 1) {
"network interceptor $interceptor must call proceed() exactly once"
}
}
check(response.body != null) { "interceptor $interceptor returned a response with no body" }
return response
}
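The basic structure of an Interceptor is as follows:
override fun intercept(chain: Interceptor.Chain): Response {
// Process the Request on the way down
val request = chain.request()
val response = chain.proceed(request)
// Process the Response on the way back up
return response
}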
This chain-of-responsibility design ensures that every message passes through each processing step in a fixed order.
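The U shape is easier to see in a stripped-down model. The sketch below is not OkHttp code: Interceptor and Chain here are simplified stand-ins that pass plain strings, and tagging, terminal, and runChain are hypothetical names. Each interceptor marks the request on the way down and the response on the way back up.

```kotlin
// Simplified stand-ins for illustration; these are not the OkHttp types.
fun interface Interceptor {
    fun intercept(chain: Chain): String
}

class Chain(
    private val interceptors: List<Interceptor>,
    private val index: Int,
    val request: String
) {
    fun proceed(request: String): String {
        check(index < interceptors.size) { "no interceptor produced a response" }
        // Hand the current interceptor a chain that points one step further down.
        val next = Chain(interceptors, index + 1, request)
        return interceptors[index].intercept(next)
    }
}

// Marks the request on the way down and the response on the way up.
fun tagging(name: String) = Interceptor { chain ->
    val response = chain.proceed("${chain.request}>$name")
    "$response<$name"
}

// The last interceptor plays the role of the server call and never calls proceed().
val terminal = Interceptor { chain -> "[${chain.request}]" }

fun runChain(request: String): String =
    Chain(listOf(tagging("a"), tagging("b"), terminal), 0, request).proceed(request)
```

In this model runChain("req") returns "[req>a>b]<b<a": the request passes a and then b going down, and the response passes b and then a coming back.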
client.interceptors
These are the user-defined application interceptors; the list is empty by default.
RetryAndFollowUpInterceptor
This interceptor does not process the Request; it only processes the outcome of the call.
That processing has two parts:
- Retry on failure
try {
response = realChain.proceed(request)
newExchangeFinder = true
} catch (e: RouteException) {
// The attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
throw e.firstConnectException.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e.firstConnectException
}
newExchangeFinder = false
continue
} catch (e: IOException) {
// An attempt to communicate with a server failed. The request may have been sent.
if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
throw e.withSuppressed(recoveredFailures)
} else {
recoveredFailures += e
}
newExchangeFinder = false
continue
}
private fun recover(
e: IOException,
call: RealCall,
userRequest: Request,
requestSendStarted: Boolean
): Boolean {
// The application layer has forbidden retries.
if (!client.retryOnConnectionFailure) return false
// We can't send the request body again.
if (requestSendStarted && requestIsOneShot(e, userRequest)) return false
// This exception is fatal.
if (!isRecoverable(e, requestSendStarted)) return false
// No more routes to attempt.
if (!call.retryAfterFailure()) return false
// For failure recovery, use the same route selector with a new connection.
return true
}
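If an exception occurs during processing and the retry conditions are not met, the exception is thrown and no retry takes place;
- Redirects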
// Attach the prior response if it exists. Such responses never have a body.
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build()
}
val exchange = call.interceptorScopedExchange
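// Build a follow-up request from the status code; returns null when no follow-up (such as a redirect) is needed
val followUp = followUpRequest(response, exchange)
// No follow-up is needed; return the response directly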
if (followUp == null) {
if (exchange != null && exchange.isDuplex) {
call.timeoutEarlyExit()
}
closeActiveExchange = false
return response
}
// A follow-up is needed, but its body can only be sent once, so return the response directly as well
val followUpBody = followUp.body
if (followUpBody != null && followUpBody.isOneShot()) {
closeActiveExchange = false
return response
}
response.body?.closeQuietly()
// More than 20 follow-ups: throw an exception
if (++followUpCount > MAX_FOLLOW_UPS) {
throw ProtocolException("Too many follow-up requests: $followUpCount")
}
// Replace the request with the follow-up and go round the loop again
request = followUp
priorResponse = response
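The shape of the redirect loop can be reproduced in a few lines. The sketch below is a simplified model under stated assumptions, not the OkHttp implementation: SimpleResponse, followRedirects, and the fetch parameter are hypothetical, a follow-up is assumed to exist exactly when the response names a next location, and only the limit of 20 is taken from the code above.

```kotlin
import java.net.ProtocolException

// Hypothetical, simplified model: a response either is final or names the next URL to request.
data class SimpleResponse(val code: Int, val location: String? = null)

const val MAX_FOLLOW_UPS = 20

fun followRedirects(startUrl: String, fetch: (String) -> SimpleResponse): SimpleResponse {
    var url = startUrl
    var followUpCount = 0
    while (true) {
        val response = fetch(url)
        // No next location means there is nothing to follow; return the response.
        val next = response.location ?: return response
        if (++followUpCount > MAX_FOLLOW_UPS) {
            throw ProtocolException("Too many follow-up requests: $followUpCount")
        }
        // Refresh the target and go round the loop again.
        url = next
    }
}
```

The counter is what keeps a server that redirects to itself from trapping the client in an endless loop.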
BridgeInterceptor
- Adds request headers based on the contents of the Request
val userRequest = chain.request()
val requestBuilder = userRequest.newBuilder()
val body = userRequest.body
if (body != null) {
val contentType = body.contentType()
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString())
}
val contentLength = body.contentLength()
if (contentLength != -1L) {
requestBuilder.header("Content-Length", contentLength.toString())
requestBuilder.removeHeader("Transfer-Encoding")
} else {
requestBuilder.header("Transfer-Encoding", "chunked")
requestBuilder.removeHeader("Content-Length")
}
}
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", userRequest.url.toHostHeader())
}
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive")
}
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
var transparentGzip = false
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true
requestBuilder.header("Accept-Encoding", "gzip")
}
val cookies = cookieJar.loadForRequest(userRequest.url)
if (cookies.isNotEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies))
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", userAgent)
}
- After the Response arrives, stores its cookies and decodes the response body
val networkResponse = chain.proceed(requestBuilder.build())
cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
val responseBuilder = networkResponse.newBuilder()
.request(userRequest)
// Decode a gzip-compressed response body
if (transparentGzip &&
"gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
networkResponse.promisesBody()) {
val responseBody = networkResponse.body
if (responseBody != null) {
val gzipSource = GzipSource(responseBody.source())
val strippedHeaders = networkResponse.headers.newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build()
responseBuilder.headers(strippedHeaders)
val contentType = networkResponse.header("Content-Type")
responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
}
}
return responseBuilder.build()
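The transparent gzip step can be tried in isolation with the JDK's gzip streams. This is a sketch under simplifying assumptions: RawResponse, gzip, and decodeIfGzipped are hypothetical names, the body is held fully in memory instead of being streamed, and header names are matched case-sensitively, which real HTTP header handling should not do.

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream

// Hypothetical simplified response: headers plus raw body bytes.
class RawResponse(val headers: Map<String, String>, val body: ByteArray)

// Compresses text so there is something to decode in the example.
fun gzip(text: String): ByteArray {
    val out = ByteArrayOutputStream()
    GZIPOutputStream(out).use { it.write(text.toByteArray(Charsets.UTF_8)) }
    return out.toByteArray()
}

// If gzip was requested on the caller's behalf and the server used it, decompress the
// body and drop the headers that described the compressed form.
fun decodeIfGzipped(response: RawResponse, transparentGzip: Boolean): RawResponse {
    val encoding = response.headers["Content-Encoding"]
    if (!transparentGzip || !"gzip".equals(encoding, ignoreCase = true)) return response
    val decoded = GZIPInputStream(ByteArrayInputStream(response.body)).use { it.readBytes() }
    return RawResponse(response.headers - "Content-Encoding" - "Content-Length", decoded)
}
```

As in the code above, the body is left untouched when the caller set Accept-Encoding itself, because in that case the caller is expected to handle decompression.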
CacheInterceptor
- Looks up a cached Response for the Request; if the cache is valid it is returned directly, otherwise processing continues down the chain;
val call = chain.call()
val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
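// CacheStrategy has two fields, networkRequest and cacheResponse.
// networkRequest == null: the network will not be used; serve from the cache if there is one.
// cacheResponse == null: there is no usable cached response; go to the network.
// Both non-null: send a conditional request to revalidate the cached response.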
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
val networkRequest = strategy.networkRequest
val cacheResponse = strategy.cacheResponse
cache?.trackResponse(strategy)
val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE
if (cacheCandidate != null && cacheResponse == null) {
// The cache candidate wasn't applicable. Close it.
cacheCandidate.body?.closeQuietly()
}
// If we're forbidden from using the network and the cache is insufficient, fail.
if (networkRequest == null && cacheResponse == null) {
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build().also {
listener.satisfactionFailure(call, it)
}
}
// The cache is valid; return it directly
if (networkRequest == null) {
return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build().also {
listener.cacheHit(call, it)
}
}
// Report the conditional hit or the miss to the event listener
if (cacheResponse != null) {
listener.cacheConditionalHit(call, cacheResponse)
} else if (cache != null) {
listener.cacheMiss(call)
}
- Makes the network request and updates the cache
var networkResponse: Response? = null
try {
networkResponse = chain.proceed(networkRequest)
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body?.closeQuietly()
}
}
// If we have a cache response too, then we're doing a conditional get.
if (cacheResponse != null) {
if (networkResponse?.code == HTTP_NOT_MODIFIED) {
val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
networkResponse.body!!.close()
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache!!.trackConditionalCacheHit()
cache.update(cacheResponse, response)
return response.also {
listener.cacheHit(call, it)
}
} else {
cacheResponse.body?.closeQuietly()
}
}
val response = networkResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
if (cache != null) {
if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response).also {
if (cacheResponse != null) {
// This will log a conditional cache miss only.
listener.cacheMiss(call)
}
}
}
if (HttpMethod.invalidatesCache(networkRequest.method)) {
try {
cache.remove(networkRequest)
} catch (_: IOException) {
// The cache cannot be written.
}
}
}
return response
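The branching on networkRequest and cacheResponse in the code above boils down to four cases. The sketch below only restates that decision table; CacheDecision and decide are hypothetical names, and the two booleans stand for whether each CacheStrategy field is non-null.

```kotlin
// Hypothetical names for illustration; they mirror the branches in the code above.
enum class CacheDecision { UNSATISFIABLE, CACHE_ONLY, CONDITIONAL_GET, NETWORK_ONLY }

fun decide(hasNetworkRequest: Boolean, hasCacheResponse: Boolean): CacheDecision = when {
    // Network forbidden and nothing usable in the cache: synthesize a 504 response.
    !hasNetworkRequest && !hasCacheResponse -> CacheDecision.UNSATISFIABLE
    // Network not needed: answer from the cache.
    !hasNetworkRequest -> CacheDecision.CACHE_ONLY
    // Both present: revalidate; a 304 reply means the cached body is reused.
    hasCacheResponse -> CacheDecision.CONDITIONAL_GET
    // No cached response: plain network call.
    else -> CacheDecision.NETWORK_ONLY
}
```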
ConnectInterceptor
This interceptor does just one thing: it opens the socket connection from the client to the server;
val realChain = chain as RealInterceptorChain
val exchange = realChain.call.initExchange(chain)
val connectedChain = realChain.copy(exchange = exchange)
return connectedChain.proceed(realChain.request)
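As the code shows, the key is the initExchange function.
Its main call path is as follows:
- RealCall::initExchange obtains an Exchange, which is used to send the request and receive the response;
- before the request can be processed, ExchangeFinder::find is called to obtain a helper that encodes and decodes messages;
- that encoding and decoding in turn depends on a connection to the server.
The code that finds a connection is as follows: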
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
if (call.isCanceled()) throw IOException("Canceled")
// Reuse the connection already held by the call
val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!
if (callConnection != null) {
var toClose: Socket? = null
synchronized(callConnection) {
if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
toClose = call.releaseConnectionNoEvents()
}
}
// If the call's connection wasn't released, reuse it. We don't call connectionAcquired() here
// because we already acquired it.
if (call.connection != null) {
check(toClose == null)
return callConnection
}
// The call's connection was released.
toClose?.closeQuietly()
eventListener.connectionReleased(call, callConnection)
}
// We need a new connection. Give it fresh stats.
refusedStreamCount = 0
connectionShutdownCount = 0
otherFailureCount = 0
// Try to get a connection from the pool
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
// Nothing in the pool. Figure out what route we'll try next.
val routes: List<Route>?
val route: Route
if (nextRouteToTry != null) {
// Use a route from a preceding coalesced connection.
routes = null
route = nextRouteToTry!!
nextRouteToTry = null
} else if (routeSelection != null && routeSelection!!.hasNext()) {
// Use a route from an existing route selection.
routes = null
route = routeSelection!!.next()
} else {
// Compute a new route selection. This is a blocking operation!
var localRouteSelector = routeSelector
if (localRouteSelector == null) {
localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
this.routeSelector = localRouteSelector
}
val localRouteSelection = localRouteSelector.next()
routeSelection = localRouteSelection
routes = localRouteSelection.routes
if (call.isCanceled()) throw IOException("Canceled")
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. We have a better chance of matching thanks to connection coalescing.
if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
val result = call.connection!!
eventListener.connectionAcquired(call, result)
return result
}
route = localRouteSelection.next()
}
// Connect. Tell the call about the connecting call so async cancels work.
val newConnection = RealConnection(connectionPool, route)
call.connectionToCancel = newConnection
try {
newConnection.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
} finally {
call.connectionToCancel = null
}
call.client.routeDatabase.connected(newConnection.route())
// If we raced another call connecting to this host, coalesce the connections. This makes for 3
// different lookups in the connection pool!
if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
val result = call.connection!!
nextRouteToTry = route
newConnection.socket().closeQuietly()
eventListener.connectionAcquired(call, result)
return result
}
synchronized(newConnection) {
connectionPool.put(newConnection)
call.acquireConnectionNoEvents(newConnection)
}
eventListener.connectionAcquired(call, newConnection)
return newConnection
}
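The lookup proceeds as follows:
- try to reuse call.connection;
- try to find a connection in the connection pool;
- once a route selection has been computed, search the pool again with the list of routes;
- create a new RealConnection.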
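The lookup order can be modelled with a much smaller structure. The sketch below is a deliberately reduced illustration, not the OkHttp implementation: Conn and Finder are hypothetical types, connections are matched by host name only, and the route-based second pool lookup is left out.

```kotlin
// Hypothetical, simplified model of the lookup order.
class Conn(val host: String)

class Finder(private val pool: MutableList<Conn>) {
    // The connection already attached to the call, if any.
    var current: Conn? = null

    // Returns the connection together with a label saying where it came from.
    fun find(host: String): Pair<Conn, String> {
        // 1. Reuse the call's own connection when it targets the same host.
        current?.takeIf { it.host == host }?.let { return it to "call" }
        // 2. Otherwise look for a pooled connection to that host.
        pool.firstOrNull { it.host == host }?.let { current = it; return it to "pool" }
        // 3. Nothing reusable: create a connection, pool it, and attach it to the call.
        val created = Conn(host)
        pool.add(created)
        current = created
        return created to "new"
    }
}
```

Ordering the checks from cheapest to most expensive means a new connection, with its handshake cost, is only made when nothing existing can serve the request.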
networkInterceptors
User-defined network interceptors; the list is empty by default.
CallServerInterceptor
Although this is the last step of the chain, there is not much to say about it; the code is fairly straightforward:
response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
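Some special status codes are also handled here; they are not covered in this article.
Managing multiple requests
Synchronous and asynchronous
- Synchronous request flow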
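override fun execute(): Response {
// Verify that this call has not been executed before
check(executed.compareAndSet(false, true)) { "Already Executed" }
// Start the timeout clock
timeout.enter()
// Invoke the call-start callback
callStart()
try {
// Add the call to the running list; see the analysis below
client.dispatcher.executed(this)
// Actually perform the request, as described in detail above
return getResponseWithInterceptorChain()
} finally {
// Remove the call from the list
client.dispatcher.finished(this)
}
}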
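- Asynchronous request flow
override fun enqueue(responseCallback: Callback) {
// Verify that this call has not been executed before
check(executed.compareAndSet(false, true)) { "Already Executed" }
// Invoke the call-start callback. This placement seems questionable: the call is only being queued here, and the callback arguably belongs just before the request actually runs
callStart()
// Add the call to the waiting queue
client.dispatcher.enqueue(AsyncCall(responseCallback))
}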
- Dispatcher
Dispatcher is the manager of Calls.
Some of its important fields:
// Maximum number of concurrent requests
var maxRequests = 64
// Maximum number of concurrent requests per host
var maxRequestsPerHost = 5
// Thread pool used to run asynchronous requests
val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
/** Asynchronous calls waiting to run */
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
/** Asynchronous calls currently running */
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
/** Synchronous calls currently running */
private val runningSyncCalls = ArrayDeque<RealCall>()
Important methods:
// Add an asynchronous call to the waiting queue
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
readyAsyncCalls.add(call)
// Share the per-host call counter with an existing call to the same host
if (!call.call.forWebSocket) {
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
// Start processing the queue
promoteAndExecute()
}
private fun promoteAndExecute(): Boolean {
// Assert that the current thread does not hold the dispatcher's lock
this.assertThreadDoesntHoldLock()
// The asynchronous calls that will be started in this pass
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
// The global limit has been reached; stop promoting
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
// This host has reached its limit; skip this call
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
// Remove it from the waiting queue
i.remove()
asyncCall.callsPerHost.incrementAndGet()
// Add it to the executable list and the running queue
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
// Hand the calls to the thread pool one by one
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}
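The interplay of the two limits is easier to follow in a toy version. The sketch below is a simplified model, not the OkHttp Dispatcher: MiniDispatcher is a hypothetical class, calls are represented only by their host name, nothing is actually executed, and locking is omitted, so it is not thread-safe.

```kotlin
// Hypothetical, simplified model of the promotion loop.
class MiniDispatcher(private val maxRequests: Int, private val maxRequestsPerHost: Int) {
    private val ready = ArrayDeque<String>()       // queued calls, identified by host
    private val running = mutableListOf<String>()  // calls currently running

    // Queues a call and returns the calls that were promoted as a result.
    fun enqueue(host: String): List<String> {
        ready.addLast(host)
        return promote()
    }

    // Marks one call to the host as finished and returns the calls promoted as a result.
    fun finished(host: String): List<String> {
        running.remove(host)
        return promote()
    }

    // Moves as many ready calls to running as the two limits allow.
    private fun promote(): List<String> {
        val promoted = mutableListOf<String>()
        val i = ready.iterator()
        while (i.hasNext()) {
            val host = i.next()
            if (running.size >= maxRequests) break                            // global limit reached
            if (running.count { it == host } >= maxRequestsPerHost) continue  // this host is full
            i.remove()
            running.add(host)
            promoted.add(host)
        }
        return promoted
    }
}
```

With maxRequests = 2 and maxRequestsPerHost = 1, a second call to a host that is already busy stays queued while a call to a different host is promoted straight away; it is promoted later, once a call to its host finishes.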