最近學習了LongAdder和Striped64,打算寫一個qps統計工具,剛好項目上也需要。
借鑒了一下前人的文章:https://www.cnblogs.com/ganRegister/p/9369131.html
上述文章核心思想還是基於固定窗口計算的,但是統計的時候應該要基於滑動窗口
優化點:
1、增加了一個最大qps的統計
2、對上鎖部分的代碼進行了一些簡化
3、修改了一個類成員的語義(Bucket.latestPassedTime --> Bucket.firstPassTime)
4、給出了基於滑動窗口統計的qps、最大qps方法
import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantLock; public class RollingNumber { private int bucketNum; /** * 槽組 */ private Bucket[] buckets; /** * 時間片 */ private long bucketTimeSlice; /** * 目標槽位下標 */ private volatile Integer targetBucketPosition; /** * 臨界跨槽時的時間點 */ private volatile long lastPassTimeCloseToTargetBucket; /** * 刷新槽位時使用的鎖 */ private ReentrantLock enterNextBucketLock; /** * 固定窗口的最大qps */ private volatile long maxSummary; private RollingNumber() { this(60, 1000); } private RollingNumber(int bucketNum, int millerSecond) { this.bucketNum = bucketNum; buckets = new Bucket[bucketNum]; for (int i = 0; i < bucketNum; i++) { buckets[i] = new Bucket(); } this.bucketTimeSlice = millerSecond; enterNextBucketLock = new ReentrantLock(); this.lastPassTimeCloseToTargetBucket = System.currentTimeMillis() - (2 * bucketTimeSlice); maxSummary = 0; } /** * 默認60個槽位,槽位的時間片為1000毫秒 */ public static RollingNumber create() { return new RollingNumber(); } public static RollingNumber create(int bucketNum, int millerSecond) { return new RollingNumber(bucketNum, millerSecond); } /** * 槽值滑動統計 * * @return */ public long summary() { long time = System.currentTimeMillis(); int currentBucketIndex = (int) (time / bucketTimeSlice) % bucketNum; /* * qps統計后 lastPassTimeCloseToTargetBucket 會逐步趨近到槽界界點 */ long qps = 0; if (time - lastPassTimeCloseToTargetBucket >= bucketTimeSlice) { //是0忽略 } else { int lastIndex = currentBucketIndex == 0 ? bucketNum - 1 : currentBucketIndex - 1; long duration = (time - lastPassTimeCloseToTargetBucket) % bucketTimeSlice; long slideCount = (long) (buckets[lastIndex].sum() * ((bucketTimeSlice - duration) % bucketTimeSlice) * 0.001); long currentSum = buckets[currentBucketIndex].sum(); qps = slideCount + currentSum; } return qps; } /** * 歷史最大槽值統計 * * @return */ public long getMaxSummary() { if (maxSummary == 0) { return summary(); } return maxSummary; } /** * 清理歷史記錄 */ public void clearMaxSummary() { enterNextBucketLock.lock(); try { maxSummary = 0; } finally { enterNextBucketLock.unlock(); } } /** * 計數 */ public void click() { long passTime = System.currentTimeMillis(); if (targetBucketPosition == null) { targetBucketPosition = (int) (passTime / bucketTimeSlice) % bucketNum; } Bucket currentBucket = buckets[targetBucketPosition]; if (passTime - lastPassTimeCloseToTargetBucket >= bucketTimeSlice) { if (enterNextBucketLock.isLocked()) { //忽略跳過 } else { enterNextBucketLock.lock(); //可以嘗試用tryLock try { if (passTime - lastPassTimeCloseToTargetBucket >= bucketTimeSlice) { int nextTargetBucketPosition = (int) (passTime / bucketTimeSlice) % bucketNum; Bucket nextBucket = buckets[nextTargetBucketPosition]; if (!nextBucket.equals(currentBucket)) { //跨槽 long summary = buckets[targetBucketPosition].sum(); if (summary > maxSummary) { maxSummary = summary; } nextBucket.reset(passTime); //目標槽位變動 targetBucketPosition = nextTargetBucketPosition; } lastPassTimeCloseToTargetBucket = passTime; nextBucket.incr(); return; } else { currentBucket = buckets[targetBucketPosition]; } } finally { enterNextBucketLock.unlock(); } } } else { //沒到接近跨槽臨界值 } currentBucket.incr(); } private static final class Bucket { /** * 槽內計數器 */ private LongAdder adder; /** * 第一次時間,只記錄一次 */ private long firstPassTime; public Bucket() { adder = new LongAdder(); firstPassTime = System.currentTimeMillis(); } /** * 計數 */ public void incr() { adder.increment(); } /** * 重制 */ public void reset(long time) { adder.reset(); firstPassTime = time; } /** * 統計 * * @return */ public long sum() { return adder.sum(); } public long getFirstPassTime() { return firstPassTime; } public long sumThenReset() { return adder.sumThenReset(); } } public static void main(String[] args) throws InterruptedException { RollingNumber rollingNumber = new RollingNumber(); int threadNum = 10; int rollingCnt = 3000; CountDownLatch countDownLatch = new CountDownLatch(threadNum); List<Thread> threadList = new ArrayList<Thread>(); Random random = new Random(); for (int i = 0; i < threadNum; i++) { threadList.add(new Thread() { public void run() { for (int i = 0; i < rollingCnt; i++) { //這里可以適當增加隨機延時看效果 // try { // TimeUnit.MILLISECONDS.sleep(2L); // } catch (InterruptedException e) { // e.printStackTrace(); // } rollingNumber.click(); } countDownLatch.countDown(); } }); } long startTime = System.currentTimeMillis(); for (Thread thread : threadList) { thread.start(); } countDownLatch.await(); long endTime = System.currentTimeMillis(); long totalTime = endTime - startTime; System.out.println("totalMilliseconds: " + totalTime); System.out.println("current qps is " + rollingNumber.summary()); System.out.println("max qps is " + rollingNumber.getMaxSummary()); } }