前言
項目中一般會請求第三方的接口,也會對外提供接口,可能是RPC,也可能是HTTP等方式。在對外提供接口時,有必要提供相應的批量接口,好的批量實現能夠提升性能。
高並發場景中,調用批量接口相比調用非批量接口有更大的性能優勢。但有時候,請求更多的是單個接口,不能夠直接調用批量接口,如果這個接口是高頻接口,對其做請求合並就很有必要了。比如電影網站的獲取電影詳情接口,APP的一次請求是單個接口調用,用戶量少的時候請求也不多,完全沒問題;但同一時刻往往有大量用戶訪問電影詳情,是個高並發的高頻接口,如果都是單次查詢,后台就不一定能hold住了。為了優化這個接口,后台可以將相同的請求進行合並,然后調用批量的查詢接口。如下圖所示
無合並請求
合並請求前,我們一般是調用服務層的單次創建方法。看起來都比較簡單,且易於理解。
以創建設備接口為例。
@Reference(check = false)
private DeviceService deviceService;
/**
* 注冊設備
*
* @param productKey 產品key
* @param deviceName 設備名
* @return 設備ID
*/
public R<Long> registDevice(String productKey, String deviceName) {
log.debug("開始注冊: {}, {}", productKey, deviceName);
DeviceRequestDto deviceCreateQuery = new DeviceRequestDto()
.setProductKey(productKey)
.setName(deviceName);
Long deviceId = deviceService.createDevice(deviceCreateQuery);
return deviceId != null
? R.ok(deviceId)
: R.error(DEVICE_CREATE_ERROR);
}
請求合並
請求合並的好處前面有提到,那不能每次寫接口就做請求合並吧?我們要明白,技術無好壞,要在特定的業務場景下衡量利弊,采用與否需要深思熟慮。合並請求會令代碼變得復雜,也會增加一定的接口延遲,其中還可能存在各種未知的風險。
合並請求是針對高並發場景的一種手段,我們實現請求合並之前,要結合業務場景思考一番,是否值得承受的合並帶來的訪問延遲?用戶體驗是否會打折扣?自身的技術是否足夠hold住請求合並帶來的未知風險?
思路:收到前端的請求時,先存起來,隔段時間批量請求第三方服務批量接口,然后分別通知存起來的請求,並且響應前端。
代碼實現
還是針對上述設備注冊接口,我們對其進行改造,來實現一個簡單的請求合並。
1. 批量接口
首先,我們需要有能夠批量調用的接口。在對外提供接口時,也非常有必要提供相應的批量接口,且內部實現應該是優化過的。
此處我們在服務層模擬了一個批量創建設備的接口, 如下:
- 方法簽名
/**
* 批量創建設備接口
*
* @param deviceRequestDtoList 入參信息
* @return 創建結果
*/
R<List<DeviceCreateResp>> batchCreateDevice(List<DeviceCreateQuery> deviceList);
- 入參
@Data
public class DeviceCreateQuery implements Serializable {
/**
* 產品標識
*/
private String productKey;
/**
* 設備名稱
*/
private String name;
/**
* 請求源,一次批量請求保證唯一
*/
private String requestSource;
}
- 返回值
@Data
public class DeviceCreateResp implements Serializable {
/**
* 設備ID
*/
private Long deviceId;
/**
* 請求源,一次批量請求保證唯一
*/
private String requestSource;
}
2. 合並單個請求
- 積攢請求的阻塞隊列
private LinkedBlockingDeque<DeviceCreateRequest> deviceCreateQueue = new LinkedBlockingDeque<>();
- 積攢請求的自定義結構
@Data
static class DeviceCreateRequest {
/** 產品key */
private String productKey;
/** 設備名 */
private String deviceName;
/** 請求源,需保證唯一 */
private String requestSource;
/** CompletableFuture接口 */
private CompletableFuture<Long> completedFuture;
}
- 積攢請求
public R<Long> registDevice(String productKey, String deviceName) {
log.debug("開始注冊: {}, {}", productKey, deviceName);
// 緩存請求 ====== start
CompletableFuture<Long> completedFuture = new CompletableFuture<>();
DeviceCreateRequest deviceCreateRequest = new DeviceCreateRequest();
deviceCreateRequest.setProductKey(productKey);
deviceCreateRequest.setDeviceName(deviceName);
deviceCreateRequest.setRequestSource(UUID.randomUUID().toString());
deviceCreateRequest.setCompletedFuture(completedFuture);
deviceCreateQueue.add(deviceCreateRequest);
// 緩存請求 ====== end
Long deviceId = null;
try {
deviceId = completedFuture.get();
} catch (Exception e) {
log.error("設備注冊失敗", e);
}
return deviceId != null
? R.ok(deviceId)
: R.error(DEVICE_CREATE_ERROR);
}
3. 發送批量請求
此處使用了spring,在init方法中利用定時任務線程池批量分發請求。同時使用了newScheduledThreadPool
,其中線程池大小和定時間隔時長需要根據業務量做權衡
/** 積攢請求的阻塞隊列 */
private LinkedBlockingDeque<DeviceCreateRequest> deviceCreateQueue = new LinkedBlockingDeque<>();
/** 線程池數量 */
@Value("${iot.register.merge.device.request.num:100}")
private int createDeviceMergeNum;
/** 定時間隔時長 */
@Value("${iot.register.merge.device.request.period:30}")
private long createDeviceMergePeriod;
@Reference(check = false)
private DeviceService deviceService;
@PostConstruct
public void init() {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(createDeviceMergeNum);
scheduledExecutorService.scheduleAtFixedRate(() -> {
// 把出queue的請求存儲一次
List<DeviceCreateRequest> questBak = new ArrayList<>();
// 批量創建設備的入參
List<DeviceCreateQuery> deviceCreateQueryList = new ArrayList<>();
int size = deviceCreateQueue.size();
for (int i = 0; i < size; i++) {
DeviceCreateRequest deviceCreateRequest = deviceCreateQueue.poll();
if (Objects.nonNull(deviceCreateRequest)) {
questBak.add(deviceCreateRequest);
deviceCreateQueryList.add(buildDeviceCreateQuery(deviceCreateRequest));
}
}
if (!deviceCreateQueryList.isEmpty()) {
try {
List<DeviceCreateResp> response = deviceService.batchCreateDevice(deviceCreateQueryList);
Map<String, Long> collect = response.stream()
.collect(Collectors.toMap(
DeviceCreateResp::getRequestSource, DeviceCreateResp::getDeviceId
));
// 通知請求的線程
for (DeviceCreateRequest deviceCreateRequest : questBak) {
deviceCreateRequest.getCompletedFuture().complete(collect.get(deviceCreateRequest.getRequestSource()));
}
} catch (Throwable throwable) {
log.error("批量注冊設備異常", throwable);
// 通知請求的線程-異常
questBak.forEach(deviceCreateRequest -> deviceCreateRequest.getCompletedFuture().obtrudeException(throwable));
}
}
}, 0, createDeviceMergePeriod, TimeUnit.MILLISECONDS);
}
總結
請求合並是解決高並發場景下某些問題的一種思路,本文只做了一個簡單的實現,算是對這塊知識的一次實踐吧。用到了BlockingDeque
、CompletableFuture
接口,涉及Java多線程相關的知識,實現方式比較野蠻。業界有很多優秀的開源框架做請求合並,比如hystrix
,需要花時間好好學習哈哈。