Hystrix使用詳解


原文參考:http://hot66hot.iteye.com/blog/2155036

 

一:為什么需要Hystrix?

在大中型分布式系統中,通常系統很多依賴(HTTP,hession,Netty,Dubbo等),如下圖:

 

在高並發訪問下,這些依賴的穩定性與否對系統的影響非常大,但是依賴有很多不可控問題:如網絡連接緩慢,資源繁忙,暫時不可用,服務脫機等.

如下圖:QPS為50的依賴 I 出現不可用,但是其他依賴仍然可用.

 

當依賴I 阻塞時,大多數服務器的線程池就出現阻塞(BLOCK),影響整個線上服務的穩定性.如下圖:

 

在復雜的分布式架構的應用程序有很多的依賴,都會不可避免地在某些時候失敗。高並發的依賴失敗時如果沒有隔離措施,當前應用服務就有被拖垮的風險。

 

Java代碼   收藏代碼
  1. 例如:一個依賴30個SOA服務的系統,每個服務99.99%可用。  
  2. 99.99%的30次方 ≈ 99.7%  
  3. 0.3% 意味着一億次請求 會有 3,000,00次失敗  
  4. 換算成時間大約每月有2個小時服務不穩定.  
  5. 隨着服務依賴數量的變多,服務不穩定的概率會成指數性提高.  

 解決問題方案:對依賴做隔離,Hystrix就是處理依賴隔離的框架,同時也是可以幫我們做依賴服務的治理和監控.

 

Netflix 公司開發並成功使用Hystrix,使用規模如下:

 

Java代碼   收藏代碼
  1. The Netflix API processes 10+ billion HystrixCommand executions per day using thread isolation.   
  2. Each API instance has 40+ thread-pools with 5-20 threads in each (most are set to 10).  

二:Hystrix如何解決依賴隔離

1:Hystrix使用命令模式HystrixCommand(Command)包裝依賴調用邏輯,每個命令在單獨線程中/信號授權下執行。

2:可配置依賴調用超時時間,超時時間一般設為比99.5%平均時間略高即可.當調用超時時,直接返回或執行fallback邏輯。

3:為每個依賴提供一個小的線程池(或信號),如果線程池已滿調用將被立即拒絕,默認不采用排隊.加速失敗判定時間。

4:依賴調用結果分:成功,失敗(拋出異常),超時,線程拒絕,短路。 請求失敗(異常,拒絕,超時,短路)時執行fallback(降級)邏輯。

5:提供熔斷器組件,可以自動運行或手動調用,停止當前依賴一段時間(10秒),熔斷器默認錯誤率閾值為50%,超過將自動運行。

6:提供近實時依賴的統計和監控

Hystrix依賴的隔離架構,如下圖:

三:如何使用Hystrix

1:使用maven引入Hystrix依賴

 

 1 <!-- 依賴版本 -->
 2 <hystrix.version>1.3.16</hystrix.version>
 3 <hystrix-metrics-event-stream.version>1.1.2</hystrix-metrics-event-stream.version> 
 4  
 5 <dependency>
 6      <groupId>com.netflix.hystrix</groupId>
 7      <artifactId>hystrix-core</artifactId>
 8      <version>${hystrix.version}</version>
 9  </dependency>
10      <dependency>
11      <groupId>com.netflix.hystrix</groupId>
12      <artifactId>hystrix-metrics-event-stream</artifactId>
13      <version>${hystrix-metrics-event-stream.version}</version>
14  </dependency>
15 <!-- 倉庫地址 -->
16 <repository>
17      <id>nexus</id>
18      <name>local private nexus</name>
19      <url>http://maven.oschina.net/content/groups/public/</url>
20      <releases>
21           <enabled>true</enabled>
22      </releases>
23      <snapshots>
24           <enabled>false</enabled>
25      </snapshots>
26 </repository>
View Code

 

