概述
為了保證系統不被突發流量擊垮,進行容量保障是十分有必要的。從架構的穩定性角度看,在有限資源的情況下,所能提供的單位時間服務能力也是有限的。假如超過承受能力,可能會帶來整個服務的停頓,應用的Crash,進而可能將風險傳遞給服務調用方造成整個系統的服務能力喪失,進而引發雪崩。
為了避免系統壓力大時引發服務雪崩,就需要在系統中引入限流,降級和熔斷等工具。
目的
最初在基礎數據內部,我們使用了限流組件,然后調研了降級熔斷框架Hystrix,發現這是一個很好的框架,能夠提升依賴大量外部服務的系統的容錯能力,但是不適合依賴比較簡單的基礎數據系統。
機票交易系統依賴諸多子系統,比如保險,各種X產品等。故而,在此簡單介紹一下,推廣給交易和報價的同學使用。
限流
根據排隊理論,具有延遲的服務隨着請求量的不斷提升,其平均響應時間也會迅速提升,為了保證服務的SLA,有必要控制單位時間的請求量。這就是限流為什么愈發重要的原因。
分類
qps限流
限制每秒處理請求數不超過閾值。
並發限流
限制同時處理的請求數目。Java 中的 Semaphore 是做並發限制的好工具,特別適用於資源有效的場景。
單機限流
Guava 中的 RateLimiter。
集群限流
TC 提供的 common-blocking 組件提供此功能。
算法
漏桶算法
漏桶算法思路很簡單,水(請求)先進入到漏桶里,漏桶以一定的速度出水,當水流入速度過大會直接溢出,可以看出漏桶算法能強行限制數據的傳輸速率。
旗艦店運價庫抓取調度
在旗艦店運價庫對航司報價的離線抓取中,由於航司接口有qps限制,需要限制訪問速率。所以我們借助redis隊列作為漏桶,實現了對航司接口訪問速率的控制。
令牌桶算法
對於很多應用場景來說,除了要求能夠限制數據的平均傳輸速率外,還要求允許某種程度的突發傳輸。這時候漏桶算法可能就不合適了,令牌桶算法更為適合。
令牌桶算法的原理是系統會以一個恆定的速度往桶里放入令牌,而如果請求需要被處理,則需要先從桶里獲取一個令牌,當桶里沒有令牌可取時,則拒絕服務。
在 Guava 的 RateLimiter 中,使用的就是令牌桶算法,允許部分突發流量傳輸。在其源碼里,可以看到能夠突發傳輸的流量等於 maxBurstSeconds * qps。
/**
* This implements a "bursty" RateLimiter, where storedPermits are translated to zero throttling.
* The maximum number of permits that can be saved (when the RateLimiter is unused) is defined in
* terms of time, in this sense: if a RateLimiter is 2qps, and this time is specified as 10
* seconds, we can save up to 2 * 10 = 20 permits.
*/
static
final
class
SmoothBursty
extends
SmoothRateLimiter {
/** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */
final
double
maxBurstSeconds;
SmoothBursty(SleepingStopwatch stopwatch,
double
maxBurstSeconds) {
super
(stopwatch);
this
.maxBurstSeconds = maxBurstSeconds;
}
|
Guava 里除了允許突發傳輸的 SmoothBursty, 還有於此相反的,可以緩慢啟動的 SmoothWarmingUp。
滑動窗口
- TC 提供的 common-blocking 限流組件,背后采用滑動窗口來計算流量是否超出限制。
- 熔斷框架 Hystrix 也采用滑動窗口來統計服務調用信息。
RxJava 實現滑動窗口計數
RxJava 是一個響應式函數編程框架,以觀察者模式為骨架,能夠輕松實現滑動窗口計數功能。具體示例代碼如下:
package
com.rxjava.test;
import
org.slf4j.Logger;
import
org.slf4j.LoggerFactory;
import
rx.Observable;
import
rx.functions.Func1;
import
rx.functions.Func2;
import
rx.subjects.PublishSubject;
import
rx.subjects.SerializedSubject;
import
java.util.concurrent.TimeUnit;
/**
* 模擬滑動窗口計數
* Created by albon on 17/6/24.
*/
public
class
RollingWindowTest {
private
static
final
Logger logger = LoggerFactory.getLogger(WindowTest.
class
);
public
static
final
Func2<Integer, Integer, Integer> INTEGER_SUM =
(integer, integer2) -> integer + integer2;
public
static
final
Func1<Observable<Integer>, Observable<Integer>> WINDOW_SUM =
window -> window.scan(
0
, INTEGER_SUM).skip(
3
);
public
static
final
Func1<Observable<Integer>, Observable<Integer>> INNER_BUCKET_SUM =
integerObservable -> integerObservable.reduce(
0
, INTEGER_SUM);
public
static
void
main(String[] args)
throws
InterruptedException {
PublishSubject<Integer> publishSubject = PublishSubject.create();
SerializedSubject<Integer, Integer> serializedSubject = publishSubject.toSerialized();
serializedSubject
.window(
5
, TimeUnit.SECONDS)
// 5秒作為一個基本塊
.flatMap(INNER_BUCKET_SUM)
// 基本塊內數據求和
.window(
3
,
1
)
// 3個塊作為一個窗口,滾動布數為1
.flatMap(WINDOW_SUM)
// 窗口數據求和
.subscribe((Integer integer) ->
logger.info(
"[{}] call ...... {}"
,
Thread.currentThread().getName(), integer));
// 緩慢發送數據,觀察效果
for
(
int
i=
0
; i<
100
; ++i) {
if
(i <
30
) {
serializedSubject.onNext(
1
);
}
else
{
serializedSubject.onNext(
2
);
}
Thread.sleep(
1000
);
}
}
}
|
降級和熔斷
降級
業務降級,是指犧牲非核心的業務功能,保證核心功能的穩定運行。簡單來說,要實現優雅的業務降級,需要將功能實現拆分到相對獨立的不同代碼單元,分優先級進行隔離。在后台通過開關控制,降級部分非主流程的業務功能,減輕系統依賴和性能損耗,從而提升集群的整體吞吐率。
降級的重點是:業務之間有優先級之分。
降級的典型應用是:電商活動期間。
熔斷
老式電閘都安裝了保險絲,一旦有人使用超大功率的設備,保險絲就會燒斷以保護各個電器不被強電流給燒壞。同理我們的接口也需要安裝上“保險絲”,以防止非預期的請求對系統壓力過大而引起的系統癱瘓,當流量過大時,可以采取拒絕或者引流等機制。
同樣在分布式系統中,當被調用的遠程服務無法使用時,如果沒有過載保護,就會導致請求的資源阻塞在遠程服務器上耗盡資源。很多時候,剛開始可能只是出現了局部小規模的故障,然而由於種種原因,故障影響范圍越來越大,最終導致全局性的后果。這種過載保護,就是熔斷器。
熔斷器的設計思路
- Closed:初始狀態,熔斷器關閉,正常提供服務
- Open: 失敗次數,失敗百分比達到一定的閾值之后,熔斷器打開,停止訪問服務
- Half-Open:熔斷一定時間之后,小流量嘗試調用服務,如果成功則恢復,熔斷器變為Closed狀態
降級熔斷相似點
- 目的一致,都是從可用性可靠性着想,為防止系統的整體緩慢甚至崩潰,采用的技術手段
- 最終表現類似,對於兩者來說,最終讓用戶體驗到的是某些功能暫時不可達或不可用
- 粒度一般都是服務級別
- 自治性要求很高,熔斷模式一般都是服務基於策略的自動觸發,降級雖說可人工干預,但在微服務架構下,完全靠人顯然不可能,開關預置、配置中心都是必要手段
降級熔斷區別
- 觸發原因不一樣,服務熔斷一般是某個服務(下游服務)故障引起,而服務降級一般是從整體負荷考慮
- 自愈能力要求不一樣,服務熔斷在發生后有自愈能力,而服務降級沒有該職責
相關框架組件
common-blocking 限流組件
http://wiki.corp.qunar.com/display/devwiki/common-blocking
優點
- qconfig 控制,靈活方便
- 支持同步/異步模式,異步模式不增加接口響應時間
- 支持HTTP, Dubbo接口
- 集群級別的限流
- 支持自定義更細的限流粒度
dubbo 熔斷
http://wiki.corp.qunar.com/pages/viewpage.action?pageId=105912517
hystrix 降級熔斷
適用對象
依賴大量外部服務的業務
主要功能
- 服務故障時,觸發熔斷,快速失敗,而非排隊處理,占用系統資源
- 支持線程池隔離,避免個別依賴服務耗光線程而影響其他服務
- 提供兜底策略,以盡量減少失敗
- 通過限制線程池數目,或者信號量最大並發數,可以達到限流的目的
- 支持對結果進行Cache
- 將遠程服務調用邏輯封裝進一個HystrixCommand。
- 對於每次服務調用可以使用同步或異步機制,對應執行execute()或queue()。
- 判斷熔斷器(circuit-breaker)是否打開或者半打開狀態,如果打開跳到步驟8,進行回退策略,如果關閉進入步驟4。
- 判斷線程池/隊列/信號量(使用了艙壁隔離模式)是否跑滿,如果跑滿進入回退步驟8,否則繼續后續步驟5。
- run方法中執行了實際的服務調用。
- 服務調用發生超時時,進入步驟8。
- 判斷run方法中的代碼是否執行成功。
- 執行成功返回結果。
- 執行中出現錯誤則進入步驟8。
- 所有的運行狀態(成功,失敗,拒絕,超時)上報給熔斷器,用於統計從而影響熔斷器狀態。
- 進入getFallback()回退邏輯。
- 沒有實現getFallback()回退邏輯的調用將直接拋出異常。
- 回退邏輯調用成功直接返回。
- 回退邏輯調用失敗拋出異常。
- 返回執行成功結果。
命令模式
hystrix 使用命令模式對服務調用進行封裝,要想使用 hystrix,只需要繼承 HystrixCommand 即可。
public
class
CommandHelloFailure
extends
HystrixCommand<String> {
public
CommandHelloFailure() {
super
(HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(HystrixCommandEnum.AV_QUERY.GROUP.KEY))
.andCommandKey(HystrixCommandKey.Factory.asKey(HystrixCommandEnum.AV_QUERY.KEY))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(
"AvPool"
))
.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
.withCoreSize(
2
)
//ThreadPoolExecutor maximumPoolSize=maxnumsize=10
.withMaxQueueSize(-
1
)
//線程打滿后,直接快速拒絕。
)
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(
10000
)
//run方法執行時間,超過一秒就會被cancel.
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
.withCircuitBreakerRequestVolumeThreshold(
30
)
.withCircuitBreakerErrorThresholdPercentage(
50
)
//錯誤率達到50%后,開啟熔斷.
.withCircuitBreakerSleepWindowInMilliseconds(
5000
)
//熔斷后休眠5000毫秒后發起重試.
));
this
.name = name;
}
@Override
protected
String run() {
throw
new
RuntimeException(
"this command always fails"
);
}
@Override
protected
String getFallback() {
return
"Hello Failure "
+ name +
"!"
;
}
}
|
HystrixCommand有4個方法可供調用
方法名 |
返回值 |
含義 |
---|---|---|
execute |
run方法的返回值 |
同步阻塞執行 |
queue |
Future |
異步非阻塞執行 |
observe |
Observable<?> |
事件注冊前執行 |
toObservable |
Observable<?> |
事件注冊后執行 |
熔斷判斷邏輯關鍵源代碼
類名 |
說明 |
---|---|
HystrixCircuitBreaker |
熔斷判斷類 |
HealthCountsStream |
Command 執行結果統計類,主要提供了 HealthCounts 對象作為統計結果,用於 HystrixCircuitBreaker 中進行熔斷判斷 |
BucketedRollingCounterStream |
滑動窗口計數類 |
HystrixCommandCompletionStream |
Command 執行結果數據發布類 |
無線同學封裝的 qhystrix
無線平台的同學,對hystrix進行了適當的封裝,能夠和公司的公共組件進行配合使用,十分方便。
-
引入了 QConfig 動態配置,方便線上靈活修改服務配置,更多配置相關資料可以查看官方文檔 。
scheduledMonitorOpen=
true
#av_pool線程池核心線程數目
hystrix.threadpool.av_pool.coreSize=
10
#av_pool線程池隊列大小
hystrix.threadpool.av_pool.queueSizeRejectionThreshold=
2
#執行策略,以信號量/線程池的方式執行
hystrix.command.av_query.execution.isolation.strategy=THREAD
#超時是否啟用
hystrix.command.av_query.execution.timeout.enabled=
true
#當超時的時候是否中斷(interrupt) HystrixCommand.run()執行
hystrix.command.av_query.execution.isolation.thread.interruptOnTimeout=
true
#超時時間,支持線程池/信號量
hystrix.command.av_query.execution.isolation.thread.timeoutInMilliseconds=
10000
#統計執行數據的時間窗口,單位毫秒【此配置修改后需要重啟才能生效】
hystrix.command.av_query.metrics.rollingStats.timeInMilliseconds=
10000
#觸發熔斷器開啟的窗口時間需要達到的最小請求數
hystrix.command.av_query.circuitBreaker.requestVolumeThreshold=
10
#熔斷器保持開啟狀態時間。默認為
5000
毫秒,則熔斷器中斷請求
5
秒后會進入半打開狀態,放部分流量過去重試
hystrix.command.av_query.circuitBreaker.sleepWindowInMilliseconds=
10000
#窗口時間內的出錯率,默認為
50
,則達到
50
%出錯率時,熔斷開啟
hystrix.command.av_query.circuitBreaker.errorThresholdPercentage=
50
-
引入了 Watcher 監控,方便觀察服務運行情況,以及添加報警。
hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_SUCCESS_Count=
6
hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_SHORT_CIRCUITED_Time=
0
hystrix.CommandKey_av_query.cluster.RT_FALLBACK_SUCCESS_Count=
64
JVM_Thread_Count=
147
hystrix.CommandKey_av_query.cluster.RT_SUCCESS_Time=
0
hystrix.CommandKey_av_query.cluster.RT_FAILURE_Count=
25
JVM_PS_MarkSweep_Count=
0
Fare_Rule_Change_Task_Queue_Size_Value=
0
hystrix.CommandKey_av_query.cluster.RT_SHORT_CIRCUITED_Time=
0
hystrix.CommandKey_av_query.cluster.RT_SHORT_CIRCUITED_Count=
39
hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_FAILURE_Count=
25
hystrix.CommandKey_av_query.cluster.RT_SUCCESS_Count=
6
hystrix.CommandKey_av_query.cluster.RT_FALLBACK_SUCCESS_Time=
0
hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_FALLBACK_SUCCESS_Time=
0
Av_Change_Task_Queue_Size_Value=
0
hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_FALLBACK_SUCCESS_Count=
64
hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_FAILURE_Time=
0
hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_SUCCESS_Time=
0
no_y_email_task_Time=
0
no_y_queue_empty_Count=
0
hystrix.CommandKey_av_query.cluster.RT_FAILURE_Time=
0
JVM_PS_Scavenge_Count=
0
no_y_email_task_Count=
0
hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_SHORT_CIRCUITED_Count=
39
no_y_queue_empty_Time=
0
注解驅動開發
注解是 Java 語言的一大優勢,很多開發框架比如 Spring, Hibernate 都將這一優勢使用到了極致。所以呢,官方 也提供了基於注解進行開發的 hystrix-javanica 包,能夠減少大量繁瑣代碼的編寫。
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-javanica</artifactId>
<version>
1.5
.
11
</version>
</dependency>
|
<aop:aspectj-autoproxy/>
<bean id=
"hystrixAspect"
class
=
"com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect"
></bean>
|
在POM引入jar包,並且配置spring aop即可。示例代碼如下:
package
com.qunar.flight.farecore.storage.service.av;
import
com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import
com.netflix.hystrix.strategy.HystrixPlugins;
import
com.qunar.mobile.hystrix.config.HystrixConfig;
import
com.qunar.mobile.hystrix.config.QconfigUtils;
import
com.qunar.mobile.hystrix.monitor.HystrixScheduledMonitor;
import
com.qunar.mobile.hystrix.monitor.QunarHystrixCommandExecutionHook;
import
org.slf4j.Logger;
import
org.slf4j.LoggerFactory;
import
org.springframework.stereotype.Service;
import
qunar.tc.qconfig.client.Configuration;
import
javax.annotation.PostConstruct;
import
java.util.Map;
/**
* @author albon
* Date: 17-6-30
* Time: 上午10:44
*/
@Service
public
class
TestHystrixService {
private
static
final
Logger logger = LoggerFactory.getLogger(TestHystrixService.
class
);
@PostConstruct
public
void
init() {
logger.info(
"init execution hook"
);
HystrixPlugins.getInstance().registerCommandExecutionHook(
new
QunarHystrixCommandExecutionHook());
QconfigUtils.getInstance().getHystrixMapConfig()
.addListener(
new
Configuration.ConfigListener<Map<String, String>>() {
@Override
public
void
onLoad(Map<String, String> conf) {
boolean
isOpen = Boolean.parseBoolean(conf.get(
"scheduledMonitorOpen"
));
HystrixScheduledMonitor.getInstance().setOpen(isOpen);
// qconfig熱發修改配置
HystrixConfig.getInstance().setConfig(conf);
}
});
}
@HystrixCommand
(commandKey =
"av_query"
, groupKey =
"av_group"
, threadPoolKey =
"av_pool"
, fallbackMethod =
"queryFallback"
)
public
String query() {
return
String.format(
"[%s] RUN SUCCESS"
, Thread.currentThread().getName());
}
public
String queryFallback() {
return
String.format(
"[%s] FALLBACK"
, Thread.currentThread().getName());
}
}
|
注意:因為注解方式不再繼承quanr-hystrix里的AbstractHystrixCommand,故這里特意在 PostContruct 方法里,注冊了 Hook 類,以及對 QConfig 配置進行監聽。
在使用時,曾經遇到一個小問題,異常信息如下:
Caused by: org.aspectj.apache.bcel.classfile.ClassFormatException: Invalid
byte
tag in constant pool:
18
at org.aspectj.apache.bcel.classfile.ClassParser.readConstantPool(ClassParser.java:
192
) ~[aspectjweaver-
1.6
.
10
.jar:
1.6
.
10
]
at org.aspectj.apache.bcel.classfile.ClassParser.parse(ClassParser.java:
131
) ~[aspectjweaver-
1.6
.
10
.jar:
1.6
.
10
]
at org.aspectj.apache.bcel.util.NonCachingClassLoaderRepository.loadJavaClass(NonCachingClassLoaderRepository.java:
262
) ~[aspectjweaver-
1.6
.
10
.jar:
1.6
.
10
]
at org.aspectj.apache.bcel.util.NonCachingClassLoaderRepository.loadClass(NonCachingClassLoaderRepository.java:
242
) ~[aspectjweaver-
1.6
.
10
.jar:
1.6
.
10
]
at org.aspectj.apache.bcel.util.NonCachingClassLoaderRepository.loadClass(NonCachingClassLoaderRepository.java:
249
) ~[aspectjweaver-
1.6
.
10
.jar:
1.6
.
10
]
at org.aspectj.weaver.reflect.Java15AnnotationFinder.getAnnotations(Java15AnnotationFinder.java:
202
) ~[aspectjweaver-
1.6
.
10
.jar:
1.6
.
10
]
at org.aspectj.weaver.reflect.ReflectionBasedResolvedMemberImpl.unpackAnnotations(ReflectionBasedResolvedMemberImpl.java:
211
) ~[aspectjweaver-
1.6
.
10
.jar:
1.6
.
10
]
at org.aspectj.weaver.reflect.ReflectionBasedResolvedMemberImpl.hasAnnotation(ReflectionBasedResolvedMemberImpl.java:
163
) ~[aspectjweaver-
1.6
.
10
.jar:
1.6
.
10
]
at org.aspectj.weaver.patterns.ExactAnnotationTypePattern.matches(ExactAnnotationTypePattern.java:
109
) ~[aspectjweaver-
1.6
.
10
.jar:
1.6
.
10
]
at org.aspectj.weaver.patterns.ExactAnnotationTypePattern.matches(ExactAnnotationTypePattern.java:
96
) ~[aspectjweaver-
1.6
.
10
.jar:
1.6
.
10
]
|
通過升級aspectjweaver包到最新版解決
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>
1.8
.
10
</version>
</dependency>
|
更詳細的資料可以查看官方文檔。
資料
qunar-hystrixhttp://gitlab.corp.qunar.com/mobile_public/qunar-hystrix/wikis/start
官方文檔https://github.com/Netflix/Hystrix/wiki
javanica文檔 https://github.com/Netflix/Hystrix/tree/master/hystrix-contrib/hystrix-javanica
使用中遇到的坑
commons-configuration 包版本低引起的 NPE 異常
報價同學在其系統中引入時,遇到了很奇怪的 NPE 異常
java.lang.NullPointerException
at com.netflix.config.ConcurrentMapConfiguration.clearConfigurationListeners(ConcurrentMapConfiguration.java:
330
)
at org.apache.commons.configuration.event.EventSource.<init>(EventSource.java:
76
)
at org.apache.commons.configuration.AbstractConfiguration.<init>(AbstractConfiguration.java:
63
)
at com.netflix.config.ConcurrentMapConfiguration.<init>(ConcurrentMapConfiguration.java:
68
)
at com.netflix.config.ConcurrentCompositeConfiguration.<init>(ConcurrentCompositeConfiguration.java:
172
)
at com.netflix.config.ConfigurationManager.getConfigInstance(ConfigurationManager.java:
125
)
at com.netflix.config.DynamicPropertyFactory.getInstance(DynamicPropertyFactory.java:
263
)
at com.netflix.config.DynamicProperty.getInstance(DynamicProperty.java:
245
)
at com.netflix.config.PropertyWrapper.<init>(PropertyWrapper.java:
58
)
at com.netflix.hystrix.strategy.properties.archaius.HystrixDynamicPropertiesArchaius$ArchaiusDynamicProperty.<init>(HystrixDynamicPropertiesArchaius.java:
62
)
at com.netflix.hystrix.strategy.properties.archaius.HystrixDynamicPropertiesArchaius$StringDynamicProperty.<init>(HystrixDynamicPropertiesArchaius.java:
73
)
at com.netflix.hystrix.strategy.properties.archaius.HystrixDynamicPropertiesArchaius.getString(HystrixDynamicPropertiesArchaius.java:
34
)
at com.netflix.hystrix.strategy.HystrixPlugins.getPluginImplementationViaProperties(HystrixPlugins.java:
344
)
at com.netflix.hystrix.strategy.HystrixPlugins.getPluginImplementation(HystrixPlugins.java:
334
)
at com.netflix.hystrix.strategy.HystrixPlugins.getPropertiesStrategy(HystrixPlugins.java:
243
)
at com.netflix.hystrix.strategy.properties.HystrixPropertiesFactory.getCommandProperties(HystrixPropertiesFactory.java:
62
)
at com.netflix.hystrix.AbstractCommand.initCommandProperties(AbstractCommand.java:
204
)
at com.netflix.hystrix.AbstractCommand.<init>(AbstractCommand.java:
163
)
at com.netflix.hystrix.HystrixCommand.<init>(HystrixCommand.java:
147
)
at com.netflix.hystrix.HystrixCommand.<init>(HystrixCommand.java:
133
)
|
ConcurrentMapConfiguration.clearConfigurationListeners 方法內容如下:
public
class
ConcurrentMapConfiguration
extends
AbstractConfiguration {
protected
ConcurrentHashMap<String,Object> map;
private
Collection<ConfigurationListener> listeners =
new
CopyOnWriteArrayList<ConfigurationListener>();
@Override
public
void
clearConfigurationListeners() {
listeners.clear();
// NPE異常就在這一行
}
}
|
初看代碼,listeners 變量是有初始化語句的,為啥還會是 null 呢?我們繼續往上看 ConcurrentMapConfiguration 的父類,clearConfigurationListeners 方法覆蓋的是父類 EventSource 的同名方法。EventSource 代碼如下:
public
class
EventSource {
/** A collection for the registered event listeners. */
private
Collection listeners;
/** A counter for the detail events. */
private
int
detailEvents;
/**
* Creates a new instance of <code>EventSource</code>.
*/
public
EventSource() {
clearConfigurationListeners();
}
/**
* Removes all registered configuration listeners.
*/
public
void
clearConfigurationListeners() {
listeners =
new
LinkedList();
}
|
clearConfigurationListeners 方法是在父類的構造函數中調用的,此時子類還沒有做初始化操作,listeners 也還沒有賦值,這就奇葩了。仔細觀察后發現,EventSource 是 commons-configuration 包里的,而 ConcurrentMapConfiguration 是 archaius-core 包(Netflix 開源的基於java的配置管理類庫)的,archaius 中引用的 commons-configuration 是較高的版本 1.8。高版本的 EventSource 代碼有所不同,其構造函數里沒有再調用 clearConfigurationListeners,而是調用 initListeners 方法。升級 pom 中的 commons-configuration 版本后,系統運行正常啦。
public
class
EventSource {
/** A collection for the registered event listeners. */
private
Collection<ConfigurationListener> listeners;
/**
* Creates a new instance of {@code EventSource}.
*/
public
EventSource() {
initListeners();
}
/**
* Initializes the collections for storing registered event listeners.
*/
private
void
initListeners() {
listeners =
new
CopyOnWriteArrayList<ConfigurationListener>();
errorListeners =
new
CopyOnWriteArrayList<ConfigurationErrorListener>();
}
|
dynamic-limiter 組件
設計目的
使用 Hystrix 編程必須使用其命令模式,繼承 HystrixCommand 或 HystrixObservableCommand。對於程序中原先采用異步回調模式的代碼,使用 Hystrix 必須改造成同步模式,涉及到系統代碼的大量改造,並且隔離在單獨的線程池中,導致線程切換增加,影響性能。
為此,我們在深入了解 Hystrix 原理之后,決定自己動手寫一個降級熔斷工具 dynamic-limiter,提供以下兩種功能:
- 簡化給異步回調模式的代碼增加降級熔斷的功能的步驟。主要做法是,不再采用 Hystrix 僵硬的命令模式,分離服務調用結果的監控和熔斷判斷邏輯。
- 針對接收消息->獲取資源→進入計算隊列的編程模式,設計動態限流組件,邏輯如下圖所示:
3. 監控系統負載,動態設置限流閾值,原理如下: