Customizing unified flow-control and degrade handling in Sentinel


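When Sentinel is used for flow control and degradation, a BlockException by default produces only the single line "Blocked by Sentinel (flow limiting)". Most applications, however, return data in one fixed JSON format, so parts of the spring-cloud-starter-alibaba-sentinel source need to be extended to meet that requirement.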

1. Dependencies

        <dependency>
            <groupId>com.alibaba.csp</groupId>
            <artifactId>sentinel-datasource-apollo</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
            <version>2.1.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
        <dependency>
            <groupId>io.github.openfeign</groupId>
            <artifactId>feign-okhttp</artifactId>
        </dependency>

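2. Configuration

Modify sentinel.flowRules and sentinel.degradeRules in the Apollo console and publish the change to update the flow-control and degrade rules dynamically.

# Connect to sentinel-dashboard (optional)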
spring.cloud.sentinel.transport.dashboard=http://127.0.0.1:8829

# Use Apollo as the data source that stores the rules
# Key for the flow-control rules
spring.cloud.sentinel.datasource.ds.apollo.namespace-name = application
spring.cloud.sentinel.datasource.ds.apollo.flow-rules-key = sentinel.flowRules
spring.cloud.sentinel.datasource.ds.apollo.default-flow-rule-value = test
spring.cloud.sentinel.datasource.ds.apollo.rule-type=flow
# Key for the degrade rules
spring.cloud.sentinel.datasource.ds1.apollo.namespace-name = application
spring.cloud.sentinel.datasource.ds1.apollo.flow-rules-key = sentinel.degradeRules
spring.cloud.sentinel.datasource.ds1.apollo.default-flow-rule-value = test
spring.cloud.sentinel.datasource.ds1.apollo.rule-type=degrade

# See the official Sentinel documentation for the meaning of each field
sentinel.flowRules = [{"strategy":0,"grade":1,"controlBehavior":0,"resource":"POST:http://demo-service/test","limitApp":"default","count":3}]

sentinel.degradeRules = [{"resource":"POST:http://demo-service/test","limitApp":"default","grade":2,"count":4,"timeWindow":12,"strategy":0,"controlBehavior":0,"clusterMode":false}]

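# Add the Sentinel interceptor, which treats every URL as a resource. If it is disabled, the server side can no longer be rate limited or degraded. Defaults to true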
spring.cloud.sentinel.filter.enabled=true

# Custom switch for Sentinel flow control on feign, defaults to true
creis.feign.sentinel.enabled=true


# Disable lazy loading, so the application shows up in the dashboard as soon as it starts
spring.cloud.sentinel.eager=true
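To make the rule fields more concrete: in the flow rule above, grade 1 means the threshold is measured in QPS, count 3 is the threshold, and controlBehavior 0 rejects excess requests immediately. The sketch below imitates that behavior with a simple fixed one-second window. It is only an illustration and not Sentinel's own sliding-window implementation; the class name FixedWindowLimiter and its methods are made up for this example.

```java
import java.util.function.LongSupplier;

/** Hypothetical illustration of "QPS <= count, reject immediately"; not Sentinel's algorithm. */
class FixedWindowLimiter {
    private final int count;            // plays the role of "count" in the flow rule
    private final LongSupplier clockMs; // injectable clock so the behavior can be tested
    private long windowStart;
    private int passed;

    FixedWindowLimiter(int count, LongSupplier clockMs) {
        this.count = count;
        this.clockMs = clockMs;
        this.windowStart = clockMs.getAsLong();
    }

    /** Returns true if the request may pass, false if it would be blocked. */
    synchronized boolean tryPass() {
        long now = clockMs.getAsLong();
        if (now - windowStart >= 1000) { // a new one-second window begins
            windowStart = now;
            passed = 0;
        }
        if (passed < count) {
            passed++;
            return true;
        }
        return false;
    }
}
```

With count set to 3, the fourth call inside the same second returns false, which corresponds to the point where Sentinel would raise a blocking exception.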


3. Installing sentinel-dashboard

See the official website.

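4. Source code modifications

1. Unified flow-control/degrade response on the server side

  • How it works

    The following is the path through the source code for server-side flow control. spring-cloud-starter-alibaba-sentinel auto-configures an interceptor that intercepts every HTTP request, and the class that finally handles the exception is BlockExceptionHandler.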

    SentinelWebAutoConfiguration ->SentinelWebInterceptor->AbstractSentinelInterceptor->BaseWebMvcConfig->BlockExceptionHandler
    

    Sentinel ships a default implementation, which is where the "Blocked by Sentinel (flow limiting)" message comes from.

    public class DefaultBlockExceptionHandler implements BlockExceptionHandler {
        public DefaultBlockExceptionHandler() {
        }
    
        public void handle(HttpServletRequest request, HttpServletResponse response, BlockException e) throws Exception {
            response.setStatus(429);
            StringBuffer url = request.getRequestURL();
            if ("GET".equals(request.getMethod()) && StringUtil.isNotBlank(request.getQueryString())) {
                url.append("?").append(request.getQueryString());
            }
    
            PrintWriter out = response.getWriter();
            out.print("Blocked by Sentinel (flow limiting)");
            out.flush();
            out.close();
        }
    }
    
  • Modification

    Define your own exception handler class and register it in the Spring container.

    @Component
    public class MyBlockExceptionHandler implements BlockExceptionHandler {
        private final BlockExceptionUtil blockExceptionUtil;

        // Single constructor so that Spring always injects BlockExceptionUtil;
        // with an additional no-arg constructor the field could be left null.
        public MyBlockExceptionHandler(BlockExceptionUtil blockExceptionUtil) {
            this.blockExceptionUtil = blockExceptionUtil;
        }

        @Override
        public void handle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, BlockException e) throws Exception {
            // Build the unified response body for the blocking exception
            BaseDtoResponse baseDtoResponse = blockExceptionUtil.getResponseDto(e, null);
            httpServletResponse.setStatus(HttpStatus.OK.value());
            httpServletResponse.setCharacterEncoding("UTF-8");
            httpServletResponse.setContentType("application/json;charset=utf-8");
            new ObjectMapper().writeValue(httpServletResponse.getWriter(),baseDtoResponse);
        }
    }
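BlockExceptionUtil is not shown in this article. The following dependency-free sketch only illustrates the kind of mapping it presumably performs: turning the type of the blocking exception into a fixed JSON envelope. Everything in it is an assumption made for illustration: the class names, the code values and the messages are invented, and FlowBlocked and DegradeBlocked are stand-ins for Sentinel's FlowException and DegradeException so that the example compiles on its own.

```java
/** Stand-ins for Sentinel's FlowException and DegradeException (assumption for this sketch). */
class FlowBlocked extends RuntimeException {}
class DegradeBlocked extends RuntimeException {}

/** Hypothetical mapping from an exception to a fixed JSON envelope. */
class BlockResponseSketch {
    static String toJson(Throwable e) {
        int code;
        String message;
        if (e instanceof FlowBlocked) {
            code = 429;                      // invented code for "rate limited"
            message = "request rate limited";
        } else if (e instanceof DegradeBlocked) {
            code = 503;                      // invented code for "degraded"
            message = "service degraded";
        } else {
            code = 500;                      // anything else is treated as a plain failure
            message = "request failed";
        }
        return "{\"code\":" + code + ",\"message\":\"" + message + "\",\"data\":null}";
    }
}
```

Keeping this mapping in one place lets the server-side handler, the @SentinelResource aspect and the feign handler all return the same format.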
    

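2. Unified flow-control/degrade response for restTemplate

Sentinel officially provides @SentinelRestTemplate to protect RestTemplate service calls with Sentinel. The core code is in org.springframework.cloud.alibaba.sentinel.custom.SentinelBeanPostProcessor, which implements MergedBeanDefinitionPostProcessor, an interface that extends BeanPostProcessor. Its key methods are the overridden postProcessMergedBeanDefinition and postProcessAfterInitialization, which enhance the plain RestTemplate. Customizing a unified return value through that mechanism is relatively complicated, so a different approach is taken here: adjusting how @SentinelResource is handled.

  • How it works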

    SentinelAutoConfiguration ->SentinelResourceAspect
    
  • Modification

    Extend SentinelResourceAspect and override its default handling.

    @Component
    public class MySentinelResourceAspect extends SentinelResourceAspect {
    
        @Resource
        private BlockExceptionUtil blockExceptionUtil;
    
        @Override
        protected Object handleDefaultFallback(ProceedingJoinPoint pjp, String defaultFallback, Class<?>[] fallbackClass, Throwable e) throws Throwable {
            Method method = resolveMethod(pjp);
            if (StringUtil.isBlank(defaultFallback)) {
                // Default handling logic: no defaultFallback configured, return the unified response
                Class<?> returnType = method.getReturnType();
                BaseDtoResponse baseDtoResponse = blockExceptionUtil.getResponseDto(e, returnType);
                return baseDtoResponse;
            }else{
                return super.handleDefaultFallback(pjp, defaultFallback, fallbackClass, e);
            }
        }
    
    }
    
    

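3. Unified flow-control/degrade response for feign

Sentinel enhances feign by using a dynamic proxy to generate an enhanced class that performs flow control and degradation.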
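As a reminder of how this kind of enhancement works, here is a minimal JDK dynamic proxy sketch. It is not Sentinel's code: the interface GreetingApi and the class GuardedProxySketch are hypothetical, and the handler simply returns a fallback value when the call throws, which is the same shape as the default handling added to the invocation handler in this section.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

/** Hypothetical client interface, standing in for a feign client. */
interface GreetingApi {
    String greet(String name);
}

class GuardedProxySketch {
    /** Wraps target in a JDK dynamic proxy that returns fallbackValue when a call throws. */
    static GreetingApi guard(GreetingApi target, String fallbackValue) {
        InvocationHandler handler = (proxy, method, args) -> {
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException ex) {
                // A real handler would consult Sentinel and a fallback factory here;
                // this sketch just returns the fixed fallback value.
                return fallbackValue;
            }
        };
        return (GreetingApi) Proxy.newProxyInstance(
                GreetingApi.class.getClassLoader(),
                new Class<?>[] {GreetingApi.class},
                handler);
    }
}
```

Callers keep using the interface as before; only the object behind it changes, which is why the feign integration needs no changes at the call site.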

  • How it works

    SentinelFeignAutoConfiguration ->SentinelFeign ->SentinelInvocationHandler
    
  • Modification

     @Bean
        @Scope("prototype")
        @ConditionalOnClass({SphU.class, Feign.class})
        @ConditionalOnMissingBean
        @ConditionalOnProperty(name = {"creis.feign.sentinel.enabled"},matchIfMissing = true)
        @Primary
        public Feign.Builder feignSentinelBuilder() {
            return CloudSentinelFeign.builder();
        }
    
    public class CloudSentinelFeign {
    
        private CloudSentinelFeign() {
        }
    
        public static CloudSentinelFeign.Builder builder() {
            return new CloudSentinelFeign.Builder();
        }
    
        public static final class Builder extends Feign.Builder implements ApplicationContextAware {
            private Contract contract = new Contract.Default();
            private ApplicationContext applicationContext;
            private FeignContext feignContext;
    
            public Builder() {
            }
    
            @Override
            public Feign.Builder invocationHandlerFactory(InvocationHandlerFactory invocationHandlerFactory) {
                throw new UnsupportedOperationException();
            }
    
            @Override
            public CloudSentinelFeign.Builder contract(Contract contract) {
                this.contract = contract;
                return this;
            }
    
            @Override
            public Feign build() {
                super.invocationHandlerFactory(new InvocationHandlerFactory() {
                    @Override
                    public InvocationHandler create(Target target, Map<Method, MethodHandler> dispatch) {
                        Object feignClientFactoryBean = CloudSentinelFeign.Builder.this.applicationContext.getBean("&" + target.type().getName());
                        Class fallback = (Class) CloudSentinelFeign.Builder.this.getFieldValue(feignClientFactoryBean, "fallback");
                        Class fallbackFactory = (Class) CloudSentinelFeign.Builder.this.getFieldValue(feignClientFactoryBean, "fallbackFactory");
                        String beanName = (String) CloudSentinelFeign.Builder.this.getFieldValue(feignClientFactoryBean, "contextId");
                        if (!StringUtils.hasText(beanName)) {
                            beanName = (String) CloudSentinelFeign.Builder.this.getFieldValue(feignClientFactoryBean, "name");
                        }
    
                        if (Void.TYPE != fallback) {
                            Object fallbackInstance = this.getFromContext(beanName, "fallback", fallback, target.type());
                            return new CloudSentinelInvocationHandler(target, dispatch, new FallbackFactory.Default(fallbackInstance));
                        } else if (Void.TYPE != fallbackFactory) {
                            FallbackFactory fallbackFactoryInstance = (FallbackFactory)this.getFromContext(beanName, "fallbackFactory", fallbackFactory, FallbackFactory.class);
                            return new CloudSentinelInvocationHandler(target, dispatch, fallbackFactoryInstance);
                        } else {
                            return new CloudSentinelInvocationHandler(target, dispatch);
                        }
                    }
    
                    private Object getFromContext(String name, String type, Class fallbackType, Class targetType) {
                        Object fallbackInstance = CloudSentinelFeign.Builder.this.feignContext.getInstance(name, fallbackType);
                        if (fallbackInstance == null) {
                            throw new IllegalStateException(String.format("No %s instance of type %s found for feign client %s", type, fallbackType, name));
                        } else if (!targetType.isAssignableFrom(fallbackType)) {
                            throw new IllegalStateException(String.format("Incompatible %s instance. Fallback/fallbackFactory of type %s is not assignable to %s for feign client %s", type, fallbackType, targetType, name));
                        } else {
                            return fallbackInstance;
                        }
                    }
                });
                super.contract(new SentinelContractHolder(this.contract));
                return super.build();
            }
    
            private Object getFieldValue(Object instance, String fieldName) {
                Field field = ReflectionUtils.findField(instance.getClass(), fieldName);
                field.setAccessible(true);
    
                try {
                    return field.get(instance);
                } catch (IllegalAccessException var5) {
                    return null;
                }
            }
    
            @Override
            public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
                this.applicationContext = applicationContext;
                this.feignContext = (FeignContext)this.applicationContext.getBean(FeignContext.class);
            }
        }
    }
    
    public class CloudSentinelInvocationHandler implements InvocationHandler {
    
        private final Target<?> target;
        private final Map<Method, InvocationHandlerFactory.MethodHandler> dispatch;
        private FallbackFactory fallbackFactory;
        private Map<Method, Method> fallbackMethodMap;
    
        CloudSentinelInvocationHandler(Target<?> target, Map<Method, InvocationHandlerFactory.MethodHandler> dispatch, FallbackFactory fallbackFactory) {
            this.target = (Target) Util.checkNotNull(target, "target", new Object[0]);
            this.dispatch = (Map)Util.checkNotNull(dispatch, "dispatch", new Object[0]);
            this.fallbackFactory = fallbackFactory;
            this.fallbackMethodMap = toFallbackMethod(dispatch);
        }
    
        CloudSentinelInvocationHandler(Target<?> target, Map<Method, InvocationHandlerFactory.MethodHandler> dispatch) {
            this.target = (Target)Util.checkNotNull(target, "target", new Object[0]);
            this.dispatch = (Map)Util.checkNotNull(dispatch, "dispatch", new Object[0]);
        }
    
        @Override
        public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
            if ("equals".equals(method.getName())) {
                try {
                    Object otherHandler = args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
                    return this.equals(otherHandler);
                } catch (IllegalArgumentException var21) {
                    return false;
                }
            } else if ("hashCode".equals(method.getName())) {
                return this.hashCode();
            } else if ("toString".equals(method.getName())) {
                return this.toString();
            } else {
                InvocationHandlerFactory.MethodHandler methodHandler = (InvocationHandlerFactory.MethodHandler)this.dispatch.get(method);
                Object result;
                if (!(this.target instanceof Target.HardCodedTarget)) {
                    result = methodHandler.invoke(args);
                } else {
                    Target.HardCodedTarget hardCodedTarget = (Target.HardCodedTarget)this.target;
                    MethodMetadata methodMetadata = (MethodMetadata) SentinelContractHolder.METADATA_MAP.get(hardCodedTarget.type().getName() + Feign.configKey(hardCodedTarget.type(), method));
                    if (methodMetadata == null) {
                        result = methodHandler.invoke(args);
                    } else {
                        String resourceName = methodMetadata.template().method().toUpperCase() + ":" + hardCodedTarget.url() + methodMetadata.template().path();
                        Entry entry = null;
    
                        Object var12;
                        try {
                            Throwable ex;
                            try {
                                ContextUtil.enter(resourceName);
                                entry = SphU.entry(resourceName, EntryType.OUT, 1, args);
                                result = methodHandler.invoke(args);
                                return result;
                            } catch (Throwable var22) {
                                ex = var22;
                                if (!BlockException.isBlockException(var22)) {
                                    Tracer.trace(var22);
                                }
                            }
    
                            if (this.fallbackFactory == null) {
                                // Added: default handling when no fallback or fallbackFactory is configured
                                BlockExceptionUtil blockExceptionUtil = (BlockExceptionUtil) AppContextUtil.getBean("blockExceptionUtil");
                                return blockExceptionUtil.getResponseDto(ex, method.getReturnType());
                            }
    
                            try {
                                Object fallbackResult = ((Method)this.fallbackMethodMap.get(method)).invoke(this.fallbackFactory.create(ex), args);
                                var12 = fallbackResult;
                            } catch (IllegalAccessException var19) {
                                throw new AssertionError(var19);
                            } catch (InvocationTargetException var20) {
                                throw new AssertionError(var20.getCause());
                            }
                        } finally {
                            if (entry != null) {
                                entry.exit(1, args);
                            }
    
                            ContextUtil.exit();
                        }
    
                        return var12;
                    }
                }
    
                return result;
            }
        }
    
        @Override
        public boolean equals(Object obj) {
            if (obj instanceof CloudSentinelInvocationHandler) {
                CloudSentinelInvocationHandler other = (CloudSentinelInvocationHandler)obj;
                return this.target.equals(other.target);
            } else {
                return false;
            }
        }
    
        @Override
        public int hashCode() {
            return this.target.hashCode();
        }
    
        @Override
        public String toString() {
            return this.target.toString();
        }
    
        static Map<Method, Method> toFallbackMethod(Map<Method, InvocationHandlerFactory.MethodHandler> dispatch) {
            Map<Method, Method> result = new LinkedHashMap();
            Iterator var2 = dispatch.keySet().iterator();
    
            while(var2.hasNext()) {
                Method method = (Method)var2.next();
                method.setAccessible(true);
                result.put(method, method);
            }
    
            return result;
        }
    }
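The resource name built inside invoke is what the resource field of a rule has to match. The small sketch below repeats that concatenation on its own so the format is easy to see; the class and method names are hypothetical, and only the METHOD + ":" + url + path pattern is taken from the handler above.

```java
import java.util.Locale;

class ResourceNameSketch {
    /** Mirrors the concatenation used in the handler: upper-case HTTP method, colon, target URL, path. */
    static String resourceName(String httpMethod, String targetUrl, String path) {
        return httpMethod.toUpperCase(Locale.ROOT) + ":" + targetUrl + path;
    }

    public static void main(String[] args) {
        // Same inputs as the feign client used later in this article
        System.out.println(resourceName("post", "http://demo-service", "/test"));
    }
}
```

For the demo client this yields POST:http://demo-service/test, the same string used as resource in sentinel.flowRules and sentinel.degradeRules.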
    

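5. Usage

Once flow control and degradation are in place, callers must keep in mind that the return value of a called method may be the result produced by flow control or degradation, and must check for that before processing the result further.

Server side

How to use

Sentinel records every URL that has been requested as a resource, and flow-control and degrade rules can be applied to it in the dashboard. Note that rules added through the dashboard are not persisted, because sentinel-dashboard has no communication channel with Apollo. If server-side degradation does not take effect, check whether exceptions are handled in the interface layer: Sentinel counts exceptions in the interceptor, so exceptions that are handled before they reach it are not counted.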
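The effect described above can be reproduced without Sentinel. In the following sketch, which uses hypothetical names throughout, intercept plays the role of the interceptor: it can only count exceptions that actually escape the handler it wraps.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

class ExceptionCountingSketch {
    final AtomicInteger exceptionCount = new AtomicInteger();

    /** Plays the role of the interceptor: it only sees exceptions that escape the handler. */
    String intercept(Callable<String> handler) {
        try {
            return handler.call();
        } catch (Exception e) {
            exceptionCount.incrementAndGet(); // this is what an exception-count rule would rely on
            return "error";
        }
    }
}
```

A handler that catches its own exception and returns a normal value leaves exceptionCount unchanged, so a rule based on the exception count would never trigger for it.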

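Custom flow-control/degrade logic

Put @SentinelResource on the entry method of the interface; see the custom flow-control/degrade logic for restTemplate below.

Client side

Using restTemplate

How to use

Add @SentinelResource to the method that makes the restTemplate call. Flow control and degradation then follow the default logic.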

@Component
public class DemoService {

    @Resource
    private ServiceRestTemplate restTemplate;

    @SentinelResource
    public DtoResponse<DemoInfoDto> getDemoInfo(CurrentBoardInfoBo bo)  {
        ResponseEntity<DtoResponse<DemoInfoDto>> entity = restTemplate.servicePost("http://demo-service/test", bo, new ParameterizedTypeReference<DtoResponse<DemoInfoDto>>() {
        });
        if(entity!= null && entity.getBody()!= null){
            return entity.getBody();
        }
        return null;
    }
}

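Custom flow-control/degrade logic

  • Option 1

Define the blockHandler method in the same class as the original method. Its parameters are the original method's parameters plus an exception parameter: the leading parameters must match the original method exactly, and the exception parameter must be of type BlockException. The same applies to fallback, except that its exception parameter is a Throwable.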

 @Component
public class DemoService {

    @Resource
    private ServiceRestTemplate restTemplate;

    @SentinelResource(value = "sentinel01", blockHandler = "blockHandler" ,fallback = "fackback" )
    public DtoResponse<DemoInfoDto> getDemoInfo(CurrentBoardInfoBo bo)  {
        DtoResponse<DemoInfoDto> dtoDtoResponse = restTemplate.servicePostDtoResponse("http://demo-service/test", bo, DemoInfoDto.class);

        return dtoDtoResponse;
    }
    public DtoResponse<DemoInfoDto> blockHandler(CurrentBoardInfoBo bo, BlockException exception){
        DtoResponse<DemoInfoDto> dtoDtoResponse = new DtoResponse<>();
        // Flow-control logic, e.g. return a fixed value or read a cache such as redis to cope with heavy traffic
        DemoInfoDto demoInfoDto = new DemoInfoDto();
        demoInfoDto.setVersion("v3");
        dtoDtoResponse.success(demoInfoDto);
        return dtoDtoResponse;
    }
    public DtoResponse<DemoInfoDto> fackback(CurrentBoardInfoBo bo, Throwable throwable){
        DtoResponse<DemoInfoDto> dtoDtoResponse = new DtoResponse<>();
        // Degrade logic, e.g. return a fixed value or read a cache such as redis when the downstream service is unavailable
        DemoInfoDto demoInfoDto = new DemoInfoDto();
        demoInfoDto.setVersion("v3");
        dtoDtoResponse.success(demoInfoDto);
        return dtoDtoResponse;
    }
}
  • Option 2

Put the flow-control and degrade logic in dedicated handler classes. Note that the handler methods must then be static.

@Component
public class DemoService {

    @Resource
    private ServiceRestTemplate restTemplate;

    @SentinelResource(value = "sentinel01", blockHandler = "blockHandler" , blockHandlerClass = DemoblockHandler.class,fallback = "fackback",fallbackClass = DemofallbackHandler.class)
    public DtoResponse<DemoInfoDto> getDemoInfo(CurrentBoardInfoBo bo)  {
        DtoResponse<DemoInfoDto> dtoDtoResponse = restTemplate.servicePostDtoResponse("http://demo-service/test", bo, DemoInfoDto.class);

        return dtoDtoResponse;
    }
   
}
public class DemoblockHandler {

    public static DtoResponse<DemoInfoDto> blockHandler(CurrentBoardInfoBo bo, BlockException exception){
        DtoResponse<DemoInfoDto> dtoDtoResponse = new DtoResponse<>();
        // Flow-control logic, e.g. return a fixed value or read a cache such as redis to cope with heavy traffic
        DemoInfoDto demoInfoDto = new DemoInfoDto();
        demoInfoDto.setVersion("v3");
        dtoDtoResponse.success(demoInfoDto);
        return dtoDtoResponse;
    }
}
public class DemofallbackHandler {

    // The parameter type must match the original method (CurrentBoardInfoBo)
    public static DtoResponse<DemoInfoDto> fackback(CurrentBoardInfoBo bo, Throwable throwable){
        DtoResponse<DemoInfoDto> dtoDtoResponse = new DtoResponse<>();
        // Degrade logic, e.g. return a fixed value or read a cache such as redis when the downstream service is unavailable
        DemoInfoDto demoInfoDto = new DemoInfoDto();
        demoInfoDto.setVersion("v3");
        dtoDtoResponse.success(demoInfoDto);
        return dtoDtoResponse;
    }
}
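The signature rules for both options (same parameters as the original method plus a trailing exception parameter, and static methods when a separate handler class is used) can be pictured as a reflective lookup. The sketch below is a simplified illustration under those assumptions, not Sentinel's actual resolution code, and every class and method name in it is hypothetical.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;

class DemoOps {
    String load(String id) { return "real:" + id; }
}

class DemoHandlers {
    static String onBlock(String id, IllegalStateException e) { return "blocked:" + id; }
    String notStatic(String id, IllegalStateException e) { return "unused"; }
}

class HandlerLookupSketch {
    /** Small helper that turns the checked reflection exception into an unchecked one. */
    static Method method(Class<?> owner, String name, Class<?>... params) {
        try {
            return owner.getDeclaredMethod(name, params);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /**
     * Finds a handler whose parameters are the original method's parameters followed by
     * exceptionType and whose return type fits the original. Returns null if none matches.
     */
    static Method findHandler(Class<?> handlerClass, String handlerName, Method original,
                              Class<?> exceptionType, boolean mustBeStatic) {
        Class<?>[] originalParams = original.getParameterTypes();
        Class<?>[] expected = Arrays.copyOf(originalParams, originalParams.length + 1);
        expected[expected.length - 1] = exceptionType;
        for (Method candidate : handlerClass.getDeclaredMethods()) {
            boolean staticOk = !mustBeStatic || Modifier.isStatic(candidate.getModifiers());
            if (candidate.getName().equals(handlerName)
                    && staticOk
                    && Arrays.equals(candidate.getParameterTypes(), expected)
                    && original.getReturnType().isAssignableFrom(candidate.getReturnType())) {
                return candidate;
            }
        }
        return null;
    }
}
```

In this sketch a mismatch in the parameter list, or a non-static method in a separate handler class, simply yields no match, which is the kind of mistake that leaves a custom handler unused.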

Using feign

How to use

Integrate feign the same way as before. Flow control and degradation then follow the default logic.

@FeignClient(value = "demo-service")
public interface DemoApi {

    @PostMapping(value = "/test", produces = "application/json")
    DtoResponse<DemoInfoDto> getDemoInfo(@RequestBody DemoInfoBo bo);

}

Default response format

Same as restTemplate.

Custom flow-control/degrade rules

  • Option 1

    The DemoApiFallBack class must exist in the Spring container.

@FeignClient(value = "demo-service" ,fallback = DemoApiFallBack.class)
public interface DemoApi {

    @PostMapping(value = "/test", produces = "application/json")
    DtoResponse<DemoInfoDto> getDemoInfo(@RequestBody DemoInfoBo bo);
    
}
@Component
public class DemoApiFallBack implements DemoApi {
    @Override
    public DtoResponse<DemoInfoDto> getDemoInfo(DemoInfoBo bo) {
        DtoResponse dtoResponse = new DtoResponse();
        DemoInfoDto DemoInfoDto = new DemoInfoDto();
        DemoInfoDto.setVersion("v3");
        return dtoResponse.success(DemoInfoDto);
    }  
}
  • Option 2
@FeignClient(value = "demo-service" ,fallbackFactory = DemofallbackFactory.class)
public interface DemoApi {

    @PostMapping(value = "/test", produces = "application/json")
    DtoResponse<DemoInfoDto> getDemoInfo(@RequestBody DemoInfoBo bo);

}
@Component
public class DemofallbackFactory implements FallbackFactory<DemoApiFallBack> {

    @Override
    public DemoApiFallBack create(Throwable cause) {
        return new DemoApiFallBack();
    }
}
public class DemoApiFallBack implements DemoApi {
    @Override
    public DtoResponse<DemoInfoDto> getDemoInfo(DemoInfoBo bo) {
        DtoResponse dtoResponse = new DtoResponse();
        DemoInfoDto DemoInfoDto = new DemoInfoDto();
        DemoInfoDto.setVersion("v3");
        return dtoResponse.success(DemoInfoDto);
    }

}

