RateLimiter服務限流實現


服務限流

需求

1、針對單機的服務流量進行控制,避免突發大流量造成服務異常。2、對業務無侵入。

算法

現在主流的幾種限流方式:

  • 通過限制單位時間段內調用量來限流
  • 通過限制系統的並發調用程度來限流
  • 使用漏桶(Leaky Bucket)算法來進行限流
  • (使用令牌桶(Token Bucket)算法來進行限流

通過限制單位時間段內調用量來限流

通過限制某個服務的單位時間內的調用量來進行限流。我們需要做的就是通過一個計數器統計單位時間段某個服務的訪問量,如果超過了我們設定的閾值,
則該單位時間段內則不允許繼續訪問,或者把接下來的請求放入隊列中等待到下一個單位時間段繼續訪問。

  • 優點:實現簡單,閾值可動態配置。
  • 缺點:若單位時間內前一小段時間內就被大流量消耗完,則將導致該時間段內剩余的時間都拒絕服務。該現象為:“突刺消耗”。

通過限制系統的並發調用程度來限流

通過並發限制來限流,我們通過嚴格限制某服務的並發訪問程度,其實也就限制了該服務單位時間段內的訪問量,
比如限制服務的並發訪問數是100,而服務處理的平均耗時是10毫秒,該服務每秒能提供( 1000 / 10 ) * 100 = 10,000 次。

  • 優點:有更嚴格的限制邊界,適合連接數、線程數的一個限制。
  • 缺點:對服務來說,並發閾值調優困難,難以准確判定服務閾值設置多少合適。一般采用Semaphore實現,但Semaphore沒有提供重設信號量的方法,所以閾值動態配置也是問題。

漏桶算法

請求流量以不確定速率申請資源,程序處理以恆定的速率進行,就是漏桶算法的基本原理。

漏斗有一個進水口 和 一個出水口,出水口以一定速率出水,並且有一個最大出水速率:

  • 在漏斗中沒有水的時候
    • 如果進水速率小於等於最大出水速率,那么,出水速率等於進水速率,此時,不會積水
    • 如果進水速率大於最大出水速率,那么,漏斗以最大速率出水,此時,多余的水會積在漏斗中
  • 在漏斗中有水的時候
    • 出水口以最大速率出水
    • 如果漏斗未滿,且有進水的話,那么這些水會積在漏斗中

如果漏斗已滿,且有進水的話,那么這些水會溢出到漏斗之外。

  • 優點:不管突然流量有多大,漏桶都保證了流量的常速率輸出。
  • 缺點:漏桶的出水速度是恆定的,那么意味着如果瞬時大流量的話,將有大部分請求被丟棄掉。

令牌桶算法

對於很多應用場景來說,除了要求能夠限制數據的平均傳輸速率外,還要求允許某種程度的突發傳輸。這時候漏桶算法可能就不合適了,令牌桶算法更為適合。

令牌桶算法的原理是系統以恆定的速率產生令牌,然后把令牌放到令牌桶中,令牌桶有一個容量,當令牌桶滿了的時候,再向其中放令牌,
那么多余的令牌會被丟棄;當想要處理一個請求的時候,需要從令牌桶中取出一個令牌,如果此時令牌桶中沒有令牌,那么則拒絕該請求。

  • 優點:令牌桶算法能夠在限制調用的平均速率的同時還允許某種程度的突發調用。guava RateLimiter就是基於令牌桶算法實現,所以代碼實現簡易。可動態配置令牌生成速率。
  • 缺點:

基於以上四種算法的介紹,令牌桶不僅能夠限制調用的平均速率同時還允許一定程度的突發調用,不會導致突發調用大量請求被丟棄,更加靈活,且代碼實現簡易。綜上:建議選擇令牌桶算法實現限流。

代碼

限流設計


通過切面攔截請求,判斷限流是否開啟,若開啟則進行令牌的獲取,獲取成功則執行業務,否則丟棄該請求。
令牌獲取:

  1. 無配置超時時間,直接獲取結果
  2. 配置超時時間,在該段時間內不斷嘗試獲取令牌。

環境配置

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
     <version>2.1.1.RELEASE</version>
</dependency>
<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-aop</artifactId>
      <version>2.1.1.RELEASE</version>
</dependency>
<dependency>
	<groupId>com.google.guava</groupId>
	<artifactId>guava</artifactId>
	<version>18.0</version>
</dependency>

配置文件

ratelimit.properties

# 限流模塊的配置

# 是否開啟限流
ratelimit.doRateLimit=false

# 配置超時時間(配置將等待獲取,不配置將直接返回),單位毫秒
ratelimit.waitTimeout=20

# 服務限流保護,服務每秒允許的TPS(需評估單個服務所允許的最大TPS)
ratelimit.permitsPerSecond=200

RateLimitConfig

/**
 * 限流配置信息
 * 
 */
@Data
@Component
@ConfigurationProperties(prefix = "ratelimit")
@PropertySource(value = "classpath:ratelimit.properties")
public class RateLimitConfig implements Serializable {
    
    private static final long serialVersionUID = 1L;
    private boolean           doRateLimit      = false;
    private long              waitTimeout;
    private long              permitsPerSecond;

}

限流服務

/**
 * 限流服務接口
 * 
 */
public interface IRateLimitService {

    /**
     * 嘗試獲取許可證,獲取1個,立即返回非阻塞
     * 
     * @return
     */
    boolean tryAcquire();

    /**
     * 嘗試獲取多個許可證,立即返回非阻塞
     * 
     * @param permits
     * @return
     */
    boolean tryAcquire(int permits);

    /**
     * 阻塞獲取許可證,獲取1個,若超過timeout未獲取到許可證,則返回false
     * 
     * @param timeout
     * @return
     */
    boolean acquire(long timeout);

    /**
     * 阻塞獲取多個許可證,若超過timeout未獲取到許可證,則返回false
     * 
     * @param permits
     * @param timeout
     * @return
     */
    boolean acquire(int permits, long timeout);

}
/**
 * Guava RateLimiter的限流實現
 * 
 */
@Service
public class RateLimitServiceImpl implements IRateLimitService {

    private RateLimitConfig config;

    private RateLimiter rateLimiter;

    @Autowired
    public RateLimitServiceImpl(RateLimitConfig config) {
        this.config = config;
        this.rateLimiter = RateLimiter.create(this.config.getPermitsPerSecond());
    }

    @Override
    public boolean tryAcquire() {
        return this.tryAcquire(1);
    }

    @Override
    public boolean tryAcquire(int permits) {
        return rateLimiter.tryAcquire(permits);
    }

    @Override
    public boolean acquire(long timeout) {
        return this.acquire(1, timeout);
    }

    @Override
    public boolean acquire(int permits, long timeout) {
        long start = System.currentTimeMillis();
        for (;;) {
            boolean tryAcquire = rateLimiter.tryAcquire(permits);
            if (tryAcquire) {
                return true;
            }
            long end = System.currentTimeMillis();
            if ((end - start) >= timeout) {
                return false;
            }
        }
    }

}

切面攔截

/**
 * 限流切面
 * 
 */
@Aspect
@Order(-1)
@Component
public class RateLimitAspect {
    private static final Logger LOG = LoggerFactory.getLogger(RateLimitAspect.class);

    private RateLimitConfig config;

    private IRateLimitService rateLimitService;

    @Autowired
    public RateLimitAspect(RateLimitConfig config, IRateLimitService rateLimitService) {
        this.config = config;
        this.rateLimitService = rateLimitService;
    }

    @Pointcut("execution(public * xxx.xxx.*Controller.*(..))")
    public void executionMethod() {}

    @Around(value = "executionMethod()")
    public Object doRateLimit(ProceedingJoinPoint pjp) throws Throwable {
        if (LOG.isDebugEnabled()) {
            LOG.debug("進入限流處理切面!");
        }
        Object result = null;
        // 判斷是否限流
        try {
            if (config.isDoRateLimit()) {
                // 開啟限流
                boolean acquireResult = false;
                // 1.查看是否配置超時時間
                if (config.getWaitTimeout() == 0L) {
                    // 2.獲取令牌
                    acquireResult = rateLimitService.tryAcquire();
                } else {
                    // 2.獲取令牌,超時時間內獲取令牌
                    acquireResult = rateLimitService.acquire(config.getWaitTimeout());
                }
                if (acquireResult) {
                    // 3.成功獲取令牌,放行
                    result = pjp.proceed();
                } else {
                    // 3.失敗獲取令牌,返回錯誤碼 429 => to many requests
                    result = ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).build();
                }
            } else {
                // 無開啟限流,直接放行
                result = pjp.proceed();
            }
        } catch (Throwable e) {
            throw e;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("限流處理切面結束!");
        }
        return result;
    }

}

測試

測試環境

  • Jmeter測試
  • 請求數據:{"name":"張三","age":12}
  • 響應結果:{"name":"張三","age":12}
  • 業務處理時間:100ms
  • 設置TPS:1500

測試結果

Transaction per Second(TPS)

Hits per Second(每秒請求數)

響應時間

聚合報告

  • 成功請求:

  • 錯誤請求:

  • 全部請求:

總結

  1. 在每秒2萬多的請求下,TPS依舊穩定在1500。
  2. 采用切面的方式,應用無感知。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM