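Dubbo's timeout, load-balancing, and retry mechanisms all operate on the client (consumer) side.
1. Dubbo retry mechanism
Dubbo's retry behavior depends on the cluster Invoker in use, and there are several cluster Invoker implementations.
The default cluster Invoker is FailoverClusterInvoker, which implements retries. Its default retry count is 2 (one initial call plus two retries, so three attempts in total).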

package org.apache.dubbo.rpc.cluster.support;

import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;
import org.apache.dubbo.rpc.support.RpcUtils;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.apache.dubbo.rpc.cluster.Constants.DEFAULT_RETRIES;
import static org.apache.dubbo.rpc.cluster.Constants.RETRIES_KEY;

/**
 * When invoke fails, log the initial error and retry other invokers (retry n times, which means at most n different invokers will be invoked)
 * Note that retry causes latency.
 * <p>
 * <a href="http://en.wikipedia.org/wiki/Failover">Failover</a>
 *
 */
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);

    public FailoverClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);
        int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            if (i > 0) {
                checkWhetherDestroyed();
                copyInvokers = list(invocation);
                // check again
                checkInvokers(copyInvokers, invocation);
            }
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + methodName
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyInvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le.getCode(), "Failed to invoke the method "
                + methodName + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + " (" + providers.size() + "/" + copyInvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + le.getMessage(), le.getCause() != null ? le.getCause() : le);
    }

}
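The code above shows both load balancing and the retry mechanism. By default the loop runs at most three times (retries + 1). On each attempt the load balancer first selects a provider, and then the RPC call is made. A business exception (e.isBiz()) is rethrown immediately and is not retried.
If a call succeeds, its result is returned; if every attempt fails, an RpcException is thrown.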
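To make the attempt counting concrete, here is a minimal sketch of a failover loop in plain Java. It is not Dubbo code: the class FailoverSketch and its invoke method are hypothetical, providers are modeled as Supplier<String>, and the sketch simply rotates through the list where Dubbo would ask the load balancer to select an invoker.

```java
import java.util.List;
import java.util.function.Supplier;

/** Simplified failover loop for illustration; hypothetical, not a Dubbo class. */
final class FailoverSketch {

    /**
     * Tries up to (retries + 1) providers and returns the first successful result.
     * Providers signal failure by throwing a RuntimeException.
     */
    static String invoke(List<Supplier<String>> providers, int retries) {
        if (providers.isEmpty()) {
            throw new IllegalArgumentException("no providers available");
        }
        // Same idea as "len = retries + 1; if (len <= 0) len = 1" in the code above.
        int attempts = Math.max(retries + 1, 1);
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            // Assumption: rotate through the list instead of calling a load balancer.
            Supplier<String> provider = providers.get(i % providers.size());
            try {
                return provider.get();
            } catch (RuntimeException e) {
                last = e; // remember the last failure and try the next provider
            }
        }
        throw new IllegalStateException(
                "Tried " + attempts + " times. Last error is: " + last.getMessage(), last);
    }
}
```

With retries = 2 and two failing providers followed by a healthy one, the third attempt succeeds; with only failing providers the method gives up after three attempts.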
There are other Invoker strategies as well, for example:
- failsafe: on failure this strategy logs the error and returns an empty result by default; no exception is propagated to the caller.

public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {
    private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);

    public FailsafeClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failsafe ignore exception: " + e.getMessage(), e);
            return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
        }
    }
}
- failfast: this strategy does not retry; it throws the exception immediately.

public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {

    public FailfastClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        try {
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
                throw (RpcException) e;
            }
            throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                    "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
                            + " select from all providers " + invokers + " for service " + getInterface().getName()
                            + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
                            + " use dubbo version " + Version.getVersion()
                            + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
                    e.getCause() != null ? e.getCause() : e);
        }
    }
}
The retry count can be changed in the global configuration or for an individual service:
(1) Global configuration
dubbo:
  consumer:
    retries: 3
(2) Per-reference configuration
@Reference(version = "1.0.0", retries = 5)
private UserService userService;
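2. Dubbo load balancing
Dubbo provides the following load-balancing strategies:
(1) Random - weighted random selection; this is the default
(2) RoundRobin - weighted round-robin
(3) leastactive - picks the provider with the fewest active (in-flight) calls
(4) consistenthash - consistent hashing; hashes the call arguments and selects a provider based on that hash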
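As an illustration of the default strategy, the following is a minimal sketch of weighted random selection. It only demonstrates the idea and is not Dubbo's RandomLoadBalance: the class WeightedRandomSketch and its select method are hypothetical, and weights are assumed to be non-negative integers.

```java
import java.util.Random;

/** Weighted random selection for illustration; hypothetical, not Dubbo's implementation. */
final class WeightedRandomSketch {

    /**
     * Returns index i with probability weights[i] / sum(weights).
     * Assumes every weight is non-negative and the array is not empty.
     */
    static int select(int[] weights, Random random) {
        int total = 0;
        for (int w : weights) {
            total += w;
        }
        if (total <= 0) {
            // All weights are zero: fall back to a uniform choice.
            return random.nextInt(weights.length);
        }
        // Pick a point in [0, total) and find the weight segment that contains it.
        int offset = random.nextInt(total);
        for (int i = 0; i < weights.length; i++) {
            offset -= weights[i];
            if (offset < 0) {
                return i;
            }
        }
        throw new IllegalStateException("unreachable when weights are non-negative");
    }
}
```

A provider with weight 3 is chosen about three times as often as one with weight 1, and a provider with weight 0 is never chosen while any other provider has a positive weight.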
3. Timeout mechanism
Reference: https://juejin.cn/post/6887553443880255501
Dubbo timeouts are also generally enforced on the consumer side. The consumer-side default timeout is 1000 ms and can be changed through configuration. The implementation is examined below:
(1) A client-side service call reaches the following method, org.apache.dubbo.rpc.protocol.AsyncToSyncInvoker#invoke:
public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult = invoker.invoke(invocation);

    try {
        if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
            asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
        }
    } catch (InterruptedException e) {
        throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return! method: "
                + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (ExecutionException e) {
        Throwable t = e.getCause();
        if (t instanceof TimeoutException) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: "
                    + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } else if (t instanceof RemotingException) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: "
                    + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    } catch (Throwable e) {
        throw new RpcException(e.getMessage(), e);
    }
    return asyncResult;
}
As the code shows, in synchronous mode the invoker calls asyncResult.get to wait for the result; in asynchronous mode it returns asyncResult directly without waiting. Synchronous calls are therefore implemented by blocking on java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit). Note that get is called with Integer.MAX_VALUE milliseconds, so the configured timeout is not enforced by this wait itself. If the wait ends with an ExecutionException whose cause is a timeout exception, it is rethrown as an RpcException with the code TIMEOUT_EXCEPTION.
get eventually calls java.util.concurrent.CompletableFuture#reportGet, which throws the ExecutionException:
private static <T> T reportGet(Object r)
    throws InterruptedException, ExecutionException {
    if (r == null) // by convention below, null means interrupted
        throw new InterruptedException();
    if (r instanceof AltResult) {
        Throwable x, cause;
        if ((x = ((AltResult)r).ex) == null)
            return null;
        if (x instanceof CancellationException)
            throw (CancellationException)x;
        if ((x instanceof CompletionException) &&
            (cause = x.getCause()) != null)
            x = cause;
        throw new ExecutionException(x);
    }
    @SuppressWarnings("unchecked") T t = (T) r;
    return t;
}
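The wrapping behavior of reportGet can be observed with the JDK alone. The sketch below completes a CompletableFuture exceptionally and reports what get() surfaces. The class ReportGetSketch is hypothetical, and java.util.concurrent.TimeoutException stands in for Dubbo's own TimeoutException type.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

/** Shows how an exceptional completion surfaces from get(); hypothetical helper. */
final class ReportGetSketch {

    /** Completes a future with the given failure and describes what get() throws. */
    static String describe(Throwable failure) {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(failure);
        try {
            return future.get();
        } catch (ExecutionException e) {
            // The original failure is available as the cause.
            return "ExecutionException caused by " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(new TimeoutException("no reply")));
    }
}
```

This is why AsyncToSyncInvoker#invoke inspects e.getCause() rather than the ExecutionException itself.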
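(2) How the AltResult is produced
A timer task (HashedWheelTimer, a hashed wheel timer) runs in the background and checks requests after they have been sent. If a request exceeds the configured timeout, the task sets a timeout response as its result, so that when get reaches reportGet the timeout exception is available.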
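The idea can be sketched with JDK classes only. In the hedged example below, a ScheduledExecutorService stands in for the HashedWheelTimer and java.util.concurrent.TimeoutException stands in for Dubbo's TimeoutException; the class TimeoutCheckSketch, its call method, and the simulated "pong" reply are all assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Timer-driven timeout on a pending future; hypothetical, not Dubbo code. */
final class TimeoutCheckSketch {

    /**
     * Simulates one request. A negative replyAfterMillis means the provider never replies.
     * The caller blocks on get() with no deadline; the timer task is what ends the wait.
     */
    static String call(long replyAfterMillis, long timeoutMillis) {
        ScheduledExecutorService timer = Executors.newScheduledThreadPool(2);
        try {
            CompletableFuture<String> future = new CompletableFuture<>();
            // Timeout check: fail the future if no reply has arrived in time.
            timer.schedule(() -> {
                if (!future.isDone()) {
                    future.completeExceptionally(
                            new TimeoutException("no reply within " + timeoutMillis + " ms"));
                }
            }, timeoutMillis, TimeUnit.MILLISECONDS);
            // Simulated provider reply.
            if (replyAfterMillis >= 0) {
                timer.schedule(() -> {
                    future.complete("pong");
                }, replyAfterMillis, TimeUnit.MILLISECONDS);
            }
            try {
                return future.get();
            } catch (ExecutionException e) {
                return "failed: " + e.getCause().getClass().getSimpleName();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            }
        } finally {
            timer.shutdownNow();
        }
    }
}
```

Calling call(-1, 50) ends with a timeout failure once the timer task fires, while call(10, 500) returns the reply because the future is already done when the check would run.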
After a chain of calls, org.apache.dubbo.rpc.protocol.AsyncToSyncInvoker#invoke reaches org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int):
public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // create request.
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
    try {
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}
1》org.apache.dubbo.remoting.exchange.support.DefaultFuture#newFuture
public static DefaultFuture newFuture(Channel channel, Request request, int timeout) {
    final DefaultFuture future = new DefaultFuture(channel, request, timeout);
    // timeout check
    timeoutCheck(future);
    return future;
}

private static void timeoutCheck(DefaultFuture future) {
    TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
    future.timeoutCheckTask = TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
}
2》org.apache.dubbo.common.timer.HashedWheelTimer#newTimeout creates the timeout-check task and adds it to the timeouts queue
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }

    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

    if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
        pendingTimeouts.decrementAndGet();
        throw new RejectedExecutionException("Number of pending timeouts ("
                + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                + "timeouts (" + maxPendingTimeouts + ")");
    }

    start();

    // Add the timeout to the timeout queue which will be processed on the next tick.
    // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

    // Guard against overflow.
    if (delay > 0 && deadline < 0) {
        deadline = Long.MAX_VALUE;
    }
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    timeouts.add(timeout);
    return timeout;
}
3》The task is then handed to org.apache.dubbo.common.timer.HashedWheelTimer.Worker#run, which processes the wheel on every tick
public void run() {
    // Initialize the startTime.
    startTime = System.nanoTime();
    if (startTime == 0) {
        // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
        startTime = 1;
    }

    // Notify the other threads waiting for the initialization at start().
    startTimeInitialized.countDown();

    do {
        final long deadline = waitForNextTick();
        if (deadline > 0) {
            int idx = (int) (tick & mask);
            processCancelledTasks();
            HashedWheelBucket bucket = wheel[idx];
            transferTimeoutsToBuckets();
            bucket.expireTimeouts(deadline);
            tick++;
        }
    } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

    // Fill the unprocessedTimeouts so we can return them from stop() method.
    for (HashedWheelBucket bucket : wheel) {
        bucket.clearTimeouts(unprocessedTimeouts);
    }
    for (; ; ) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            break;
        }
        if (!timeout.isCancelled()) {
            unprocessedTimeouts.add(timeout);
        }
    }
    processCancelledTasks();
}
This in turn calls org.apache.dubbo.common.timer.HashedWheelTimer.HashedWheelBucket#expireTimeouts:
/**
 * Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
 */
void expireTimeouts(long deadline) {
    HashedWheelTimeout timeout = head;

    // process all timeouts
    while (timeout != null) {
        HashedWheelTimeout next = timeout.next;
        if (timeout.remainingRounds <= 0) {
            next = remove(timeout);
            if (timeout.deadline <= deadline) {
                timeout.expire();
            } else {
                // The timeout was placed into a wrong slot. This should never happen.
                throw new IllegalStateException(String.format(
                        "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
            }
        } else if (timeout.isCancelled()) {
            next = remove(timeout);
        } else {
            timeout.remainingRounds--;
        }
        timeout = next;
    }
}
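The slot index (tick & mask) and the remainingRounds counter are the core of a hashed wheel. The sketch below is a deliberately simplified, manually advanced wheel that uses the same two ideas. The class MiniWheel and its methods are hypothetical; delays are counted in whole ticks rather than nanoseconds, and there is no worker thread, pending queue, or cancellation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Minimal hashed wheel advanced by hand; hypothetical, not Dubbo's HashedWheelTimer. */
final class MiniWheel {

    private static final class Entry {
        int remainingRounds;
        final Runnable task;

        Entry(int remainingRounds, Runnable task) {
            this.remainingRounds = remainingRounds;
            this.task = task;
        }
    }

    private final List<List<Entry>> wheel = new ArrayList<>();
    private final int mask;
    private long tick; // the next tick to be processed

    MiniWheel(int slots) {
        if (slots <= 0 || Integer.bitCount(slots) != 1) {
            throw new IllegalArgumentException("slots must be a power of two");
        }
        for (int i = 0; i < slots; i++) {
            wheel.add(new ArrayList<>());
        }
        mask = slots - 1;
    }

    /** Schedules a task to run when the tick that lies delayTicks ahead is processed. */
    void schedule(int delayTicks, Runnable task) {
        if (delayTicks < 0) {
            throw new IllegalArgumentException("delayTicks must not be negative");
        }
        long target = tick + delayTicks;
        // Full wheel revolutions that must pass before the entry is due.
        int rounds = delayTicks / wheel.size();
        wheel.get((int) (target & mask)).add(new Entry(rounds, task));
    }

    /** Processes the bucket of the current tick, then moves on to the next tick. */
    void advance() {
        List<Entry> bucket = wheel.get((int) (tick & mask));
        List<Runnable> due = new ArrayList<>();
        for (Iterator<Entry> it = bucket.iterator(); it.hasNext(); ) {
            Entry entry = it.next();
            if (entry.remainingRounds <= 0) {
                it.remove();
                due.add(entry.task);
            } else {
                entry.remainingRounds--;
            }
        }
        tick++;
        // Run tasks after the scan so a task may safely schedule new entries.
        for (Runnable task : due) {
            task.run();
        }
    }
}
```

With 8 slots, a task scheduled 10 ticks ahead lands in slot 2 with one remaining round: the first visit to that slot only decrements the counter, and the task runs on the second visit.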
That then calls org.apache.dubbo.common.timer.HashedWheelTimer.HashedWheelTimeout#expire:
public void expire() {
    if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
        return;
    }

    try {
        task.run(this);
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
        }
    }
}
Which calls org.apache.dubbo.remoting.exchange.support.DefaultFuture.TimeoutCheckTask#run:
@Override
public void run(Timeout timeout) {
    DefaultFuture future = DefaultFuture.getFuture(requestID);
    if (future == null || future.isDone()) {
        return;
    }
    // create exception response.
    Response timeoutResponse = new Response(future.getId());
    // set timeout status.
    timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
    timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
    // handle response.
    DefaultFuture.received(future.getChannel(), timeoutResponse, true);
}
It then calls org.apache.dubbo.remoting.exchange.support.DefaultFuture#received(org.apache.dubbo.remoting.Channel, org.apache.dubbo.remoting.exchange.Response, boolean) to set the result to a timeout result. Here res.getStatus is SERVER_TIMEOUT, meaning the request was sent but no response arrived within the specified timeout. If the request has not even been sent within that time, it is treated as a client-side timeout (CLIENT_TIMEOUT).
public static void received(Channel channel, Response response, boolean timeout) {
    try {
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            Timeout t = future.timeoutCheckTask;
            if (!timeout) {
                // decrease Time
                t.cancel();
            }
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at "
                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                    + ", response " + response
                    + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                    + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}

private void doReceived(Response res) {
    if (res == null) {
        throw new IllegalStateException("response cannot be null");
    }
    if (res.getStatus() == Response.OK) {
        this.complete(res.getResult());
    } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
        this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
    } else {
        this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
    }
}
It then calls java.util.concurrent.CompletableFuture#completeExceptionally, which sets the result to an AltResult that records the timeout exception. The get call that was blocked earlier then wakes up and obtains this result.
public boolean completeExceptionally(Throwable ex) {
    if (ex == null) throw new NullPointerException();
    boolean triggered = internalComplete(new AltResult(ex));
    postComplete();
    return triggered;
}
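Addendum: the retry count, async mode, and similar settings can each be configured individually.
Note that if a setting does not take effect, the cause may be that the same service is declared with @Reference in more than one place. The proxy object generated for the annotation is a singleton, so the settings on the other declarations are not applied. It is best to manage Dubbo interfaces centrally in one place.
@Reference(version = "1.0.0", retries = 5, async = true)
private UserService userService;
The annotation is defined as follows: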

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
public @interface Reference {
    /** Interface class, default value is void.class */
    Class<?> interfaceClass() default void.class;

    /** Interface class name, default value is empty string */
    String interfaceName() default "";

    /** Service version, default value is empty string */
    String version() default "";

    /** Service group, default value is empty string */
    String group() default "";

    /** Service target URL for direct invocation, if this is specified, then registry center takes no effect. */
    String url() default "";

    /** Client transport type, default value is "netty" */
    String client() default "";

    /** Whether to enable generic invocation, default value is false */
    boolean generic() default false;

    /** When enable, prefer to call local service in the same JVM if it's present, default value is true */
    boolean injvm() default true;

    /** Check if service provider is available during boot up, default value is true */
    boolean check() default true;

    /** Whether eager initialize the reference bean when all properties are set, default value is false */
    boolean init() default false;

    /** Whether to make connection when the client is created, the default value is false */
    boolean lazy() default false;

    /**
     * Export an stub service for event dispatch, default value is false.
     *
     * @see org.apache.dubbo.rpc.Constants#STUB_EVENT_METHODS_KEY
     */
    boolean stubevent() default false;

    /**
     * Whether to reconnect if connection is lost, if not specify, reconnect is enabled by default, and the interval
     * for retry connecting is 2000 ms
     *
     * @see org.apache.dubbo.remoting.Constants#DEFAULT_RECONNECT_PERIOD
     */
    String reconnect() default "";

    /**
     * Whether to stick to the same node in the cluster, the default value is false
     *
     * @see Constants#DEFAULT_CLUSTER_STICKY
     */
    boolean sticky() default false;

    /** How the proxy is generated, legal values include: jdk, javassist */
    String proxy() default "";

    /** Service stub name, use interface name + Local if not set */
    String stub() default "";

    /** Cluster strategy, legal values include: failover, failfast, failsafe, failback, forking */
    String cluster() default "";

    /** Maximum connections service provider can accept, default value is 0 - connection is shared */
    int connections() default 0;

    /**
     * The callback instance limit peer connection
     *
     * @see org.apache.dubbo.rpc.Constants#DEFAULT_CALLBACK_INSTANCES
     */
    int callbacks() default 0;

    /** Callback method name when connected, default value is empty string */
    String onconnect() default "";

    /** Callback method name when disconnected, default value is empty string */
    String ondisconnect() default "";

    /** Service owner, default value is empty string */
    String owner() default "";

    /** Service layer, default value is empty string */
    String layer() default "";

    /**
     * Service invocation retry times
     *
     * @see Constants#DEFAULT_RETRIES
     */
    int retries() default 2;

    /**
     * Load balance strategy, legal values include: random, roundrobin, leastactive
     *
     * @see Constants#DEFAULT_LOADBALANCE
     */
    String loadbalance() default "";

    /** Whether to enable async invocation, default value is false */
    boolean async() default false;

    /** Maximum active requests allowed, default value is 0 */
    int actives() default 0;

    /** Whether the async request has already been sent, the default value is false */
    boolean sent() default false;

    /** Service mock name, use interface name + Mock if not set */
    String mock() default "";

    /** Whether to use JSR303 validation, legal values are: true, false */
    String validation() default "";

    /** Timeout value for service invocation, default value is 0 */
    int timeout() default 0;

    /** Specify cache implementation for service invocation, legal values include: lru, threadlocal, jcache */
    String cache() default "";

    /**
     * Filters for service invocation
     *
     * @see Filter
     */
    String[] filter() default {};

    /**
     * Listeners for service exporting and unexporting
     *
     * @see ExporterListener
     */
    String[] listener() default {};

    /** Customized parameter key-value pair, for example: {key1, value1, key2, value2} */
    String[] parameters() default {};

    /** Application associated name */
    String application() default "";

    /** Module associated name */
    String module() default "";

    /** Consumer associated name */
    String consumer() default "";

    /** Monitor associated name */
    String monitor() default "";

    /** Registry associated name */
    String[] registry() default {};

    /**
     * The communication protocol of Dubbo Service
     *
     * @return the default value is ""
     * @since 2.6.6
     */
    String protocol() default "";

    /** Service tag name */
    String tag() default "";

    /**
     * methods support
     *
     * @return
     */
    Method[] methods() default {};

    /**
     * The id
     *
     * @return default value is empty
     * @since 2.7.3
     */
    String id() default "";
}