After adopting the Spring Cloud microservice framework, we frequently make calls between services, and those calls go through the Feign component.
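I had used Dubbo before, so I knew its load-balancing strategies (weighted random, weighted round robin, least active, consistent hashing) and its failure strategies (fail-fast, failover with retry, and so on).
So what load-balancing strategy does Feign use? Does it retry after a failure, and if so, with what policy? With these questions in mind I read some material and, in the end, the code itself. After all, the code is the ground truth.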
The rough flow of how Spring Boot integrates Feign:
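1. FeignAutoConfiguration provides the auto-configuration, and @EnableFeignClients makes Spring scan the @FeignClient interfaces and register a Feign proxy for each of them.
2. Registration goes through FeignClientFactoryBean. Anyone familiar with Spring knows that a FactoryBean is a factory that produces beans; its key method, getObject, produces the Feign client bean that is put into the container.
3. The proxy uses Hystrix for resource isolation. Inside the proxy a RequestTemplate is built; from it the HTTP request is encoded, sent to the server chosen by the load balancer, and the response is decoded.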
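The proxy idea in step 3 can be made concrete with the JDK's java.lang.reflect.Proxy. Feign's ReflectiveFeign also builds its proxies on it. This is a minimal sketch, not Feign's code: UserClient, ClientProxyFactory and the transport function are hypothetical. There is no annotation parsing, encoding, Hystrix or load balancing. It shows only the step of turning an interface call into a request and handing it to something that sends it.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.function.Function;

// Hypothetical interface standing in for an @FeignClient interface.
interface UserClient {
    String getUser(long id);
}

class ClientProxyFactory {
    // Creates a proxy that turns every interface call into a request string
    // and passes it to the transport (a stand-in for the HTTP client).
    static <T> T create(Class<T> type, String serviceName, Function<String, String> transport) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Object methods (toString, hashCode, equals) are not remote calls.
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "hashCode": return System.identityHashCode(proxy);
                    case "equals": return proxy == args[0];
                    default: return "ClientProxy(" + serviceName + ")";
                }
            }
            // Build the "request" from the method name and arguments.
            String request = "GET http://" + serviceName + "/" + method.getName()
                    + "?args=" + Arrays.toString(args);
            return transport.apply(request);
        };
        return type.cast(Proxy.newProxyInstance(
                type.getClassLoader(), new Class<?>[] {type}, handler));
    }
}
```

Callers only see the interface. Everything about how the request is built and sent lives in the handler, which is where Feign plugs in its RequestTemplate, encoder, decoder and Client.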
That was only a rough overview of the overall flow: big picture first, details afterwards. Now let's step through the details in IDEA:
I. Feign retry on failure
The handling logic in the invoke method of SynchronousMethodHandler:
```java
@Override
public Object invoke(Object[] argv) throws Throwable {
  RequestTemplate template = buildTemplateFromArgs.create(argv);
  // each invocation gets its own copy of the retry state
  Retryer retryer = this.retryer.clone();
  while (true) {
    try {
      return executeAndDecode(template);
    } catch (RetryableException e) {
      // rethrows e when no more retries are allowed, otherwise waits
      retryer.continueOrPropagate(e);
      if (logLevel != Logger.Level.NONE) {
        logger.logRetry(metadata.configKey(), logLevel);
      }
      continue;
    }
  }
}
```
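- The logic above is simple: build the template, make the HTTP call to the other service, then decode the result.
- When a RetryableException is thrown, does the exception path retry, and how many times? With that question in mind, I looked at retryer.continueOrPropagate(e).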
Its logic is as follows:
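```java
public void continueOrPropagate(RetryableException e) {
  // give up and rethrow once the attempt budget is used up
  if (attempt++ >= maxAttempts) {
    throw e;
  }

  long interval;
  if (e.retryAfter() != null) {
    // honor the Retry-After time carried by the exception, capped at maxPeriod
    interval = e.retryAfter().getTime() - currentTimeMillis();
    if (interval > maxPeriod) {
      interval = maxPeriod;
    }
    if (interval < 0) {
      return; // the retry time has already passed: retry immediately
    }
  } else {
    // backoff that grows with each attempt, capped at maxPeriod
    interval = nextMaxInterval();
  }
  try {
    Thread.sleep(interval);
  } catch (InterruptedException ignored) {
    Thread.currentThread().interrupt();
  }
  sleptForMillis += interval;
}
```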
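- Once the attempt counter reaches maxAttempts (5 in Retryer.Default, counting the original call, so at most 4 retries), the exception is rethrown and no further retry happens.
- Otherwise the thread sleeps and then retries. If the exception carries a Retry-After time, the wait is that time, capped at maxPeriod. If it does not, the wait starts from period (100 ms by default), grows by a factor of 1.5 with each attempt, and is capped at maxPeriod (1 s by default).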
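The interplay between the invoke loop and continueOrPropagate can be simulated without a network. This is a sketch modeled on the behavior described above, not Feign's class. The attempt counter starts at 1, and the backoff starts from period, grows 1.5x per retry, and is capped at maxPeriod. RetryableFailure and BackoffRetryer are hypothetical names. Retry-After handling is left out, and the planned sleeps are recorded instead of actually slept.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for feign.RetryableException.
class RetryableFailure extends RuntimeException {
    RetryableFailure(String message) { super(message); }
}

class BackoffRetryer {
    private final long maxPeriod;
    private final int maxAttempts;
    private int attempt = 1;          // the original call counts as attempt 1
    private double nextInterval;      // multiplied by 1.5 after every failure
    final List<Long> plannedSleeps = new ArrayList<>();

    BackoffRetryer(long period, long maxPeriod, int maxAttempts) {
        this.maxPeriod = maxPeriod;
        this.maxAttempts = maxAttempts;
        this.nextInterval = period;
    }

    // Rethrows e when attempts are exhausted, otherwise records the backoff.
    void continueOrPropagate(RetryableFailure e) {
        if (attempt++ >= maxAttempts) {
            throw e;
        }
        nextInterval *= 1.5;
        long interval = Math.min((long) nextInterval, maxPeriod);
        plannedSleeps.add(interval);  // a real retryer would Thread.sleep(interval)
    }

    // Same loop shape as SynchronousMethodHandler.invoke.
    static <T> T invoke(Supplier<T> call, BackoffRetryer retryer) {
        while (true) {
            try {
                return call.get();
            } catch (RetryableFailure e) {
                retryer.continueOrPropagate(e);
            }
        }
    }
}
```

With the values (100, 1000, 5), a call that always fails runs 5 times in this sketch. It waits 150, 225, 337 and 506 ms between attempts, about 1.2 s in total, before the exception reaches the caller.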
That is the rough retry logic in Feign. Since I had always used Dubbo at work, the same question comes up: should retries be turned off in production?
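Thoughts: in our Dubbo production environments we always turned retries off, for several reasons:
First, if the first call fails, the retry usually fails as well. In extreme cases, continuous retrying ties up a large part of the Dubbo connection pool until it is exhausted, which affects core functionality.
Second, and more importantly, retries affect business logic: if an interface is not idempotent, a retry can produce wrong business results and cause real problems.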
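The second reason can be made concrete. A call may time out after the server has already applied the change, and a retry then applies it again. A common mitigation, which neither Feign nor Dubbo does for you, is an idempotency key: the caller sends the same key on every retry and the server returns the first result for a key it has already seen. PaymentService and its methods are hypothetical and exist only to illustrate this. A real service would store the keys durably rather than in memory.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical service used only to contrast a non-idempotent call with a guarded one.
class PaymentService {
    private final Map<String, Long> processed = new HashMap<>();
    private long balance;

    PaymentService(long balance) { this.balance = balance; }

    // Non-idempotent: a retried call deducts the amount a second time.
    synchronized long debit(long amount) {
        balance -= amount;
        return balance;
    }

    // Idempotent: a repeated requestKey returns the stored result and changes nothing.
    synchronized long debitOnce(String requestKey, long amount) {
        Long previous = processed.get(requestKey);
        if (previous != null) {
            return previous;
        }
        balance -= amount;
        processed.put(requestKey, balance);
        return balance;
    }

    synchronized long balance() { return balance; }
}
```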
II. Feign load-balancing strategy
So what is the load-balancing strategy? As the invoke method above shows, the actual call happens in executeAndDecode(template):
```java
Object executeAndDecode(RequestTemplate template) throws Throwable {
  Request request = targetRequest(template);

  if (logLevel != Logger.Level.NONE) {
    logger.logRequest(metadata.configKey(), logLevel, request);
  }

  Response response;
  long start = System.nanoTime();
  try {
    response = client.execute(request, options);
    // ensure the request is set. TODO: remove in Feign 10
    response.toBuilder().request(request).build();
  } catch (IOException e) {
    if (logLevel != Logger.Level.NONE) {
      logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
    }
    throw errorExecuting(request, e);
  }
  long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

  boolean shouldClose = true;
  try {
    if (logLevel != Logger.Level.NONE) {
      response =
          logger.logAndRebufferResponse(metadata.configKey(), logLevel, response, elapsedTime);
      // ensure the request is set. TODO: remove in Feign 10
      response.toBuilder().request(request).build();
    }
    if (Response.class == metadata.returnType()) {
      if (response.body() == null) {
        return response;
      }
      if (response.body().length() == null ||
          response.body().length() > MAX_RESPONSE_BUFFER_SIZE) {
        shouldClose = false;
        return response;
      }
      // Ensure the response body is disconnected
      byte[] bodyData = Util.toByteArray(response.body().asInputStream());
      return response.toBuilder().body(bodyData).build();
    }
    if (response.status() >= 200 && response.status() < 300) {
      if (void.class == metadata.returnType()) {
        return null;
      } else {
        return decode(response);
      }
    } else if (decode404 && response.status() == 404 && void.class != metadata.returnType()) {
      return decode(response);
    } else {
      throw errorDecoder.decode(metadata.configKey(), response);
    }
  } catch (IOException e) {
    if (logLevel != Logger.Level.NONE) {
      logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime);
    }
    throw errorReading(request, response, e);
  } finally {
    if (shouldClose) {
      ensureClosed(response.body());
    }
  }
}
```
In short, it does two things: it sends the HTTP request and it decodes the response data.
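The branch order at the end of executeAndDecode can be distilled into a small pure function, which makes it easy to see which status codes get decoded and which go to the ErrorDecoder. This is a sketch with hypothetical names (Outcome, ResponseDispatch). It covers only methods whose return type is not Response and leaves out the IOException paths. On those paths, in Feign 9.x, errorExecuting wraps a transport IOException in a RetryableException, which is how connection failures reach the retry loop in invoke.

```java
// Possible outcomes of the status-code branch in executeAndDecode.
enum Outcome { RETURN_NULL, DECODE, ERROR_DECODER }

class ResponseDispatch {
    // Mirrors the if/else chain for methods whose return type is not Response.
    static Outcome classify(int status, boolean voidReturn, boolean decode404) {
        if (status >= 200 && status < 300) {
            return voidReturn ? Outcome.RETURN_NULL : Outcome.DECODE;
        }
        if (decode404 && status == 404 && !voidReturn) {
            return Outcome.DECODE;
        }
        // errorDecoder.decode(...) builds the exception that is thrown
        return Outcome.ERROR_DECODER;
    }
}
```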
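The load balancing we are looking for should be inside the call response = client.execute(request, options);, and two Client implementations are relevant here: Client.Default and LoadBalancerFeignClient.
My guess was LoadBalancerFeignClient, so I went to the source with that question in mind. (Personally I prefer reading source code with a question: without a goal, it is hard to connect the pieces of complex code, and it is easy to get lost in it.)
Sure enough, after some digging I found that the Client instance is indeed a LoadBalancerFeignClient. It is set in the getObject method of the FeignClientFactoryBean mentioned above; I won't go into the details here.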
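LoadBalancerFeignClient is a decorator: it wraps another client (the delegate passed into RibbonRequest in the execute method below) and adds server selection in front of it. The execute method treats the URL host as the service name. The sketch below shows that shape with hypothetical types (SimpleClient, DirectClient, LoadBalancingClient); the real feign.Client interface has a richer signature. Server selection is passed in as a function so that any strategy can be plugged in.

```java
import java.net.URI;
import java.util.function.Function;

// Hypothetical minimal client abstraction.
interface SimpleClient {
    String execute(String url);
}

// Plays the role of Client.Default: sends to whatever URL it is given.
class DirectClient implements SimpleClient {
    public String execute(String url) {
        return "sent to " + url;  // a real client would open an HTTP connection here
    }
}

// Decorator: the URL host is a service name; pick a server, rewrite the URL, delegate.
class LoadBalancingClient implements SimpleClient {
    private final SimpleClient delegate;
    private final Function<String, String> chooseServer;  // service name -> "host:port"

    LoadBalancingClient(SimpleClient delegate, Function<String, String> chooseServer) {
        this.delegate = delegate;
        this.chooseServer = chooseServer;
    }

    public String execute(String url) {
        URI uri = URI.create(url);
        String serviceName = uri.getHost();
        String server = chooseServer.apply(serviceName);
        String query = uri.getRawQuery();
        String rewritten = uri.getScheme() + "://" + server + uri.getRawPath()
                + (query == null ? "" : "?" + query);
        return delegate.execute(rewritten);
    }
}
```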
Now let's focus on LoadBalancerFeignClient.execute(request, options):
```java
@Override
public Response execute(Request request, Request.Options options) throws IOException {
  try {
    URI asUri = URI.create(request.url());
    // the URL host is the service (client) name
    String clientName = asUri.getHost();
    URI uriWithoutHost = cleanUrl(request.url(), clientName);
    FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
        this.delegate, request, uriWithoutHost);

    IClientConfig requestConfig = getClientConfig(options, clientName);
    // Ribbon picks a server for clientName and executes the request on it
    return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
        requestConfig).toResponse();
  }
  catch (ClientException e) {
    IOException io = findIOException(e);
    if (io != null) {
      throw io;
    }
    throw new RuntimeException(e);
  }
}
```
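The important point in these few lines is RibbonRequest: Feign's load balancing is in fact implemented by Ribbon. So how does Ribbon do it? executeWithLoadBalancer hands the request to Ribbon's LoadBalancerCommand, whose submit method looks like this: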
```java
public Observable<T> submit(final ServerOperation<T> operation) {
  final ExecutionInfoContext context = new ExecutionInfoContext();

  if (listenerInvoker != null) {
    try {
      listenerInvoker.onExecutionStart();
    } catch (AbortExecutionException e) {
      return Observable.error(e);
    }
  }

  final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
  final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

  // Use the load balancer
  Observable<T> o =
      (server == null ? selectServer() : Observable.just(server))
      .concatMap(new Func1<Server, Observable<T>>() {
        @Override
        // Called for each server being selected
        public Observable<T> call(Server server) {
          context.setServer(server);
          final ServerStats stats = loadBalancerContext.getServerStats(server);

          // Called for each attempt and retry
          Observable<T> o = Observable
              .just(server)
              .concatMap(new Func1<Server, Observable<T>>() {
                @Override
                public Observable<T> call(final Server server) {
                  context.incAttemptCount();
                  loadBalancerContext.noteOpenConnection(stats);

                  if (listenerInvoker != null) {
                    try {
                      listenerInvoker.onStartWithServer(context.toExecutionInfo());
                    } catch (AbortExecutionException e) {
                      return Observable.error(e);
                    }
                  }

                  final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();

                  return operation.call(server).doOnEach(new Observer<T>() {
                    private T entity;
                    @Override
                    public void onCompleted() {
                      recordStats(tracer, stats, entity, null);
                      // TODO: What to do if onNext or onError are never called?
                    }

                    @Override
                    public void onError(Throwable e) {
                      recordStats(tracer, stats, null, e);
                      logger.debug("Got error {} when executed on server {}", e, server);
                      if (listenerInvoker != null) {
                        listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                      }
                    }

                    @Override
                    public void onNext(T entity) {
                      this.entity = entity;
                      if (listenerInvoker != null) {
                        listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                      }
                    }

                    private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
                      tracer.stop();
                      loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
                    }
                  });
                }
              });

          if (maxRetrysSame > 0)
            o = o.retry(retryPolicy(maxRetrysSame, true));
          return o;
        }
      });

  if (maxRetrysNext > 0 && server == null)
    o = o.retry(retryPolicy(maxRetrysNext, false));

  return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
    @Override
    public Observable<T> call(Throwable e) {
      if (context.getAttemptCount() > 0) {
        if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
          e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
              "Number of retries on next server exceeded max " + maxRetrysNext
              + " retries, while making a call for: " + context.getServer(), e);
        }
        else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
          e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
              "Number of retries exceeded max " + maxRetrysSame
              + " retries, while making a call for: " + context.getServer(), e);
        }
      }
      if (listenerInvoker != null) {
        listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
      }
      return Observable.error(e);
    }
  });
}
```
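Stripped of RxJava, stats and listeners, the retry structure of submit is two nested loops. The outer retry(retryPolicy(maxRetrysNext, false)) re-subscribes the whole chain and so runs selectServer() again. The inner retry(retryPolicy(maxRetrysSame, true)) repeats the call on the same server. The sketch below is a plain-Java approximation with hypothetical names. Unlike Ribbon, it treats every RuntimeException as retriable instead of asking a RetryHandler.

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

class NestedRetrySketch {
    // Outer loop: pick a (possibly different) server up to 1 + maxRetriesNext times.
    // Inner loop: try the chosen server up to 1 + maxRetriesSame times.
    // attemptLog records which server each attempt went to.
    static String submit(Supplier<String> selectServer,
                         Function<String, String> operation,
                         int maxRetriesSame, int maxRetriesNext,
                         List<String> attemptLog) {
        RuntimeException last = null;
        for (int serverTry = 0; serverTry <= maxRetriesNext; serverTry++) {
            String server = selectServer.get();
            for (int sameTry = 0; sameTry <= maxRetriesSame; sameTry++) {
                attemptLog.add(server);
                try {
                    return operation.apply(server);
                } catch (RuntimeException e) {
                    last = e;  // remember the failure and keep retrying
                }
            }
        }
        throw last;  // every attempt failed
    }
}
```

In the worst case a request is attempted (1 + maxRetriesSame) × (1 + maxRetriesNext) times. If Feign's Retryer is also enabled around the call, the two layers multiply, which is worth keeping in mind when deciding whether to turn retries off.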
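Analyzing submit shows that Ribbon, like Hystrix, is built on RxJava, so RxJava looks worth learning as well. The most important line here is (server == null ? selectServer() : Observable.just(server)).
selectServer() picks the target server. Which server it returns is decided by the ILoadBalancer implementation, which delegates the actual choice to its IRule. The main ILoadBalancer implementations are: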
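BaseLoadBalancer: uses RoundRobinRule (round robin) as its default rule.
DynamicServerListLoadBalancer: extends BaseLoadBalancer and can update the server list at runtime.
NoOpLoadBalancer: does nothing.
ZoneAwareLoadBalancer: extends DynamicServerListLoadBalancer and works with instance lists grouped by zone, choosing a zone first and then a server within it. It is the default ILoadBalancer in Spring Cloud Netflix, paired there with ZoneAvoidanceRule.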
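Since the default rule of BaseLoadBalancer is round robin, here is a minimal sketch of round-robin selection in the spirit of Ribbon's RoundRobinRule. A shared counter is advanced with compare-and-set so that concurrent callers each get their own slot. RoundRobinChooser is a hypothetical name. The real rule also skips servers that are not alive and works against the ILoadBalancer's server lists rather than a plain List.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinChooser {
    private final AtomicInteger counter = new AtomicInteger(0);

    // Returns the server at the current position and advances the counter atomically.
    String choose(List<String> servers) {
        if (servers.isEmpty()) {
            return null;  // no server available
        }
        int size = servers.size();
        while (true) {
            int current = counter.get();
            int next = (current + 1) % size;
            if (counter.compareAndSet(current, next)) {
                // modulo again in case the list shrank since the counter was set
                return servers.get(current % size);
            }
        }
    }
}
```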