Preface
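While looking up material on Sentinel, I happened upon Martin Fowler's article "CircuitBreaker". I took some time to read it carefully, which was also a good chance to review how the circuit breaker (CircuitBreaker) works and how it is implemented.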
How CircuitBreaker works
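A real-world fusible cutout (more commonly called a fuse) is an electrical component installed in a circuit to keep it operating safely. It usually consists of an insulating glass tube enclosing a thin alloy conductor with a fixed resistance and a fixed melting point, as shown in the figure below: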
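In a circuit, the fuse is connected in series with the other loads. Joule's law gives $Q = I^2 R t$, where $Q$ is the heat energy (here it can be read as the energy at which the fuse melts), $I$ is the current in the circuit, $R$ is the fuse's fixed resistance and $t$ is time. If another component in the circuit short-circuits, the current $I$ becomes very large. Even for a very small $t$, the computed $Q$ then far exceeds the fuse's melting energy, so the fuse blows. The whole circuit is then open, which keeps an overload from destroying the components in it.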
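Solving the same relation for time makes the "blows almost instantly" argument concrete. This is a simplification that treats the fuse as if it lost no heat to its surroundings:

$$t_{\text{melt}} = \frac{Q_{\text{melt}}}{I^2 R}$$

Because $t_{\text{melt}}$ is inversely proportional to $I^2$, a short circuit that raises the current tenfold cuts the time needed to reach the melting energy by a factor of 100.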
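Excessive current makes every high-resistance appliance in the circuit build up a lot of heat, which can easily start a fire, so fuses once played a major role. Later came the more advanced "air switch" (miniature circuit breaker), and most residual-current (leakage) switches were upgraded to this design. Fuses are still used inside all kinds of components, but they have largely disappeared from everyday view.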
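I remember one evening in my childhood when my grandfather pulled the cord of an incandescent lamp. With a pop, every appliance in the house went dead: a power cut. He said, "The fuse has 'burnt'. Let me replace it." Once he had fitted a new fuse and pushed the main switch back up, the power came back.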
The analysis above shows that a real-world fuse is a single-use consumable, and in some scenarios it needs manual intervention (replacement).
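In software systems, CircuitBreaker is a pattern that borrows the function of the real-world fuse and improves on it. The pattern arose from the following background. With the development of software and computer networks, and the current popularity of microservice architectures, applications are deployed on different machines or in different processes on the same machine, so they must interact through remote calls. One of the biggest differences between a remote call and an in-memory call within a single process is that a remote call can fail for all kinds of reasons. It may also hang without responding (in effect, wait with no timeout), or return only when some timeout expires. These failures keep the caller's resources occupied and unreleased. Most commonly, the caller's threads that accept or process requests stay suspended for a long time: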
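If the failing callee happens to be a critical node with many upstream callers (for example, the internal gateway in the figure above), the failure cascades. In extreme cases it can even exhaust all the resources in the whole service cluster. If a CircuitBreaker component is added to the service's remote calls, a single service call behaves as shown below: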
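The basic principle of a CircuitBreaker is fairly simple. The protected function (method) is wrapped in a circuit breaker object and called through it. The breaker monitors data about every call, mainly statistics; method arguments can generally be ignored. Once failing calls reach a threshold or trigger certain rules, the breaker switches to the Open state. Every call through it then fails fast, and requests never reach the downstream callee. In practice, I think the core of a CircuitBreaker comes down to three parts: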
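- Measuring and aggregating call data.
- Maintaining the breaker's own state.
- Using the two points above to protect the calls executed inside the breaker.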
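Call metrics are usually collected with the JDK's atomic types, for example the Atomic* classes and the LongAdder added in JDK 8. A downstream callee will not stay broken forever. To let the breaker recover by itself, the Half_Open state and the sliding window are introduced. Blocked threads in the application container can be devastating, so the following optimization is sometimes worth considering. The breaker runs each protected call on a specific thread pool (or under a semaphore) chosen by a predefined resource identifier. It then uses FutureTask#get(long timeout, TimeUnit unit) to cap how long a call may take. This largely prevents failing remote calls from blocking the application container's own threads.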
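Here "container" specifically means Tomcat, Netty, Jetty and the like. Thread-pool isolation, the sliding window and the other concepts mentioned here are covered in detail in the implementations below.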
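The timeout idea can be sketched with nothing but java.util.concurrent. Submit the protected call to a dedicated executor, then wait on the returned Future with a deadline. TimeoutGuard and callWithTimeout are hypothetical names, and this is a minimal sketch rather than a complete breaker. Cancelling the Future on timeout interrupts the worker thread, which only frees it if the task actually responds to interruption.

```java
import java.util.concurrent.*;

// Hypothetical helper: runs a protected call on a dedicated pool and bounds
// how long the caller waits for it via Future#get(timeout, unit).
final class TimeoutGuard {
    private final ExecutorService pool;

    TimeoutGuard(ExecutorService pool) {
        this.pool = pool;
    }

    <T> T callWithTimeout(Callable<T> task, long timeoutMillis, T fallback) {
        Future<T> future = pool.submit(task);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Stop waiting and interrupt the worker so the pool thread can be reused
            future.cancel(true);
            return fallback;
        } catch (ExecutionException e) {
            // The task itself threw; e.getCause() holds the original exception
            return fallback;
        } catch (InterruptedException e) {
            // The calling thread was interrupted: restore the flag and give up
            Thread.currentThread().interrupt();
            future.cancel(true);
            return fallback;
        }
    }
}
```

The caller's thread is blocked for at most roughly timeoutMillis. The slow call keeps running only on the pool's thread, and only until it notices the interrupt.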
Isolation based on thread pools:
Isolation directly on container threads:
A simple implementation of CircuitBreaker
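Following the theory in the previous section, this section designs several CircuitBreaker implementations, iterating step by step from simple to complex. The CircuitBreaker state transition diagram is as follows: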
Based on this diagram, Martin Fowler gives the following pseudocode (in Ruby) in his article:
require 'timeout'
class ResetCircuitBreaker
  # Initialization
  def initialize &block
    @circuit = block
    @invocation_timeout = 0.01
    @failure_threshold = 5
    @monitor = BreakerMonitor.new
    @reset_timeout = 0.1
    reset
  end
  # Reset
  def reset
    @failure_count = 0
    @last_failure_time = nil
    @monitor.alert :reset_circuit
  end
  # State maintenance: half_open is derived from the failure count and elapsed time
  def state
    case
    when (@failure_count >= @failure_threshold) &&
        (Time.now - @last_failure_time) > @reset_timeout
      :half_open
    when (@failure_count >= @failure_threshold)
      :open
    else
      :closed
    end
  end
  # Call
  def call args
    case state
    when :closed, :half_open
      begin
        # On success, do_call resets the breaker
        do_call args
      rescue Timeout::Error
        record_failure
        raise $!
      end
    when :open
      raise CircuitBreaker::Open
    else
      raise "Unreachable"
    end
  end
  # Run the protected block under a timeout, then reset after a successful call
  def do_call args
    result = Timeout.timeout(@invocation_timeout) do
      @circuit.call args
    end
    reset
    result
  end
  # Record a failure
  def record_failure
    @failure_count += 1
    @last_failure_time = Time.now
    @monitor.alert(:open_circuit) if :open == state
  end
end
The implementations below all follow the basic framework of this pseudocode.
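The key point of the pseudocode is that half-open is never stored: state derives it from the failure count and the time since the last failure. The sketch below is a direct Java transcription of that state method. It assumes epoch-millisecond timestamps passed in as parameters, so the rule can be checked without a real clock. StateRule and DerivedState are hypothetical names.

```java
// Hypothetical enum mirroring the three symbols returned by the Ruby `state` method
enum DerivedState { CLOSED, OPEN, HALF_OPEN }

final class StateRule {
    private StateRule() {
    }

    // Same case order as the pseudocode: the half-open check must come first,
    // because an open breaker also satisfies failureCount >= failureThreshold
    static DerivedState state(long failureCount, long failureThreshold,
                              long lastFailureTime, long now, long resetTimeout) {
        if (failureCount >= failureThreshold && now - lastFailureTime > resetTimeout) {
            return DerivedState.HALF_OPEN;
        }
        if (failureCount >= failureThreshold) {
            return DerivedState.OPEN;
        }
        return DerivedState.CLOSED;
    }
}
```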
An implementation based on an exception threshold, without self-recovery
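This implementation is the simplest: it only needs the condition for switching from Closed to Open. Set a threshold on the exception count, then count exceptions with an atomic counter. The Java code is as follows: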
// CircuitBreakerStatus.java - breaker states
public enum CircuitBreakerStatus {
    /**
     * Closed
     */
    CLOSED,
    /**
     * Open
     */
    OPEN,
    /**
     * Half-open
     */
    HALF_OPEN
}
// SimpleCircuitBreaker.java
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;
import lombok.Getter;
@Getter
public class SimpleCircuitBreaker {
    private final long failureThreshold;
    private final LongAdder failureCounter;
    private final LongAdder callCounter;
    private CircuitBreakerStatus status;
    // Value returned when a call fails or the breaker is open
    private final Object fallback = null;
    public SimpleCircuitBreaker(long failureThreshold) {
        this.failureThreshold = failureThreshold;
        this.callCounter = new LongAdder();
        this.failureCounter = new LongAdder();
        this.status = CircuitBreakerStatus.CLOSED;
    }
    @SuppressWarnings("unchecked")
    public <T> T call(Supplier<T> supplier) {
        try {
            // Only a CLOSED breaker lets the call through; otherwise fail fast with the fallback
            if (CircuitBreakerStatus.CLOSED == this.status) {
                return supplier.get();
            }
        } catch (Exception e) {
            this.failureCounter.increment();
            tryChangingStatus();
        } finally {
            this.callCounter.increment();
        }
        return (T) fallback;
    }
    // Switch CLOSED -> OPEN once the failure count reaches the threshold
    private void tryChangingStatus() {
        if (this.failureThreshold <= this.failureCounter.sum()) {
            this.status = CircuitBreakerStatus.OPEN;
            System.out.println(String.format("SimpleCircuitBreaker state transition, [%s]->[%s]", CircuitBreakerStatus.CLOSED,
                    CircuitBreakerStatus.OPEN));
        }
    }
    // Convenience overload for calls without a return value
    public void call(Runnable runnable) {
        call(() -> {
            runnable.run();
            return null;
        });
    }
}
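With multiple threads, if calls in many threads fail within a very short time, all of them may rush into tryChangingStatus(). The CircuitBreaker's state would then be modified concurrently. Wrapping CircuitBreakerStatus in an AtomicReference and updating it with CAS (so that it is updated only once) solves this. The changed code is as follows: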
private final AtomicReference<CircuitBreakerStatus> status;
public SimpleCircuitBreaker(long failureThreshold) {
    ......
    this.status = new AtomicReference<>(CircuitBreakerStatus.CLOSED);
}
public <T> T call(Supplier<T> supplier) {
    try {
        if (CircuitBreakerStatus.CLOSED == this.status.get()) {
            return supplier.get();
        }
    ......
private void tryChangingStatus() {
    if (this.failureThreshold <= this.failureCounter.sum()) {
        // Only the thread whose CAS succeeds performs (and logs) the transition
        boolean b = this.status.compareAndSet(CircuitBreakerStatus.CLOSED, CircuitBreakerStatus.OPEN);
        if (b) {
            System.out.println(String.format("SimpleCircuitBreaker state transition, [%s]->[%s]", CircuitBreakerStatus.CLOSED,
                    CircuitBreakerStatus.OPEN));
        }
    }
}
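The sketch below shows why compareAndSet is enough here. It starts many threads at the same moment, and each one tries to move a shared AtomicReference from CLOSED to OPEN. Exactly one of those calls succeeds, so the transition is logged once. CasRace is a hypothetical demo class, not part of the breaker.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo: count how many racing threads win the CLOSED -> OPEN CAS
final class CasRace {
    enum Status { CLOSED, OPEN }

    static int countWinners(int threads) {
        AtomicReference<Status> status = new AtomicReference<>(Status.CLOSED);
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await(); // line all threads up before racing
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
                    winners.incrementAndGet();
                }
            });
            workers[i].start();
        }
        start.countDown();
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return winners.get();
    }
}
```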
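Under very high concurrency with failing calls, the exception counter failureCounter may shoot far past the exception threshold failureThreshold in an instant. Even so, we generally do not add synchronization (for example, locks) around these count comparisons or the exact moment of the state switch. Synchronization would greatly reduce concurrency and turn the breaker itself into a performance hazard, which clearly defeats its purpose. So a breaker's logic does not need to control the exact count at which the state switches; it only has to guarantee that the switch does happen. Here is a synchronous test of this rudimentary breaker: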
public static class Service {
    public String process(int i) {
        System.out.println("Entering process, number: " + i);
        throw new RuntimeException(String.valueOf(i));
    }
}
public static void main(String[] args) throws Exception {
    SimpleCircuitBreaker circuitBreaker = new SimpleCircuitBreaker(5L);
    Service service = new Service();
    for (int i = 0; i < 10; i++) {
        int temp = i;
        String result = circuitBreaker.call(() -> service.process(temp));
        System.out.println(String.format("Result: %s, number: %d", result, temp));
    }
}
The test output is as follows:
Entering process, number: 0
Result: null, number: 0
Entering process, number: 1
Result: null, number: 1
Entering process, number: 2
Result: null, number: 2
Entering process, number: 3
Result: null, number: 3
Entering process, number: 4
SimpleCircuitBreaker state transition, [CLOSED]->[OPEN]
Result: null, number: 4
Result: null, number: 5
Result: null, number: 6
Result: null, number: 7
Result: null, number: 8
Result: null, number: 9
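Observant readers will notice that state maintenance, state changes and statistics all sit in the exception (failure) path and in the final finally block. Before the real call, the breaker generally only checks the state or, as described later, allocates call resources.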
An implementation based on an exception threshold, with self-recovery
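A self-recovering CircuitBreaker based on an exception threshold needs the Half_Open state. It also has to record the timestamp of the last failed call and a reset_timeout. The rule is: when the current system timestamp minus the timestamp of the last failed call in the previous phase exceeds some value, and the failure count has reached the failure threshold, the state is switched to Half_Open. That "some value" is reset_timeout. The schematic is as follows: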
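Suppose the current call is circle 6. The current system timestamp minus the timestamp of the last failed call of the previous round (circle 5) exceeds the preset reset_timeout. The breaker therefore enters Half_Open, regardless of whether this call will succeed or fail. It stays in Half_Open until the next call fails, or until the failures have dropped enough for it to switch to Closed. In Half_Open it lets a single call through; under concurrency, several calls may occasionally get through at once. The code is as follows: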
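// A monitor that records state changes
public enum CircuitBreakerStatusMonitor {
    /**
     * Singleton
     */
    X;
    public void report(String name, CircuitBreakerStatus o, CircuitBreakerStatus n) {
        System.out.println(String.format("CircuitBreaker[%s] state changed, [%s]->[%s]", name, o, n));
    }
    // Free-form report, used later by SlidingWindowCircuitBreaker to log the error percentage
    public void report(String name, String message) {
        System.out.println(String.format("CircuitBreaker[%s] - %s", name, message));
    }
    public void reset(String name) {
        System.out.println(String.format("CircuitBreaker[%s] reset", name));
    }
}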
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;
import lombok.Getter;
@Getter
public class RestCircuitBreaker {
    private final long failureThreshold;
    private final long resetTimeout;
    private final LongAdder failureCounter = new LongAdder();
    private final LongAdder callCounter = new LongAdder();
    private final AtomicReference<CircuitBreakerStatus> status = new AtomicReference<>(CircuitBreakerStatus.CLOSED);
    private final Object fallback = null;
    /**
     * Timestamp of the last failed call (volatile so every thread sees the latest value)
     */
    private volatile long lastFailureTime;
    public RestCircuitBreaker(long failureThreshold, long resetTimeout) {
        this.failureThreshold = failureThreshold;
        this.resetTimeout = resetTimeout;
        reset();
    }
    public void reset() {
        CircuitBreakerStatusMonitor.X.reset("RestCircuitBreaker");
        // Reset the existing objects in place, so concurrent callers never hold a stale reference
        this.callCounter.reset();
        this.failureCounter.reset();
        this.status.set(CircuitBreakerStatus.CLOSED);
        this.lastFailureTime = -1L;
    }
    @SuppressWarnings("unchecked")
    public <T> T call(Supplier<T> supplier) {
        try {
            if (shouldAllowExecution()) {
                T result = supplier.get();
                markSuccess();
                return result;
            }
        } catch (Exception e) {
            markNoneSuccess();
        } finally {
            this.callCounter.increment();
        }
        return (T) fallback;
    }
    public void call(Runnable runnable) {
        call(() -> {
            runnable.run();
            return null;
        });
    }
    boolean shouldAllowExecution() {
        // Effectively the Closed state
        if (lastFailureTime == -1L) {
            return true;
        }
        // Threshold not reached yet
        if (failureThreshold > failureCounter.sum()) {
            return true;
        }
        // After reset_timeout, only the thread that wins the OPEN -> HALF_OPEN CAS gets through
        return shouldTryAfterRestTimeoutWindow()
                && changeStatus(CircuitBreakerStatus.OPEN, CircuitBreakerStatus.HALF_OPEN);
    }
    boolean changeStatus(CircuitBreakerStatus o, CircuitBreakerStatus n) {
        boolean r = status.compareAndSet(o, n);
        if (r) {
            CircuitBreakerStatusMonitor.X.report("RestCircuitBreaker", o, n);
        }
        return r;
    }
    boolean shouldTryAfterRestTimeoutWindow() {
        long lastFailureTimeSnap = lastFailureTime;
        long currentTime = System.currentTimeMillis();
        return currentTime > lastFailureTimeSnap + resetTimeout;
    }
    // A successful trial call in HALF_OPEN closes and resets the breaker
    public void markSuccess() {
        if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.CLOSED)) {
            reset();
        }
    }
    // A failed trial call reopens the breaker; in CLOSED, reaching the threshold opens it
    public void markNoneSuccess() {
        this.failureCounter.increment();
        if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.OPEN)) {
            this.lastFailureTime = System.currentTimeMillis();
        }
        if (this.failureCounter.sum() >= failureThreshold &&
                changeStatus(CircuitBreakerStatus.CLOSED, CircuitBreakerStatus.OPEN)) {
            this.lastFailureTime = System.currentTimeMillis();
        }
    }
}
Write a test client, RestCircuitBreakerClient:
public class RestCircuitBreakerClient {
    public static void main(String[] args) throws Exception {
        Service service = new Service();
        RestCircuitBreaker cb = new RestCircuitBreaker(5, 500);
        for (int i = 0; i < 10; i++) {
            int temp = i;
            String result = cb.call(() -> service.process(temp));
            System.out.println(String.format("Result: %s, number: %d", result, temp));
        }
        // Wait until reset_timeout (500 ms) has elapsed
        Thread.sleep(501L);
        // Deliberately succeed
        cb.call(service::processSuccess);
        for (int i = 0; i < 3; i++) {
            int temp = i;
            String result = cb.call(() -> service.process(temp));
            System.out.println(String.format("Result: %s, number: %d", result, temp));
        }
    }
    public static class Service {
        public String process(int i) {
            System.out.println("Entering process, number: " + i);
            throw new RuntimeException(String.valueOf(i));
        }
        public void processSuccess() {
            System.out.println("Calling processSuccess");
        }
    }
}
The output is as follows:
CircuitBreaker[RestCircuitBreaker] reset
Entering process, number: 0
Result: null, number: 0
Entering process, number: 1
Result: null, number: 1
Entering process, number: 2
Result: null, number: 2
Entering process, number: 3
Result: null, number: 3
Entering process, number: 4
CircuitBreaker[RestCircuitBreaker] state changed, [CLOSED]->[OPEN]
Result: null, number: 4
Result: null, number: 5
Result: null, number: 6
Result: null, number: 7
Result: null, number: 8
Result: null, number: 9
CircuitBreaker[RestCircuitBreaker] state changed, [OPEN]->[HALF_OPEN]
Calling processSuccess # <------ the successful call here resets the breaker's state
CircuitBreaker[RestCircuitBreaker] state changed, [HALF_OPEN]->[CLOSED]
CircuitBreaker[RestCircuitBreaker] reset
Entering process, number: 0
Result: null, number: 0
Entering process, number: 1
Result: null, number: 1
Entering process, number: 2
Result: null, number: 2
Thread-pool isolation and timeout control
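When using a CircuitBreaker, you can create a separate thread pool for each resource (uniquely identified by a resource_key or resource_name), so that calls to different resources are isolated from each other. The design borrows from the compartments of a cargo ship. Each compartment is sealed off with insulating material, so a fire in one compartment does not spread to the others. In Java, you can submit a task with ThreadPoolExecutor#submit(Callable<T> task). You then wait for the result on the returned Future with Future#get(long timeout, TimeUnit unit). This sets a preset upper bound on how long each call may take.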
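The bulkhead effect comes from giving each resource a small pool that refuses extra work instead of queueing it without bound. The minimal sketch below assumes a fixed-size ThreadPoolExecutor with a SynchronousQueue (no buffering) and AbortPolicy. With that setup, submit throws RejectedExecutionException once every thread is busy. Bulkhead is a hypothetical helper name.

```java
import java.util.concurrent.*;

// Hypothetical helper: a per-resource pool that rejects work instead of queueing it
final class Bulkhead {
    private Bulkhead() {
    }

    static ThreadPoolExecutor newPool(int threads) {
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                // Direct hand-off: a task is accepted only if a thread can take it now
                new SynchronousQueue<>(),
                // Throw RejectedExecutionException when all threads are busy
                new ThreadPoolExecutor.AbortPolicy());
    }

    // Returns true if the task was accepted, false if the bulkhead was full
    static boolean trySubmit(ExecutorService pool, Runnable task) {
        try {
            pool.submit(task);
            return true;
        } catch (RejectedExecutionException e) {
            return false;
        }
    }
}
```

A saturated resource then fails fast on its own pool, and callers of other resources keep their threads. The breaker can count these rejections as errors.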
First, design a lightweight module that manages one thread pool per resource:
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Data;
// Resource configuration
@Data
public class CircuitBreakerResourceConf {
    private String resourceName;
    private int coreSize;
    private int queueSize;
    // Execution timeout in milliseconds
    private long timeout;
}
// Holder for a resource's pool and timeout; its fields follow from the setters and getters used below
@Data
public class CircuitBreakerResource {
    private ThreadPoolExecutor executor;
    private long timeout;
}
public enum CircuitBreakerResourceManager {
    /**
     * Singleton
     */
    X;
    public final Map<String, CircuitBreakerResource> cache = new ConcurrentHashMap<>(8);
    public void register(CircuitBreakerResourceConf conf) {
        cache.computeIfAbsent(conf.getResourceName(), rn -> {
            int coreSize = conf.getCoreSize();
            int queueSize = conf.getQueueSize();
            BlockingQueue<Runnable> queue;
            // A positive queueSize gives a bounded queue; otherwise tasks are handed directly to a thread
            if (queueSize > 0) {
                queue = new ArrayBlockingQueue<>(queueSize);
            } else {
                queue = new SynchronousQueue<>();
            }
            ThreadPoolExecutor executor = new ThreadPoolExecutor(
                    coreSize,
                    coreSize,
                    0,
                    TimeUnit.SECONDS,
                    queue,
                    new ThreadFactory() {
                        private final AtomicInteger counter = new AtomicInteger();
                        @Override
                        public Thread newThread(Runnable r) {
                            Thread thread = new Thread(r);
                            thread.setDaemon(true);
                            thread.setName(rn + "-CircuitBreakerWorker-" + counter.getAndIncrement());
                            return thread;
                        }
                    },
                    // Throw RejectedExecutionException when threads and queue are all in use
                    new ThreadPoolExecutor.AbortPolicy()
            );
            CircuitBreakerResource resource = new CircuitBreakerResource();
            resource.setExecutor(executor);
            resource.setTimeout(conf.getTimeout());
            return resource;
        });
    }
    public CircuitBreakerResource get(String resourceName) {
        return Optional.ofNullable(cache.get(resourceName)).orElseThrow(() -> new IllegalArgumentException(resourceName));
    }
}
Write the ResourceCircuitBreaker implementation:
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;
import lombok.Getter;
@Getter
public class ResourceCircuitBreaker {
    private final long failureThreshold;
    private final long resetTimeout;
    private final LongAdder failureCounter = new LongAdder();
    private final LongAdder callCounter = new LongAdder();
    private final AtomicReference<CircuitBreakerStatus> status = new AtomicReference<>(CircuitBreakerStatus.CLOSED);
    private final ThreadPoolExecutor executor;
    private final Object fallback = null;
    private final String circuitBreakerName;
    /**
     * Timestamp of the last failed call
     */
    private volatile long lastFailureTime;
    /**
     * Upper bound on execution time, in milliseconds
     */
    private final long executionTimeout;
    public ResourceCircuitBreaker(String resourceName, long failureThreshold, long resetTimeout) {
        CircuitBreakerResource resource = CircuitBreakerResourceManager.X.get(resourceName);
        this.circuitBreakerName = "ResourceCircuitBreaker-" + resourceName;
        this.executor = resource.getExecutor();
        this.executionTimeout = resource.getTimeout();
        this.failureThreshold = failureThreshold;
        this.resetTimeout = resetTimeout;
        reset();
    }
    public void reset() {
        CircuitBreakerStatusMonitor.X.reset(this.circuitBreakerName);
        this.callCounter.reset();
        this.failureCounter.reset();
        this.status.set(CircuitBreakerStatus.CLOSED);
        this.lastFailureTime = -1L;
    }
    @SuppressWarnings("unchecked")
    public <T> T call(Supplier<T> supplier) {
        Future<T> future = null;
        try {
            if (shouldAllowExecution()) {
                // Run on the resource's own pool and wait at most executionTimeout
                future = this.executor.submit(warp(supplier));
                T result = future.get(executionTimeout, TimeUnit.MILLISECONDS);
                markSuccess();
                return result;
            }
        } catch (Exception e) {
            // On timeout (or any other failure) interrupt the task so its worker thread is released
            if (null != future) {
                future.cancel(true);
            }
            markNoneSuccess();
        } finally {
            this.callCounter.increment();
        }
        return (T) fallback;
    }
    <T> Callable<T> warp(Supplier<T> supplier) {
        return supplier::get;
    }
    public void call(Runnable runnable) {
        call(() -> {
            runnable.run();
            return null;
        });
    }
    boolean shouldAllowExecution() {
        // Effectively the Closed state
        if (lastFailureTime == -1L) {
            return true;
        }
        // Threshold not reached yet
        if (failureThreshold > failureCounter.sum()) {
            return true;
        }
        return shouldTryAfterRestTimeoutWindow()
                && changeStatus(CircuitBreakerStatus.OPEN, CircuitBreakerStatus.HALF_OPEN);
    }
    boolean changeStatus(CircuitBreakerStatus o, CircuitBreakerStatus n) {
        boolean r = status.compareAndSet(o, n);
        if (r) {
            CircuitBreakerStatusMonitor.X.report(this.circuitBreakerName, o, n);
        }
        return r;
    }
    boolean shouldTryAfterRestTimeoutWindow() {
        long lastFailureTimeSnap = lastFailureTime;
        long currentTime = System.currentTimeMillis();
        return currentTime > lastFailureTimeSnap + resetTimeout;
    }
    public void markSuccess() {
        if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.CLOSED)) {
            reset();
        }
    }
    public void markNoneSuccess() {
        this.failureCounter.increment();
        if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.OPEN)) {
            this.lastFailureTime = System.currentTimeMillis();
        }
        if (this.failureCounter.sum() >= failureThreshold &&
                changeStatus(CircuitBreakerStatus.CLOSED, CircuitBreakerStatus.OPEN)) {
            this.lastFailureTime = System.currentTimeMillis();
        }
    }
}
Write the test scenario class ResourceCircuitBreakerClient:
import java.util.Random;
public class ResourceCircuitBreakerClient {
    public static void main(String[] args) throws Exception {
        CircuitBreakerResourceConf conf = new CircuitBreakerResourceConf();
        conf.setCoreSize(10);
        conf.setQueueSize(0);
        conf.setResourceName("SERVICE");
        // Calls taking longer than 50 ms count as failures
        conf.setTimeout(50);
        CircuitBreakerResourceManager.X.register(conf);
        Service service = new Service();
        ResourceCircuitBreaker cb = new ResourceCircuitBreaker("SERVICE", 5, 500);
        for (int i = 0; i < 10; i++) {
            int temp = i;
            String result = cb.call(() -> service.process(temp));
            System.out.println(String.format("Result: %s, number: %d", result, temp));
        }
        Thread.sleep(501L);
        cb.call(service::processSuccess);
        for (int i = 0; i < 3; i++) {
            int temp = i;
            String result = cb.call(() -> service.process(temp));
            System.out.println(String.format("Result: %s, number: %d", result, temp));
        }
    }
    public static class Service {
        private final Random r = new Random();
        // Sleeps for a random 0-199 ms, so some calls exceed the 50 ms timeout
        public String process(int i) {
            int sleep = r.nextInt(200);
            System.out.println(String.format("Thread[%s] - entering process, number: %d, sleeping %d ms",
                    Thread.currentThread().getName(), i, sleep));
            try {
                Thread.sleep(sleep);
            } catch (InterruptedException ignore) {
            }
            return String.valueOf(i);
        }
        public void processSuccess() {
            System.out.println(String.format("Thread[%s] - calling processSuccess", Thread.currentThread().getName()));
        }
    }
}
Output of one run:
CircuitBreaker[ResourceCircuitBreaker-SERVICE] reset
Thread[SERVICE-CircuitBreakerWorker-0] - entering process, number: 0, sleeping 67 ms
Result: null, number: 0
Thread[SERVICE-CircuitBreakerWorker-1] - entering process, number: 1, sleeping 85 ms
Result: null, number: 1
Thread[SERVICE-CircuitBreakerWorker-2] - entering process, number: 2, sleeping 72 ms
Result: null, number: 2
Thread[SERVICE-CircuitBreakerWorker-3] - entering process, number: 3, sleeping 88 ms
Result: null, number: 3
Thread[SERVICE-CircuitBreakerWorker-4] - entering process, number: 4, sleeping 28 ms
Result: 4, number: 4
Thread[SERVICE-CircuitBreakerWorker-5] - entering process, number: 5, sleeping 102 ms
CircuitBreaker[ResourceCircuitBreaker-SERVICE] state changed, [CLOSED]->[OPEN]
Result: null, number: 5
Result: null, number: 6
Result: null, number: 7
Result: null, number: 8
Result: null, number: 9
CircuitBreaker[ResourceCircuitBreaker-SERVICE] state changed, [OPEN]->[HALF_OPEN]
Thread[SERVICE-CircuitBreakerWorker-6] - calling processSuccess
CircuitBreaker[ResourceCircuitBreaker-SERVICE] state changed, [HALF_OPEN]->[CLOSED]
CircuitBreaker[ResourceCircuitBreaker-SERVICE] reset
Thread[SERVICE-CircuitBreakerWorker-7] - entering process, number: 0, sleeping 74 ms
Result: null, number: 0
Thread[SERVICE-CircuitBreakerWorker-8] - entering process, number: 1, sleeping 111 ms
Result: null, number: 1
Thread[SERVICE-CircuitBreakerWorker-9] - entering process, number: 2, sleeping 183 ms
Result: null, number: 2
Sliding window and percentage statistics
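The previous section implemented thread-pool isolation for resource calls, but it has one obvious shortcoming: its state management and reset do not fit production use. The HALF_OPEN -> CLOSED transition and the reset should not fire as soon as a single permitted call succeeds. They should depend on the (average) failure rate over a period of time dropping below some threshold, or on the (average) success rate recovering above some threshold. Otherwise the breaker will flip between states so often in many scenarios that it barely works. In other words, in most scenarios the failurePercent over a period of time is more accurate than comparing the exception count directly with failureThreshold. This is where the sliding window comes in. For each time unit it records the total number of calls, the number of successful calls, the number of timed-out calls and the number of non-timeout failures. For simplicity, the time unit is 1 second: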
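Define a bucket class, Bucket, that records these four kinds of call counts. The implementation differs slightly from the figure above. The non-timeout failure count is replaced by a count of tasks rejected by the thread pool, while the failure count covers both timed-out tasks and ordinary business exceptions: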
import java.util.concurrent.atomic.LongAdder;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
// Immutable snapshot of the four counters
@RequiredArgsConstructor
@Getter
public class MetricInfo {
    private final long total;
    private final long success;
    private final long failure;
    private final long reject;
    public static final MetricInfo EMPTY = new MetricInfo(0, 0, 0, 0);
    public MetricInfo merge(MetricInfo other) {
        return new MetricInfo(
                this.total + other.getTotal(),
                this.success + other.getSuccess(),
                this.failure + other.getFailure(),
                this.reject + other.getReject()
        );
    }
}
public class Bucket {
    // Timestamp at which this bucket's time slice starts
    @Getter
    private final long windowStartTimestamp;
    private final LongAdder total;
    private final LongAdder success;
    private final LongAdder failure;
    private final LongAdder reject;
    public Bucket(long windowStartTimestamp) {
        this.windowStartTimestamp = windowStartTimestamp;
        this.total = new LongAdder();
        this.success = new LongAdder();
        this.reject = new LongAdder();
        this.failure = new LongAdder();
    }
    public void increaseTotal() {
        this.total.increment();
    }
    public void increaseSuccess() {
        this.success.increment();
    }
    public void increaseFailure() {
        this.failure.increment();
    }
    public void increaseReject() {
        this.reject.increment();
    }
    public long totalCount() {
        return this.total.sum();
    }
    public long successCount() {
        return this.success.sum();
    }
    public long failureCount() {
        return this.failure.sum();
    }
    public long rejectCount() {
        return this.reject.sum();
    }
    public void reset() {
        this.total.reset();
        this.success.reset();
        this.failure.reset();
        this.reject.reset();
    }
    public MetricInfo metricInfo() {
        return new MetricInfo(
                totalCount(),
                successCount(),
                failureCount(),
                rejectCount()
        );
    }
    @Override
    public String toString() {
        return String.format("Bucket[wt=%d,t=%d,s=%d,f=%d,r=%d]",
                windowStartTimestamp,
                totalCount(),
                successCount(),
                failureCount(),
                rejectCount()
        );
    }
}
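Bucket keeps its four counters as four separate fields. A variant keeps them in an array of LongAdder indexed by the ordinal of an event enum, so adding a new event type needs no new field or getter. Below is a minimal sketch of that variant. CallEvent and EnumBucket are hypothetical names, not classes from this article or any library.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical event types; each constant's ordinal() selects its counter
enum CallEvent { TOTAL, SUCCESS, FAILURE, REJECT }

final class EnumBucket {
    private final long windowStartTimestamp;
    private final LongAdder[] counters;

    EnumBucket(long windowStartTimestamp) {
        this.windowStartTimestamp = windowStartTimestamp;
        this.counters = new LongAdder[CallEvent.values().length];
        for (int i = 0; i < counters.length; i++) {
            counters[i] = new LongAdder();
        }
    }

    long windowStartTimestamp() {
        return windowStartTimestamp;
    }

    void increment(CallEvent event) {
        counters[event.ordinal()].increment();
    }

    long count(CallEvent event) {
        return counters[event.ordinal()].sum();
    }
}
```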
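In Hystrix, for flexibility, the counters in Bucket are a LongAdder[], so each event is counted and accumulated directly by the ordinal of its event enum. To save memory, the sliding window is a fixed-capacity, reusable circular queue, BucketCircularArray#ListState. Standing on the shoulders of giants, we can borrow that idea to implement BucketCircular: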
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
public class BucketCircular implements Iterable<Bucket> {
    private final AtomicReference<BucketArray> bucketArray;
    public BucketCircular(int bucketNumber) {
        // A trick: the array length is the number of buckets + 1, leaving a spare slot for adding and evicting buckets
        AtomicReferenceArray<Bucket> buckets = new AtomicReferenceArray<>(bucketNumber + 1);
        this.bucketArray = new AtomicReference<>(new BucketArray(buckets, 0, 0, bucketNumber));
    }
    public Bucket getTail() {
        return this.bucketArray.get().tail();
    }
    /**
     * Append a bucket at the tail of the circular queue
     */
    public void addTail(Bucket bucket) {
        BucketArray bucketArray = this.bucketArray.get();
        BucketArray newBucketArray = bucketArray.addBucket(bucket);
        // Called while holding the lock, so in theory this CAS cannot fail
        this.bucketArray.compareAndSet(bucketArray, newBucketArray);
    }
    public Bucket[] toArray() {
        return this.bucketArray.get().toArray();
    }
    public int size() {
        return this.bucketArray.get().getSize();
    }
    @Override
    public Iterator<Bucket> iterator() {
        return Collections.unmodifiableList(Arrays.asList(toArray())).iterator();
    }
    public void clear() {
        while (true) {
            BucketArray bucketArray = this.bucketArray.get();
            BucketArray clear = bucketArray.clear();
            if (this.bucketArray.compareAndSet(bucketArray, clear)) {
                return;
            }
        }
    }
}
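BucketCircular delegates to a BucketArray type that the article does not show. The sketch below is modeled on the ListState inner class of Hystrix's BucketCircularArray: an immutable snapshot of head and tail indexes over an array of length numBuckets + 1. Its method names simply follow the calls BucketCircular makes, so treat it as an assumption rather than the author's code. A minimal stand-in Bucket is included only so the sketch compiles by itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Stand-in for the Bucket class above so this sketch compiles on its own;
// drop it when combining with the real Bucket.
class Bucket {
    private final long windowStartTimestamp;
    Bucket(long windowStartTimestamp) { this.windowStartTimestamp = windowStartTimestamp; }
    long getWindowStartTimestamp() { return windowStartTimestamp; }
}

// Immutable view over a shared array of length numBuckets + 1; the spare slot means
// the slot being written is never part of the current view
final class BucketArray {
    private final AtomicReferenceArray<Bucket> data;
    private final int head;
    private final int tail;
    private final int numBuckets;
    private final int size;

    BucketArray(AtomicReferenceArray<Bucket> data, int head, int tail, int numBuckets) {
        this.data = data;
        this.head = head;
        this.tail = tail;
        this.numBuckets = numBuckets;
        int length = data.length();
        this.size = (tail + length - head) % length;
    }

    int getSize() { return size; }

    // Newest bucket, or null when the ring is empty
    Bucket tail() {
        return size == 0 ? null : data.get(convert(size - 1));
    }

    // Buckets ordered from oldest to newest
    Bucket[] toArray() {
        List<Bucket> list = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            list.add(data.get(convert(i)));
        }
        return list.toArray(new Bucket[0]);
    }

    // Write into the free tail slot; when the ring is full, the head advances too,
    // which evicts the oldest bucket from the new view
    BucketArray addBucket(Bucket bucket) {
        int length = data.length();
        data.set(tail, bucket);
        int newTail = (tail + 1) % length;
        int newHead = size == numBuckets ? (head + 1) % length : head;
        return new BucketArray(data, newHead, newTail, numBuckets);
    }

    BucketArray clear() {
        return new BucketArray(new AtomicReferenceArray<>(numBuckets + 1), 0, 0, numBuckets);
    }

    // Logical index (0 = oldest) to physical slot
    private int convert(int index) {
        return (index + head) % data.length();
    }
}
```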
Because the queue has a fixed length, adding a new Bucket to its tail means checking whether the head and tail pointers must be recalculated. A quick test:
public static void main(String[] args) throws Exception {
    BucketCircular circular = new BucketCircular(5);
    circular.addTail(new Bucket(111L));
    circular.addTail(new Bucket(System.currentTimeMillis()));
    circular.addTail(new Bucket(System.currentTimeMillis()));
    circular.addTail(new Bucket(System.currentTimeMillis()));
    circular.addTail(new Bucket(System.currentTimeMillis()));
    circular.addTail(new Bucket(System.currentTimeMillis()));
    circular.addTail(new Bucket(222L));
    Stream.of(circular.toArray()).forEach(System.out::println);
}
// Output: only the 5 newest buckets remain; the one with wt=111 has been evicted
Bucket[wt=1603613365205,t=0,s=0,f=0,r=0]
Bucket[wt=1603613365205,t=0,s=0,f=0,r=0]
Bucket[wt=1603613365205,t=0,s=0,f=0,r=0]
Bucket[wt=1603613365205,t=0,s=0,f=0,r=0]
Bucket[wt=222,t=0,s=0,f=0,r=0]
Next, write SlidingWindowMonitor, which manages the Buckets and provides the entry point for statistics:
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
// Accumulator for cumulative statistics
public class BucketCumulativeCalculator {
    private final LongAdder total = new LongAdder();
    private final LongAdder success = new LongAdder();
    private final LongAdder failure = new LongAdder();
    private final LongAdder reject = new LongAdder();
    public void addBucket(Bucket lb) {
        total.add(lb.totalCount());
        success.add(lb.successCount());
        failure.add(lb.failureCount());
        reject.add(lb.rejectCount());
    }
    public MetricInfo sum() {
        return new MetricInfo(
                total.sum(),
                success.sum(),
                failure.sum(),
                reject.sum()
        );
    }
    public void reset() {
        // Reset in place instead of swapping in new LongAdder objects that concurrent readers might miss
        total.reset();
        success.reset();
        failure.reset();
        reject.reset();
    }
}
// For simplicity, the parameters below are fixed for now
public class SlidingWindowMonitor {
    /**
     * Window length - 10 seconds
     */
    private final int windowDuration = 10000;
    /**
     * Bucket size - 1 second
     */
    private final int bucketSizeInTimeUint = 1000;
    /**
     * Number of buckets - windowDuration % bucketSizeInTimeUint must be 0
     */
    private final int bucketNumber = windowDuration / bucketSizeInTimeUint;
    private final BucketCircular bucketCircular;
    /**
     * Lock held while creating buckets
     */
    private final ReentrantLock lock;
    /**
     * Cumulative counter
     */
    private final BucketCumulativeCalculator calculator = new BucketCumulativeCalculator();
    public SlidingWindowMonitor() {
        this.bucketCircular = new BucketCircular(bucketNumber);
        this.lock = new ReentrantLock();
    }
    void reset() {
        Bucket tailBucket = bucketCircular.getTail();
        if (null != tailBucket) {
            calculator.addBucket(tailBucket);
        }
        bucketCircular.clear();
    }
    /**
     * Cumulative statistics
     */
    public MetricInfo getCumulativeMetricInfo() {
        return getCurrentMetricInfo().merge(calculator.sum());
    }
    /**
     * Statistics of the current bucket
     */
    public MetricInfo getCurrentMetricInfo() {
        Bucket currentBucket = getCurrentBucket();
        if (null == currentBucket) {
            return MetricInfo.EMPTY;
        }
        return currentBucket.metricInfo();
    }
    /**
     * Rolling statistics - the data the breaker uses to compute the error percentage
     */
    public MetricInfo getRollingMetricInfo() {
        Bucket currentBucket = getCurrentBucket();
        if (null == currentBucket) {
            return MetricInfo.EMPTY;
        }
        MetricInfo info = new MetricInfo(0, 0, 0, 0);
        for (Bucket bucket : this.bucketCircular) {
            info = info.merge(bucket.metricInfo());
        }
        return info;
    }
    /**
     * The core method - returns the Bucket covering the current system time
     */
    Bucket getCurrentBucket() {
        long time = System.currentTimeMillis();
        Bucket tailBucket = bucketCircular.getTail();
        // The tail bucket still covers the current time: use it directly
        if (null != tailBucket && time < tailBucket.getWindowStartTimestamp() + bucketSizeInTimeUint) {
            return tailBucket;
        }
        if (lock.tryLock()) {
            try {
                // The circular queue is empty
                if (null == bucketCircular.getTail()) {
                    Bucket newBucket = new Bucket(time);
                    bucketCircular.addTail(newBucket);
                    return newBucket;
                } else {
                    // Create enough buckets to catch up with the current time
                    for (int i = 0; i < bucketNumber; i++) {
                        tailBucket = bucketCircular.getTail();
                        // The newest bucket has caught up with the current time
                        if (time < tailBucket.getWindowStartTimestamp() + bucketSizeInTimeUint) {
                            return tailBucket;
                        }
                        // The current time is already past the whole window: start over
                        else if (time > tailBucket.getWindowStartTimestamp() + bucketSizeInTimeUint + windowDuration) {
                            reset();
                            return getCurrentBucket();
                        }
                        // The current time is ahead of the tail bucket: fill in the missing buckets
                        else {
                            bucketCircular.addTail(new Bucket(tailBucket.getWindowStartTimestamp() + bucketSizeInTimeUint));
                            calculator.addBucket(tailBucket);
                        }
                    }
                    return bucketCircular.getTail();
                }
            } finally {
                lock.unlock();
            }
        } else {
            // Failing to get the lock means another thread is creating buckets. A non-null tail is the
            // newest bucket created by the thread holding the lock; otherwise wait briefly and retry
            tailBucket = bucketCircular.getTail();
            if (null != tailBucket) {
                return tailBucket;
            }
            try {
                Thread.sleep(5);
            } catch (InterruptedException ignore) {
            }
            // Retry recursively
            return getCurrentBucket();
        }
    }
    public void incrementTotal() {
        getCurrentBucket().increaseTotal();
    }
    public void incrementSuccess() {
        getCurrentBucket().increaseSuccess();
    }
    public void incrementFailure() {
        getCurrentBucket().increaseFailure();
    }
    public void incrementReject() {
        getCurrentBucket().increaseReject();
    }
}
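getCurrentBucket above starts the first bucket at whatever time the first call arrives, then fills in missing buckets one by one under a lock. The sketch below shows a different and simpler layout, for comparison only. It aligns bucket start times to multiples of the bucket size, computes the slot index directly from the timestamp, and recycles a slot when it still holds an older time slice. It counts a single event type and takes the time as a parameter, so it can be tested without sleeping. It also uses synchronized methods for brevity. AlignedWindowCounter is a hypothetical name.

```java
import java.util.Arrays;

// Hypothetical alternative layout: slot = (time / bucketMillis) % buckets,
// with each slot remembering which aligned time slice it currently holds
final class AlignedWindowCounter {
    private final long bucketMillis;
    private final long[] starts;
    private final long[] counts;

    AlignedWindowCounter(int buckets, long bucketMillis) {
        this.bucketMillis = bucketMillis;
        this.starts = new long[buckets];
        this.counts = new long[buckets];
        Arrays.fill(starts, -1L); // -1 marks a slot that has never been used
    }

    synchronized void increment(long now) {
        long start = now - now % bucketMillis;
        int idx = (int) ((now / bucketMillis) % starts.length);
        if (starts[idx] != start) {
            // The slot still holds an older time slice: recycle it
            starts[idx] = start;
            counts[idx] = 0;
        }
        counts[idx]++;
    }

    // Sum over every slot whose slice falls inside the last `buckets` slices, current one included
    synchronized long sum(long now) {
        long windowStart = now - now % bucketMillis - (starts.length - 1) * bucketMillis;
        long total = 0;
        for (int i = 0; i < starts.length; i++) {
            if (starts[i] >= windowStart) {
                total += counts[i];
            }
        }
        return total;
    }
}
```

The trade-off is that stale slots are cleaned up lazily on write and filtered on read, so no fill-in loop is needed. Unlike BucketCumulativeCalculator above, however, this sketch keeps no cumulative totals.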
Finally, combine SlidingWindowMonitor with the earlier ResourceCircuitBreaker to get SlidingWindowCircuitBreaker:
package cn.throwx.cb;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
/**
 * @author throwable
 * @version v1
 * @description
 * @since 2020/10/25 17:14
 */
public class SlidingWindowCircuitBreaker {
    /**
     * Error percentage threshold
     */
    private final long errorPercentThreshold;
    /**
     * How long the breaker stays open before a trial call (reset_timeout)
     */
    private final long resetTimeout;
    private final AtomicReference<CircuitBreakerStatus> status = new AtomicReference<>(CircuitBreakerStatus.CLOSED);
    private final ThreadPoolExecutor executor;
    private final String circuitBreakerName;
    /**
     * Timestamp of the last failed call
     */
    private volatile long lastFailureTime;
    /**
     * Upper bound on execution time, in milliseconds
     */
    private final long executionTimeout;
    /**
     * Sliding-window monitor
     */
    private final SlidingWindowMonitor slidingWindowMonitor;
    public SlidingWindowCircuitBreaker(String resourceName,
                                       long errorPercentThreshold,
                                       long resetTimeout) {
        CircuitBreakerResource resource = CircuitBreakerResourceManager.X.get(resourceName);
        this.circuitBreakerName = "SlidingWindowCircuitBreaker-" + resourceName;
        this.executor = resource.getExecutor();
        this.executionTimeout = resource.getTimeout();
        this.errorPercentThreshold = errorPercentThreshold;
        this.resetTimeout = resetTimeout;
        this.slidingWindowMonitor = new SlidingWindowMonitor();
        reset();
    }
    public void reset() {
        CircuitBreakerStatusMonitor.X.reset(this.circuitBreakerName);
        this.status.set(CircuitBreakerStatus.CLOSED);
        this.lastFailureTime = -1L;
    }
    @SuppressWarnings("unchecked")
    public <T> T call(Supplier<T> supplier) {
        return call(supplier, (Fallback<T>) Fallback.F);
    }
    public <T> T call(Supplier<T> supplier, Fallback<T> fallback) {
        Future<T> future = null;
        try {
            if (shouldAllowExecution()) {
                slidingWindowMonitor.incrementTotal();
                future = this.executor.submit(warp(supplier));
                T result = future.get(executionTimeout, TimeUnit.MILLISECONDS);
                markSuccess();
                return result;
            }
        } catch (RejectedExecutionException ree) {
            // The resource's thread pool is saturated
            markReject();
        } catch (Exception e) {
            // Timeout, interruption or business exception: cancel the task so its worker thread is released
            if (null != future) {
                future.cancel(true);
            }
            markFailure();
        }
        return fallback.fallback();
    }
    <T> Callable<T> warp(Supplier<T> supplier) {
        return supplier::get;
    }
    public void call(Runnable runnable) {
        call(() -> {
            runnable.run();
            return null;
        });
    }
    boolean shouldAllowExecution() {
        // Effectively the Closed state
        if (lastFailureTime == -1L) {
            return true;
        }
        // Threshold not reached yet
        if (errorPercentThreshold > rollingErrorPercentage()) {
            return true;
        }
        return shouldTryAfterRestTimeoutWindow()
                && changeStatus(CircuitBreakerStatus.OPEN, CircuitBreakerStatus.HALF_OPEN);
    }
    boolean changeStatus(CircuitBreakerStatus o, CircuitBreakerStatus n) {
        boolean r = status.compareAndSet(o, n);
        if (r) {
            CircuitBreakerStatusMonitor.X.report(this.circuitBreakerName, o, n);
        }
        return r;
    }
    boolean shouldTryAfterRestTimeoutWindow() {
        long lastFailureTimeSnap = lastFailureTime;
        long currentTime = System.currentTimeMillis();
        return currentTime > lastFailureTimeSnap + resetTimeout;
    }
    public void markSuccess() {
        slidingWindowMonitor.incrementSuccess();
        if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.CLOSED)) {
            reset();
        }
    }
    public void markReject() {
        slidingWindowMonitor.incrementReject();
        if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.OPEN)) {
            this.lastFailureTime = System.currentTimeMillis();
        }
    }
    // Rejections and failures (including timeouts) both count as errors
    public int rollingErrorPercentage() {
        MetricInfo rollingMetricInfo = slidingWindowMonitor.getRollingMetricInfo();
        long rejectCount = rollingMetricInfo.getReject();
        long failureCount = rollingMetricInfo.getFailure();
        long totalCount = rollingMetricInfo.getTotal();
        // Guard against an empty window instead of dividing by zero
        int errorPercentage = totalCount == 0 ? 0 : (int) ((double) (rejectCount + failureCount) / totalCount * 100);
        CircuitBreakerStatusMonitor.X.report(this.circuitBreakerName, String.format("error percentage: %d", errorPercentage));
        return errorPercentage;
    }
    public void markFailure() {
        slidingWindowMonitor.incrementFailure();
        if (changeStatus(CircuitBreakerStatus.HALF_OPEN, CircuitBreakerStatus.OPEN)) {
            this.lastFailureTime = System.currentTimeMillis();
        }
        if (rollingErrorPercentage() >= errorPercentThreshold &&
                changeStatus(CircuitBreakerStatus.CLOSED, CircuitBreakerStatus.OPEN)) {
            this.lastFailureTime = System.currentTimeMillis();
        }
    }
}
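SlidingWindowCircuitBreaker uses a Fallback type (Fallback.F and fallback.fallback()) that the article does not show. A hypothetical definition consistent with that usage is sketched below. F returns null, which matches the null results printed by the test client that follows.

```java
// Hypothetical definition inferred from how SlidingWindowCircuitBreaker uses it
@FunctionalInterface
interface Fallback<T> {
    // Produce the substitute value returned when a call is not allowed or fails
    T fallback();

    // Default fallback used by call(Supplier): always returns null
    Fallback<?> F = () -> null;
}
```

Callers that want a degraded value instead of null can pass one explicitly, for example cb.call(() -> service.process(1), () -> "degraded").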
Write a test client, SlidingWindowCircuitBreakerClient:
import java.util.Random;
public class SlidingWindowCircuitBreakerClient {
    public static void main(String[] args) throws Exception {
        CircuitBreakerResourceConf conf = new CircuitBreakerResourceConf();
        conf.setCoreSize(10);
        conf.setQueueSize(0);
        conf.setResourceName("SERVICE");
        conf.setTimeout(50);
        CircuitBreakerResourceManager.X.register(conf);
        Service service = new Service();
        // Trip at an error percentage of 50, try again after 500 ms
        SlidingWindowCircuitBreaker cb = new SlidingWindowCircuitBreaker("SERVICE", 50, 500);
        for (int i = 0; i < 10; i++) {
            int temp = i;
            String result = cb.call(() -> service.process(temp));
            System.out.println(String.format("Result: %s, number: %d", result, temp));
        }
        Thread.sleep(501L);
        cb.call(service::processSuccess);
        for (int i = 0; i < 3; i++) {
            int temp = i;
            String result = cb.call(() -> service.process(temp));
            System.out.println(String.format("Result: %s, number: %d", result, temp));
        }
        Thread.sleep(501L);
        cb.call(service::processSuccess);
        cb.call(service::processSuccess);
    }
    public static class Service {
        private final Random r = new Random();
        public String process(int i) {
            int sleep = r.nextInt(200);
            System.out.println(String.format("Thread[%s] - entering process, number: %d, sleeping %d ms",
                    Thread.currentThread().getName(), i, sleep));
            try {
                Thread.sleep(sleep);
            } catch (InterruptedException ignore) {
            }
            return String.valueOf(i);
        }
        public void processSuccess() {
            System.out.println(String.format("Thread[%s] - calling processSuccess", Thread.currentThread().getName()));
        }
    }
}
Output of one run:
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] reset
Thread[SERVICE-CircuitBreakerWorker-0] - entering process, number: 0, sleeping 67 ms
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] - error percentage: 100
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] state changed, [CLOSED]->[OPEN]
Result: null, number: 0
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] - error percentage: 100
Result: null, number: 1
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] - error percentage: 100
Result: null, number: 2
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] - error percentage: 100
Result: null, number: 3
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] - error percentage: 100
Result: null, number: 4
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] - error percentage: 100
Result: null, number: 5
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] - error percentage: 100
Result: null, number: 6
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] - error percentage: 100
Result: null, number: 7
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] - error percentage: 100
Result: null, number: 8
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] - error percentage: 100
Result: null, number: 9
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] - error percentage: 100
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] state changed, [OPEN]->[HALF_OPEN]
Thread[SERVICE-CircuitBreakerWorker-1] - calling processSuccess
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] state changed, [HALF_OPEN]->[CLOSED]
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] reset
Thread[SERVICE-CircuitBreakerWorker-2] - entering process, number: 0, sleeping 84 ms
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] - error percentage: 66
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] state changed, [CLOSED]->[OPEN]
Result: null, number: 0
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] - error percentage: 66
Result: null, number: 1
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] - error percentage: 66
Result: null, number: 2
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] - error percentage: 66
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] state changed, [OPEN]->[HALF_OPEN]
Thread[SERVICE-CircuitBreakerWorker-3] - calling processSuccess
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] state changed, [HALF_OPEN]->[CLOSED]
CircuitBreaker[SlidingWindowCircuitBreaker-SERVICE] reset
Thread[SERVICE-CircuitBreakerWorker-4] - calling processSuccess
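In the run above, a single timed-out call already yields an error percentage of 100 and opens the breaker. That happens because the percentage is computed over a window holding only one call. Hystrix guards against this with a minimum request volume (its circuitBreaker.requestVolumeThreshold setting, 20 by default) that must be reached before the error percentage is considered at all. Below is a minimal sketch of such a check. TripRule and minimumRequests are hypothetical names, not part of the classes above.

```java
// Hypothetical trip rule: require enough samples before trusting the percentage
final class TripRule {
    private TripRule() {
    }

    // Integer percentage of failed or rejected calls; an empty window counts as 0%
    static int errorPercentage(long total, long failures, long rejects) {
        if (total <= 0) {
            return 0;
        }
        return (int) ((failures + rejects) * 100 / total);
    }

    static boolean shouldTrip(long total, long failures, long rejects,
                              long minimumRequests, int errorPercentThreshold) {
        if (total < minimumRequests) {
            return false; // too few calls in the window to judge
        }
        return errorPercentage(total, failures, rejects) >= errorPercentThreshold;
    }
}
```

With minimumRequests set to 20, the single failed call at the start of the run would leave the breaker closed. For comparison, errorPercentage(3, 2, 0) gives the same 66 printed in the second round.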
Summary
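In production, it is advisable to use a mainstream implementation of the CircuitBreaker pattern, such as Hystrix or the more actively developed Sentinel. To understand the pattern in depth, though, there is no substitute for working through an implementation step by step yourself.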
References:
- CircuitBreaker - by Martin Fowler
- Release It! Design and Deploy Production-Ready Software
(End of article, c-4-d e-a-20201025)