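Background
This article is based on Spring Cloud, Dalston SR4.
Microservices are usually deployed as multiple instances, and a release should go unnoticed by callers. Calls between microservices always go through Ribbon, which implements a retry mechanism. The relevant configuration is: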
    # Maximum number of other servers to retry on
    ribbon.MaxAutoRetriesNextServer=2
    # Maximum number of retries on each server, not counting the first call
    ribbon.MaxAutoRetries=1
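As a quick sanity check on what these two settings imply, the sketch below works out the worst-case number of attempts for one logical call. The class and method names are hypothetical, and the formula assumes that every attempt fails with an error that is considered retriable.

```java
final class RetryBudget {
    /**
     * Worst case: (first call + retries on the same server) attempts per server,
     * over (first server + additional servers) servers.
     */
    static int maxAttempts(int maxAutoRetries, int maxAutoRetriesNextServer) {
        int attemptsPerServer = 1 + maxAutoRetries;
        int serversTried = 1 + maxAutoRetriesNextServer;
        return attemptsPerServer * serversTried;
    }

    public static void main(String[] args) {
        // ribbon.MaxAutoRetries=1, ribbon.MaxAutoRetriesNextServer=2
        System.out.println(maxAttempts(1, 2)); // prints 6
    }
}
```

With the configuration above, a call can therefore touch up to three instances, twice each, before it gives up.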
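During a release, to accommodate changes in the registration information held by the Eureka registry (see the separate analysis of Eureka instances going online and offline), we restart instances one at a time. After each instance starts, we wait for (Eureka client registry refresh interval + Eureka client Ribbon refresh interval) * 3 before restarting the next one, to avoid the effects of the registration changes. This way, callers of the microservice being restarted can always reach an available instance through load-balanced retries.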
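The waiting period described above is simple arithmetic. The sketch below computes it for assumed values: 30 seconds is used for both intervals here, which I believe matches the defaults of `eureka.client.registry-fetch-interval-seconds` and `ribbon.ServerListRefreshInterval`, but substitute whatever your own configuration uses. The class and method names are hypothetical.

```java
import java.time.Duration;

final class RollingRestartWait {
    /** (Eureka client registry refresh interval + Ribbon server list refresh interval) * 3. */
    static Duration waitBetweenRestarts(Duration eurekaFetchInterval, Duration ribbonRefreshInterval) {
        return eurekaFetchInterval.plus(ribbonRefreshInterval).multipliedBy(3);
    }

    public static void main(String[] args) {
        // Assumed values, not taken from the article: 30 s for each interval.
        Duration wait = waitBetweenRestarts(Duration.ofSeconds(30), Duration.ofSeconds(30));
        System.out.println(wait.getSeconds() + " s"); // prints "180 s"
    }
}
```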
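In production, however, we found that while one instance is restarting and the others are working normally, there is a short window in which callers that hit the restarting instance fail immediately, without triggering a retry.
Code analysis
Whether the upper layer is a Feign call or a Zuul call, once the request reaches the Ribbon layer a LoadBalancerCommand is created and its submit method is called to execute the HTTP request. This relies on RxJava: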
    public Observable<T> submit(final ServerOperation<T> operation) {
        final ExecutionInfoContext context = new ExecutionInfoContext();

        if (listenerInvoker != null) {
            try {
                listenerInvoker.onExecutionStart();
            } catch (AbortExecutionException e) {
                return Observable.error(e);
            }
        }

        // Read the settings described above: the maximum number of servers to retry on
        // and the maximum number of retries per server
        final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
        final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

        // Use RxJava to build an Observable for the callbacks that follow
        Observable<T> o =
                // Select a server to call
                (server == null ? selectServer() : Observable.just(server))
                .concatMap(new Func1<Server, Observable<T>>() {
                    @Override
                    // Called for each server being selected
                    public Observable<T> call(Server server) {
                        context.setServer(server);
                        // Get the call statistics for this server, used for monitoring
                        // and for server filtering in the load balancer rule
                        final ServerStats stats = loadBalancerContext.getServerStats(server);

                        // Build the callback entry point for this server call,
                        // used when retrying on the same instance
                        Observable<T> o = Observable
                                .just(server)
                                .concatMap(new Func1<Server, Observable<T>>() {
                                    @Override
                                    public Observable<T> call(final Server server) {
                                        context.incAttemptCount();
                                        loadBalancerContext.noteOpenConnection(stats);

                                        if (listenerInvoker != null) {
                                            try {
                                                listenerInvoker.onStartWithServer(context.toExecutionInfo());
                                            } catch (AbortExecutionException e) {
                                                return Observable.error(e);
                                            }
                                        }

                                        final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();

                                        return operation.call(server).doOnEach(new Observer<T>() {
                                            private T entity;

                                            @Override
                                            public void onCompleted() {
                                                recordStats(tracer, stats, entity, null);
                                                // TODO: What to do if onNext or onError are never called?
                                            }

                                            @Override
                                            public void onError(Throwable e) {
                                                recordStats(tracer, stats, null, e);
                                                logger.debug("Got error {} when executed on server {}", e, server);
                                                if (listenerInvoker != null) {
                                                    listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                                                }
                                            }

                                            @Override
                                            public void onNext(T entity) {
                                                this.entity = entity;
                                                if (listenerInvoker != null) {
                                                    listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                                                }
                                            }

                                            private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
                                                tracer.stop();
                                                loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
                                            }
                                        });
                                    }
                                });

                        // Set up the retry callback for the same instance
                        if (maxRetrysSame > 0)
                            o = o.retry(retryPolicy(maxRetrysSame, true));
                        return o;
                    }
                });

        // Set up the retry callback for the next instance
        if (maxRetrysNext > 0 && server == null)
            o = o.retry(retryPolicy(maxRetrysNext, false));

        // Set up the callback that ends the call with the matching exception
        // once the retry limit is exceeded
        return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
            @Override
            public Observable<T> call(Throwable e) {
                if (context.getAttemptCount() > 0) {
                    if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
                        e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                                "Number of retries on next server exceeded max " + maxRetrysNext
                                + " retries, while making a call for: " + context.getServer(), e);
                    } else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
                        e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                                "Number of retries exceeded max " + maxRetrysSame
                                + " retries, while making a call for: " + context.getServer(), e);
                    }
                }
                if (listenerInvoker != null) {
                    listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
                }
                return Observable.error(e);
            }
        });
    }
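The nesting in submit can be hard to follow through the RxJava operators. The sketch below imitates the same shape with two plain loops: an outer one for "next server" retries and an inner one for "same server" retries. It is a simplified model rather than Ribbon's code: there is no real server selection, the names are hypothetical, and a single predicate stands in for the retriable check, ignoring the distinction Ribbon makes between the same-server and next-server cases.

```java
import java.net.ConnectException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.concurrent.Callable;
import java.util.function.Predicate;

final class NestedRetrySketch {
    /** Runs the call with nested retries and returns how many attempts were made. */
    static int run(Callable<?> call, int maxRetriesSame, int maxRetriesNext, Predicate<Throwable> retriable) {
        int attempts = 0;
        for (int server = 0; server <= maxRetriesNext; server++) {     // outer: move on to the next server
            for (int tryNo = 0; tryNo <= maxRetriesSame; tryNo++) {    // inner: retry on the same server
                attempts++;
                try {
                    call.call();
                    return attempts;                                   // success ends everything
                } catch (Exception e) {
                    if (!retriable.test(e)) {
                        return attempts;                               // non-retriable: fail at once
                    }
                }
            }
        }
        return attempts;                                               // retry budget exhausted
    }

    public static void main(String[] args) {
        Predicate<Throwable> socketOnly = e -> e instanceof SocketException;
        Callable<Void> refused = () -> { throw new ConnectException("Connection refused"); };
        Callable<Void> timedOut = () -> { throw new SocketTimeoutException("connect timed out"); };

        System.out.println(run(refused, 1, 2, socketOnly));  // prints 6
        System.out.println(run(timedOut, 1, 2, socketOnly)); // prints 1
    }
}
```

The second result is the behavior to keep in mind for the rest of the analysis: as soon as the retriable check says no, the whole retry budget is skipped.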
Let us look closely at the callback that sets up the retries:
    private Func2<Integer, Throwable, Boolean> retryPolicy(final int maxRetrys, final boolean same) {
        return new Func2<Integer, Throwable, Boolean>() {
            // A retry happens only when this returns true
            @Override
            public Boolean call(Integer tryCount, Throwable e) {
                // Do not retry if the exception is an AbortExecutionException
                if (e instanceof AbortExecutionException) {
                    return false;
                }
                // Do not retry once the maximum number of retries is exceeded
                if (tryCount > maxRetrys) {
                    return false;
                }
                if (e.getCause() != null && e instanceof RuntimeException) {
                    e = e.getCause();
                }
                // Check whether the exception is retriable
                return retryHandler.isRetriableException(e, same);
            }
        };
    }
The logic that decides whether an exception is retriable is:
    public boolean isRetriableException(Throwable e, boolean sameServer) {
        // If ribbon.okToRetryOnAllErrors is set to true, every exception is retried.
        // We do not set it, and it is not normally set.
        if (okToRetryOnAllErrors) {
            return true;
        } else if (e instanceof ClientException) {
            ClientException ce = (ClientException) e;
            if (ce.getErrorType() == ClientException.ErrorType.SERVER_THROTTLED) {
                return !sameServer;
            } else {
                return false;
            }
        } else {
            if (e instanceof RetryableHttpCodeAndMethodException) {
                // Exceptions that carry a response end up here
                if (((RetryableHttpCodeAndMethodException) e).getMethod().equals("GET") || okToRetryOnAllOperations)
                    return true;
                return false;
            }
            // Everything else is the connection-failure check. It requires
            // ribbon.okToRetryOnConnectErrors to be true, which is the default,
            // and then defers to isConnectionException.
            return okToRetryOnConnectErrors && isConnectionException(e);
        }
    }
Finally, let us see how an exception is classified as a connection exception:
    protected List<Class<? extends Throwable>> connectionRelated = Lists
            .<Class<? extends Throwable>> newArrayList(SocketException.class);

    public boolean isConnectionException(Throwable e) {
        return Utils.isPresentAsCause(e, connectionRelated);
    }
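This method simply checks whether the exception itself or anything in its cause chain is a SocketException, and returns true if so.
Locating the problem
Debugging on Windows, we noticed something interesting. With the Ribbon connect timeout set to ribbon.ConnectTimeout=500 (the same as our production configuration), the retry fails and the exception caught is "java.net.SocketTimeoutException: connect timed out". With the connect timeout set above 1000 ms (1000 itself excluded), the exception thrown is "java.net.ConnectException: Connection refused: connect".
Let us write some test code to check: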
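    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.Socket;

    public class TestRxJava {
        public static void main(String[] args) throws IOException {
            Socket socket = new Socket();
            try {
                // Connect timeout of 500 ms
                socket.connect(new InetSocketAddress("127.0.0.1", 8080), 500);
            } catch (Exception e) {
                e.printStackTrace();
            }
            socket = new Socket();
            // Connect timeout of 1100 ms
            socket.connect(new InetSocketAddress("127.0.0.1", 8080), 1100);
        }
    }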
Nothing is listening on this port. The output is:
    java.net.SocketTimeoutException: connect timed out
        at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
        at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at com.hash.test.TestRxJava.main(TestRxJava.java:14)
    Exception in thread "main" java.net.ConnectException: Connection refused: connect
        at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
        at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at com.hash.test.TestRxJava.main(TestRxJava.java:19)
The two attempts produce different exceptions.
SocketTimeoutException is not a kind of SocketException, so the existing retry logic cannot retry it.
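The type relationship can be confirmed without Ribbon at all. The sketch below walks a cause chain looking for a given type, in the spirit of the SocketException check described earlier. The helper `hasCauseOfType` is a hypothetical stand-in written for this example, not Ribbon's `Utils.isPresentAsCause`, and the depth cap is my own guard against cyclic cause chains.

```java
import java.net.ConnectException;
import java.net.SocketException;
import java.net.SocketTimeoutException;

final class CauseChainSketch {
    /** True if e, or any exception in its cause chain, is an instance of type. */
    static boolean hasCauseOfType(Throwable e, Class<? extends Throwable> type) {
        int depth = 0;
        for (Throwable t = e; t != null && depth < 20; t = t.getCause(), depth++) {
            if (type.isInstance(t)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable refused = new RuntimeException(new ConnectException("Connection refused: connect"));
        Throwable timedOut = new RuntimeException(new SocketTimeoutException("connect timed out"));

        // ConnectException extends SocketException
        System.out.println(hasCauseOfType(refused, SocketException.class));  // prints true
        // SocketTimeoutException extends InterruptedIOException, not SocketException
        System.out.println(hasCauseOfType(timedOut, SocketException.class)); // prints false
    }
}
```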
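I filed an issue about this in Feign's GitHub repository.
So we need to change the isConnectionException method. A SocketTimeoutException should not always be retried, only when its message is "connect timed out". A SocketTimeoutException may also be wrapped in another exception; to keep things simple, we decide by the message alone: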
    public boolean isConnectionException(Throwable e) {
        // Note: e.getMessage() can be null, in which case this line throws a NullPointerException
        return Utils.isPresentAsCause(e, connectionRelated) || e.getMessage().contains("connect timed out");
    }
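For comparison, here is a more defensive variant of the same idea. It is a standalone sketch under my own assumptions, not the code submitted in the Pull Request: it walks the cause chain itself, only accepts the "connect timed out" message on an actual SocketTimeoutException, and tolerates a null message. The class and method names are hypothetical.

```java
import java.net.ConnectException;
import java.net.SocketException;
import java.net.SocketTimeoutException;

final class ConnectFailureCheck {
    /** True for any SocketException, or for a SocketTimeoutException raised while connecting. */
    static boolean isConnectionFailure(Throwable e) {
        int depth = 0;
        // Depth cap guards against cyclic cause chains.
        for (Throwable t = e; t != null && depth < 20; t = t.getCause(), depth++) {
            if (t instanceof SocketException) {
                return true;
            }
            if (t instanceof SocketTimeoutException) {
                String msg = t.getMessage();
                if (msg != null && msg.contains("connect timed out")) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isConnectionFailure(new ConnectException("Connection refused: connect")));   // true
        System.out.println(isConnectionFailure(
                new RuntimeException(new SocketTimeoutException("connect timed out"))));                // true
        System.out.println(isConnectionFailure(new SocketTimeoutException("Read timed out")));          // false
        System.out.println(isConnectionFailure(new SocketTimeoutException()));                          // false
    }
}
```

Keeping read timeouts out of the retriable set matters because the request may already have reached the server by then, so retrying it is only safe for idempotent operations. Matching on the message text remains fragile, since the wording comes from the JDK's socket implementation and is not part of any documented contract.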
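I also submitted this code as a Pull Request.
After we replaced the source with the modified version, the production problem was resolved.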
https://blog.csdn.net/zhxdick/article/details/78906462