一、描述
工作中需要調用第三方接口(百度AI接口),實現一些AI相關的功能。但是開通付費接口后,但仍有10QPS的限制,超出的部分會被百度拒絕,直接報錯。而我們的業務需求是要基本保證調用成功的。因此需要一個漏桶/限流器來控制調用速度去適配這10QPS的限制,剩余的請求進入等待隊列。
在完成適配后,10QPS對於業務並發峰值的場景是不夠的,而QPS疊加包長期購買太貴,階段購買又太麻煩,遂采用多帳號的方案去應對並發峰值。一開始計划是通過某種負載均衡的策略去實現,后來發現百度AI接口調用量上來后是有打折優惠的,因此設計多級漏桶
去實現。
二、前置知識
2.1 漏桶與令牌桶
漏桶
漏桶算法思路很簡單,水(請求)先進入到漏桶里,漏桶以一定的速度出水,當水流入速度過大會直接溢出,可以看出漏桶算法能強行限制數據的傳輸速率。
令牌桶
對於很多應用場景來說,除了要求能夠限制數據的平均傳輸速率外,還要求允許某種程度的突發傳輸。這時候漏桶算法可能就不合適了,令牌桶算法更為適合。令牌桶算法的原理是系統會以一個恆定的速度往桶里放入令牌,而如果請求需要被處理,則需要先從桶里獲取一個令牌,當桶里沒有令牌可取時,則拒絕服務。
應用場景:
並不能說明令牌桶一定比漏洞好,她們使用場景不一樣。令牌桶可以用來保護自己,主要用來對調用者頻率進行限流,為的是讓自己不被打垮。所以如果自己本身有處理能力的時候,如果流量突發(實際消費能力強於配置的流量限制),那么實際處理速率可以超過配置的限制。而漏桶算法,這是用來保護他人,也就是保護他所調用的系統。主要場景是,當調用的第三方系統本身沒有保護機制,或者有流量限制的時候,我們的調用速度不能超過他的限制,由於我們不能更改第三方系統,所以只有在主調方控制。這個時候,即使流量突發,也必須舍棄。因為消費能力是第三方決定的。
總結起來:如果要讓自己的系統不被打垮,用令牌桶。如果保證別人的系統不被打垮,用漏桶算法。
2.2 限流利器-Semaphore
在 JUC 包下,有一個 Semaphore 類,翻譯成信號量,Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,它通過協調各個線程,以保證合理的使用公共資源。Semaphore 跟鎖(synchronized、Lock)有點相似,不同的地方是,鎖同一時刻只允許一個線程訪問某一資源,而 Semaphore 則可以控制同一時刻多個線程訪問某一資源。
Semaphore(信號量)並不是 Java 語言特有的,幾乎所有的並發語言都有。所以也就存在一個「信號量模型」的概念,如下圖所示:
信號量模型比較簡單,
可以概括為:「一個計數器、一個隊列、三個方法」。
計數器:記錄當前還可以運行多少個資源訪問資源。
隊列:待訪問資源的線程
init():初始化計數器的值,可就是允許多少線程同時訪問資源。
up():計數器加1,有線程歸還資源時,如果計數器的值大於或者等於 0 時,從等待隊列中喚醒一個線程
down():計數器減 1,有線程占用資源時,如果此時計數器的值小於 0 ,線程將被阻塞。
鑒於我們是部署多實例的分布式系統
,JUC實現Semaphore的並不是適用,使用Redisson實現的分布式信號量。
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
</dependency>
下面介紹一下關於Semaphore的主要方法:
//獲取指定名稱的信號量實例
RSemaphore getSemaphore(String name);
//獲取指定名稱的信號量實例,許可帶有過期時間
RPermitExpirableSemaphore getPermitExpirableSemaphore(String name);
//嘗試設置信號量許可個數
boolean trySetPermits(int permits);
//從信號量中獲取許可,相當於獲取到執行權,獲取不到會阻塞,直到獲取到為止
String acquire() throws InterruptedException;
//獲取一個指定過期時間的許可
String acquire(long leaseTime, TimeUnit unit) throws InterruptedException;
//嘗試獲取一個許可,不會阻塞,獲取不到返回null
String tryAcquire();
//嘗試在指定時間內獲取一個許可
String tryAcquire(long waitTime, TimeUnit unit) throws InterruptedException;
//嘗試在指定時間內獲取一個指定過期時間的許可
String tryAcquire(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
//嘗試通過許可id釋放
boolean tryRelease(String permitId);
//通過id釋放許可
void release(String permitId);
三、調用百度AI接口適配實現
通過對漏桶、令牌桶概念的了解,以及對限流利器-semaphore的的認識;就可以通過漏桶的理念以及redisson的semaphore API 去實現限制對百度接口的調用,保證10QPS勻速調用;
還有一個問題:“漏桶”的容量如何確定?
根據業務需求而定,假設調用AI接口的等待時間“容忍度”是 10s,假設AI接口本身2s的耗時,那么對於同一個AI接口的請求就是80個,“漏桶”的容量(等等隊列)就是80。
同樣可以使用現成的redisson提供的分布式隊列API,去實現、假如隊列滿,直接拒絕。
當然這個AI接口本身的耗時會根據它的類型以及數據的大小而改變,這里使用線程池執行器 ThreadPoolExecutor 的一些特性以及自帶的隊列去實現功能,具體:
- future.get(TIME_OUT, TimeUnit.MILLISECONDS); 控制“容忍”時間
- 線程池等待隊列大小來控制“桶”的容量,超出拒絕。
部分代碼實現:
1.構建調用任務,期中maxQPS為百度AI接口的最大QPS限制,TIME_WINDOW為信號量釋放時間 = 1s
public class BaiduAIClientTask implements Callable<BaiduAIClientTask> {
@Override
public BaiduAIClientTask call() {
RPermitExpirableSemaphore semaphore = redissonClient.getPermitExpirableSemaphore("key");
semaphore.trySetPermits(maxQPS);
semaphoreAll.acquire(TIME_WINDOW,TimeUnit.MILLISECONDS);
//調用百度接口
this.jsonObject = aiClient.send(aipRequest);
}
}
2.定義線程池執行任務
public class ClientAIOperation {
public JSONObject getResponse(AipRequest aipRequest) {
//獲取固定線程的線程池(單例),設置線程數量、隊列大小
ExecutorService executorService = baiduAIClientContext.getExecutorService(nThreads,ququeSize);
JSONObject jsonObject = null;
Future<BaiduAIClientTask> future = null;
try {
future = executorService.submit(task);
BaiduAIClientTask baiduAIClientTask = future.get(TIME_OUT, TimeUnit.MILLISECONDS);
jsonObject = baiduAIClientTask.getJsonObject();
} catch (RejectedExecutionException e) {
throw new BaiduClientException("系統繁忙~");
} catch (TimeoutException e) {
future.cancel(true);
throw new BaiduClientException("執行超時,請重試~");
}
return jsonObject;
}
}
四、多帳號提升QPS瓶頸
上面的操作也是僅是盡可能保證用戶在容忍時間內能調用成功,支持10QPS的並發。
QPS疊加包
通過調研了解,百度AI接口提供QPS疊加包服務,在對應上課時間購買QPS疊加包,來應對高並發調用。
缺點:
①需要關心具體業務使用時間,頻繁的購買相應的QPS疊加包。
②太貴了 ,如下表:200QPS每天就是 2000元,而且僅僅是單個接口
接口 | 按天購買價格(元/QPS/天) | 按月購買價格(元/QPS/月) |
---|---|---|
通用物體和場景識別 | 50 | 750 |
logo識別-檢索 | 10 | 150 |
圖像主體檢測(單主體) | 5 | 75 |
植物識別 | 10 | 150 |
動物識別 | 10 | 150 |
菜品識別 | 10 | 150 |
自定義菜品識別-檢索 | 50 | 750 |
果蔬識別 | 10 | 150 |
地標識別 | 10 | 150 |
圖像主體檢測(多主體) | 10 | 150 |
多賬號解決方案
鑒於QPS疊加包高額費用以及需要頻繁的購買相應服務,那么我們采用多帳號的方案來解決。
五、多級漏桶設計
但是多帳號同樣面臨一個問題,一般AI能力服務會隨着調用量的增加而費用減少,如下表:
月調用量(萬次) | 菜品識別(元/千次) |
---|---|
0<月調用量<=5 | 0.70 |
5<月調用量<=10 | 0.60 |
10<月調用量<=20 | 0.50 |
20<月調用量<=50 | 0.40 |
50<月調用量<=100 | 0.35 |
100<月調用量 | 0.30 |
為了盡量使得一個賬號達到減免額度,采用如下圖所示設計,當用戶請求過來默認進入第一個『桶』中,當並發量過大,第一個『桶』處理不過來,溢出到下一個桶中,依次溢出。當最后一個『桶』滿后,剩下的請求在『蓄水池』{等待隊列}中等待
部分代碼實現:
通過遞歸與tryAcquire()
方法的特行,實現讓桶溢出后流入下一個桶。
@Override
public BaiduAIClientTask call() {
//蓄水池外流速度信號量
RPermitExpirableSemaphore semaphoreAll = redissonClient.getPermitExpirableSemaphore(“蓄水池”);
//桶的總流量 = 桶流速 * 桶數
semaphoreAll.trySetPermits(maxQPS * baiduAIClientList.size());
try {
//申請蓄水池信號量,如果滿了會阻塞於此 模擬閥門
semaphoreAll.acquire(TIME_WINDOW,TimeUnit.MILLISECONDS);
//第一個桶開始工作 leakyBucketNumber = 0
this.leakyBucketWork(leakyBucketNumber,baiduAIClientList);
} catch (RedissonShutdownException rse) {
log.warn("【AI接口調用】終止!{}", rse.getMessage());
} catch (InterruptedException e) {
log.info("【AI接口調用】線程中斷,執行失敗");
}
return this;
}
/**
* 漏桶工作,遞歸處理溢出
*
* @param leakyBucketNumber - 漏桶編號
* @param baiduAIClientList - 調用百度client集合
*/
private void leakyBucketWork(Integer leakyBucketNumber, List<BaiduAIClient> baiduAIClientList) throws InterruptedException {
//單個桶流速信號量 命名:url.leakyBucketNumber
RPermitExpirableSemaphore semaphore = redissonClient.getPermitExpirableSemaphore(“單桶 + 編號”);
semaphore.trySetPermits(maxQPS);
String acquire = semaphore.tryAcquire(0,TIME_WINDOW, TimeUnit.MILLISECONDS);
if (Strings.isNotEmpty(acquire)) {
log.info("桶編號-{},獲取到信號量:{},開始執行任務",leakyBucketNumber,acquire);
BaiduAIClient aiClient = baiduAIClientList.get(leakyBucketNumber);
this.jsonObject = aiClient.send(aipRequest);
} else if (leakyBucketNumber < baiduAIClientList.size() - 1) {
//溢出:流向下一個桶
leakyBucketNumber++;
leakyBucketWork(leakyBucketNumber,baiduAIClientList);
}
}