背景
在互聯網的高並發場景下,請求會非常多,但是數據庫連接池比較少,或者說需要減少CPU壓力,減少處理邏輯的,需要把單個查詢,用某些手段,改為批量查詢多個后返回。
如:支付寶中,查詢“個人信息”,用戶只會觸發一次請求,查詢自己的信息,但是多個人同時這樣做就會產生多次數據庫連接。為了減少連接,需要在JAVA服務端進行合並請求,把多個“個人信息”查詢接口,合並為批量查詢多個“個人信息”接口,然后以個人信息在數據庫的id作為Key返回給上游系統或者頁面URL等調用方。
目的
- 減少訪問數據庫的次數
- 單位時間內的多個請求,合並為一個請求。讓業務邏輯層把單個查詢的sql,改為批量查詢的sql。或者邏輯里面需要調用redis,那批量邏輯里面就可以用redis的pipeline去實現。
- 本次需要使用JDK原生手段來實現請求合並,因為大家不一定會有Hystrix,所以用原生辦法實現,並解析HystrixCollapser里面是如何實現的。
點贊再看,關注公眾號:【地藏思維】給大家分享互聯網場景設計與架構設計方案
掘金:地藏Kelvin https://juejin.im/user/5d67da8d6fb9a06aff5e85f7
主要解決手段
- SpringCloud的Hystrix的自定義HystrixCollapse和HystrixCommand
- SpringCloud的Hystrix注解方式。
- 沒有服務治理框架時,利用JDK隊列、定時任務線程池處理。
在上一章已經說了第一二種,鑒於有同學沒有SpringCloud,所以使用第3種來做請求合並,並一起分析請求合並的原理。
建議先看第一章,第二章相當於為HystrixCollapser的內部原理描述
高並發場景-請求合並(一)SpringCloud中Hystrix請求合並
交互流程
開發
本章節為利用JDK原生包開發,所以沒有SpringCloud那么多東西要配置,編寫代碼只有一個類。
1. 創建請求層
只需要暴露單個查詢的接口,業務邏輯層里做請求合並的邏輯。
@RestController
public class UserController {
@Autowired
private UserBatchWithFutureServiceImpl userBatchWithFutureServiceImpl;
@RequestMapping(method = RequestMethod.GET,value = "/userbyMergeWithFuture/{id}")
public User userbyMergeWithFuture(@PathVariable Long id) throws InterruptedException, ExecutionException {
User user = this.userBatchWithFutureServiceImpl.getUserById(id);
return user;
}
}
2. 請求合並邏輯層
- 創建請求合並邏輯入口
- 創建阻塞隊列,用於累計多個請求參數
- 創建CompletableFuture類,為了本條線程阻塞,等批量查詢處理完后,異步獲取當前id對應的User結果信息。
- 執行CompletableFuture.get方法等待異步結果通知。
@Component
public class UserBatchWithFutureServiceImpl {
/** 積攢請求的阻塞隊列 */
private LinkedBlockingDeque<UserQueryDto> requestQueue = new LinkedBlockingDeque<>();
public User getUserById(Long id) throws InterruptedException, ExecutionException {
UserQueryDto userQueryDto = new UserQueryDto();
userQueryDto.setId(id);
CompletableFuture<User> completedFuture = new CompletableFuture<>();
userQueryDto.setCompletedFuture(completedFuture);
requestQueue.add(userQueryDto);
User user = completedFuture.get();
return user;
}
HystrixCollapser也是利用這種辦法來做異步通知的手段,讓請求接口主線程在獲得真正結果前阻塞等待。
3. 定時任務
在相同的類下創建定時任務,利用@PostConstruct讓當前類的Bean構造完后執行該方法,生成一個5秒定時任務。
大家可以設定定時的時間,我為了比較方便測試,而用了5秒。
/** 線程池數量 */
private int threadNum = 1;
/** 定時間隔時長 */
private long period = 5000;
@PostConstruct
public void init() {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(createDeviceMergeNum);
// 每5秒執行一次
scheduledExecutorService.scheduleAtFixedRate(new UserBatchThread(), 0, createDeviceMergePeriod,
TimeUnit.MILLISECONDS);
}
HystrixCollapser的每隔n毫秒就會處理一次執行單個方法轉批量方法,也是通過這類來實現的。
4. 在UserBatchWithFutureServiceImpl 類下創建內部類
創建內部類為了定時任務執行此邏輯,並且為了代碼整潔,不在創建線程池時編寫大方法塊的代碼。
在內部類里面主要邏輯:
- 從存放請求接口參數的requestQueue 隊列中,獲取所有成員,並放入當此觸發任務邏輯的局部變量中
- 並且取出關鍵的請求參數id放入局部變量List中。
- 只要獲取出變量,則進行批量查詢
- 最后利用CompletedFuture異步通知並喚醒getUserById方法等待的線程。
public class UserBatchThread implements Runnable {
@Override
public void run() {
List<UserQueryDto> requestQueueTmp = new ArrayList<>();
// 存放批量查詢的入參
List<Long> requestId = new ArrayList<>();
// 把出請求層放入的消息queue的元素取出來
int size = requestQueue.size();
for (int i = 0; i < size; i++) {
UserQueryDto request = requestQueue.poll();
if (Objects.nonNull(request)) {
requestQueueTmp.add(request);
requestId.add(request.getId());
}
}
if (!requestId.isEmpty()) {
try {
List<User> response = getUserBatchById(requestId);
Map<Long, User> collect = response.stream().collect(
Collectors.toMap(detail -> detail.getId(), Function.identity(), (key1, key2) -> key2));
// 通知請求的線程
for (UserQueryDto request : requestQueueTmp) {
request.getCompletedFuture().complete(collect.get(request.getId()));
}
} catch (Exception e) {
// 通知請求的線程-異常
requestQueueTmp.forEach(request -> request.getCompletedFuture().obtrudeException(e));
}
}
}
}
public List<User> getUserBatchById(List<Long> ids) {
System.out.println("進入批量處理方法" + ids);
List<User> ps = new ArrayList<>();
for (Long id : ids) {
User p = new User();
p.setId(id);
p.setUsername("dizang" + id);
ps.add(p);
}
return ps;
}
請求接口中入隊列的元素,就會從這里取出,HystrixCollasper也是利用這種poll方法原子性的獲取隊列里面元素,不會被定時任務的多次觸發而重復的獲取,只要滿足有至少一個都會做批量查詢,所以HystrixCollasper合並請求時,即使n毫秒內只有一個請求,也會去處理。
測試驗證
- 同上一章一樣觸發Swagger-ui頁面
- 請求兩次不同的參數
- 結果如下圖中,console日志已經輸出了兩次請求的入參
總結
到這里相信大家都已經完成了合並請求了。這次沒有依賴框架,基於原生做法,利用隊列存查詢所需的入參,然后利用線程池定時地獲取隊列的入參,再批量處理,利用線程的Future做異步返回結果。這樣我們就理解了SpringCloud的HystrixCollasper的內部流程了。希望能夠幫助沒有框架的項目,或者公司技術棧不合適的情況下的同學。
本文Demo
都在我springcloud的demo里面了,看provider-hystrix-request-merge這個工程下的內容,在UserBatchWithFutureServiceImpl類中。
https://gitee.com/kelvin-cai/spring-cloud-demo
歡迎關注公眾號,文章更快一步
我的公眾號 :地藏思維
掘金:地藏Kelvin
簡書:地藏Kelvin
我的Gitee: 地藏Kelvin https://gitee.com/kelvin-cai