2:使用命令模式封裝依賴邏輯

 

 1 public class HelloWorldCommand extends HystrixCommand<String> {
 2     private final String name;
 3     public HelloWorldCommand(String name) {
 4         //最少配置:指定命令組名(CommandGroup)
 5         super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
 6         this.name = name;
 7     }
 8     @Override
 9     protected String run() {
10         // 依賴邏輯封裝在run()方法中
11         return "Hello " + name +" thread:" + Thread.currentThread().getName();
12     }
13     //調用實例
14     public static void main(String[] args) throws Exception{
15         //每個Command對象只能調用一次,不可以重復調用,
16         //重復調用對應異常信息:This instance can only be executed once. Please instantiate a new instance.
17         HelloWorldCommand helloWorldCommand = new HelloWorldCommand("Synchronous-hystrix");
18         //使用execute()同步調用代碼,效果等同於:helloWorldCommand.queue().get(); 
19         String result = helloWorldCommand.execute();
20         System.out.println("result=" + result);
21  
22         helloWorldCommand = new HelloWorldCommand("Asynchronous-hystrix");
23         //異步調用,可自由控制獲取結果時機,
24         Future<String> future = helloWorldCommand.queue();
25         //get操作不能超過command定義的超時時間,默認:1秒
26         result = future.get(100, TimeUnit.MILLISECONDS);
27         System.out.println("result=" + result);
28         System.out.println("mainThread=" + Thread.currentThread().getName());
29     }
30      
31 }
32     //運行結果: run()方法在不同的線程下執行
33     // result=Hello Synchronous-hystrix thread:hystrix-HelloWorldGroup-1
34     // result=Hello Asynchronous-hystrix thread:hystrix-HelloWorldGroup-2
35     // mainThread=main
View Code

 

 note:異步調用使用 command.queue()get(timeout, TimeUnit.MILLISECONDS);同步調用使用command.execute() 等同於 command.queue().get();

3:注冊異步事件回調執行

 1 //注冊觀察者事件攔截
 2 Observable<String> fs = new HelloWorldCommand("World").observe();
 3 //注冊結果回調事件
 4 fs.subscribe(new Action1<String>() {
 5     @Override
 6     public void call(String result) {
 7          //執行結果處理,result 為HelloWorldCommand返回的結果
 8         //用戶對結果做二次處理.
 9     }
10 });
11 //注冊完整執行生命周期事件
12 fs.subscribe(new Observer<String>() {
13             @Override
14             public void onCompleted() {
15                 // onNext/onError完成之后最后回調
16                 System.out.println("execute onCompleted");
17             }
18             @Override
19             public void onError(Throwable e) {
20                 // 當產生異常時回調
21                 System.out.println("onError " + e.getMessage());
22                 e.printStackTrace();
23             }
24             @Override
25             public void onNext(String v) {
26                 // 獲取結果后回調
27                 System.out.println("onNext: " + v);
28             }
29         });
30 /* 運行結果
31 call execute result=Hello observe-hystrix thread:hystrix-HelloWorldGroup-3
32 onNext: Hello observe-hystrix thread:hystrix-HelloWorldGroup-3
33 execute onCompleted
34 */
View Code

 

4:使用Fallback() 提供降級策略

 

 1 //重載HystrixCommand 的getFallback方法實現邏輯
 2 public class HelloWorldCommand extends HystrixCommand<String> {
 3     private final String name;
 4     public HelloWorldCommand(String name) {
 5         super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("HelloWorldGroup"))
 6                 /* 配置依賴超時時間,500毫秒*/
 7                 .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationThreadTimeoutInMilliseconds(500)));
 8         this.name = name;
 9     }
10     @Override
11     protected String getFallback() {
12         return "exeucute Falled";
13     }
14     @Override
15     protected String run() throws Exception {
16         //sleep 1 秒,調用會超時
17         TimeUnit.MILLISECONDS.sleep(1000);
18         return "Hello " + name +" thread:" + Thread.currentThread().getName();
19     }
20     public static void main(String[] args) throws Exception{
21         HelloWorldCommand command = new HelloWorldCommand("test-Fallback");
22         String result = command.execute();
23     }
24 }
25 /* 運行結果:getFallback() 調用運行
26 getFallback executed
27 */
View Code

 

NOTE: 除了HystrixBadRequestException異常之外,所有從run()方法拋出的異常都算作失敗,並觸發降級getFallback()和斷路器邏輯。

          HystrixBadRequestException用在非法參數或非系統故障異常等不應觸發回退邏輯的場景。

5:依賴命名:CommandKey

