我們這里用通過唯一 id 獲取知乎的某個回答作為例子,首先我們先明確下,一次HTTP請求到服務器上處理完之后,將響應寫回這次請求的連接,就是完成這次請求了,如下:
public void request(Connection connection, HttpRequest request) {
//處理request,省略代碼
connection.write(response);//完成響應
}
假設獲取回答需要調用兩個接口,獲取評論數量還有獲取回答信息,傳統的代碼可能會這么去寫:
//獲取評論數量
public void getCommentCount(Connection connection, HttpRequest request) {
Integer commentCount = null;
try {
//從緩存獲取評論數量,阻塞IO
commentCount = getCommnetCountFromCache(id);
} catch(Exception e) {
try {
//緩存獲取失敗就從數據庫中獲取,阻塞IO
commentCount = getVoteCountFromDB(id);
} catch(Exception ex) {
}
}
connection.write(commentCount);
}
//獲取回答
public void getAnswer(Connection connection, HttpRequest request) {
//獲取點贊數量
Integer voteCount = null;
try {
//從緩存獲取點贊數量,阻塞IO
voteCount = getVoteCountFromCache(id);
} catch(Exception e) {
try {
//緩存獲取失敗就從數據庫中獲取,阻塞IO
voteCount = getVoteCountFromDB(id);
} catch(Exception ex) {
}
}
//從數據庫獲取回答信息,阻塞IO
Answer answer = getAnswerFromDB(id);
//拼裝Response
ResultVO response = new ResultVO();
if (voteCount != null) {
response.setVoteCount(voteCount);
}
if (answer != null) {
response.setAnswer(answer);
}
connection.write(response);//完成響應
}
在這種實現下,你的進程只需要一個線程池,承載了所有請求。這種實現下,有兩個弊端:
- 線程池 IO 阻塞,導致某個存儲變慢或者緩存擊穿的話,所有服務都堵住了。假設現在評論緩存突然掛了,全都訪問數據庫,導致請求變慢。由於線程需要等待 IO 響應,導致唯一一個線程池被堆滿,無法處理獲取回答的請求。
- 對於獲取回答信息,獲取點贊數量其實和獲取回答信息是可以並發進行的。不用非得先獲取點贊數量之后再獲取回答信息。
現在,NIO 非阻塞 IO 很普及了,有了非阻塞 IO,我們可以通過響應式編程,來讓我們的線程不會阻塞,而是一直在處理請求。這是如何實現的呢?
傳統的 BIO,是線程將數據寫入 Connection 之后,當前線程進入 Block 狀態,直到響應返回,之后接着做響應返回后的動作。NIO 則是線程將數據寫入 Connection 之后,將響應返回后需要做的事情以及參數緩存到一個地方之后,直接返回。在有響應返回后,NIO 的 Selector 的 Read 事件會是 Ready 狀態,掃描 Selector 事件的線程,會告訴你的線程池數據好了,然后線程池中的某個線程,拿出剛剛緩存的要做的事情還有參數,繼續處理。
那么,怎樣實現緩存響應返回后需要做的事情以及參數的呢?Java 本身提供了兩種接口,一個是基於回調的 Callback 接口(Java 8 引入的各種Functional Interface),一種是 Future 框架。
基於 Callback 的實現:
//獲取回答
public void getAnswer(Connection connection, HttpRequest request) {
ResultVO resultVO = new ResultVO();
getVoteCountFromCache(id, (count, throwable) -> {
//異常不為null則為獲取失敗
if (throwable != null) {
//讀取緩存失敗就從數據庫獲取
getVoteCountFromDB(id, (count2, throwable2) -> {
if (throwable2 == null) {
resultVO.setVoteCount(voteCount);
}
//從數據庫讀取回答信息
getAnswerFromDB(id, (answer, throwable3) -> {
if (throwable3 == null) {
resultVO.setAnswer(answer);
connection.write(resultVO);
} else {
connection.write(throwable3);
}
});
});
} else {
//獲取成功,設置voteCount
resultVO.setVoteCount(voteCount);
//從數據庫讀取回答信息
getAnswerFromDB(id, (answer, throwable2) -> {
if (throwable2 == null) {
resultVO.setAnswer(answer);
//返回響應
connection.write(resultVO);
} else {
//返回錯誤響應
connection.write(throwable2);
}
});
}
});
}
可以看出,隨着調用層級的加深,callback 層級越來越深,越來越難寫,而且啰嗦的代碼很多。並且,基於 CallBack 想實現獲取點贊數量其實和獲取回答信息並發是很難寫的,這里還是先獲取點贊數量之后再獲取回答信息。
那么基於 Future 呢?我們用 Java 8 之后引入的 CompletableFuture
來試着實現下。
//獲取回答
public void getAnswer(Connection connection, HttpRequest request) {
ResultVO resultVO = new ResultVO();
//所有的異步任務都執行完之后要做的事情
CompletableFuture.allOf(
getVoteCountFromCache(id)
//發生異常,從數據庫讀取
.exceptionallyComposeAsync(throwable -> getVoteCountFromDB(id))
//讀取完之后,設置VoteCount
.thenAccept(voteCount -> {
resultVO.setVoteCount(voteCount);
}),
getAnswerFromDB(id).thenAccept(answer -> {
resultVO.setAnswer(answer);
})
).exceptionallyAsync(throwable -> {
connection.write(throwable);
}).thenRun(() -> {
connection.write(resultVO);
});
}
這種實現就看上去簡單多了,並且讀取點贊數量還有讀取回答內容是同時進行的。
Project Reactor 在 Completableuture 這種實現的基礎上,增加了更多的組合方式以及更完善的異常處理機制,以及面對背壓時候的處理機制,還有重試機制。
每日一刷,輕松提升技術,斬獲各種offer: