OkHttp是一個精巧的網絡請求庫,有如下特性:
1)支持http2,對一台機器的所有請求共享同一個socket
2)內置連接池,支持連接復用,減少延遲
3)支持透明的gzip壓縮響應體
4)通過緩存避免重復的請求
5)請求失敗時自動重試主機的其他ip,自動重定向
6)好用的API
其本身就是一個很強大的庫,再加上Retrofit2、Picasso的這一套組合拳,使其愈發的受到開發者的關注。本篇博客,我將對Okhttp3進行分析(源碼基於Okhttp3.4)。
如何引入Okhttp3?
配置Okhttp3非常簡單,只需要在Android Studio 的gradle進行如下的配置:
compile 'com.squareup.okhttp3:okhttp:3.4.1'
* 1
添加網絡權限:
<uses-permission android:name="android.permission.INTERNET"/>
* 1
Okhttp3的基本使用
okHttp的get請求
okHttp的一般使用如下,okHttp默認使用的就是get請求
String url = "http://write.blog.csdn.net/postlist/0/0/enabled/1";
mHttpClient = new OkHttpClient();
Request request = new Request.Builder().url(url).build();
okhttp3.Response response = null;
try {
response = mHttpClient.newCall(request).execute();
String json = response.body().string();
Log.d("okHttp",json);
} catch (IOException e) {
e.printStackTrace();
}
}
我們試着將數據在logcat進行打印,發現會報錯,原因就是不能在主線程中進行耗時的操作

說明mHttpClient.newCall(request).execute()是同步的,那有沒有異步的方法呢,答案是肯定的,就是mHttpClient.newCall(request).enqueue()方法,里面需要new一個callback我們對代碼進行修改,如下
public void requestBlog() {
String url = "http://write.blog.csdn.net/postlist/0/0/enabled/1";
mHttpClient = new OkHttpClient();
Request request = new Request.Builder().url(url).build();
/* okhttp3.Response response = null;*/
/*response = mHttpClient.newCall(request).execute();*/
mHttpClient.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
String json = response.body().string();
Log.d("okHttp", json);
}
});
}

Okhttp的POST請求
POST提交Json數據
private void postJson() throws IOException {
String url = "http://write.blog.csdn.net/postlist/0/0/enabled/1";
String json = "haha";
OkHttpClient client = new OkHttpClient();
RequestBody body = RequestBody.create(JSON, json);
Request request = new Request.Builder()
.url(url)
.post(body)
.build();
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
Log.d(TAG, response.body().string());
}
});
}
POST提交鍵值對
很多時候我們會需要通過POST方式把鍵值對數據傳送到服務器。 OkHttp提供了很方便的方式來做這件事情。
private void post(String url, String json) throws IOException {
OkHttpClient client = new OkHttpClient();
RequestBody formBody = new FormBody.Builder()
.add("name", "liming")
.add("school", "beida")
.build();
Request request = new Request.Builder()
.url(url)
.post(formBody)
.build();
Call call = client.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
String str = response.body().string();
Log.i(TAG, str);
}
});
}
異步上傳文件
上傳文件本身也是一個POST請求
定義上傳文件類型
public static final MediaType MEDIA_TYPE_MARKDOWN
= MediaType.parse("text/x-markdown; charset=utf-8");
* 1
* 2
將文件上傳到服務器上:
private void postFile() {
OkHttpClient mOkHttpClient = new OkHttpClient();
File file = new File("/sdcard/demo.txt");
Request request = new Request.Builder()
.url("https://api.github.com/markdown/raw")
.post(RequestBody.create(MEDIA_TYPE_MARKDOWN, file))
.build();
mOkHttpClient.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
Log.i(TAG, response.body().string());
}
});
}
添加如下權限:
<uses-permission android:name="android.permission.READ_EXTERNAL_STORAGE"/>
<uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE"/>
提取響應頭
典型的HTTP頭 像是一個 Map
private final OkHttpClient client = new OkHttpClient();
public void run() throws Exception {
Request request = new Request.Builder()
.url("https://api.github.com/repos/square/okhttp/issues")
.header("User-Agent", "OkHttp Headers.java")
.addHeader("Accept", "application/json; q=0.5")
.addHeader("Accept", "application/vnd.github.v3+json")
.build();
Response response = client.newCall(request).execute();
if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
System.out.println("Server: " + response.header("Server"));
System.out.println("Date: " + response.header("Date"));
System.out.println("Vary: " + response.headers("Vary"));
}
Post方式提交String
使用HTTP POST提交請求到服務。這個例子提交了一個markdown文檔到web服務,以HTML方式渲染markdown。因為整個請求體都在內存中,因此避免使用此api提交大文檔(大於1MB)。
private void postString() throws IOException {
OkHttpClient client = new OkHttpClient();
String postBody = ""
+ "Releases\n"
+ "--------\n"
+ "\n"
+ " * zhangfei\n"
+ " * guanyu\n"
+ " * liubei\n";
Request request = new Request.Builder()
.url("https://api.github.com/markdown/raw")
.post(RequestBody.create(MEDIA_TYPE_MARKDOWN, postBody))
.build();
Call call = client.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
System.out.println(response.body().string());
}
});
}
Post方式提交流
以流的方式POST提交請求體。請求體的內容由流寫入產生。這個例子是流直接寫入Okio的BufferedSink。你的程序可能會使用OutputStream,你可以使用BufferedSink.outputStream()來獲取。
public static final MediaType MEDIA_TYPE_MARKDOWN
= MediaType.parse("text/x-markdown; charset=utf-8");
private void postStream() throws IOException {
RequestBody requestBody = new RequestBody() {
@Override
public MediaType contentType() {
return MEDIA_TYPE_MARKDOWN;
}
@Override
public void writeTo(BufferedSink sink) throws IOException {
sink.writeUtf8("Numbers\n");
sink.writeUtf8("-------\n");
for (int i = 2; i <= 997; i++) {
sink.writeUtf8(String.format(" * %s = %s\n", i, factor(i)));
}
}
private String factor(int n) {
for (int i = 2; i < n; i++) {
int x = n / i;
if (x * i == n) return factor(x) + " × " + i;
}
return Integer.toString(n);
}
};
Request request = new Request.Builder()
.url("https://api.github.com/markdown/raw")
.post(requestBody)
.build();
Call call = client.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
System.out.println(response.body().string());
}
});
}
Post方式提交表單
private void postForm() {
OkHttpClient client = new OkHttpClient();
RequestBody formBody = new FormBody.Builder()
.add("search", "Jurassic Park")
.build();
Request request = new Request.Builder()
.url("https://en.wikipedia.org/w/index.php")
.post(formBody)
.build();
Call call = client.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
System.out.println(response.body().string());
}
});
}
Post方式提交分塊請求
MultipartBody 可以構建復雜的請求體,與HTML文件上傳形式兼容。多塊請求體中每塊請求都是一個請求體,可以定義自己的請求頭。這些請求頭可以用來描述這塊請求,例如他的Content-Disposition。如果Content-Length和Content-Type可用的話,他們會被自動添加到請求頭中。
private static final String IMGUR_CLIENT_ID = "...";
private static final MediaType MEDIA_TYPE_PNG = MediaType.parse("image/png");
private void postMultipartBody() {
OkHttpClient client = new OkHttpClient();
// Use the imgur image upload API as documented at https://api.imgur.com/endpoints/image
MultipartBody body = new MultipartBody.Builder("AaB03x")
.setType(MultipartBody.FORM)
.addPart(
Headers.of("Content-Disposition", "form-data; name=\"title\""),
RequestBody.create(null, "Square Logo"))
.addPart(
Headers.of("Content-Disposition", "form-data; name=\"image\""),
RequestBody.create(MEDIA_TYPE_PNG, new File("website/static/logo-square.png")))
.build();
Request request = new Request.Builder()
.header("Authorization", "Client-ID " + IMGUR_CLIENT_ID)
.url("https://api.imgur.com/3/image")
.post(body)
.build();
Call call = client.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
System.out.println(response.body().string());
}
});
}
響應緩存
為了緩存響應,你需要一個你可以讀寫的緩存目錄,和緩存大小的限制。這個緩存目錄應該是私有的,不信任的程序應不能讀取緩存內容。
一個緩存目錄同時擁有多個緩存訪問是錯誤的。大多數程序只需要調用一次new OkHttpClient(),在第一次調用時配置好緩存,然后其他地方只需要調用這個實例就可以了。否則兩個緩存示例互相干擾,破壞響應緩存,而且有可能會導致程序崩潰。
響應緩存使用HTTP頭作為配置。你可以在請求頭中添加Cache-Control: max-stale=3600 ,OkHttp緩存會支持。你的服務通過響應頭確定響應緩存多長時間,例如使用Cache-Control: max-age=9600。
int cacheSize = 10 * 1024 * 1024; // 10 MiB
Cache cache = new Cache(cacheDirectory, cacheSize);
OkHttpClient.Builder builder = new OkHttpClient.Builder();
builder.cache(cache);
OkHttpClient client = builder.build();
Request request = new Request.Builder()
.url("http://publicobject.com/helloworld.txt")
.build();
Call call = client.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
String response1Body = response.body().string();
System.out.println("Response 1 response: " + response);
System.out.println("Response 1 cache response: " + response.cacheResponse());
System.out.println("Response 1 network response: " + response.networkResponse());
}
});
超時
沒有響應時使用超時結束call。沒有響應的原因可能是客戶點鏈接問題、服務器可用性問題或者這之間的其他東西。OkHttp支持連接,讀取和寫入超時。
private void ConfigureTimeouts() {
OkHttpClient.Builder builder = new OkHttpClient.Builder();
OkHttpClient client = builder.build();
client.newBuilder().connectTimeout(10, TimeUnit.SECONDS);
client.newBuilder().readTimeout(10,TimeUnit.SECONDS);
client.newBuilder().writeTimeout(10,TimeUnit.SECONDS);
Request request = new Request.Builder()
.url("http://httpbin.org/delay/2") // This URL is served with a 2 second delay.
.build();
Call call = client.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
System.out.println("Response completed: " + response);
}
});
}
簡單封裝okHttp框架
新建一個工具類OkHttpUtils
OkHttpClient必須是單例的,所以這里我們需要使用到單例設計模式,私有化構造函數,提供一個方法給外界獲取OkHttpUtils實例對象
public class OkHttpUtils {
private static OkHttpUtils mInstance;
private OkHttpClient mHttpClient;
private OkHttpUtils() {
};
public static OkHttpUtils getInstance(){
return mInstance;
}
}
一般網絡請求分為get和post請求兩種,但無論哪種請求都是需要用到request的,所以我們首先封裝一個request,創建一個doRequest方法,在其內先編寫mHttpClient.newCall(request).enqueue(new Callback())相關邏輯
public void doRequest(final Request request){
mHttpClient.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
}
});
}
我們需要自定義一個callback,BaseCallback,並將其傳入request方法中
public class BaseCallback {
}
在OkHttpUtils中編寫get和post方法
public void get(String url){
}
public void post(String url,Map<String,Object> param){
}
post方法中構建request對象,這里我們需要創建一個buildRequest方法,用於生成request對象
private Request buildRequest(String url,HttpMethodType methodType,Map<String,Object> params){
return null;
}
這里需要定一個枚舉對象HttpMethodType,用於區分是get還是post
enum HttpMethodType{
GET,
POST,
}
buildRequest方法根據HttpMethodType不同有相應的邏輯處理
private Request buildRequest(String url,HttpMethodType methodType,Map<String,Object> params){
Request.Builder builder = new Request.Builder()
.url(url);
if (methodType == HttpMethodType.POST){
builder.post(body);
}
else if(methodType == HttpMethodType.GET){
builder.get();
}
return builder.build();
}
builder.post()方法中需要一個body,所以我們需要創建一個方法builderFormData()方法用於返回RequestBody,這里內部邏輯后面再進行完善
private RequestBody builderFormData(Map<String,Object> params){
return null;
}
於是buildRequest方法變成了這樣
private Request buildRequest(String url,HttpMethodType methodType,Map<String,Object> params){
Request.Builder builder = new Request.Builder()
.url(url);
if (methodType == HttpMethodType.POST){
RequestBody body = builderFormData(params);
builder.post(body);
}
else if(methodType == HttpMethodType.GET){
builder.get();
}
return builder.build();
}
get方法進行修改:
public void get(String url,BaseCallback callback){
Request request = buildRequest(url,HttpMethodType.GET,null);
doRequest(request,callback);
}
post方法進行修改:
public void post(String url,Map<String,Object> params,BaseCallback callback){
Request request = buildRequest(url,HttpMethodType.POST,params);
doRequest(request,callback);
}
完善builderFormData()方法
private RequestBody builderFormData(Map<String,String> params){
FormBody.Builder builder = new FormBody.Builder();
if(params!=null){
for(Map.Entry<String,String> entry:params.entrySet()){
builder.add(entry.getKey(),entry.getValue());
}
}
return builder.build();
}
BaseCallback中定義一個抽象方法onBeforeRequest,這樣做的理由是我們在加載網絡數據成功前,一般都有進度條等顯示,這個方法就是用來做這些處理的
public abstract class BaseCallback {
public abstract void onBeforeRequest(Request request);
}
OkHttpUtils的doRequest方法增加如下語句:
baseCallback.onBeforeRequest(request);
* 1
BaseCallback中多定義2個抽象方法
public abstract void onFailure(Request request, Exception e) ;
/**
*請求成功時調用此方法
* @param response
*/
public abstract void onResponse(Response response);
由於Response的狀態有多種,比如成功和失敗,所以需要onResponse分解為3個抽象方法
/**
*
* 狀態碼大於200,小於300 時調用此方法
* @param response
* @param t
* @throws
*/
public abstract void onSuccess(Response response,T t) ;
/**
* 狀態碼400,404,403,500等時調用此方法
* @param response
* @param code
* @param e
*/
public abstract void onError(Response response, int code,Exception e) ;
/**
* Token 驗證失敗。狀態碼401,402,403 等時調用此方法
* @param response
* @param code
*/
public abstract void onTokenError(Response response, int code);
response.body.string()方法返回的都是String類型,而我們需要顯示的數據其實是對象,所以我們就想抽取出方法,直接返回對象,由於我們不知道對象的類型是什么,所以我們在BaseCallback中使用范型
public abstract class BaseCallback<T>
* 1
BaseCallback中需要將泛型轉換為Type,所以要聲明Type類型
public Type mType;
* 1
BaseCallback中需要如下一段代碼,將泛型T轉換為Type類型
static Type getSuperclassTypeParameter(Class<?> subclass)
{
Type superclass = subclass.getGenericSuperclass();
if (superclass instanceof Class)
{
throw new RuntimeException("Missing type parameter.");
}
ParameterizedType parameterized = (ParameterizedType) superclass;
return $Gson$Types.canonicalize(parameterized.getActualTypeArguments()[0]);
}
在BaseCallback的構造函數中進行mType進行賦值
public BaseCallback()
{
mType = getSuperclassTypeParameter(getClass());
}
OkHttpUtils中doRequest方法的onFailure與onResponse方法會相應的去調用baseCallback的方法
mHttpClient.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
baseCallback.onFailure(request,e);
}
@Override
public void onResponse(Call call, Response response) throws IOException {
if(response.isSuccessful()) {
baseCallback.onSuccess(response,null);
}else {
baseCallback.onError(response,response.code(),null);
}
/*mGson.fromJson(response.body().string(),baseCallback.mType);*/
}
});
onResponse方法中成功的情況又有區分,根據mType的類型不同有相應的處理邏輯,同時還要考慮Gson解析錯誤的情況
@Override
public void onResponse(Call call, Response response) throws IOException {
if(response.isSuccessful()) {
String resultStr = response.body().string();
if (baseCallback.mType == String.class){
baseCallback.onSuccess(response,resultStr);
}
else {
try {
Object obj = mGson.fromJson(resultStr, baseCallback.mType);
baseCallback.onSuccess(response,obj);
}
catch (com.google.gson.JsonParseException e){ // Json解析的錯誤
baseCallback.onError(response,response.code(),e);
}
}
}else {
baseCallback.onError(response,response.code(),null);
}
}
構造函數中進行一些全局變量的初始化的操作,還有一些超時的設計
private OkHttpUtils() {
mHttpClient = new OkHttpClient();
OkHttpClient.Builder builder = mHttpClient.newBuilder();
builder.connectTimeout(10, TimeUnit.SECONDS);
builder.readTimeout(10,TimeUnit.SECONDS);
builder.writeTimeout(30,TimeUnit.SECONDS);
mGson = new Gson();
};
靜態代碼塊初始化OkHttpUtils對象
static {
mInstance = new OkHttpUtils();
}
在okHttpUtils內,需要創建handler進行UI界面的更新操作,創建callbackSuccess方法
private void callbackSuccess(final BaseCallback callback , final Response response, final Object obj ){
mHandler.post(new Runnable() {
@Override
public void run() {
callback.onSuccess(response, obj);
}
});
}
doRequest方法的onResponse方法也進行相應的改寫
if (baseCallback.mType == String.class){
/*baseCallback.onSuccess(response,resultStr);*/
callbackSuccess(baseCallback,response,resultStr);
}
創建callbackError方法
private void callbackError(final BaseCallback callback, final Response response, final Exception e) {
mHandler.post(new Runnable() {
@Override
public void run() {
callback.onError(response, response.code(), e);
}
});
}
將doRequest方法的onResponse方法中的baseCallback.onError(response,response.code(),e);替換為callbackError(baseCallback,response,e);方法
@Override
public void onResponse(Call call, Response response) throws IOException {
if(response.isSuccessful()) {
String resultStr = response.body().string();
if (baseCallback.mType == String.class){
/*baseCallback.onSuccess(response,resultStr);*/
callbackSuccess(baseCallback,response,resultStr);
}
else {
try {
Object obj = mGson.fromJson(resultStr, baseCallback.mType);
/*baseCallback.onSuccess(response,obj);*/
callbackSuccess(baseCallback,response,obj);
}
catch (com.google.gson.JsonParseException e){ // Json解析的錯誤
/*baseCallback.onError(response,response.code(),e);*/
callbackError(baseCallback,response,e);
}
}
}else {
callbackError(baseCallback,response,null);
/*baseCallback.onError(response,response.code(),null);*/
}
}
至此,我們的封裝基本完成。
OkHttp3源碼分析
請求處理分析
當我們要請求網絡的時候我們需要用OkHttpClient.newCall(request)進行execute或者enqueue操作,當我們調用newCall時:
/**
* Prepares the {@code request} to be executed at some point in the future.
*/
@Override public Call newCall(Request request) {
return new RealCall(this, request);
}
實際返回的是一個RealCall類,我們調用enqueue異步請求網絡實際上是調用了RealCall的enqueue方法:
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
最終的請求是dispatcher來完成的。
Dispatcher任務調度
Dispatcher的本質是異步請求的管理器,控制最大請求並發數和單個主機的最大並發數,並持有一個線程池負責執行異步請求。對同步的請求只是用作統計。他是如何做到控制並發呢,其實原理就在上面的2個execute代碼里面,真正網絡請求執行前后會調用executed和finished方法,而對於AsyncCall的finished方法后,會根據當前並發數目選擇是否執行隊列中等待的AsyncCall。並且如果修改Dispatcher的maxRequests或者maxRequestsPerHost也會觸發這個過程。
Dispatcher主要用於控制並發的請求,它主要維護了以下變量:
/** 最大並發請求數*/
private int maxRequests = 64;
/** 每個主機最大請求數*/
private int maxRequestsPerHost = 5;
/** 消費者線程池 */
private ExecutorService executorService;
/** 將要運行的異步請求隊列 */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/**正在運行的異步請求隊列 */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** 正在運行的同步請求隊列 */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
構造函數
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public Dispatcher() {
}
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
Dispatcher有兩個構造函數,可以使用自己設定線程池,如果沒有設定線程池則會在請求網絡前自己創建線程池,這個線程池類似於CachedThreadPool比較適合執行大量的耗時比較少的任務。
異步請求
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
當正在運行的異步請求隊列中的數量小於64並且正在運行的請求主機數小於5時則把請求加載到runningAsyncCalls中並在線程池中執行,否則就再入到readyAsyncCalls中進行緩存等待。
AsyncCall
線程池中傳進來的參數就是AsyncCall它是RealCall的內部類,內部也實現了execute方法:
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
首先我們來看看最后一行, 無論這個請求的結果如何都會執行client.dispatcher().finished(this);
/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
finished(runningSyncCalls, call, false);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
finished方法將此次請求從runningAsyncCalls移除后還執行了promoteCalls方法:
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
可以看到最關鍵的點就是會從readyAsyncCalls取出下一個請求,並加入runningAsyncCalls中並交由線程池處理。好了讓我們再回到上面的AsyncCall的execute方法,我們會發getResponseWithInterceptorChain方法返回了Response,很明顯這是在請求網絡。
Interceptor攔截器
在回到RealCall中,我們看到無論是execute還是enqueue,真正的Response是通過這個函數getResponseWithInterceptorChain獲取的,其他的代碼都是用作控制與回調。而這里就是真正請求的入口,也是到了OkHttp的一個很精彩的設計:Interceptor與Chain
看一下RealCall中的getResponseWithInterceptorChain方法
private Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!retryAndFollowUpInterceptor.isForWebSocket()) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(
retryAndFollowUpInterceptor.isForWebSocket()));
Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest);
return chain.proceed(originalRequest);
}
這也是與舊版本不一致的地方,在3.4.x以前,沒有這些內部的這些攔截器,只有用戶的攔截器與網絡攔截器。而Request和Response是通過HttpEngine來完成的。在RealCall實現了用戶攔截器與RetryAndFollowUp的過程,而在HttpEngine內部處理了請求轉換、Cookie、Cache、網絡攔截器、連接網絡的過程。值得一提的是,在舊版是獲取到Response后調用網絡攔截器的攔截。
而在這里,RealInterceptorChain會遞歸的創建並以此調用攔截器,去掉諸多異常,簡化版代碼如下:
public Response proceed(Request request, StreamAllocation streamAllocation, HttpStream httpStream,
Connection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
// If we already have a stream, confirm that the incoming request will use it.
if (this.httpStream != null && !sameConnection(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}
// If we already have a stream, confirm that this is the only call to chain.proceed().
if (this.httpStream != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(
interceptors, streamAllocation, httpStream, connection, index + 1, request);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
// Confirm that the next interceptor made its required call to chain.proceed().
if (httpStream != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}
// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}
return response;
}
Chain與Interceptor會互相遞歸調用,直到鏈的盡頭。
我們看到,通過職責鏈模式,清楚地切開了不同的邏輯,每個攔截器完成自己的職責,從而完成用戶的網絡請求。
大概流程是:
1)先經過用戶攔截器
2)RetryAndFollowUpInterceptor負責自動重試和進行必要的重定向
3)BridgeIntercetor負責將用戶Request轉換成一個實際的網絡請求的Request,再調用下層的攔截器獲取Response,最后再將網絡Response轉換成用戶的Reponse
4)CacheInterceptor負責控制緩存
5)ConnectInterceptor負責進行連接主機
6)網絡攔截器進行攔截
7)CallServerInterceptor是真正和服務器通信,完成http請求
連接與通信
在RetryAndFollowUpInterceptor中,會創建StreamAllocation,然后交給下游的ConnectInterceptor
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpStream httpStream = streamAllocation.newStream(client, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpStream, connection);
}
這里會創建一個HttpStream,並且取到一個RealConnection,繼續交給下游的CallServerInterceptor。
我們跟蹤進去看看,StreamAllocation里面做了什么
public HttpStream newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
int connectTimeout = client.connectTimeoutMillis();
int readTimeout = client.readTimeoutMillis();
int writeTimeout = client.writeTimeoutMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
HttpStream resultStream;
if (resultConnection.framedConnection != null) {
resultStream = new Http2xStream(client, this, resultConnection.framedConnection);
} else {
resultConnection.socket().setSoTimeout(readTimeout);
resultConnection.source.timeout().timeout(readTimeout, MILLISECONDS);
resultConnection.sink.timeout().timeout(writeTimeout, MILLISECONDS);
resultStream = new Http1xStream(
client, this, resultConnection.source, resultConnection.sink);
}
synchronized (connectionPool) {
stream = resultStream;
return resultStream;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
這里的代碼邏輯是這樣的,找一個健康的連接,設置超時時間,然后根據協議創建一個HttpStream並返回。
繼續跟進去看findHealthyConnection:
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
throws IOException {
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
return candidate;
}
}
上面的邏輯也很簡單,在findConnection中找一個連接,然后做健康檢查,如果不健康就回收,並再次循環,那么真正尋找連接的代碼就在findConnection里面了:
/**
* Returns a connection to host a new stream. This prefers the existing connection if it exists,
* then the pool, finally building a new connection.
*/
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) throws IOException {
Route selectedRoute;
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (stream != null) throw new IllegalStateException("stream != null");
if (canceled) throw new IOException("Canceled");
RealConnection allocatedConnection = this.connection;
if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
return allocatedConnection;
}
// Attempt to get a connection from the pool.
RealConnection pooledConnection = Internal.instance.get(connectionPool, address, this);
if (pooledConnection != null) {
this.connection = pooledConnection;
return pooledConnection;
}
selectedRoute = route;
}
if (selectedRoute == null) {
selectedRoute = routeSelector.next();
synchronized (connectionPool) {
route = selectedRoute;
refusedStreamCount = 0;
}
}
RealConnection newConnection = new RealConnection(selectedRoute);
acquire(newConnection);
synchronized (connectionPool) {
Internal.instance.put(connectionPool, newConnection);
this.connection = newConnection;
if (canceled) throw new IOException("Canceled");
}
newConnection.connect(connectTimeout, readTimeout, writeTimeout, address.connectionSpecs(),
connectionRetryEnabled);
routeDatabase().connected(newConnection.route());
return newConnection;
}
這里大概分成分成3大步:
1)如果當前有連接並且符合要求的話,就直接返回
2)如果線程池能取到一個符合要求的連接的話,就直接返回
3)如果Route為空,從RouteSelector取一個Route,然后新建一個RealConnection,並放入ConnectionPool,隨后調用connect,再返回
也就是說不管當前走的是步驟1還是2,一開始一定是從3開始的,也就是在RealConnection的connect中真正完成了socket連接。
connect里面代碼比較長,真正要做的就是一件事,如果是https請求並且是http代理,則建立隧道連接,隧道連接請參考RFC2817,否則建立普通連接。
這兩者都調用了2個函數:connectSocket(connectTimeout, readTimeout); establishProtocol(readTimeout, writeTimeout, connectionSpecSelector);
但是隧道連接則多了一個代理認證的過程,可能會反復的connectSocket和構造請求。
看一下connectSocket:
private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
Proxy proxy = route.proxy();
Address address = route.address();
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);
rawSocket.setSoTimeout(readTimeout);
try {
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
throw new ConnectException("Failed to connect to " + route.socketAddress());
}
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
}
就是根據Route來創建socket,在connect,隨后將rawSocket的InputStream與OutputStream包裝成Source與Sink。這里提一下,OkHttp是依賴Okio的,Okio封裝了Java的IO API,如這里的Source與Sink,非常簡潔實用。
而establishProtocol里,如果是https則走TLS協議,生成一個SSLSocket,並進行握手和驗證,同時如果是HTTP2或者SPDY3的話,則生成一個FrameConnection。這里不再多提,HTTP2和HTTP1.X大相徑庭,我們這里主要是分析HTTP1.X的連接,后面有機會我們會單獨開篇講HTTP2。同時TLS相關的話題這里也一並略過,想了解的朋友可以看一看相應的Java API和HTTPS連接的資料。
再回到StreamAllcation.newStream的代碼resultStream = new Http1xStream( client, this, resultConnection.source, resultConnection.sink);實質上HttpStream其實就是Request和Response讀寫Socket的抽象,我們看到Http1xStream取到了Socket輸入輸出流,隨后在CallServerInterceptor可以拿來做讀寫。
我們看CallServerInterceptor做了什么:
@Override public Response intercept(Chain chain) throws IOException {
HttpStream httpStream = ((RealInterceptorChain) chain).httpStream();
StreamAllocation streamAllocation = ((RealInterceptorChain) chain).streamAllocation();
Request request = chain.request();
long sentRequestMillis = System.currentTimeMillis();
httpStream.writeRequestHeaders(request);
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
Sink requestBodyOut = httpStream.createRequestBody(request, request.body().contentLength());
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
}
httpStream.finishRequest();
Response response = httpStream.readResponseHeaders()
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
if (!forWebSocket || response.code() != 101) {
response = response.newBuilder()
.body(httpStream.openResponseBody(response))
.build();
}
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}
int code = response.code();
if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
return response;
}
CallServerInterceptor顧名思義,就是真正和Server進行通信的地方。這里也是按照HTTP協議,依次寫入請求頭,還有根據情況決定是否寫入請求體。隨后讀響應頭閉構造一個Response。
里面具體是如何實現呢,我們看Http1xStream:
首先是寫頭:
@Override public void writeRequestHeaders(Request request) throws IOException {
String requestLine = RequestLine.get(
request, streamAllocation.connection().route().proxy().type());
writeRequest(request.headers(), requestLine);
}
構造好請求行,進入writeRequest:
/** Returns bytes of a request header for sending on an HTTP transport. */
public void writeRequest(Headers headers, String requestLine) throws IOException {
if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
sink.writeUtf8(requestLine).writeUtf8("\r\n");
for (int i = 0, size = headers.size(); i < size; i++) {
sink.writeUtf8(headers.name(i))
.writeUtf8(": ")
.writeUtf8(headers.value(i))
.writeUtf8("\r\n");
}
sink.writeUtf8("\r\n");
state = STATE_OPEN_REQUEST_BODY;
}
這里就一目了然了,就是一行行的寫請求行和請求頭到sink中
再看readResponse:
/** Parses bytes of a response header from an HTTP transport. */
public Response.Builder readResponse() throws IOException {
if (state != STATE_OPEN_REQUEST_BODY && state != STATE_READ_RESPONSE_HEADERS) {
throw new IllegalStateException("state: " + state);
}
try {
while (true) {
StatusLine statusLine = StatusLine.parse(source.readUtf8LineStrict());
Response.Builder responseBuilder = new Response.Builder()
.protocol(statusLine.protocol)
.code(statusLine.code)
.message(statusLine.message)
.headers(readHeaders());
if (statusLine.code != HTTP_CONTINUE) {
state = STATE_OPEN_RESPONSE_BODY;
return responseBuilder;
}
}
} catch (EOFException e) {
// Provide more context if the server ends the stream before sending a response.
IOException exception = new IOException("unexpected end of stream on " + streamAllocation);
exception.initCause(e);
throw exception;
}
}
也是一樣的,從source中讀請求行和請求頭
最后看openResponseBody:
@Override public ResponseBody openResponseBody(Response response) throws IOException {
Source source = getTransferStream(response);
return new RealResponseBody(response.headers(), Okio.buffer(source));
}
這里說一下就是根據請求的響應把包裹InputStream的source再次封裝,里面做一些控制邏輯,然后再封裝成ResponseBody。
例如FiexdLengthSource,就是期望獲取到byte的長度是固定的值:
/** An HTTP body with a fixed length specified in advance. */
private class FixedLengthSource extends AbstractSource {
private long bytesRemaining;
public FixedLengthSource(long length) throws IOException {
bytesRemaining = length;
if (bytesRemaining == 0) {
endOfInput(true);
}
}
@Override public long read(Buffer sink, long byteCount) throws IOException {
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (closed) throw new IllegalStateException("closed");
if (bytesRemaining == 0) return -1;
long read = source.read(sink, Math.min(bytesRemaining, byteCount));
if (read == -1) {
endOfInput(false); // The server didn't supply the promised content length.
throw new ProtocolException("unexpected end of stream");
}
bytesRemaining -= read;
if (bytesRemaining == 0) {
endOfInput(true);
}
return read;
}
@Override public void close() throws IOException {
if (closed) return;
if (bytesRemaining != 0 && !Util.discard(this, DISCARD_STREAM_TIMEOUT_MILLIS, MILLISECONDS)) {
endOfInput(false);
}
closed = true;
}
}
當讀完期望的長度時就把這個RealConnection回收,如果少於期望的長度則拋異常。
ConnectionPool
到了OkHttp3時代,ConnectionPool就是每個Client獨享的了,我們剛才提到了ConnectionPool,那么他到底是如何運作呢。
ConnectionPool持有一個靜態的線程池。
StreamAllocation不管通過什么方式,在獲取到RealConnection后,RealConnection會添加一個對StreamAllocation的引用。
在每個RealConnection加入ConnectionPool后,如果當前沒有在清理,就會把cleanUpRunnable加入線程池。
cleanUpRunnable里面是一個while(true),一個循環包括:
調用一次cleanUp方法進行清理並返回一個long, 如果是-1則退出,否則調用wait方法等待這個long值的時間
cleanUp代碼如下:
ong cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;
// Find either a connection to evict, or the time that the next eviction is due.
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
// If the connection is in use, keep searching.
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
idleConnectionCount++;
// If the connection is ready to be evicted, we're done.
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
// We've found a connection to evict. Remove it from the list, then close it below (outside
// of the synchronized block).
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
// A connection will be ready to evict soon.
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
// All connections are in use. It'll be at least the keep alive duration 'til we run again.
return keepAliveDurationNs;
} else {
// No connections, idle or in use.
cleanupRunning = false;
return -1;
}
}
closeQuietly(longestIdleConnection.socket());
// Cleanup again immediately.
return 0;
}
遍歷每一個RealConnection,通過引用數目確定哪些是空閑的,哪些是在使用中,同時找到空閑時間最長的RealConnection。
如果空閑數目超過最大空閑數或者空閑時間超過最大空閑時間,則清理掉這個RealConnection,並返回0,表示需要立刻再次清理
否則如果空閑的數目大於0個,則等待最大空閑時間-已有的最長空閑時間
否則如果使用中的數目大於0,則等待最大空閑時間
否則 返回 -1,並標識退出清除狀態
同時如果某個RealConnection空閑后,會進入ConnectionPool.connectionBecameIdle方法,如果不可被復用,則被移除,否則立刻喚醒上面cleanUp的wait,再次清理,因為可能超過了最大空閑數目
這樣通過一個靜態的線程池,ConnectionPool做到了每個實例定期清理,保證不會超過最大空閑時間和最大空閑數目的策略。
OkHttp3分析就到此結束了。