1 public HelloWorldCommand(String name) {
2         super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
3                 /* HystrixCommandKey工廠定義依賴名稱 */
4                 .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld")));
5         this.name = name;
6     }
View Code

 

 NOTE: 每個CommandKey代表一個依賴抽象,相同的依賴要使用相同的CommandKey名稱。依賴隔離的根本就是對相同CommandKey的依賴做隔離.

6:依賴分組:CommandGroup

命令分組用於對依賴操作分組,便於統計,匯總等.

1 //使用HystrixCommandGroupKey工廠定義
2 public HelloWorldCommand(String name) {
3     Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("HelloWorldGroup"))
4 }
View Code

 

 NOTE: CommandGroup是每個命令最少配置的必選參數,在不指定ThreadPoolKey的情況下,字面值用於對不同依賴的線程池/信號區分.

7:線程池/信號:ThreadPoolKey

1 public HelloWorldCommand(String name) {
2         super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
3                 .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld"))
4                 /* 使用HystrixThreadPoolKey工廠定義線程池名稱*/
5                 .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool")));
6         this.name = name;
7     }
View Code

 NOTE: 當對同一業務依賴做隔離時使用CommandGroup做區分,但是對同一依賴的不同遠程調用如(一個是redis 一個是http),可以使用HystrixThreadPoolKey做隔離區分.

           最然在業務上都是相同的組,但是需要在資源上做隔離時,可以使用HystrixThreadPoolKey區分.

8:請求緩存 Request-Cache

 1 public class RequestCacheCommand extends HystrixCommand<String> {
 2     private final int id;
 3     public RequestCacheCommand( int id) {
 4         super(HystrixCommandGroupKey.Factory.asKey("RequestCacheCommand"));
 5         this.id = id;
 6     }
 7     @Override
 8     protected String run() throws Exception {
 9         System.out.println(Thread.currentThread().getName() + " execute id=" + id);
10         return "executed=" + id;
11     }
12     //重寫getCacheKey方法,實現區分不同請求的邏輯
13     @Override
14     protected String getCacheKey() {
15         return String.valueOf(id);
16     }
17  
18     public static void main(String[] args){
19         HystrixRequestContext context = HystrixRequestContext.initializeContext();
20         try {
21             RequestCacheCommand command2a = new RequestCacheCommand(2);
22             RequestCacheCommand command2b = new RequestCacheCommand(2);
23             Assert.assertTrue(command2a.execute());
24             //isResponseFromCache判定是否是在緩存中獲取結果
25             Assert.assertFalse(command2a.isResponseFromCache());
26             Assert.assertTrue(command2b.execute());
27             Assert.assertTrue(command2b.isResponseFromCache());
28         } finally {
29             context.shutdown();
30         }
31         context = HystrixRequestContext.initializeContext();
32         try {
33             RequestCacheCommand command3b = new RequestCacheCommand(2);
34             Assert.assertTrue(command3b.execute());
35             Assert.assertFalse(command3b.isResponseFromCache());
36         } finally {
37             context.shutdown();
38         }
39     }
40 }
View Code

 

 NOTE:請求緩存可以讓(CommandKey/CommandGroup)相同的情況下,直接共享結果,降低依賴調用次數,在高並發和CacheKey碰撞率高場景下可以提升性能.

Servlet容器中,可以直接實用Filter機制Hystrix請求上下文

 1 public class HystrixRequestContextServletFilter implements Filter {
 2     public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) 
 3      throws IOException, ServletException {
 4         HystrixRequestContext context = HystrixRequestContext.initializeContext();
 5         try {
 6             chain.doFilter(request, response);
 7         } finally {
 8             context.shutdown();
 9         }
10     }
11 }
12 <filter>
13       <display-name>HystrixRequestContextServletFilter</display-name>
14       <filter-name>HystrixRequestContextServletFilter</filter-name>
15       <filter-class>com.netflix.hystrix.contrib.requestservlet.HystrixRequestContextServletFilter</filter-class>
16     </filter>
17     <filter-mapping>
18       <filter-name>HystrixRequestContextServletFilter</filter-name>
19       <url-pattern>/*</url-pattern>
20    </filter-mapping>
View Code

 

9:信號量隔離:SEMAPHORE

  隔離本地代碼或可快速返回遠程調用(如memcached,redis)可以直接使用信號量隔離,降低線程隔離開銷.

 1 public class HelloWorldCommand extends HystrixCommand<String> {
 2     private final String name;
 3     public HelloWorldCommand(String name) {
 4         super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("HelloWorldGroup"))
 5                 /* 配置信號量隔離方式,默認采用線程池隔離 */
 6                 .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)));
 7         this.name = name;
 8     }
 9     @Override
10     protected String run() throws Exception {
11         return "HystrixThread:" + Thread.currentThread().getName();
12     }
13     public static void main(String[] args) throws Exception{
14         HelloWorldCommand command = new HelloWorldCommand("semaphore");
15         String result = command.execute();
16         System.out.println(result);
17         System.out.println("MainThread:" + Thread.currentThread().getName());
18     }
19 }
20 /** 運行結果
21  HystrixThread:main
22  MainThread:main
23 */
View Code

 

10:fallback降級邏輯命令嵌套

 

  適用場景:用於fallback邏輯涉及網絡訪問的情況,如緩存訪問。

 

 1 public class CommandWithFallbackViaNetwork extends HystrixCommand<String> {
 2     private final int id;
 3  
 4     protected CommandWithFallbackViaNetwork(int id) {
 5         super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX"))
 6                 .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueCommand")));
 7         this.id = id;
 8     }
 9  
10     @Override
11     protected String run() {
12         // RemoteService.getValue(id);
13         throw new RuntimeException("force failure for example");
14     }
15  
16     @Override
17     protected String getFallback() {
18         return new FallbackViaNetwork(id).execute();
19     }
20  
21     private static class FallbackViaNetwork extends HystrixCommand<String> {
22         private final int id;
23         public FallbackViaNetwork(int id) {
24             super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX"))
25                     .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueFallbackCommand"))
26                     // 使用不同的線程池做隔離,防止上層線程池跑滿,影響降級邏輯.
27                     .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("RemoteServiceXFallback")));
28             this.id = id;
29         }
30         @Override
31         protected String run() {
32             MemCacheClient.getValue(id);
33         }
34  
35         @Override
36         protected String getFallback() {
37             return null;
38         }
39     }
40 }
View Code

 

 NOTE:依賴調用和降級調用使用不同的線程池做隔離,防止上層線程池跑滿,影響二級降級邏輯調用.

 11:顯示調用fallback邏輯,用於特殊業務處理

 

 1 public class CommandFacadeWithPrimarySecondary extends HystrixCommand<String> {
 2     private final static DynamicBooleanProperty usePrimary = DynamicPropertyFactory.getInstance().getBooleanProperty("primarySecondary.usePrimary", true);
 3     private final int id;
 4     public CommandFacadeWithPrimarySecondary(int id) {
 5         super(Setter
 6                 .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))
 7                 .andCommandKey(HystrixCommandKey.Factory.asKey("PrimarySecondaryCommand"))
 8                 .andCommandPropertiesDefaults(
 9                         HystrixCommandProperties.Setter()
10                                 .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)));
11         this.id = id;
12     }
13     @Override
14     protected String run() {
15         if (usePrimary.get()) {
16             return new PrimaryCommand(id).execute();
17         } else {
18             return new SecondaryCommand(id).execute();
19         }
20     }
21     @Override
22     protected String getFallback() {
23         return "static-fallback-" + id;
24     }
25     @Override
26     protected String getCacheKey() {
27         return String.valueOf(id);
28     }
29     private static class PrimaryCommand extends HystrixCommand<String> {
30         private final int id;
31         private PrimaryCommand(int id) {
32             super(Setter
33                     .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))
34                     .andCommandKey(HystrixCommandKey.Factory.asKey("PrimaryCommand"))
35                     .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("PrimaryCommand"))
36                     .andCommandPropertiesDefaults(
37                             // we default to a 600ms timeout for primary
38                             HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(600)));
39             this.id = id;
40         }
41         @Override
42         protected String run() {
43             // perform expensive 'primary' service call
44             return "responseFromPrimary-" + id;
45         }
46     }
47     private static class SecondaryCommand extends HystrixCommand<String> {
48         private final int id;
49         private SecondaryCommand(int id) {
50             super(Setter
51                     .withGroupKey(HystrixCommandGroupKey.Factory.asKey("SystemX"))
52                     .andCommandKey(HystrixCommandKey.Factory.asKey("SecondaryCommand"))
53                     .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("SecondaryCommand"))
54                     .andCommandPropertiesDefaults(
55                             // we default to a 100ms timeout for secondary
56                             HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(100)));
57             this.id = id;
58         }
59         @Override
60         protected String run() {
61             // perform fast 'secondary' service call
62             return "responseFromSecondary-" + id;
63         }
64     }
65     public static class UnitTest {
66         @Test
67         public void testPrimary() {
68             HystrixRequestContext context = HystrixRequestContext.initializeContext();
69             try {
70                 ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", true);
71                 assertEquals("responseFromPrimary-20", new CommandFacadeWithPrimarySecondary(20).execute());
72             } finally {
73                 context.shutdown();
74                 ConfigurationManager.getConfigInstance().clear();
75             }
76         }
77         @Test
78         public void testSecondary() {
79             HystrixRequestContext context = HystrixRequestContext.initializeContext();
80             try {
81                 ConfigurationManager.getConfigInstance().setProperty("primarySecondary.usePrimary", false);
82                 assertEquals("responseFromSecondary-20", new CommandFacadeWithPrimarySecondary(20).execute());
83             } finally {
84                 context.shutdown();
85                 ConfigurationManager.getConfigInstance().clear();
86             }
87         }
88     }
89 }
View Code

 

 NOTE:顯示調用降級適用於特殊需求的場景,fallback用於業務處理,fallback不再承擔降級職責,建議慎重使用,會造成監控統計換亂等問題.

12:命令調用合並:HystrixCollapser

命令調用合並允許多個請求合並到一個線程/信號下批量執行。

執行流程圖如下:

 1 public class CommandCollapserGetValueForKey extends HystrixCollapser<List<String>, String, Integer> {
 2     private final Integer key;
 3     public CommandCollapserGetValueForKey(Integer key) {
 4         this.key = key;
 5     }
 6     @Override
 7     public Integer getRequestArgument() {
 8         return key;
 9     }
10     @Override
11     protected HystrixCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, Integer>> requests) {
12         //創建返回command對象
13         return new BatchCommand(requests);
14     }
15     @Override
16     protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) {
17         int count = 0;
18         for (CollapsedRequest<String, Integer> request : requests) {
19             //手動匹配請求和響應
20             request.setResponse(batchResponse.get(count++));
21         }
22     }
23     private static final class BatchCommand extends HystrixCommand<List<String>> {
24         private final Collection<CollapsedRequest<String, Integer>> requests;
25         private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) {
26                 super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
27                     .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey")));
28             this.requests = requests;
29         }
30         @Override
31         protected List<String> run() {
32             ArrayList<String> response = new ArrayList<String>();
33             for (CollapsedRequest<String, Integer> request : requests) {
34                 response.add("ValueForKey: " + request.getArgument());
35             }
36             return response;
37         }
38     }
39     public static class UnitTest {
40         HystrixRequestContext context = HystrixRequestContext.initializeContext();
41         try {
42             Future<String> f1 = new CommandCollapserGetValueForKey(1).queue();
43             Future<String> f2 = new CommandCollapserGetValueForKey(2).queue();
44             Future<String> f3 = new CommandCollapserGetValueForKey(3).queue();
45             Future<String> f4 = new CommandCollapserGetValueForKey(4).queue();
46             assertEquals("ValueForKey: 1", f1.get());
47             assertEquals("ValueForKey: 2", f2.get());
48             assertEquals("ValueForKey: 3", f3.get());
49             assertEquals("ValueForKey: 4", f4.get());
50             assertEquals(1, HystrixRequestLog.getCurrentRequest().getExecutedCommands().size());
51             HystrixCommand<?> command = HystrixRequestLog.getCurrentRequest().getExecutedCommands().toArray(new HystrixCommand<?>[1])[0];
52             assertEquals("GetValueForKey", command.getCommandKey().name());
53             assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED));
54             assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS));
55         } finally {
56          context.shutdown();
57         }   
58     }
59 }
View Code

 

 NOTE:使用場景:HystrixCollapser用於對多個相同業務的請求合並到一個線程甚至可以合並到一個連接中執行,降低線程交互次和IO數,但必須保證他們屬於同一依賴.

四:監控平台搭建Hystrix-dashboard

1:監控dashboard介紹

dashboard面板可以對依賴關鍵指標提供實時監控,如下圖:

 

2:實例暴露command統計數據

Hystrix使用Servlet對當前JVM下所有command調用情況作數據流輸出

配置如下:

 

 1 <servlet>
 2     <display-name>HystrixMetricsStreamServlet</display-name>
 3     <servlet-name>HystrixMetricsStreamServlet</servlet-name>
 4     <servlet-class>com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsStreamServlet</servlet-class>
 5 </servlet>
 6 <servlet-mapping>
 7     <servlet-name>HystrixMetricsStreamServlet</servlet-name>
 8     <url-pattern>/hystrix.stream</url-pattern>
 9 </servlet-mapping>
10 <!-- 
11     對應URL格式 : http://hostname:port/application/hystrix.stream
12 -->
View Code
 

3:集群模式監控統計搭建

1)使用Turbine組件做集群數據匯總

結構圖如下;

2)內嵌jetty提供Servlet容器,暴露HystrixMetrics

 1 public class JettyServer {
 2     private final Logger logger = LoggerFactory.getLogger(this.getClass());
 3     private int port;
 4     private ExecutorService executorService = Executors.newFixedThreadPool(1);
 5     private Server server = null;
 6     public void init() {
 7         try {
 8             executorService.execute(new Runnable() {
 9                 @Override
10                 public void run() {
11                     try {
12                         //綁定8080端口,加載HystrixMetricsStreamServlet並映射url
13                         server = new Server(8080);
14                         WebAppContext context = new WebAppContext();
15                         context.setContextPath("/");
16                         context.addServlet(HystrixMetricsStreamServlet.class, "/hystrix.stream");
17                         context.setResourceBase(".");
18                         server.setHandler(context);
19                         server.start();
20                         server.join();
21                     } catch (Exception e) {
22                         logger.error(e.getMessage(), e);
23                     }
24                 }
25             });
26         } catch (Exception e) {
27             logger.error(e.getMessage(), e);
28         }
29     }
30     public void destory() {
31         if (server != null) {
32             try {
33                 server.stop();
34                 server.destroy();
35                 logger.warn("jettyServer stop and destroy!");
36             } catch (Exception e) {
37                 logger.error(e.getMessage(), e);
38             }
39         }
40     }
41 }
View Code

 

3)Turbine搭建和配置

  a:配置Turbine Servlet收集器

 1 <servlet>
 2    <description></description>
 3    <display-name>TurbineStreamServlet</display-name>
 4    <servlet-name>TurbineStreamServlet</servlet-name>
 5    <servlet-class>com.netflix.turbine.streaming.servlet.TurbineStreamServlet</servlet-class>
 6  </servlet>
 7  <servlet-mapping>
 8    <servlet-name>TurbineStreamServlet</servlet-name>
 9    <url-pattern>/turbine.stream</url-pattern>
10  </servlet-mapping>
View Code

 

   b:編寫config.properties配置集群實例

#配置兩個集群:mobil-online,ugc-online
turbine.aggregator.clusterConfig=mobil-online,ugc-online
#配置mobil-online集群實例
turbine.ConfigPropertyBasedDiscovery.mobil-online.instances=10.10.*.*,10.10.*.*,10.10.*.*,10.10.*.*,10.10.*.*,10.10.*.*,10.16.*.*,10.16.*.*,10.16.*.*,10.16.*.*
#配置mobil-online數據流servlet
turbine.instanceUrlSuffix.mobil-online=:8080/hystrix.stream
#配置ugc-online集群實例
turbine.ConfigPropertyBasedDiscovery.ugc-online.instances=10.10.*.*,10.10.*.*,10.10.*.*,10.10.*.*#配置ugc-online數據流servlet
turbine.instanceUrlSuffix.ugc-online=:8080/hystrix.stream
View Code

 

  c:使用Dashboard配置連接Turbine

  如下圖 :

 

五:Hystrix配置與分析

1:Hystrix 配置

1):Command 配置

Command配置源碼在HystrixCommandProperties,構造Command時通過Setter進行配置

具體配置解釋和默認值如下

 1 //使用命令調用隔離方式,默認:采用線程隔離,ExecutionIsolationStrategy.THREAD
 2 private final HystrixProperty<ExecutionIsolationStrategy> executionIsolationStrategy; 
 3 //使用線程隔離時,調用超時時間,默認:1秒
 4 private final HystrixProperty<Integer> executionIsolationThreadTimeoutInMilliseconds; 
 5 //線程池的key,用於決定命令在哪個線程池執行
 6 private final HystrixProperty<String> executionIsolationThreadPoolKeyOverride; 
 7 //使用信號量隔離時,命令調用最大的並發數,默認:10
 8 private final HystrixProperty<Integer> executionIsolationSemaphoreMaxConcurrentRequests;
 9 //使用信號量隔離時,命令fallback(降級)調用最大的並發數,默認:10
10 private final HystrixProperty<Integer> fallbackIsolationSemaphoreMaxConcurrentRequests; 
11 //是否開啟fallback降級策略 默認:true 
12 private final HystrixProperty<Boolean> fallbackEnabled; 
13 // 使用線程隔離時,是否對命令執行超時的線程調用中斷(Thread.interrupt())操作.默認:true
14 private final HystrixProperty<Boolean> executionIsolationThreadInterruptOnTimeout; 
15 // 統計滾動的時間窗口,默認:5000毫秒circuitBreakerSleepWindowInMilliseconds
16 private final HystrixProperty<Integer> metricsRollingStatisticalWindowInMilliseconds;
17 // 統計窗口的Buckets的數量,默認:10個,每秒一個Buckets統計
18 private final HystrixProperty<Integer> metricsRollingStatisticalWindowBuckets; // number of buckets in the statisticalWindow
19 //是否開啟監控統計功能,默認:true
20 private final HystrixProperty<Boolean> metricsRollingPercentileEnabled; 
21 // 是否開啟請求日志,默認:true
22 private final HystrixProperty<Boolean> requestLogEnabled; 
23 //是否開啟請求緩存,默認:true
24 private final HystrixProperty<Boolean> requestCacheEnabled; // Whether request caching is enabled.
View Code

 

2):熔斷器(Circuit Breaker)配置

Circuit Breaker配置源碼在HystrixCommandProperties,構造Command時通過Setter進行配置,每種依賴使用一個Circuit Breaker

 1 // 熔斷器在整個統計時間內是否開啟的閥值,默認20秒。也就是10秒鍾內至少請求20次,熔斷器才發揮起作用  
 2 private final HystrixProperty<Integer> circuitBreakerRequestVolumeThreshold;   
 3 //熔斷器默認工作時間,默認:5秒.熔斷器中斷請求5秒后會進入半打開狀態,放部分流量過去重試  
 4 private final HystrixProperty<Integer> circuitBreakerSleepWindowInMilliseconds;   
 5 //是否啟用熔斷器,默認true. 啟動  
 6 private final HystrixProperty<Boolean> circuitBreakerEnabled;   
 7 //默認:50%。當出錯率超過50%后熔斷器啟動.  
 8 private final HystrixProperty<Integer> circuitBreakerErrorThresholdPercentage;  
 9 //是否強制開啟熔斷器阻斷所有請求,默認:false,不開啟  
10 private final HystrixProperty<Boolean> circuitBreakerForceOpen;   
11 //是否允許熔斷器忽略錯誤,默認false, 不開啟  
12 private final HystrixProperty<Boolean> circuitBreakerForceClosed; 
View Code

 

 

3):命令合並(Collapser)配置

Command配置源碼在HystrixCollapserProperties,構造Collapser時通過Setter進行配置

1 //請求合並是允許的最大請求數,默認: Integer.MAX_VALUE  
2 private final HystrixProperty<Integer> maxRequestsInBatch;  
3 //批處理過程中每個命令延遲的時間,默認:10毫秒  
4 private final HystrixProperty<Integer> timerDelayInMilliseconds;  
5 //批處理過程中是否開啟請求緩存,默認:開啟  
6 private final HystrixProperty<Boolean> requestCacheEnabled;
View Code

 

 

 

4):線程池(ThreadPool)配置

 1 /** 
 2 配置線程池大小,默認值10個. 
 3 建議值:請求高峰時99.5%的平均響應時間 + 向上預留一些即可 
 4 */  
 5 HystrixThreadPoolProperties.Setter().withCoreSize(int value)  
 6 /** 
 7 配置線程值等待隊列長度,默認值:-1 
 8 建議值:-1表示不等待直接拒絕,測試表明線程池使用直接決絕策略+ 合適大小的非回縮線程池效率最高.所以不建議修改此值。 
 9 當使用非回縮線程池時,queueSizeRejectionThreshold,keepAliveTimeMinutes 參數無效 
10 */  
11 HystrixThreadPoolProperties.Setter().withMaxQueueSize(int value) 
View Code

 

 

2:Hystrix關鍵組件分析

 1):Hystrix流程結構解析

流程說明:

1:每次調用創建一個新的HystrixCommand,把依賴調用封裝在run()方法中.

2:執行execute()/queue做同步或異步調用.

3:判斷熔斷器(circuit-breaker)是否打開,如果打開跳到步驟8,進行降級策略,如果關閉進入步驟.

4:判斷線程池/隊列/信號量是否跑滿,如果跑滿進入降級步驟8,否則繼續后續步驟.

5:調用HystrixCommand的run方法.運行依賴邏輯

5a:依賴邏輯調用超時,進入步驟8.

6:判斷邏輯是否調用成功

6a:返回成功調用結果

6b:調用出錯,進入步驟8.

7:計算熔斷器狀態,所有的運行狀態(成功, 失敗, 拒絕,超時)上報給熔斷器,用於統計從而判斷熔斷器狀態.

8:getFallback()降級邏輯.

  以下四種情況將觸發getFallback調用:

 (1):run()方法拋出非HystrixBadRequestException異常。

 (2):run()方法調用超時

 (3):熔斷器開啟攔截調用

 (4):線程池/隊列/信號量是否跑滿

8a:沒有實現getFallback的Command將直接拋出異常

8b:fallback降級邏輯調用成功直接返回

8c:降級邏輯調用失敗拋出異常

9:返回執行成功結果

2):熔斷器:Circuit Breaker 

Circuit Breaker 流程架構和統計

每個熔斷器默認維護10個bucket,每秒一個bucket,每個blucket記錄成功,失敗,超時,拒絕的狀態,

默認錯誤超過50%且10秒內超過20個請求進行中斷攔截. 

3)隔離(Isolation)分析

Hystrix隔離方式采用線程/信號的方式,通過隔離限制依賴的並發量和阻塞擴散.

(1):線程隔離

         把執行依賴代碼的線程與請求線程(如:jetty線程)分離,請求線程可以自由控制離開的時間(異步過程)。

   通過線程池大小可以控制並發量,當線程池飽和時可以提前拒絕服務,防止依賴問題擴散。

   線上建議線程池不要設置過大,否則大量堵塞線程有可能會拖慢服務器。

(2):線程隔離的優缺點

線程隔離的優點:

[1]:使用線程可以完全隔離第三方代碼,請求線程可以快速放回。

[2]:當一個失敗的依賴再次變成可用時,線程池將清理,並立即恢復可用,而不是一個長時間的恢復。

[3]:可以完全模擬異步調用,方便異步編程。

線程隔離的缺點:

[1]:線程池的主要缺點是它增加了cpu,因為每個命令的執行涉及到排隊(默認使用SynchronousQueue避免排隊),調度和上下文切換。

[2]:對使用ThreadLocal等依賴線程狀態的代碼增加復雜性,需要手動傳遞和清理線程狀態。

NOTE: Netflix公司內部認為線程隔離開銷足夠小,不會造成重大的成本或性能的影響。

Netflix 內部API 每天100億的HystrixCommand依賴請求使用線程隔,每個應用大約40多個線程池,每個線程池大約5-20個線程。

(3):信號隔離

      信號隔離也可以用於限制並發訪問,防止阻塞擴散, 與線程隔離最大不同在於執行依賴代碼的線程依然是請求線程(該線程需要通過信號申請),

   如果客戶端是可信的且可以快速返回,可以使用信號隔離替換線程隔離,降低開銷.

   

線程隔離與信號隔離區別如下圖:

 解析圖片出自官網wiki , 更多內容請見官網: https://github.com/Netflix/Hystrix


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM