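The following is excerpted from the overall client request flow diagram in 9.1 (client request source code):
// the proxy issues the request
proxy0.sayHello(String paramString)
-->InvokerInvocationHandler.invoke(Object proxy, Method method, Object[] args)
-->new RpcInvocation(method, args)
-->MockClusterInvoker.invoke(Invocation invocation) // where service degradation happens
Dubbo implements service degradation through MockClusterInvoker.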
I. Example
public interface DemoService {
    // String sayHello(String name);
    Car sayHello(String name);
}
The service interface in dubbo-demo is changed to return a model object, Car. The provider implements it as follows:
public class DemoServiceImpl implements DemoService {
    public Car sayHello(String name) {
        Car car = new Car();
        car.setCarNum("浙A10000");
        car.setGoMile(100);
        return car;
    }
}
The consumer uses it as follows:
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Consumer {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});
        context.start();
        DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy

        while (true) {
            try {
                Thread.sleep(1000);
                Car hello = demoService.sayHello("world"); // call remote method
                System.out.println(hello.getCarNum() + "-" + hello.getGoMile()); // get result
            } catch (Throwable throwable) {
                throwable.printStackTrace();
            }
        }
    }
}
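II. Usage
In practice, the degradation policy is usually set directly in dubbo-admin. Here we use the approach from the Dubbo user manual instead, because it shows the degradation configuration more clearly (it is really just a configuration override).
Configuration rules: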
1. Use a custom mock class (interface name + Mock)
- mock = default => DemoServiceMock
- mock = true => DemoServiceMock
- mock = fail => DemoServiceMock
- mock = force => DemoServiceMock
2. Execute normally first; if the call fails, run the corresponding mock logic
- mock = fail:throw => throw new RpcException(" mocked exception for Service degradation. ");
- mock = fail:throw XxxException => throw new RpcException(RpcException.BIZ_EXCEPTION, XxxException);
- mock = fail:return => return null
- mock = fail:return xxx => return xxx
- mock = fail:return empty => return new Car()
3. Run the corresponding mock logic directly
- mock = force:throw => throw new RpcException(" mocked exception for Service degradation. ");
- mock = force:throw XxxException => throw new RpcException(RpcException.BIZ_EXCEPTION, XxxException);
- mock = force:return => return null
- mock = force:return xxx => return xxx
- mock = force:return empty => return new Car()
Applying the configuration:
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.registry.Registry;
import com.alibaba.dubbo.registry.RegistryFactory;

public class DegradeTest {
    public static void main(String[] args) {
        RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
        Registry registry = registryFactory.getRegistry(URL.valueOf("zookeeper://10.211.55.5:2181"));
        // return null
        registry.register(URL.valueOf("override://0.0.0.0/com.alibaba.dubbo.demo.DemoService?category=configurators&dynamic=false&application=demo-consumer&mock=force:return"));
        registry.register(URL.valueOf("override://0.0.0.0/com.alibaba.dubbo.demo.DemoService?category=configurators&dynamic=false&application=demo-consumer&mock=force:return+null"));
        // return an empty object
        registry.register(URL.valueOf("override://0.0.0.0/com.alibaba.dubbo.demo.DemoService?category=configurators&dynamic=false&application=demo-consumer&mock=force:return+empty"));
        // return a value
        registry.register(URL.valueOf("override://0.0.0.0/com.alibaba.dubbo.demo.DemoService?category=configurators&dynamic=false&application=demo-consumer&mock=force:return+hello"));
        // throw exception
        registry.register(URL.valueOf("override://0.0.0.0/com.alibaba.dubbo.demo.DemoService?category=configurators&dynamic=false&application=demo-consumer&mock=force:throw"));
        // throw custom-msg exception
        registry.register(URL.valueOf("override://0.0.0.0/com.alibaba.dubbo.demo.DemoService?category=configurators&dynamic=false&application=demo-consumer&mock=force:throw+com.alibaba.dubbo.Test.MyRuntimeException"));
        // run the mock class
        registry.register(URL.valueOf("override://0.0.0.0/com.alibaba.dubbo.demo.DemoService?category=configurators&dynamic=false&application=demo-consumer&mock=force:com.alibaba.dubbo.demo.DemoServiceMock"));
    }
}
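Note that the value must be written as "force:return+null" rather than "force:return null" (the + sign is simply the URL-encoded form of a space). Running the code above actually creates child nodes under the configurators node in ZooKeeper.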
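The relationship between the space and the + sign can be checked with the JDK alone. The following is a minimal sketch that uses only java.net.URLEncoder and java.net.URLDecoder (form encoding), not Dubbo's own URL helper; the class name MockValueEncoding is made up for illustration.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

class MockValueEncoding {
    // Form-encode a mock rule; a space becomes '+', and ':' becomes %3A.
    static String encode(String rule) {
        try {
            return URLEncoder.encode(rule, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always supported
        }
    }

    // Decode a parameter value; '+' is turned back into a space.
    static String decode(String value) {
        try {
            return URLDecoder.decode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(encode("force:return null")); // force%3Areturn+null
        System.out.println(decode("force:return+null")); // force:return null
    }
}
```

Decoding "force:return+null" yields the rule with a space, which is the form the mock logic later matches against.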
About override configuration: http://dubbo.io/books/dubbo-user-book/demos/config-rule.html
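- override:// means the data is applied as an override; override and absent are supported, and the mechanism is extensible. Required.
- 0.0.0.0 means the rule applies to all IP addresses; to override the data of a single IP only, put that IP here. Required.
- com.alibaba.dubbo.demo.DemoService means the rule applies only to the specified service. Required.
- category=configurators marks the data as dynamic configuration. Required.
- dynamic=false marks the data as persistent: it stays in the registry after the registering party exits. Required.
- enabled=true controls whether the override rule is in effect. Optional; enabled by default.
- application=demo-consumer means the rule applies only to the specified application. Optional; if omitted, the rule applies to all applications.
- mock=force:return+null means that, for everything matching the conditions above, the mock parameter is overridden with force:return+null. To override other parameters, add them directly as parameters of the override URL.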
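To make the structure of an override URL concrete, here is a small sketch that assembles one from the parts described above. It is plain string building, not a Dubbo API; the class OverrideUrlBuilder and its methods are hypothetical names used only for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class OverrideUrlBuilder {
    // Join scheme, IP, service name and parameters into an override URL string.
    static String build(String ip, String service, Map<String, String> params) {
        StringBuilder sb = new StringBuilder("override://").append(ip).append('/').append(service);
        char separator = '?';
        for (Map.Entry<String, String> entry : params.entrySet()) {
            sb.append(separator).append(entry.getKey()).append('=').append(entry.getValue());
            separator = '&';
        }
        return sb.toString();
    }

    // The rule used in this article: force every call from demo-consumer to return null.
    static String demoRule() {
        Map<String, String> params = new LinkedHashMap<>(); // keeps insertion order
        params.put("category", "configurators");
        params.put("dynamic", "false");
        params.put("application", "demo-consumer");
        params.put("mock", "force:return+null");
        return build("0.0.0.0", "com.alibaba.dubbo.demo.DemoService", params);
    }

    public static void main(String[] args) {
        System.out.println(demoRule());
    }
}
```

The resulting string is the same one passed to URL.valueOf in DegradeTest.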
III. Source code analysis
public class MockClusterInvoker<T> implements Invoker<T> {
    private final Directory<T> directory; // RegistryDirectory: stores the invoker list
    private final Invoker<T> invoker;     // FailoverClusterInvoker: fault-tolerance strategy

    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;

        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || value.equalsIgnoreCase("false")) {
            // no mock
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            ...
            // force: direct mock
            result = doMockInvoke(invocation, null);
        } else {
            // fail-mock
            try {
                result = this.invoker.invoke(invocation);
            } catch (RpcException e) {
                if (e.isBiz()) {
                    throw e;
                } else {
                    ...
                    result = doMockInvoke(invocation, e);
                }
            }
        }
        return result;
    }
}
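The mock parameter is read first:
- If it is not configured (empty or false), FailoverClusterInvoker sends the request to the provider as usual.
- If it starts with force, doMockInvoke(Invocation invocation, RpcException e) is executed directly and no request is sent to the provider.
- Otherwise (for example, a value starting with fail), FailoverClusterInvoker first sends the request to the provider as usual; if the call fails with a non-business exception, doMockInvoke(Invocation invocation, RpcException e) is executed. Business exceptions are rethrown unchanged.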
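The three branches above can be sketched without any Dubbo dependency. In this simplified model the remote call and the mock logic are plain Supplier objects, and CallException stands in for RpcException with its isBiz() flag; all names here are illustrative assumptions, not Dubbo classes.

```java
import java.util.function.Supplier;

class MockDispatchSketch {
    // Stand-in for RpcException; biz marks a business exception.
    static class CallException extends RuntimeException {
        final boolean biz;
        CallException(String message, boolean biz) {
            super(message);
            this.biz = biz;
        }
    }

    static String invoke(String mockValue, Supplier<String> remote, Supplier<String> mock) {
        String value = mockValue == null ? "false" : mockValue.trim();
        if (value.length() == 0 || value.equalsIgnoreCase("false")) {
            return remote.get();        // no mock configured: normal call
        } else if (value.startsWith("force")) {
            return mock.get();          // force: the provider is never called
        } else {
            try {
                return remote.get();    // fail-style: try the provider first
            } catch (CallException e) {
                if (e.biz) {
                    throw e;            // business errors are not degraded
                }
                return mock.get();      // other failures fall back to the mock
            }
        }
    }

    public static void main(String[] args) {
        Supplier<String> failing = () -> { throw new CallException("timeout", false); };
        System.out.println(invoke("false", () -> "remote", () -> "mock"));
        System.out.println(invoke("force:return null", failing, () -> "mock"));
        System.out.println(invoke("fail:return null", failing, () -> "mock"));
    }
}
```

With a working provider, a fail-style rule never reaches the mock; with a force rule, the provider is bypassed even when it is healthy.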
private Result doMockInvoke(Invocation invocation, RpcException e) {
    Result result = null;
    Invoker<T> minvoker;

    List<Invoker<T>> mockInvokers = selectMockInvoker(invocation); // get the mock-type invokers
    if (mockInvokers == null || mockInvokers.size() == 0) {
        minvoker = (Invoker<T>) new MockInvoker(directory.getUrl()); // no mock-type invoker configured, so create a MockInvoker ourselves
    } else {
        minvoker = mockInvokers.get(0);
    }
    try {
        result = minvoker.invoke(invocation); // run MockInvoker.invoke(Invocation invocation)
    } catch (RpcException me) {
        if (me.isBiz()) {
            result = new RpcResult(me.getCause());
        } else { // non-business exception
            throw new RpcException(me.getCode(), getMockExceptionMessage(e, me), me.getCause());
        }
    } catch (Throwable me) {
        throw new RpcException(getMockExceptionMessage(e, me), me.getCause());
    }
    return result;
}
Getting the MockInvoker from RegistryDirectory:
/**
 * Return MockInvoker
 * Contract:
 * directory.list() will return a list of normal invokers if Constants.INVOCATION_NEED_MOCK is present in invocation, otherwise, a list of mock invokers will return.
 * if directory.list() returns more than one mock invoker, only one of them will be used.
 *
 * @param invocation
 * @return
 */
private List<Invoker<T>> selectMockInvoker(Invocation invocation) {
    List<Invoker<T>> invokers = null;
    //TODO generic invoker?
    if (invocation instanceof RpcInvocation) {
        //Note the implicit contract (although the description is added to the interface declaration, but extensibility is a problem. The practice placed in the attachement needs to be improved)
        ((RpcInvocation) invocation).setAttachment(Constants.INVOCATION_NEED_MOCK, Boolean.TRUE.toString());
        //directory will return a list of normal invokers if Constants.INVOCATION_NEED_MOCK is present in invocation, otherwise, a list of mock invokers will return.
        try {
            invokers = directory.list(invocation);
        } catch (RpcException e) {
            if (logger.isInfoEnabled()) {
                logger.info("Exception when try to invoke mock. Get mock invokers error for service:"
                        + directory.getUrl().getServiceInterface() + ", method:" + invocation.getMethodName()
                        + ", will contruct a new mock with 'new MockInvoker()'.", e);
            }
        }
    }
    return invokers;
}
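RegistryDirectory first retrieves the list of invokers for the method sayHello, and then MockInvokersSelector (a Router) picks out the mock invokers. Because the invocation now carries the INVOCATION_NEED_MOCK attachment set to true, the selector keeps only invokers whose URL protocol is Constants.MOCK_PROTOCOL.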
public class MockInvokersSelector implements Router {

    public <T> List<Invoker<T>> route(final List<Invoker<T>> invokers,
                                      URL url, final Invocation invocation) throws RpcException {
        if (invocation.getAttachments() == null) {
            return getNormalInvokers(invokers);
        } else {
            String value = invocation.getAttachments().get(Constants.INVOCATION_NEED_MOCK);
            if (value == null)
                return getNormalInvokers(invokers);
            else if (Boolean.TRUE.toString().equalsIgnoreCase(value)) {
                return getMockedInvokers(invokers);
            }
        }
        return invokers;
    }

    private <T> List<Invoker<T>> getMockedInvokers(final List<Invoker<T>> invokers) {
        if (!hasMockProviders(invokers)) {
            return null;
        }
        List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(1);
        for (Invoker<T> invoker : invokers) {
            if (invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) {
                sInvokers.add(invoker);
            }
        }
        return sInvokers;
    }

    private <T> boolean hasMockProviders(final List<Invoker<T>> invokers) {
        boolean hasMockProvider = false;
        for (Invoker<T> invoker : invokers) {
            if (invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) {
                hasMockProvider = true;
                break;
            }
        }
        return hasMockProvider;
    }
}
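In this example no mock invoker is found: no provider uses the mock protocol, so getMockedInvokers returns null.
doMockInvoke therefore creates a new MockInvoker object and then calls its invoke method.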
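The selection and fallback just described can be modelled in a few lines. This sketch represents providers as java.net.URI values and assumes the mock protocol name is "mock"; the class name, method names, and addresses are invented for illustration and do not come from Dubbo.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MockSelectionSketch {
    // Keep only providers whose scheme is "mock"; return null when there are none,
    // mirroring the null returned by getMockedInvokers.
    static List<URI> selectMock(List<URI> providers) {
        List<URI> selected = new ArrayList<>();
        for (URI provider : providers) {
            if ("mock".equals(provider.getScheme())) {
                selected.add(provider);
            }
        }
        return selected.isEmpty() ? null : selected;
    }

    // Use the first mock provider if one exists, otherwise fall back to a local mock.
    static String choose(List<URI> providers) {
        List<URI> mocks = selectMock(providers);
        if (mocks == null || mocks.isEmpty()) {
            return "local MockInvoker";
        }
        return mocks.get(0).toString();
    }

    public static void main(String[] args) {
        List<URI> onlyNormal = Arrays.asList(
                URI.create("dubbo://10.0.0.1:20880/com.alibaba.dubbo.demo.DemoService"));
        System.out.println(choose(onlyNormal)); // local MockInvoker
    }
}
```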
MockInvoker:
public Result invoke(Invocation invocation) throws RpcException {
    String mock = getUrl().getParameter(invocation.getMethodName() + "." + Constants.MOCK_KEY); // sayHello.mock
    if (invocation instanceof RpcInvocation) {
        ((RpcInvocation) invocation).setInvoker(this);
    }
    if (StringUtils.isBlank(mock)) {
        mock = getUrl().getParameter(Constants.MOCK_KEY); // mock
    }

    if (StringUtils.isBlank(mock)) {
        throw new RpcException(new IllegalAccessException("mock can not be null. url :" + url));
    }
    mock = normallizeMock(URL.decode(mock));
    if (Constants.RETURN_PREFIX.trim().equalsIgnoreCase(mock.trim())) { // return
        RpcResult result = new RpcResult();
        result.setValue(null);
        return result;
    } else if (mock.startsWith(Constants.RETURN_PREFIX)) { // return value (including return null)
        mock = mock.substring(Constants.RETURN_PREFIX.length()).trim();
        mock = mock.replace('`', '"');
        try {
            Type[] returnTypes = RpcUtils.getReturnTypes(invocation);
            Object value = parseMockValue(mock, returnTypes);
            return new RpcResult(value);
        } catch (Exception ew) {
            throw new RpcException("mock return invoke error. method :" + invocation.getMethodName() + ", mock:" + mock + ", url: " + url, ew);
        }
    } else if (mock.startsWith(Constants.THROW_PREFIX)) { // throw xxx
        mock = mock.substring(Constants.THROW_PREFIX.length()).trim();
        mock = mock.replace('`', '"');
        if (StringUtils.isBlank(mock)) { // throw
            throw new RpcException(" mocked exception for Service degradation. ");
        } else { // user customized class : throw xxx
            Throwable t = getThrowable(mock);
            throw new RpcException(RpcException.BIZ_EXCEPTION, t);
        }
    } else { // impl mock: custom mock class
        try {
            Invoker<T> invoker = getInvoker(mock);
            return invoker.invoke(invocation);
        } catch (Throwable t) {
            throw new RpcException("Failed to create mock implemention class " + mock, t);
        }
    }
}
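The mock configuration is read first, for example mock=force:return+null. It is then URL-decoded to mock=force:return null, normalized to mock=return null, and finally dispatched to the matching branch according to the rules.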
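The decode, normalize, and dispatch steps can be traced with a small standalone sketch. It assumes the prefixes are "fail:", "force:", "return " and "throw", uses the JDK's URLDecoder in place of Dubbo's URL.decode, and leaves out the mapping of default/true/fail/force to the interface name + Mock. MockRuleSketch and its methods are hypothetical names.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;

class MockRuleSketch {
    // Step 1 and 2: URL-decode the raw value, then strip a leading fail:/force: prefix.
    static String normalize(String raw) {
        String mock;
        try {
            mock = URLDecoder.decode(raw, "UTF-8").trim();
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always supported
        }
        if (mock.startsWith("fail:")) {
            mock = mock.substring("fail:".length()).trim();
        } else if (mock.startsWith("force:")) {
            mock = mock.substring("force:".length()).trim();
        }
        return mock;
    }

    // Step 3: name the branch a normalized rule would take.
    static String branch(String normalized) {
        if ("return".equalsIgnoreCase(normalized.trim())) {
            return "return null";
        } else if (normalized.startsWith("return ")) {
            return "return value";
        } else if (normalized.startsWith("throw")) {
            return "throw";
        }
        return "mock class";
    }

    public static void main(String[] args) {
        String normalized = normalize("force:return+null");
        System.out.println(normalized + " -> " + branch(normalized));
    }
}
```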
The function that normalizes the mock parameter:
/**
 * 1. Use a custom mock class
 * mock = default => DemoServiceMock
 * mock = true => DemoServiceMock
 * mock = fail => DemoServiceMock
 * mock = force => DemoServiceMock
 *
 * 2. Execute normally first; if the call fails, run the corresponding mock logic
 * mock = fail:throw => throw new RpcException(" mocked exception for Service degradation. ");
 * mock = fail:throw XxxException => throw new RpcException(RpcException.BIZ_EXCEPTION, XxxException);
 * mock = fail:return => return null
 * mock = fail:return xxx => return xxx
 *
 * 3. Run the corresponding mock logic directly
 * mock = force:throw => throw new RpcException(" mocked exception for Service degradation. ");
 * mock = force:throw XxxException => throw new RpcException(RpcException.BIZ_EXCEPTION, XxxException);
 * mock = force:return => return null
 * mock = force:return xxx => return xxx
 *
 * @param mock
 * @return
 */
private String normallizeMock(String mock) {
    if (mock == null || mock.trim().length() == 0) {
        return mock;
    } else if (ConfigUtils.isDefault(mock) || "fail".equalsIgnoreCase(mock.trim()) || "force".equalsIgnoreCase(mock.trim())) {
        mock = url.getServiceInterface() + "Mock";
    }
    if (mock.startsWith(Constants.FAIL_PREFIX)) {
        mock = mock.substring(Constants.FAIL_PREFIX.length()).trim();
    } else if (mock.startsWith(Constants.FORCE_PREFIX)) {
        mock = mock.substring(Constants.FORCE_PREFIX.length()).trim();
    }
    return mock;
}
Now let's look at a custom mock class. On the consumer side, write:
public class DemoServiceMock implements DemoService {

    @Override
    public Car sayHello(String name) {
        Car car = new Car();
        car.setCarNum("mock中");
        car.setGoMile(666);
        return car;
    }
}
Configuration override:
registry.register(URL.valueOf("override://0.0.0.0/com.alibaba.dubbo.demo.DemoService?category=configurators&dynamic=false&application=demo-consumer&mock=force:com.alibaba.dubbo.demo.DemoServiceMock"));
MockInvoker.invoke
try {
    Invoker<T> invoker = getInvoker(mock);
    return invoker.invoke(invocation);
} catch (Throwable t) {
    throw new RpcException("Failed to create mock implemention class " + mock, t);
}
private Invoker<T> getInvoker(String mockService) {
    Invoker<T> invoker = (Invoker<T>) mocks.get(mockService);
    if (invoker != null) {
        return invoker;
    } else {
        Class<T> serviceType = (Class<T>) ReflectUtils.forName(url.getServiceInterface());
        if (ConfigUtils.isDefault(mockService)) {
            mockService = serviceType.getName() + "Mock";
        }

        Class<?> mockClass = ReflectUtils.forName(mockService);
        if (!serviceType.isAssignableFrom(mockClass)) {
            throw new IllegalArgumentException("The mock implemention class " + mockClass.getName() + " not implement interface " + serviceType.getName());
        }

        if (!serviceType.isAssignableFrom(mockClass)) {
            throw new IllegalArgumentException("The mock implemention class " + mockClass.getName() + " not implement interface " + serviceType.getName());
        }
        try {
            T mockObject = (T) mockClass.newInstance(); // instantiate the custom mock class
            invoker = proxyFactory.getInvoker(mockObject, (Class<T>) serviceType, url); // create an Invoker just as for an ordinary class
            if (mocks.size() < 10000) {
                mocks.put(mockService, invoker);
            }
            return invoker;
        } catch (InstantiationException e) {
            throw new IllegalStateException("No such empty constructor \"public " + mockClass.getSimpleName() + "()\" in mock implemention class " + mockClass.getName(), e);
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
    }
}
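The core of getInvoker (derive the class name by the "interface name + Mock" convention, check that it implements the interface, instantiate it through a no-argument constructor, and cache it) can be reproduced with plain reflection. The sketch below is a simplified model under those assumptions; GreetingService, GreetingServiceMock, and loadMock are invented names, and the cache holds the mock object itself rather than a Dubbo Invoker.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class MockClassLoadingSketch {
    interface GreetingService {
        String greet(String name);
    }

    // Found by convention: same name as the interface plus the "Mock" suffix.
    static class GreetingServiceMock implements GreetingService {
        public String greet(String name) {
            return "mock:" + name;
        }
    }

    private static final Map<String, Object> CACHE = new ConcurrentHashMap<>();

    static <T> T loadMock(Class<T> serviceType) {
        String mockName = serviceType.getName() + "Mock";
        Object cached = CACHE.get(mockName);
        if (cached != null) {
            return serviceType.cast(cached); // reuse the instance created earlier
        }
        try {
            Class<?> mockClass = Class.forName(mockName);
            if (!serviceType.isAssignableFrom(mockClass)) {
                throw new IllegalArgumentException(mockName + " does not implement " + serviceType.getName());
            }
            // Requires a no-argument constructor, as the original error message points out.
            T mock = serviceType.cast(mockClass.getDeclaredConstructor().newInstance());
            CACHE.put(mockName, mock);
            return mock;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot create mock " + mockName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(loadMock(GreetingService.class).greet("world"));
    }
}
```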
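Having covered return and custom mock classes, let's finish with throwing exceptions.
By default an RpcException is thrown with the message "mocked exception for Service degradation." A custom exception can also be used, for example: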
public class MyRuntimeException extends RuntimeException {
    public MyRuntimeException(String msg) {
        super(msg); // pass the message to RuntimeException so getMessage() returns it
    }
}
A custom exception must have a single-argument constructor whose parameter is a String.
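The reason for that requirement is that the exception is created reflectively with a String message. The following sketch shows the idea using only JDK reflection (Class.forName and getConstructor(String.class)), not Dubbo's ReflectUtils; ThrowableFactorySketch is a hypothetical name, and java.util.EmptyStackException serves as an example of a class that has no String constructor.

```java
import java.lang.reflect.Constructor;

class ThrowableFactorySketch {
    // Build a Throwable from its class name via its public (String) constructor.
    static Throwable create(String className, String message) {
        try {
            Class<?> type = Class.forName(className);
            Constructor<?> constructor = type.getConstructor(String.class);
            return (Throwable) constructor.newInstance(message);
        } catch (ReflectiveOperationException e) {
            // Covers a missing class and a missing (String) constructor alike.
            throw new IllegalArgumentException("mock throw error: " + className, e);
        }
    }

    public static void main(String[] args) {
        Throwable ok = create("java.lang.IllegalStateException", "mocked exception");
        System.out.println(ok.getClass().getName() + ": " + ok.getMessage());
        try {
            create("java.util.EmptyStackException", "mocked exception");
        } catch (IllegalArgumentException e) {
            System.out.println("no String constructor: " + e.getCause().getClass().getSimpleName());
        }
    }
}
```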
Configuration override:
registry.register(URL.valueOf("override://0.0.0.0/com.alibaba.dubbo.demo.DemoService?category=configurators&dynamic=false&application=demo-consumer&mock=force:throw+com.alibaba.dubbo.Test.MyRuntimeException"));
MockInvoker.invoke calls getThrowable:
private Throwable getThrowable(String throwstr) {
    Throwable throwable = (Throwable) throwables.get(throwstr);
    if (throwable != null) {
        return throwable;
    } else {
        Throwable t = null;
        try {
            Class<?> bizException = ReflectUtils.forName(throwstr);
            Constructor<?> constructor;
            constructor = ReflectUtils.findConstructor(bizException, String.class);
            t = (Throwable) constructor.newInstance(new Object[]{" mocked exception for Service degradation. "});
            if (throwables.size() < 1000) {
                throwables.put(throwstr, t);
            }
        } catch (Exception e) {
            throw new RpcException("mock throw error :" + throwstr + " argument error.", e);
        }
        return t;
    }
}
That concludes service degradation.