【Hystrix】實現服務隔離和降級


一、背景

  在今天,基於SOA的架構已經大行其道。伴隨着架構的SOA化,相關聯的服務熔斷、降級、限流等思想,也在各種技術講座中頻繁出現。
  伴隨着業務復雜性的提高,系統的不斷拆分,一個面向用戶端的API,其內部的RPC調用層層嵌套,調用鏈條可能會非常長。這會造成以下問題:

  • API接口可用性降低:引用Hystrix官方的一個例子,假設tomcat對外提供的一個application,其內部依賴了30個服務,每個服務的可用性都很高,為99.99%。那整個applicatiion的可用性就是:99.99%的30次方 = 99.7%,即0.3%的失敗率。這也就意味着,每1億個請求,有30萬個失敗;按時間來算,就是每個月的故障時間超過2小時。

1.1 服務熔斷

  為了解決上述問題,服務熔斷的思想被提出來。類似現實世界中的“保險絲“,當某個異常條件被觸發,直接熔斷整個服務,而不是一直等到此服務超時。 熔斷的觸發條件可以依據不同的場景有所不同,比如統計一個時間窗口內失敗的調用次數。

1.2 服務降級

  有了熔斷,就得有降級。所謂降級,就是當某個服務熔斷之后,服務器將不再被調用,此時客戶端可以自己准備一個本地的fallback回調,返回一個缺省值。 這樣做,雖然服務水平下降,但好歹可用,比直接掛掉要強,當然這也要看適合的業務場景。關於Hystrix中fallback的使用,見官網

1.3 服務隔離

  1. 雪崩效應:服務雪崩效應產生服務堆積在同一個線程池中,因為在同一個線程池中,所有請求全部到一個服務進行訪問,這時候會導致其他服務沒有線程接收請求訪問,所以就會產生服務雪崩效應。
  2. Tomcat底層:http+線程池,每個線程都是獨立的請求。
  3. 當大多數人在使用Tomcat時,多個http服務會共享一個線程池,假設其中一個http服務訪問的數據庫響應非常慢,這將造成服務響應時間延遲增加,大多數線程阻塞等待數據響應返回,導致整個Tomcat線程池都被該服務占用,甚至拖垮整個Tomcat。因此,如果我們能把不同http服務隔離到不同的線程池,則某個http服務的線程池滿了也不會對其他服務造成災難性故障。這就需要線程隔離或者信號量隔離來實現了。
  4. 服務隔離:每個服務接口互不影響,服務隔離有兩種實現方式線程池方式計數器
  5. 作用:服務保護,當服務產生堆積的時候,對服務實現保護功能。(堆積請求:假設默認tomcat最大線程線程池是50。嘗試第51一個請求,第51個請求會阻塞。大量請求正在等待,如果堆積請求過多,可能會造成服務器癱瘓。)
  6. 使用線程隔離或信號隔離的目的是為不同的服務分配一定的資源,當自己的資源用完,直接返回失敗而不是占用別人的資源。

1.4 總結

  1. 服務隔離:保證每個服務互不影響,使用信號量線程池方式
  2. 服務降級:當服務不可用的時候,不會被等待,直接返回一個友好的提示
  3. 服務熔斷:當服務器達到最大的承受能的之后,直接拒絕訪問服務,最會調用服務降級方法,返回友好提示。目的是保證服務不被宕機掉

二、使用Hystrix實現服務隔離和降級

2.1 Hytrix 簡介

  • Hystrix 是一個微服務關於服務保護的框架,是Netflix開源的一款針對分布式系統的延遲和容錯解決框架,目的是用來隔離分布式服務故障。它提供線程和信號量隔離,以減少不同服務之間資源競爭帶來的相互影響;提供優雅降級機制;提供熔斷機制使得服務可以快速失敗,而不是一直阻塞等待服務響應,並能從中快速恢復。Hystrix通過這些機制來阻止級聯失敗並保證系統彈性、可用。
  • Hystrix的資源隔離策略有兩種,分別為:線程池和信號量

2.2 線程池方式

  1. 使用線程池隔離可以完全隔離第三方應用,請求線程可以快速放回。
  2. 請求線程可以繼續接受新的請求,如果出現問題線程池隔離是獨立的不會影響其他應用。
  3. 當失敗的應用再次變得可用時,線程池將清理並可立即恢復,而不需要一個長時間的恢復。
  4. 獨立的線程池提高了並發性

缺點: 線程池隔離的主要缺點是它們增加計算開銷(CPU)。每個命令的執行涉及到排隊、調度和上下文切換都是在一個單獨的線程上運行的。

public class OrderHystrixCommand extends HystrixCommand<JSONObject> {
    @Autowired
    private MemberService memberService;

    /**
     * @param group
     */
    public OrderHystrixCommand(MemberService memberService) {
        super(setter());
        this.memberService = memberService;
    }

    protected JSONObject run() throws Exception {
        JSONObject member = memberService.getMember();
        System.out.println("當前線程名稱:" + Thread.currentThread().getName() + ",訂單服務調用會員服務:member:" + member);
        return member;
    }

    private static Setter setter() {

        // 服務分組
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("orders");
        // 服務標識
        HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("order");
        // 線程池名稱
        HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("order-pool");
        // #####################################################
        // 線程池配置 線程池大小為10,線程存活時間15秒 隊列等待的閾值為100,超過100執行拒絕策略
        HystrixThreadPoolProperties.Setter threadPoolProperties = HystrixThreadPoolProperties.Setter().withCoreSize(10)
                .withKeepAliveTimeMinutes(15).withQueueSizeRejectionThreshold(100);
        // ########################################################
        // 命令屬性配置Hystrix 開啟超時
        HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter()
                // 采用線程池方式實現服務隔離
                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
                // 禁止
                .withExecutionTimeoutEnabled(false);
        return HystrixCommand.Setter.withGroupKey(groupKey).andCommandKey(commandKey).andThreadPoolKey(threadPoolKey)
                .andThreadPoolPropertiesDefaults(threadPoolProperties).andCommandPropertiesDefaults(commandProperties);

    }
    
//############   服務降級  ##########
    @Override
    protected JSONObject getFallback() {
        // 如果Hystrix發生熔斷,當前服務不可用,直接執行Fallback方法
        System.out.println("系統錯誤!");
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("code", 500);
        jsonObject.put("msg", "系統錯誤!");
        return jsonObject;
    }
    }

應用場景:

  1. 第三方應用或者接口
  2. 並發量大

2.3 信號量

  • 計數器方式:底層使用原子計數器,針對於每個服務:都設置自己獨立限制閾值,比如設置每個服務接口最多同時只能訪問50次,超出緩存隊列請求后,自己實現拒絕策略。
  • 使用一個原子計數器(或信號量)來記錄當前有多少個線程在運行,當請求進來時先判斷計數器的數值,若超過設置的最大線程個數則拒絕該請求,若不超過則通行,這時候計數器+1,請求返回成功后計數器-1。
  • 與線程池隔離最大不同在於執行依賴代碼的線程依然是請求線程,信號量的大小可以動態調整, 線程池大小不可以

代碼如下:

public class OrderHystrixCommand2 extends HystrixCommand<JSONObject> {
    @Autowired
    private MemberService memberService;

    /**
     * @param group
     */
    public OrderHystrixCommand2(MemberService memberService) {
        super(setter());
        this.memberService = memberService;
    }

    protected JSONObject run() throws Exception {

        // Thread.sleep(500);
        // System.out.println("orderIndex線程名稱" +
        // Thread.currentThread().getName());
        // System.out.println("success");
        JSONObject member = memberService.getMember();
        System.out.println("當前線程名稱:" + Thread.currentThread().getName() + ",訂單服務調用會員服務:member:" + member);
        return member;
    }

    private static Setter setter() {
        // 服務分組
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("order");
        // 命令屬性配置 采用信號量模式
        HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter()
                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
                // 使用一個原子計數器(或信號量)來記錄當前有多少個線程在運行,當請求進來時先判斷計數
                // 器的數值,若超過設置的最大線程個數則拒絕該請求,若不超過則通行,這時候計數器+1,請求返 回成功后計數器-1。
                .withExecutionIsolationSemaphoreMaxConcurrentRequests(50);
        return HystrixCommand.Setter.withGroupKey(groupKey).andCommandPropertiesDefaults(commandProperties);
    }
    
//############   服務降級  ##########
    @Override
    protected JSONObject getFallback() {
        // 如果Hystrix發生熔斷,當前服務不可用,直接執行Fallback方法
        System.out.println("系統錯誤!");
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("code", 500);
        jsonObject.put("msg", "系統錯誤!");
        return jsonObject;
    }
    }

應用場景:

  1. 內部應用或者中間件(redis)
  2. 並發需求不大

三、項目搭建

需求:搭建一套分布式rpc遠程通訊案例:比如訂單服務調用會員服務實現服務隔離,防止雪崩效應案例

3.1 訂單工程

1. 引入Maven依賴

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-metrics-event-stream</artifactId>
            <version>1.5.12</version>
        </dependency>
        <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-javanica</artifactId>
            <version>1.5.12</version>
        </dependency>
</dependencies>

2. Service

@Service
public class MemberService {
public JSONObject getMember() {
    JSONObject result = HttpClientUtils.httpGet("http://127.0.0.1:8081/member/memberIndex");
    return result;
}
}

3.Controller

@RestController
@RequestMapping("/order")
public class OrderController {
    @Autowired
    private MemberService memberService;

    /**
     * 訂單服務會調用會員服務,底層使用rpc遠程調用的方式
     * @return
     * @throws InterruptedException
     */
    @RequestMapping("/orderIndex")
    public Object orderIndex() throws InterruptedException {
        JSONObject member = memberService.getMember();
        //線程池名稱+線程池id組合
        System.out.println("當前線程名稱:" + Thread.currentThread().getName() + ",訂單服務調用會員服務:member:" + member);
        return member;
    }

    // 訂單服務調用會員服務,解決服務雪崩效應,采用線程池的方式
    @RequestMapping("/orderIndexHystrix")
    public Object orderIndexHystrix() throws InterruptedException {
        return new OrderHystrixCommand(memberService).execute();
    }
    // 訂單服務調用會員服務,解決服務雪崩效應,采用信號量的方式
    @RequestMapping("/orderIndexHystrix2")
    public Object orderIndexHystrix2() throws InterruptedException {
        return new OrderHystrixCommand2(memberService).execute();
    }

    @RequestMapping("/findOrderIndex")
    public Object findIndex() {
        System.out.println("當前線程:" + Thread.currentThread().getName() + ",findOrderIndex");
        return "findOrderIndex";
    }
}    

4.配置文件

server:
  port: 8080
  #線程池中的最大線程數為20
  tomcat:
    max-threads: 20

5.工具類

public class HttpClientUtils {
    private static Logger logger = LoggerFactory.getLogger(HttpClientUtils.class); // 日志記錄

private static RequestConfig requestConfig = null;

static {
    // 設置請求和傳輸超時時間
    requestConfig = RequestConfig.custom().setSocketTimeout(2000).setConnectTimeout(2000).build();
}

/**
 * post請求傳輸json參數
 * 
 * @param url
 *            url地址
 * @param json
 *            參數
 * @return
 */
public static JSONObject httpPost(String url, JSONObject jsonParam) {
    // post請求返回結果
    CloseableHttpClient httpClient = HttpClients.createDefault();
    JSONObject jsonResult = null;
    HttpPost httpPost = new HttpPost(url);
    // 設置請求和傳輸超時時間
    httpPost.setConfig(requestConfig);
    try {
        if (null != jsonParam) {
            // 解決中文亂碼問題
            StringEntity entity = new StringEntity(jsonParam.toString(), "utf-8");
            entity.setContentEncoding("UTF-8");
            entity.setContentType("application/json");
            httpPost.setEntity(entity);
        }
        CloseableHttpResponse result = httpClient.execute(httpPost);
        // 請求發送成功,並得到響應
        if (result.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
            String str = "";
            try {
                // 讀取服務器返回過來的json字符串數據
                str = EntityUtils.toString(result.getEntity(), "utf-8");
                // 把json字符串轉換成json對象
                jsonResult = JSONObject.parseObject(str);
            } catch (Exception e) {
                logger.error("post請求提交失敗:" + url, e);
            }
        }
    } catch (IOException e) {
        logger.error("post請求提交失敗:" + url, e);
    } finally {
        httpPost.releaseConnection();
    }
    return jsonResult;
}

/**
 * post請求傳輸String參數 例如:name=Jack&sex=1&type=2
 * Content-type:application/x-www-form-urlencoded
 * 
 * @param url
 *            url地址
 * @param strParam
 *            參數
 * @return
 */
public static JSONObject httpPost(String url, String strParam) {
    // post請求返回結果
    CloseableHttpClient httpClient = HttpClients.createDefault();
    JSONObject jsonResult = null;
    HttpPost httpPost = new HttpPost(url);
    httpPost.setConfig(requestConfig);
    try {
        if (null != strParam) {
            // 解決中文亂碼問題
            StringEntity entity = new StringEntity(strParam, "utf-8");
            entity.setContentEncoding("UTF-8");
            entity.setContentType("application/x-www-form-urlencoded");
            httpPost.setEntity(entity);
        }
        CloseableHttpResponse result = httpClient.execute(httpPost);
        // 請求發送成功,並得到響應
        if (result.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
            String str = "";
            try {
                // 讀取服務器返回過來的json字符串數據
                str = EntityUtils.toString(result.getEntity(), "utf-8");
                // 把json字符串轉換成json對象
                jsonResult = JSONObject.parseObject(str);
            } catch (Exception e) {
                logger.error("post請求提交失敗:" + url, e);
            }
        }
    } catch (IOException e) {
        logger.error("post請求提交失敗:" + url, e);
    } finally {
        httpPost.releaseConnection();
    }
    return jsonResult;
}

/**
 * 發送get請求
 * 
 * @param url
 *            路徑
 * @return
 */
public static JSONObject httpGet(String url) {
    // get請求返回結果
    JSONObject jsonResult = null;
    CloseableHttpClient client = HttpClients.createDefault();
    // 發送get請求
    HttpGet request = new HttpGet(url);
    request.setConfig(requestConfig);
    try {
        CloseableHttpResponse response = client.execute(request);

        // 請求發送成功,並得到響應
        if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
            // 讀取服務器返回過來的json字符串數據
            HttpEntity entity = response.getEntity();
            String strResult = EntityUtils.toString(entity, "utf-8");
            // 把json字符串轉換成json對象
            jsonResult = JSONObject.parseObject(strResult);
        } else {
            logger.error("get請求提交失敗:" + url);
        }
    } catch (IOException e) {
        logger.error("get請求提交失敗:" + url, e);
    } finally {
        request.releaseConnection();
    }
    return jsonResult;
}
}

3.2 會員工程

@RestController
@RequestMapping("/member")
public class MemberController {
@RequestMapping("/memberIndex")
public Object memberIndex() throws InterruptedException {
    Map<String, Object> hashMap = new HashMap<String, Object>();
    hashMap.put("code", 200);
    hashMap.put("msg", "memberIndex");
    Thread.sleep(1500);
    return hashMap;
}
}

四、項目源碼

會員工程

訂單工程


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM