限流與熔斷


概述

 

為了保證系統不被突發流量擊垮,進行容量保障是十分有必要的。從架構的穩定性角度看,在有限資源的情況下,所能提供的單位時間服務能力也是有限的。假如超過承受能力,可能會帶來整個服務的停頓,應用的Crash,進而可能將風險傳遞給服務調用方造成整個系統的服務能力喪失,進而引發雪崩。

 

為了避免系統壓力大時引發服務雪崩,就需要在系統中引入限流,降級和熔斷等工具。

 

目的

 

最初在基礎數據內部,我們使用了限流組件,然后調研了降級熔斷框架Hystrix,發現這是一個很好的框架,能夠提升依賴大量外部服務的系統的容錯能力,但是不適合依賴比較簡單的基礎數據系統。

 

機票交易系統依賴諸多子系統,比如保險,各種X產品等。故而,在此簡單介紹一下,推廣給交易和報價的同學使用。

 

限流

 

根據排隊理論,具有延遲的服務隨着請求量的不斷提升,其平均響應時間也會迅速提升,為了保證服務的SLA,有必要控制單位時間的請求量。這就是限流為什么愈發重要的原因。

 

分類

 

qps限流

 

限制每秒處理請求數不超過閾值。

 

並發限流

 

限制同時處理的請求數目。Java 中的 Semaphore 是做並發限制的好工具,特別適用於資源有效的場景。

 

單機限流

 

Guava 中的 RateLimiter。

 

集群限流

 

TC 提供的 common-blocking 組件提供此功能。

 

算法

 

漏桶算法

 

漏桶算法思路很簡單,水(請求)先進入到漏桶里,漏桶以一定的速度出水,當水流入速度過大會直接溢出,可以看出漏桶算法能強行限制數據的傳輸速率。

 

 

旗艦店運價庫抓取調度

 

在旗艦店運價庫對航司報價的離線抓取中,由於航司接口有qps限制,需要限制訪問速率。所以我們借助redis隊列作為漏桶,實現了對航司接口訪問速率的控制。

 

 

令牌桶算法

 

對於很多應用場景來說,除了要求能夠限制數據的平均傳輸速率外,還要求允許某種程度的突發傳輸。這時候漏桶算法可能就不合適了,令牌桶算法更為適合。
令牌桶算法的原理是系統會以一個恆定的速度往桶里放入令牌,而如果請求需要被處理,則需要先從桶里獲取一個令牌,當桶里沒有令牌可取時,則拒絕服務。

 

 

在 Guava 的 RateLimiter 中,使用的就是令牌桶算法,允許部分突發流量傳輸。在其源碼里,可以看到能夠突發傳輸的流量等於 maxBurstSeconds * qps。

 

/**
  * This implements a "bursty" RateLimiter, where storedPermits are translated to zero throttling.
  * The maximum number of permits that can be saved (when the RateLimiter is unused) is defined in
  * terms of time, in this sense: if a RateLimiter is 2qps, and this time is specified as 10
  * seconds, we can save up to 2 * 10 = 20 permits.
  */
static  final  class  SmoothBursty  extends  SmoothRateLimiter {
   /** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */
   final  double  maxBurstSeconds;
 
   SmoothBursty(SleepingStopwatch stopwatch,  double  maxBurstSeconds) {
     super (stopwatch);
     this .maxBurstSeconds = maxBurstSeconds;
   }

 

Guava 里除了允許突發傳輸的 SmoothBursty, 還有於此相反的,可以緩慢啟動的 SmoothWarmingUp。

 

滑動窗口

 

  1. TC 提供的 common-blocking 限流組件,背后采用滑動窗口來計算流量是否超出限制。
  2. 熔斷框架 Hystrix 也采用滑動窗口來統計服務調用信息。

 

RxJava 實現滑動窗口計數

 

RxJava 是一個響應式函數編程框架,以觀察者模式為骨架,能夠輕松實現滑動窗口計數功能。具體示例代碼如下:

 

package  com.rxjava.test;
 
import  org.slf4j.Logger;
import  org.slf4j.LoggerFactory;
import  rx.Observable;
import  rx.functions.Func1;
import  rx.functions.Func2;
import  rx.subjects.PublishSubject;
import  rx.subjects.SerializedSubject;
 
import  java.util.concurrent.TimeUnit;
 
/**
  * 模擬滑動窗口計數
  * Created by albon on 17/6/24.
  */
public  class  RollingWindowTest {
     private  static  final  Logger logger = LoggerFactory.getLogger(WindowTest. class );
 
     public  static  final  Func2<Integer, Integer, Integer> INTEGER_SUM =
             (integer, integer2) -> integer + integer2;
 
     public  static  final  Func1<Observable<Integer>, Observable<Integer>> WINDOW_SUM =
             window -> window.scan( 0 , INTEGER_SUM).skip( 3 );
 
     public  static  final  Func1<Observable<Integer>, Observable<Integer>> INNER_BUCKET_SUM =
             integerObservable -> integerObservable.reduce( 0 , INTEGER_SUM);
 
     public  static  void  main(String[] args)  throws  InterruptedException {
         PublishSubject<Integer> publishSubject = PublishSubject.create();
         SerializedSubject<Integer, Integer> serializedSubject = publishSubject.toSerialized();
 
         serializedSubject
                 .window( 5 , TimeUnit.SECONDS)  // 5秒作為一個基本塊
                 .flatMap(INNER_BUCKET_SUM)            // 基本塊內數據求和
                 .window( 3 1 )               // 3個塊作為一個窗口,滾動布數為1
                 .flatMap(WINDOW_SUM)                  // 窗口數據求和
                 .subscribe((Integer integer) ->
                         logger.info( "[{}] call ...... {}" ,
                         Thread.currentThread().getName(), integer));
 
         // 緩慢發送數據,觀察效果
         for  ( int  i= 0 ; i< 100 ; ++i) {
             if  (i <  30 ) {
                 serializedSubject.onNext( 1 );
             else  {
                 serializedSubject.onNext( 2 );
             }
             Thread.sleep( 1000 );
         }
     }
}

 

降級和熔斷

 

降級

 

業務降級,是指犧牲非核心的業務功能,保證核心功能的穩定運行。簡單來說,要實現優雅的業務降級,需要將功能實現拆分到相對獨立的不同代碼單元,分優先級進行隔離。在后台通過開關控制,降級部分非主流程的業務功能,減輕系統依賴和性能損耗,從而提升集群的整體吞吐率。

 

降級的重點是:業務之間有優先級之分。

 

降級的典型應用是:電商活動期間。

 

熔斷

 

老式電閘都安裝了保險絲,一旦有人使用超大功率的設備,保險絲就會燒斷以保護各個電器不被強電流給燒壞。同理我們的接口也需要安裝上“保險絲”,以防止非預期的請求對系統壓力過大而引起的系統癱瘓,當流量過大時,可以采取拒絕或者引流等機制。

 

同樣在分布式系統中,當被調用的遠程服務無法使用時,如果沒有過載保護,就會導致請求的資源阻塞在遠程服務器上耗盡資源。很多時候,剛開始可能只是出現了局部小規模的故障,然而由於種種原因,故障影響范圍越來越大,最終導致全局性的后果。這種過載保護,就是熔斷器。

 

熔斷器的設計思路

 

 

  1. Closed:初始狀態,熔斷器關閉,正常提供服務
  2. Open: 失敗次數,失敗百分比達到一定的閾值之后,熔斷器打開,停止訪問服務
  3. Half-Open:熔斷一定時間之后,小流量嘗試調用服務,如果成功則恢復,熔斷器變為Closed狀態

 

降級熔斷相似點

 

  1. 目的一致,都是從可用性可靠性着想,為防止系統的整體緩慢甚至崩潰,采用的技術手段
  2. 最終表現類似,對於兩者來說,最終讓用戶體驗到的是某些功能暫時不可達或不可用
  3. 粒度一般都是服務級別
  4. 自治性要求很高,熔斷模式一般都是服務基於策略的自動觸發,降級雖說可人工干預,但在微服務架構下,完全靠人顯然不可能,開關預置、配置中心都是必要手段

 

降級熔斷區別

 

  1. 觸發原因不一樣,服務熔斷一般是某個服務(下游服務)故障引起,而服務降級一般是從整體負荷考慮
  2. 自愈能力要求不一樣,服務熔斷在發生后有自愈能力,而服務降級沒有該職責

 

相關框架組件

 

common-blocking 限流組件

 

http://wiki.corp.qunar.com/display/devwiki/common-blocking

 

優點

 

  1. qconfig 控制,靈活方便
  2. 支持同步/異步模式,異步模式不增加接口響應時間
  3. 支持HTTP, Dubbo接口
  4. 集群級別的限流
  5. 支持自定義更細的限流粒度

 

dubbo 熔斷

 

http://wiki.corp.qunar.com/pages/viewpage.action?pageId=105912517

 

 

hystrix 降級熔斷

 

適用對象

 

依賴大量外部服務的業務

 

 

主要功能

 

  1. 服務故障時,觸發熔斷,快速失敗,而非排隊處理,占用系統資源
  2. 支持線程池隔離,避免個別依賴服務耗光線程而影響其他服務
  3. 提供兜底策略,以盡量減少失敗
  4. 通過限制線程池數目,或者信號量最大並發數,可以達到限流的目的
  5. 支持對結果進行Cache

 


圖中流程的說明:

 

  1. 將遠程服務調用邏輯封裝進一個HystrixCommand。
  2. 對於每次服務調用可以使用同步或異步機制,對應執行execute()或queue()。
  3. 判斷熔斷器(circuit-breaker)是否打開或者半打開狀態,如果打開跳到步驟8,進行回退策略,如果關閉進入步驟4。
  4. 判斷線程池/隊列/信號量(使用了艙壁隔離模式)是否跑滿,如果跑滿進入回退步驟8,否則繼續后續步驟5。
  5. run方法中執行了實際的服務調用。
    1. 服務調用發生超時時,進入步驟8。
  6. 判斷run方法中的代碼是否執行成功。
    1. 執行成功返回結果。
    2. 執行中出現錯誤則進入步驟8。
  7. 所有的運行狀態(成功,失敗,拒絕,超時)上報給熔斷器,用於統計從而影響熔斷器狀態。
  8. 進入getFallback()回退邏輯。
    1. 沒有實現getFallback()回退邏輯的調用將直接拋出異常。
    2. 回退邏輯調用成功直接返回。
    3. 回退邏輯調用失敗拋出異常。
  9. 返回執行成功結果。

 

命令模式

 

hystrix 使用命令模式對服務調用進行封裝,要想使用 hystrix,只需要繼承 HystrixCommand 即可。

 

public  class  CommandHelloFailure  extends  HystrixCommand<String> {
 
     public  CommandHelloFailure() {
        super (HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(HystrixCommandEnum.AV_QUERY.GROUP.KEY))
                 .andCommandKey(HystrixCommandKey.Factory.asKey(HystrixCommandEnum.AV_QUERY.KEY))
                 .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey( "AvPool" ))
                 .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                         .withCoreSize( 2 ) //ThreadPoolExecutor maximumPoolSize=maxnumsize=10
                         .withMaxQueueSize(- 1 ) //線程打滿后,直接快速拒絕。
                 )
                 .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                         .withExecutionTimeoutInMilliseconds( 10000 ) //run方法執行時間,超過一秒就會被cancel.
                         .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
                         .withCircuitBreakerRequestVolumeThreshold( 30 )
                         .withCircuitBreakerErrorThresholdPercentage( 50 ) //錯誤率達到50%后,開啟熔斷.
                         .withCircuitBreakerSleepWindowInMilliseconds( 5000 ) //熔斷后休眠5000毫秒后發起重試.
                 ));         this .name = name;
     }
 
     @Override
     protected  String run() {
         throw  new  RuntimeException( "this command always fails" );
     }
 
     @Override
     protected  String getFallback() {
         return  "Hello Failure "  + name +  "!" ;
     }
}

 

HystrixCommand有4個方法可供調用

 

方法名

返回值

含義

execute

run方法的返回值

同步阻塞執行

queue

Future

異步非阻塞執行

observe

Observable<?>

事件注冊前執行

toObservable

Observable<?>

事件注冊后執行

 

熔斷判斷邏輯關鍵源代碼

 

類名

說明

HystrixCircuitBreaker

熔斷判斷類

HealthCountsStream

Command 執行結果統計類,主要提供了 HealthCounts 對象作為統計結果,用於 HystrixCircuitBreaker 中進行熔斷判斷

BucketedRollingCounterStream 
BucketedCounterStream

滑動窗口計數類

HystrixCommandCompletionStream

Command 執行結果數據發布類

 

無線同學封裝的 qhystrix

 

無線平台的同學,對hystrix進行了適當的封裝,能夠和公司的公共組件進行配合使用,十分方便。

 

  1. 引入了 QConfig 動態配置,方便線上靈活修改服務配置,更多配置相關資料可以查看官方文檔 。

    scheduledMonitorOpen= true
    #av_pool線程池核心線程數目
    hystrix.threadpool.av_pool.coreSize= 10
    #av_pool線程池隊列大小
    hystrix.threadpool.av_pool.queueSizeRejectionThreshold= 2
    #執行策略,以信號量/線程池的方式執行
    hystrix.command.av_query.execution.isolation.strategy=THREAD
    #超時是否啟用
    hystrix.command.av_query.execution.timeout.enabled= true
    #當超時的時候是否中斷(interrupt) HystrixCommand.run()執行
    hystrix.command.av_query.execution.isolation.thread.interruptOnTimeout= true
    #超時時間,支持線程池/信號量
    hystrix.command.av_query.execution.isolation.thread.timeoutInMilliseconds= 10000
    #統計執行數據的時間窗口,單位毫秒【此配置修改后需要重啟才能生效】
    hystrix.command.av_query.metrics.rollingStats.timeInMilliseconds= 10000
    #觸發熔斷器開啟的窗口時間需要達到的最小請求數
    hystrix.command.av_query.circuitBreaker.requestVolumeThreshold= 10
    #熔斷器保持開啟狀態時間。默認為 5000 毫秒,則熔斷器中斷請求 5 秒后會進入半打開狀態,放部分流量過去重試
    hystrix.command.av_query.circuitBreaker.sleepWindowInMilliseconds= 10000
    #窗口時間內的出錯率,默認為 50 ,則達到 50 %出錯率時,熔斷開啟
    hystrix.command.av_query.circuitBreaker.errorThresholdPercentage= 50
  2. 引入了 Watcher 監控,方便觀察服務運行情況,以及添加報警。

    hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_SUCCESS_Count= 6
    hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_SHORT_CIRCUITED_Time= 0
    hystrix.CommandKey_av_query.cluster.RT_FALLBACK_SUCCESS_Count= 64
    JVM_Thread_Count= 147
    hystrix.CommandKey_av_query.cluster.RT_SUCCESS_Time= 0
    hystrix.CommandKey_av_query.cluster.RT_FAILURE_Count= 25
    JVM_PS_MarkSweep_Count= 0
    Fare_Rule_Change_Task_Queue_Size_Value= 0
    hystrix.CommandKey_av_query.cluster.RT_SHORT_CIRCUITED_Time= 0
    hystrix.CommandKey_av_query.cluster.RT_SHORT_CIRCUITED_Count= 39
    hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_FAILURE_Count= 25
    hystrix.CommandKey_av_query.cluster.RT_SUCCESS_Count= 6
    hystrix.CommandKey_av_query.cluster.RT_FALLBACK_SUCCESS_Time= 0
    hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_FALLBACK_SUCCESS_Time= 0
    Av_Change_Task_Queue_Size_Value= 0
    hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_FALLBACK_SUCCESS_Count= 64
    hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_FAILURE_Time= 0
    hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_SUCCESS_Time= 0
    no_y_email_task_Time= 0
    no_y_queue_empty_Count= 0
    hystrix.CommandKey_av_query.cluster.RT_FAILURE_Time= 0
    JVM_PS_Scavenge_Count= 0
    no_y_email_task_Count= 0
    hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_SHORT_CIRCUITED_Count= 39
    no_y_queue_empty_Time= 0

 

注解驅動開發

 

注解是 Java 語言的一大優勢,很多開發框架比如 Spring, Hibernate 都將這一優勢使用到了極致。所以呢,官方 也提供了基於注解進行開發的 hystrix-javanica 包,能夠減少大量繁瑣代碼的編寫。

 

<dependency>
     <groupId>com.netflix.hystrix</groupId>
     <artifactId>hystrix-javanica</artifactId>
     <version> 1.5 . 11 </version>
</dependency>

 

<aop:aspectj-autoproxy/>
<bean id= "hystrixAspect"  class = "com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect" ></bean>

 

在POM引入jar包,並且配置spring aop即可。示例代碼如下:

 

package  com.qunar.flight.farecore.storage.service.av;
 
import  com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import  com.netflix.hystrix.strategy.HystrixPlugins;
import  com.qunar.mobile.hystrix.config.HystrixConfig;
import  com.qunar.mobile.hystrix.config.QconfigUtils;
import  com.qunar.mobile.hystrix.monitor.HystrixScheduledMonitor;
import  com.qunar.mobile.hystrix.monitor.QunarHystrixCommandExecutionHook;
import  org.slf4j.Logger;
import  org.slf4j.LoggerFactory;
import  org.springframework.stereotype.Service;
import  qunar.tc.qconfig.client.Configuration;
 
import  javax.annotation.PostConstruct;
import  java.util.Map;
 
/**
  * @author albon
  *         Date: 17-6-30
  *         Time: 上午10:44
  */
@Service
public  class  TestHystrixService {
     private  static  final  Logger logger = LoggerFactory.getLogger(TestHystrixService. class );
 
     @PostConstruct
     public  void  init() {
         logger.info( "init execution hook" );
 
         HystrixPlugins.getInstance().registerCommandExecutionHook( new  QunarHystrixCommandExecutionHook());
         QconfigUtils.getInstance().getHystrixMapConfig()
                 .addListener( new  Configuration.ConfigListener<Map<String, String>>() {
                     @Override
                     public  void  onLoad(Map<String, String> conf) {
                         boolean  isOpen = Boolean.parseBoolean(conf.get( "scheduledMonitorOpen" ));
                         HystrixScheduledMonitor.getInstance().setOpen(isOpen);
 
                         // qconfig熱發修改配置
                         HystrixConfig.getInstance().setConfig(conf);
                     }
                 });
     }
 
     @HystrixCommand (commandKey =  "av_query" , groupKey =  "av_group" , threadPoolKey =  "av_pool" , fallbackMethod =  "queryFallback" )
     public  String query() {
         return  String.format( "[%s] RUN SUCCESS" , Thread.currentThread().getName());
     }
 
     public  String queryFallback() {
         return  String.format( "[%s] FALLBACK" , Thread.currentThread().getName());
     }
 
}

 

注意:因為注解方式不再繼承quanr-hystrix里的AbstractHystrixCommand,故這里特意在 PostContruct 方法里,注冊了 Hook 類,以及對 QConfig 配置進行監聽。

 

在使用時,曾經遇到一個小問題,異常信息如下:

 

Caused by: org.aspectj.apache.bcel.classfile.ClassFormatException: Invalid  byte  tag in constant pool:  18
     at org.aspectj.apache.bcel.classfile.ClassParser.readConstantPool(ClassParser.java: 192 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ]
     at org.aspectj.apache.bcel.classfile.ClassParser.parse(ClassParser.java: 131 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ]
     at org.aspectj.apache.bcel.util.NonCachingClassLoaderRepository.loadJavaClass(NonCachingClassLoaderRepository.java: 262 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ]
     at org.aspectj.apache.bcel.util.NonCachingClassLoaderRepository.loadClass(NonCachingClassLoaderRepository.java: 242 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ]
     at org.aspectj.apache.bcel.util.NonCachingClassLoaderRepository.loadClass(NonCachingClassLoaderRepository.java: 249 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ]
     at org.aspectj.weaver.reflect.Java15AnnotationFinder.getAnnotations(Java15AnnotationFinder.java: 202 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ]
     at org.aspectj.weaver.reflect.ReflectionBasedResolvedMemberImpl.unpackAnnotations(ReflectionBasedResolvedMemberImpl.java: 211 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ]
     at org.aspectj.weaver.reflect.ReflectionBasedResolvedMemberImpl.hasAnnotation(ReflectionBasedResolvedMemberImpl.java: 163 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ]
     at org.aspectj.weaver.patterns.ExactAnnotationTypePattern.matches(ExactAnnotationTypePattern.java: 109 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ]
     at org.aspectj.weaver.patterns.ExactAnnotationTypePattern.matches(ExactAnnotationTypePattern.java: 96 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ]

 

通過升級aspectjweaver包到最新版解決

 

<dependency>
     <groupId>org.aspectj</groupId>
     <artifactId>aspectjweaver</artifactId>
     <version> 1.8 . 10 </version>
</dependency>

 

更詳細的資料可以查看官方文檔

 

資料

 

qunar-hystrixhttp://gitlab.corp.qunar.com/mobile_public/qunar-hystrix/wikis/start
官方文檔https://github.com/Netflix/Hystrix/wiki
javanica文檔 https://github.com/Netflix/Hystrix/tree/master/hystrix-contrib/hystrix-javanica

使用中遇到的坑

commons-configuration 包版本低引起的 NPE 異常

報價同學在其系統中引入時,遇到了很奇怪的 NPE 異常

java.lang.NullPointerException
         at com.netflix.config.ConcurrentMapConfiguration.clearConfigurationListeners(ConcurrentMapConfiguration.java: 330 )
         at org.apache.commons.configuration.event.EventSource.<init>(EventSource.java: 76 )
         at org.apache.commons.configuration.AbstractConfiguration.<init>(AbstractConfiguration.java: 63 )
         at com.netflix.config.ConcurrentMapConfiguration.<init>(ConcurrentMapConfiguration.java: 68 )
         at com.netflix.config.ConcurrentCompositeConfiguration.<init>(ConcurrentCompositeConfiguration.java: 172 )
         at com.netflix.config.ConfigurationManager.getConfigInstance(ConfigurationManager.java: 125 )
         at com.netflix.config.DynamicPropertyFactory.getInstance(DynamicPropertyFactory.java: 263 )
         at com.netflix.config.DynamicProperty.getInstance(DynamicProperty.java: 245 )
         at com.netflix.config.PropertyWrapper.<init>(PropertyWrapper.java: 58 )
         at com.netflix.hystrix.strategy.properties.archaius.HystrixDynamicPropertiesArchaius$ArchaiusDynamicProperty.<init>(HystrixDynamicPropertiesArchaius.java: 62 )
         at com.netflix.hystrix.strategy.properties.archaius.HystrixDynamicPropertiesArchaius$StringDynamicProperty.<init>(HystrixDynamicPropertiesArchaius.java: 73 )
         at com.netflix.hystrix.strategy.properties.archaius.HystrixDynamicPropertiesArchaius.getString(HystrixDynamicPropertiesArchaius.java: 34 )
         at com.netflix.hystrix.strategy.HystrixPlugins.getPluginImplementationViaProperties(HystrixPlugins.java: 344 )
         at com.netflix.hystrix.strategy.HystrixPlugins.getPluginImplementation(HystrixPlugins.java: 334 )
         at com.netflix.hystrix.strategy.HystrixPlugins.getPropertiesStrategy(HystrixPlugins.java: 243 )
         at com.netflix.hystrix.strategy.properties.HystrixPropertiesFactory.getCommandProperties(HystrixPropertiesFactory.java: 62 )
         at com.netflix.hystrix.AbstractCommand.initCommandProperties(AbstractCommand.java: 204 )
         at com.netflix.hystrix.AbstractCommand.<init>(AbstractCommand.java: 163 )
         at com.netflix.hystrix.HystrixCommand.<init>(HystrixCommand.java: 147 )
         at com.netflix.hystrix.HystrixCommand.<init>(HystrixCommand.java: 133 )

ConcurrentMapConfiguration.clearConfigurationListeners 方法內容如下:

public  class  ConcurrentMapConfiguration  extends  AbstractConfiguration {
     protected  ConcurrentHashMap<String,Object> map;
     private  Collection<ConfigurationListener> listeners =  new  CopyOnWriteArrayList<ConfigurationListener>(); 
 
     @Override
     public  void  clearConfigurationListeners() {
         listeners.clear();  // NPE異常就在這一行
     }
}

初看代碼,listeners 變量是有初始化語句的,為啥還會是 null 呢?我們繼續往上看 ConcurrentMapConfiguration 的父類,clearConfigurationListeners 方法覆蓋的是父類 EventSource 的同名方法。EventSource 代碼如下:

public  class  EventSource {
     /** A collection for the registered event listeners. */
     private  Collection listeners;
 
     /** A counter for the detail events. */
     private  int  detailEvents;
 
     /**
      * Creates a new instance of <code>EventSource</code>.
      */
     public  EventSource() {
         clearConfigurationListeners();
     }
 
     /**
      * Removes all registered configuration listeners.
      */
     public  void  clearConfigurationListeners() {
         listeners =  new  LinkedList();
     }

 clearConfigurationListeners 方法是在父類的構造函數中調用的,此時子類還沒有做初始化操作,listeners 也還沒有賦值,這就奇葩了。仔細觀察后發現,EventSource 是 commons-configuration 包里的,而 ConcurrentMapConfiguration 是 archaius-core 包(Netflix 開源的基於java的配置管理類庫)的,archaius 中引用的 commons-configuration 是較高的版本 1.8。高版本的 EventSource 代碼有所不同,其構造函數里沒有再調用 clearConfigurationListeners,而是調用 initListeners 方法。升級 pom 中的 commons-configuration 版本后,系統運行正常啦

public  class  EventSource {
     /** A collection for the registered event listeners. */
     private  Collection<ConfigurationListener> listeners;
 
     /**
      * Creates a new instance of {@code EventSource}.
      */
     public  EventSource() {
         initListeners();
     }
 
     /**
      * Initializes the collections for storing registered event listeners.
      */
     private  void  initListeners() {
         listeners =  new  CopyOnWriteArrayList<ConfigurationListener>();
         errorListeners =  new  CopyOnWriteArrayList<ConfigurationErrorListener>();
     }

dynamic-limiter 組件

設計目的

使用 Hystrix 編程必須使用其命令模式,繼承 HystrixCommand 或 HystrixObservableCommand。對於程序中原先采用異步回調模式的代碼,使用 Hystrix 必須改造成同步模式,涉及到系統代碼的大量改造,並且隔離在單獨的線程池中,導致線程切換增加,影響性能。

為此,我們在深入了解 Hystrix 原理之后,決定自己動手寫一個降級熔斷工具 dynamic-limiter,提供以下兩種功能:

  1.  簡化給異步回調模式的代碼增加降級熔斷的功能的步驟。主要做法是,不再采用 Hystrix 僵硬的命令模式,分離服務調用結果的監控和熔斷判斷邏輯。
  2.  針對接收消息->獲取資源→進入計算隊列的編程模式,設計動態限流組件,邏輯如下圖所示:

    3. 監控系統負載,動態設置限流閾值,原理如下:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM