前言
OkHttp
是個人使用的比較多的網絡請求庫,但是一直沒有探究它的實現原理,這次就對OkHttp
的源碼進行分析,探究其實現原理。
分析的okhttp
源碼版本:4.9.2
。
基本使用
GET
同步地發起請求,會阻塞線程,不能直接在主線程當中調用
private fun getData() {
thread {
val client: OkHttpClient = OkHttpClient()
val request: Request = Request.Builder()
.get()
.url("https://www.baidu.com")
.build()
val response: Response = client.newCall(request).execute()
val data: String? = response.body?.string()
Log.d(TAG, "OkHttp 同步請求:$data")
}
}
獲取OkHttpClient
實例,有兩種方法,一是像上面代碼一樣,直接new
一個OkHttpClient
對象;二是new
一個OkHttpClient
的Builder
對象,可以對client
配置一些參數,如下
val client: OkHttpClient = OkHttpClient.Builder().build()
異步地發起請求,會自動切換線程
private fun getData() {
val client: OkHttpClient = OkHttpClient()
val request: Request = Request.Builder()
.get()
.url("https://www.baidu.com")
.build()
client.newCall(request).enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
}
override fun onResponse(call: Call, response: Response) {
val data: String? = response.body?.string()
Log.d(TAG, "OkHttp 同步請求:$data")
}
})
}
POST
POST請求,相比於GET請求,多了一個構造RequestBody
的步驟,並且請求方法要從get
改為post
。
同步請求,會阻塞線程,不能直接在主線程當中調用
private fun getData() {
thread{
val client: OkHttpClient = OkHttpClient()
val requestBody: RequestBody = FormBody.Builder()
.add("username", "EDGNB")
.add("password", "123456")
.build()
val request: Request = Request.Builder()
.url("https://www.wanandroid.com/user/login")
.post(requestBody)
.build()
val response: Response = client.newCall(request).execute()
val data: String? = response.body?.string()
Log.d(TAG, "OkHttp 同步請求:$data")
}
}
異步請求
private fun getData() {
val client: OkHttpClient = OkHttpClient()
val requestBody: RequestBody = FormBody.Builder()
.add("username", "EDGNB")
.add("password", "123456")
.build()
val request: Request = Request.Builder()
.url("https://www.wanandroid.com/user/login")
.post(requestBody)
.build()
client.newCall(request).enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
}
override fun onResponse(call: Call, response: Response) {
val data: String? = response.body?.string()
Log.d(TAG, "OkHttp 同步請求:$data")
}
})
}
對象的創建
OkHttpClient
和Request
的構造均使用到了建造者Builder
模式,調用者可直接通過鏈式調用對這些對象進行自定義初始化。Request
類和OkHttpClient
類的本質都是一個描述對象。
OkHttpClient
前面提到,構造一個OkHttpClient
有兩種方式
// 方式一
val client: OkHttpClient = OkHttpClient()
// 方式二
val client: OkHttpClient = OkHttpClient.Builder().build()
我們查看方式一,它調用了下面的構造方法
constructor() : this(Builder())
構造了一個Builder
實例,並調用了主構造函數,OkHttpClient
的成員變量會利用構造的Builder
,進行初始化
open class OkHttpClient internal constructor(
builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
val dispatcher: Dispatcher = builder.dispatcher
val connectionPool: ConnectionPool = builder.connectionPool
val interceptors: List<Interceptor> =
builder.interceptors.toImmutableList()
...
}
OkHttpClient.Builder
定義的屬性如下
class Builder constructor() {
// 調度器
internal var dispatcher: Dispatcher = Dispatcher()
// 連接池
internal var connectionPool: ConnectionPool = ConnectionPool()
// 整體流程攔截器
internal val interceptors: MutableList<Interceptor> = mutableListOf()
// 網絡請求攔截器
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
// 流程監聽器
internal var eventListenerFactory: EventListener.Factory =
EventListener.NONE.asFactory()
// 連接失敗是否自動重試
internal var retryOnConnectionFailure = true
// 服務器認證設置
internal var authenticator: Authenticator = Authenticator.NONE
// http是否重定向
internal var followRedirects = true
// 是否可以從HTTP重定向到HTTPS
internal var followSslRedirects = true
// Cookie策略,是否保存Cookie
internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
// 緩存配置
internal var cache: Cache? = null
// Dns配置
internal var dns: Dns = Dns.SYSTEM
// 代理
internal var proxy: Proxy? = null
// 代理選擇器
internal var proxySelector: ProxySelector? = null
// 代理服務器認證設置
internal var proxyAuthenticator: Authenticator = Authenticator.NONE
// socket工廠
internal var socketFactory: SocketFactory = SocketFactory.getDefault()
// sslSocket工廠 用於https
internal var sslSocketFactoryOrNull: SSLSocketFactory? = null
// 證書信息管理
internal var x509TrustManagerOrNull: X509TrustManager? = null
// 傳輸層版本和連接協議 TLS等
internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS
// 支持的協議
internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS
// 主機名校驗
internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
// 證書鏈
internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
// 證書確認
internal var certificateChainCleaner: CertificateChainCleaner? = null
// 調用超時時間( 0的含義? )
internal var callTimeout = 0
// 連接超時時間
internal var connectTimeout = 10_000
// 讀取超時時間
internal var readTimeout = 10_000
// 寫入超時時間
internal var writeTimeout = 10_000
// 針對HTTP2和web socket的ping間隔
internal var pingInterval = 0
internal var minWebSocketMessageToCompress =
RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE
// 路由數據庫
internal var routeDatabase: RouteDatabase? = null
...
}
Request
Request
用於描述單次請求的參數信息,使用方法如下
val request: Request = Request.Builder()
.get()
.url("https://www.baidu.com")
.build()
在Request.Builder
的build
方法中,利用自身保存的屬性,去構造一個Request
對象
open fun build(): Request {
return Request(
checkNotNull(url) { "url == null" },
method,
headers.build(),
body,
tags.toImmutableMap()
)
}
Request.Builder
有如下屬性
open class Builder {
// 請求url
internal var url: HttpUrl? = null
// 請求方法
internal var method: String
// 請求頭
internal var headers: Headers.Builder
// 請求體
internal var body: RequestBody? = null
/** A mutable map of tags, or an immutable empty map if we don't have any. */
// 請求tag
internal var tags: MutableMap<Class<*>, Any> = mutableMapOf()
...
}
發起請求
無論是同步請求
val response: Response = client.newCall(request).execute()
還是異步請求
client.newCall(request).enqueue(...)
都需要先調用OkHttpClient
的newCall
方法,該方法如下
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket =
false)
該方法返回一個RealCall
對象,定義如下
class RealCall(
val client: OkHttpClient,
/** The application's original request unadulterated by redirects or auth headers. */
val originalRequest: Request,
val forWebSocket: Boolean
) : Call {...}
它是Call
接口的唯一實現類,內部保存了
- client:也就是前面的
OkHttpClient
- originalRequest:我們前面創建的
Request
,用於描述單次請求的參數信息 - forWebSocket:是否使用
Web Socket
,默認值為false
RealCall
包裝了Request
和OKHttpClient
這兩個類的實例,使得后面的方法中可以很方便的使用這兩者。
插敘:ArrayDeque
我們后面會使用到ArrayDeque
,這里先簡單介紹ArrayDeque
是什么,以及它的作用
ArrayDeque
是JDK
的一個容器,它有隊列和棧的性質的抽象數據類型,繼承結構如下

在Java
中,我們使用棧一般會使用Stack
類,而隊列的話,Java
提供了Queue
接口,我們可以使用LinkedList
來實現隊列的功能(LinkedList
實現了Deque
接口,Deque
繼承了Queue
接口)。
從ArrayDeque
繼承結構可以看出,它實現了Deque
接口,因此它也有隊列的功能,在ArrayDeque
類中有這樣的一段注釋
/*
* This class is likely to be faster than
* {@link Stack} when used as a stack, and faster than {@link LinkedList}
* when used as a queue.
* /
它表達了兩層意思:
- 當作為「棧」來使用的時候,它的性能比
Stack
更快 - 當作為「隊列」來使用的時候,它的性能比
LinkedList
更快
在Stack
類的注釋中,我們也可以看到它推薦我們使用ArrayDeque
:
/*
* <p>A more complete and consistent set of LIFO stack operations is
* provided by the {@link Deque} interface and its implementations, which
* should be used in preference to this class. For example:
* <pre> {@code
* Deque<Integer> stack = new ArrayDeque<Integer>();}</pre>
*/
ArrayDeque
實現了Deque
接口,Deque
是double-ended queue
的縮寫,意為「雙端隊列」,什么意思呢?我們查看Deque
定義了哪些方法
public interface Deque<E> extends Queue<E> {
/**
* 在 首、尾 添加元素
*/
void addFirst(E e);
void addLast(E e);
boolean offerFirst(E e);
boolean offerLast(E e);
/**
* 移除並返回 首、尾 的元素
*/
E removeFirst();
E removeLast();
E pollFirst();
E pollLast();
/**
* 返回(但不移除) 首、尾 的元素
*/
E getFirst();
E getLast();
E peekFirst();
E peekLast();
...
}
我們在Deque
的「首、尾」兩端都可以做元素的「插入、刪除」操作,比如
- 我們在雙端隊列的隊頭插入,隊頭彈出,那么它就可以作為一個棧來使用
- 我們在雙端隊列的隊頭插入,隊尾彈出,那么它就可以作為一個隊列來使用
或者在雙端隊列的隊尾插入,隊頭彈出,也是可以作為一個隊列來使用
從ArrayDeque
類的注釋中,我們還可以得到下面有關ArrayDeque
的信息
- 沒有容量限制,根據實際需要自動擴容
- 是線程不安全的
- 不允許放入
null
的元素
另外,ArrayDeque
內部是使用循環數組來實現的。
同步請求
同步請求要調用RealCall
的execute
方法,定義如下
override fun execute(): Response {
// 使用CAS,保證這個RealCall對象只發起一次請求
check(executed.compareAndSet(false, true)) { "Already Executed" }
timeout.enter()
callStart()
try {
// 將請求添加到任務隊列當中
client.dispatcher.executed(this)
// 通過一系列攔截器的請求處理和響應處理,獲取返回結果,並return
return getResponseWithInterceptorChain()
} finally {
// 不論是否成功,通知Dispatcher請求完成
client.dispatcher.finished(this)
}
}
我們首先調用了OkHttpClient
的調度器dispatcher
的executed
方法,來將請求添加到任務隊列當中,Dispatcher::executed
如下
@Synchronized internal fun executed(call: RealCall) {
runningSyncCalls.add(call)
}
runningSyncCalls
是一個ArrayDeque
,定義如下
private val runningSyncCalls = ArrayDeque<RealCall>()
前面我們已經介紹過ArrayDeque
了,executed
方法調用了runningSyncCalls
的add
方法,我們注意到executed
方法使用了同步鎖,使用同步鎖的原因是因為ArrayDeque
是線程不安全的。
為什么在OkHttp
中選擇ArrayDeque
這種容器作為任務隊列?
答:因為ArrayDeque
效率高,在「插敘:ArrayDeque」中,我們提到ArrayDeque
可以作為棧和隊列,並且性能優於Stack
、LinkedList
。
client.dispatcher.executed(this)
所做的事情,就是將這個請求添加到一個雙端隊列中。
回到RealCallback
的execute
方法中,又調用了getResponseWithInterceptorChain
方法,該方法如下
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
// 按順序添加一系列的攔截器
val interceptors = mutableListOf<Interceptor>()
// 添加用戶自定義攔截器
interceptors += client.interceptors
// 添加重定向攔截器
interceptors += RetryAndFollowUpInterceptor(client)
// 橋攔截器
interceptors += BridgeInterceptor(client.cookieJar)
// 添加緩存攔截器,從緩存中拿數據
interceptors += CacheInterceptor(client.cache)
// 添加網絡連接攔截器,負責建立網絡連接
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
// 服務器請求攔截器,負責向服務器發送請求數據、從服務器讀取響應數據
interceptors += CallServerInterceptor(forWebSocket)
// 構建一條責任鏈,注意前面構建的攔截器列表interceptors作為參數傳入
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try {
// 執行責任鏈,得到請求的響應
val response = chain.proceed(originalRequest)
// 判斷請求是否取消,若沒有取消則返回響應
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}
getResponseWithInterceptorChain
方法會按一定的順序構建攔截器列表interceptors
,接着利用interceptors
創建RealInterceptorChain
對象,該對象是一條責任鏈,使用了責任鏈模式,攔截器會按順序依次調用,每當一個攔截器執行完畢之后會調用下一個攔截器或者不調用並返回結果,我們最終拿到的響應就是這個鏈條執行之后返回的結果。當我們自定義一個攔截器的時候,也會被加入到這個攔截器鏈條里。
最后在RealCallback
的execute
方法中調用client.dispatcher.finished(this)
,通知dispatcher
請求完成。
這就是一個同步請求發起與響應的過程,關於Dispatcher
和攔截器責任鏈是如何工作的,稍后分析。
小結
發起一個同步請求的語句是
client.newCall(request).execute()
- 首先調用
OkHttpClient
的newCall
方法,傳入Request
,返回一個RealCall
對象 - 調用
RealCall
對象的execute
方法。在該方法中,先調用OkHttpClient
的dispatcher
的executed
方法,將請求加入dispatcher
中的雙端隊列中;再調用getResponseWithInterceptorChain()
,通過攔截器責任鏈獲取響應,最后通知dispatcher
請求完成。
異步請求
發起異步請求的方法
client.newCall(request).enqueue(object : Callback {...})
這里也是構造一個RealCall
對象,然后調用RealCall
對象的enqueue
方法,傳入Callback
。
我們查看RealCall
的enqueue
方法
override fun enqueue(responseCallback: Callback) {
// 使用CAS,保證這個RealCall對象只發起一次請求
check(executed.compareAndSet(false, true)) { "Already Executed" }
callStart()
// 創建AsyncCall對象,然后通過dispatcher分發任務
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
這里創建了一個AsyncCall
對象,構造參數是用戶傳入的Callback
,然后調用OkHttpClient
的dispatcher
的enqueue
方法,傳入我們創建的AsyncCall
對象。
Dispatcher
的enqueue
方法如下
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
// 將AsyncCall添加到請求隊列中
readyAsyncCalls.add(call)
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running
// call to the same host.
if (!call.call.forWebSocket) {
// 尋找同一host的Call
val existingCall = findExistingCallWithHost(call.host)
// 復用Call的callsPerHost
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
// 嘗試執行等待隊列中的任務
promoteAndExecute()
}
readyAsyncCalls
同樣是一個雙端隊列,其類型為ArrayDeque<AsyncCall>()
,注意,同步任務是添加到runningSyncCalls
當中,異步任務是添加到readyAsyncCalls
當中。
Dispatcher::promoteAndExecute
如下
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
// 創建一個待執行任務列表
val executableCalls = mutableListOf<AsyncCall>()
// 標識是否有任務在執行
val isRunning: Boolean
synchronized(this) {
// 前面就是將異步任務AsyncCall添加到readyAsyncCalls當中
val i = readyAsyncCalls.iterator()
// 遍歷異步任務等待隊列
while (i.hasNext()) {
val asyncCall = i.next()
// 當前總請求數沒有達到上限
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
// 當前同一主機名請求數沒有達到上限
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
// 從等待隊列中移除AsyncCall
i.remove()
// 同一主機名請求數+1
asyncCall.callsPerHost.incrementAndGet()
// 將該請求添加到待執行任務列表
executableCalls.add(asyncCall)
// 將該請求添加到執行任務隊列
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
// 遍歷待執行任務列表,傳入線程池執行任務
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}
到現在為止,我們接觸到了三個隊列,在這里區分一下
runningSyncCalls
:類型是ArrayDeque<RealCall>
,「同步任務」的執行隊列readyAsyncCalls
:類型是ArrayDeque<AsyncCall>
,「異步任務」的等待隊列runningAsyncCalls
:類型是ArrayDeque<AsyncCall>
,「異步任務」的執行隊列
它們都是ArrayDeque
,雙端隊列。
AsyncCall
表示一個異步任務,它是RealCall
的一個內部類,並且實現了Runnable
接口,定義如下
internal inner class AsyncCall(
// responseCallback就是用戶傳入的獲取處理結果的回調
private val responseCallback: Callback
) : Runnable {
...
/**
* Attempt to enqueue this async call on [executorService]. This will attempt to
* clean up if the executor has been shut down by reporting the call as failed.
*/
// 對外暴露的接口,通過傳入一個線程池來執行該任務
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
// 嘗試在線程池中執行該任務
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
// 線程池拒絕執行該任務,拋出異常
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
// 回調onFailure方法,通知用戶執行失敗
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
// 通知dispatcher請求完成
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
// 通過攔截器責任鏈獲取請求響應
val response = getResponseWithInterceptorChain()
signalledCallback = true
// 通過回調,返回響應結果給用戶
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
// 出現異常,回調onFailure方法,通知用戶執行失敗
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
// 出現異常,回調onFailure方法,通知用戶執行失敗
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
// 通知dispatcher請求完成
client.dispatcher.finished(this)
}
}
}
}
異步線程池
異步請求在線程池中執行,我們向AsyncCall
的executeOn
方法傳入的參數就是「異步任務執行的線程池」,傳入的參數是Dispatcher
的executorService
,executorService
對應的線程池可以在構造Dispatcher
的時候指定,若用戶沒有指定線程池,那么默認是使用下面的線程池
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
構造線程池的幾個參數如下
- 核心線程數
corePoolSize
:表示線程池中空閑線程也不會被回收的數量,這里設置為0說明線程池中所有空閑線程都會被回收。 - 最大線程數
maximumPoolSize
:線程池最大支持創建的線程數,這里設置為Int.MAX_VALUE
。 - 線程存活時間
keepAliveTime
:非核心線程空閑后超過該時間就會被回收,這里設置為60個時間單位(60s)。 - 時間單位
TimeUnit
:空閑線程存活的時間單位,這里設置為秒TimeUnit.SECONDS
。 - 線程池中的任務隊列
workQueue
:該隊列主要用來存儲已經被提交但是尚未執行的任務,這里設置為SynchronousQueue()
。 - 線程工廠
ThreadFactory
:線程的創建工廠,這個線程工廠指定了創建的線程的「名字」,以及創建的線程「非守護線程」。
對於上面指定的參數,我們思考下面的幾個問題:
為什么要選用SynchronousQueue作為任務隊列?
首先我們要了解一下SynchronousQueue
是什么,它是一個隊列,但它內部不存在任何的容器,它是一種經典的生產者-消費者模型,它有多個生產者和消費者,當一個生產線程進行生產操作(put)時,若沒有消費者線程進行消費(take),那么該線程會阻塞,直到有消費者進行消費。也就是說,它僅僅實現了一個傳遞的操作,這種傳遞功能由於沒有了中間的放入容器,再從容器中取出的過程,因此是一種快速傳遞元素的方式,這對於我們網絡請求這種高頻請求來說,是十分合適的。關於 SynchronousQueue
可以看這篇文章:java並發之SynchronousQueue實現原理_-CSDN博客.
線程池的最大線程數不受限制,可以無限制的進行並發網絡請求嗎?
在OkHttp
的設計中,線程個數的維護工作不再交給線程池,而是由Dispatcher
實現,通過外部設置的maxRequests
以及maxRequestsPerHost
來調整異步請求的等待隊列以及執行隊列,從而實現對線程最大數量的控制。Dispatcher
具體實現將在后面分析。
小結
發起一個異步請求的語句是
client.newCall(request).enqueue(object : Callback {...})
- 首先調用
OkHttpClient
的newCall
方法,傳入Request
,返回一個RealCall
對象 - 調用
RealCall
對象的enqueue
方法,並傳入結果回調Callback
- 在
ReallCall::enqueue
中,利用Callback
構造了一個AsyncCall
對象,並調用了OkHttpClient
的dispatcher
的enqueue
方法,將AsyncCall
作為參數傳入 - 在
Dispatcher::enqueue
中,先將AsyncCall
添加到請求隊列當中,然后調用自己的promoteAndExecute
方法,嘗試執行等待隊列中的任務 Dispatcher::promoteAndExecute
方法會遍歷異步任務的等待隊列,然后嘗試在線程池中去執行這些異步任務
異步任務的執行,也是調用getResponseWithInterceptorChain
方法,通過攔截器責任鏈獲取請求的響應,這一點和「同步請求」是一樣的。
Dispatcher
前面多次出現了調度器Dispatcher
的身影,這里對它做一個總結。
維護的隊列
Dispatcher
中維護了三個隊列:
runningSyncCalls
:類型是ArrayDeque<RealCall>
,「同步任務」的執行隊列readyAsyncCalls
:類型是ArrayDeque<AsyncCall>
,「異步任務」的等待隊列runningAsyncCalls
:類型是ArrayDeque<AsyncCall>
,「異步任務」的執行隊列
請求相關
同步請求中,RealCall
的execute
方法會調用到Dispatcher
的executed
方法,將「同步任務」保存到runningSyncCalls
中
@Synchronized internal fun executed(call: RealCall) {
runningSyncCalls.add(call)
}
異步請求中,RealCall
的enqueue
方法會調用Dispatcher
的enqueue
方法
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
// 將AsyncCall添加到請求隊列中
readyAsyncCalls.add(call)
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running
// call to the same host.
if (!call.call.forWebSocket) {
// 尋找同一host的Call
val existingCall = findExistingCallWithHost(call.host)
// 復用Call的callsPerHost
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
// 嘗試執行等待隊列中的任務
promoteAndExecute()
}
在Dispatcher
的enqueue
方法中,首先將「異步任務」添加到readyAsyncCalls
中,然后調用promoteAndExecute()
方法,該方法會遍歷readyAsyncCalls
,尋找能夠執行的AsyncCall
,能夠執行的條件是「當前總請求數」以及「同一主機名的請求數」都沒有達到上限,最后調用能夠執行的AsyncCall
的executeOn
方法,在Dispatcher
提供的線程池中執行異步任務。
通知完成
前面我們已經知道,不管是同步請求,還是異步請求,在請求完成后,都會調用Dispatcher
的finished
方法,通知Dispatcher
請求已執行完畢。我們前面沒有展開分析Dispatcher
被通知后會做些什么,這里就對它進行分析。
「同步請求」完成后會調用Dispatcher
下面的方法
/** Used by [Call.execute] to signal completion. */
internal fun finished(call: RealCall) {
finished(runningSyncCalls, call)
}
「異步請求」完成后會調用Dispatcher
下面的方法
/** Used by [AsyncCall.run] to signal completion. */
internal fun finished(call: AsyncCall) {
call.callsPerHost.decrementAndGet()
finished(runningAsyncCalls, call)
}
「異步請求」調用的finished
比「同步請求」調用的finished
多了一個callsPerHost
減1的操作,也就是「主機名請求數」減1的操作。接着,它們都會調用到Dispatcher
下面的這個重載方法
private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
// 將「同步任務/異步任務」從「執行隊列」當中移除
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}
// 嘗試執行「等待隊列」中的「異步任務」
val isRunning = promoteAndExecute()
// isRunning:在「執行隊列」中有「同步/異步任務」待執行
// idleCallback:Dispatcher空閑時的回調
// 若「執行隊列」中沒有待執行的任務,表明當前的Dispatcher是空閑的,這時候,如果idleCallback
// 不為null,就執行idleCallback的run方法
if (!isRunning && idleCallback != null) {
idleCallback.run()
}
}
小結
Dispatcher
在 網絡請求發起過程中的總結:
Dispatcher
中維護三個任務隊列,實現方式都為雙端隊列,分別為同步請求執行隊列,異步請求等待隊列,異步請求執行隊列。- 當發起一個同步/異步請求,就會將該請求添加到同步請求執行隊列/異步請求等待隊列中,然后嘗試執行這些請求。
- 請求執行完畢就會將這些請求從隊列中移除,若后面沒有需要執行的任務后,調用
idleCallback.run()
執行一些空閑任務。 - 異步請求執行有兩個入口:一是添加了一個異步請求,二是一個請求執行完畢。
請求時序圖
同步請求的時序圖:

異步請求的時序圖:

請求響應
無論是同步請求,還是異步請求,最終都會調用到RealCall
的getResponseWithInterceptorChain
方法,通過該方法可以拿到請求的響應。關於getResponseWithInterceptorChain
方法,我們需要探究兩個關鍵的點
- 核心機制:攔截器機制
- 設計模式:在攔截器中如何運用責任鏈模式
什么是責任鏈模式?在典型的責任鏈設計模式里,很多對象由每一個對象對其下級的引用而連接起來形成一條鏈。請求在這個鏈上傳遞,直到鏈上的某一個對象決定處理此請求。發出這個請求的客戶端並不知道鏈上的哪一個對象最終處理這個請求,這使得系統可以在不影響客戶端的情況下動態地重新組織和分配責任。
我們回顧下getResponseWithInterceptorChain
方法
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
// 按順序添加一系列的攔截器
val interceptors = mutableListOf<Interceptor>()
// 添加用戶自定義攔截器
interceptors += client.interceptors
// 添加重定向攔截器
interceptors += RetryAndFollowUpInterceptor(client)
// 橋攔截器
interceptors += BridgeInterceptor(client.cookieJar)
// 添加緩存攔截器,從緩存中拿數據
interceptors += CacheInterceptor(client.cache)
// 添加網絡連接攔截器,負責建立網絡連接
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
// 服務器請求攔截器,負責向服務器發送請求數據、從服務器讀取響應數據
interceptors += CallServerInterceptor(forWebSocket)
// 構建一條責任鏈,注意前面構建的攔截器列表interceptors作為參數傳入
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try {
// 執行責任鏈,得到請求的響應
val response = chain.proceed(originalRequest)
// 判斷請求是否取消,若沒有取消則返回響應
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}
該方法會按一定的順序構建攔截器列表interceptors
,接着利用interceptors
創建RealInterceptorChain
對象,該對象是一條責任鏈,使用了責任鏈模式,攔截器會按順序依次調用,每當一個攔截器執行完畢之后會調用下一個攔截器或者不調用並返回結果,我們最終拿到的響應就是這個鏈條執行之后返回的結果。當我們自定義一個攔截器的時候,也會被加入到這個攔截器鏈條里。
Interceptor
我們先看下攔截器Interceptor
的定義
fun interface Interceptor {
@Throws(IOException::class)
fun intercept(chain: Chain): Response
companion object {
// 利用lambda,創建一個攔截器
inline operator fun invoke(crossinline block: (chain: Chain) -> Response):
Interceptor = Interceptor { block(it) }
}
interface Chain {
fun request(): Request
@Throws(IOException::class)
fun proceed(request: Request): Response
fun connection(): Connection?
fun call(): Call
fun connectTimeoutMillis(): Int
fun withConnectTimeout(timeout: Int, unit: TimeUnit): Chain
fun readTimeoutMillis(): Int
fun withReadTimeout(timeout: Int, unit: TimeUnit): Chain
fun writeTimeoutMillis(): Int
fun withWriteTimeout(timeout: Int, unit: TimeUnit): Chain
}
}
該接口里面定義了intercept
方法,還有一個Chain
接口,攔截器鏈RealInterceptorChain
就是實現了Chain
接口。
雖然不同的攔截器實現不同,但是它們往往具有相同的結構:
@Throws(IOException::class)
override fun intercept(chain: Chain): Response {
// 獲取Request
val request = chain.request
// 處理:Request階段,攔截器對Request進行處理
// 調用RealInterceptorChain的proceed方法,在該方法里面,會遞歸調用下一個攔截器的intercept
// 方法,拿到下一個攔截器的response
val response = (chain as RealInterceptorChain).proceed(request)
// 處理:Response階段,完成了該攔截器獲取Response的過程,將Response返回到上一個攔截器中
return response
}
攔截器鏈的整體執行流程
我們先來看一個生活中的例子,以便更好地理解「攔截器鏈」是如何工作的。下面是一個流水線生產的例子
對應到OkHttp
的響應過程就是:包裝了Request
的Chain
遞歸的從每個Interceptor
手中走過去,最后請求網絡得到的Response
又會逆序的從每個Interceptor
走回來,把Response
返回到開發者手中。
我們來看看攔截器鏈RealInterceptorChain
的工作流程
class RealInterceptorChain(
internal val call: RealCall,
private val interceptors: List<Interceptor>,
private val index: Int,
internal val exchange: Exchange?,
internal val request: Request,
internal val connectTimeoutMillis: Int,
internal val readTimeoutMillis: Int,
internal val writeTimeoutMillis: Int
) : Interceptor.Chain {
...
internal fun copy(
index: Int = this.index,
exchange: Exchange? = this.exchange,
request: Request = this.request,
connectTimeoutMillis: Int = this.connectTimeoutMillis,
readTimeoutMillis: Int = this.readTimeoutMillis,
writeTimeoutMillis: Int = this.writeTimeoutMillis
) = RealInterceptorChain(call, interceptors, index, exchange, request, connectTimeoutMillis,
readTimeoutMillis, writeTimeoutMillis)
...
@Throws(IOException::class)
override fun proceed(request: Request): Response {
check(index < interceptors.size)
calls++
...
// Call the next interceptor in the chain.
// copy出一個攔截器鏈,並且將index設置為(index+1),便於之后「調用責任鏈的下一個攔截器」
val next = copy(index = index + 1, request = request)
// 獲取當前的index對應的攔截器
val interceptor = interceptors[index]
@Suppress("USELESS_ELVIS")
// 執行當前攔截器的intercept方法
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
...
return response
}
}
要很好地理解攔截器鏈的工作流程,需要將RealInterceptorChain
的proceed
方法與前面攔截器的intercept
方法結合起來看。我們理一下它們的工作流程,其中RealInterceptorChain
簡稱為Chain
- 在
RealCall
的getResponseWithInterceptorChain
方法中,先調用index
為0的Chain
的proceed
方法,並傳入originalRequest
- 在
proceed
方法里,copy
出一個Chain(index+1(1),request)
,然后調用當前index(0)
的Interceptor
的intercept
方法,並且傳入新copy
出來的Chain
- 在
Interceptor
的intercept
方法,從Chain
當中獲取request
,然后對request
添加一些自己的處理,接着調用Chain
的proceed
方法,傳入處理后的request
,等待返回response
- 在
proceed
方法里,copy
出一個Chain(index+1(2),request)
,然后調用當前index(1)
的Interceptor
的intercept
方法,並且傳入新copy
出來的Chain
- 在
Interceptor
的intercept
方法,從Chain
當中獲取request
,然后對request
添加一些自己的處理,接着調用Chain
的proceed
方法,傳入處理后的request
,等待返回response
- 在
proceed
方法里,copy
出一個Chain(index+1(3),request)
,然后調用當前index(2)
的Interceptor
的intercept
方法,並且傳入新copy
出來的Chain
- ......
- 最后一個
Interceptor
的intercept
方法處理完后,向上返回處理后的Response
- 倒數第二個
Interceptor
在intercept
方法中,對返回的Response
進行一些加工后向上返回 - 倒數第三個
Interceptor
在intercept
方法中,對返回的Response
進行一些加工后向上返回 - ......
- 最終在
RealCall
的getResponseWithInterceptorChain
方法中,獲取到最終Interceptor
處理完后的Response
,然后將該Response
傳給用戶,通知用戶請求成功完成。
在copy
攔截器鏈的時候,下標index
設置為index+1
就是為了讓攔截器責任鏈能准備地從攔截器容器中取出下一個攔截器進行處理。
整個工作過程就是,當前攔截器對Request
處理完畢后,通過調用傳入的責任鏈的 proceed
方法,將請求交給下一個攔截器處理,然后獲得下一個攔截器返回的Response
,再對Response
處理后交給上一個攔截器。
顯然在這里使用了一種遞歸設計,請求從上往下交付,每個攔截器拿到請求都可以對請求做出相應處理,然后交給下一個攔截器,最后一個攔截器處理請求拿到響應后,將響應從下往上交付,每個攔截器拿到響應后再對響應做出響應處理,然后交給上一個攔截器。若中間的過程中若出現了錯誤,則通過拋出異常來通知上層。
「中間的某個攔截器」也可以選擇不將請求Request
交給下一級攔截器處理,這時候「該攔截器」直接返回Response
給上一級的攔截器即可。
Interceptor順序
從RealCall
的getResponseWithInterceptorChain
方法中,我們可以看出OkHttp
預置了哪些Interceptor
、用戶可以自定義哪些Interceptor
,以及這些Interceptor
之間的順序
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
...
}
在OkHttp
中預置了下面幾種Interceptor
:
RetryAndFollowUpInterceptor
:負責實現重定向功能BridgeInterceptor
:將用戶構造的請求轉換為向服務器發送的請求,將服務器返回的響應轉換為對用戶友好的響應CacheInterceptor
:讀取緩存、更新緩存ConnectInterceptor
:建立與服務器的連接CallServerInterceptor
:從服務器讀取響應
可以看出,整個網絡請求的過程由各個攔截器互相配合從而實現,通過這種攔截器的機制,可以很方便地調節網絡請求的過程及先后順序,同時也能夠很方便地使用戶對其進行擴展。
其中用戶可以在兩個時機插入 Interceptor
:
- 網絡請求前后:通過
OkHttpClient.Builder.addInterceptor
方法添加 - 讀取響應前后:通過
OkHttpClient.Builder.addNetworkInterceptor
方法添加
其整體流程如圖所示:
總結
OkHttp
采用了攔截器機制和責任鏈模式,它使用多個負責不同功能的攔截器,並將這些攔截器通過責任鏈連接在一起,采用遞歸的方式進行調用,每一層的攔截器在請求前和響應后都可以對 請求/響應 做出不同的處理,通過各個攔截器的協調合作,完成了網絡請求的響應過程。