一、背景
在今天,基於SOA的架構已經大行其道。伴隨着架構的SOA化,相關聯的服務熔斷、降級、限流等思想,也在各種技術講座中頻繁出現。
伴隨着業務復雜性的提高,系統的不斷拆分,一個面向用戶端的API,其內部的RPC調用層層嵌套,調用鏈條可能會非常長。這會造成以下問題:
- API接口可用性降低:引用Hystrix官方的一個例子,假設tomcat對外提供的一個application,其內部依賴了30個服務,每個服務的可用性都很高,為99.99%。那整個applicatiion的可用性就是:99.99%的30次方 = 99.7%,即0.3%的失敗率。這也就意味着,每1億個請求,有30萬個失敗;按時間來算,就是每個月的故障時間超過2小時。
1.1 服務熔斷
為了解決上述問題,服務熔斷的思想被提出來。類似現實世界中的“保險絲“,當某個異常條件被觸發,直接熔斷整個服務,而不是一直等到此服務超時。 熔斷的觸發條件可以依據不同的場景有所不同,比如統計一個時間窗口內失敗的調用次數。
1.2 服務降級
有了熔斷,就得有降級。所謂降級,就是當某個服務熔斷之后,服務器將不再被調用,此時客戶端可以自己准備一個本地的fallback回調,返回一個缺省值。 這樣做,雖然服務水平下降,但好歹可用,比直接掛掉要強,當然這也要看適合的業務場景。關於Hystrix中fallback的使用,見官網
1.3 服務隔離
- 雪崩效應:服務雪崩效應產生服務堆積在同一個線程池中,因為在同一個線程池中,所有請求全部到一個服務進行訪問,這時候會導致其他服務沒有線程接收請求訪問,所以就會產生服務雪崩效應。
- Tomcat底層:http+線程池,每個線程都是獨立的請求。
- 當大多數人在使用Tomcat時,多個http服務會共享一個線程池,假設其中一個http服務訪問的數據庫響應非常慢,這將造成服務響應時間延遲增加,大多數線程阻塞等待數據響應返回,導致整個Tomcat線程池都被該服務占用,甚至拖垮整個Tomcat。因此,如果我們能把不同http服務隔離到不同的線程池,則某個http服務的線程池滿了也不會對其他服務造成災難性故障。這就需要線程隔離或者信號量隔離來實現了。
- 服務隔離:每個服務接口互不影響,服務隔離有兩種實現方式線程池方式、計數器
- 作用:服務保護,當服務產生堆積的時候,對服務實現保護功能。(堆積請求:假設默認tomcat最大線程線程池是50。嘗試第51一個請求,第51個請求會阻塞。大量請求正在等待,如果堆積請求過多,可能會造成服務器癱瘓。)
使用線程隔離或信號隔離的目的是為不同的服務分配一定的資源,當自己的資源用完,直接返回失敗而不是占用別人的資源。
1.4 總結
- 服務隔離:保證每個服務互不影響,使用信號量和線程池方式
- 服務降級:當服務不可用的時候,不會被等待,直接返回一個友好的提示
- 服務熔斷:當服務器達到最大的承受能的之后,直接拒絕訪問服務,最會調用服務降級方法,返回友好提示。目的是保證服務不被宕機掉
二、使用Hystrix實現服務隔離和降級
2.1 Hytrix 簡介
- Hystrix 是一個微服務關於服務保護的框架,是Netflix開源的一款針對分布式系統的延遲和容錯解決框架,目的是用來隔離分布式服務故障。它提供線程和信號量隔離,以減少不同服務之間資源競爭帶來的相互影響;提供優雅降級機制;提供熔斷機制使得服務可以快速失敗,而不是一直阻塞等待服務響應,並能從中快速恢復。Hystrix通過這些機制來阻止級聯失敗並保證系統彈性、可用。
- Hystrix的資源隔離策略有兩種,分別為:線程池和信號量。
2.2 線程池方式

- 使用線程池隔離可以完全隔離第三方應用,請求線程可以快速放回。
- 請求線程可以繼續接受新的請求,如果出現問題線程池隔離是獨立的不會影響其他應用。
- 當失敗的應用再次變得可用時,線程池將清理並可立即恢復,而不需要一個長時間的恢復。
- 獨立的線程池提高了並發性
缺點: 線程池隔離的主要缺點是它們增加計算開銷(CPU)。每個命令的執行涉及到排隊、調度和上下文切換都是在一個單獨的線程上運行的。
public class OrderHystrixCommand extends HystrixCommand<JSONObject> {
@Autowired
private MemberService memberService;
/**
* @param group
*/
public OrderHystrixCommand(MemberService memberService) {
super(setter());
this.memberService = memberService;
}
protected JSONObject run() throws Exception {
JSONObject member = memberService.getMember();
System.out.println("當前線程名稱:" + Thread.currentThread().getName() + ",訂單服務調用會員服務:member:" + member);
return member;
}
private static Setter setter() {
// 服務分組
HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("orders");
// 服務標識
HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("order");
// 線程池名稱
HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("order-pool");
// #####################################################
// 線程池配置 線程池大小為10,線程存活時間15秒 隊列等待的閾值為100,超過100執行拒絕策略
HystrixThreadPoolProperties.Setter threadPoolProperties = HystrixThreadPoolProperties.Setter().withCoreSize(10)
.withKeepAliveTimeMinutes(15).withQueueSizeRejectionThreshold(100);
// ########################################################
// 命令屬性配置Hystrix 開啟超時
HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter()
// 采用線程池方式實現服務隔離
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
// 禁止
.withExecutionTimeoutEnabled(false);
return HystrixCommand.Setter.withGroupKey(groupKey).andCommandKey(commandKey).andThreadPoolKey(threadPoolKey)
.andThreadPoolPropertiesDefaults(threadPoolProperties).andCommandPropertiesDefaults(commandProperties);
}
//############ 服務降級 ##########
@Override
protected JSONObject getFallback() {
// 如果Hystrix發生熔斷,當前服務不可用,直接執行Fallback方法
System.out.println("系統錯誤!");
JSONObject jsonObject = new JSONObject();
jsonObject.put("code", 500);
jsonObject.put("msg", "系統錯誤!");
return jsonObject;
}
}
應用場景:
- 第三方應用或者接口
- 並發量大
2.3 信號量
- 計數器方式:底層使用原子計數器,針對於每個服務:都設置自己獨立限制閾值,比如設置每個服務接口最多同時只能訪問50次,超出緩存隊列請求后,自己實現拒絕策略。
- 使用一個原子計數器(或信號量)來記錄當前有多少個線程在運行,當請求進來時先判斷計數器的數值,若超過設置的最大線程個數則拒絕該請求,若不超過則通行,這時候計數器+1,請求返回成功后計數器-1。
- 與線程池隔離最大不同在於執行依賴代碼的線程依然是請求線程,信號量的大小可以動態調整, 線程池大小不可以
代碼如下:
public class OrderHystrixCommand2 extends HystrixCommand<JSONObject> {
@Autowired
private MemberService memberService;
/**
* @param group
*/
public OrderHystrixCommand2(MemberService memberService) {
super(setter());
this.memberService = memberService;
}
protected JSONObject run() throws Exception {
// Thread.sleep(500);
// System.out.println("orderIndex線程名稱" +
// Thread.currentThread().getName());
// System.out.println("success");
JSONObject member = memberService.getMember();
System.out.println("當前線程名稱:" + Thread.currentThread().getName() + ",訂單服務調用會員服務:member:" + member);
return member;
}
private static Setter setter() {
// 服務分組
HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("order");
// 命令屬性配置 采用信號量模式
HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter()
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
// 使用一個原子計數器(或信號量)來記錄當前有多少個線程在運行,當請求進來時先判斷計數
// 器的數值,若超過設置的最大線程個數則拒絕該請求,若不超過則通行,這時候計數器+1,請求返 回成功后計數器-1。
.withExecutionIsolationSemaphoreMaxConcurrentRequests(50);
return HystrixCommand.Setter.withGroupKey(groupKey).andCommandPropertiesDefaults(commandProperties);
}
//############ 服務降級 ##########
@Override
protected JSONObject getFallback() {
// 如果Hystrix發生熔斷,當前服務不可用,直接執行Fallback方法
System.out.println("系統錯誤!");
JSONObject jsonObject = new JSONObject();
jsonObject.put("code", 500);
jsonObject.put("msg", "系統錯誤!");
return jsonObject;
}
}
應用場景:
- 內部應用或者中間件(redis)
- 並發需求不大
三、項目搭建
需求:搭建一套分布式rpc遠程通訊案例:比如訂單服務調用會員服務實現服務隔離,防止雪崩效應案例
3.1 訂單工程
1. 引入Maven依賴
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-metrics-event-stream</artifactId>
<version>1.5.12</version>
</dependency>
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-javanica</artifactId>
<version>1.5.12</version>
</dependency>
</dependencies>
2. Service
@Service
public class MemberService {
public JSONObject getMember() {
JSONObject result = HttpClientUtils.httpGet("http://127.0.0.1:8081/member/memberIndex");
return result;
}
}
3.Controller
@RestController
@RequestMapping("/order")
public class OrderController {
@Autowired
private MemberService memberService;
/**
* 訂單服務會調用會員服務,底層使用rpc遠程調用的方式
* @return
* @throws InterruptedException
*/
@RequestMapping("/orderIndex")
public Object orderIndex() throws InterruptedException {
JSONObject member = memberService.getMember();
//線程池名稱+線程池id組合
System.out.println("當前線程名稱:" + Thread.currentThread().getName() + ",訂單服務調用會員服務:member:" + member);
return member;
}
// 訂單服務調用會員服務,解決服務雪崩效應,采用線程池的方式
@RequestMapping("/orderIndexHystrix")
public Object orderIndexHystrix() throws InterruptedException {
return new OrderHystrixCommand(memberService).execute();
}
// 訂單服務調用會員服務,解決服務雪崩效應,采用信號量的方式
@RequestMapping("/orderIndexHystrix2")
public Object orderIndexHystrix2() throws InterruptedException {
return new OrderHystrixCommand2(memberService).execute();
}
@RequestMapping("/findOrderIndex")
public Object findIndex() {
System.out.println("當前線程:" + Thread.currentThread().getName() + ",findOrderIndex");
return "findOrderIndex";
}
}
4.配置文件
server:
port: 8080
#線程池中的最大線程數為20
tomcat:
max-threads: 20
5.工具類
public class HttpClientUtils {
private static Logger logger = LoggerFactory.getLogger(HttpClientUtils.class); // 日志記錄
private static RequestConfig requestConfig = null;
static {
// 設置請求和傳輸超時時間
requestConfig = RequestConfig.custom().setSocketTimeout(2000).setConnectTimeout(2000).build();
}
/**
* post請求傳輸json參數
*
* @param url
* url地址
* @param json
* 參數
* @return
*/
public static JSONObject httpPost(String url, JSONObject jsonParam) {
// post請求返回結果
CloseableHttpClient httpClient = HttpClients.createDefault();
JSONObject jsonResult = null;
HttpPost httpPost = new HttpPost(url);
// 設置請求和傳輸超時時間
httpPost.setConfig(requestConfig);
try {
if (null != jsonParam) {
// 解決中文亂碼問題
StringEntity entity = new StringEntity(jsonParam.toString(), "utf-8");
entity.setContentEncoding("UTF-8");
entity.setContentType("application/json");
httpPost.setEntity(entity);
}
CloseableHttpResponse result = httpClient.execute(httpPost);
// 請求發送成功,並得到響應
if (result.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
String str = "";
try {
// 讀取服務器返回過來的json字符串數據
str = EntityUtils.toString(result.getEntity(), "utf-8");
// 把json字符串轉換成json對象
jsonResult = JSONObject.parseObject(str);
} catch (Exception e) {
logger.error("post請求提交失敗:" + url, e);
}
}
} catch (IOException e) {
logger.error("post請求提交失敗:" + url, e);
} finally {
httpPost.releaseConnection();
}
return jsonResult;
}
/**
* post請求傳輸String參數 例如:name=Jack&sex=1&type=2
* Content-type:application/x-www-form-urlencoded
*
* @param url
* url地址
* @param strParam
* 參數
* @return
*/
public static JSONObject httpPost(String url, String strParam) {
// post請求返回結果
CloseableHttpClient httpClient = HttpClients.createDefault();
JSONObject jsonResult = null;
HttpPost httpPost = new HttpPost(url);
httpPost.setConfig(requestConfig);
try {
if (null != strParam) {
// 解決中文亂碼問題
StringEntity entity = new StringEntity(strParam, "utf-8");
entity.setContentEncoding("UTF-8");
entity.setContentType("application/x-www-form-urlencoded");
httpPost.setEntity(entity);
}
CloseableHttpResponse result = httpClient.execute(httpPost);
// 請求發送成功,並得到響應
if (result.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
String str = "";
try {
// 讀取服務器返回過來的json字符串數據
str = EntityUtils.toString(result.getEntity(), "utf-8");
// 把json字符串轉換成json對象
jsonResult = JSONObject.parseObject(str);
} catch (Exception e) {
logger.error("post請求提交失敗:" + url, e);
}
}
} catch (IOException e) {
logger.error("post請求提交失敗:" + url, e);
} finally {
httpPost.releaseConnection();
}
return jsonResult;
}
/**
* 發送get請求
*
* @param url
* 路徑
* @return
*/
public static JSONObject httpGet(String url) {
// get請求返回結果
JSONObject jsonResult = null;
CloseableHttpClient client = HttpClients.createDefault();
// 發送get請求
HttpGet request = new HttpGet(url);
request.setConfig(requestConfig);
try {
CloseableHttpResponse response = client.execute(request);
// 請求發送成功,並得到響應
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
// 讀取服務器返回過來的json字符串數據
HttpEntity entity = response.getEntity();
String strResult = EntityUtils.toString(entity, "utf-8");
// 把json字符串轉換成json對象
jsonResult = JSONObject.parseObject(strResult);
} else {
logger.error("get請求提交失敗:" + url);
}
} catch (IOException e) {
logger.error("get請求提交失敗:" + url, e);
} finally {
request.releaseConnection();
}
return jsonResult;
}
}
3.2 會員工程
@RestController
@RequestMapping("/member")
public class MemberController {
@RequestMapping("/memberIndex")
public Object memberIndex() throws InterruptedException {
Map<String, Object> hashMap = new HashMap<String, Object>();
hashMap.put("code", 200);
hashMap.put("msg", "memberIndex");
Thread.sleep(1500);
return hashMap;
}
}