現在記錄話單的時候想加一個參數:每秒接口調用的並發量,也就是所謂的QPS(Queries per second)。QPS即每秒請求數,是對一個特定的接口在規定時間內請求流量的衡量標准。那么如何實現QPS的計算呢?我想到的是兩種方案:
1、一定時間內(比如一分鍾)的請求總量/統計時間段(比如一分鍾),最終得出就是每秒的並發量,它是基於某一段時間來統計的
2、直接統計一秒鍾內的請求總量,就是按每秒的時間段來統計,簡單粗暴
方案一的適用場景應該是報表、運維統計之類的,只關心QPS曲線;如果用來做並發量校驗,明顯只能用方案二,需要實時獲取QPS。那么如何統計一秒內的並發量?假設某一個時間點有接口到來,那么就開始統計該接口,在一秒之內,來多少個累加多少次。一秒之后,統計數清零。之后的某一個時間點,又有接口到來,又開始統計一秒之內的接口調用量,如此循環往復。
那么如何維護一個一秒之內的接口計數器呢?我覺得失效緩存是一個合適的選擇,緩存的鍵即為接口名,值就是接口統計數,過期時間一秒。為了避免引入第三方中間件,我們自己實現該過期緩存,需要維護一個定時器和一個優先級隊列,每秒清理一次隊列中已過期的緩存。
廢話說完了,看代碼:
1、緩存的值
import lombok.Getter; import lombok.Setter; import java.util.concurrent.atomic.AtomicLong; /** * 內部類,緩存對象,按失效時間排序,越早失效越前 * @author wulf * @since 20200422 */ @Getter @Setter public class CacheNode implements Comparable<CacheNode> { private String key; private AtomicLong callQuantity; private long expireTime; public CacheNode(String key, AtomicLong callQuantity, long expireTime) { this.key = key; this.callQuantity = callQuantity; this.expireTime = expireTime; } @Override public int compareTo(CacheNode o) { long dif = this.expireTime - o.expireTime; if (dif > 0) { return 1; } else if (dif < 0) { return -1; } return 0; } }
2、過期緩存:
import com.wlf.bean.CacheNode; import java.util.Map; import java.util.PriorityQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; /** * 帶過期時間的緩存 * * @author wulf * @since 2020/04/21 */ public class ExpiredCache { // 緩存key=接口名,value=接口調用量、過期時間戳 private Map<String, CacheNode> cache = new ConcurrentHashMap<>();// 重入鎖 private ReentrantLock lock = new ReentrantLock(); // 失效隊列 private PriorityQueue<CacheNode> queue = new PriorityQueue<>(); // 啟動定時任務,每秒清理一次過期緩存 private final static ScheduledExecutorService scheduleExe = new ScheduledThreadPoolExecutor(10); // 構造函數中啟動定時任務,執行對已過期緩存的清理工作,每秒執行一次 public ExpiredCache() { scheduleExe.scheduleAtFixedRate(new CleanExpireCacheTask(), 1L, 1L, TimeUnit.SECONDS); } /** * 內部類,清理過期緩存對象 */ private class CleanExpireCacheTask implements Runnable { @Override public void run() { long currentTime = System.currentTimeMillis(); // 取出隊列中的隊頭元素,對已過期的元素執行清除計划,剩下沒有過期則退出 while (true) { lock.lock(); try { CacheNode cacheNode = queue.peek(); // 已經把隊列清空了,或者所有過期元素已清空了,退出 if (cacheNode == null || cacheNode.getExpireTime() > currentTime) { return; } // 開始大清理了 cache.remove(cacheNode.getKey()); queue.poll(); } finally { lock.unlock(); } } } } /** * 根據緩存key獲取values * * @param cacheKey * @return */ public CacheNode getCacheNode(String cacheKey) { return cache.get(cacheKey); } /** * 加入緩存,設置存活時間 * * @param cacheKey * @param ttl 緩存的存活時間 * return */ public AtomicLong set(String cacheKey, long ttl) { // 若緩存中已存在緩存節點,不需要更新過期時間,僅更新QPS值 CacheNode oldNode = cache.get(cacheKey); if (oldNode != null) { AtomicLong oldQps = oldNode.getCallQuantity(); oldQps.incrementAndGet(); } else { // 否則新創建CacheNode對象,失效時間=當前時間+緩存存活時間 AtomicLong qps = new AtomicLong(1); CacheNode newNode = new CacheNode(cacheKey, qps, System.currentTimeMillis() + ttl * 1000); // 放入緩存,加入過期隊列 cache.put(cacheKey, newNode); queue.add(newNode); } return cache.get(cacheKey).getCallQuantity(); } }
3、在切面中統計接口QPS:
package com.wlf.cdr; import com.wlf.javabean.ots.TranslateCdr; import com.wlf.utils.ExpiredCache; import com.wlf.utils.IPUtil; import lombok.extern.slf4j.Slf4j; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.springframework.stereotype.Component; import org.springframework.web.context.request.RequestContextHolder; import org.springframework.web.context.request.ServletRequestAttributes; import javax.servlet.http.HttpServletRequest; import java.text.SimpleDateFormat; import java.util.Date; @Slf4j @Aspect @Component public class CdrAsept { private final static SimpleDateFormat SF = new SimpleDateFormat("yyyyMMddHHmmss"); // 話單格式:接口名稱|話單記錄時間|接口時延|調用方IP|本地IP|用戶ID|用戶名|源語言|目標語言|結果碼|QPS private final static String CDR_FORMAT = "{}|{}|{}|{}|{}|{}|{}|{}|{}|{}|{}"; // 過期緩存 private ExpiredCache expiredCache = new ExpiredCache(); @Around("execution(* com.wlf.translateprovider.controller.TranslateController.*(..))") public Object recordCdr(ProceedingJoinPoint joinPoint) throws Throwable { long startTime = System.currentTimeMillis(); String startDate = SF.format(new Date(startTime)); // 白名單校驗 ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); HttpServletRequest httpServletRequest = attributes.getRequest(); String localIp = IPUtil.getLocalIp(); String remoteIp = IPUtil.getRemoteIp(httpServletRequest); TranslateCdr cdr = new TranslateCdr(); cdr.setRemoteIp(remoteIp); CdrThreadLocal.setTranslateCdr(cdr); // 獲取接口名 String requestPath = httpServletRequest.getRequestURI(); String cacheKey = requestPath.substring(requestPath.lastIndexOf("/") + 1, requestPath.length()); // 設置過期時間為1秒 long qps = expiredCache.set(cacheKey, 1).get(); Object result = joinPoint.proceed(); long endTime = System.currentTimeMillis(); cdr = CdrThreadLocal.getTranslateCdr(); if (cdr != null) { log.error(CDR_FORMAT, cacheKey, startDate, endTime - startTime, remoteIp, localIp, cdr.getUserId(), cdr.getUserName(), cdr.getFrom(), cdr.getTo(), cdr.getResultCode(), qps); } CdrThreadLocal.delThreadLocal(); return result; } }
在切面中只需set一下,如果這時緩存有數據,就累加統計數,沒有就設置統計數為1,再get出來的得到QPS。但這里為了兼顧吞吐量,讓接口的調用不受QPS統計的影響,並沒有在切面或者過期緩存的set方法加鎖,因此對兩個並發時間很短的接口,統計數會相同。