漏桶算法&信號量機制實戰——多級漏桶突破百度AI接口QPS限制瓶頸


一、描述

工作中需要調用第三方接口(百度AI接口),實現一些AI相關的功能。但是開通付費接口后,但仍有10QPS的限制,超出的部分會被百度拒絕,直接報錯。而我們的業務需求是要基本保證調用成功的。因此需要一個漏桶/限流器來控制調用速度去適配這10QPS的限制,剩余的請求進入等待隊列。

在完成適配后,10QPS對於業務並發峰值的場景是不夠的,而QPS疊加包長期購買太貴,階段購買又太麻煩,遂采用多帳號的方案去應對並發峰值。一開始計划是通過某種負載均衡的策略去實現,后來發現百度AI接口調用量上來后是有打折優惠的,因此設計多級漏桶去實現。

二、前置知識

2.1 漏桶與令牌桶

漏桶

漏桶算法思路很簡單,水(請求)先進入到漏桶里,漏桶以一定的速度出水,當水流入速度過大會直接溢出,可以看出漏桶算法能強行限制數據的傳輸速率。

令牌桶

對於很多應用場景來說,除了要求能夠限制數據的平均傳輸速率外,還要求允許某種程度的突發傳輸。這時候漏桶算法可能就不合適了,令牌桶算法更為適合。令牌桶算法的原理是系統會以一個恆定的速度往桶里放入令牌,而如果請求需要被處理,則需要先從桶里獲取一個令牌,當桶里沒有令牌可取時,則拒絕服務。

應用場景:
並不能說明令牌桶一定比漏洞好,她們使用場景不一樣。令牌桶可以用來保護自己,主要用來對調用者頻率進行限流,為的是讓自己不被打垮。所以如果自己本身有處理能力的時候,如果流量突發(實際消費能力強於配置的流量限制),那么實際處理速率可以超過配置的限制。而漏桶算法,這是用來保護他人,也就是保護他所調用的系統。主要場景是,當調用的第三方系統本身沒有保護機制,或者有流量限制的時候,我們的調用速度不能超過他的限制,由於我們不能更改第三方系統,所以只有在主調方控制。這個時候,即使流量突發,也必須舍棄。因為消費能力是第三方決定的。

總結起來:如果要讓自己的系統不被打垮,用令牌桶。如果保證別人的系統不被打垮,用漏桶算法。

2.2 限流利器-Semaphore

在 JUC 包下,有一個 Semaphore 類,翻譯成信號量,Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,它通過協調各個線程,以保證合理的使用公共資源。Semaphore 跟鎖(synchronized、Lock)有點相似,不同的地方是,鎖同一時刻只允許一個線程訪問某一資源,而 Semaphore 則可以控制同一時刻多個線程訪問某一資源。

Semaphore(信號量)並不是 Java 語言特有的,幾乎所有的並發語言都有。所以也就存在一個「信號量模型」的概念,如下圖所示:

信號量模型比較簡單,

可以概括為:「一個計數器、一個隊列、三個方法」

計數器:記錄當前還可以運行多少個資源訪問資源。

隊列:待訪問資源的線程

init():初始化計數器的值,可就是允許多少線程同時訪問資源。

up():計數器加1,有線程歸還資源時,如果計數器的值大於或者等於 0 時,從等待隊列中喚醒一個線程

down():計數器減 1,有線程占用資源時,如果此時計數器的值小於 0 ,線程將被阻塞。

鑒於我們是部署多實例的分布式系統,JUC實現Semaphore的並不是適用,使用Redisson實現的分布式信號量。

<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson-spring-boot-starter</artifactId>
</dependency>

下面介紹一下關於Semaphore的主要方法:

//獲取指定名稱的信號量實例
RSemaphore getSemaphore(String name);
 
//獲取指定名稱的信號量實例,許可帶有過期時間
RPermitExpirableSemaphore getPermitExpirableSemaphore(String name);
 
//嘗試設置信號量許可個數
boolean trySetPermits(int permits);
 
//從信號量中獲取許可,相當於獲取到執行權,獲取不到會阻塞,直到獲取到為止
String acquire() throws InterruptedException;
 
//獲取一個指定過期時間的許可
String acquire(long leaseTime, TimeUnit unit) throws InterruptedException;
 
//嘗試獲取一個許可,不會阻塞,獲取不到返回null
String tryAcquire();
 
//嘗試在指定時間內獲取一個許可
String tryAcquire(long waitTime, TimeUnit unit) throws InterruptedException;
 
//嘗試在指定時間內獲取一個指定過期時間的許可
String tryAcquire(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
 
//嘗試通過許可id釋放
boolean tryRelease(String permitId);
 
//通過id釋放許可
void release(String permitId);

三、調用百度AI接口適配實現

通過對漏桶、令牌桶概念的了解,以及對限流利器-semaphore的的認識;就可以通過漏桶的理念以及redisson的semaphore API 去實現限制對百度接口的調用,保證10QPS勻速調用;

還有一個問題:“漏桶”的容量如何確定?

根據業務需求而定,假設調用AI接口的等待時間“容忍度”是 10s,假設AI接口本身2s的耗時,那么對於同一個AI接口的請求就是80個,“漏桶”的容量(等等隊列)就是80。

同樣可以使用現成的redisson提供的分布式隊列API,去實現、假如隊列滿,直接拒絕。

當然這個AI接口本身的耗時會根據它的類型以及數據的大小而改變,這里使用線程池執行器 ThreadPoolExecutor 的一些特性以及自帶的隊列去實現功能,具體:

  • future.get(TIME_OUT, TimeUnit.MILLISECONDS); 控制“容忍”時間
  • 線程池等待隊列大小來控制“桶”的容量,超出拒絕。

部分代碼實現:

1.構建調用任務,期中maxQPS為百度AI接口的最大QPS限制,TIME_WINDOW為信號量釋放時間 = 1s

public class BaiduAIClientTask implements Callable<BaiduAIClientTask> {
    @Override
    public BaiduAIClientTask call() {
        RPermitExpirableSemaphore semaphore = redissonClient.getPermitExpirableSemaphore("key");
        semaphore.trySetPermits(maxQPS);
        semaphoreAll.acquire(TIME_WINDOW,TimeUnit.MILLISECONDS);
        //調用百度接口
        this.jsonObject = aiClient.send(aipRequest);
    }
}

2.定義線程池執行任務

public class ClientAIOperation {
    public JSONObject getResponse(AipRequest aipRequest) {
        //獲取固定線程的線程池(單例),設置線程數量、隊列大小
        ExecutorService executorService = baiduAIClientContext.getExecutorService(nThreads,ququeSize);
        JSONObject jsonObject = null;
        Future<BaiduAIClientTask> future = null;
        try {
            future = executorService.submit(task);
            BaiduAIClientTask baiduAIClientTask = future.get(TIME_OUT, TimeUnit.MILLISECONDS);
            jsonObject = baiduAIClientTask.getJsonObject();
        } catch (RejectedExecutionException e) {
            throw new BaiduClientException("系統繁忙~");
        } catch (TimeoutException e) {
            future.cancel(true);
            throw new BaiduClientException("執行超時,請重試~");
        }
        return jsonObject;
    }
}

四、多帳號提升QPS瓶頸

上面的操作也是僅是盡可能保證用戶在容忍時間內能調用成功,支持10QPS的並發。

QPS疊加包

通過調研了解,百度AI接口提供QPS疊加包服務,在對應上課時間購買QPS疊加包,來應對高並發調用。

缺點:

①需要關心具體業務使用時間,頻繁的購買相應的QPS疊加包。

②太貴了 ,如下表:200QPS每天就是 2000元,而且僅僅是單個接口

接口 按天購買價格(元/QPS/天) 按月購買價格(元/QPS/月)
通用物體和場景識別 50 750
logo識別-檢索 10 150
圖像主體檢測(單主體) 5 75
植物識別 10 150
動物識別 10 150
菜品識別 10 150
自定義菜品識別-檢索 50 750
果蔬識別 10 150
地標識別 10 150
圖像主體檢測(多主體) 10 150

多賬號解決方案

鑒於QPS疊加包高額費用以及需要頻繁的購買相應服務,那么我們采用多帳號的方案來解決。

五、多級漏桶設計

但是多帳號同樣面臨一個問題,一般AI能力服務會隨着調用量的增加而費用減少,如下表:

月調用量(萬次) 菜品識別(元/千次)
0<月調用量<=5 0.70
5<月調用量<=10 0.60
10<月調用量<=20 0.50
20<月調用量<=50 0.40
50<月調用量<=100 0.35
100<月調用量 0.30

為了盡量使得一個賬號達到減免額度,采用如下圖所示設計,當用戶請求過來默認進入第一個『桶』中,當並發量過大,第一個『桶』處理不過來,溢出到下一個桶中,依次溢出。當最后一個『桶』滿后,剩下的請求在『蓄水池』{等待隊列}中等待

部分代碼實現:

通過遞歸與tryAcquire()方法的特行,實現讓桶溢出后流入下一個桶。

@Override
public BaiduAIClientTask call() {
    //蓄水池外流速度信號量 
    RPermitExpirableSemaphore semaphoreAll = redissonClient.getPermitExpirableSemaphore(“蓄水池”);
    //桶的總流量 = 桶流速 * 桶數
    semaphoreAll.trySetPermits(maxQPS * baiduAIClientList.size());
    try {
        //申請蓄水池信號量,如果滿了會阻塞於此 模擬閥門
        semaphoreAll.acquire(TIME_WINDOW,TimeUnit.MILLISECONDS);
        //第一個桶開始工作 leakyBucketNumber = 0
        this.leakyBucketWork(leakyBucketNumber,baiduAIClientList);
    } catch (RedissonShutdownException rse) {
        log.warn("【AI接口調用】終止!{}", rse.getMessage());
    } catch (InterruptedException e) {
        log.info("【AI接口調用】線程中斷,執行失敗");
    }
    return this;
}
 
/**
 * 漏桶工作,遞歸處理溢出
 *
 * @param leakyBucketNumber - 漏桶編號
 * @param baiduAIClientList - 調用百度client集合
 */
private void leakyBucketWork(Integer leakyBucketNumber, List<BaiduAIClient> baiduAIClientList) throws InterruptedException {
    //單個桶流速信號量  命名:url.leakyBucketNumber
    RPermitExpirableSemaphore semaphore = redissonClient.getPermitExpirableSemaphore(“單桶 + 編號”);
    semaphore.trySetPermits(maxQPS);
    String acquire = semaphore.tryAcquire(0,TIME_WINDOW, TimeUnit.MILLISECONDS);
    if (Strings.isNotEmpty(acquire)) {
        log.info("桶編號-{},獲取到信號量:{},開始執行任務",leakyBucketNumber,acquire);
        BaiduAIClient aiClient = baiduAIClientList.get(leakyBucketNumber);
        this.jsonObject = aiClient.send(aipRequest);
    } else if (leakyBucketNumber < baiduAIClientList.size() - 1) {
        //溢出:流向下一個桶
        leakyBucketNumber++;
        leakyBucketWork(leakyBucketNumber,baiduAIClientList);
    }
}

六、參考


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM