最近我很好奇在RPC中限流熔斷降級要怎么做,hystrix已經1年多沒有更新了,感覺要被遺棄的感覺,那么我就把眼光聚焦到了阿里的Sentinel,順便學習一下阿里的源代碼。
這一章我主要講的是FlowRuleManager在加載FlowRule的時候做了什么,下一篇正式講Sentinel如何控制並發數的。
下面我給出一個簡化版的demo,這個demo只能單線程訪問,先把過程講清楚再講多線程版本。
初始化流量控制的規則:限定20個線程並發訪問
public class FlowThreadDemo {
private static AtomicInteger pass = new AtomicInteger();
private static AtomicInteger block = new AtomicInteger();
private static AtomicInteger total = new AtomicInteger();
private static AtomicInteger activeThread = new AtomicInteger();
private static volatile boolean stop = false;
private static final int threadCount = 100;
private static int seconds = 60 + 40;
private static volatile int methodBRunningTime = 2000;
public static void main(String[] args) throws Exception {
System.out.println(
"MethodA will call methodB. After running for a while, methodB becomes fast, "
+ "which make methodA also become fast ");
tick();
initFlowRule();
Entry methodA = null;
try {
TimeUnit.MILLISECONDS.sleep(5);
methodA = SphU.entry("methodA");
activeThread.incrementAndGet();
//Entry methodB = SphU.entry("methodB");
TimeUnit.MILLISECONDS.sleep(methodBRunningTime);
//methodB.exit();
pass.addAndGet(1);
} catch (BlockException e1) {
block.incrementAndGet();
} catch (Exception e2) {
// biz exception
} finally {
total.incrementAndGet();
if (methodA != null) {
methodA.exit();
activeThread.decrementAndGet();
}
}
}
private static void initFlowRule() {
List<FlowRule> rules = new ArrayList<FlowRule>();
FlowRule rule1 = new FlowRule();
rule1.setResource("methodA");
// set limit concurrent thread for 'methodA' to 20
rule1.setCount(20);
rule1.setGrade(RuleConstant.FLOW_GRADE_THREAD);
rule1.setLimitApp("default");
rules.add(rule1);
FlowRuleManager.loadRules(rules);
}
private static void tick() {
Thread timer = new Thread(new TimerTask());
timer.setName("sentinel-timer-task");
timer.start();
}
static class TimerTask implements Runnable {
@Override
public void run() {
long start = System.currentTimeMillis();
System.out.println("begin to statistic!!!");
long oldTotal = 0;
long oldPass = 0;
long oldBlock = 0;
while (!stop) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
long globalTotal = total.get();
long oneSecondTotal = globalTotal - oldTotal;
oldTotal = globalTotal;
long globalPass = pass.get();
long oneSecondPass = globalPass - oldPass;
oldPass = globalPass;
long globalBlock = block.get();
long oneSecondBlock = globalBlock - oldBlock;
oldBlock = globalBlock;
System.out.println(seconds + " total qps is: " + oneSecondTotal);
System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal
+ ", pass:" + oneSecondPass
+ ", block:" + oneSecondBlock
+ " activeThread:" + activeThread.get());
if (seconds-- <= 0) {
stop = true;
}
if (seconds == 40) {
System.out.println("method B is running much faster; more requests are allowed to pass");
methodBRunningTime = 20;
}
}
long cost = System.currentTimeMillis() - start;
System.out.println("time cost: " + cost + " ms");
System.out.println("total:" + total.get() + ", pass:" + pass.get()
+ ", block:" + block.get());
System.exit(0);
}
}
}
FlowRuleManager
在這個demo中,首先會調用FlowRuleManager#loadRules進行規則注冊
我們先聊一下規則配置的代碼:
private static void initFlowRule() {
List<FlowRule> rules = new ArrayList<FlowRule>();
FlowRule rule1 = new FlowRule();
rule1.setResource("methodA");
// set limit concurrent thread for 'methodA' to 20
rule1.setCount(20);
rule1.setGrade(RuleConstant.FLOW_GRADE_THREAD);
rule1.setLimitApp("default");
rules.add(rule1);
FlowRuleManager.loadRules(rules);
}
這段代碼里面先定義一個流量控制規則,然后調用loadRules進行注冊。
FlowRuleManager初始化
FlowRuleManager
FlowRuleManager 類里面有幾個靜態參數:
//規則集合
private static final Map<String, List<FlowRule>> flowRules = new ConcurrentHashMap<String, List<FlowRule>>();
//監聽器
private static final FlowPropertyListener LISTENER = new FlowPropertyListener();
//用來監聽配置是否發生變化
private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>();
//創建一個延遲的線程池
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1,
new NamedThreadFactory("sentinel-metrics-record-task", true));
static {
//設置監聽
currentProperty.addListener(LISTENER);
//每一秒鍾調用一次MetricTimerListener的run方法
SCHEDULER.scheduleAtFixedRate(new MetricTimerListener(), 0, 1, TimeUnit.SECONDS);
}
在初始化的時候會為靜態變量都賦上值。
在新建MetricTimerListener實例的時候做了很多事情,容我慢慢分析。
MetricTimerListener
public class MetricTimerListener implements Runnable {
private static final MetricWriter metricWriter = new MetricWriter(SentinelConfig.singleMetricFileSize(),
SentinelConfig.totalMetricFileCount());
....
}
首次初始化MetricTimerListener的時候會創建一個MetricWriter實例。我們先看傳入的兩個參數SentinelConfig.singleMetricFileSize()和SentinelConfig.totalMetricFileCount()。
SentinelConfig在首次初始化的時候會初始化靜態代碼塊:
SentinelConfig
static {
try {
initialize();
loadProps();
resolveAppType();
RecordLog.info("[SentinelConfig] Application type resolved: " + appType);
} catch (Throwable ex) {
RecordLog.warn("[SentinelConfig] Failed to initialize", ex);
ex.printStackTrace();
}
}
這段靜態代碼塊主要是設置一下配置參數。
SentinelConfig#singleMetricFileSize
SentinelConfig#totalMetricFileCount
public static long singleMetricFileSize() {
try {
//獲取的是 1024 * 1024 * 50
return Long.parseLong(props.get(SINGLE_METRIC_FILE_SIZE));
} catch (Throwable throwable) {
RecordLog.warn("[SentinelConfig] Parse singleMetricFileSize fail, use default value: "
+ DEFAULT_SINGLE_METRIC_FILE_SIZE, throwable);
return DEFAULT_SINGLE_METRIC_FILE_SIZE;
}
}
public static int totalMetricFileCount() {
try {
//默認是:6
return Integer.parseInt(props.get(TOTAL_METRIC_FILE_COUNT));
} catch (Throwable throwable) {
RecordLog.warn("[SentinelConfig] Parse totalMetricFileCount fail, use default value: "
+ DEFAULT_TOTAL_METRIC_FILE_COUNT, throwable);
return DEFAULT_TOTAL_METRIC_FILE_COUNT;
}
}
singleMetricFileSize方法和totalMetricFileCount主要是獲取SentinelConfig在靜態變量里設入得參數。
然后我們進入到MetricWriter的構造方法中:
MetricWriter
public MetricWriter(long singleFileSize, int totalFileCount) {
if (singleFileSize <= 0 || totalFileCount <= 0) {
throw new IllegalArgumentException();
}
RecordLog.info(
"[MetricWriter] Creating new MetricWriter, singleFileSize=" + singleFileSize + ", totalFileCount="
+ totalFileCount);
// /Users/luozhiyun/logs/csp/
this.baseDir = METRIC_BASE_DIR;
File dir = new File(baseDir);
if (!dir.exists()) {
dir.mkdirs();
}
long time = System.currentTimeMillis();
//轉換成秒
this.lastSecond = time / 1000;
//singleFileSize = 1024 * 1024 * 50
this.singleFileSize = singleFileSize;
//totalFileCount = 6
this.totalFileCount = totalFileCount;
try {
this.timeSecondBase = df.parse("1970-01-01 00:00:00").getTime() / 1000;
} catch (Exception e) {
RecordLog.warn("[MetricWriter] Create new MetricWriter error", e);
}
}
構造器里面主要是創建文件夾,設置單個文件大小,總文件個數,設置時間。
講完了MetricTimerListener的靜態屬性,現在我們來講MetricTimerListener的run方法。
MetricTimerListener#run
public void run() {
//這個run方法里面主要是做定時的數據采集,然后寫到log文件里去
Map<Long, List<MetricNode>> maps = new TreeMap<Long, List<MetricNode>>();
//遍歷集群節點
for (Entry<ResourceWrapper, ClusterNode> e : ClusterBuilderSlot.getClusterNodeMap().entrySet()) {
String name = e.getKey().getName();
ClusterNode node = e.getValue();
Map<Long, MetricNode> metrics = node.metrics();
aggregate(maps, metrics, name);
}
//匯總統計的數據
aggregate(maps, Constants.ENTRY_NODE.metrics(), Constants.TOTAL_IN_RESOURCE_NAME);
if (!maps.isEmpty()) {
for (Entry<Long, List<MetricNode>> entry : maps.entrySet()) {
try {
//寫入日志中
metricWriter.write(entry.getKey(), entry.getValue());
} catch (Exception e) {
RecordLog.warn("[MetricTimerListener] Write metric error", e);
}
}
}
}
上面的run方法其實就是每秒把統計的數據寫到日志里去。其中Constants.ENTRY_NODE.metrics()
負責統計數據,我們下面分析以下這個方法。
Constants.ENTRY_NODE
這句代碼會實例化一個ClusterNode實例。
ClusterNode是繼承StatisticNode,統計數據時在StatisticNode中實現的。
Metrics方法也是調用的StatisticNode方法。
我們先看看StatisticNode的全局變量
public class StatisticNode implements Node {
//構建一個統計60s的數據,設置60個滑動窗口,每個窗口1s
//這里創建的是BucketLeapArray實例來進行統計
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
//上次統計的時間戳
private long lastFetchTime = -1;
.....
}
然后我們看看StatisticNode的metrics方法:
StatisticNode#metrics
public Map<Long, MetricNode> metrics() {
// The fetch operation is thread-safe under a single-thread scheduler pool.
long currentTime = TimeUtil.currentTimeMillis();
//獲取當前時間的滑動窗口的開始時間
currentTime = currentTime - currentTime % 1000;
Map<Long, MetricNode> metrics = new ConcurrentHashMap<>();
//獲取滑動窗口里統計的數據
List<MetricNode> nodesOfEverySecond = rollingCounterInMinute.details();
long newLastFetchTime = lastFetchTime;
// Iterate metrics of all resources, filter valid metrics (not-empty and up-to-date).
for (MetricNode node : nodesOfEverySecond) {
//篩選符合的滑動窗口的節點
if (isNodeInTime(node, currentTime) && isValidMetricNode(node)) {
metrics.put(node.getTimestamp(), node);
//選出符合節點里最大的時間戳數據賦值
newLastFetchTime = Math.max(newLastFetchTime, node.getTimestamp());
}
}
//設置成滑動窗口里統計的最大時間
lastFetchTime = newLastFetchTime;
return metrics;
}
這個方法主要是調用rollingCounterInMinute進行數據的統計,然后篩選出有效的統計結果返回。
我們進入到rollingCounterInMinute是ArrayMetric的實例,所以我們進入到ArrayMetric的details方法中
ArrayMetric#details
public List<MetricNode> details() {
List<MetricNode> details = new ArrayList<MetricNode>();
//調用BucketLeapArray
data.currentWindow();
//列出統計結果
List<WindowWrap<MetricBucket>> list = data.list();
for (WindowWrap<MetricBucket> window : list) {
if (window == null) {
continue;
}
//對統計結果進行封裝
MetricNode node = new MetricNode();
//代表一秒內被流量控制的請求數量
node.setBlockQps(window.value().block());
//則是一秒內業務本身異常的總和
node.setExceptionQps(window.value().exception());
// 代表一秒內到來到的請求
node.setPassQps(window.value().pass());
//代表一秒內成功處理完的請求;
long successQps = window.value().success();
node.setSuccessQps(successQps);
//代表一秒內該資源的平均響應時間
if (successQps != 0) {
node.setRt(window.value().rt() / successQps);
} else {
node.setRt(window.value().rt());
}
//設置統計窗口的開始時間
node.setTimestamp(window.windowStart());
node.setOccupiedPassQps(window.value().occupiedPass());
details.add(node);
}
return details;
}
這個方法首先會調用dat.currentWindow()
設置當前時間窗口到窗口列表里去。然后調用data.list()
列出所有的窗口數據,然后遍歷不為空的窗口數據封裝成MetricNode返回。
data是BucketLeapArray的實例,BucketLeapArray繼承了LeapArray,主要的統計都是在LeapArray中進行的,所以我們直接看看LeapArray的currentWindow方法。
LeapArray#currentWindow
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
//通過當前時間判斷屬於哪個窗口
int idx = calculateTimeIdx(timeMillis);
//計算出窗口開始時間
// Calculate current bucket start time.
long windowStart = calculateWindowStart(timeMillis);
while (true) {
//獲取數組里的老數據
WindowWrap<T> old = array.get(idx);
if (old == null) {
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
// 如果對應時間窗口的開始時間與計算得到的開始時間一樣
// 那么代表當前即是我們要找的窗口對象,直接返回
} else if (windowStart == old.windowStart()) {
return old;
} else if (windowStart > old.windowStart()) {
//如果當前的開始時間小於原開始時間,那么就更新到新的開始時間
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
//一般來說不會走到這里
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
這個方法里首先會傳入一個timeMillis是當前的時間戳。然后調用calculateTimeIdx
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
//計算當前時間能夠落在array的那個節點上
long timeId = timeMillis / windowLengthInMs;
// Calculate current index so we can map the timestamp to the leap array.
return (int)(timeId % array.length());
}
calculateTimeIdx方法用當前的時間戳除以每個窗口的大小,再和array數據取模。array數據是一個容量為60的數組,代表被統計的60秒分割的60個小窗口。
舉例:
例如當前timeMillis = 1567175708975
timeId = 1567175708975/1000 = 1567175708
timeId % array.length() = 1567175708%60 = 8
也就是說當前的時間窗口是第八個。
然后調用calculateWindowStart計算當前時間開始時間
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
//用當前時間減去窗口大小,計算出窗口開始時間
return timeMillis - timeMillis % windowLengthInMs;
}
接下來就是一個while循環:
在看while循環之前我們看一下array數組里面是什么樣的對象
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
WindowWrap是一個時間窗口的包裝對象,里面包含時間窗口的長度,這里是1000;窗口開始時間;窗口內的數據實體,是調用newEmptyBucket方法返回一個MetricBucket。
MetricBucket
public class MetricBucket {
private final LongAdder[] counters;
//默認4900
private volatile long minRt;
public MetricBucket() {
MetricEvent[] events = MetricEvent.values();
this.counters = new LongAdder[events.length];
for (MetricEvent event : events) {
counters[event.ordinal()] = new LongAdder();
}
//初始化minRt,默認是4900
initMinRt();
}
...
}
MetricEvent是一個枚舉類:
public enum MetricEvent {
PASS,
BLOCK,
EXCEPTION,
SUCCESS,
RT,
OCCUPIED_PASS
}
也就是是MetricBucket為每個窗口通過一個內部數組counters統計了這個窗口內的所有數據。
接下來我們來講一下while循環里所做的事情:
- 從array里獲取bucket節點
- 如果節點已經存在,那么用CAS更新一個新的節點
- 如果節點是新的,那么直接返回
- 如果節點失效了,設置當前節點,清除所有失效節點
舉例:
1. 如果array數據里面的bucket數據如下所示:
B0 B1 B2 NULL B4
||_______|_______|_______|_______|_______||___
200 400 600 800 1000 1200 timestamp
^
time=888
正好當前時間所對應的槽位里面的數據是空的,那么就用CAS更新
2. 如果array里面已經有數據了,並且槽位里面的窗口開始時間和當前的開始時間相等,那么直接返回
B0 B1 B2 B3 B4
||_______|_______|_______|_______|_______||___
200 400 600 800 1000 1200 timestamp
^
time=888
3. 例如當前時間是1676,所對應窗口里面的數據的窗口開始時間小於當前的窗口開始時間,那么加上鎖,然后設置槽位的窗口開始時間為當前窗口開始時間,並把槽位里面的數據重置
(old)
B0 B1 B2 NULL B4
|_______||_______|_______|_______|_______|_______||___
... 1200 1400 1600 1800 2000 2200 timestamp
^
time=1676
所以上面的array數組大概是這樣:
array數組由一個個的WindowWrap實例組成,WindowWrap實例里面由MetricBucket進行數據統計。
然后繼續回到ArrayMetric的details方法,講完了上面的data.currentWindow()
,現在再來講data.list()
list方法最后也會調用到LeapArray的list方法中:
LeapArray#list
public List<WindowWrap<T>> list(long validTime) {
int size = array.length();
List<WindowWrap<T>> result = new ArrayList<WindowWrap<T>>(size);
for (int i = 0; i < size; i++) {
WindowWrap<T> windowWrap = array.get(i);
//如果windowWrap節點為空或者當前時間戳比windowWrap的窗口開始時間大超過60s,那么就跳過
//也就是說只要60s以內的數據
if (windowWrap == null || isWindowDeprecated(validTime, windowWrap)) {
continue;
}
result.add(windowWrap);
}
return result;
}
這個方法是用來把array里面都統計好的節點都找出來,並且是不為空,且是當前時間60秒內的數據。
最后Constants.ENTRY_NODE.metrics() 會返回所有符合條件的統計節點數據然后傳入aggregate方法中,遍歷為每個MetricNode節點設置Resource為TOTAL_IN_RESOURCE_NAME,封裝好調用metricWriter.write
進行寫日志操作。
最后總結一下在初始化FlowRuleManager的時候做了什么:
- FlowRuleManager在初始化的時候會調用靜態代碼塊進行初始化
- 在靜態代碼塊內調用ScheduledExecutorService線程池,每隔1秒調用一次MetricTimerListener的run方法
- MetricTimerListener會調用
Constants.ENTRY_NODE.metrics()
進行定時的統計- 調用StatisticNode進行統計,統計60秒內的數據,並將60秒的數據分割成60個小窗口
- 在設置當前窗口的時候如果里面沒有數據直接設置,如果存在數據並且是最新的直接返回,如果是舊數據,那么reset原來的統計數據
- 每個小窗口里面的數據由MetricBucket進行封裝
- 最后將統計好的數據通過metricWriter寫入到log里去
FlowRuleManager加載規則
FlowRuleManager是調用loadRules進行規則加載的:
FlowRuleManager#loadRules
public static void loadRules(List<FlowRule> rules) {
currentProperty.updateValue(rules);
}
currentProperty這個實例是在FlowRuleManager是在靜態代碼塊里面進行加載的,上面我們講過,生成的是DynamicSentinelProperty的實例。
我們進入到DynamicSentinelProperty的updateValue中:
public boolean updateValue(T newValue) {
//判斷新的元素和舊元素是否相同
if (isEqual(value, newValue)) {
return false;
}
RecordLog.info("[DynamicSentinelProperty] Config will be updated to: " + newValue);
value = newValue;
for (PropertyListener<T> listener : listeners) {
listener.configUpdate(newValue);
}
return true;
}
updateValue方法就是校驗一下是不是已經存在相同的規則了,如果不存在那么就直接設置value等於新的規則,然后通知所有的監聽器更新一下規則配置。
currentProperty實例里面的監聽器會在FlowRuleManager初始化靜態代碼塊的時候設置一個FlowPropertyListener監聽器實例,FlowPropertyListener是FlowRuleManager的內部類:
private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {
@Override
public void configUpdate(List<FlowRule> value) {
Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);
if (rules != null) {
flowRules.clear();
//這個map的維度是key是Resource
flowRules.putAll(rules);
}
RecordLog.info("[FlowRuleManager] Flow rules received: " + flowRules);
}
....
}
configUpdate首先會調用FlowRuleUtil.buildFlowRuleMap()
方法將所有的規則按resource分類,然后排序返回成map,然后將FlowRuleManager的原來的規則清空,放入新的規則集合到flowRules中去。
FlowRuleUtil#buildFlowRuleMap
這個方法最后會調用到FlowRuleUtil的另一個重載的方法:
public static <K> Map<K, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list, Function<FlowRule, K> groupFunction,
Predicate<FlowRule> filter, boolean shouldSort) {
Map<K, List<FlowRule>> newRuleMap = new ConcurrentHashMap<>();
if (list == null || list.isEmpty()) {
return newRuleMap;
}
Map<K, Set<FlowRule>> tmpMap = new ConcurrentHashMap<>();
for (FlowRule rule : list) {
//校驗必要字段:資源名,限流閾值, 限流閾值類型,調用關系限流策略,流量控制效果等
if (!isValidRule(rule)) {
RecordLog.warn("[FlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule);
continue;
}
if (filter != null && !filter.test(rule)) {
continue;
}
//應用名,如果沒有則會使用default
if (StringUtil.isBlank(rule.getLimitApp())) {
rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}
//設置拒絕策略:直接拒絕、Warm Up、勻速排隊,默認是DefaultController
TrafficShapingController rater = generateRater(rule);
rule.setRater(rater);
//獲取Resource名字
K key = groupFunction.apply(rule);
if (key == null) {
continue;
}
//根據Resource進行分組
Set<FlowRule> flowRules = tmpMap.get(key);
if (flowRules == null) {
// Use hash set here to remove duplicate rules.
flowRules = new HashSet<>();
tmpMap.put(key, flowRules);
}
flowRules.add(rule);
}
//根據ClusterMode LimitApp排序
Comparator<FlowRule> comparator = new FlowRuleComparator();
for (Entry<K, Set<FlowRule>> entries : tmpMap.entrySet()) {
List<FlowRule> rules = new ArrayList<>(entries.getValue());
if (shouldSort) {
// Sort the rules.
Collections.sort(rules, comparator);
}
newRuleMap.put(entries.getKey(), rules);
}
return newRuleMap;
}
這個方法首先校驗傳進來的rule集合不為空,然后遍歷rule集合。對rule的必要字段進行校驗,如果傳入了過濾器那么校驗過濾器,然后過濾resource為空的rule,最后相同的resource的rule都放到一起排序后返回。
注意這里默認生成的rater是DefaultController。
到這里FlowRuleManager已經分析完畢了,比較長。