一種高效的QPS統計方法


一、概述

對QPS的統計,通常是對一秒內各線程通過數據處理鏈中某一個切入點的次數進行累加計數。且不論采用何種方式都繞不開鎖,那如何結合QPS統計的場景,減少線程之間對鎖的競爭,是各實現方法考慮的重點問題。針對這個問題,Netflix的Hystrix限流組件中實現了一個十分高效的可進行QPS統計的工具類HystrixRollingNumber.java,該工具類也是集團限流工具Sentinel的核心。而該工具類的實現較為復雜,本文在該工具類的基礎上做了一些實現上的改進,並在mac pro上用4個線程進行了測試,得到了如下對比圖。從對比圖可看出,優化后的方案可穩定提升效率,降低耗時,最高可達30%。

Hystrix與New Approach(改進方案)測試對比圖:(縱軸:單位為毫秒,橫軸:每個線程的總計數值)

              QPS對比

 

二、具體實現

1)Hystrix組件所用的方案

時間片、定長數組、Striped64的結合。既HystrixRollingNumber.java中的實現。

具體參考:https://github.com/Netflix/Hystrix

 

2)Hystrix所用方案的優缺點

對比每一個請求都加鎖來統計計數,Hystrix通過盡量減少加鎖次數,降低加鎖粒度,從而降低線程之間對鎖的競爭,在效率上有指數級的提升,實現上也完全面向對象,但對循環數組的維護稍微復雜了一些,使得線程定位Bucket也變得復雜,可能耗時稍長。

 

3)改進思路

主要兩點:

一是在循環數組的維護與Bucket的定位上,實際可不需要Hystrix中那么多的判斷條件,直接利用當前時間取余定位到循環數組中Bucket的位置,同時可通過比較當前時間點與接近上一次進入統計方法的時間點及每個Bucket對應時間片的關系即可知道是否應該進入下一個Bucket,最后計算QPS通過收集該定長數組中的數據即可。二是在一的基礎上,盡量減少線程從進入統計方法到加數器LongAdder之間的耗時(此處即使多了一次賦值操作在性能上可能都會有較大影響)。

 

4)代碼實現

 

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @Author: gxj
 */
public class QPSCalculator {


    private RollingNumber rollingNumber;

    public QPSCalculator() {
        this.rollingNumber = new RollingNumber();
    }


    public void pass() {
        rollingNumber.record();
    }


    private static final class RollingNumber {
        /**
         * 槽位的數量
         */
        private int sizeOfBuckets;
        /**
         * 時間片,單位毫秒
         */
        private int unitOfTimeSlice;
        /**
         * 用於判斷是否可跳過鎖爭搶
         */
        private int timeSliceUsedToCheckIfPossibleToBypass;
        /**
         * 槽位
         */
        private Bucket[] buckets;
        /**
         * 目標槽位的位置
         */
        private volatile Integer targetBucketPosition;
        /**
         * 接近目標槽位最新更新時間的時間
         */
        private volatile Long latestPassedTimeCloseToTargetBucket;
        /**
         * 進入下一個槽位時使用的鎖
         */
        private ReentrantLock enterNextBucketLock;
        /**
         * 默認60個槽位,槽位的時間片為1000毫秒
         */
        public RollingNumber() {
            this(60, 1000);
        }
        /**
         * 初始化Bucket數量與每個Bucket的時間片等
         *
         * @param sizeOfBuckets
         * @param unitOfTimeSlice
         */
        public RollingNumber(int sizeOfBuckets, int unitOfTimeSlice) {
            this.latestPassedTimeCloseToTargetBucket = System.currentTimeMillis() - (2 * unitOfTimeSlice);
            this.targetBucketPosition = null;
            this.sizeOfBuckets = sizeOfBuckets;
            this.unitOfTimeSlice = unitOfTimeSlice;
            this.enterNextBucketLock = new ReentrantLock();
            this.buckets = new Bucket[sizeOfBuckets];
            this.timeSliceUsedToCheckIfPossibleToBypass = 3 * unitOfTimeSlice;
            for (int i = 0; i < sizeOfBuckets; i++) {
                this.buckets[i] = new Bucket();
            }
        }


        private void record() {
            long passTime = System.currentTimeMillis();
            if (targetBucketPosition == null) {
                targetBucketPosition = (int) (passTime / unitOfTimeSlice) % sizeOfBuckets;
            }
            Bucket currentBucket = buckets[targetBucketPosition];
            if (passTime - latestPassedTimeCloseToTargetBucket >= unitOfTimeSlice) {
                if (enterNextBucketLock.isLocked() && (passTime - latestPassedTimeCloseToTargetBucket) < timeSliceUsedToCheckIfPossibleToBypass) {
                } else {
                    try {
                        enterNextBucketLock.lock();
                        if (passTime - latestPassedTimeCloseToTargetBucket >= unitOfTimeSlice) {
                            int nextTargetBucketPosition = (int) (passTime / unitOfTimeSlice) % sizeOfBuckets;
                            Bucket nextBucket = buckets[nextTargetBucketPosition];
                            if (nextBucket.equals(currentBucket)) {
                                if (passTime - latestPassedTimeCloseToTargetBucket >= unitOfTimeSlice) {
                                    latestPassedTimeCloseToTargetBucket = passTime;
                                }
                            } else {
                                nextBucket.reset(passTime);
                                targetBucketPosition = nextTargetBucketPosition;
                                latestPassedTimeCloseToTargetBucket = passTime;
                            }
                            nextBucket.pass();
                            return;
                        } else {
                            currentBucket = buckets[targetBucketPosition];
                        }
                    } finally {
                        enterNextBucketLock.unlock();
                    }
                }
            }
            currentBucket.pass();
        }

        public Bucket[] getBuckets() {
            return buckets;
        }
    }


    private static class Bucket implements Serializable {

        private static final long serialVersionUID = -9085720164508215774L;

        private Long latestPassedTime;

        private LongAdder longAdder;

        public Bucket() {
            this.latestPassedTime = System.currentTimeMillis();
            this.longAdder = new LongAdder();
        }


        public void pass() {
            longAdder.add(1);
        }

        public long countTotalPassed() {
            return longAdder.sum();
        }

        public long getLatestPassedTime() {
            return latestPassedTime;
        }

        public void reset(long latestPassedTime) {
            this.longAdder.reset();
            this.latestPassedTime = latestPassedTime;
        }
    }



    public static void main(String[] args) {
        try {
            final QPSCalculator qpsCalculator = new QPSCalculator();
            int threadNum = 4;
            CountDownLatch countDownLatch = new CountDownLatch(threadNum);
            List<Thread> threadList = new ArrayList<Thread>();
            for (int i = 0; i < threadNum; i++) {
                threadList.add(new Thread() {
                    public void run() {
                        for (int i = 0; i < 50000000; i++) {
                            qpsCalculator.pass();
                        }
                        countDownLatch.countDown();
                    }
                });
            }

            long startTime = System.currentTimeMillis();
            for (Thread thread : threadList) {
                thread.start();
            }
            countDownLatch.await();
            long endTime = System.currentTimeMillis();
            long totalTime = endTime - startTime;
            System.out.print("totalMilliseconds:  " + totalTime);
            TimeUnit.SECONDS.sleep(1000L);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM