SpringCloud 源碼系列(5)—— 負載均衡 Ribbon(下)


SpringCloud 源碼系列(1)—— 注冊中心 Eureka(上)

SpringCloud 源碼系列(2)—— 注冊中心 Eureka(中)

SpringCloud 源碼系列(3)—— 注冊中心 Eureka(下)

SpringCloud 源碼系列(4)—— 負載均衡 Ribbon(上)

SpringCloud 源碼系列(5)—— 負載均衡 Ribbon(下)

SpringCloud 源碼系列(6)—— 聲明式服務調用 Feign

 

五、Ribbon 核心接口

前面已經了解到 Ribbon 核心接口以及默認實現如何協作來查找要調用的一個實例,這節再來看下各個核心接口的一些特性及其它實現類。

1、客戶端配置 — IClientConfig

IClientConfig 就是管理客戶端配置的核心接口,它的默認實現類是 DefaultClientConfigImpl。可以看到在創建 IClientConfig 時,設置了 Ribbon 客戶端默認的連接和讀取超時時間為 1 秒,例如讀取如果超過1秒,就會返回超時,這兩個一般需要根據實際情況來調整。、

 1 @Bean
 2 @ConditionalOnMissingBean
 3 public IClientConfig ribbonClientConfig() {
 4     DefaultClientConfigImpl config = new DefaultClientConfigImpl();
 5     // 加載配置
 6     config.loadProperties(this.name);
 7     // 連接超時默認 1 秒
 8     config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
 9     // 讀取超時默認 1 秒
10     config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
11     config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD);
12     return config;
13 }

CommonClientConfigKey 這個類定義了 Ribbon 客戶端相關的所有配置的鍵常量,可以通過這個類來看有哪些配置。

  1 public abstract class CommonClientConfigKey<T> implements IClientConfigKey<T> {
  2 
  3     public static final IClientConfigKey<String> AppName = new CommonClientConfigKey<String>("AppName"){};
  4 
  5     public static final IClientConfigKey<String> Version = new CommonClientConfigKey<String>("Version"){};
  6 
  7     public static final IClientConfigKey<Integer> Port = new CommonClientConfigKey<Integer>("Port"){};
  8 
  9     public static final IClientConfigKey<Integer> SecurePort = new CommonClientConfigKey<Integer>("SecurePort"){};
 10 
 11     public static final IClientConfigKey<String> VipAddress = new CommonClientConfigKey<String>("VipAddress"){};
 12 
 13     public static final IClientConfigKey<Boolean> ForceClientPortConfiguration = new CommonClientConfigKey<Boolean>("ForceClientPortConfiguration"){}; // use client defined port regardless of server advert
 14 
 15     public static final IClientConfigKey<String> DeploymentContextBasedVipAddresses = new CommonClientConfigKey<String>("DeploymentContextBasedVipAddresses"){};
 16 
 17     public static final IClientConfigKey<Integer> MaxAutoRetries = new CommonClientConfigKey<Integer>("MaxAutoRetries"){};
 18 
 19     public static final IClientConfigKey<Integer> MaxAutoRetriesNextServer = new CommonClientConfigKey<Integer>("MaxAutoRetriesNextServer"){};
 20 
 21     public static final IClientConfigKey<Boolean> OkToRetryOnAllOperations = new CommonClientConfigKey<Boolean>("OkToRetryOnAllOperations"){};
 22 
 23     public static final IClientConfigKey<Boolean> RequestSpecificRetryOn = new CommonClientConfigKey<Boolean>("RequestSpecificRetryOn"){};
 24 
 25     public static final IClientConfigKey<Integer> ReceiveBufferSize = new CommonClientConfigKey<Integer>("ReceiveBufferSize"){};
 26 
 27     public static final IClientConfigKey<Boolean> EnablePrimeConnections = new CommonClientConfigKey<Boolean>("EnablePrimeConnections"){};
 28 
 29     public static final IClientConfigKey<String> PrimeConnectionsClassName = new CommonClientConfigKey<String>("PrimeConnectionsClassName"){};
 30 
 31     public static final IClientConfigKey<Integer> MaxRetriesPerServerPrimeConnection = new CommonClientConfigKey<Integer>("MaxRetriesPerServerPrimeConnection"){};
 32 
 33     public static final IClientConfigKey<Integer> MaxTotalTimeToPrimeConnections = new CommonClientConfigKey<Integer>("MaxTotalTimeToPrimeConnections"){};
 34 
 35     public static final IClientConfigKey<Float> MinPrimeConnectionsRatio = new CommonClientConfigKey<Float>("MinPrimeConnectionsRatio"){};
 36 
 37     public static final IClientConfigKey<String> PrimeConnectionsURI = new CommonClientConfigKey<String>("PrimeConnectionsURI"){};
 38 
 39     public static final IClientConfigKey<Integer> PoolMaxThreads = new CommonClientConfigKey<Integer>("PoolMaxThreads"){};
 40 
 41     public static final IClientConfigKey<Integer> PoolMinThreads = new CommonClientConfigKey<Integer>("PoolMinThreads"){};
 42 
 43     public static final IClientConfigKey<Integer> PoolKeepAliveTime = new CommonClientConfigKey<Integer>("PoolKeepAliveTime"){};
 44 
 45     public static final IClientConfigKey<String> PoolKeepAliveTimeUnits = new CommonClientConfigKey<String>("PoolKeepAliveTimeUnits"){};
 46 
 47     public static final IClientConfigKey<Boolean> EnableConnectionPool = new CommonClientConfigKey<Boolean>("EnableConnectionPool") {};
 48 
 49     /**
 50      * Use {@link #MaxConnectionsPerHost}
 51      */
 52     @Deprecated
 53     public static final IClientConfigKey<Integer> MaxHttpConnectionsPerHost = new CommonClientConfigKey<Integer>("MaxHttpConnectionsPerHost"){};
 54 
 55     /**
 56      * Use {@link #MaxTotalConnections}
 57      */
 58     @Deprecated
 59     public static final IClientConfigKey<Integer> MaxTotalHttpConnections = new CommonClientConfigKey<Integer>("MaxTotalHttpConnections"){};
 60 
 61     public static final IClientConfigKey<Integer> MaxConnectionsPerHost = new CommonClientConfigKey<Integer>("MaxConnectionsPerHost"){};
 62 
 63     public static final IClientConfigKey<Integer> MaxTotalConnections = new CommonClientConfigKey<Integer>("MaxTotalConnections"){};
 64 
 65     public static final IClientConfigKey<Boolean> IsSecure = new CommonClientConfigKey<Boolean>("IsSecure"){};
 66 
 67     public static final IClientConfigKey<Boolean> GZipPayload = new CommonClientConfigKey<Boolean>("GZipPayload"){};
 68 
 69     public static final IClientConfigKey<Integer> ConnectTimeout = new CommonClientConfigKey<Integer>("ConnectTimeout"){};
 70 
 71     public static final IClientConfigKey<Integer> BackoffInterval = new CommonClientConfigKey<Integer>("BackoffTimeout"){};
 72 
 73     public static final IClientConfigKey<Integer> ReadTimeout = new CommonClientConfigKey<Integer>("ReadTimeout"){};
 74 
 75     public static final IClientConfigKey<Integer> SendBufferSize = new CommonClientConfigKey<Integer>("SendBufferSize"){};
 76 
 77     public static final IClientConfigKey<Boolean> StaleCheckingEnabled = new CommonClientConfigKey<Boolean>("StaleCheckingEnabled"){};
 78 
 79     public static final IClientConfigKey<Integer> Linger = new CommonClientConfigKey<Integer>("Linger"){};
 80 
 81     public static final IClientConfigKey<Integer> ConnectionManagerTimeout = new CommonClientConfigKey<Integer>("ConnectionManagerTimeout"){};
 82 
 83     public static final IClientConfigKey<Boolean> FollowRedirects = new CommonClientConfigKey<Boolean>("FollowRedirects"){};
 84 
 85     public static final IClientConfigKey<Boolean> ConnectionPoolCleanerTaskEnabled = new CommonClientConfigKey<Boolean>("ConnectionPoolCleanerTaskEnabled"){};
 86 
 87     public static final IClientConfigKey<Integer> ConnIdleEvictTimeMilliSeconds = new CommonClientConfigKey<Integer>("ConnIdleEvictTimeMilliSeconds"){};
 88 
 89     public static final IClientConfigKey<Integer> ConnectionCleanerRepeatInterval = new CommonClientConfigKey<Integer>("ConnectionCleanerRepeatInterval"){};
 90 
 91     public static final IClientConfigKey<Boolean> EnableGZIPContentEncodingFilter = new CommonClientConfigKey<Boolean>("EnableGZIPContentEncodingFilter"){};
 92 
 93     public static final IClientConfigKey<String> ProxyHost = new CommonClientConfigKey<String>("ProxyHost"){};
 94 
 95     public static final IClientConfigKey<Integer> ProxyPort = new CommonClientConfigKey<Integer>("ProxyPort"){};
 96 
 97     public static final IClientConfigKey<String> KeyStore = new CommonClientConfigKey<String>("KeyStore"){};
 98 
 99     public static final IClientConfigKey<String> KeyStorePassword = new CommonClientConfigKey<String>("KeyStorePassword"){};
100 
101     public static final IClientConfigKey<String> TrustStore = new CommonClientConfigKey<String>("TrustStore"){};
102 
103     public static final IClientConfigKey<String> TrustStorePassword = new CommonClientConfigKey<String>("TrustStorePassword"){};
104 
105     // if this is a secure rest client, must we use client auth too?
106     public static final IClientConfigKey<Boolean> IsClientAuthRequired = new CommonClientConfigKey<Boolean>("IsClientAuthRequired"){};
107 
108     public static final IClientConfigKey<String> CustomSSLSocketFactoryClassName = new CommonClientConfigKey<String>("CustomSSLSocketFactoryClassName"){};
109      // must host name match name in certificate?
110     public static final IClientConfigKey<Boolean> IsHostnameValidationRequired = new CommonClientConfigKey<Boolean>("IsHostnameValidationRequired"){};
111 
112     // see also http://hc.apache.org/httpcomponents-client-ga/tutorial/html/advanced.html
113     public static final IClientConfigKey<Boolean> IgnoreUserTokenInConnectionPoolForSecureClient = new CommonClientConfigKey<Boolean>("IgnoreUserTokenInConnectionPoolForSecureClient"){};
114 
115     // Client implementation
116     public static final IClientConfigKey<String> ClientClassName = new CommonClientConfigKey<String>("ClientClassName"){};
117 
118     //LoadBalancer Related
119     public static final IClientConfigKey<Boolean> InitializeNFLoadBalancer = new CommonClientConfigKey<Boolean>("InitializeNFLoadBalancer"){};
120 
121     public static final IClientConfigKey<String> NFLoadBalancerClassName = new CommonClientConfigKey<String>("NFLoadBalancerClassName"){};
122 
123     public static final IClientConfigKey<String> NFLoadBalancerRuleClassName = new CommonClientConfigKey<String>("NFLoadBalancerRuleClassName"){};
124 
125     public static final IClientConfigKey<String> NFLoadBalancerPingClassName = new CommonClientConfigKey<String>("NFLoadBalancerPingClassName"){};
126 
127     public static final IClientConfigKey<Integer> NFLoadBalancerPingInterval = new CommonClientConfigKey<Integer>("NFLoadBalancerPingInterval"){};
128 
129     public static final IClientConfigKey<Integer> NFLoadBalancerMaxTotalPingTime = new CommonClientConfigKey<Integer>("NFLoadBalancerMaxTotalPingTime"){};
130 
131     public static final IClientConfigKey<String> NFLoadBalancerStatsClassName = new CommonClientConfigKey<String>("NFLoadBalancerStatsClassName"){};
132 
133     public static final IClientConfigKey<String> NIWSServerListClassName = new CommonClientConfigKey<String>("NIWSServerListClassName"){};
134 
135     public static final IClientConfigKey<String> ServerListUpdaterClassName = new CommonClientConfigKey<String>("ServerListUpdaterClassName"){};
136 
137     public static final IClientConfigKey<String> NIWSServerListFilterClassName = new CommonClientConfigKey<String>("NIWSServerListFilterClassName"){};
138 
139     public static final IClientConfigKey<Integer> ServerListRefreshInterval = new CommonClientConfigKey<Integer>("ServerListRefreshInterval"){};
140 
141     public static final IClientConfigKey<Boolean> EnableMarkingServerDownOnReachingFailureLimit = new CommonClientConfigKey<Boolean>("EnableMarkingServerDownOnReachingFailureLimit"){};
142 
143     public static final IClientConfigKey<Integer> ServerDownFailureLimit = new CommonClientConfigKey<Integer>("ServerDownFailureLimit"){};
144 
145     public static final IClientConfigKey<Integer> ServerDownStatWindowInMillis = new CommonClientConfigKey<Integer>("ServerDownStatWindowInMillis"){};
146 
147     public static final IClientConfigKey<Boolean> EnableZoneAffinity = new CommonClientConfigKey<Boolean>("EnableZoneAffinity"){};
148 
149     public static final IClientConfigKey<Boolean> EnableZoneExclusivity = new CommonClientConfigKey<Boolean>("EnableZoneExclusivity"){};
150 
151     public static final IClientConfigKey<Boolean> PrioritizeVipAddressBasedServers = new CommonClientConfigKey<Boolean>("PrioritizeVipAddressBasedServers"){};
152 
153     public static final IClientConfigKey<String> VipAddressResolverClassName = new CommonClientConfigKey<String>("VipAddressResolverClassName"){};
154 
155     public static final IClientConfigKey<String> TargetRegion = new CommonClientConfigKey<String>("TargetRegion"){};
156 
157     public static final IClientConfigKey<String> RulePredicateClasses = new CommonClientConfigKey<String>("RulePredicateClasses"){};
158 
159     public static final IClientConfigKey<String> RequestIdHeaderName = new CommonClientConfigKey<String>("RequestIdHeaderName") {};
160 
161     public static final IClientConfigKey<Boolean> UseIPAddrForServer = new CommonClientConfigKey<Boolean>("UseIPAddrForServer") {};
162 
163     public static final IClientConfigKey<String> ListOfServers = new CommonClientConfigKey<String>("listOfServers") {};
164 
165     private static final Set<IClientConfigKey> keys = new HashSet<IClientConfigKey>();
166 
167     // ...
168 }
View Code

進入到 DefaultClientConfigImpl,可以看到 CommonClientConfigKey 中的每個配置都對應了一個默認值。在加載配置的時候,如果用戶沒有定制配置,就會使用默認的配置。

  1 public class DefaultClientConfigImpl implements IClientConfig {
  2 
  3     public static final Boolean DEFAULT_PRIORITIZE_VIP_ADDRESS_BASED_SERVERS = Boolean.TRUE;
  4 
  5     public static final String DEFAULT_NFLOADBALANCER_PING_CLASSNAME = "com.netflix.loadbalancer.DummyPing"; // DummyPing.class.getName();
  6 
  7     public static final String DEFAULT_NFLOADBALANCER_RULE_CLASSNAME = "com.netflix.loadbalancer.AvailabilityFilteringRule";
  8 
  9     public static final String DEFAULT_NFLOADBALANCER_CLASSNAME = "com.netflix.loadbalancer.ZoneAwareLoadBalancer";
 10 
 11     public static final boolean DEFAULT_USEIPADDRESS_FOR_SERVER = Boolean.FALSE;
 12 
 13     public static final String DEFAULT_CLIENT_CLASSNAME = "com.netflix.niws.client.http.RestClient";
 14 
 15     public static final String DEFAULT_VIPADDRESS_RESOLVER_CLASSNAME = "com.netflix.client.SimpleVipAddressResolver";
 16 
 17     public static final String DEFAULT_PRIME_CONNECTIONS_URI = "/";
 18 
 19     public static final int DEFAULT_MAX_TOTAL_TIME_TO_PRIME_CONNECTIONS = 30000;
 20 
 21     public static final int DEFAULT_MAX_RETRIES_PER_SERVER_PRIME_CONNECTION = 9;
 22 
 23     public static final Boolean DEFAULT_ENABLE_PRIME_CONNECTIONS = Boolean.FALSE;
 24 
 25     public static final int DEFAULT_MAX_REQUESTS_ALLOWED_PER_WINDOW = Integer.MAX_VALUE;
 26 
 27     public static final int DEFAULT_REQUEST_THROTTLING_WINDOW_IN_MILLIS = 60000;
 28 
 29     public static final Boolean DEFAULT_ENABLE_REQUEST_THROTTLING = Boolean.FALSE;
 30 
 31     public static final Boolean DEFAULT_ENABLE_GZIP_CONTENT_ENCODING_FILTER = Boolean.FALSE;
 32 
 33     public static final Boolean DEFAULT_CONNECTION_POOL_CLEANER_TASK_ENABLED = Boolean.TRUE;
 34 
 35     public static final Boolean DEFAULT_FOLLOW_REDIRECTS = Boolean.FALSE;
 36 
 37     public static final float DEFAULT_PERCENTAGE_NIWS_EVENT_LOGGED = 0.0f;
 38 
 39     public static final int DEFAULT_MAX_AUTO_RETRIES_NEXT_SERVER = 1;
 40 
 41     public static final int DEFAULT_MAX_AUTO_RETRIES = 0;
 42 
 43     public static final int DEFAULT_BACKOFF_INTERVAL = 0;
 44 
 45     public static final int DEFAULT_READ_TIMEOUT = 5000;
 46 
 47     public static final int DEFAULT_CONNECTION_MANAGER_TIMEOUT = 2000;
 48 
 49     public static final int DEFAULT_CONNECT_TIMEOUT = 2000;
 50 
 51     public static final Boolean DEFAULT_ENABLE_CONNECTION_POOL = Boolean.TRUE;
 52 
 53     @Deprecated
 54     public static final int DEFAULT_MAX_HTTP_CONNECTIONS_PER_HOST = 50;
 55 
 56     @Deprecated
 57     public static final int DEFAULT_MAX_TOTAL_HTTP_CONNECTIONS = 200;
 58 
 59     public static final int DEFAULT_MAX_CONNECTIONS_PER_HOST = 50;
 60 
 61     public static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 200;
 62 
 63     public static final float DEFAULT_MIN_PRIME_CONNECTIONS_RATIO = 1.0f;
 64 
 65     public static final String DEFAULT_PRIME_CONNECTIONS_CLASS = "com.netflix.niws.client.http.HttpPrimeConnection";
 66 
 67     public static final String DEFAULT_SEVER_LIST_CLASS = "com.netflix.loadbalancer.ConfigurationBasedServerList";
 68 
 69     public static final String DEFAULT_SERVER_LIST_UPDATER_CLASS = "com.netflix.loadbalancer.PollingServerListUpdater";
 70 
 71     public static final int DEFAULT_CONNECTION_IDLE_TIMERTASK_REPEAT_IN_MSECS = 30000; // every half minute (30 secs)
 72 
 73     public static final int DEFAULT_CONNECTIONIDLE_TIME_IN_MSECS = 30000; // all connections idle for 30 secs
 74 
 75     protected volatile Map<String, Object> properties = new ConcurrentHashMap<String, Object>();
 76 
 77     protected Map<IClientConfigKey<?>, Object> typedProperties = new ConcurrentHashMap<IClientConfigKey<?>, Object>();
 78 
 79     private static final Logger LOG = LoggerFactory.getLogger(DefaultClientConfigImpl.class);
 80 
 81     private String clientName = null;
 82 
 83     private VipAddressResolver resolver = null;
 84 
 85     private boolean enableDynamicProperties = true;
 86     /**
 87      * Defaults for the parameters for the thread pool used by batchParallel
 88      * calls
 89      */
 90     public static final int DEFAULT_POOL_MAX_THREADS = DEFAULT_MAX_TOTAL_HTTP_CONNECTIONS;
 91     public static final int DEFAULT_POOL_MIN_THREADS = 1;
 92     public static final long DEFAULT_POOL_KEEP_ALIVE_TIME = 15 * 60L;
 93     public static final TimeUnit DEFAULT_POOL_KEEP_ALIVE_TIME_UNITS = TimeUnit.SECONDS;
 94     public static final Boolean DEFAULT_ENABLE_ZONE_AFFINITY = Boolean.FALSE;
 95     public static final Boolean DEFAULT_ENABLE_ZONE_EXCLUSIVITY = Boolean.FALSE;
 96     public static final int DEFAULT_PORT = 7001;
 97     public static final Boolean DEFAULT_ENABLE_LOADBALANCER = Boolean.TRUE;
 98 
 99     public static final String DEFAULT_PROPERTY_NAME_SPACE = "ribbon";
100 
101     private String propertyNameSpace = DEFAULT_PROPERTY_NAME_SPACE;
102 
103     public static final Boolean DEFAULT_OK_TO_RETRY_ON_ALL_OPERATIONS = Boolean.FALSE;
104 
105     public static final Boolean DEFAULT_ENABLE_NIWS_EVENT_LOGGING = Boolean.TRUE;
106 
107     public static final Boolean DEFAULT_IS_CLIENT_AUTH_REQUIRED = Boolean.FALSE;
108 
109     private final Map<String, DynamicStringProperty> dynamicProperties = new ConcurrentHashMap<String, DynamicStringProperty>();
110 
111     public Boolean getDefaultPrioritizeVipAddressBasedServers() {
112         return DEFAULT_PRIORITIZE_VIP_ADDRESS_BASED_SERVERS;
113     }
114 
115     public String getDefaultNfloadbalancerPingClassname() {
116         return DEFAULT_NFLOADBALANCER_PING_CLASSNAME;
117     }
118 
119     public String getDefaultNfloadbalancerRuleClassname() {
120         return DEFAULT_NFLOADBALANCER_RULE_CLASSNAME;
121     }
122 
123     public String getDefaultNfloadbalancerClassname() {
124         return DEFAULT_NFLOADBALANCER_CLASSNAME;
125     }
126 
127     public boolean getDefaultUseIpAddressForServer() {
128         return DEFAULT_USEIPADDRESS_FOR_SERVER;
129     }
130 
131     public String getDefaultClientClassname() {
132         return DEFAULT_CLIENT_CLASSNAME;
133     }
134 
135     public String getDefaultVipaddressResolverClassname() {
136         return DEFAULT_VIPADDRESS_RESOLVER_CLASSNAME;
137     }
138 
139     public String getDefaultPrimeConnectionsUri() {
140         return DEFAULT_PRIME_CONNECTIONS_URI;
141     }
142 
143     public int getDefaultMaxTotalTimeToPrimeConnections() {
144         return DEFAULT_MAX_TOTAL_TIME_TO_PRIME_CONNECTIONS;
145     }
146 
147     public int getDefaultMaxRetriesPerServerPrimeConnection() {
148         return DEFAULT_MAX_RETRIES_PER_SERVER_PRIME_CONNECTION;
149     }
150 
151     public Boolean getDefaultEnablePrimeConnections() {
152         return DEFAULT_ENABLE_PRIME_CONNECTIONS;
153     }
154 
155     public int getDefaultMaxRequestsAllowedPerWindow() {
156         return DEFAULT_MAX_REQUESTS_ALLOWED_PER_WINDOW;
157     }
158 
159     public int getDefaultRequestThrottlingWindowInMillis() {
160         return DEFAULT_REQUEST_THROTTLING_WINDOW_IN_MILLIS;
161     }
162 
163     public Boolean getDefaultEnableRequestThrottling() {
164         return DEFAULT_ENABLE_REQUEST_THROTTLING;
165     }
166 
167     public Boolean getDefaultEnableGzipContentEncodingFilter() {
168         return DEFAULT_ENABLE_GZIP_CONTENT_ENCODING_FILTER;
169     }
170 
171     public Boolean getDefaultConnectionPoolCleanerTaskEnabled() {
172         return DEFAULT_CONNECTION_POOL_CLEANER_TASK_ENABLED;
173     }
174 
175     public Boolean getDefaultFollowRedirects() {
176         return DEFAULT_FOLLOW_REDIRECTS;
177     }
178 
179     public float getDefaultPercentageNiwsEventLogged() {
180         return DEFAULT_PERCENTAGE_NIWS_EVENT_LOGGED;
181     }
182 
183     public int getDefaultMaxAutoRetriesNextServer() {
184         return DEFAULT_MAX_AUTO_RETRIES_NEXT_SERVER;
185     }
186 
187     public int getDefaultMaxAutoRetries() {
188         return DEFAULT_MAX_AUTO_RETRIES;
189     }
190 
191     public int getDefaultReadTimeout() {
192         return DEFAULT_READ_TIMEOUT;
193     }
194 
195     public int getDefaultConnectionManagerTimeout() {
196         return DEFAULT_CONNECTION_MANAGER_TIMEOUT;
197     }
198 
199     public int getDefaultConnectTimeout() {
200         return DEFAULT_CONNECT_TIMEOUT;
201     }
202 
203     @Deprecated
204     public int getDefaultMaxHttpConnectionsPerHost() {
205         return DEFAULT_MAX_HTTP_CONNECTIONS_PER_HOST;
206     }
207 
208     @Deprecated
209     public int getDefaultMaxTotalHttpConnections() {
210         return DEFAULT_MAX_TOTAL_HTTP_CONNECTIONS;
211     }
212 
213     public int getDefaultMaxConnectionsPerHost() {
214         return DEFAULT_MAX_CONNECTIONS_PER_HOST;
215     }
216 
217     public int getDefaultMaxTotalConnections() {
218         return DEFAULT_MAX_TOTAL_CONNECTIONS;
219     }
220 
221     public float getDefaultMinPrimeConnectionsRatio() {
222         return DEFAULT_MIN_PRIME_CONNECTIONS_RATIO;
223     }
224 
225     public String getDefaultPrimeConnectionsClass() {
226         return DEFAULT_PRIME_CONNECTIONS_CLASS;
227     }
228 
229     public String getDefaultSeverListClass() {
230         return DEFAULT_SEVER_LIST_CLASS;
231     }
232 
233     public int getDefaultConnectionIdleTimertaskRepeatInMsecs() {
234         return DEFAULT_CONNECTION_IDLE_TIMERTASK_REPEAT_IN_MSECS;
235     }
236 
237     public int getDefaultConnectionidleTimeInMsecs() {
238         return DEFAULT_CONNECTIONIDLE_TIME_IN_MSECS;
239     }
240 
241     public VipAddressResolver getResolver() {
242         return resolver;
243     }
244 
245     public boolean isEnableDynamicProperties() {
246         return enableDynamicProperties;
247     }
248 
249     public int getDefaultPoolMaxThreads() {
250         return DEFAULT_POOL_MAX_THREADS;
251     }
252 
253     public int getDefaultPoolMinThreads() {
254         return DEFAULT_POOL_MIN_THREADS;
255     }
256 
257     public long getDefaultPoolKeepAliveTime() {
258         return DEFAULT_POOL_KEEP_ALIVE_TIME;
259     }
260 
261     public TimeUnit getDefaultPoolKeepAliveTimeUnits() {
262         return DEFAULT_POOL_KEEP_ALIVE_TIME_UNITS;
263     }
264 
265     public Boolean getDefaultEnableZoneAffinity() {
266         return DEFAULT_ENABLE_ZONE_AFFINITY;
267     }
268 
269     public Boolean getDefaultEnableZoneExclusivity() {
270         return DEFAULT_ENABLE_ZONE_EXCLUSIVITY;
271     }
272 
273     public int getDefaultPort() {
274         return DEFAULT_PORT;
275     }
276 
277     public Boolean getDefaultEnableLoadbalancer() {
278         return DEFAULT_ENABLE_LOADBALANCER;
279     }
280 
281 
282     public Boolean getDefaultOkToRetryOnAllOperations() {
283         return DEFAULT_OK_TO_RETRY_ON_ALL_OPERATIONS;
284     }
285 
286     public Boolean getDefaultIsClientAuthRequired(){
287         return DEFAULT_IS_CLIENT_AUTH_REQUIRED;
288     }
289 
290 
291     /**
292      * Create instance with no properties in default name space {@link #DEFAULT_PROPERTY_NAME_SPACE}
293      */
294     public DefaultClientConfigImpl() {
295         this.dynamicProperties.clear();
296         this.enableDynamicProperties = false;
297     }
298 
299     /**
300      * Create instance with no properties in the specified name space
301      */
302     public DefaultClientConfigImpl(String nameSpace) {
303         this();
304         this.propertyNameSpace = nameSpace;
305     }
306 
307     public void loadDefaultValues() {
308         putDefaultIntegerProperty(CommonClientConfigKey.MaxHttpConnectionsPerHost, getDefaultMaxHttpConnectionsPerHost());
309         putDefaultIntegerProperty(CommonClientConfigKey.MaxTotalHttpConnections, getDefaultMaxTotalHttpConnections());
310         putDefaultBooleanProperty(CommonClientConfigKey.EnableConnectionPool, getDefaultEnableConnectionPool());
311         putDefaultIntegerProperty(CommonClientConfigKey.MaxConnectionsPerHost, getDefaultMaxConnectionsPerHost());
312         putDefaultIntegerProperty(CommonClientConfigKey.MaxTotalConnections, getDefaultMaxTotalConnections());
313         putDefaultIntegerProperty(CommonClientConfigKey.ConnectTimeout, getDefaultConnectTimeout());
314         putDefaultIntegerProperty(CommonClientConfigKey.ConnectionManagerTimeout, getDefaultConnectionManagerTimeout());
315         putDefaultIntegerProperty(CommonClientConfigKey.ReadTimeout, getDefaultReadTimeout());
316         putDefaultIntegerProperty(CommonClientConfigKey.MaxAutoRetries, getDefaultMaxAutoRetries());
317         putDefaultIntegerProperty(CommonClientConfigKey.MaxAutoRetriesNextServer, getDefaultMaxAutoRetriesNextServer());
318         putDefaultBooleanProperty(CommonClientConfigKey.OkToRetryOnAllOperations, getDefaultOkToRetryOnAllOperations());
319         putDefaultBooleanProperty(CommonClientConfigKey.FollowRedirects, getDefaultFollowRedirects());
320         putDefaultBooleanProperty(CommonClientConfigKey.ConnectionPoolCleanerTaskEnabled, getDefaultConnectionPoolCleanerTaskEnabled());
321         putDefaultIntegerProperty(CommonClientConfigKey.ConnIdleEvictTimeMilliSeconds, getDefaultConnectionidleTimeInMsecs());
322         putDefaultIntegerProperty(CommonClientConfigKey.ConnectionCleanerRepeatInterval, getDefaultConnectionIdleTimertaskRepeatInMsecs());
323         putDefaultBooleanProperty(CommonClientConfigKey.EnableGZIPContentEncodingFilter, getDefaultEnableGzipContentEncodingFilter());
324         String proxyHost = ConfigurationManager.getConfigInstance().getString(getDefaultPropName(CommonClientConfigKey.ProxyHost.key()));
325         if (proxyHost != null && proxyHost.length() > 0) {
326             setProperty(CommonClientConfigKey.ProxyHost, proxyHost);
327         }
328         Integer proxyPort = ConfigurationManager
329                 .getConfigInstance()
330                 .getInteger(
331                         getDefaultPropName(CommonClientConfigKey.ProxyPort),
332                         (Integer.MIN_VALUE + 1)); // + 1 just to avoid potential clash with user setting
333         if (proxyPort != (Integer.MIN_VALUE + 1)) {
334             setProperty(CommonClientConfigKey.ProxyPort, proxyPort);
335         }
336         putDefaultIntegerProperty(CommonClientConfigKey.Port, getDefaultPort());
337         putDefaultBooleanProperty(CommonClientConfigKey.EnablePrimeConnections, getDefaultEnablePrimeConnections());
338         putDefaultIntegerProperty(CommonClientConfigKey.MaxRetriesPerServerPrimeConnection, getDefaultMaxRetriesPerServerPrimeConnection());
339         putDefaultIntegerProperty(CommonClientConfigKey.MaxTotalTimeToPrimeConnections, getDefaultMaxTotalTimeToPrimeConnections());
340         putDefaultStringProperty(CommonClientConfigKey.PrimeConnectionsURI, getDefaultPrimeConnectionsUri());
341         putDefaultIntegerProperty(CommonClientConfigKey.PoolMinThreads, getDefaultPoolMinThreads());
342         putDefaultIntegerProperty(CommonClientConfigKey.PoolMaxThreads, getDefaultPoolMaxThreads());
343         putDefaultLongProperty(CommonClientConfigKey.PoolKeepAliveTime, getDefaultPoolKeepAliveTime());
344         putDefaultTimeUnitProperty(CommonClientConfigKey.PoolKeepAliveTimeUnits, getDefaultPoolKeepAliveTimeUnits());
345         putDefaultBooleanProperty(CommonClientConfigKey.EnableZoneAffinity, getDefaultEnableZoneAffinity());
346         putDefaultBooleanProperty(CommonClientConfigKey.EnableZoneExclusivity, getDefaultEnableZoneExclusivity());
347         putDefaultStringProperty(CommonClientConfigKey.ClientClassName, getDefaultClientClassname());
348         putDefaultStringProperty(CommonClientConfigKey.NFLoadBalancerClassName, getDefaultNfloadbalancerClassname());
349         putDefaultStringProperty(CommonClientConfigKey.NFLoadBalancerRuleClassName, getDefaultNfloadbalancerRuleClassname());
350         putDefaultStringProperty(CommonClientConfigKey.NFLoadBalancerPingClassName, getDefaultNfloadbalancerPingClassname());
351         putDefaultBooleanProperty(CommonClientConfigKey.PrioritizeVipAddressBasedServers, getDefaultPrioritizeVipAddressBasedServers());
352         putDefaultFloatProperty(CommonClientConfigKey.MinPrimeConnectionsRatio, getDefaultMinPrimeConnectionsRatio());
353         putDefaultStringProperty(CommonClientConfigKey.PrimeConnectionsClassName, getDefaultPrimeConnectionsClass());
354         putDefaultStringProperty(CommonClientConfigKey.NIWSServerListClassName, getDefaultSeverListClass());
355         putDefaultStringProperty(CommonClientConfigKey.VipAddressResolverClassName, getDefaultVipaddressResolverClassname());
356         putDefaultBooleanProperty(CommonClientConfigKey.IsClientAuthRequired, getDefaultIsClientAuthRequired());
357         // putDefaultStringProperty(CommonClientConfigKey.RequestIdHeaderName, getDefaultRequestIdHeaderName());
358         putDefaultBooleanProperty(CommonClientConfigKey.UseIPAddrForServer, getDefaultUseIpAddressForServer());
359         putDefaultStringProperty(CommonClientConfigKey.ListOfServers, "");
360     }
361 }
View Code

也可以在配置文件中定制配置,例如配置超時和重試:

 1 # 全局配置
 2 ribbon:
 3   # 客戶端讀取超時時間
 4   ReadTimeout: 3000
 5   # 客戶端連接超時時間
 6   ConnectTimeout: 3000
 7   # 默認只重試 GET,設置為 true 時將重試所有類型,如 POST、PUT、DELETE
 8   OkToRetryOnAllOperations: false
 9   # 重試次數
10   MaxAutoRetries: 1
11   # 最多重試幾個實例
12   MaxAutoRetriesNextServer: 1
13 
14 # 只針對 demo-producer 客戶端
15 demo-producer:
16   ribbon:
17     # 客戶端讀取超時時間
18     ReadTimeout: 5000
19     # 客戶端連接超時時間
20     ConnectTimeout: 3000

2、均衡策略 — IRule

IRule 是最終選擇 Server 的策略規則類,核心的接口就是 choose。

 1 public interface IRule{
 2 
 3     // 選擇 Server
 4     public Server choose(Object key);
 5 
 6     // 設置 ILoadBalancer
 7     public void setLoadBalancer(ILoadBalancer lb);
 8 
 9     // 獲取 ILoadBalancer
10     public ILoadBalancer getLoadBalancer();
11 }

Ribbon 提供了豐富的負載均衡策略,我們也可以通過配置指定使用某個均衡策略。下面是整個Ribbon提供的 IRule 均衡策略。

3、服務檢查 — IPing

IPing 是用於定期檢查 Server 的可用性的,它只提供了一個接口,用來判斷 Server 是否存活:

1 public interface IPing { 2 3 public boolean isAlive(Server server); 4 }

IPing 也提供了多種策略可選,下面是整個 IPing 體系結構:

4、獲取服務列表 — ServerList

ServerList 提供了兩個接口,一個是第一次獲取 Server 列表,一個是更新 Server 列表,其中 getUpdatedListOfServers 會每被 Loadbalancer 隔 30 秒調一次來更新 allServerList。

 1 public interface ServerList<T extends Server> {  2  3 public List<T> getInitialListOfServers();  4  5 /**  6  * Return updated list of servers. This is called say every 30 secs  7  * (configurable) by the Loadbalancer's Ping cycle  8 */  9 public List<T> getUpdatedListOfServers(); 10 }

ServerList 也提供了多種實現,ServerList 體系結構如下:

5、過濾服務 — ServerListFilter

ServerListFilter 提供了一個接口用來過濾出可用的 Server。

1 public interface ServerListFilter<T extends Server> {
2 
3     public List<T> getFilteredListOfServers(List<T> servers);
4 }

ServerListFilter 體系結構如下:

6、服務列表更新 — ServerListUpdater

ServerListUpdater 有多個接口,最核心的就是 start 開啟定時任務調用 updateAction 來更新 allServerList。

 1 public interface ServerListUpdater {  2  3 /**  4  * an interface for the updateAction that actually executes a server list update  5 */  6 public interface UpdateAction {  7 void doUpdate();  8  }  9 10 /** 11  * start the serverList updater with the given update action 12  * This call should be idempotent. 13 */ 14 void start(UpdateAction updateAction); 15 }

默認有兩個實現類:

7、負載均衡器 — ILoadBalancer

ILoadBalancer 是負載均衡選擇服務的核心接口,主要提供了如下的獲取Server列表和根據客戶端名稱選擇Server的接口。

 1 public interface ILoadBalancer {  2  3 // 添加Server  4 public void addServers(List<Server> newServers);  5  6 // 根據key選擇一個Server  7 public Server chooseServer(Object key);  8  9 // 獲取存活的Server列表,返回 upServerList 10 public List<Server> getReachableServers(); 11 12 // 獲取所有Server列表,返回 allServerList 13 public List<Server> getAllServers(); 14 }

ILoadBalancer 的體系結構如下:

8、Ribbon 相關配置類

從前面一直看下來,可以發現有很多與 Ribbon 相關的配置類,這里總結下與 Ribbon 相關的配置類,看每個配置類的配置順序,以及都主要配置了哪些東西。

① 首先是Eureka客戶端配置類 EurekaClientAutoConfiguration,這個自動化配置類主要配置了 Ribbon 所需的 EurekaClient。

 1 @Configuration(proxyBeanMethods = false)
 2 @EnableConfigurationProperties
 3 @ConditionalOnClass(EurekaClientConfig.class)
 4 @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
 5 @ConditionalOnDiscoveryEnabled
 6 @AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
 7         CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
 8 @AutoConfigureAfter(name = {
 9         "org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration",
10         "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
11         "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
12         "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" })
13 public class EurekaClientAutoConfiguration {
14     // ....
15 }
View Code

② 接着是Ribbon自動化配置類 RibbonAutoConfiguration,這個類主要配置了如下類:

  • SpringClientFactory:管理 Ribbon 客戶端上下文。
  • LoadBalancerClient:負載均衡客戶端,默認實現類為 RibbonLoadBalancerClient(實際是在 RibbonClientConfiguration 中配置的)。
  • PropertiesFactory:用於判斷配置文件中是否自定義了核心接口的實現類,如 NFLoadBalancerClassName、NFLoadBalancerPingClassName 等。
  • RibbonApplicationContextInitializer:開啟飢餓配置的時候,用這個類來在啟動時初始化 Ribbon 客戶端上下文。
 1 package org.springframework.cloud.netflix.ribbon;
 2 
 3 @Configuration
 4 @Conditional(RibbonAutoConfiguration.RibbonClassesConditions.class)
 5 @RibbonClients
 6 // 在 EurekaClientAutoConfiguration 之后配置
 7 @AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")
 8 // 在 LoadBalancerAutoConfiguration、AsyncLoadBalancerAutoConfiguration 之前配置
 9 @AutoConfigureBefore({ LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class })
10 @EnableConfigurationProperties({ RibbonEagerLoadProperties.class, ServerIntrospectorProperties.class })
11 public class RibbonAutoConfiguration {
12 
13     @Autowired(required = false)
14     private List<RibbonClientSpecification> configurations = new ArrayList<>();
15 
16     @Autowired
17     private RibbonEagerLoadProperties ribbonEagerLoadProperties;
18 
19     @Bean
20     public HasFeatures ribbonFeature() {
21         return HasFeatures.namedFeature("Ribbon", Ribbon.class);
22     }
23 
24     @Bean
25     @ConditionalOnMissingBean
26     public SpringClientFactory springClientFactory() {
27         SpringClientFactory factory = new SpringClientFactory();
28         factory.setConfigurations(this.configurations);
29         return factory;
30     }
31 
32     @Bean
33     @ConditionalOnMissingBean(LoadBalancerClient.class)
34     public LoadBalancerClient loadBalancerClient() {
35         return new RibbonLoadBalancerClient(springClientFactory());
36     }
37 
38     @Bean
39     @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
40     @ConditionalOnMissingBean
41     public LoadBalancedRetryFactory loadBalancedRetryPolicyFactory(final SpringClientFactory clientFactory) {
42         return new RibbonLoadBalancedRetryFactory(clientFactory);
43     }
44 
45     @Bean
46     @ConditionalOnMissingBean
47     public PropertiesFactory propertiesFactory() {
48         return new PropertiesFactory();
49     }
50 
51     @Bean
52     @ConditionalOnProperty("ribbon.eager-load.enabled")
53     public RibbonApplicationContextInitializer ribbonApplicationContextInitializer() {
54         return new RibbonApplicationContextInitializer(springClientFactory(), ribbonEagerLoadProperties.getClients());
55     }
56 }
View Code

③ 接着是負載均衡器配置類 LoadBalancerAutoConfiguration,這個類主要是創建了負載均衡攔截器 LoadBalancerInterceptor,並添加到 RestTemplae 的攔截器中。

 1 package org.springframework.cloud.client.loadbalancer;
 2 
 3 @Configuration(proxyBeanMethods = false)
 4 @ConditionalOnClass(RestTemplate.class)
 5 @ConditionalOnBean(LoadBalancerClient.class)
 6 @EnableConfigurationProperties(LoadBalancerRetryProperties.class)
 7 public class LoadBalancerAutoConfiguration {
 8 
 9     @LoadBalanced
10     @Autowired(required = false)
11     private List<RestTemplate> restTemplates = Collections.emptyList();
12 
13     @Autowired(required = false)
14     private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();
15 
16     // 對 RestTemplate 定制化
17     @Bean
18     public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
19             final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
20         return () -> restTemplateCustomizers.ifAvailable(customizers -> {
21             for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
22                 for (RestTemplateCustomizer customizer : customizers) {
23                     customizer.customize(restTemplate);
24                 }
25             }
26         });
27     }
28 
29     @Bean
30     @ConditionalOnMissingBean
31     public LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {
32         return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
33     }
34 
35     @Configuration(proxyBeanMethods = false)
36     @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
37     static class LoadBalancerInterceptorConfig {
38 
39         // 創建 RestTemplate 攔截器
40         @Bean
41         public LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient,
42                                                          LoadBalancerRequestFactory requestFactory) {
43             return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
44         }
45 
46         @Bean
47         @ConditionalOnMissingBean
48         public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
49             return restTemplate -> {
50                 List<ClientHttpRequestInterceptor> list = new ArrayList<>(
51                         restTemplate.getInterceptors());
52                 list.add(loadBalancerInterceptor);
53                 restTemplate.setInterceptors(list);
54             };
55         }
56 
57     }
58 }
View Code

④ 之后是默認的 Ribbon 客戶端配置類 RibbonClientConfiguration,這個類主要配置了 Ribbon 核心接口的默認實現。

  • IClientConfig:Ribbon 客戶端配置類,默認實現是 DefaultClientConfigImpl。
  • IRule:負載均衡策略規則組件,默認實現是 ZoneAvoidanceRule。
  • IPing:判斷 Server 是否存活,默認實現是 DummyPing,永遠都是返回 true。
  • ServerList:獲取 Server 的組件,默認實現類為 ConfigurationBasedServerList,從配置文件獲取。
  • ServerListUpdater:Server 列表更新組件,默認實現類為 PollingServerListUpdater。
  • ServerListFilter:過濾可用的 Server 列表,默認實現類為 ZonePreferenceServerListFilter。
  • RibbonLoadBalancerContext:負載均衡客戶端。
  • RetryHandler:重試處理器,默認實現類為 DefaultLoadBalancerRetryHandler。
  1 package org.springframework.cloud.netflix.ribbon;
  2 
  3 @SuppressWarnings("deprecation")
  4 @Configuration(proxyBeanMethods = false)
  5 @EnableConfigurationProperties
  6 @Import({ HttpClientConfiguration.class, OkHttpRibbonConfiguration.class,
  7         RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class })
  8 public class RibbonClientConfiguration {
  9     public static final int DEFAULT_CONNECT_TIMEOUT = 1000;
 10     public static final int DEFAULT_READ_TIMEOUT = 1000;
 11     public static final boolean DEFAULT_GZIP_PAYLOAD = true;
 12 
 13     @RibbonClientName
 14     private String name = "client";
 15 
 16     @Autowired
 17     private PropertiesFactory propertiesFactory;
 18 
 19     @Bean
 20     @ConditionalOnMissingBean
 21     public IClientConfig ribbonClientConfig() {
 22         DefaultClientConfigImpl config = new DefaultClientConfigImpl();
 23         config.loadProperties(this.name);
 24         config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
 25         config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
 26         config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD);
 27         return config;
 28     }
 29 
 30     @Bean
 31     @ConditionalOnMissingBean
 32     public IRule ribbonRule(IClientConfig config) {
 33         if (this.propertiesFactory.isSet(IRule.class, name)) {
 34             return this.propertiesFactory.get(IRule.class, config, name);
 35         }
 36         ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
 37         rule.initWithNiwsConfig(config);
 38         return rule;
 39     }
 40 
 41     @Bean
 42     @ConditionalOnMissingBean
 43     public IPing ribbonPing(IClientConfig config) {
 44         if (this.propertiesFactory.isSet(IPing.class, name)) {
 45             return this.propertiesFactory.get(IPing.class, config, name);
 46         }
 47         return new DummyPing();
 48     }
 49 
 50     @Bean
 51     @ConditionalOnMissingBean
 52     @SuppressWarnings("unchecked")
 53     public ServerList<Server> ribbonServerList(IClientConfig config) {
 54         if (this.propertiesFactory.isSet(ServerList.class, name)) {
 55             return this.propertiesFactory.get(ServerList.class, config, name);
 56         }
 57         ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
 58         serverList.initWithNiwsConfig(config);
 59         return serverList;
 60     }
 61 
 62     @Bean
 63     @ConditionalOnMissingBean
 64     public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
 65         return new PollingServerListUpdater(config);
 66     }
 67 
 68     @Bean
 69     @ConditionalOnMissingBean
 70     public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
 71             ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
 72             IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
 73         if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
 74             return this.propertiesFactory.get(ILoadBalancer.class, config, name);
 75         }
 76         return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
 77                 serverListFilter, serverListUpdater);
 78     }
 79 
 80     @Bean
 81     @ConditionalOnMissingBean
 82     @SuppressWarnings("unchecked")
 83     public ServerListFilter<Server> ribbonServerListFilter(IClientConfig config) {
 84         if (this.propertiesFactory.isSet(ServerListFilter.class, name)) {
 85             return this.propertiesFactory.get(ServerListFilter.class, config, name);
 86         }
 87         ZonePreferenceServerListFilter filter = new ZonePreferenceServerListFilter();
 88         filter.initWithNiwsConfig(config);
 89         return filter;
 90     }
 91 
 92     @Bean
 93     @ConditionalOnMissingBean
 94     public RibbonLoadBalancerContext ribbonLoadBalancerContext(ILoadBalancer loadBalancer,
 95             IClientConfig config, RetryHandler retryHandler) {
 96         return new RibbonLoadBalancerContext(loadBalancer, config, retryHandler);
 97     }
 98 
 99     @Bean
100     @ConditionalOnMissingBean
101     public RetryHandler retryHandler(IClientConfig config) {
102         return new DefaultLoadBalancerRetryHandler(config);
103     }
104 
105     @Bean
106     @ConditionalOnMissingBean
107     public ServerIntrospector serverIntrospector() {
108         return new DefaultServerIntrospector();
109     }
110 }
View Code

⑤ Ribbon Eureka 自動化配置類 RibbonEurekaAutoConfiguration,判斷是否啟用 Ribbon Eureka,並觸發 EurekaRibbonClientConfiguration 配置類。

 1 package org.springframework.cloud.netflix.ribbon.eureka;
 2 
 3 @Configuration(proxyBeanMethods = false)
 4 @EnableConfigurationProperties
 5 @ConditionalOnRibbonAndEurekaEnabled
 6 @AutoConfigureAfter(RibbonAutoConfiguration.class)
 7 @RibbonClients(defaultConfiguration = EurekaRibbonClientConfiguration.class)
 8 public class RibbonEurekaAutoConfiguration {
 9 
10 }
View Code

⑥ 默認啟用 Ribbon Eureka 的情況下,會使用 Ribbon Eureka 客戶端配置類 EurekaRibbonClientConfiguration:

  • IPing:替換了默認實現類 DummyPing,改為 NIWSDiscoveryPing,通過判斷 InstanceInfo 的狀態是否為 UP 來判斷 Server 是否存活。
  • ServerList:替換了默認的實現類 ConfigurationBasedServerList,改為 DomainExtractingServerList,實際是 DiscoveryEnabledNIWSServerList,從 EurekaClient 獲取 Server 列表。
 1 package org.springframework.cloud.netflix.ribbon.eureka;
 2 
 3 @Configuration(proxyBeanMethods = false)
 4 public class EurekaRibbonClientConfiguration {
 5 
 6     private static final Log log = LogFactory.getLog(EurekaRibbonClientConfiguration.class);
 7 
 8     @Value("${ribbon.eureka.approximateZoneFromHostname:false}")
 9     private boolean approximateZoneFromHostname = false;
10     @RibbonClientName
11     private String serviceId = "client";
12     @Autowired(required = false)
13     private EurekaClientConfig clientConfig;
14     @Autowired(required = false)
15     private EurekaInstanceConfig eurekaConfig;
16     @Autowired
17     private PropertiesFactory propertiesFactory;
18 
19     public EurekaRibbonClientConfiguration() {
20     }
21 
22     public EurekaRibbonClientConfiguration(EurekaClientConfig clientConfig,
23             String serviceId, EurekaInstanceConfig eurekaConfig,
24             boolean approximateZoneFromHostname) {
25         this.clientConfig = clientConfig;
26         this.serviceId = serviceId;
27         this.eurekaConfig = eurekaConfig;
28         this.approximateZoneFromHostname = approximateZoneFromHostname;
29     }
30 
31     @Bean
32     @ConditionalOnMissingBean
33     public IPing ribbonPing(IClientConfig config) {
34         if (this.propertiesFactory.isSet(IPing.class, serviceId)) {
35             return this.propertiesFactory.get(IPing.class, config, serviceId);
36         }
37         NIWSDiscoveryPing ping = new NIWSDiscoveryPing();
38         ping.initWithNiwsConfig(config);
39         return ping;
40     }
41 
42     @Bean
43     @ConditionalOnMissingBean
44     public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
45         if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
46             return this.propertiesFactory.get(ServerList.class, config, serviceId);
47         }
48         DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
49                 config, eurekaClientProvider);
50         DomainExtractingServerList serverList = new DomainExtractingServerList(
51                 discoveryServerList, config, this.approximateZoneFromHostname);
52         return serverList;
53     }
54 }
View Code

⑦ 各個配置類所屬模塊

spring-cloud-netflix-eureka-client:

  • org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration
  • org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration
  • org.springframework.cloud.netflix.ribbon.eureka.EurekaRibbonClientConfiguration

spring-cloud-netflix-ribbon:

  • org.springframework.cloud.netflix.ribbon.RibbonAutoConfiguration
  • org.springframework.cloud.netflix.ribbon.RibbonClientConfiguration

spring-cloud-commons:

  • org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration

六、Ribbon HTTP客戶端組件

1、Java HTTP 組件庫

① HTTP 組件庫

首先簡單了解下常用的 Java HTTP 組件庫,Ribbon 中通過不同的配置便可以啟用某個 HTTP 組件來進行服務間的通信。

Java 中的 HTTP 組件庫,大體可以分為三類:

  • JDK 自帶的標准庫 HttpURLConnection
  • Apache HttpComponents HttpClient
  • OkHttp

HttpURLConnection 發起 HTTP 請求最大的優點是不需要引入額外的依賴,但是 HttpURLConnection 封裝層次太低,使用起來非常繁瑣。支持的特性太少,缺乏連接池管理、域名機械控制,無法支持 HTTP/2等。

Apache HttpComponents HttpClient 和 OkHttp 都支持連接池管理、超時、空閑連接數控制等特性。OkHttp 接口設計更友好,且支持 HTTP/2,Android 開發中用得更多。

② 超時重試配置

先給 demo-consumer 中添加如下默認配置,即讀取、連接超時時間設置為 1 秒,這也是默認值。然后重試次數為1,重試一個Server。后面基於這些配置來驗證Ribbon HTTP客戶端的超時和重試。

 1 ribbon:
 2   # 客戶端讀取超時時間
 3   ReadTimeout: 1000
 4   # 客戶端連接超時時間
 5   ConnectTimeout: 1000
 6   # 默認只重試 GET,設置為 true 時將重試所有類型,如 POST、PUT、DELETE
 7   OkToRetryOnAllOperations: false
 8   # 重試次數
 9   MaxAutoRetries: 1
10   # 最多重試幾個實例
11   MaxAutoRetriesNextServer: 1

然后 demo-producer 的接口休眠3秒,造成網絡延遲的現象,並且 demo-producer 啟兩個實例。

1 @GetMapping("/v1/uuid")
2 public ResponseEntity<String> getUUID() throws InterruptedException {
3     String uuid = UUID.randomUUID().toString();
4     logger.info("generate uuid: {}", uuid);
5     Thread.sleep(3000);
6     return ResponseEntity.ok(uuid);
7 }

2、Ribbon 默認使用 HttpURLConnection

① Ribbon 默認的 HTTP 組件

在不添加其它配置的情況下,我們來看下 Ribbon 默認使用的 HTTP 組件是什么。

首先通過之前的分析可以知道,默認情況下,LoadBalancerAutoConfiguration 配置類會向 RestTemplate 添加 LoadBalancerInterceptor 攔截器。然后在 RestTemplate 調用時,即在 doExecute 方法中,創建 ClientHttpRequest 時,因為配置了攔截器,所以 ClientHttpRequestFactory 就是 InterceptingClientHttpRequestFactory,而且創建 InterceptingClientHttpRequestFactory 傳入的 ClientHttpRequestFactory 默認是父類的 SimpleClientHttpRequestFactory。

 1 protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
 2         @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
 3     //...
 4     ClientHttpResponse response = null;
 5     try {
 6         // 創建 ClientHttpRequest
 7         ClientHttpRequest request = createRequest(url, method);
 8         if (requestCallback != null) {
 9             requestCallback.doWithRequest(request);
10         }
11         // ClientHttpRequest 發起請求
12         response = request.execute();
13         handleResponse(url, method, response);
14         return (responseExtractor != null ? responseExtractor.extractData(response) : null);
15     }
16     catch (IOException ex) {
17         // ...
18     }
19     finally {
20         if (response != null) {
21             response.close();
22         }
23     }
24 }
25 
26 protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
27     // getRequestFactory 獲取 ClientHttpRequestFactory
28     ClientHttpRequest request = getRequestFactory().createRequest(url, method);
29     initialize(request);
30     if (logger.isDebugEnabled()) {
31         logger.debug("HTTP " + method.name() + " " + url);
32     }
33     return request;
34 }
35 
36 public ClientHttpRequestFactory getRequestFactory() {
37     List<ClientHttpRequestInterceptor> interceptors = getInterceptors();
38     if (!CollectionUtils.isEmpty(interceptors)) {
39         ClientHttpRequestFactory factory = this.interceptingRequestFactory;
40         if (factory == null) {
41             // 有攔截器的情況,super.getRequestFactory() 默認返回的是 SimpleClientHttpRequestFactory
42             factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
43             this.interceptingRequestFactory = factory;
44         }
45         return factory;
46     }
47     else {
48         // 無攔截器的情況
49         return super.getRequestFactory();
50     }
51 }
View Code

InterceptingClientHttpRequestFactory 這個工廠類創建的 ClientHttpRequest 類型是 InterceptingClientHttpRequest。最終 RestTemplate 的 doExecute 方法中調用 ClientHttpRequest 的 execute 方法時,就調用到了 InterceptingClientHttpRequest 中的內部類 InterceptingRequestExecution 中。

在 InterceptingRequestExecution 的 execute 方法中,首先是遍歷所有攔截器對 RestTemplate 定制化,最后則通過 requestFactory 創建 ClientHttpRequest 來發起最終的 HTTP 調用。從這里可以看出,無論有沒有攔截器,其實最終都會使用 requestFactory 來創建 ClientHttpRequest。

 1 private class InterceptingRequestExecution implements ClientHttpRequestExecution {
 2     private final Iterator<ClientHttpRequestInterceptor> iterator;
 3 
 4     @Override
 5     public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
 6         if (this.iterator.hasNext()) {
 7             // 攔截器定制化 RestTemplate
 8             ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
 9             return nextInterceptor.intercept(request, body, this);
10         }
11         else {
12             HttpMethod method = request.getMethod();
13             // delegate => SimpleBufferingClientHttpRequest
14             ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
15             //...
16             return delegate.execute();
17         }
18     }
19 }

這里的 requestFactory 就是前面傳進來的 SimpleClientHttpRequestFactory,從它的 createRequest 方法可以看出,默認情況下,就是用的 JDK 標准 HTTP 庫組件 HttpURLConnection 來進行服務間的請求通信。

 1 public ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod) throws IOException {
 2     // JDK 標准HTTP庫 HttpURLConnection
 3     HttpURLConnection connection = openConnection(uri.toURL(), this.proxy);
 4     prepareConnection(connection, httpMethod.name());
 5 
 6     if (this.bufferRequestBody) {
 7         return new SimpleBufferingClientHttpRequest(connection, this.outputStreaming);
 8     }
 9     else {
10         return new SimpleStreamingClientHttpRequest(connection, this.chunkSize, this.outputStreaming);
11     }
12 }

總結:

從前面的源碼分析可以看出,Ribbon 默認的 HTTP 客戶端是 HttpURLConnection。

在前面默認的超時配置下,可以驗證出超時配置並未生效,一直阻塞3秒后才返回了結果,說明 Ribbon 默認情況下就不支持超時重試。

而且 HttpURLConnection 每次都是新創建的,請求返回來之后就關閉連接,沒有連接池管理機制,網絡連接的建立和關閉本身就會損耗一定的性能,所以正式環境下,最好不要使用默認的配置。

② HttpClient 配置類

另外,我們從 RibbonClientConfiguration 配置類的定義可以看到,其導入了 HttpClientConfiguration、OkHttpRibbonConfiguration、RestClientRibbonConfiguration、HttpClientRibbonConfiguration 四個 HttpClient 的配置類,通過注釋也可以了解到,最后一個是默認配置類,前面三個在某些配置啟用的情況下才會生效。

1 @Configuration(proxyBeanMethods = false)
2 @EnableConfigurationProperties
3 // Order is important here, last should be the default, first should be optional
4 @Import({ HttpClientConfiguration.class, OkHttpRibbonConfiguration.class,
5         RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class })
6 public class RibbonClientConfiguration {
7 
8 }

進入 HttpClientRibbonConfiguration,這個配置類在 ribbon.httpclient.enabled=true 時才生效,而且默認為 true。在從 SpringClientFactory 中獲取 ILoadBalancer 時,會通過這個配置類初始化 HttpClient,按先后順序會初始化 HttpClientConnectionManager、CloseableHttpClient、RibbonLoadBalancingHttpClient。CloseableHttpClient 是 Apache HttpComponents HttpClient 中的組件,也就是說默認情況下應該是使用 apache HttpComponents 作為 HTTP 組件庫。

但經過前面源碼的分析,以及測試發現,最終其實走的的 HttpURLConnection,並沒有用到 CloseableHttpClient。把 ribbon.httpclient.enabled 設置為 false,也沒有什么影響,還是默認走 HttpURLConnection。我們后面再來分析這個問題。

 1 @Configuration(proxyBeanMethods = false)
 2 @ConditionalOnClass(name = "org.apache.http.client.HttpClient")
 3 // ribbon.httpclient.enabled more文為 true
 4 @ConditionalOnProperty(name = "ribbon.httpclient.enabled", matchIfMissing = true)
 5 public class HttpClientRibbonConfiguration {
 6 
 7     @RibbonClientName
 8     private String name = "client";
 9 
10     // RibbonLoadBalancingHttpClient
11     @Bean
12     @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
13     @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
14     public RibbonLoadBalancingHttpClient ribbonLoadBalancingHttpClient(
15             IClientConfig config, ServerIntrospector serverIntrospector,
16             ILoadBalancer loadBalancer, RetryHandler retryHandler,
17             CloseableHttpClient httpClient) {
18         RibbonLoadBalancingHttpClient client = new RibbonLoadBalancingHttpClient(
19                 httpClient, config, serverIntrospector);
20         client.setLoadBalancer(loadBalancer);
21         client.setRetryHandler(retryHandler);
22         Monitors.registerObject("Client_" + this.name, client);
23         return client;
24     }
25 
26     // 在引入了 spring-retry 時,即可以重試的 RetryTemplate 時,就創建 RetryableRibbonLoadBalancingHttpClient
27     @Bean
28     @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
29     @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
30     public RetryableRibbonLoadBalancingHttpClient retryableRibbonLoadBalancingHttpClient(
31             IClientConfig config, ServerIntrospector serverIntrospector,
32             ILoadBalancer loadBalancer, RetryHandler retryHandler,
33             LoadBalancedRetryFactory loadBalancedRetryFactory,
34             CloseableHttpClient httpClient,
35             RibbonLoadBalancerContext ribbonLoadBalancerContext) {
36         RetryableRibbonLoadBalancingHttpClient client = new RetryableRibbonLoadBalancingHttpClient(
37                 httpClient, config, serverIntrospector, loadBalancedRetryFactory);
38         client.setLoadBalancer(loadBalancer);
39         client.setRetryHandler(retryHandler);
40         client.setRibbonLoadBalancerContext(ribbonLoadBalancerContext);
41         Monitors.registerObject("Client_" + this.name, client);
42         return client;
43     }
44 
45     @Configuration(proxyBeanMethods = false)
46     protected static class ApacheHttpClientConfiguration {
47 
48         private final Timer connectionManagerTimer = new Timer(
49                 "RibbonApacheHttpClientConfiguration.connectionManagerTimer", true);
50 
51         private CloseableHttpClient httpClient;
52 
53         @Autowired(required = false)
54         private RegistryBuilder registryBuilder;
55 
56         // HttpClient 連接池管理器
57         @Bean
58         @ConditionalOnMissingBean(HttpClientConnectionManager.class)
59         public HttpClientConnectionManager httpClientConnectionManager(
60                 IClientConfig config,
61                 ApacheHttpClientConnectionManagerFactory connectionManagerFactory) {
62             RibbonProperties ribbon = RibbonProperties.from(config);
63             int maxTotalConnections = ribbon.maxTotalConnections();
64             int maxConnectionsPerHost = ribbon.maxConnectionsPerHost();
65             int timerRepeat = ribbon.connectionCleanerRepeatInterval();
66             long timeToLive = ribbon.poolKeepAliveTime();
67             TimeUnit ttlUnit = ribbon.getPoolKeepAliveTimeUnits();
68             final HttpClientConnectionManager connectionManager = connectionManagerFactory
69                     .newConnectionManager(false, maxTotalConnections,
70                             maxConnectionsPerHost, timeToLive, ttlUnit, registryBuilder);
71             this.connectionManagerTimer.schedule(new TimerTask() {
72                 @Override
73                 public void run() {
74                     connectionManager.closeExpiredConnections();
75                 }
76             }, 30000, timerRepeat);
77             return connectionManager;
78         }
79 
80         // HttpClient => CloseableHttpClient
81         @Bean
82         @ConditionalOnMissingBean(CloseableHttpClient.class)
83         public CloseableHttpClient httpClient(ApacheHttpClientFactory httpClientFactory,
84                 HttpClientConnectionManager connectionManager, IClientConfig config) {
85             RibbonProperties ribbon = RibbonProperties.from(config);
86             Boolean followRedirects = ribbon.isFollowRedirects();
87             Integer connectTimeout = ribbon.connectTimeout();
88             RequestConfig defaultRequestConfig = RequestConfig.custom()
89                     .setConnectTimeout(connectTimeout)
90                     .setRedirectsEnabled(followRedirects).build();
91             this.httpClient = httpClientFactory.createBuilder()
92                     .setDefaultRequestConfig(defaultRequestConfig)
93                     .setConnectionManager(connectionManager).build();
94             return httpClient;
95         }
96     }
97 }
View Code

③ 默認配置下的 RestTemplate 的調用過程大致可以用下圖來表示。

3、啟用 RestClient

① 啟用 RestClient

可以添加如下配置啟用 RestClient:

 1 ribbon:
 2   # 關閉 httpclient
 3   httpclient:
 4     enabled: false
 5   # 啟用 RestClient
 6   restclient:
 7     enabled: true
 8   # 啟用 RestClient
 9   http:
10     client:
11       enabled: true

進入 RestClientRibbonConfiguration  可以看到,只要 ribbon.http.client.enabled、ribbon.restclient.enabled 其中一個配置了啟用,就可以啟用 RestClient。

 1 @SuppressWarnings("deprecation")
 2 @Configuration(proxyBeanMethods = false)
 3 // 啟用條件 ConditionalOnRibbonRestClient
 4 @RibbonAutoConfiguration.ConditionalOnRibbonRestClient
 5 class RestClientRibbonConfiguration {
 6     @RibbonClientName
 7     private String name = "client";
 8 
 9     // RestClient 已過期
10     @Bean
11     @Lazy
12     @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
13     public RestClient ribbonRestClient(IClientConfig config, ILoadBalancer loadBalancer,
14             ServerIntrospector serverIntrospector, RetryHandler retryHandler) {
15         RestClient client = new RibbonClientConfiguration.OverrideRestClient(config, serverIntrospector);
16         client.setLoadBalancer(loadBalancer);
17         client.setRetryHandler(retryHandler);
18         return client;
19     }
20 
21 }
22 
23 @Target({ ElementType.TYPE, ElementType.METHOD })
24 @Retention(RetentionPolicy.RUNTIME)
25 @Documented
26 @Conditional(OnRibbonRestClientCondition.class)
27 @interface ConditionalOnRibbonRestClient {
28 }
29 
30 private static class OnRibbonRestClientCondition extends AnyNestedCondition {
31     @Deprecated // remove in Edgware"
32     @ConditionalOnProperty("ribbon.http.client.enabled")
33     static class ZuulProperty {
34     }
35 
36     @ConditionalOnProperty("ribbon.restclient.enabled")
37     static class RibbonProperty {
38     }
39 }

RestClient 繼承自 AbstractLoadBalancerAwareClient。需要注意的是,RestClient 已經過期,所以生產環境中我們就不要啟用 RestTemplate 了。

1 @Deprecated
2 public class RestClient extends AbstractLoadBalancerAwareClient<HttpRequest, HttpResponse> {
3 
4 }

② LoadBalancerContext 類體系結構

負載均衡上下文 LoadBalancerContext 體系的類結構如下。可以看出,Ribbon 是支持 Feign、OkHttp、HttpClient、RestClient 的。默認配置下使用的實現類是 RibbonLoadBalancerContext。

③ RestTemplate 的 ClientHttpRequest 工廠類配置

接着看 RibbonAutoConfiguration 中有如下的配置,跟前面 RestClientRibbonConfiguration 也是一樣,滿足 @ConditionalOnRibbonRestClient 的條件。

可以看到,它會創建 RibbonClientHttpRequestFactory 並設置到 RestTemplate 中,也就是說,這時 RestTemplate 中的 requestFactory 就不是默認的 SimpleClientHttpRequestFactory 了,而是 RibbonClientHttpRequestFactory。

 1 @Configuration(proxyBeanMethods = false)
 2 @ConditionalOnClass(HttpRequest.class)
 3 @ConditionalOnRibbonRestClient
 4 protected static class RibbonClientHttpRequestFactoryConfiguration {
 5     @Autowired
 6     private SpringClientFactory springClientFactory;
 7 
 8     @Bean
 9     public RestTemplateCustomizer restTemplateCustomizer(
10             final RibbonClientHttpRequestFactory ribbonClientHttpRequestFactory) {
11         // RestTemplate 設置 requestFactory 為 RibbonClientHttpRequestFactory
12         return restTemplate -> restTemplate
13                 .setRequestFactory(ribbonClientHttpRequestFactory);
14     }
15 
16     // ClientHttpRequest 工廠類 => RibbonClientHttpRequestFactory
17     @Bean
18     public RibbonClientHttpRequestFactory ribbonClientHttpRequestFactory() {
19         return new RibbonClientHttpRequestFactory(this.springClientFactory);
20     }
21 }

而且,由於這里配置了 RestTemplateCustomizer,原本默認配置下,在 LoadBalancerAutoConfiguration 中創建 RestTemplateCustomizer 的方法就不會生效了。

LoadBalancerAutoConfiguration 中的 RestTemplateCustomizer 是向 RestTemplate 中添加 LoadBalancerInterceptor 攔截器,所以在啟用了 RestClient 的情況下,原本的 LoadBalancerInterceptor 就不會生效了。

 1 @Configuration(proxyBeanMethods = false)
 2 @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
 3 static class LoadBalancerInterceptorConfig {
 4 
 5     @Bean
 6     public LoadBalancerInterceptor ribbonInterceptor(
 7             LoadBalancerClient loadBalancerClient,
 8             LoadBalancerRequestFactory requestFactory) {
 9         return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
10     }
11 
12     @Bean
13     @ConditionalOnMissingBean
14     public RestTemplateCustomizer restTemplateCustomizer(
15             final LoadBalancerInterceptor loadBalancerInterceptor) {
16         return restTemplate -> {
17             List<ClientHttpRequestInterceptor> list = new ArrayList<>(
18                     restTemplate.getInterceptors());
19             list.add(loadBalancerInterceptor);
20             restTemplate.setInterceptors(list);
21         };
22     }
23 }
View Code

那么 RestTemplate 的 doExecute 方法中,在調用 createRequest 方法創建 ClientHttpRequest 時,就會用 RibbonClientHttpRequestFactory 來創建,進去可以看到 ClientHttpRequest 的實際類型就是 RibbonHttpRequest。

 1 public ClientHttpRequest createRequest(URI originalUri, HttpMethod httpMethod) throws IOException {
 2     String serviceId = originalUri.getHost();
 3     if (serviceId == null) {
 4         throw new IOException("Invalid hostname in the URI [" + originalUri.toASCIIString() + "]");
 5     }
 6     IClientConfig clientConfig = this.clientFactory.getClientConfig(serviceId);
 7     RestClient client = this.clientFactory.getClient(serviceId, RestClient.class);
 8     HttpRequest.Verb verb = HttpRequest.Verb.valueOf(httpMethod.name());
 9 
10     return new RibbonHttpRequest(originalUri, verb, client, clientConfig);
11 }

調用 RibbonHttpRequest 的 execute 方法,實際組中是調用了它的 executeInternal 方法,然后最后是使用 RestClient 來發起負載均衡的調用。

 1 protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException {
 2     try {
 3         // ...
 4         HttpRequest request = builder.build();
 5         // client => RestClient
 6         HttpResponse response = client.executeWithLoadBalancer(request, config);
 7         return new RibbonHttpResponse(response);
 8     }
 9     catch (Exception e) {
10         throw new IOException(e);
11     }
12 }

④ RestClient HTTP 調用

RestClient 的 executeWithLoadBalancer 實際是進入到父類 AbstractLoadBalancerAwareClient 的 executeWithLoadBalancer  方法中。

從這個方法可以知道,主要的負載均衡請求是在 LoadBalancerCommand 中的,LoadBalancerCommand 必定會通過負載均衡器 ILoadBalancer 得到一個 Server,然后通過 submit 的這個 ServerOperation 對原始URI進行重構,重構之后調用 RestClient 的 execute 發起HTTP請求。

 1 public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
 2     // 負載均衡命令
 3     LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
 4 
 5     try {
 6         // 發起負載均衡請求
 7         return command.submit(
 8             new ServerOperation<T>() {
 9                 @Override
10                 public Observable<T> call(Server server) {
11                     // 重構 URI,將服務名用 Server 的 IP 和端口替換
12                     URI finalUri = reconstructURIWithServer(server, request.getUri());
13                     S requestForServer = (S) request.replaceUri(finalUri);
14                     try {
15                         // execute 發起調用,實際調用的是 RestClient 中的 execute
16                         return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
17                     }
18                     catch (Exception e) {
19                         return Observable.error(e);
20                     }
21                 }
22             })
23             .toBlocking()
24             .single();
25     } catch (Exception e) {
26         //....
27     }
28 }

再看 RestClient 的 execute 方法,最終可以發現,RestClient 其實是使用基於 jersey 的 WebResource 來發起 HTTP 請求的。

 1 private HttpResponse execute(HttpRequest.Verb verb, URI uri,
 2         Map<String, Collection<String>> headers, Map<String, Collection<String>> params,
 3         IClientConfig overriddenClientConfig, Object requestEntity) throws Exception {
 4     // ...
 5     // WebResource 是基於 jersey 封裝的 HTTP 客戶端組件
 6     WebResource xResource = restClient.resource(uri.toString());
 7     ClientResponse jerseyResponse;
 8     Builder b = xResource.getRequestBuilder();
 9     Object entity = requestEntity;
10 
11     switch (verb) {
12     case GET:
13         jerseyResponse = b.get(ClientResponse.class);
14         break;
15     case POST:
16         jerseyResponse = b.post(ClientResponse.class, entity);
17         break;
18     case PUT:
19         jerseyResponse = b.put(ClientResponse.class, entity);
20         break;
21     case DELETE:
22         jerseyResponse = b.delete(ClientResponse.class);
23         break;
24     case HEAD:
25         jerseyResponse = b.head();
26         break;
27     case OPTIONS:
28         jerseyResponse = b.options(ClientResponse.class);
29         break;
30     default:
31         throw new ClientException(
32                 ClientException.ErrorType.GENERAL,
33                 "You have to one of the REST verbs such as GET, POST etc.");
34     }
35 
36     thisResponse = new HttpClientResponse(jerseyResponse, uri, overriddenClientConfig);
37     if (thisResponse.getStatus() == 503){
38         thisResponse.close();
39         throw new ClientException(ClientException.ErrorType.SERVER_THROTTLED);
40     }
41     return thisResponse;
42 }
View Code

⑤ 最后,RestTemplate 基於 RestClient 的請求流程可以用下圖來做個總結。

4、Apache HttpClient 或 OkHttp 對 RestTemplate 不生效(BUG?)

① Apache HttpClient

默認情況下,ribbon.httpclient.enabled=true,在 HttpClientRibbonConfiguration 中會初始化 apache httpcomponents 相關的組件,前已經分析過了,但是在 RestTemplate 中並未使用相關的組件。

也就是說,默認情況下啟用了 apache httpcomponents,但是 RestTemplate 最后是使用 HttpURLConnection 來發起 HTTP 請求的,而不是配置的 CloseableHttpClient。

② OkHttp

首先需要加入 OkHttp 的依賴:

1 <dependency>
2     <groupId>com.squareup.okhttp3</groupId>
3     <artifactId>okhttp</artifactId>
4 </dependency>

然后添加如下配置就可以啟用 OkHttp:

1 ribbon: 2  httpclient: 3 enabled: false 4  # 啟用 okhttp 5  okhttp: 6 enabled: true

配置 ribbon.okhttp.enabled=true 后,在 OkHttpRibbonConfiguration 中會初始化 OkHttp 相關的組件。

但是調試之后會發現,其實它還是走的默認的流程,就是最終用 HttpURLConnection 發起 HTTP 請求,跟 httpcomponents 是一樣的效果。

 1 @Configuration(proxyBeanMethods = false)
 2 @ConditionalOnProperty("ribbon.okhttp.enabled")
 3 @ConditionalOnClass(name = "okhttp3.OkHttpClient")
 4 public class OkHttpRibbonConfiguration {
 5 
 6     @RibbonClientName
 7     private String name = "client";
 8 
 9     @Bean
10     @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
11     @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
12     public RetryableOkHttpLoadBalancingClient retryableOkHttpLoadBalancingClient(
13             IClientConfig config, ServerIntrospector serverIntrospector,
14             ILoadBalancer loadBalancer, RetryHandler retryHandler,
15             LoadBalancedRetryFactory loadBalancedRetryFactory, OkHttpClient delegate,
16             RibbonLoadBalancerContext ribbonLoadBalancerContext) {
17         RetryableOkHttpLoadBalancingClient client = new RetryableOkHttpLoadBalancingClient(
18                 delegate, config, serverIntrospector, loadBalancedRetryFactory);
19         client.setLoadBalancer(loadBalancer);
20         client.setRetryHandler(retryHandler);
21         client.setRibbonLoadBalancerContext(ribbonLoadBalancerContext);
22         Monitors.registerObject("Client_" + this.name, client);
23         return client;
24     }
25 
26     @Bean
27     @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
28     @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
29     public OkHttpLoadBalancingClient okHttpLoadBalancingClient(IClientConfig config,
30             ServerIntrospector serverIntrospector, ILoadBalancer loadBalancer,
31             RetryHandler retryHandler, OkHttpClient delegate) {
32         OkHttpLoadBalancingClient client = new OkHttpLoadBalancingClient(delegate, config,
33                 serverIntrospector);
34         client.setLoadBalancer(loadBalancer);
35         client.setRetryHandler(retryHandler);
36         Monitors.registerObject("Client_" + this.name, client);
37         return client;
38     }
39 
40     @Configuration(proxyBeanMethods = false)
41     protected static class OkHttpClientConfiguration {
42 
43         private OkHttpClient httpClient;
44 
45         @Bean
46         @ConditionalOnMissingBean(ConnectionPool.class)
47         public ConnectionPool httpClientConnectionPool(IClientConfig config,
48                 OkHttpClientConnectionPoolFactory connectionPoolFactory) {
49             RibbonProperties ribbon = RibbonProperties.from(config);
50             int maxTotalConnections = ribbon.maxTotalConnections();
51             long timeToLive = ribbon.poolKeepAliveTime();
52             TimeUnit ttlUnit = ribbon.getPoolKeepAliveTimeUnits();
53             return connectionPoolFactory.create(maxTotalConnections, timeToLive, ttlUnit);
54         }
55 
56         @Bean
57         @ConditionalOnMissingBean(OkHttpClient.class)
58         public OkHttpClient client(OkHttpClientFactory httpClientFactory,
59                 ConnectionPool connectionPool, IClientConfig config) {
60             RibbonProperties ribbon = RibbonProperties.from(config);
61             this.httpClient = httpClientFactory.createBuilder(false)
62                     .connectTimeout(ribbon.connectTimeout(), TimeUnit.MILLISECONDS)
63                     .readTimeout(ribbon.readTimeout(), TimeUnit.MILLISECONDS)
64                     .followRedirects(ribbon.isFollowRedirects())
65                     .connectionPool(connectionPool).build();
66             return this.httpClient;
67         }
68 
69         @PreDestroy
70         public void destroy() {
71             if (httpClient != null) {
72                 httpClient.dispatcher().executorService().shutdown();
73                 httpClient.connectionPool().evictAll();
74             }
75         }
76 
77     }
78 
79 }
View Cod

③ 啟用 HttpClient 或 OkHttp 不生效的原因

經過前面的分析,可以知道啟用 apache httpcomponents 或者 OkHttp,對 RestTemplate 都沒有起作用,最終還是用 HttpURLConnection 發起 HTTP 請求。那為什么為出現這種情況呢?我們可以看下 RestTemplate  的 setRequestFactory 方法。

通過 RestTemplate 的 setRequestFactory 方法的注釋也可以了解到,默認的 requestFactory 是 SimpleClientHttpRequestFactory,它是基於 JDK 標准 HTTP 庫的 HttpURLConnection。

默認的 HttpURLConnection 不支持 PATCH,如果想支持,需設置為 Apache HttpComponents 或 OkHttp 的 request factory。

 1 /**
 2  * Set the request factory that this accessor uses for obtaining client request handles.
 3  * <p>The default is a {@link SimpleClientHttpRequestFactory} based on the JDK's own
 4  * HTTP libraries ({@link java.net.HttpURLConnection}).
 5  * <p><b>Note that the standard JDK HTTP library does not support the HTTP PATCH method.
 6  * Configure the Apache HttpComponents or OkHttp request factory to enable PATCH.</b>
 7  * @see #createRequest(URI, HttpMethod)
 8  * @see SimpleClientHttpRequestFactory
 9  * @see org.springframework.http.client.HttpComponentsAsyncClientHttpRequestFactory
10  * @see org.springframework.http.client.OkHttp3ClientHttpRequestFactory
11  */
12 public void setRequestFactory(ClientHttpRequestFactory requestFactory) {
13     Assert.notNull(requestFactory, "ClientHttpRequestFactory must not be null");
14     this.requestFactory = requestFactory;
15 }

ClientHttpRequestFactory 體系類結構如下:

那 RestClient 又是如何生效的呢?通過上一節的分析可以知道,在 RibbonAutoConfiguration 中有如下的配置,這個 RibbonClientHttpRequestFactoryConfiguration 通過自定義 RestTemplateCustomizer 向 RestTemplate 設置了 requestFactory 為 RibbonClientHttpRequestFactory。

 1 @Configuration(proxyBeanMethods = false)
 2 @ConditionalOnClass(HttpRequest.class)
 3 @ConditionalOnRibbonRestClient
 4 protected static class RibbonClientHttpRequestFactoryConfiguration {
 5     @Autowired
 6     private SpringClientFactory springClientFactory;
 7 
 8     @Bean
 9     public RestTemplateCustomizer restTemplateCustomizer(final RibbonClientHttpRequestFactory ribbonClientHttpRequestFactory) {
10         return restTemplate -> restTemplate.setRequestFactory(ribbonClientHttpRequestFactory);
11     }
12 
13     @Bean
14     public RibbonClientHttpRequestFactory ribbonClientHttpRequestFactory() {
15         return new RibbonClientHttpRequestFactory(this.springClientFactory);
16     }
17 }

RibbonClientHttpRequestFactory 是對應 RestClient 的,也就是說要啟用 OkHttp 或 HttpClient,還需自己創建對應的 ClientHttpRequestFactory,並設置給 RestTemplate。從上面的類結構可以看出,是提供了 HttpComponentsClientHttpRequestFactory 和 OkHttp3ClientHttpRequestFactory 工廠類了的。

這里其實也比較奇怪,既然啟用了 apache httpcomponents 或者 OkHttp,卻沒有創建默認的 ClientHttpRequestFactory 實現類設置給 RestTemplate,感覺這是 spring-cloud-netflix-ribbon 的一個 BUG。

5、定制 RestTemplate 使用 Apache httpcomponents

如果想讓 RestTemplate 使用 httpcomponents  的組件,就需要自己創建一個 ClientHttpRequestFactory,並設置給 RestTemplate。下面我們一步步來看看如何修復這個問題。

① 設置 HttpComponentsClientHttpRequestFactory

httpcomponents  中提供的 ClientHttpRequestFactory 實現類是 HttpComponentsClientHttpRequestFactory,但是並不能直接使用這個工廠類,因為它創建的 HttpComponentsClientHttpRequest 不具備重試的能力,它直接使用 CloseableHttpClient 執行請求,雖然有超時的功能,但不能重試。而且,它本質上也沒有負載均衡的能力,需要借助 LoadBalancerInterceptor 攔截器來重構 URI。

 1 final class HttpComponentsClientHttpRequest extends AbstractBufferingClientHttpRequest {
 2     private final HttpClient httpClient;
 3     private final HttpUriRequest httpRequest;
 4     private final HttpContext httpContext;
 5 
 6     @Override
 7     protected ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {
 8         // ...
 9         // httpClient => CloseableHttpClient
10         HttpResponse httpResponse = this.httpClient.execute(this.httpRequest, this.httpContext);
11         return new HttpComponentsClientHttpResponse(httpResponse);
12     }
13 }

所以,如果不需要重試的功能,可以直接創建一個 HttpComponentsClientHttpRequest,並設置給 RestTemplate 即可。這樣就會使用 LoadBalancerInterceptor 來做負載均衡,重構 URI,然后用 HttpComponentsClientHttpRequest 來執行請求。

1 @Bean
2 @LoadBalanced
3 public RestTemplate restTemplate() {
4     HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory();
5     RestTemplate restTemplate = new RestTemplate();
6     restTemplate.setRequestFactory(requestFactory);
7     return restTemplate;
8 }

② 定制 apache ClientHttpRequestFactory

如果想讓 RestTemplate 即有負載均衡的能力,又能使用 apache HttpComponents 組件,且具備重試的功能,我們就需要自己定制 ClientHttpRequestFactory 了。關於重試后面再單獨來講。

對比 RestClient 可以發現,RibbonClientHttpRequestFactory 創建的 RibbonHttpRequest 其實是使用 RestClient 執行請求,而 RestClient  內部使用 LoadBalancerCommand 來進行重試。

類似的,我們至少要用上已經配置好的 RibbonLoadBalancingHttpClient 來執行請求,所以需要自定義一個類似的 RibbonHttpRequest 。

1)定制 apache ClientHttpRequest

創建 ApacheClientHttpRequest 繼承自 RibbonHttpRequest,核心的點在於要注入 RibbonLoadBalancingHttpClient,如果要支持重試,需注入 RetryableRibbonLoadBalancingHttpClient。RetryableRibbonLoadBalancingHttpClient 在引入 spring-retry 后才會創建,這個后面分析重試時再看。

然后在 executeInternal 根據 retryable 判斷,如果要重試,就調用 execute 方法,看 RetryableRibbonLoadBalancingHttpClient 的源碼可以發現,它本身是支持負載均衡的,會自動選擇 Server。

如果不需要重試,就需要調用 executeWithLoadBalancer,它是利用 LoadBalancerCommand 來提交請求,就跟 RestClient 是一樣的了。但是不一樣的地方是 RibbonLoadBalancingHttpClient 的 executeWithLoadBalancer 是不會進行重試的,這個也放到后面分析。

 1 package com.lyyzoo.sunny.register.ribbon.apache;
 2 
 3 import java.io.ByteArrayInputStream;
 4 import java.io.ByteArrayOutputStream;
 5 import java.io.IOException;
 6 import java.net.URI;
 7 import java.util.ArrayList;
 8 
 9 import com.netflix.client.config.IClientConfig;
10 import com.netflix.client.http.HttpResponse;
11 import org.springframework.cloud.netflix.ribbon.RibbonHttpRequest;
12 import org.springframework.cloud.netflix.ribbon.RibbonHttpResponse;
13 import org.springframework.cloud.netflix.ribbon.apache.RetryableRibbonLoadBalancingHttpClient;
14 import org.springframework.cloud.netflix.ribbon.apache.RibbonApacheHttpRequest;
15 import org.springframework.cloud.netflix.ribbon.apache.RibbonLoadBalancingHttpClient;
16 import org.springframework.cloud.netflix.ribbon.support.RibbonCommandContext;
17 import org.springframework.http.HttpHeaders;
18 import org.springframework.http.HttpMethod;
19 import org.springframework.http.client.ClientHttpResponse;
20 import org.springframework.util.LinkedMultiValueMap;
21 
22 /**
23  * Apache ClientHttpRequest
24  *
25  * @author bojiangzhou
26  */
27 public class ApacheClientHttpRequest extends RibbonHttpRequest {
28 
29     private final URI uri;
30 
31     private final HttpMethod httpMethod;
32 
33     private final String serviceId;
34 
35     private final RibbonLoadBalancingHttpClient client;
36 
37     private final IClientConfig config;
38     /**
39      * 是否重試
40      */
41     private final boolean retryable;
42 
43     public ApacheClientHttpRequest(URI uri,
44                                    HttpMethod httpMethod,
45                                    String serviceId,
46                                    RibbonLoadBalancingHttpClient client,
47                                    IClientConfig config,
48                                    boolean retryable) {
49         super(uri, null, null, config);
50         this.uri = uri;
51         this.httpMethod = httpMethod;
52         this.serviceId = serviceId;
53         this.client = client;
54         this.config = config;
55         this.retryable = retryable;
56         if (retryable && !(client instanceof RetryableRibbonLoadBalancingHttpClient)) {
57             throw new IllegalArgumentException("Retryable client must be RetryableRibbonLoadBalancingHttpClient");
58         }
59     }
60 
61     @Override
62     protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException {
63         try {
64             RibbonApacheHttpRequest request = new RibbonApacheHttpRequest(buildCommandContext(headers));
65 
66             HttpResponse response;
67             if (retryable) {
68                 // RetryableRibbonLoadBalancingHttpClient 使用 RetryTemplate 做負載均衡和重試
69                 response = client.execute(request, config);
70             } else {
71                 // RibbonLoadBalancingHttpClient 需調用 executeWithLoadBalancer 才具備負載均衡的能力
72                 response = client.executeWithLoadBalancer(request, config);
73             }
74 
75             return new RibbonHttpResponse(response);
76         } catch (Exception e) {
77             throw new IOException(e);
78         }
79     }
80 
81     protected RibbonCommandContext buildCommandContext(HttpHeaders headers) throws IOException {
82         ByteArrayInputStream requestEntity = null;
83         ByteArrayOutputStream bufferedOutput = (ByteArrayOutputStream) this.getBodyInternal(headers);
84         if (bufferedOutput != null) {
85             requestEntity = new ByteArrayInputStream(bufferedOutput.toByteArray());
86             bufferedOutput.close();
87         }
88 
89         return new RibbonCommandContext(serviceId, httpMethod.name(), uri.toString(), retryable,
90                 headers, new LinkedMultiValueMap<>(), requestEntity, new ArrayList<>());
91     }
92 }
View Code

2)定制 apache ClientHttpRequestFactory

創建 ApacheClientHttpRequestFactory 繼承自 HttpComponentsClientHttpRequestFactory,主要是在 createRequest 方法中創建自定義的 ApacheClientHttpRequest。RibbonLoadBalancingHttpClient 可以從 SpringClientFactory 中獲取。

 1 package com.lyyzoo.sunny.register.ribbon.apache;
 2 
 3 import java.io.IOException;
 4 import java.net.URI;
 5 
 6 import com.netflix.client.config.IClientConfig;
 7 import org.springframework.cloud.netflix.ribbon.SpringClientFactory;
 8 import org.springframework.cloud.netflix.ribbon.apache.RibbonLoadBalancingHttpClient;
 9 import org.springframework.http.HttpMethod;
10 import org.springframework.http.client.ClientHttpRequest;
11 import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
12 import org.springframework.lang.NonNull;
13 
14 /**
15  * Apache HttpComponents ClientHttpRequest factory
16  *
17  * @author bojiangzhou
18  */
19 public class ApacheClientHttpRequestFactory extends HttpComponentsClientHttpRequestFactory {
20 
21     private final SpringClientFactory clientFactory;
22     private final boolean retryable;
23 
24     public ApacheClientHttpRequestFactory(SpringClientFactory clientFactory, boolean retryable) {
25         this.clientFactory = clientFactory;
26         this.retryable = retryable;
27     }
28 
29     @Override
30     @NonNull
31     public ClientHttpRequest createRequest(URI originalUri, HttpMethod httpMethod) throws IOException {
32         String serviceId = originalUri.getHost();
33         if (serviceId == null) {
34             throw new IOException(
35                     "Invalid hostname in the URI [" + originalUri.toASCIIString() + "]");
36         }
37         IClientConfig clientConfig = this.clientFactory.getClientConfig(serviceId);
38         RibbonLoadBalancingHttpClient httpClient = this.clientFactory.getClient(serviceId, RibbonLoadBalancingHttpClient.class);
39 
40         return new ApacheClientHttpRequest(originalUri, httpMethod, serviceId, httpClient, clientConfig, retryable);
41     }
42 }
View Code

3)定制 apache ClientHttpRequestFactory 配置類

跟 RestClient 的配置類類似,定制 ApacheClientHttpRequestFactory 的配置類,同樣的,默認啟用 httpclient。在存在 RetryTemplate 時,就設置 ApacheClientHttpRequestFactory 的 retryable 參數為 true,否則為 false。

然后自定義 RestTemplateCustomizer,將 ApacheClientHttpRequestFactory 設置到 RestTemplate 中,注意這時 LoadBalancerInterceptor 就不會添加到 RestTemplate 中了。

 1 package com.lyyzoo.sunny.register.ribbon.apache;
 2 
 3 import java.util.ArrayList;
 4 import java.util.List;
 5 
 6 import org.apache.http.client.HttpClient;
 7 import org.apache.http.protocol.HTTP;
 8 import org.springframework.beans.factory.annotation.Autowired;
 9 import org.springframework.boot.autoconfigure.AutoConfigureAfter;
10 import org.springframework.boot.autoconfigure.AutoConfigureBefore;
11 import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
12 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
13 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
14 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
15 import org.springframework.cloud.client.loadbalancer.AsyncLoadBalancerAutoConfiguration;
16 import org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration;
17 import org.springframework.cloud.client.loadbalancer.RestTemplateCustomizer;
18 import org.springframework.cloud.netflix.ribbon.SpringClientFactory;
19 import org.springframework.context.annotation.Bean;
20 import org.springframework.context.annotation.Configuration;
21 import org.springframework.http.client.ClientHttpRequestInterceptor;
22 import org.springframework.web.client.RestTemplate;
23 
24 /**
25  *
26  * @author bojiangzhou
27  */
28 @Configuration
29 @ConditionalOnClass(RestTemplate.class)
30 @AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")
31 @AutoConfigureBefore({LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class})
32 @ConditionalOnProperty(name = "ribbon.httpclient.restTemplate.enabled", matchIfMissing = true)
33 public class ApacheClientHttpRequestFactoryConfiguration {
34 
35     @Configuration(proxyBeanMethods = false)
36     @ConditionalOnClass(HttpClient.class)
37     @ConditionalOnProperty(name = "ribbon.httpclient.enabled", matchIfMissing = true)
38     static class ClientHttpRequestFactoryConfiguration {
39 
40         @Autowired
41         private SpringClientFactory springClientFactory;
42 
43         @Bean
44         @ConditionalOnMissingBean
45         public RestTemplateCustomizer restTemplateCustomizer(
46                 final ApacheClientHttpRequestFactory apacheClientHttpRequestFactory) {
47             return restTemplate -> {
48                 // 設置 RequestFactory
49                 restTemplate.setRequestFactory(apacheClientHttpRequestFactory);
50 
51                 // 添加移除 Content-Length 的攔截器,否則會報錯
52                 ClientHttpRequestInterceptor removeHeaderLenInterceptor = (request, bytes, execution) -> {
53                     request.getHeaders().remove(HTTP.CONTENT_LEN);
54                     return execution.execute(request, bytes);
55                 };
56 
57                 List<ClientHttpRequestInterceptor> interceptors = new ArrayList<>(restTemplate.getInterceptors());
58                 // 添加移除Content-Length請求頭的Interceptor
59                 interceptors.add(removeHeaderLenInterceptor);
60                 restTemplate.setInterceptors(interceptors);
61             };
62         }
63 
64         @Bean
65         @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
66         public ApacheClientHttpRequestFactory apacheClientHttpRequestFactory() {
67             return new ApacheClientHttpRequestFactory(springClientFactory, false);
68         }
69 
70         @Bean
71         @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
72         public ApacheClientHttpRequestFactory retryableApacheClientHttpRequestFactory() {
73             return new ApacheClientHttpRequestFactory(springClientFactory, true);
74         }
75 
76 
77     }
78 }
View Code

4)簡單調試下

配置好之后,把 demo-consumer 服務啟動起來,簡單測試下。

a) 首先請求會進入到 RestTemplate 的 doExecute 中,然后通過 createRequest,調用 ApacheClientHttpRequestFactory 創建 ApacheClientHttpRequest。

b) 接着調用 ApacheClientHttpRequest 的 execute 方法,在 ApacheClientHttpRequest  的 executeInternal 中,就會調用 RibbonLoadBalancingHttpClient 的 executeWithLoadBalancer 方法。

c) 最后,進入 RibbonLoadBalancingHttpClient 的 execute 方法中,它又將請求轉給了代理對象 delegate 來執行,delegate 就是在 HttpClientRibbonConfiguration 中配置的 CloseableHttpClient 對象,實際類型是 InternalHttpClient。

經過驗證,通過自定義的配置,最終使得 RestTemplate 可以使用 apache httpcomponents 組件來執行 HTTP 請求。重試那塊后面再來研究。

③ 還是用一張圖來總結下 RestTemplate 基於 apache HttpClient 后的執行流程

6、定制 RestTemplate 使用 OkHttp

① 設置 OkHttp3ClientHttpRequestFactory

類似的,可以給 RestTemplate 直接設置 OkHttp3ClientHttpRequestFactory,但它同樣也不具備重試的能力。

1 @Bean
2 @LoadBalanced
3 public RestTemplate restTemplate() {
4     OkHttp3ClientHttpRequestFactory requestFactory = new OkHttp3ClientHttpRequestFactory();
5     RestTemplate restTemplate = new RestTemplate();
6     restTemplate.setRequestFactory(requestFactory);
7     return restTemplate;
8 }

② 定制 OkHttp ClientHttpRequestFactory

與定制 apache httpcomponents  類似,我這里就直接把三個類的代碼放出來了。主要的差異就在於使用的 AbstractLoadBalancingClient 不同,apache 是 RibbonLoadBalancingHttpClient,okhttp 是 OkHttpLoadBalancingClient。

a) OkHttpClientHttpRequest:

 1 package com.lyyzoo.sunny.register.ribbon.okhttp;
 2 
 3 import java.io.ByteArrayInputStream;
 4 import java.io.ByteArrayOutputStream;
 5 import java.io.IOException;
 6 import java.net.URI;
 7 import java.util.ArrayList;
 8 
 9 import com.netflix.client.config.IClientConfig;
10 import com.netflix.client.http.HttpResponse;
11 import org.springframework.cloud.netflix.ribbon.RibbonHttpRequest;
12 import org.springframework.cloud.netflix.ribbon.RibbonHttpResponse;
13 import org.springframework.cloud.netflix.ribbon.okhttp.OkHttpLoadBalancingClient;
14 import org.springframework.cloud.netflix.ribbon.okhttp.OkHttpRibbonRequest;
15 import org.springframework.cloud.netflix.ribbon.okhttp.RetryableOkHttpLoadBalancingClient;
16 import org.springframework.cloud.netflix.ribbon.support.RibbonCommandContext;
17 import org.springframework.http.HttpHeaders;
18 import org.springframework.http.HttpMethod;
19 import org.springframework.http.client.ClientHttpResponse;
20 import org.springframework.util.LinkedMultiValueMap;
21 
22 /**
23  * OkHttp ClientHttpRequest
24  *
25  * @author bojiangzhou
26  */
27 public class OkHttpClientHttpRequest extends RibbonHttpRequest {
28 
29     private final URI uri;
30 
31     private final HttpMethod httpMethod;
32 
33     private final String serviceId;
34 
35     private final OkHttpLoadBalancingClient client;
36 
37     private final IClientConfig config;
38     /**
39      * 是否重試
40      */
41     private final boolean retryable;
42 
43     public OkHttpClientHttpRequest(URI uri,
44                                    HttpMethod httpMethod,
45                                    String serviceId,
46                                    OkHttpLoadBalancingClient client,
47                                    IClientConfig config,
48                                    boolean retryable) {
49         super(uri, null, null, config);
50         this.uri = uri;
51         this.httpMethod = httpMethod;
52         this.serviceId = serviceId;
53         this.client = client;
54         this.config = config;
55         this.retryable = retryable;
56         if (retryable && !(client instanceof RetryableOkHttpLoadBalancingClient)) {
57             throw new IllegalArgumentException("Retryable client must be RetryableOkHttpLoadBalancingClient");
58         }
59     }
60 
61     @Override
62     protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException {
63         try {
64             OkHttpRibbonRequest request = new OkHttpRibbonRequest(buildCommandContext(headers));
65 
66             HttpResponse response;
67             if (retryable) {
68                 // RetryableRibbonLoadBalancingHttpClient 本身具備負載均衡的能力
69                 response = client.execute(request, config);
70             } else {
71                 // RibbonLoadBalancingHttpClient 需調用 executeWithLoadBalancer 才具備負載均衡的能力
72                 response = client.executeWithLoadBalancer(request, config);
73             }
74 
75             return new RibbonHttpResponse(response);
76         } catch (Exception e) {
77             throw new IOException(e);
78         }
79     }
80 
81     protected RibbonCommandContext buildCommandContext(HttpHeaders headers) throws IOException {
82         ByteArrayInputStream requestEntity = null;
83         ByteArrayOutputStream bufferedOutput = (ByteArrayOutputStream) this.getBodyInternal(headers);
84         if (bufferedOutput != null) {
85             requestEntity = new ByteArrayInputStream(bufferedOutput.toByteArray());
86             bufferedOutput.close();
87         }
88 
89         return new RibbonCommandContext(serviceId, httpMethod.name(), uri.toString(), retryable,
90                 headers, new LinkedMultiValueMap<>(), requestEntity, new ArrayList<>());
91     }
92 }
View Code

b) OkHttpClientHttpRequestFactory

 1 package com.lyyzoo.sunny.register.ribbon.okhttp;
 2 
 3 import java.net.URI;
 4 
 5 import com.netflix.client.config.IClientConfig;
 6 import org.springframework.cloud.netflix.ribbon.SpringClientFactory;
 7 import org.springframework.cloud.netflix.ribbon.okhttp.OkHttpLoadBalancingClient;
 8 import org.springframework.http.HttpMethod;
 9 import org.springframework.http.client.ClientHttpRequest;
10 import org.springframework.http.client.OkHttp3ClientHttpRequestFactory;
11 import org.springframework.lang.NonNull;
12 
13 /**
14  * OkHttp ClientHttpRequest factory
15  *
16  * @author bojiangzhou
17  */
18 public class OkHttpClientHttpRequestFactory extends OkHttp3ClientHttpRequestFactory {
19 
20     private final SpringClientFactory clientFactory;
21     private final boolean retryable;
22 
23     public OkHttpClientHttpRequestFactory(SpringClientFactory clientFactory, boolean retryable) {
24         this.clientFactory = clientFactory;
25         this.retryable = retryable;
26     }
27 
28     @Override
29     @NonNull
30     public ClientHttpRequest createRequest(URI originalUri, HttpMethod httpMethod) {
31         String serviceId = originalUri.getHost();
32         if (serviceId == null) {
33             throw new IllegalStateException(
34                     "Invalid hostname in the URI [" + originalUri.toASCIIString() + "]");
35         }
36         IClientConfig clientConfig = this.clientFactory.getClientConfig(serviceId);
37         OkHttpLoadBalancingClient httpClient = this.clientFactory.getClient(serviceId, OkHttpLoadBalancingClient.class);
38 
39         return new OkHttpClientHttpRequest(originalUri, httpMethod, serviceId, httpClient, clientConfig, retryable);
40     }
41 }
View Code

c) OkHttpClientHttpRequestFactoryConfiguration

 1 package com.lyyzoo.sunny.register.ribbon.okhttp;
 2 
 3 import org.springframework.beans.factory.annotation.Autowired;
 4 import org.springframework.boot.autoconfigure.AutoConfigureAfter;
 5 import org.springframework.boot.autoconfigure.AutoConfigureBefore;
 6 import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
 7 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 8 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
 9 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
10 import org.springframework.cloud.client.loadbalancer.AsyncLoadBalancerAutoConfiguration;
11 import org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration;
12 import org.springframework.cloud.client.loadbalancer.RestTemplateCustomizer;
13 import org.springframework.cloud.netflix.ribbon.SpringClientFactory;
14 import org.springframework.context.annotation.Bean;
15 import org.springframework.context.annotation.Configuration;
16 import org.springframework.web.client.RestTemplate;
17 
18 /**
19  *
20  * @author bojiangzhou
21  */
22 @Configuration
23 @ConditionalOnClass(RestTemplate.class)
24 @AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")
25 @AutoConfigureBefore({LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class})
26 @ConditionalOnProperty(name = "ribbon.okhttp.restTemplate.enabled", matchIfMissing = true)
27 public class OkHttpClientHttpRequestFactoryConfiguration {
28 
29     @Configuration(proxyBeanMethods = false)
30     @ConditionalOnProperty("ribbon.okhttp.enabled")
31     @ConditionalOnClass(name = "okhttp3.OkHttpClient")
32     static class ClientHttpRequestFactoryConfiguration {
33 
34         @Autowired
35         private SpringClientFactory springClientFactory;
36 
37         @Bean
38         @ConditionalOnMissingBean
39         public RestTemplateCustomizer restTemplateCustomizer(
40                 final OkHttpClientHttpRequestFactory okHttpClientHttpRequestFactory) {
41             return restTemplate -> restTemplate
42                     .setRequestFactory(okHttpClientHttpRequestFactory);
43         }
44 
45         @Bean
46         @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
47         public OkHttpClientHttpRequestFactory okHttpClientHttpRequestFactory() {
48             return new OkHttpClientHttpRequestFactory(springClientFactory, false);
49         }
50 
51         @Bean
52         @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
53         public OkHttpClientHttpRequestFactory retryableOkHttpClientHttpRequestFactory() {
54             return new OkHttpClientHttpRequestFactory(springClientFactory, true);
55         }
56     }
57 }
View Code

七、RetryTemplate 和 Ribbon 重試

1、AbstractLoadBalancerAwareClient

① AbstractLoadBalancerAwareClient

通過上一節的分析,可以知道有重試功能的其實有兩個組件,一個是 Ribbon 的 LoadBalancerCommand,一個是 spring-retry 的 RetryTemplate。RetryableRibbonLoadBalancingHttpClient 和 RetryableOkHttpLoadBalancingClient 都要依賴 RetryTemplate,所以必須先引入 spring-retry 依賴,它們最終都是使用 RetryTemplate 實現請求重試的能力的。除了 RetryTemplate,其它客戶端想要獲取重試的功能,就要用 ribbon 中的 AbstractLoadBalancerAwareClient 相關的組件,並調用 executeWithLoadBalancer 方法。

再看下 AbstractLoadBalancerAwareClient 的體系,通過源碼可以了解到:

  • RetryableFeignLoadBalancer、RetryableRibbonLoadBalancingHttpClient、RetryableOkHttpLoadBalancingClient 都是使用 RetryTemplate 實現重試功能的,也就是 spring-retry 的重試。
  • RestClient、FeignLoadBalancer、RibbonLoadBalancingHttpClient、OkHttpLoadBalancingClient 是在 AbstractLoadBalancerAwareClient 中使用 LoadBalancerCommand 實現重試功能的,就是是 Ribbon 的重試。

② executeWithLoadBalancer

具體的 AbstractLoadBalancerAwareClient 客戶端想要負載均衡調用以及能進行重試,需調用 AbstractLoadBalancerAwareClient 的 executeWithLoadBalancer 方法。

在這個方法里面,它先構建了 LoadBalancerCommand,然后用 command 提交了一個 ServerOperation,這個 ServerOperation 中對 URI 進行了 重構,轉到具體的 LoadBalancerContext 去執行請求。

 1 public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
 2     // 負載均衡命令
 3     LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
 4 
 5     try {
 6         return command.submit(
 7             new ServerOperation<T>() {
 8                 @Override
 9                 public Observable<T> call(Server server) {
10                     // 重構URI
11                     URI finalUri = reconstructURIWithServer(server, request.getUri());
12                     S requestForServer = (S) request.replaceUri(finalUri);
13                     try {
14                         // 使用具體的 AbstractLoadBalancerAwareClient 客戶端執行請求
15                         return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
16                     }
17                     catch (Exception e) {
18                         return Observable.error(e);
19                     }
20                 }
21             })
22             .toBlocking()
23             .single();
24     }
25 }

再看 buildLoadBalancerCommand 方法,它首先會通過 getRequestSpecificRetryHandler 方法獲取請求重試處理器 RequestSpecificRetryHandler,而 getRequestSpecificRetryHandler 是一個抽象方法。這里就要重點注意了。

 1 // 抽象方法,獲取請求重試處理器
 2 public abstract RequestSpecificRetryHandler getRequestSpecificRetryHandler(S request, IClientConfig requestConfig);
 3 
 4 protected LoadBalancerCommand<T> buildLoadBalancerCommand(final S request, final IClientConfig config) {
 5     // 獲取請求重試處理器
 6     RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, config);
 7     LoadBalancerCommand.Builder<T> builder = LoadBalancerCommand.<T>builder()
 8             .withLoadBalancerContext(this)
 9             .withRetryHandler(handler)
10             .withLoadBalancerURI(request.getUri());
11     customizeLoadBalancerCommandBuilder(request, config, builder);
12     return builder.build();
13 }

2、請求重試處理器 RequestSpecificRetryHandler

① 先了解下 RequestSpecificRetryHandler:

  • 首先看它的構造方法,注意第一個參數和第二個參數,因為不同的 getRequestSpecificRetryHandler 方法實現,主要差異就在於這兩個參數。
  • 然后看 isRetriableException,這個方法就是 LoadBalancerCommand 用來判斷異常后是否需要重試的方法,可以了解到 okToRetryOnAllErrors=true 時就可以重試,否則 okToRetryOnConnectErrors=true 才可能重試。需要注意的是就算這個方法返回 true 也不一定會重試,這跟重試次數也是有一定關系的。
 1 public RequestSpecificRetryHandler(boolean okToRetryOnConnectErrors, boolean okToRetryOnAllErrors, RetryHandler baseRetryHandler, @Nullable IClientConfig requestConfig) {
 2     Preconditions.checkNotNull(baseRetryHandler);
 3     this.okToRetryOnConnectErrors = okToRetryOnConnectErrors;
 4     this.okToRetryOnAllErrors = okToRetryOnAllErrors;
 5     this.fallback = baseRetryHandler;
 6     if (requestConfig != null) {
 7         // 在同一個Server上重試的次數
 8         if (requestConfig.containsProperty(CommonClientConfigKey.MaxAutoRetries)) {
 9             retrySameServer = requestConfig.get(CommonClientConfigKey.MaxAutoRetries);
10         }
11         // 重試下一個Server的次數
12         if (requestConfig.containsProperty(CommonClientConfigKey.MaxAutoRetriesNextServer)) {
13             retryNextServer = requestConfig.get(CommonClientConfigKey.MaxAutoRetriesNextServer);
14         }
15     }
16 }
17 
18 @Override
19 public boolean isRetriableException(Throwable e, boolean sameServer) {
20     // 所有錯誤都重試
21     if (okToRetryOnAllErrors) {
22         return true;
23     }
24     // ClientException 才可能重試
25     else if (e instanceof ClientException) {
26         ClientException ce = (ClientException) e;
27         if (ce.getErrorType() == ClientException.ErrorType.SERVER_THROTTLED) {
28             return !sameServer;
29         } else {
30             return false;
31         }
32     }
33     else  {
34         // 連接錯誤才重試,就是拋出 SocketException 異常時才重試
35         return okToRetryOnConnectErrors && isConnectionException(e);
36     }
37 }

② 不同 AbstractLoadBalancerAwareClient 的 getRequestSpecificRetryHandler 實現

a)RestClient

默認配置下,RestClient 的 getRequestSpecificRetryHandler 會走到最后一步,okToRetryOnConnectErrors、okToRetryOnAllErrors 都為 true,也就是說 isRetriableException 始終返回 true,也就是說拋出異常都會重試。非GET請求時,okToRetryOnAllErrors為 false,只有連接異常時才會重試。

 1 @Override
 2 public RequestSpecificRetryHandler getRequestSpecificRetryHandler(
 3         HttpRequest request, IClientConfig requestConfig) {
 4     if (!request.isRetriable()) {
 5         return new RequestSpecificRetryHandler(false, false, this.getRetryHandler(), requestConfig);
 6     }
 7     if (this.ncc.get(CommonClientConfigKey.OkToRetryOnAllOperations, false)) {
 8         return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(), requestConfig);
 9     }
10     if (request.getVerb() != HttpRequest.Verb.GET) {
11         return new RequestSpecificRetryHandler(true, false, this.getRetryHandler(), requestConfig);
12     } else {
13         // okToRetryOnConnectErrors、okToRetryOnAllErrors 都為 true
14         return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(), requestConfig);
15     }
16 }

b)AbstractLoadBalancingClient

AbstractLoadBalancingClient 中的 getRequestSpecificRetryHandler 相當於一個默認實現,默認情況下 okToRetryOnAllOperations 為 false,最后也會到最后一步,即 okToRetryOnConnectErrors、okToRetryOnAllErrors 都為 true,isRetriableException 始終返回 true。非GET請求時,okToRetryOnAllErrors為 false,只有連接異常時才會重試。

 1 @Override
 2 public RequestSpecificRetryHandler getRequestSpecificRetryHandler(final S request, final IClientConfig requestConfig) {
 3     // okToRetryOnAllOperations:是否所有操作都重試,默認 false
 4     if (this.okToRetryOnAllOperations) {
 5         return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(), requestConfig);
 6     }
 7     if (!request.getContext().getMethod().equals("GET")) {
 8         return new RequestSpecificRetryHandler(true, false, this.getRetryHandler(), requestConfig);
 9     }
10     else {
11         return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(), requestConfig);
12     }
13 }

c)RibbonLoadBalancingHttpClient

RibbonLoadBalancingHttpClient 也重載了 getRequestSpecificRetryHandler,但是它設置了 okToRetryOnConnectErrors、okToRetryOnAllErrors 都為 false,isRetriableException 始終返回 false。

至此我們應該就知道為什么調用 RibbonLoadBalancingHttpClient 的 executeWithLoadBalancer 不具備重試的功能的原因了。所以啟用 apache httpclient 時,RibbonLoadBalancingHttpClient 調用是不支持重試的。

1 @Override
2 public RequestSpecificRetryHandler getRequestSpecificRetryHandler(RibbonApacheHttpRequest request, IClientConfig requestConfig) {
3     // okToRetryOnConnectErrors、okToRetryOnAllErrors 都為 false
4     return new RequestSpecificRetryHandler(false, false, RetryHandler.DEFAULT, requestConfig);
5 }

RetryableRibbonLoadBalancingHttpClient 中也重寫了 getRequestSpecificRetryHandler,同樣也是設置 okToRetryOnConnectErrors、okToRetryOnAllErrors 都為 false。但是在引入 spring-retry 后,它會使用 RetryTemplate 實現重試的功能。

1 @Override
2 public RequestSpecificRetryHandler getRequestSpecificRetryHandler(RibbonApacheHttpRequest request, IClientConfig requestConfig) {
3     // okToRetryOnConnectErrors、okToRetryOnAllErrors 都為 false
4     return new RequestSpecificRetryHandler(false, false, RetryHandler.DEFAULT, null);
5 }

d)OkHttpLoadBalancingClient

OkHttpLoadBalancingClient 並沒有重寫 getRequestSpecificRetryHandler,所以它是使用父類 AbstractLoadBalancingClient 中的方法,也就是 okToRetryOnConnectErrors、okToRetryOnAllErrors 都為 true。

所以,啟用 okhttp 時,OkHttpLoadBalancingClient 是支持所有GET重試的,非GET請求則在拋出連接異常(SocketException)時支持重試。

而 RetryableOkHttpLoadBalancingClient 跟 RetryableRibbonLoadBalancingHttpClient 一樣的重寫方式,使用 RetryTemplate 實現重試。

1 @Override
2 public RequestSpecificRetryHandler getRequestSpecificRetryHandler(RibbonApacheHttpRequest request, IClientConfig requestConfig) {
3     // okToRetryOnConnectErrors、okToRetryOnAllErrors 都為 false
4     return new RequestSpecificRetryHandler(false, false, RetryHandler.DEFAULT, null);
5 }

3、LoadBalancerCommand

看 LoadBalancerCommand 的 submit 方法,這個方法是重試的核心代碼。

  • 首先獲取了同一個Server重試次數 maxRetrysSame 和 重試下一個Server的次數 maxRetrysNext,其實就是前面配置的 ribbon.MaxAutoRetries 和 ribbon.MaxAutoRetriesNextServer,我設置的是 1。
  • 然后創建了一個 Observable,它的第一層會先通過 loadBalancerContext 獲取 Server。在重試下一個 Server 時,這里就會獲取下一個 Server。
  • 在第二層,又創建了一個 Observable,這個 Observable 就是調用 ServerOperation 的,就是重構 URI,調用具體的 AbstractLoadBalancerAwareClient 執行請求。
  • 在第二層里,會根據 maxRetrysSame 重試同一個 Server,從 retryPolicy 中可以了解到,當重試次數大於 maxRetrysSame 后,同一個 Server 重試就結束了,否則就用 retryHandler.isRetriableException 判斷是否重試,這個前面已經分析過了。
  • 在外層,則根據 maxRetrysNext 重試不同的 Server,從 retryPolicy 中可以了解到,當不同Server重試次數大於 maxRetrysNext 后,就重試結束了,整個重試也就結束了,如果還是失敗,就會進入 onErrorResumeNext 進行最后的失敗處理。

最后來總結一下 LoadBalancerCommand 重試:

  • 重試分為同一個 Server 重試和重試下一個Server,當重試次數大於設置的重試值時,就停止重試。否則通過 retryHandler.isRetriableException 判斷是否重試。
  • 那這里一共請求了多少次呢?可以總結出如下公式:請求次數 = (maxRetrysSame + 1) * (maxRetrysNext + 1),所以按 ribbon.MaxAutoRetries = 1、ribbon.MaxAutoRetriesNextServer = 1 的配置,如果每次請求都超時,就會發起 4 次請求。
 1 public Observable<T> submit(final ServerOperation<T> operation) {
 2     final ExecutionInfoContext context = new ExecutionInfoContext();
 3 
 4     // 同一個Server重試次數
 5     final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
 6     // 重試下一個Server的次數
 7     final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
 8 
 9     // 創建一個 Observable
10     Observable<T> o =
11             // 使用 loadBalancerContext 獲取 Server
12             (server == null ? selectServer() : Observable.just(server))
13             .concatMap(new Func1<Server, Observable<T>>() {
14                 @Override
15                 public Observable<T> call(Server server) {
16                     // 設置Server
17                     context.setServer(server);
18                     final ServerStats stats = loadBalancerContext.getServerStats(server);
19 
20                     // 創建 Observable
21                     Observable<T> o = Observable
22                             .just(server)
23                             .concatMap(new Func1<Server, Observable<T>>() {
24                                 @Override
25                                 public Observable<T> call(final Server server) {
26                                     // 增加嘗試次數
27                                     context.incAttemptCount();
28                                     // ...
29                                     // 調用 ServerOperation
30                                     return operation.call(server).doOnEach(new Observer<T>() {
31                                         // 一些回調方法
32                                     });
33                                 }
34                             });
35                     // 重試同一個Server
36                     if (maxRetrysSame > 0)
37                         o = o.retry(retryPolicy(maxRetrysSame, true));
38                     return o;
39                 }
40             });
41 
42     if (maxRetrysNext > 0 && server == null)
43         // 重試不同Server
44         o = o.retry(retryPolicy(maxRetrysNext, false));
45 
46     return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
47         @Override
48         public Observable<T> call(Throwable e) {
49             // 異常處理
50             return Observable.error(e);
51         }
52     });
53 }
54 
55 // retryPolicy 返回一個是否重試的斷言
56 private Func2<Integer, Throwable, Boolean> retryPolicy(final int maxRetrys, final boolean same) {
57     return new Func2<Integer, Throwable, Boolean>() {
58         @Override
59         public Boolean call(Integer tryCount, Throwable e) {
60             // 請求拒絕異常就不允許重試
61             if (e instanceof AbortExecutionException) {
62                 return false;
63             }
64             // 嘗試次數是否大於最大重試次數
65             if (tryCount > maxRetrys) {
66                 return false;
67             }
68             // 使用 RequestSpecificRetryHandler 判斷是否重試
69             return retryHandler.isRetriableException(e, same);
70         }
71     };
72 }

4、RetryTemplate

① spring-retry

要啟用 RetryTemplate 需先引入 spring-retry:

1 <dependency>
2     <groupId>org.springframework.retry</groupId>
3     <artifactId>spring-retry</artifactId>
4 </dependency>

以 RetryableRibbonLoadBalancingHttpClient 為例,先看看它的 execute 方法,它先創建了負載均衡重試策略類 LoadBalancedRetryPolicy,然后將請求調用的邏輯封裝到 RetryCallback 中,最后其實就是用 RetryTemplate 執行這個 RetryCallback,也就是說請求重試的邏輯都在 RetryTemplate 中。

 1 public RibbonApacheHttpResponse execute(final RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception {
 2     //...
 3 
 4     // 負載均衡重試策略 RibbonLoadBalancedRetryPolicy
 5     final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryFactory.createRetryPolicy(this.getClientName(), this);
 6 
 7     RetryCallback<RibbonApacheHttpResponse, Exception> retryCallback = context -> {
 8         // ...
 9         // delegate => CloseableHttpClient
10         final HttpResponse httpResponse = RetryableRibbonLoadBalancingHttpClient.this.delegate.execute(httpUriRequest);
11         // ...
12         // 成功 返回結果
13         return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI());
14     };
15 
16     LoadBalancedRecoveryCallback<RibbonApacheHttpResponse, HttpResponse> recoveryCallback = new LoadBalancedRecoveryCallback<RibbonApacheHttpResponse, HttpResponse>()//...
17 
18     return this.executeWithRetry(request, retryPolicy, retryCallback, recoveryCallback);
19 }
20 
21 private RibbonApacheHttpResponse executeWithRetry(RibbonApacheHttpRequest request,
22         LoadBalancedRetryPolicy retryPolicy,
23         RetryCallback<RibbonApacheHttpResponse, Exception> callback,
24         RecoveryCallback<RibbonApacheHttpResponse> recoveryCallback)
25         throws Exception {
26     RetryTemplate retryTemplate = new RetryTemplate();
27 
28     // retryable => 取自 RibbonCommandContext 設置的 retryable 參數
29     boolean retryable = isRequestRetryable(request);
30     // 設置重試策略
31     retryTemplate.setRetryPolicy(retryPolicy == null || !retryable
32             ? new NeverRetryPolicy() : new RetryPolicy(request, retryPolicy, this, this.getClientName()));
33 
34     BackOffPolicy backOffPolicy = loadBalancedRetryFactory.createBackOffPolicy(this.getClientName());
35     retryTemplate.setBackOffPolicy(backOffPolicy == null ? new NoBackOffPolicy() : backOffPolicy);
36 
37     // 利用 retryTemplate 執行請求 callback
38     return retryTemplate.execute(callback, recoveryCallback);
39 }

需要注意的是,在 executeWithRetry 中,會判斷是否要重試,判斷的邏輯中 getRetryable 其實就是取的 ApacheClientHttpRequest 中 executeInternal 方法里創建的 RibbonCommandContext 設置的 retryable 參數,這就和前面定制化的邏輯銜接上了。

1 private boolean isRequestRetryable(ContextAwareRequest request) {
2     if (request.getContext() == null || request.getContext().getRetryable() == null) {
3         return true;
4     }
5     return request.getContext().getRetryable();
6 }

② RetryTemplate

進入 RetryTemplate 的 execute 方法,核心的邏輯我精簡成如下代碼,主要就是一個 while 循環判斷是否可以重試,然后調用 retryCallback 執行請求。請求失敗后,比如超時,拋出異常,就會 registerThrowable 來注冊異常。

 1 protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
 2         RecoveryCallback<T> recoveryCallback, RetryState state) throws E, ExhaustedRetryException {
 3 
 4     // retryPolicy => InterceptorRetryPolicy
 5     RetryPolicy retryPolicy = this.retryPolicy;
 6     BackOffPolicy backOffPolicy = this.backOffPolicy;
 7     //....
 8     try {
 9         // ...
10         // canRetry 判斷是否重試
11         while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
12             try {
13                 // retryCallback 調用
14                 return retryCallback.doWithRetry(context);
15             }
16             catch (Throwable e) {
17                 // ...
18                 // 注冊異常
19                 registerThrowable(retryPolicy, state, context, e);
20                 // ...
21             }
22         }
23         exhausted = true;
24         return handleRetryExhausted(recoveryCallback, context, state);
25     }
26     //...
27 }

看 canRetry 方法,它實際是調用了 InterceptorRetryPolicy 的 canRetry。第一次調用時,會去獲取 Server;否則就用 RibbonLoadBalancedRetryPolicy 判斷是否重試下一個 Server,注意它判斷的邏輯是 GET 請求或者允許所有操作操作重試,且 Server 重試次數 nextServerCount  小於等於配置的 MaxAutoRetriesNextServer 。也就是說,while 循環判斷的 canRetry 是重試下一個 Server 的。

 1 protected boolean canRetry(RetryPolicy retryPolicy, RetryContext context) {
 2     return retryPolicy.canRetry(context);
 3 }
 4 
 5 //////////// InterceptorRetryPolicy
 6 public boolean canRetry(RetryContext context) {
 7     LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
 8     if (lbContext.getRetryCount() == 0 && lbContext.getServiceInstance() == null) {
 9         // 獲取 Server
10         lbContext.setServiceInstance(this.serviceInstanceChooser.choose(this.serviceName));
11         return true;
12     }
13     // RibbonLoadBalancedRetryPolicy => 重試下一個Server
14     return this.policy.canRetryNextServer(lbContext);
15 }
16 
17 ///////// RibbonLoadBalancedRetryPolicy
18 public boolean canRetryNextServer(LoadBalancedRetryContext context) {
19     // 判斷重試下一個Server
20     return nextServerCount <= lbContext.getRetryHandler().getMaxRetriesOnNextServer()
21             && canRetry(context);
22 }
23 
24 public boolean canRetry(LoadBalancedRetryContext context) {
25     // GET 請求或者允許所有操作重試時,就允許重試
26     HttpMethod method = context.getRequest().getMethod();
27     return HttpMethod.GET == method || lbContext.isOkToRetryOnAllOperations();
28 }

接着看請求失敗后的注冊異常 registerThrowable,它最后會向 RibbonLoadBalancedRetryPolicy 注冊異常。在 RibbonLoadBalancedRetryPolicy 的 registerThrowable 方法中,如果不重試同一個Server且可以重試下一個Server,就會輪詢獲取下一個Server。如果可以在同一個Server上重試,sameServerCount 計數器就+1,否則重置 sameServerCount,然后 nextServerCount +1。

 1 protected void registerThrowable(RetryPolicy retryPolicy, RetryState state,
 2         RetryContext context, Throwable e) {
 3     retryPolicy.registerThrowable(context, e);
 4     registerContext(context, state);
 5 }
 6 
 7 ///////// InterceptorRetryPolicy /////////
 8 public void registerThrowable(RetryContext context, Throwable throwable) {
 9     LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
10     lbContext.registerThrowable(throwable);
11     // RibbonLoadBalancedRetryPolicy
12     this.policy.registerThrowable(lbContext, throwable);
13 }
14 
15 ///////// RibbonLoadBalancedRetryPolicy /////////
16 public void registerThrowable(LoadBalancedRetryContext context, Throwable throwable) {
17     //...
18     // 如果不在在同一個Server 上重試且可以重試下一個Server,則重新選擇一個 Server
19     if (!canRetrySameServer(context) && canRetryNextServer(context)) {
20         context.setServiceInstance(loadBalanceChooser.choose(serviceId));
21     }
22 
23     // 同一個Server重試超過設置的值后,就重置 sameServerCount
24     if (sameServerCount >= lbContext.getRetryHandler().getMaxRetriesOnSameServer()
25             && canRetry(context)) {
26         // 重置 nextServerCount
27         sameServerCount = 0;
28         // 下一個Server重試次數+1
29         nextServerCount++;
30         if (!canRetryNextServer(context)) {
31             // 不能重試下一個Server了
32             context.setExhaustedOnly();
33         }
34     }
35     else {
36         // 同一個Server重試次數+1
37         sameServerCount++;
38     }
39 }
40 
41 // 判斷是否重試同一個Server
42 public boolean canRetrySameServer(LoadBalancedRetryContext context) {
43     return sameServerCount < lbContext.getRetryHandler().getMaxRetriesOnSameServer()
44             && canRetry(context);
45 }
46 
47 public boolean canRetry(LoadBalancedRetryContext context) {
48     // GET 請求或者允許所有操作重試時,就允許重試
49     HttpMethod method = context.getRequest().getMethod();
50     return HttpMethod.GET == method || lbContext.isOkToRetryOnAllOperations();
51 }

5、Ribbon 重試總結

① 首先,Ribbon 關於超時和重試的配置參數如下,這些參數也可以針對某個客戶端配置:

 1 ribbon:
 2   # 客戶端讀取超時時間
 3   ReadTimeout: 1000
 4   # 客戶端連接超時時間
 5   ConnectTimeout: 1000
 6   # 默認只重試 GET,設置為 true 時將重試所有類型,如 POST、PUT、DELETE
 7   OkToRetryOnAllOperations: false
 8   # 同一個Server重試次數
 9   MaxAutoRetries: 1
10   # 最多重試幾個Server
11   MaxAutoRetriesNextServer: 1

② RetryTemplate 是 spring-retry 的重試組件,LoadBalancerCommand 是 Ribbon 的重試組件。它們重試的請求次數是一樣的,重試邏輯也是類似,都是先重試當前 Server,再重試下一個Server,總的請求次數 = (MaxAutoRetries + 1) * (MaxAutoRetriesNextServer + 1)。

③ 但是有點差別的是,RetryTemplate 會判斷請求方法為 GET 或者 OkToRetryOnAllOperations=true 時才允許重試,而 LoadBalancerCommand  是GET方法都可以重試,非GET方法在拋出連接異常時也可以重試。這個要注意下,一般只有GET才允許重試,因為GET是查詢操作,接口是冪等的,而POST、PUT、DELETE一般是非冪等的。所以一般更建議使用 RetryTemplate,並且配置 OkToRetryOnAllOperations=false。

④ 為了提升服務間通信性能,一般可以啟用 apache httpclient 或者 OkHttp,如果要啟用重試功能,還需要引入 spring-retry 依賴。重試時,當前Server就不要重試了(MaxAutoRetries=0),直接重試下一個Server。

 1 ribbon:
 2   # 客戶端讀取超時時間
 3   ReadTimeout: 1000
 4   # 客戶端連接超時時間
 5   ConnectTimeout: 1000
 6   # 默認只重試 GET,設置為 true 時將重試所有類型,如 POST、PUT、DELETE
 7   OkToRetryOnAllOperations: false
 8   # 同一個Server重試次數
 9   MaxAutoRetries: 0
10   # 最多重試幾個Server
11   MaxAutoRetriesNextServer: 1
12   # 啟用 httpclient
13   httpclient:
14     enabled: false
15   # 啟用 RestClient
16   restclient:
17     enabled: false
18   # 啟用 okhttp
19   okhttp:
20     enabled: true

八、Ribbon 架構圖

最后,將 Ribbon 核心組件架構用兩張類圖總結下。

① 負載均衡器 ILoadBalancer

② 負載均衡客戶端

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM