Preface
Recap
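The previous part showed how Feign works: at project startup, it uses JDK dynamic proxies to generate a proxy for each FeignClient.
The proxy's invocation handler is ReflectiveFeign.FeignInvocationHandler. It holds a target (the serviceName and related information) and a dispatch map, whose keys identify the invoked method (its name, parameters, and so on) and whose values are SynchronousMethodHandler instances.
[Figure: structure of the FeignInvocationHandler dynamic proxy]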
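The proxy-plus-dispatch-map structure described above can be pictured with a minimal sketch built on the JDK's own java.lang.reflect.Proxy. This is not Feign's code: HelloClient, SketchMethodHandler, DispatchingHandler and newClient are hypothetical names made up for illustration, and the handler only renders a request line instead of sending HTTP.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

class ProxyDispatchSketch {

    // Hypothetical client interface, standing in for an interface annotated with @FeignClient.
    interface HelloClient {
        String sayHello(String name);
    }

    // Hypothetical stand-in for Feign's MethodHandler.
    interface SketchMethodHandler {
        Object invoke(Object[] args) throws Throwable;
    }

    // Plays the role of FeignInvocationHandler: a target name plus a dispatch map.
    static final class DispatchingHandler implements InvocationHandler {
        private final String serviceName;
        private final Map<Method, SketchMethodHandler> dispatch;

        DispatchingHandler(String serviceName, Map<Method, SketchMethodHandler> dispatch) {
            this.serviceName = serviceName;
            this.dispatch = dispatch;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method.getDeclaringClass() == Object.class) {
                // equals, hashCode and toString are answered by the handler itself.
                return method.invoke(this, args);
            }
            // Every interface method is routed to the handler registered for it.
            return dispatch.get(method).invoke(args);
        }

        @Override
        public String toString() {
            return "DispatchingHandler(" + serviceName + ")";
        }
    }

    static HelloClient newClient(String serviceName) {
        Method sayHello;
        try {
            sayHello = HelloClient.class.getMethod("sayHello", String.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
        Map<Method, SketchMethodHandler> dispatch = new HashMap<>();
        // Instead of sending HTTP, this handler only renders the request line it would send.
        dispatch.put(sayHello, args -> "GET http://" + serviceName + "/sayHello/" + args[0]);
        return (HelloClient) Proxy.newProxyInstance(
                HelloClient.class.getClassLoader(),
                new Class<?>[] {HelloClient.class},
                new DispatchingHandler(serviceName, dispatch));
    }

    public static void main(String[] args) {
        HelloClient client = newClient("serviceA");
        // prints "GET http://serviceA/sayHello/wangmeng"
        System.out.println(client.sayHello("wangmeng"));
    }
}
```

Calling any method on the returned proxy lands in DispatchingHandler.invoke, which looks up the per-method handler; this mirrors the role SynchronousMethodHandler plays in the analysis that follows.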
In this part
This part analyzes how Feign works together with Ribbon to implement load balancing.
Notes
This is original work; if you repost it, please credit the source.
Blog: 一枝花算不算浪漫
WeChat official account: 壹枝花算不算浪漫
Source code analysis
How Feign and Ribbon implement load balancing
Building on the earlier analysis, we can go straight to the code in SynchronousMethodHandler:
final class SynchronousMethodHandler implements MethodHandler {
@Override
public Object invoke(Object[] argv) throws Throwable {
// Builds a request template such as: GET /sayHello/wangmeng HTTP/1.1
RequestTemplate template = buildTemplateFromArgs.create(argv);
Retryer retryer = this.retryer.clone();
while (true) {
try {
return executeAndDecode(template);
} catch (RetryableException e) {
retryer.continueOrPropagate(e);
if (logLevel != Logger.Level.NONE) {
logger.logRetry(metadata.configKey(), logLevel);
}
continue;
}
}
}
Object executeAndDecode(RequestTemplate template) throws Throwable {
// Builds the Request object: GET http://serviceA/sayHello/wangmeng HTTP/1.1
Request request = targetRequest(template);
if (logLevel != Logger.Level.NONE) {
logger.logRequest(metadata.configKey(), logLevel, request);
}
Response response;
long start = System.nanoTime();
try {
// This client is the LoadBalancerFeignClient built earlier; options holds the timeouts
response = client.execute(request, options);
// ensure the request is set. TODO: remove in Feign 10
response.toBuilder().request(request).build();
} catch (IOException e) {
if (logLevel != Logger.Level.NONE) {
logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
}
throw errorExecuting(request, e);
}
long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
// Everything below builds the return value from the response
boolean shouldClose = true;
try {
if (logLevel != Logger.Level.NONE) {
response =
logger.logAndRebufferResponse(metadata.configKey(), logLevel, response, elapsedTime);
// ensure the request is set. TODO: remove in Feign 10
response.toBuilder().request(request).build();
}
if (Response.class == metadata.returnType()) {
if (response.body() == null) {
return response;
}
if (response.body().length() == null ||
response.body().length() > MAX_RESPONSE_BUFFER_SIZE) {
shouldClose = false;
return response;
}
// Ensure the response body is disconnected
byte[] bodyData = Util.toByteArray(response.body().asInputStream());
return response.toBuilder().body(bodyData).build();
}
if (response.status() >= 200 && response.status() < 300) {
if (void.class == metadata.returnType()) {
return null;
} else {
return decode(response);
}
} else if (decode404 && response.status() == 404 && void.class != metadata.returnType()) {
return decode(response);
} else {
throw errorDecoder.decode(metadata.configKey(), response);
}
} catch (IOException e) {
if (logLevel != Logger.Level.NONE) {
logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime);
}
throw errorReading(request, response, e);
} finally {
if (shouldClose) {
ensureClosed(response.body());
}
}
}
}
SynchronousMethodHandler mainly builds the Request, then passes the request and options to LoadBalancerFeignClient.execute() to obtain the response. Next, the client-side call:
public class LoadBalancerFeignClient implements Client {
@Override
public Response execute(Request request, Request.Options options) throws IOException {
try {
// asUri: http://serviceA/sayHello/wangmeng
URI asUri = URI.create(request.url());
// clientName: serviceA
String clientName = asUri.getHost();
// uriWithoutHost: http:///sayHello/wangmeng (the host is removed, leaving it empty)
URI uriWithoutHost = cleanUrl(request.url(), clientName);
// ribbonRequest here: GET http:///sayHello/wangmeng HTTP/1.1
FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
this.delegate, request, uriWithoutHost);
// This config carries only two timeouts: connectTimeout (5000) and readTimeout (5000)
IClientConfig requestConfig = getClientConfig(options, clientName);
// This is where load balancing actually happens
return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
requestConfig).toResponse();
}
catch (ClientException e) {
IOException io = findIOException(e);
if (io != null) {
throw io;
}
throw new RuntimeException(e);
}
}
}
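The URL handling in execute() can be tried in isolation with plain java.net.URI. The sketch below is an assumption about the behavior the comments describe (take the host as the client name, then strip it from the URL), not a copy of Spring Cloud's cleanUrl; the class and method names are hypothetical.

```java
import java.net.URI;
import java.util.regex.Pattern;

class CleanUrlSketch {

    // The logical service name is simply the host part of the URL.
    static String clientName(String url) {
        return URI.create(url).getHost();
    }

    // Removes the first occurrence of the service name, leaving a URI with an empty host.
    // Pattern.quote keeps names containing regex metacharacters (for example dots) literal.
    static URI withoutHost(String url, String clientName) {
        return URI.create(url.replaceFirst(Pattern.quote(clientName), ""));
    }

    public static void main(String[] args) {
        String url = "http://serviceA/sayHello/wangmeng";
        String name = clientName(url);
        URI stripped = withoutHost(url, name);
        System.out.println(name);               // prints "serviceA"
        System.out.println(stripped);           // prints "http:///sayHello/wangmeng"
        System.out.println(stripped.getHost()); // prints "null"
    }
}
```

The empty host matters later: a URI without a host is what tells the load balancer context that a server still has to be chosen.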
Next, look at lbClient() and executeWithLoadBalancer():
public class LoadBalancerFeignClient implements Client {
private FeignLoadBalancer lbClient(String clientName) {
return this.lbClientFactory.create(clientName);
}
}
public class CachingSpringLoadBalancerFactory {
public FeignLoadBalancer create(String clientName) {
if (this.cache.containsKey(clientName)) {
return this.cache.get(clientName);
}
IClientConfig config = this.factory.getClientConfig(clientName);
// Get the Ribbon ILoadBalancer
ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
ServerIntrospector serverIntrospector = this.factory.getInstance(clientName, ServerIntrospector.class);
FeignLoadBalancer client = enableRetry ? new RetryableFeignLoadBalancer(lb, config, serverIntrospector,
loadBalancedRetryPolicyFactory, loadBalancedBackOffPolicyFactory, loadBalancedRetryListenerFactory) : new FeignLoadBalancer(lb, config, serverIntrospector);
this.cache.put(clientName, client);
return client;
}
}
The factory obtains the ILoadBalancer here. It holds what Ribbon knows about every serviceA node, including each node's host. The next step is to pick one node's host from it through load balancing.
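The idea of picking one node from the list held by the load balancer can be pictured with a plain round-robin chooser. Note that this swaps in simple round robin for illustration; it is not Ribbon's default ZoneAvoidanceRule, and RoundRobinSketch and the addresses used are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinSketch {
    private final List<String> servers;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinSketch(List<String> servers) {
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("no available server");
        }
        // Defensive copy so later changes to the caller's list do not affect the chooser.
        this.servers = new ArrayList<>(servers);
    }

    // Returns the next server in rotation; floorMod keeps the index valid
    // even after the counter overflows to a negative value.
    String choose() {
        int index = Math.floorMod(counter.getAndIncrement(), servers.size());
        return servers.get(index);
    }

    public static void main(String[] args) {
        RoundRobinSketch lb = new RoundRobinSketch(
                Arrays.asList("192.168.1.10:8080", "192.168.1.11:8080"));
        for (int i = 0; i < 3; i++) {
            System.out.println(lb.choose());
        }
    }
}
```

With two servers, three consecutive calls return the first, the second, and then the first again.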
public abstract class AbstractLoadBalancerAwareClient<S extends ClientRequest, T extends IResponse> extends LoadBalancerContext implements IClient<S, T>, IClientConfigAware {
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
try {
return command.submit(
new ServerOperation<T>() {
@Override
public Observable<T> call(Server server) {
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try {
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
}
catch (Exception e) {
return Observable.error(e);
}
}
})
.toBlocking()
.single();
} catch (Exception e) {
Throwable t = e.getCause();
if (t instanceof ClientException) {
throw (ClientException) t;
} else {
throw new ClientException(e);
}
}
}
}
public class LoadBalancerCommand<T> {
public Observable<T> submit(final ServerOperation<T> operation) {
final ExecutionInfoContext context = new ExecutionInfoContext();
if (listenerInvoker != null) {
try {
listenerInvoker.onExecutionStart();
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}
final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
// Use the load balancer
Observable<T> o =
(server == null ? selectServer() : Observable.just(server))
.concatMap(new Func1<Server, Observable<T>>() {
}
// code omitted...
// selectServer is where the load-balancing logic actually runs
private Observable<Server> selectServer() {
return Observable.create(new OnSubscribe<Server>() {
@Override
public void call(Subscriber<? super Server> next) {
try {
// loadBalancerURI is http:///sayHello/wangmeng; loadBalancerKey is null
Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
next.onNext(server);
next.onCompleted();
} catch (Exception e) {
next.onError(e);
}
}
});
}
}
public class LoadBalancerContext implements IClientConfigAware {
public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
String host = null;
int port = -1;
if (original != null) {
host = original.getHost();
}
if (original != null) {
Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);
port = schemeAndPort.second();
}
// Get the ILoadBalancer, which holds the IRule and all service node information
ILoadBalancer lb = getLoadBalancer();
if (host == null) {
// Partial URI or no URI Case
// well we have to just get the right instances from lb - or we fall back
if (lb != null){
// This is where chooseServer actually runs. The default rule is ZoneAvoidanceRule
Server svc = lb.chooseServer(loadBalancerKey);
if (svc == null){
throw new ClientException(ClientException.ErrorType.GENERAL,
"Load balancer does not have available server for client: "
+ clientName);
}
host = svc.getHost();
if (host == null){
throw new ClientException(ClientException.ErrorType.GENERAL,
"Invalid Server for :" + svc);
}
logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original});
return svc;
}
// code omitted
}
}
}
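The code above makes the flow clear: lb.chooseServer() on Ribbon's ILoadBalancer, which applies the configured IRule, picks one service node to call. The debug view is as follows:
This concludes the analysis of Feign and Ribbon: the request URL information is returned, and then the response is obtained: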
Summary
The analysis above covered how Feign integrates with Ribbon. In the end everything lands on Ribbon's ILoadBalancer, which uses an IRule to select the server.
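Once a server has been chosen, the host-less URI has to be turned back into a concrete address, which is the job reconstructURIWithServer() has in the code shown earlier. A minimal sketch of that last step using only java.net.URI follows; it is an illustration under simplified assumptions (no scheme or port derivation), and ReconstructUriSketch, withServer and the address are hypothetical.

```java
import java.net.URI;
import java.net.URISyntaxException;

class ReconstructUriSketch {

    // Rebuilds the URI with the chosen server's host and port,
    // keeping the scheme, path, query and fragment of the original.
    static URI withServer(URI original, String host, int port) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        URI withoutHost = URI.create("http:///sayHello/wangmeng");
        // prints "http://192.168.1.10:8080/sayHello/wangmeng"
        System.out.println(withServer(withoutHost, "192.168.1.10", 8080));
    }
}
```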
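The next part will present one large diagram of how Feign, Ribbon, and Eureka relate, showing each component's details and dependencies. It also serves as a general review of everything studied so far.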
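Declaration
This article was first published on my blog, https://www.cnblogs.com/wang-meng, and on the WeChat official account 壹枝花算不算浪漫. If you repost it, please credit the source.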