高並發之限流RateLimiter(二)


Guava RateLimiter提供了令牌桶算法實現:平滑突發限流(SmoothBursty)和平滑預熱限流(SmoothWarmingUp)實現。

SmoothBursty:令牌生成速度恆定

 @Test
    public void testAcquire() {
        // acquire(i); 獲取令牌,返回阻塞的時間,支持預消費.
        RateLimiter limiter = RateLimiter.create(1);

        for (int i = 1; i < 10; i++) {
            double waitTime = limiter.acquire();
            System.out.println("cutTime=" + longToDate(System.currentTimeMillis()) + " acq:" + i + " waitTime:" + waitTime);
        }
    }

    public static String longToDate(long lo){
        Date date = new Date(lo);
        SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return sd.format(date);
    }

輸出結果:

cutTime=2019-03-29 09:31:42 acq:1 waitTime:0.0
cutTime=2019-03-29 09:31:43 acq:2 waitTime:0.989135
cutTime=2019-03-29 09:31:44 acq:3 waitTime:0.998023
cutTime=2019-03-29 09:31:45 acq:4 waitTime:0.999573
cutTime=2019-03-29 09:31:46 acq:5 waitTime:0.999359
cutTime=2019-03-29 09:31:47 acq:6 waitTime:0.999566
cutTime=2019-03-29 09:31:48 acq:7 waitTime:0.998763
cutTime=2019-03-29 09:31:49 acq:8 waitTime:0.999163
cutTime=2019-03-29 09:31:50 acq:9 waitTime:1.000036

說明:每秒1個令牌生成一個令牌,從輸出可看出很平滑,這種實現將突發請求速率平均成固定請求速率。

下面demo是突發請求:

@Test
    public void testAcquire2() {
        // 請求突發
        RateLimiter limiter = RateLimiter.create(5);

        for (int i = 1; i < 5; i++) {
            double waitTime = 0;
            if(i == 2){
                waitTime = limiter.acquire(10);
            }else{
                waitTime = limiter.acquire(1);
            }

            System.out.println("cutTime=" + longToDate(System.currentTimeMillis()) + " acq:" + i + " waitTime:" + waitTime);
        }
    }

輸出:

cutTime=2019-03-29 09:53:55 acq:1 waitTime:0.0
cutTime=2019-03-29 09:53:56 acq:2 waitTime:0.188901
cutTime=2019-03-29 09:53:58 acq:3 waitTime:1.99789
cutTime=2019-03-29 09:53:58 acq:4 waitTime:0.198832

說明:

i=1,消費i個令牌,此時還剩4個令牌;

i=2,突發10個請求,令牌桶算法也允許了這種突發(允許消費未來的令牌);

i=3,上次請求消費了,所以需要等待2s;

 

下面看源碼:


 

簡單介紹下:Stopwatch

public final class Stopwatch {
    private final Ticker ticker;//計時器,用於獲取當前時間
    private boolean isRunning;//計時器是否運行中的狀態標記
    private long elapsedNanos;//用於標記從計時器開啟到調用統計的方法時過去的時間
    private long startTick;//計時器開啟的時刻時間

    private long elapsedNanos() {
        return this.isRunning ? this.ticker.read() - this.startTick + this.elapsedNanos : this.elapsedNanos;
    }
    public long elapsed(TimeUnit desiredUnit) {
        return desiredUnit.convert(this.elapsedNanos(), TimeUnit.NANOSECONDS);
    }
 }

TimeUnit:

MILLISECONDS {
        public long toNanos(long d)   { return x(d, C2/C0, MAX/(C2/C0)); }
        public long toMicros(long d)  { return x(d, C2/C1, MAX/(C2/C1)); }
        public long toMillis(long d)  { return d; }
        public long toSeconds(long d) { return d/(C3/C2); }
        public long toMinutes(long d) { return d/(C4/C2); }
        public long toHours(long d)   { return d/(C5/C2); }
        public long toDays(long d)    { return d/(C6/C2); }
        public long convert(long d, TimeUnit u) { return u.toMillis(d); } int excessNanos(long d, long m) { return 0; }
    },
 
 MICROSECONDS {
        public long toNanos(long d)   { return x(d, C1/C0, MAX/(C1/C0)); }
        public long toMicros(long d)  { return d; }
        public long toMillis(long d)  { return d/(C2/C1); }
        public long toSeconds(long d) { return d/(C3/C1); }
        public long toMinutes(long d) { return d/(C4/C1); }
        public long toHours(long d)   { return d/(C5/C1); }
        public long toDays(long d)    { return d/(C6/C1); }
        public long convert(long d, TimeUnit u) { return u.toMicros(d); } int excessNanos(long d, long m) { return (int)((d*C1) - (m*C2)); }
    },
    
NANOSECONDS {
        public long toNanos(long d)   { return d; }
        public long toMicros(long d)  { return d/(C1/C0); } public long toMillis(long d)  { return d/(C2/C0); } public long toSeconds(long d) { return d/(C3/C0); }
        public long toMinutes(long d) { return d/(C4/C0); }
        public long toHours(long d)   { return d/(C5/C0); }
        public long toDays(long d)    { return d/(C6/C0); }
        public long convert(long d, TimeUnit u) { return u.toNanos(d); }
        int excessNanos(long d, long m) { return (int)(d - (m*C2)); }
},

其中:

static final long C0 = 1L;
static final long C1 = C0 * 1000L;
static final long C2 = C1 * 1000L;
static final long C3 = C2 * 1000L;
static final long C4 = C3 * 60L;
static final long C5 = C4 * 60L;
static final long C6 = C5 * 24L;

 

@Test
public void stopwatch1() {
    Stopwatch stopwatch = Stopwatch.createStarted();

    doSomething();
    stopwatch.stop(); // optional
    long millis = stopwatch.elapsed(MILLISECONDS);
    System.out.println("time: " + stopwatch);
}

@Test
public void stopwatch2() {
    Stopwatch stopwatch = Stopwatch.createStarted();
    //doSomething();
    stopwatch.stop();
    long millis = stopwatch.elapsed(MILLISECONDS);
    System.out.println("time: " + stopwatch);

    stopwatch.reset().start();
    //doSomething();
    stopwatch.stop();
    millis = stopwatch.elapsed(MILLISECONDS);
    System.out.println("time: " + stopwatch);
}

public static void doSomething(){
    try {
        Thread.sleep(100);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

stopwatch1結果:

time: 100.8 ms

執行過程:

使用stopwatch對程序運行時間進行調試,首先調用 StopWatch.createStarted()創建並啟動一個stopwatch實例,調用stopwatch.stop()停止計時,此時會更新stopwatch的elapsedNanos時間,為stopwatch開始啟動到結束計時的時間,再次調用stopwatch.elapsed(),獲取stopwatch在start-stop時間段,時間流逝的長度。

RateLimiter.class

public static RateLimiter create(double permitsPerSecond) {
        return create(permitsPerSecond, RateLimiter.SleepingStopwatch.createFromSystemTimer());//Stopwatch類稍后
    }

    @VisibleForTesting
    static RateLimiter create(double permitsPerSecond, RateLimiter.SleepingStopwatch stopwatch) {
        RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0D);
        rateLimiter.setRate(permitsPerSecond);
        return rateLimiter;
    }
    
    public final void setRate(double permitsPerSecond) {
        Preconditions.checkArgument(permitsPerSecond > 0.0D && !Double.isNaN(permitsPerSecond), "rate must be positive");
        synchronized(this.mutex()) {
            this.doSetRate(permitsPerSecond, this.stopwatch.readMicros());
        }
    }
    
    abstract void doSetRate(double var1, long var3);
說明:this.stopwatch.readMicros());源碼最終調用的是
NANOSECONDS {
public long toNanos(long d) { return d; }
public long toMicros(long d) { return d/(C1/C0); } //return (stopwatch中的elapsedNanos,表示時間差)/(1L * 1000L/1L)
}

SmoothRateLimiter

final void doSetRate(double permitsPerSecond, long nowMicros) {
    this.resync(nowMicros);
    double stableIntervalMicros = (double)TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    this.doSetRate(permitsPerSecond, stableIntervalMicros);
}
abstract void doSetRate(double var1, double var3);

void resync(long nowMicros) {
    if (nowMicros > this.nextFreeTicketMicros) {
        //相當於(double)(nowMicros - this.nextFreeTicketMicros) * (permitsPerSecond double)TimeUnit.SECONDS.toMicros(1L)) //令牌生成速率:xx/單位時間
        double newPermits = (double)(nowMicros - this.nextFreeTicketMicros) / this.coolDownIntervalMicros();
        this.storedPermits = Math.min(this.maxPermits, this.storedPermits + newPermits);
        this.nextFreeTicketMicros = nowMicros;
    }
}

說明:

nowMicros:表示用於標記從計時器開啟到調用統計的方法時過去的時間
coolDownIntervalMicros:添加令牌時間間隔
stableIntervalMicros:添加令牌時間間隔 = (double)TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;(1秒/每秒的令牌數)
newPermits:時間段內新生令牌數
storedPermits:當前令牌數

nextFreeTicketMicros:

下一次請求可以獲取令牌的起始時間,由於RateLimiter允許預消費,上次請求預消費令牌后,下次請求需要等待相應的時間到nextFreeTicketMicros時刻才可以獲取令牌

SmoothBursty

static final class SmoothBursty extends SmoothRateLimiter {
        final double maxBurstSeconds;

        SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
            super(stopwatch, null);
            this.maxBurstSeconds = maxBurstSeconds;//在RateLimiter未使用時,最多存儲幾秒的令牌
        }

        void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
            double oldMaxPermits = this.maxPermits;
            this.maxPermits = this.maxBurstSeconds * permitsPerSecond;
            if (oldMaxPermits == 1.0D / 0.0) { //相當於oldMaxPermits ==Double.POSITIVE_INFINITY ,Double.POSITIVE_INFINITY 表示無窮大
                
                this.storedPermits = this.maxPermits;
            } else {
                this.storedPermits = oldMaxPermits == 0.0D ? 0.0D : this.storedPermits * this.maxPermits / oldMaxPermits;
            }

        }

        long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
            return 0L;
        }

        double coolDownIntervalMicros() {
            return this.stableIntervalMicros;
        }
    }

參數說明:

maxBurstSeconds:在RateLimiter未使用時,最多存儲幾秒的令牌
permitsPerSecond: 速率=令牌數/每秒
maxPermits :最大存儲令牌數 = maxBurstSeconds * permitsPerSecond
storedPermits: 當前存儲令牌數

RateLimiter幾個常用接口分析

1、acquire() 函數主要用於獲取permits個令牌,並計算需要等待多長時間,進而掛起等待,並將該值返回

RateLimiter.calss

@CanIgnoreReturnValue
public double acquire() {
  return acquire(1);
}

/**
* 獲取令牌,返回阻塞的時間
**/
@CanIgnoreReturnValue
public double acquire(int permits) {
  long microsToWait = reserve(permits);
  
  //獲取等待時間后,阻塞線程
  stopwatch.sleepMicrosUninterruptibly(microsToWait);
  return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

final long reserve(int permits) {
  checkPermits(permits);
  synchronized (mutex()) {
    return reserveAndGetWaitLength(permits, stopwatch.readMicros());
  }
}

final long reserveAndGetWaitLength(int permits, long nowMicros) {
    long momentAvailable = this.reserveEarliestAvailable(permits, nowMicros);
    return Math.max(momentAvailable - nowMicros, 0L);
}
abstract long reserveEarliestAvailable(int var1, long var2);

SmoothRateLimiter.class

final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
        this.resync(nowMicros);
        long returnValue = this.nextFreeTicketMicros;//resync()方法后,如果nowMicros > this.nextFreeTicketMicros,等於nowMicros
        
        double storedPermitsToSpend = Math.min((double)requiredPermits, this.storedPermits);
        //freshPermits從令牌桶中獲取令牌后還需要的令牌數量
        double freshPermits = (double)requiredPermits - storedPermitsToSpend;
        
        //平滑這里this.storedPermitsToWaitTime()直接返回0L + 還需要令牌數量/速率(需要的時間)
        long waitMicros = this.storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) + (long)(freshPermits * this.stableIntervalMicros);
        
        //如果超前消費,將導致下次請求等待時間=LongMath.saturatedAdd(this.nextFreeTicketMicros, waitMicros);
        this.nextFreeTicketMicros = LongMath.saturatedAdd(this.nextFreeTicketMicros, waitMicros);
        this.storedPermits -= storedPermitsToSpend;
        return returnValue;
    }

 

2、tryAcquire()

函數可以嘗試在timeout時間內獲取令牌,如果可以則掛起等待相應時間並返回true,否則立即返回false

 public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
        long timeoutMicros = Math.max(unit.toMicros(timeout), 0L);//超時時間
        checkPermits(permits);
        long microsToWait;
        synchronized(this.mutex()) {
            long nowMicros = this.stopwatch.readMicros();
            if (!this.canAcquire(nowMicros, timeoutMicros)) {
                return false;
            }
            //獲取需要阻塞時間
            microsToWait = this.reserveAndGetWaitLength(permits, nowMicros);
        }

        this.stopwatch.sleepMicrosUninterruptibly(microsToWait);
        return true;
    }

    private boolean canAcquire(long nowMicros, long timeoutMicros) {
        //下一次請求可以獲取令牌的起始時間
        return this.queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
    }
canAcquire用於判斷timeout時間內是否可以獲取令牌,通過判斷當前時間+超時時間是否大於nextFreeTicketMicros 來決定是否能夠拿到足夠的令牌數,如果可以獲取到,則過程同acquire,線程sleep等待,如果通過 canAcquire在此超時時間內不能回去到令牌,則可以快速返回,不需要等待timeout后才知道能否獲取到令牌。

 

SmoothWarmingUp:令牌生成速度緩慢提升直到維持在一個穩定值

SmoothWarmingUp創建方式:RateLimiter.create(doublepermitsPerSecond, long warmupPeriod, TimeUnit unit)

permitsPerSecond表示每秒新增的令牌數,warmupPeriod表示在從冷啟動速率過渡到平均速率的時間間隔。

@Test
    public void acquire1() {
        RateLimiter limiter = RateLimiter.create(5, 1000, TimeUnit.MILLISECONDS);
        for (int i = 1; i < 6; i++) {
            System.out.println(limiter.acquire());
        }

        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        for (int i = 1; i < 6; i++) {
            System.out.println(limiter.acquire());
        }
    }

 結果:

0.0
0.518741
0.357811
0.219877
0.199584
0.0
0.361189
0.220761
0.19938
0.199856

速率是梯形上升速率的,也就是說冷啟動時會以一個比較大的速率慢慢到平均速率;然后趨於平均速率(梯形下降到平均速率)。可以通過調節warmupPeriod參數實現一開始就是平滑固定速率。

 

參考:

https://www.cnblogs.com/xuwc/p/9123078.html

https://www.cnblogs.com/xuwc/p/9123078.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM