Netflix Hystrix - 快速入門


Hystrix最初是由Netflix的API team研發的,用於提高API的彈性和性能,2012年在公司內部廣受好評。


如果你的應用是一個單獨的應用,那幾乎不用在意斷路的問題。

但在分布式環境中,各個應用錯綜復雜的依賴關系,一個不穩定的服務會拖累依賴它的服務。


簡單來說,就是將服務之間的訪問隔離開來,在錯誤(包括超時)被傳播之前攔截下來,並提供相應的處理邏輯,讓這個分布式應用更有彈性。

Hystrix就是用來解決這一問題的lib,幫助開發者更方便的控制服務之間的通信。


在分布式系統中,你使用的第三方RPC API可能會提供服務通信攔截的功能,但通常不會涉及方方面面,更不能單獨拿出來給其他API使用。

而Hystrix會提供這些:

  • 為服務通信提供保護、容錯
  • 在復雜的依賴關系鏈中阻止錯誤傳播
  • 快速失敗
  • 根據具體事件進行毀掉
  • 支持降級
  • 近實時監控

 

假設你的分布式系統中存在幾十甚至上百個服務,即使每個服務都能保證99.99的可用性,但是在依賴關系錯綜復雜,單個服務依賴數量過多,隨着請求數量的上升,帶來的損失也是慘重的。


如果我為這個服務本身設置了超時時間,該服務對於不同的依賴方的權重不盡相同。

假設服務A和B都依賴服務C。對於A,它可能依賴很多服務,但C無法在1秒內響應時就放棄。而對於B,C是至關重要的服務,除非是業務數據異常,否則絕對不能中途停止。


如果C設置的超時時間為30s,那么A和B則同樣需要等待30s,這顯然是不合理的。而等待中的這些請求會耗費什么資源就看具體情況了,最壞的情況是拖垮了整個應用。

因此,延遲(lagency)和失敗(failure)都需要被隔離。


Hystrix如何做到這點?

  • 通過HystrixCommand在獨立的線程調用服務。
  • 超時時間由調用方掌握。
  • 為每個依賴維護一個小線程池,線程池滿時可以拒絕請求,而不是將請求放入隊列。
  • 區分事件,比如successe、failure, timeout、rejection,針對不同事件進行相應的回調。
  • 當失敗占比超過指定閾值時啟動斷路器(circuit-breaker),一段時間內阻止對特定依賴訪問。

 

下面用幾個簡單的例子進行說明。

  

Getting Started

通過幾個簡單的例子,對Hystrix有個粗淺的認識。

首先,添加以下依賴

compile group: 'com.netflix.hystrix', name: 'hystrix-core', version: '1.5.10'

 

參考如下main

package com.kavlez.lab.hystrix;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;

/**
 * @author Kavlez
 */
public class Hello {

    public static void main(String[] args) {
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("ExampleGroup");
        HystrixCommand<String> hystrixCommand = new HystrixCommand<String>(groupKey) {
            @Override
            protected String run() throws Exception {
                return "hi";
            }
        };

        System.out.printf("exec command ... result = %s", hystrixCommand.execute());
    }
}

 

被覆寫的run()為HystrixCommand中定義的抽象方法,調用依賴服務時也是在run中調用。

說明下上面的例子中出現的兩個類。

  • HystrixCommand: 用command的包含任何潛在風險(延時、失敗)的代碼,進而對其進行處理,比如容錯、統計、斷路...
  • HystrixCommandGroupKey: 所謂command的group,用於對一系列command統一進行一些操作。

    A group name for a {@link HystrixCommand}. This is used for grouping together commands such as for reporting, alerting, dashboards or team/library ownership.

 

和group一樣,command也是有名稱的。默認為類名

getClass().getSimpleName();

 

但並沒有提供相應的setter,只是提供了一個構造方法

protected HystrixCommand(Setter setter)

 

因此,如需指定command名稱,參考如下

final HystrixCommand.Setter setter =
        HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("ExampleCommand"));

HystrixCommand<String> hystrixCommand = new HystrixCommand<String>(setter) {
    //...
};

 

異步執行

如上面的例子中,我們可以通過execute()執行command,這是一種同步執行方式。

如果需要異步執行,只需要用queue()替代execute()即可。

Future<String> future = hystrixCommand.queue();
try {
    System.out.printf("exec command ... result = %s\n", future.get());
} catch (ExecutionException e) {
    e.printStackTrace();
}

 

事實上,execute()不過是queue().get()而已。

 

Observe

嘗試執行gradle dependencies,打印如下

compile - Dependencies for source set 'main'.
+--- org.slf4j:slf4j-api:1.7.21
\--- com.netflix.hystrix:hystrix-core:1.5.10
     +--- org.slf4j:slf4j-api:1.7.0 -> 1.7.21
     +--- com.netflix.archaius:archaius-core:0.4.1
     |    +--- commons-configuration:commons-configuration:1.8
     |    |    +--- commons-lang:commons-lang:2.6
     |    |    \--- commons-logging:commons-logging:1.1.1
     |    \--- org.slf4j:slf4j-api:1.6.4 -> 1.7.21
     +--- io.reactivex:rxjava:1.2.0
     \--- org.hdrhistogram:HdrHistogram:2.1.9

 

我想說的是hystrix依賴RxJava

其中observe是比較典型的用法,HystrixCommand提供了兩種方法observetoObservable,官方對兩者描述如下。

  • observe() — returns a “hot” Observable that executes the command immediately, though because the Observable is filtered through a ReplaySubject you are not in danger of losing any items that it emits before you have a chance to subscribe
  • toObservable() — returns a “cold” Observable that won’t execute the command and begin emitting its results until you subscribe to the Observable

 

顯然,如果是通過toObservable,同一個command實例是無法被subscribe多次的。

盡管兩者都返回Observable對象,但行為上稍有區別。


但本質上observe()幾乎等同於toObservable().subscribe(subject) 

下面是一段例子,由於是通過observe(),命令可以有多個subscriber:

package com.kavlez.lab.hystrix;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import rx.Observable;
import rx.Observer;

/**
 * @author Kavlez
 */
public class HelloObservable {

    public static void main(String[] args) {

        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("ExampleGroup");
        HystrixCommand<String> hystrixCommand = new HystrixCommand<String>(groupKey) {
            @Override
            protected String run() throws Exception {
                return "hi";
            }
        };

        Observable<String> observe = hystrixCommand.observe();

        observe.subscribe(s -> {
            System.out.printf("from action1...%s\n", s);
        });

        observe.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                System.out.println("completed...");
            }

            @Override
            public void onError(Throwable e) {
                System.out.printf("error...%s\n", e.getMessage());
            }

            @Override
            public void onNext(String s) {
                System.out.printf("from next...%s\n", s);
            }
        });
    }
}

 

Fallback

試試在run中寫一段Thread.sleep(1000),或者加個斷點讓程序暫停一段時間。
出現j.u.c.TimeoutException和HystrixRuntimeException,且后者提示

timed-out and no fallback available.

 

這是因為HystrixCommand的getFallback()默認為

protected R getFallback() { throw new UnsupportedOperationException("No fallback available."); }

 

既然如此,我們只需要覆寫該方法就可以實現降級(degradation)。

比如,把之前的例子改為:

HystrixCommand<String> hystrixCommand = new HystrixCommand<String>(groupKey) {
    @Override
    protected String run() throws Exception {
        Thread.sleep(1000);
        return "hi";
    }

    @Override
    protected String getFallback() {
        return "hi, sorry i am late...";
    }
};

 

getFallback並沒有提供參數,這意味着fallback不止發生在timeout一種情況,failure、timeout、thread pool rejection都可以觸發fallback。

 

Circuit Breaker

接下來說說Command是如何和斷路器(circuit breaker)交互的。

HystrixCommand屬性及默認值可以參考抽象類HystrixCommandProperties,其中以circuitBreaker開頭為斷路器相關屬性。 

這里先列出3個關於斷路器的屬性,分別為:

  • HystrixCommandProperties.circuitBreakerRequestVolumeThreshold() : 請求容量閾值
  • HystrixCommandProperties.circuitBreakerErrorThresholdPercentage(): 錯誤占比閾值
  • HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds() : 狀態時長

 

工作流程大致如下:

  1. 假設達到了request volume threshold,也就是metrics.healthCounts.totalCount大於該項
  2. 並且失敗次數的占比也達到了error percentage,默認為50%
  3. 此時,斷路器的狀態從CLOSED變為OPEN
  4. 斷路器狀態變為OPEN后,接收到的請求將全部斷路
  5. 過了恢復時間后,也就是sleep window in milliseconds(默認為5s),斷路器從OPEN變為HALF-OPEN狀態。
  6. 如果變更為HALF-OPEN后的下一次請求失敗,則變回OPEN狀態,反之為CLOSED。

 

Request Cache

之前並沒有注意Hystrix也提供了這樣一個特性,command中可以通過覆寫getCacheKey對請求進行緩存。

該方法默認返回null,也就是不緩存。

如果n個命令都在同一個request scope,則只有一個命令會被執行,其余n-1都是緩存。

 

代碼參考如下

package com.kavlez.lab.hystrix;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;

/**
 * @author Kavlez
 */
public class ReqCache {

    static class HelloCommand extends HystrixCommand<String> {

        private static final HystrixCommand.Setter setter =
                HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                        .andCommandKey(HystrixCommandKey.Factory.asKey("ExampleCommand"));

        private String requestKey;

        protected HelloCommand(String requestKey) {
            super(setter);
            this.requestKey = requestKey;
        }

        @Override
        protected String run() throws Exception {
            return null;
        }

        @Override
        protected String getCacheKey() {
            return this.requestKey;
        }
    }

    public static void main(String[] args) {
        HystrixRequestContext.initializeContext();

        HelloCommand hello1 = new HelloCommand("Billy");
        hello1.execute();
        System.out.println(hello1.isResponseFromCache());

        hello1 = new HelloCommand("Billy");
        hello1.execute();
        System.out.println(hello1.isResponseFromCache());

        hello1 = new HelloCommand("Van");
        hello1.execute();
        System.out.println(hello1.isResponseFromCache());
    }
}

 

注意這一行

HystrixRequestContext.initializeContext();

缺少HystrixRequestContext會提示illegal state。

 

Request Collapsing

Hystrix提供了一個叫collapse的特性,將多個請求進行合並,方便將多個請求限制在同一個time windows。

官方給出的例子是獲取收藏夾中的300部電影,類似的場景確實常見。



或者再復雜一點,比如我要獲取300部電影的工作人員信息,幾部不同電影很可能存在相同的工作人員。

也許我可以...首先獲取300部電影的列表,對其進行循環並get工作人員列表,然后再對其進行循環,依次請求工作人員的REST API...無論列表中是否有多個相同的電影和工作人員。

或者我可以僅僅為了這樣的應用場景而專門設計一套API,專門用於獲取電影列表中每一部電影的工作人員的信息。

但這顯然是個笨方法,默許這樣的方法會導致莫名其妙的API越來越多。

因此,為了應付這樣的場景而抽象出一層collapsing layer是值得的。

這樣一來,REST API和實體類可以依然保持單純,而開發者只需要使用HystrixCollapser即可。

假設多個命令同時進行相同的請求,collapser可以將請求進行合並,批量請求,並將結果分發給各個命令。

 

參考如下例子

package com.kavlez.lab.hystrix;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.netflix.hystrix.HystrixCollapser;
import com.netflix.hystrix.HystrixCollapserKey;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

/**
 * @author Kavlez
 * @since 5/12/17.
 */
public class CollapserExample {

    private static final String[] names = {"Protos", "Terran", "Hulk", "Anderson", "Uriah", "Gegard", "Velasquez", "Mcgregor", "Jose"};

    static class User {

        private int id;
        private int code;
        private String name;

        public User(int id, int code, String name) {
            this.id = id;
            this.code = code;
            this.name = name;
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public int getCode() {
            return code;
        }

        public void setCode(int code) {
            this.code = code;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }
    }


    static class UserCollapser extends HystrixCollapser<Map<Integer, User>, User, Integer> {

        private int userId;

        private static final Setter setter = Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("UserCollapser"));

        public UserCollapser(int userId) {
            super(setter);
            this.userId = userId;
        }

        @Override
        public Integer getRequestArgument() {
            return this.userId;
        }

        @Override
        protected HystrixCommand<Map<Integer, User>> createCommand(
                Collection<CollapsedRequest<User, Integer>> collapsedRequests) {

            return new UserBatchCommand(collapsedRequests.stream().map(request -> {
                System.out.println("arg mapped...");
                return request.getArgument();
            }).collect(Collectors.toList()));
        }

        @Override
        protected void mapResponseToRequests(
                Map<Integer, User> batchResponse, Collection<CollapsedRequest<User, Integer>> collapsedRequests) {
            for (CollapsedRequest<User, Integer> request : collapsedRequests) {
                Integer userId = request.getArgument();
                request.setResponse(batchResponse.get(userId));
            }
        }
    }

    static class UserBatchCommand extends HystrixCommand<Map<Integer, User>> {

        private List<Integer> ids;

        private final static Setter setter = Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("UserBatchGroup"));

        public UserBatchCommand(List<Integer> ids) {
            super(setter);
            this.ids = ids;
        }

        @Override
        protected Map<Integer, User> run() throws Exception {
            return this.getUsers();
        }

        Map<Integer, User> getUsers() {

            Map<Integer, User> users = Maps.newHashMap();

            for (Integer id : ids) {
                int randomCode = (int) (Math.random() * 100);
                users.put(id, new User(id, randomCode, names[randomCode % names.length]));
            }

            return users;
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        HystrixRequestContext.initializeContext();
        List<Future<User>> futures = Lists.newArrayList(1, 1, 1, 1, 2, 2, 2, 3, 3, 4).stream()
                .map(userId -> new UserCollapser(userId).queue()).collect(Collectors.toList());

        for (Future<User> future : futures) {
            future.get();
        }
    }
}

 

覆寫HystrixCollapser時指定的3個泛型類型,依次為

  • batch command返回類型
  • response類型
  • request參數類型

 

繼承HystrixCollapser需要覆寫3個方法,分別為

  • protected abstract HystrixCommand createCommand(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests); 工廠方法,用於創建HystrixCommand對象,或者說是一個專門用於處理批量請求的command。
    多數情況下,該方法創建的命令執行一次后就沒什么用了,所以通常返回一個新的實例。
    由於是用於處理批量請求,所以通常會把CollapsedRequest集合整個傳給command。

  • public abstract RequestArgumentType getRequestArgument(); 通過該方法來提供傳遞給HystrixCommand的參數,如果你需要傳遞多個參數,則封裝到一個對象即可。

  • protected abstract void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests); createCommand創建了對應的command,該command結束后會調用mapResponseToRequests,該方法將BatchReturnType映射為Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests。

 

Request Context Setup

上面提到的內容都涉及到了request context。

事實上,Hystrix的一些功能都需要request context,也就是request-scoped features。

比如,上面的例子中的main方法中都有這么一行

HystrixRequestContext context = HystrixRequestContext.initializeContext();

 

這就需要開發者按需管理HystrixRequestContext的生命周期。

而request多是在web應用中比較常見,比如實現一個servlet filter,在doFilter方法中進行管理。

public class HystrixRequestContextServletFilter implements Filter {

    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) 
     throws IOException, ServletException {
        HystrixRequestContext context = HystrixRequestContext.initializeContext();
        try {
            chain.doFilter(request, response);
        } finally {
            context.shutdown();
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM