統計接口QPS


  現在記錄話單的時候想加一個參數:每秒接口調用的並發量,也就是所謂的QPS(Queries per second)。QPS即每秒請求數,是對一個特定的接口在規定時間內請求流量的衡量標准。那么如何實現QPS的計算呢?我想到的是兩種方案:

  1、一定時間內(比如一分鍾)的請求總量/統計時間段(比如一分鍾),最終得出就是每秒的並發量,它是基於某一段時間來統計的

  2、直接統計一秒鍾內的請求總量,就是按每秒的時間段來統計,簡單粗暴

  方案一的適用場景應該是報表、運維統計之類的,只關心QPS曲線;如果用來做並發量校驗,明顯只能用方案二,需要實時獲取QPS。那么如何統計一秒內的並發量?假設某一個時間點有接口到來,那么就開始統計該接口,在一秒之內,來多少個累加多少次。一秒之后,統計數清零。之后的某一個時間點,又有接口到來,又開始統計一秒之內的接口調用量,如此循環往復。

  那么如何維護一個一秒之內的接口計數器呢?我覺得失效緩存是一個合適的選擇,緩存的鍵即為接口名,值就是接口統計數,過期時間一秒。為了避免引入第三方中間件,我們自己實現該過期緩存,需要維護一個定時器和一個優先級隊列,每秒清理一次隊列中已過期的緩存。

  廢話說完了,看代碼:

  1、緩存的值

import lombok.Getter;
import lombok.Setter;

import java.util.concurrent.atomic.AtomicLong;

/**
 * 內部類,緩存對象,按失效時間排序,越早失效越前
 * @author wulf
 * @since 20200422
 */
@Getter
@Setter
public class CacheNode implements Comparable<CacheNode> {
    private String key;
    private AtomicLong callQuantity;
    private long expireTime;

    public CacheNode(String key, AtomicLong callQuantity, long expireTime) {
        this.key = key;
        this.callQuantity = callQuantity;
        this.expireTime = expireTime;
    }


    @Override
    public int compareTo(CacheNode o) {
        long dif = this.expireTime - o.expireTime;
        if (dif > 0) {
            return 1;
        } else if (dif < 0) {
            return -1;
        }
        return 0;
    }
}

 

  2、過期緩存:

import com.wlf.bean.CacheNode;

import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 帶過期時間的緩存
 *
 * @author wulf
 * @since 2020/04/21
 */
public class ExpiredCache {

    // 緩存key=接口名,value=接口調用量、過期時間戳
    private Map<String, CacheNode> cache = new ConcurrentHashMap<>();// 重入鎖
    private ReentrantLock lock = new ReentrantLock();

    // 失效隊列
    private PriorityQueue<CacheNode> queue = new PriorityQueue<>();

    // 啟動定時任務,每秒清理一次過期緩存
    private final static ScheduledExecutorService scheduleExe = new ScheduledThreadPoolExecutor(10);

    // 構造函數中啟動定時任務,執行對已過期緩存的清理工作,每秒執行一次
    public ExpiredCache() {
        scheduleExe.scheduleAtFixedRate(new CleanExpireCacheTask(), 1L, 1L, TimeUnit.SECONDS);
    }

    /**
     * 內部類,清理過期緩存對象
     */
    private class CleanExpireCacheTask implements Runnable {

        @Override
        public void run() {
            long currentTime = System.currentTimeMillis();
            // 取出隊列中的隊頭元素,對已過期的元素執行清除計划,剩下沒有過期則退出
            while (true) {
                lock.lock();
                try {
                    CacheNode cacheNode = queue.peek();
                    // 已經把隊列清空了,或者所有過期元素已清空了,退出
                    if (cacheNode == null || cacheNode.getExpireTime() > currentTime) {
                        return;
                    }

                    // 開始大清理了
                    cache.remove(cacheNode.getKey());
                    queue.poll();
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    /**
     * 根據緩存key獲取values
     *
     * @param cacheKey
     * @return
     */
    public CacheNode getCacheNode(String cacheKey) {
        return cache.get(cacheKey);
    }

    /**
     * 加入緩存,設置存活時間
     *
     * @param cacheKey
     * @param ttl      緩存的存活時間
     *                 return
     */
    public AtomicLong set(String cacheKey, long ttl) {

        // 若緩存中已存在緩存節點,不需要更新過期時間,僅更新QPS值
        CacheNode oldNode = cache.get(cacheKey);
        if (oldNode != null) {
            AtomicLong oldQps = oldNode.getCallQuantity();
            oldQps.incrementAndGet();
        } else {
            // 否則新創建CacheNode對象,失效時間=當前時間+緩存存活時間
            AtomicLong qps = new AtomicLong(1);
            CacheNode newNode = new CacheNode(cacheKey, qps, System.currentTimeMillis() + ttl * 1000);

            // 放入緩存,加入過期隊列
            cache.put(cacheKey, newNode);
            queue.add(newNode);
        }
        return cache.get(cacheKey).getCallQuantity();
    }

}

 

  3、在切面中統計接口QPS:

package com.wlf.cdr;

import com.wlf.javabean.ots.TranslateCdr;
import com.wlf.utils.ExpiredCache;
import com.wlf.utils.IPUtil;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;
import java.text.SimpleDateFormat;
import java.util.Date;

@Slf4j
@Aspect
@Component
public class CdrAsept {
    private final static SimpleDateFormat SF = new SimpleDateFormat("yyyyMMddHHmmss");

    // 話單格式:接口名稱|話單記錄時間|接口時延|調用方IP|本地IP|用戶ID|用戶名|源語言|目標語言|結果碼|QPS
    private final static String CDR_FORMAT = "{}|{}|{}|{}|{}|{}|{}|{}|{}|{}|{}";

    // 過期緩存
    private ExpiredCache expiredCache = new ExpiredCache();

    @Around("execution(* com.wlf.translateprovider.controller.TranslateController.*(..))")
    public Object recordCdr(ProceedingJoinPoint joinPoint) throws Throwable {

        long startTime = System.currentTimeMillis();
        String startDate = SF.format(new Date(startTime));

        // 白名單校驗
        ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        HttpServletRequest httpServletRequest = attributes.getRequest();
        String localIp = IPUtil.getLocalIp();
        String remoteIp = IPUtil.getRemoteIp(httpServletRequest);
        TranslateCdr cdr = new TranslateCdr();
        cdr.setRemoteIp(remoteIp);
        CdrThreadLocal.setTranslateCdr(cdr);

        // 獲取接口名
        String requestPath = httpServletRequest.getRequestURI();
        String cacheKey = requestPath.substring(requestPath.lastIndexOf("/") + 1, requestPath.length());

        // 設置過期時間為1秒
        long qps = expiredCache.set(cacheKey, 1).get();

        Object result = joinPoint.proceed();

        long endTime = System.currentTimeMillis();
        cdr = CdrThreadLocal.getTranslateCdr();
        if (cdr != null) {
            log.error(CDR_FORMAT, cacheKey, startDate, endTime - startTime, remoteIp, localIp, cdr.getUserId(),
                    cdr.getUserName(), cdr.getFrom(), cdr.getTo(), cdr.getResultCode(), qps);
        }
        CdrThreadLocal.delThreadLocal();
        return result;
    }
}

 

  在切面中只需set一下,如果這時緩存有數據,就累加統計數,沒有就設置統計數為1,再get出來的得到QPS。但這里為了兼顧吞吐量,讓接口的調用不受QPS統計的影響,並沒有在切面或者過期緩存的set方法加鎖,因此對兩個並發時間很短的接口,統計數會相同。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM