一個高效的 QPS 統計工具


最近學習了LongAdder和Striped64,打算寫一個qps統計工具,剛好項目上也需要。

借鑒了一下前人的文章:https://www.cnblogs.com/ganRegister/p/9369131.html

 

上述文章的核心思想仍是基於固定窗口計數;但統計 QPS 時應該基於滑動窗口,否則在窗口邊界附近統計結果會失真。

 

優化點:

1、增加了一個最大qps的統計

2、對上鎖部分的代碼進行了一些簡化

3、修改了一個類成員的語義(Bucket.latestPassedTime --> Bucket.firstPassTime)

4、給出了基於滑動窗口統計的qps、最大qps方法

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Sliding-window event counter backed by a ring of {@link LongAdder} buckets.
 *
 * <p>Time is cut into consecutive windows of {@code bucketTimeSlice} milliseconds, each
 * aligned to a multiple of the slice length. Every window is stored in the ring slot
 * {@code (windowNumber mod bucketNum)}. A bucket records the time of the click that opened
 * it, so readers can tell whether a slot holds the window they are looking for or leftover
 * data from an earlier lap around the ring.
 *
 * <p>{@link #click()} is designed to stay cheap under contention: only one thread performs
 * the switch to a new window, and threads that lose that race simply count into whichever
 * bucket is current at that moment. Counts taken around a window boundary are therefore
 * best-effort rather than exact.
 */
public class RollingNumber {

    /** Number of buckets in the ring. */
    private final int bucketNum;

    /** Ring of per-window counters. */
    private final Bucket[] buckets;

    /** Length of one window, in milliseconds. */
    private final long bucketTimeSlice;

    /** Index of the bucket that currently receives clicks. */
    private volatile int targetBucketPosition;

    /** Start time (epoch millis, multiple of the slice) of the window served by the target bucket. */
    private volatile long currentWindowStart;

    /** Guards the switch to a new window and updates of {@link #maxSummary}. */
    private final ReentrantLock enterNextBucketLock;

    /** Largest count seen in any completed fixed window since the last clear. */
    private volatile long maxSummary;

    private RollingNumber() {
        this(60, 1000);
    }

    private RollingNumber(int bucketNum, int millerSecond) {
        if (bucketNum <= 0) {
            throw new IllegalArgumentException("bucketNum must be positive: " + bucketNum);
        }
        if (millerSecond <= 0) {
            throw new IllegalArgumentException("millerSecond must be positive: " + millerSecond);
        }
        this.bucketNum = bucketNum;
        this.bucketTimeSlice = millerSecond;
        this.buckets = new Bucket[bucketNum];
        for (int i = 0; i < bucketNum; i++) {
            buckets[i] = new Bucket();
        }
        this.enterNextBucketLock = new ReentrantLock();
        this.maxSummary = 0;

        // Open the first window right away so that concurrent first clicks always find a
        // properly stamped target bucket, even if they lose the race to switch windows.
        long now = System.currentTimeMillis();
        int initialPosition = bucketIndex(now);
        buckets[initialPosition].reset(now);
        this.targetBucketPosition = initialPosition;
        this.currentWindowStart = windowStartOf(now);
    }

    /**
     * Creates a counter with the default layout: 60 buckets of 1000 milliseconds each.
     *
     * @return a new counter
     */
    public static RollingNumber create() {
        return new RollingNumber();
    }

    /**
     * Creates a counter with a custom layout.
     *
     * @param bucketNum    number of buckets in the ring; must be positive
     * @param millerSecond length of one bucket in milliseconds; must be positive
     * @return a new counter
     * @throws IllegalArgumentException if either argument is zero or negative
     */
    public static RollingNumber create(int bucketNum, int millerSecond) {
        return new RollingNumber(bucketNum, millerSecond);
    }

    /**
     * Estimates the number of clicks in the sliding window that ends now and is one time
     * slice long. With the default 1000 ms slice this is the current QPS.
     *
     * <p>The estimate is the full count of the current window plus the count of the previous
     * window scaled by the fraction of it that still overlaps the sliding window.
     *
     * @return the estimated click count; 0 when neither window has recorded a click
     */
    public long summary() {
        long time = System.currentTimeMillis();
        long windowStart = windowStartOf(time);
        long elapsedInWindow = time - windowStart;

        long currentSum = countOfWindow(windowStart);
        long previousSum = countOfWindow(windowStart - bucketTimeSlice);

        // Scale by the real slice length (not a fixed 1/1000) so any bucket size works.
        long slideCount = previousSum * (bucketTimeSlice - elapsedInWindow) / bucketTimeSlice;
        return slideCount + currentSum;
    }

    /**
     * Returns the largest count recorded for a completed fixed window since the last
     * {@link #clearMaxSummary()}.
     *
     * @return the recorded maximum, or {@link #summary()} while no maximum has been recorded
     */
    public long getMaxSummary() {
        if (maxSummary == 0) {
            return summary();
        }
        return maxSummary;
    }

    /**
     * Resets the recorded maximum to zero.
     */
    public void clearMaxSummary() {
        enterNextBucketLock.lock();
        try {
            maxSummary = 0;
        } finally {
            enterNextBucketLock.unlock();
        }
    }

    /**
     * Records one event.
     *
     * <p>If the event falls into a later window than the current one, a single thread
     * switches to the new bucket; threads that cannot take the lock do not wait and count
     * into the bucket that is current at that moment.
     */
    public void click() {
        long passTime = System.currentTimeMillis();
        long windowStart = windowStartOf(passTime);
        if (windowStart > currentWindowStart && enterNextBucketLock.tryLock()) {
            try {
                // Re-check under the lock: another thread may have switched windows already.
                if (windowStart > currentWindowStart) {
                    long finishedSum = buckets[targetBucketPosition].sum();
                    if (finishedSum > maxSummary) {
                        maxSummary = finishedSum;
                    }
                    int nextPosition = bucketIndex(passTime);
                    Bucket nextBucket = buckets[nextPosition];
                    // Always reset: the slot may hold counts from an earlier lap of the ring.
                    nextBucket.reset(passTime);
                    nextBucket.incr();
                    // Publish the new bucket only after it has been reset.
                    targetBucketPosition = nextPosition;
                    currentWindowStart = windowStart;
                    return;
                }
            } finally {
                enterNextBucketLock.unlock();
            }
        }
        buckets[targetBucketPosition].incr();
    }

    /** Returns the start of the window that contains {@code time}. */
    private long windowStartOf(long time) {
        return time - Math.floorMod(time, bucketTimeSlice);
    }

    /** Returns the ring slot for the window containing {@code time}; never negative. */
    private int bucketIndex(long time) {
        // Take the modulo on long values before narrowing, so small slices cannot overflow int.
        return (int) Math.floorMod(Math.floorDiv(time, bucketTimeSlice), (long) bucketNum);
    }

    /** Returns the count of the window starting at {@code windowStart}, or 0 if its slot holds another window. */
    private long countOfWindow(long windowStart) {
        Bucket bucket = buckets[bucketIndex(windowStart)];
        if (windowStartOf(bucket.getFirstPassTime()) != windowStart) {
            return 0;
        }
        return bucket.sum();
    }

    /**
     * Counter for a single window.
     */
    private static final class Bucket {
        /**
         * Click counter of this bucket.
         */
        private final LongAdder adder;

        /**
         * Time of the click that opened this bucket; set at construction and on each reset.
         * Volatile because it is written under the window lock but read without it.
         */
        private volatile long firstPassTime;

        public Bucket() {
            adder = new LongAdder();
            firstPassTime = System.currentTimeMillis();
        }

        /**
         * Adds one to the counter.
         */
        public void incr() {
            adder.increment();
        }

        /**
         * Clears the counter and records the time of the click that reopens the bucket.
         *
         * @param time time of the opening click, in epoch milliseconds
         */
        public void reset(long time) {
            adder.reset();
            firstPassTime = time;
        }

        /**
         * Returns the current count.
         *
         * @return the sum of all increments since the last reset
         */
        public long sum() {
            return adder.sum();
        }

        public long getFirstPassTime() {
            return firstPassTime;
        }

        public long sumThenReset() {
            return adder.sumThenReset();
        }

    }

    /**
     * Small demo: several threads click concurrently, then the results are printed.
     */
    public static void main(String[] args) throws InterruptedException {
        RollingNumber rollingNumber = RollingNumber.create();
        int threadNum = 10;
        int rollingCnt = 3000;
        CountDownLatch countDownLatch = new CountDownLatch(threadNum);
        List<Thread> threadList = new ArrayList<Thread>(threadNum);
        for (int i = 0; i < threadNum; i++) {
            threadList.add(new Thread() {
                @Override
                public void run() {
                    // A small random delay can be added inside this loop to watch the
                    // window switch while the demo is running.
                    for (int j = 0; j < rollingCnt; j++) {
                        rollingNumber.click();
                    }
                    countDownLatch.countDown();
                }
            });
        }

        long startTime = System.currentTimeMillis();
        for (Thread thread : threadList) {
            thread.start();
        }
        countDownLatch.await();
        long endTime = System.currentTimeMillis();
        long totalTime = endTime - startTime;
        System.out.println("totalMilliseconds:  " + totalTime);
        System.out.println("current qps is " + rollingNumber.summary());
        System.out.println("max qps is " + rollingNumber.getMaxSummary());
    }

}

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM