限流与熔断


概述

 

为了保证系统不被突发流量击垮,进行容量保障是十分有必要的。从架构的稳定性角度看,在有限资源的情况下,所能提供的单位时间服务能力也是有限的。假如超过承受能力,可能会带来整个服务的停顿,应用的Crash,进而可能将风险传递给服务调用方造成整个系统的服务能力丧失,进而引发雪崩。

 

为了避免系统压力大时引发服务雪崩,就需要在系统中引入限流,降级和熔断等工具。

 

目的

 

最初在基础数据内部,我们使用了限流组件,然后调研了降级熔断框架Hystrix,发现这是一个很好的框架,能够提升依赖大量外部服务的系统的容错能力,但是不适合依赖比较简单的基础数据系统。

 

机票交易系统依赖诸多子系统,比如保险,各种X产品等。故而,在此简单介绍一下,推广给交易和报价的同学使用。

 

限流

 

根据排队理论,具有延迟的服务随着请求量的不断提升,其平均响应时间也会迅速提升,为了保证服务的SLA,有必要控制单位时间的请求量。这就是限流为什么愈发重要的原因。

 

分类

 

qps限流

 

限制每秒处理请求数不超过阈值。

 

并发限流

 

限制同时处理的请求数目。Java 中的 Semaphore 是做并发限制的好工具,特别适用于资源有效的场景。

 

单机限流

 

Guava 中的 RateLimiter。

 

集群限流

 

TC 提供的 common-blocking 组件提供此功能。

 

算法

 

漏桶算法

 

漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。

 

 

旗舰店运价库抓取调度

 

在旗舰店运价库对航司报价的离线抓取中,由于航司接口有qps限制,需要限制访问速率。所以我们借助redis队列作为漏桶,实现了对航司接口访问速率的控制。

 

 

令牌桶算法

 

对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。
令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。

 

 

在 Guava 的 RateLimiter 中,使用的就是令牌桶算法,允许部分突发流量传输。在其源码里,可以看到能够突发传输的流量等于 maxBurstSeconds * qps。

 

/**
  * This implements a "bursty" RateLimiter, where storedPermits are translated to zero throttling.
  * The maximum number of permits that can be saved (when the RateLimiter is unused) is defined in
  * terms of time, in this sense: if a RateLimiter is 2qps, and this time is specified as 10
  * seconds, we can save up to 2 * 10 = 20 permits.
  */
static  final  class  SmoothBursty  extends  SmoothRateLimiter {
   /** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */
   final  double  maxBurstSeconds;
 
   SmoothBursty(SleepingStopwatch stopwatch,  double  maxBurstSeconds) {
     super (stopwatch);
     this .maxBurstSeconds = maxBurstSeconds;
   }

 

Guava 里除了允许突发传输的 SmoothBursty, 还有于此相反的,可以缓慢启动的 SmoothWarmingUp。

 

滑动窗口

 

  1. TC 提供的 common-blocking 限流组件,背后采用滑动窗口来计算流量是否超出限制。
  2. 熔断框架 Hystrix 也采用滑动窗口来统计服务调用信息。

 

RxJava 实现滑动窗口计数

 

RxJava 是一个响应式函数编程框架,以观察者模式为骨架,能够轻松实现滑动窗口计数功能。具体示例代码如下:

 

package  com.rxjava.test;
 
import  org.slf4j.Logger;
import  org.slf4j.LoggerFactory;
import  rx.Observable;
import  rx.functions.Func1;
import  rx.functions.Func2;
import  rx.subjects.PublishSubject;
import  rx.subjects.SerializedSubject;
 
import  java.util.concurrent.TimeUnit;
 
/**
  * 模拟滑动窗口计数
  * Created by albon on 17/6/24.
  */
public  class  RollingWindowTest {
     private  static  final  Logger logger = LoggerFactory.getLogger(WindowTest. class );
 
     public  static  final  Func2<Integer, Integer, Integer> INTEGER_SUM =
             (integer, integer2) -> integer + integer2;
 
     public  static  final  Func1<Observable<Integer>, Observable<Integer>> WINDOW_SUM =
             window -> window.scan( 0 , INTEGER_SUM).skip( 3 );
 
     public  static  final  Func1<Observable<Integer>, Observable<Integer>> INNER_BUCKET_SUM =
             integerObservable -> integerObservable.reduce( 0 , INTEGER_SUM);
 
     public  static  void  main(String[] args)  throws  InterruptedException {
         PublishSubject<Integer> publishSubject = PublishSubject.create();
         SerializedSubject<Integer, Integer> serializedSubject = publishSubject.toSerialized();
 
         serializedSubject
                 .window( 5 , TimeUnit.SECONDS)  // 5秒作为一个基本块
                 .flatMap(INNER_BUCKET_SUM)            // 基本块内数据求和
                 .window( 3 1 )               // 3个块作为一个窗口,滚动布数为1
                 .flatMap(WINDOW_SUM)                  // 窗口数据求和
                 .subscribe((Integer integer) ->
                         logger.info( "[{}] call ...... {}" ,
                         Thread.currentThread().getName(), integer));
 
         // 缓慢发送数据,观察效果
         for  ( int  i= 0 ; i< 100 ; ++i) {
             if  (i <  30 ) {
                 serializedSubject.onNext( 1 );
             else  {
                 serializedSubject.onNext( 2 );
             }
             Thread.sleep( 1000 );
         }
     }
}

 

降级和熔断

 

降级

 

业务降级,是指牺牲非核心的业务功能,保证核心功能的稳定运行。简单来说,要实现优雅的业务降级,需要将功能实现拆分到相对独立的不同代码单元,分优先级进行隔离。在后台通过开关控制,降级部分非主流程的业务功能,减轻系统依赖和性能损耗,从而提升集群的整体吞吐率。

 

降级的重点是:业务之间有优先级之分。

 

降级的典型应用是:电商活动期间。

 

熔断

 

老式电闸都安装了保险丝,一旦有人使用超大功率的设备,保险丝就会烧断以保护各个电器不被强电流给烧坏。同理我们的接口也需要安装上“保险丝”,以防止非预期的请求对系统压力过大而引起的系统瘫痪,当流量过大时,可以采取拒绝或者引流等机制。

 

同样在分布式系统中,当被调用的远程服务无法使用时,如果没有过载保护,就会导致请求的资源阻塞在远程服务器上耗尽资源。很多时候,刚开始可能只是出现了局部小规模的故障,然而由于种种原因,故障影响范围越来越大,最终导致全局性的后果。这种过载保护,就是熔断器。

 

熔断器的设计思路

 

 

  1. Closed:初始状态,熔断器关闭,正常提供服务
  2. Open: 失败次数,失败百分比达到一定的阈值之后,熔断器打开,停止访问服务
  3. Half-Open:熔断一定时间之后,小流量尝试调用服务,如果成功则恢复,熔断器变为Closed状态

 

降级熔断相似点

 

  1. 目的一致,都是从可用性可靠性着想,为防止系统的整体缓慢甚至崩溃,采用的技术手段
  2. 最终表现类似,对于两者来说,最终让用户体验到的是某些功能暂时不可达或不可用
  3. 粒度一般都是服务级别
  4. 自治性要求很高,熔断模式一般都是服务基于策略的自动触发,降级虽说可人工干预,但在微服务架构下,完全靠人显然不可能,开关预置、配置中心都是必要手段

 

降级熔断区别

 

  1. 触发原因不一样,服务熔断一般是某个服务(下游服务)故障引起,而服务降级一般是从整体负荷考虑
  2. 自愈能力要求不一样,服务熔断在发生后有自愈能力,而服务降级没有该职责

 

相关框架组件

 

common-blocking 限流组件

 

http://wiki.corp.qunar.com/display/devwiki/common-blocking

 

优点

 

  1. qconfig 控制,灵活方便
  2. 支持同步/异步模式,异步模式不增加接口响应时间
  3. 支持HTTP, Dubbo接口
  4. 集群级别的限流
  5. 支持自定义更细的限流粒度

 

dubbo 熔断

 

http://wiki.corp.qunar.com/pages/viewpage.action?pageId=105912517

 

 

hystrix 降级熔断

 

适用对象

 

依赖大量外部服务的业务

 

 

主要功能

 

  1. 服务故障时,触发熔断,快速失败,而非排队处理,占用系统资源
  2. 支持线程池隔离,避免个别依赖服务耗光线程而影响其他服务
  3. 提供兜底策略,以尽量减少失败
  4. 通过限制线程池数目,或者信号量最大并发数,可以达到限流的目的
  5. 支持对结果进行Cache

 


图中流程的说明:

 

  1. 将远程服务调用逻辑封装进一个HystrixCommand。
  2. 对于每次服务调用可以使用同步或异步机制,对应执行execute()或queue()。
  3. 判断熔断器(circuit-breaker)是否打开或者半打开状态,如果打开跳到步骤8,进行回退策略,如果关闭进入步骤4。
  4. 判断线程池/队列/信号量(使用了舱壁隔离模式)是否跑满,如果跑满进入回退步骤8,否则继续后续步骤5。
  5. run方法中执行了实际的服务调用。
    1. 服务调用发生超时时,进入步骤8。
  6. 判断run方法中的代码是否执行成功。
    1. 执行成功返回结果。
    2. 执行中出现错误则进入步骤8。
  7. 所有的运行状态(成功,失败,拒绝,超时)上报给熔断器,用于统计从而影响熔断器状态。
  8. 进入getFallback()回退逻辑。
    1. 没有实现getFallback()回退逻辑的调用将直接抛出异常。
    2. 回退逻辑调用成功直接返回。
    3. 回退逻辑调用失败抛出异常。
  9. 返回执行成功结果。

 

命令模式

 

hystrix 使用命令模式对服务调用进行封装,要想使用 hystrix,只需要继承 HystrixCommand 即可。

 

public  class  CommandHelloFailure  extends  HystrixCommand<String> {
 
     public  CommandHelloFailure() {
        super (HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(HystrixCommandEnum.AV_QUERY.GROUP.KEY))
                 .andCommandKey(HystrixCommandKey.Factory.asKey(HystrixCommandEnum.AV_QUERY.KEY))
                 .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey( "AvPool" ))
                 .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                         .withCoreSize( 2 ) //ThreadPoolExecutor maximumPoolSize=maxnumsize=10
                         .withMaxQueueSize(- 1 ) //线程打满后,直接快速拒绝。
                 )
                 .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                         .withExecutionTimeoutInMilliseconds( 10000 ) //run方法执行时间,超过一秒就会被cancel.
                         .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
                         .withCircuitBreakerRequestVolumeThreshold( 30 )
                         .withCircuitBreakerErrorThresholdPercentage( 50 ) //错误率达到50%后,开启熔断.
                         .withCircuitBreakerSleepWindowInMilliseconds( 5000 ) //熔断后休眠5000毫秒后发起重试.
                 ));         this .name = name;
     }
 
     @Override
     protected  String run() {
         throw  new  RuntimeException( "this command always fails" );
     }
 
     @Override
     protected  String getFallback() {
         return  "Hello Failure "  + name +  "!" ;
     }
}

 

HystrixCommand有4个方法可供调用

 

方法名

返回值

含义

execute

run方法的返回值

同步阻塞执行

queue

Future

异步非阻塞执行

observe

Observable<?>

事件注册前执行

toObservable

Observable<?>

事件注册后执行

 

熔断判断逻辑关键源代码

 

类名

说明

HystrixCircuitBreaker

熔断判断类

HealthCountsStream

Command 执行结果统计类,主要提供了 HealthCounts 对象作为统计结果,用于 HystrixCircuitBreaker 中进行熔断判断

BucketedRollingCounterStream 
BucketedCounterStream

滑动窗口计数类

HystrixCommandCompletionStream

Command 执行结果数据发布类

 

无线同学封装的 qhystrix

 

无线平台的同学,对hystrix进行了适当的封装,能够和公司的公共组件进行配合使用,十分方便。

 

  1. 引入了 QConfig 动态配置,方便线上灵活修改服务配置,更多配置相关资料可以查看官方文档 。

    scheduledMonitorOpen= true
    #av_pool线程池核心线程数目
    hystrix.threadpool.av_pool.coreSize= 10
    #av_pool线程池队列大小
    hystrix.threadpool.av_pool.queueSizeRejectionThreshold= 2
    #执行策略,以信号量/线程池的方式执行
    hystrix.command.av_query.execution.isolation.strategy=THREAD
    #超时是否启用
    hystrix.command.av_query.execution.timeout.enabled= true
    #当超时的时候是否中断(interrupt) HystrixCommand.run()执行
    hystrix.command.av_query.execution.isolation.thread.interruptOnTimeout= true
    #超时时间,支持线程池/信号量
    hystrix.command.av_query.execution.isolation.thread.timeoutInMilliseconds= 10000
    #统计执行数据的时间窗口,单位毫秒【此配置修改后需要重启才能生效】
    hystrix.command.av_query.metrics.rollingStats.timeInMilliseconds= 10000
    #触发熔断器开启的窗口时间需要达到的最小请求数
    hystrix.command.av_query.circuitBreaker.requestVolumeThreshold= 10
    #熔断器保持开启状态时间。默认为 5000 毫秒,则熔断器中断请求 5 秒后会进入半打开状态,放部分流量过去重试
    hystrix.command.av_query.circuitBreaker.sleepWindowInMilliseconds= 10000
    #窗口时间内的出错率,默认为 50 ,则达到 50 %出错率时,熔断开启
    hystrix.command.av_query.circuitBreaker.errorThresholdPercentage= 50
  2. 引入了 Watcher 监控,方便观察服务运行情况,以及添加报警。

    hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_SUCCESS_Count= 6
    hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_SHORT_CIRCUITED_Time= 0
    hystrix.CommandKey_av_query.cluster.RT_FALLBACK_SUCCESS_Count= 64
    JVM_Thread_Count= 147
    hystrix.CommandKey_av_query.cluster.RT_SUCCESS_Time= 0
    hystrix.CommandKey_av_query.cluster.RT_FAILURE_Count= 25
    JVM_PS_MarkSweep_Count= 0
    Fare_Rule_Change_Task_Queue_Size_Value= 0
    hystrix.CommandKey_av_query.cluster.RT_SHORT_CIRCUITED_Time= 0
    hystrix.CommandKey_av_query.cluster.RT_SHORT_CIRCUITED_Count= 39
    hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_FAILURE_Count= 25
    hystrix.CommandKey_av_query.cluster.RT_SUCCESS_Count= 6
    hystrix.CommandKey_av_query.cluster.RT_FALLBACK_SUCCESS_Time= 0
    hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_FALLBACK_SUCCESS_Time= 0
    Av_Change_Task_Queue_Size_Value= 0
    hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_FALLBACK_SUCCESS_Count= 64
    hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_FAILURE_Time= 0
    hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_SUCCESS_Time= 0
    no_y_email_task_Time= 0
    no_y_queue_empty_Count= 0
    hystrix.CommandKey_av_query.cluster.RT_FAILURE_Time= 0
    JVM_PS_Scavenge_Count= 0
    no_y_email_task_Count= 0
    hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_SHORT_CIRCUITED_Count= 39
    no_y_queue_empty_Time= 0

 

注解驱动开发

 

注解是 Java 语言的一大优势,很多开发框架比如 Spring, Hibernate 都将这一优势使用到了极致。所以呢,官方 也提供了基于注解进行开发的 hystrix-javanica 包,能够减少大量繁琐代码的编写。

 

<dependency>
     <groupId>com.netflix.hystrix</groupId>
     <artifactId>hystrix-javanica</artifactId>
     <version> 1.5 . 11 </version>
</dependency>

 

<aop:aspectj-autoproxy/>
<bean id= "hystrixAspect"  class = "com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect" ></bean>

 

在POM引入jar包,并且配置spring aop即可。示例代码如下:

 

package  com.qunar.flight.farecore.storage.service.av;
 
import  com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import  com.netflix.hystrix.strategy.HystrixPlugins;
import  com.qunar.mobile.hystrix.config.HystrixConfig;
import  com.qunar.mobile.hystrix.config.QconfigUtils;
import  com.qunar.mobile.hystrix.monitor.HystrixScheduledMonitor;
import  com.qunar.mobile.hystrix.monitor.QunarHystrixCommandExecutionHook;
import  org.slf4j.Logger;
import  org.slf4j.LoggerFactory;
import  org.springframework.stereotype.Service;
import  qunar.tc.qconfig.client.Configuration;
 
import  javax.annotation.PostConstruct;
import  java.util.Map;
 
/**
  * @author albon
  *         Date: 17-6-30
  *         Time: 上午10:44
  */
@Service
public  class  TestHystrixService {
     private  static  final  Logger logger = LoggerFactory.getLogger(TestHystrixService. class );
 
     @PostConstruct
     public  void  init() {
         logger.info( "init execution hook" );
 
         HystrixPlugins.getInstance().registerCommandExecutionHook( new  QunarHystrixCommandExecutionHook());
         QconfigUtils.getInstance().getHystrixMapConfig()
                 .addListener( new  Configuration.ConfigListener<Map<String, String>>() {
                     @Override
                     public  void  onLoad(Map<String, String> conf) {
                         boolean  isOpen = Boolean.parseBoolean(conf.get( "scheduledMonitorOpen" ));
                         HystrixScheduledMonitor.getInstance().setOpen(isOpen);
 
                         // qconfig热发修改配置
                         HystrixConfig.getInstance().setConfig(conf);
                     }
                 });
     }
 
     @HystrixCommand (commandKey =  "av_query" , groupKey =  "av_group" , threadPoolKey =  "av_pool" , fallbackMethod =  "queryFallback" )
     public  String query() {
         return  String.format( "[%s] RUN SUCCESS" , Thread.currentThread().getName());
     }
 
     public  String queryFallback() {
         return  String.format( "[%s] FALLBACK" , Thread.currentThread().getName());
     }
 
}

 

注意:因为注解方式不再继承quanr-hystrix里的AbstractHystrixCommand,故这里特意在 PostContruct 方法里,注册了 Hook 类,以及对 QConfig 配置进行监听。

 

在使用时,曾经遇到一个小问题,异常信息如下:

 

Caused by: org.aspectj.apache.bcel.classfile.ClassFormatException: Invalid  byte  tag in constant pool:  18
     at org.aspectj.apache.bcel.classfile.ClassParser.readConstantPool(ClassParser.java: 192 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ]
     at org.aspectj.apache.bcel.classfile.ClassParser.parse(ClassParser.java: 131 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ]
     at org.aspectj.apache.bcel.util.NonCachingClassLoaderRepository.loadJavaClass(NonCachingClassLoaderRepository.java: 262 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ]
     at org.aspectj.apache.bcel.util.NonCachingClassLoaderRepository.loadClass(NonCachingClassLoaderRepository.java: 242 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ]
     at org.aspectj.apache.bcel.util.NonCachingClassLoaderRepository.loadClass(NonCachingClassLoaderRepository.java: 249 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ]
     at org.aspectj.weaver.reflect.Java15AnnotationFinder.getAnnotations(Java15AnnotationFinder.java: 202 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ]
     at org.aspectj.weaver.reflect.ReflectionBasedResolvedMemberImpl.unpackAnnotations(ReflectionBasedResolvedMemberImpl.java: 211 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ]
     at org.aspectj.weaver.reflect.ReflectionBasedResolvedMemberImpl.hasAnnotation(ReflectionBasedResolvedMemberImpl.java: 163 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ]
     at org.aspectj.weaver.patterns.ExactAnnotationTypePattern.matches(ExactAnnotationTypePattern.java: 109 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ]
     at org.aspectj.weaver.patterns.ExactAnnotationTypePattern.matches(ExactAnnotationTypePattern.java: 96 ) ~[aspectjweaver- 1.6 . 10 .jar: 1.6 . 10 ]

 

通过升级aspectjweaver包到最新版解决

 

<dependency>
     <groupId>org.aspectj</groupId>
     <artifactId>aspectjweaver</artifactId>
     <version> 1.8 . 10 </version>
</dependency>

 

更详细的资料可以查看官方文档

 

资料

 

qunar-hystrixhttp://gitlab.corp.qunar.com/mobile_public/qunar-hystrix/wikis/start
官方文档https://github.com/Netflix/Hystrix/wiki
javanica文档 https://github.com/Netflix/Hystrix/tree/master/hystrix-contrib/hystrix-javanica

使用中遇到的坑

commons-configuration 包版本低引起的 NPE 异常

报价同学在其系统中引入时,遇到了很奇怪的 NPE 异常

java.lang.NullPointerException
         at com.netflix.config.ConcurrentMapConfiguration.clearConfigurationListeners(ConcurrentMapConfiguration.java: 330 )
         at org.apache.commons.configuration.event.EventSource.<init>(EventSource.java: 76 )
         at org.apache.commons.configuration.AbstractConfiguration.<init>(AbstractConfiguration.java: 63 )
         at com.netflix.config.ConcurrentMapConfiguration.<init>(ConcurrentMapConfiguration.java: 68 )
         at com.netflix.config.ConcurrentCompositeConfiguration.<init>(ConcurrentCompositeConfiguration.java: 172 )
         at com.netflix.config.ConfigurationManager.getConfigInstance(ConfigurationManager.java: 125 )
         at com.netflix.config.DynamicPropertyFactory.getInstance(DynamicPropertyFactory.java: 263 )
         at com.netflix.config.DynamicProperty.getInstance(DynamicProperty.java: 245 )
         at com.netflix.config.PropertyWrapper.<init>(PropertyWrapper.java: 58 )
         at com.netflix.hystrix.strategy.properties.archaius.HystrixDynamicPropertiesArchaius$ArchaiusDynamicProperty.<init>(HystrixDynamicPropertiesArchaius.java: 62 )
         at com.netflix.hystrix.strategy.properties.archaius.HystrixDynamicPropertiesArchaius$StringDynamicProperty.<init>(HystrixDynamicPropertiesArchaius.java: 73 )
         at com.netflix.hystrix.strategy.properties.archaius.HystrixDynamicPropertiesArchaius.getString(HystrixDynamicPropertiesArchaius.java: 34 )
         at com.netflix.hystrix.strategy.HystrixPlugins.getPluginImplementationViaProperties(HystrixPlugins.java: 344 )
         at com.netflix.hystrix.strategy.HystrixPlugins.getPluginImplementation(HystrixPlugins.java: 334 )
         at com.netflix.hystrix.strategy.HystrixPlugins.getPropertiesStrategy(HystrixPlugins.java: 243 )
         at com.netflix.hystrix.strategy.properties.HystrixPropertiesFactory.getCommandProperties(HystrixPropertiesFactory.java: 62 )
         at com.netflix.hystrix.AbstractCommand.initCommandProperties(AbstractCommand.java: 204 )
         at com.netflix.hystrix.AbstractCommand.<init>(AbstractCommand.java: 163 )
         at com.netflix.hystrix.HystrixCommand.<init>(HystrixCommand.java: 147 )
         at com.netflix.hystrix.HystrixCommand.<init>(HystrixCommand.java: 133 )

ConcurrentMapConfiguration.clearConfigurationListeners 方法内容如下:

public  class  ConcurrentMapConfiguration  extends  AbstractConfiguration {
     protected  ConcurrentHashMap<String,Object> map;
     private  Collection<ConfigurationListener> listeners =  new  CopyOnWriteArrayList<ConfigurationListener>(); 
 
     @Override
     public  void  clearConfigurationListeners() {
         listeners.clear();  // NPE异常就在这一行
     }
}

初看代码,listeners 变量是有初始化语句的,为啥还会是 null 呢?我们继续往上看 ConcurrentMapConfiguration 的父类,clearConfigurationListeners 方法覆盖的是父类 EventSource 的同名方法。EventSource 代码如下:

public  class  EventSource {
     /** A collection for the registered event listeners. */
     private  Collection listeners;
 
     /** A counter for the detail events. */
     private  int  detailEvents;
 
     /**
      * Creates a new instance of <code>EventSource</code>.
      */
     public  EventSource() {
         clearConfigurationListeners();
     }
 
     /**
      * Removes all registered configuration listeners.
      */
     public  void  clearConfigurationListeners() {
         listeners =  new  LinkedList();
     }

 clearConfigurationListeners 方法是在父类的构造函数中调用的,此时子类还没有做初始化操作,listeners 也还没有赋值,这就奇葩了。仔细观察后发现,EventSource 是 commons-configuration 包里的,而 ConcurrentMapConfiguration 是 archaius-core 包(Netflix 开源的基于java的配置管理类库)的,archaius 中引用的 commons-configuration 是较高的版本 1.8。高版本的 EventSource 代码有所不同,其构造函数里没有再调用 clearConfigurationListeners,而是调用 initListeners 方法。升级 pom 中的 commons-configuration 版本后,系统运行正常啦

public  class  EventSource {
     /** A collection for the registered event listeners. */
     private  Collection<ConfigurationListener> listeners;
 
     /**
      * Creates a new instance of {@code EventSource}.
      */
     public  EventSource() {
         initListeners();
     }
 
     /**
      * Initializes the collections for storing registered event listeners.
      */
     private  void  initListeners() {
         listeners =  new  CopyOnWriteArrayList<ConfigurationListener>();
         errorListeners =  new  CopyOnWriteArrayList<ConfigurationErrorListener>();
     }

dynamic-limiter 组件

设计目的

使用 Hystrix 编程必须使用其命令模式,继承 HystrixCommand 或 HystrixObservableCommand。对于程序中原先采用异步回调模式的代码,使用 Hystrix 必须改造成同步模式,涉及到系统代码的大量改造,并且隔离在单独的线程池中,导致线程切换增加,影响性能。

为此,我们在深入了解 Hystrix 原理之后,决定自己动手写一个降级熔断工具 dynamic-limiter,提供以下两种功能:

  1.  简化给异步回调模式的代码增加降级熔断的功能的步骤。主要做法是,不再采用 Hystrix 僵硬的命令模式,分离服务调用结果的监控和熔断判断逻辑。
  2.  针对接收消息->获取资源→进入计算队列的编程模式,设计动态限流组件,逻辑如下图所示:

    3. 监控系统负载,动态设置限流阈值,原理如下:


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM