基於hystrix的線程池隔離


hystrix進行資源隔離,其實是提供了一個抽象,叫做command,就是說,你如果要把對某一個依賴服務的所有調用請求,全部隔離在同一份資源池內

對這個依賴服務的所有調用請求,全部走這個資源池內的資源,不會去用其他的資源了,這個就叫做資源隔離

hystrix最最基本的資源隔離的技術,線程池隔離技術

對某一個依賴服務,商品服務,所有的調用請求,全部隔離到一個線程池內,對商品服務的每次調用請求都封裝在一個command里面

每個command(每次服務調用請求)都是使用線程池內的一個線程去執行的

所以哪怕是對這個依賴服務,商品服務,現在同時發起的調用量已經到了1000了,但是線程池內就10個線程,最多就只會用這10個線程去執行

不會說,對商品服務的請求,因為接口調用延遲,將tomcat內部所有的線程資源全部耗盡,不會出現了

1.pox

<dependency>
    <groupId>com.netflix.hystrix</groupId>
    <artifactId>hystrix-core</artifactId>
    <version>1.5.12</version>
</dependency>

  

import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

import com.alibaba.fastjson.JSONObject;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixObservableCommand;
import com.roncoo.eshop.cache.ha.http.HttpClientUtils;
import com.roncoo.eshop.cache.ha.model.ProductInfo;

public class GetProductInfosCommand extends HystrixObservableCommand<ProductInfo> {
	private String[] productIds;
	public GetProductInfosCommand(String[] productIds) {
		super(HystrixCommandGroupKey.Factory.asKey("GetProductInfoGroup"));
		this.productIds = productIds;
	}
	
	@Override
	protected Observable<ProductInfo> construct() {
		return Observable.create(new Observable.OnSubscribe<ProductInfo>() {

			public void call(Subscriber<? super ProductInfo> observer) {
				try {
					for(String productId : productIds) {
						String url = "http://127.0.0.1:8082/getProductInfo?productId=" + productId;
						String response = HttpClientUtils.sendGetRequest(url);
						ProductInfo productInfo = JSONObject.parseObject(response, ProductInfo.class); 
						observer.onNext(productInfo); 
					}
					observer.onCompleted();
				} catch (Exception e) {
					observer.onError(e);  
				}
			}
			
		}).subscribeOn(Schedulers.io());
	}

}

  

import com.alibaba.fastjson.JSONObject;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;

import com.roncoo.eshop.cache.ha.http.HttpClientUtils;
import com.roncoo.eshop.cache.ha.model.ProductInfo;

public class GetProductInfoCommand extends HystrixCommand<ProductInfo> {

	private Long productId;
	
	public GetProductInfoCommand(Long productId) {
		super(HystrixCommandGroupKey.Factory.asKey("GetProductInfoGroup"));
		this.productId = productId;
	}
	
	@Override
	protected ProductInfo run() throws Exception {
		String url = "http://127.0.0.1:8082/getProductInfo?productId=" + productId;
		String response = HttpClientUtils.sendGetRequest(url);
		return JSONObject.parseObject(response, ProductInfo.class);  
	}

}

  

import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import rx.Observable;
import rx.Observer;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixObservableCommand;
import com.roncoo.eshop.cache.ha.http.HttpClientUtils;
import com.roncoo.eshop.cache.ha.hystrix.command.GetProductInfoCommand;
import com.roncoo.eshop.cache.ha.hystrix.command.GetProductInfosCommand;
import com.roncoo.eshop.cache.ha.model.ProductInfo;


@Controller
public class CacheController {
	@RequestMapping("/change/product")
	@ResponseBody
	public String changeProduct(Long productId) {
		// 拿到一個商品id
		// 調用商品服務的接口,獲取商品id對應的商品的最新數據
		// 用HttpClient去調用商品服務的http接口
		String url = "http://127.0.0.1:8082/getProductInfo?productId=" + productId;
		String response = HttpClientUtils.sendGetRequest(url);

//		Future<ProductInfo> future = getProductInfoCommand.queue();
//		try {
//			Thread.sleep(1000); 
//			System.out.println(future.get());  
//		} catch (Exception e) {
//			e.printStackTrace();
//		}
		System.out.println(response);

		return "success";
	}

	/**
	 * nginx開始,各級緩存都失效了,nginx發送很多的請求直接到緩存服務要求拉取最原始的數據
	 * 
	 * @param productId
	 * @return
	 */
	@RequestMapping("/getProductInfo")
	@ResponseBody
	public String getProductInfo(Long productId) {
		// 拿到一個商品id
		// 調用商品服務的接口,獲取商品id對應的商品的最新數據
		// 用HttpClient去調用商品服務的http接口
		HystrixCommand<ProductInfo> getProductInfoCommand = new GetProductInfoCommand(productId);
		ProductInfo productInfo = getProductInfoCommand.execute();
		System.out.println(productInfo);  
		return "success";
	}
	
	/**
	 * 一次性批量查詢多條商品數據的請求
	 */
	@RequestMapping("/getProductInfos")
	@ResponseBody
	public String getProductInfos(String productIds) {
		HystrixObservableCommand<ProductInfo> getProductInfosCommand = 
				new GetProductInfosCommand(productIds.split(","));  
		Observable<ProductInfo> observable = getProductInfosCommand.observe();
		
//		observable = getProductInfosCommand.toObservable(); // 還沒有執行
		
		observable.subscribe(new Observer<ProductInfo>() { // 等到調用subscribe然后才會執行

			public void onCompleted() {
				System.out.println("獲取完了所有的商品數據");
			}

			public void onError(Throwable e) {
				e.printStackTrace();
			}

			public void onNext(ProductInfo productInfo) {
				System.out.println(productInfo);  
			}
			
		});
		return "success";
	}
}

  

command的四種調用方式

同步:new CommandHelloWorld("World").execute(),new ObservableCommandHelloWorld("World").toBlocking().toFuture().get()

如果你認為observable command只會返回一條數據,那么可以調用上面的模式,去同步執行,返回一條數據

異步:new CommandHelloWorld("World").queue(),new ObservableCommandHelloWorld("World").toBlocking().toFuture()

對command調用queue(),僅僅將command放入線程池的一個等待隊列,就立即返回,拿到一個Future對象,后面可以做一些其他的事情,然后過一段時間對future調用get()方法獲取數據。

1.線程池隔離技術與信號量隔離技術的區別

hystrix里面,核心的一項功能,其實就是所謂的資源隔離,要解決的最最核心的問題,就是將多個依賴服務的調用分別隔離到各自自己的資源池內

避免說對某一個依賴服務的調用,因為依賴服務的接口調用的延遲或者失敗,導致服務所有的線程資源全部耗費在這個服務的接口調用上

一旦說某個服務的線程資源全部耗盡的話,可能就導致服務就會崩潰,甚至說這種故障會不斷蔓延

hystrix,資源隔離,兩種技術,線程池的資源隔離,信號量的資源隔離

信號量,semaphore

信號量:適合,你的訪問不是對外部依賴的訪問,而是對內部的一些比較復雜的業務邏輯的訪問,但是像這種訪問,系統內部的代碼,其實不涉及任何的網絡請求,
那么只要做信號量的普通限流就可以了,因為不需要去捕獲timeout類似的問題,算法+數據結構的效率不是太高,
並發量突然太高,因為這里稍微耗時一些,導致很多線程卡在這里的話,不太好,
所以進行一個基本的資源隔離和訪問,避免內部復雜的低效率的代碼,導致大量的線程被卡住。

 

 

一般我們在獲取到商品數據之后,都要去獲取商品是屬於哪個地理位置,省,市,賣家的,可能在自己的純內存中,比如就一個Map去獲取

對於這種直接訪問本地內存的邏輯,比較適合用信號量做一下簡單的隔離

優點在於,不用自己管理線程池,不用care timeout超時了,信號量做隔離的話,性能會相對來說高一些。

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
import com.roncoo.eshop.cache.ha.local.LocationCache;

public class GetCityNameCommand extends HystrixCommand<String> {
	private Long cityId;
	public GetCityNameCommand(Long cityId){
		super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("GetCityNameGroup"))
		        .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
		               .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)));
		this.cityId = cityId;
	}
	@Override
	protected String run() throws Exception {
		return LocationCache.getCityName(cityId);
	}
}

 

execution.isolation.strategy:

指定了HystrixCommand.run()的資源隔離策略,THREAD或者SEMAPHORE,一種是基於線程池,一種是信號量

線程池機制,每個command運行在一個線程中,限流是通過線程池的大小來控制的

信號量機制,command是運行在調用線程中,但是通過信號量的容量來進行限流

如何在線程池和信號量之間做選擇?

默認的策略就是線程池

而使用信號量的場景,通常是針對超大並發量的場景下,每個服務實例每秒都幾百的QPS,那么此時你用線程池的話,線程一般不會太多,
可能撐不住那么高的並發,如果要撐住,可能要耗費大量的線程資源,那么就是用信號量,來進行限流保護


一般用信號量常見於那種基於純內存的一些業務邏輯服務,而不涉及到任何網絡訪問請求

netflix有100+的command運行在40+的線程池中,只有少數command是不運行在線程池中的,就是從純內存中獲取一些元數據,或者是對多個command包裝起來的facacde command,是用信號量限流的.

// to use thread isolation
HystrixCommandProperties.Setter()
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.THREAD)
// to use semaphore isolation
HystrixCommandProperties.Setter()
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)

  2、command名稱和command組

每個command,都可以設置一個自己的名稱,同時可以設置一個自己的組.

command group,是一個非常重要的概念,默認情況下,因為就是通過command group來定義一個線程池的,而且還會通過command group來聚合一些監控和報警信息

同一個command group中的請求,都會進入同一個線程池中

3、command線程池

threadpool key代表了一個HystrixThreadPool,用來進行統一監控,統計,緩存 

默認的threadpool key就是command group名稱

每個command都會跟它的threadpool key對應的thread pool綁定在一起

如果不想直接用command group,也可以手動設置thread pool name

public CommandHelloWorld(String name) {
    super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
            .andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld"))
            .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool")));
    this.name = name;
}

  

command threadpool -> command group -> command key

command key,代表了一類command,一般來說,代表了底層的依賴服務的一個接口

command group,代表了某一個底層的依賴服務,合理,一個依賴服務可能會暴露出來多個接口,每個接口就是一個command key

command group,在邏輯上去組織起來一堆command key的調用,統計信息,成功次數,timeout超時次數,失敗次數,可以看到某一個服務整體的一些訪問情況

command group,一般來說,推薦是根據一個服務去划分出一個線程池,command key默認都是屬於同一個線程池的


command group,對應了一個服務,但是這個服務暴露出來的幾個接口,訪問量很不一樣,差異非常之大

你可能就希望在這個服務command group內部,包含的對應多個接口的command key,做一些細粒度的資源隔離

對同一個服務的不同接口,都使用不同的線程池.

邏輯上來說,多個command key屬於一個command group,在做統計的時候,會放在一起統計

每個command key有自己的線程池,每個接口有自己的線程池,去做資源隔離和限流

但是對於thread pool資源隔離來說,可能是希望能夠拆分的更加一致一些,比如在一個功能模塊內,對不同的請求可以使用不同的thread pool

command group一般來說,可以是對應一個服務,多個command key對應這個服務的多個接口,多個接口的調用共享同一個線程池

如果說你的command key,要用自己的線程池,可以定義自己的threadpool key.

 

4、coreSize

設置線程池的大小,默認是10

HystrixThreadPoolProperties.Setter()
.withCoreSize(int value)

一般來說,用這個默認的10個線程大小就夠了

5、queueSizeRejectionThreshold

控制queue滿后reject的threshold,因為maxQueueSize不允許熱修改,因此提供這個參數可以熱修改,控制隊列的最大大小

HystrixCommand在提交到線程池之前,其實會先進入一個隊列中,這個隊列滿了之后,才會reject

默認值是5

HystrixThreadPoolProperties.Setter()
.withQueueSizeRejectionThreshold(int value)

6、execution.isolation.semaphore.maxConcurrentRequests

設置使用SEMAPHORE隔離策略的時候,允許訪問的最大並發量,超過這個最大並發量,請求直接被reject

這個並發量的設置,跟線程池大小的設置,應該是類似的,但是基於信號量的話,性能會好很多,而且hystrix框架本身的開銷會小很多

默認值是10,設置的小一些,否則因為信號量是基於調用線程去執行command的,而且不能從timeout中抽離,因此一旦設置的太大,而且有延時發生,可能瞬間導致tomcat本身的線程資源本占滿

HystrixCommandProperties.Setter()
.withExecutionIsolationSemaphoreMaxConcurrentRequests(int value)

 

 

創建command,執行這個command,配置這個command對應的group和線程池,以及線程池/信號量的容量和大小

畫圖分析整個8大步驟的流程,然后再對每個步驟進行細致的講解

1、構建一個HystrixCommand或者HystrixObservableCommand

一個HystrixCommand或一個HystrixObservableCommand對象,代表了對某個依賴服務發起的一次請求或者調用

構造的時候,可以在構造函數中傳入任何需要的參數

HystrixCommand主要用於僅僅會返回一個結果的調用
HystrixObservableCommand主要用於可能會返回多條結果的調用

HystrixCommand command = new HystrixCommand(arg1, arg2);
HystrixObservableCommand command = new HystrixObservableCommand(arg1, arg2);

2、調用command的執行方法

執行Command就可以發起一次對依賴服務的調用

要執行Command,需要在4個方法中選擇其中的一個:execute(),queue(),observe(),toObservable()

其中execute()和queue()僅僅對HystrixCommand適用

execute():調用后直接block住,屬於同步調用,直到依賴服務返回單條結果,或者拋出異常
queue():返回一個Future,屬於異步調用,后面可以通過Future獲取單條結果
observe():訂閱一個Observable對象,Observable代表的是依賴服務返回的結果,獲取到一個那個代表結果的Observable對象的拷貝對象
toObservable():返回一個Observable對象,如果我們訂閱這個對象,就會執行command並且獲取返回結果

 

K value = command.execute();
Future<K> fValue = command.queue();
Observable<K> ohValue = command.observe();
Observable<K> ocValue = command.toObservable();

execute()實際上會調用queue().get().queue(),接着會調用toObservable().toBlocking().toFuture()

也就是說,無論是哪種執行command的方式,最終都是依賴toObservable()去執行的

3、檢查是否開啟緩存

如果這個command開啟了請求緩存,request cache,而且這個調用的結果在緩存中存在,那么直接從緩存中返回結果

4、檢查是否開啟了短路器

檢查這個command對應的依賴服務是否開啟了短路器

如果斷路器被打開了,那么hystrix就不會執行這個command,而是直接去執行fallback降級機制

5、檢查線程池/隊列/semaphore是否已經滿了

如果command對應的線程池/隊列/semaphore已經滿了,那么也不會執行command,而是直接去調用fallback降級機制

6、執行command
調用HystrixObservableCommand.construct()或HystrixCommand.run()來實際執行這個command

HystrixCommand.run()是返回一個單條結果,或者拋出一個異常
HystrixObservableCommand.construct()是返回一個Observable對象,可以獲取多條結果

如果HystrixCommand.run()或HystrixObservableCommand.construct()的執行,超過了timeout時長的話,那么command所在的線程就會拋出一個TimeoutException

如果timeout了,也會去執行fallback降級機制,而且就不會管run()或construct()返回的值了

這里要注意的一點是,我們是不可能終止掉一個調用嚴重延遲的依賴服務的線程的,只能說給你拋出來一個TimeoutException,但是還是可能會因為嚴重延遲的調用線程占滿整個線程池的

即使這個時候新來的流量都被限流了。。。

如果沒有timeout的話,那么就會拿到一些調用依賴服務獲取到的結果,然后hystrix會做一些logging記錄和metric統計

7、短路健康檢查

Hystrix會將每一個依賴服務的調用成功,失敗,拒絕,超時,等事件,都會發送給circuit breaker斷路器

短路器就會對調用成功/失敗/拒絕/超時等事件的次數進行統計

短路器會根據這些統計次數來決定,是否要進行短路,如果打開了短路器,那么在一段時間內就會直接短路,然后如果在之后第一次檢查發現調用成功了,就關閉斷路器

8、調用fallback降級機制

在以下幾種情況中,hystrix會調用fallback降級機制:run()或construct()拋出一個異常,短路器打開,線程池/隊列/semaphore滿了,command執行超時了

一般在降級機制中,都建議給出一些默認的返回值,比如靜態的一些代碼邏輯,或者從內存中的緩存中提取一些數據,盡量在這里不要再進行網絡請求了

即使在降級中,一定要進行網絡調用,也應該將那個調用放在一個HystrixCommand中,進行隔離

在HystrixCommand中,上線getFallback()方法,可以提供降級機制

在HystirxObservableCommand中,實現一個resumeWithFallback()方法,返回一個Observable對象,可以提供降級結果

如果fallback返回了結果,那么hystrix就會返回這個結果

對於HystrixCommand,會返回一個Observable對象,其中會發返回對應的結果
對於HystrixObservableCommand,會返回一個原始的Observable對象

如果沒有實現fallback,或者是fallback拋出了異常,Hystrix會返回一個Observable,但是不會返回任何數據

不同的command執行方式,其fallback為空或者異常時的返回結果不同

對於execute(),直接拋出異常
對於queue(),返回一個Future,調用get()時拋出異常
對於observe(),返回一個Observable對象,但是調用subscribe()方法訂閱它時,理解拋出調用者的onError方法
對於toObservable(),返回一個Observable對象,但是調用subscribe()方法訂閱它時,理解拋出調用者的onError方法

9、不同的執行方式

execute(),獲取一個Future.get(),然后拿到單個結果
queue(),返回一個Future
observer(),立即訂閱Observable,然后啟動8大執行步驟,返回一個拷貝的Observable,訂閱時理解回調給你結果
toObservable(),返回一個原始的Observable,必須手動訂閱才會去執行8大步驟

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM