Spring Cloud Feign組件


采用Spring Cloud微服務框架后,經常會涉及到服務間調用,服務間調用采用了Feign組件。

由於之前有使用dubbo經驗。dubbo的負載均衡策略(輪訓、最小連接數、隨機輪訓、加權輪訓),dubbo失敗策略(快速失敗、失敗重試等等),

所以Feign負載均衡策略的是什么? 失敗后是否會重試,重試策略又是什么?帶這個疑問,查了一些資料,最后還是看了下代碼。畢竟代碼就是一切

 

Spring boot集成Feign的大概流程:

1、利用FeignAutoConfiguration自動配置。並根據EnableFeignClients 自動注冊產生Feign的代理類。

2、注冊方式利用FeignClientFactoryBean,熟悉Spring知道FactoryBean 產生bean的工廠,有個重要方法getObject產生FeignClient容器bean

3、同時代理類中使用hystrix做資源隔離,Feign代理類中 構造 RequestTemplate ,RequestTemlate要做的向負載均衡選中的server發送http請求,並進行編碼和解碼一系列操作。

下面只是粗略的看了下整體流程,先有整體再有細節吧,下面利用IDEA看下細節:

一、Feign失敗重試

SynchronousMethodHandler的方法中的處理邏輯
 @Override
  public Object invoke(Object[] argv) throws Throwable {
    RequestTemplate template = buildTemplateFromArgs.create(argv);
    Retryer retryer = this.retryer.clone();
    while (true) {
      try {
        return executeAndDecode(template);
      } catch (RetryableException e) {
        retryer.continueOrPropagate(e);
        if (logLevel != Logger.Level.NONE) {
          logger.logRetry(metadata.configKey(), logLevel);
        }
        continue;
      }
    }
  }
  •  上面的邏輯很簡單。構造 template 並去進行服務間的http調用,然后對返回結果進行解碼
  •      當拋出 RetryableException 后,異常邏輯是否重試? 重試多少次? 帶這個問題,看了retryer.continueOrPropagate(e);
具體邏輯如下:
public void continueOrPropagate(RetryableException e) {
      if (attempt++ >= maxAttempts) {
        throw e;
      }

      long interval;
      if (e.retryAfter() != null) {
        interval = e.retryAfter().getTime() - currentTimeMillis();
        if (interval > maxPeriod) {
          interval = maxPeriod;
        }
        if (interval < 0) {
          return;
        }
      } else {
        interval = nextMaxInterval();
      }
      try {
        Thread.sleep(interval);
      } catch (InterruptedException ignored) {
        Thread.currentThread().interrupt();
      }
      sleptForMillis += interval;
    }
  •   當重試次數大於默認次數5時候,直接拋出異常,不在重試
  •        否則每隔一段時間 默認值最大1ms 后重試一次。

     這就Feign這塊的重試這塊的粗略邏輯,由於之前工作中一直使用dubbo。同樣是否需要將生產環境中重試操作關閉?

     思考:之前dubbo生產環境的重試操作都會關閉。原因有幾個:

         一、一般第一次失敗,重試也會失敗,極端情況下不斷的重試,會占用大量dubbo連接池,造成連接池被打滿,影響核心功能

        二、也是比較重要的一點原因,重試帶來的業務邏輯的影響,即如果接口不是冪等的,重試會帶來業務邏輯的錯誤,引發問題

     

二、Feign負載均衡策略

那么負載均衡的策略又是什么呢?由上圖中可知 executeAndDecode(template

 1 Object executeAndDecode(RequestTemplate template) throws Throwable {
 2     Request request = targetRequest(template);
 3 
 4     if (logLevel != Logger.Level.NONE) {
 5       logger.logRequest(metadata.configKey(), logLevel, request);
 6     }
 7 
 8     Response response;
 9     long start = System.nanoTime();
10     try {
11       response = client.execute(request, options);
12       // ensure the request is set. TODO: remove in Feign 10
13       response.toBuilder().request(request).build();
14     } catch (IOException e) {
15       if (logLevel != Logger.Level.NONE) {
16         logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
17       }
18       throw errorExecuting(request, e);
19     }
20     long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
21 
22     boolean shouldClose = true;
23     try {
24       if (logLevel != Logger.Level.NONE) {
25         response =
26             logger.logAndRebufferResponse(metadata.configKey(), logLevel, response, elapsedTime);
27         // ensure the request is set. TODO: remove in Feign 10
28         response.toBuilder().request(request).build();
29       }
30       if (Response.class == metadata.returnType()) {
31         if (response.body() == null) {
32           return response;
33         }
34         if (response.body().length() == null ||
35                 response.body().length() > MAX_RESPONSE_BUFFER_SIZE) {
36           shouldClose = false;
37           return response;
38         }
39         // Ensure the response body is disconnected
40         byte[] bodyData = Util.toByteArray(response.body().asInputStream());
41         return response.toBuilder().body(bodyData).build();
42       }
43       if (response.status() >= 200 && response.status() < 300) {
44         if (void.class == metadata.returnType()) {
45           return null;
46         } else {
47           return decode(response);
48         }
49       } else if (decode404 && response.status() == 404 && void.class != metadata.returnType()) {
50         return decode(response);
51       } else {
52         throw errorDecoder.decode(metadata.configKey(), response);
53       }
54     } catch (IOException e) {
55       if (logLevel != Logger.Level.NONE) {
56         logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime);
57       }
58       throw errorReading(request, response, e);
59     } finally {
60       if (shouldClose) {
61         ensureClosed(response.body());
62       }
63     }
64   }

 

概括的說主要做了兩件事:發送HTTP請求,解碼響應數據

想看的負載均衡應該在11行  response = client.execute(request, options); 而client的實現方式有兩種 Default、LoadBalancerFeignClient

猜的話應該是LoadBalancerFeignClient,帶這個問題去看源碼(其實個人更喜歡帶着問題看源碼,沒有目的一是看很難將復雜的源碼關聯起來,二是很容易迷失其中

果然通過一番查找發現 Client 實例就是LoadBalancerFeignClient,而設置這個Client就是通過上面說的FeignClientFactoryBean的getObject方法中設置的,具體不說了

下面重點看LoadBalancerFeignClient execute(request, options)

 

 1 @Override
 2     public Response execute(Request request, Request.Options options) throws IOException {
 3         try {
 4             URI asUri = URI.create(request.url());
 5             String clientName = asUri.getHost();
 6             URI uriWithoutHost = cleanUrl(request.url(), clientName);
 7             FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
 8                     this.delegate, request, uriWithoutHost);
 9 
10             IClientConfig requestConfig = getClientConfig(options, clientName);
11             return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
12                     requestConfig).toResponse();
13         }
14         catch (ClientException e) {
15             IOException io = findIOException(e);
16             if (io != null) {
17                 throw io;
18             }
19             throw new RuntimeException(e);
20         }
21     }

通過幾行代碼比較重要的點RibbonRequest ,原來Feign負載均衡還是通過Ribbon實現的,那么Ribbo又是如何實現負載均衡的呢? 

  1 public Observable<T> submit(final ServerOperation<T> operation) {
  2         final ExecutionInfoContext context = new ExecutionInfoContext();
  3         
  4         if (listenerInvoker != null) {
  5             try {
  6                 listenerInvoker.onExecutionStart();
  7             } catch (AbortExecutionException e) {
  8                 return Observable.error(e);
  9             }
 10         }
 11 
 12         final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
 13         final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
 14 
 15         // Use the load balancer
 16         Observable<T> o = 
 17                 (server == null ? selectServer() : Observable.just(server))
 18                 .concatMap(new Func1<Server, Observable<T>>() {
 19                     @Override
 20                     // Called for each server being selected
 21                     public Observable<T> call(Server server) {
 22                         context.setServer(server);
 23                         final ServerStats stats = loadBalancerContext.getServerStats(server);
 24                         
 25                         // Called for each attempt and retry
 26                         Observable<T> o = Observable
 27                                 .just(server)
 28                                 .concatMap(new Func1<Server, Observable<T>>() {
 29                                     @Override
 30                                     public Observable<T> call(final Server server) {
 31                                         context.incAttemptCount();
 32                                         loadBalancerContext.noteOpenConnection(stats);
 33                                         
 34                                         if (listenerInvoker != null) {
 35                                             try {
 36                                                 listenerInvoker.onStartWithServer(context.toExecutionInfo());
 37                                             } catch (AbortExecutionException e) {
 38                                                 return Observable.error(e);
 39                                             }
 40                                         }
 41                                         
 42                                         final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
 43                                         
 44                                         return operation.call(server).doOnEach(new Observer<T>() {
 45                                             private T entity;
 46                                             @Override
 47                                             public void onCompleted() {
 48                                                 recordStats(tracer, stats, entity, null);
 49                                                 // TODO: What to do if onNext or onError are never called?
 50                                             }
 51 
 52                                             @Override
 53                                             public void onError(Throwable e) {
 54                                                 recordStats(tracer, stats, null, e);
 55                                                 logger.debug("Got error {} when executed on server {}", e, server);
 56                                                 if (listenerInvoker != null) {
 57                                                     listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
 58                                                 }
 59                                             }
 60 
 61                                             @Override
 62                                             public void onNext(T entity) {
 63                                                 this.entity = entity;
 64                                                 if (listenerInvoker != null) {
 65                                                     listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
 66                                                 }
 67                                             }                            
 68                                             
 69                                             private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
 70                                                 tracer.stop();
 71                                                 loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
 72                                             }
 73                                         });
 74                                     }
 75                                 });
 76                         
 77                         if (maxRetrysSame > 0) 
 78                             o = o.retry(retryPolicy(maxRetrysSame, true));
 79                         return o;
 80                     }
 81                 });
 82             
 83         if (maxRetrysNext > 0 && server == null) 
 84             o = o.retry(retryPolicy(maxRetrysNext, false));
 85         
 86         return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
 87             @Override
 88             public Observable<T> call(Throwable e) {
 89                 if (context.getAttemptCount() > 0) {
 90                     if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
 91                         e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
 92                                 "Number of retries on next server exceeded max " + maxRetrysNext
 93                                 + " retries, while making a call for: " + context.getServer(), e);
 94                     }
 95                     else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
 96                         e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
 97                                 "Number of retries exceeded max " + maxRetrysSame
 98                                 + " retries, while making a call for: " + context.getServer(), e);
 99                     }
100                 }
101                 if (listenerInvoker != null) {
102                     listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
103                 }
104                 return Observable.error(e);
105             }
106         });
107     }

 

通過上面代碼分析,發現Ribbon和Hystrix一樣都是利用了rxjava看來有必要掌握下rxjava了又。這里面 比較重要的就是17行,

selectServer() 方法選擇指定的Server,負載均衡的策略主要是有ILoadBalancer接口不同實現方式:

BaseLoadBalancer采用的規則為RoundRobinRule 輪訓規則
DynamicServerListLoadBalancer繼承了BaseLoadBalancer,主要運行時改變Server列表
NoOpLoadBalancer 什么操作都不做
ZoneAwareLoadBalancer 功能主要是根據區域Zone分組的實例列表

 

     


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM