Hystrix框架3--線程池


線程池

在Hystrix中Command默認是運行在一個單獨的線程池中的,線程池的名稱是根據設定的ThreadPoolKey定義的,如果沒有設置那么會使用CommandGroupKey作為線程池。
這樣每個Command都可以擁有自己的線程池而不會互相影響,同時線程池也可以很好地控制Command的並發量。

設置線程池配置

可以使用Setter來初始化Command在Setter中可以配置線程池的大小和等待隊列的長度:

public CommandHelloWorld(String name) {
	super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
			//配置線程池相關屬性,隊列大小和線程數
			.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withMaxQueueSize(10).withCoreSize(10))
			//配置運行相關參數如運行超時時間
			.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(100000000)));
    //super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
}

在運行時Hystrix會使用ConcurrencyStrategy來創建線程池以及對應的隊列,下面是默認的HystrixConcurrencyStrategy

//線程池中的等待隊列
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
    /*
     * We are using SynchronousQueue if maxQueueSize <= 0 (meaning a queue is not wanted).
     * <p>
     * SynchronousQueue will do a handoff from calling thread to worker thread and not allow queuing which is what we want.
     * <p>
     * Queuing results in added latency and would only occur when the thread-pool is full at which point there are latency issues
     * and rejecting is the preferred solution.
     */
    if (maxQueueSize <= 0) {
		//當配置的queue大小小於等於0使用SynchronousQueue,也就是如果沒有空閑線程就導致失敗
        return new SynchronousQueue<Runnable>();
    } else {
		//其他情況使用阻塞Queue來配置等待隊列
        return new LinkedBlockingQueue<Runnable>(maxQueueSize);
    }
}
//創建線程池的方法
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    ThreadFactory threadFactory = null;
    if (!PlatformSpecific.isAppEngine()) {
        threadFactory = new ThreadFactory() {
            protected final AtomicInteger threadNumber = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());
                thread.setDaemon(true);
                return thread;
            }

        };
    } else {
        threadFactory = PlatformSpecific.getAppEngineThreadFactory();
    }
	//通過配置的信息創建線程池
    return new ThreadPoolExecutor(corePoolSize.get(), maximumPoolSize.get(), keepAliveTime.get(), unit, workQueue, threadFactory);
}

當然以上是默認的ConcurrencyStrategy,Hystrix中可以通過Plugin配置自定義的Strategy:

HystrixPlugins.getInstance().registerConcurrencyStrategy

但這個Plugin是單例的且register方法只能調用一次,也就是無法設置多個Strategy,如果想要使用不同的Strategy只能在方法內部使用一定邏輯來完成。

Semaphore

還有一種不用ThreadPool的方法,是配置SEMAPHORE

HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(10).withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)

這種模式會在主線程自身運行(在調用queue時就會執行)。同時可以通過withExecutionIsolationSemaphoreMaxConcurrentRequests設置並發的數量。

fallback

當配置的等待隊列滿了的時候Hystrix會直接調用Command的fallback方法。
下面來看下各種情況下代碼是執行在哪個線程中的

ThreadPool模式下

超時調用getFallback:Timer線程
線程池隊列滿調用getFallback:主線程
Command出錯調用getFallback:Command線程池

Semaphore模式下

超時調用getFallback:Timer線程
並發數滿調用getFallback:主線程
Command出錯調用getFallback:主線程

總結

在使用Hystrix時要注意各個代碼是運行在哪個線程中的,防止在不必要的地方有阻塞的調用,如在fallback中如果有阻塞耗時操作,那么在隊列滿時會導致主線程阻塞,可以考慮在Fallback中再調用新Command,這時還要考慮使用不同的線程池防止任務互相排隊。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM