https://blog.csdn.net/gududedabai/article/details/83059226?depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromBaidu-4&utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromBaidu-4
https://blog.csdn.net/gududedabai/article/details/83059381
加入以下pom依賴:
<dependency> <groupId>com.alibaba</groupId> <artifactId>transmittable-thread-local</artifactId> <version>2.2.0</version> </dependency>
轉載改造hystrix線程池方法:
改造線程池方式
上面介紹了改造線程的方式,並且通過建一個同樣的Java類來覆蓋Jar包中的實現,感覺有點投機取巧,其實不用這麼麻煩,Hystrix默認提供了HystrixPlugins類,可以讓用戶自定義線程池,下面來看看怎麼使用:
在應用啟動之前,調用以下代碼注冊自定義實現:
// Register the custom concurrency strategy with Hystrix's plugin registry.
// NOTE(review): the surrounding text says this must run at application startup, before Hystrix is used — confirm the call site.
HystrixPlugins.getInstance().registerConcurrencyStrategy(new ThreadLocalHystrixConcurrencyStrategy());
ThreadLocalHystrixConcurrencyStrategy就是我們自定義的創建線程池的類,需要繼承HystrixConcurrencyStrategy,前面也有講到通過調試代碼發現最終獲取線程池的代碼就在HystrixConcurrencyStrategy中。
我們只需要重寫getThreadPool方法即可完成對線程池的改造,由於TtlExecutors只能修飾ExecutorService和Executor,而HystrixConcurrencyStrategy中返回的是ThreadPoolExecutor,我們需要對ThreadPoolExecutor進行包裝一層,最終在execute方法中用TtlRunnable對提交的任務(Runnable)進行修飾,也就相當於改造了線程池。
import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.netflix.hystrix.HystrixThreadPoolKey; import com.netflix.hystrix.HystrixThreadPoolProperties; import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy; import com.netflix.hystrix.strategy.properties.HystrixProperty; import com.netflix.hystrix.util.PlatformSpecific; public class ThreadLocalHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy { private final static Logger logger = LoggerFactory.getLogger(ThreadLocalHystrixConcurrencyStrategy.class); @Override public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { final ThreadFactory threadFactory = getThreadFactory(threadPoolKey); final int dynamicCoreSize = corePoolSize.get(); final int dynamicMaximumSize = maximumPoolSize.get(); if (dynamicCoreSize > dynamicMaximumSize) { logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " + dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". 
Maximum size will be set to " + dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value"); return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit, workQueue, threadFactory); } else { return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue, threadFactory); } } @Override public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) { final ThreadFactory threadFactory = getThreadFactory(threadPoolKey); final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties .getAllowMaximumSizeToDivergeFromCoreSize().get(); final int dynamicCoreSize = threadPoolProperties.coreSize().get(); final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get(); final int maxQueueSize = threadPoolProperties.maxQueueSize().get(); final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize); if (allowMaximumSizeToDivergeFromCoreSize) { final int dynamicMaximumSize = threadPoolProperties.maximumSize().get(); if (dynamicCoreSize > dynamicMaximumSize) { logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " + dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". 
Maximum size will be set to " + dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value"); return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); } else { return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); } } else { return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); } } private static ThreadFactory getThreadFactory(final HystrixThreadPoolKey threadPoolKey) { if (!PlatformSpecific.isAppEngineStandardEnvironment()) { return new ThreadFactory() { private final AtomicInteger threadNumber = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet()); thread.setDaemon(true); return thread; } }; } else { return PlatformSpecific.getAppEngineThreadFactory(); } } }
ThreadLocalThreadPoolExecutor的代碼:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.TtlRunnable;

/**
 * {@link ThreadPoolExecutor} that wraps every task passed to {@link #execute(Runnable)} with
 * {@link TtlRunnable#get(Runnable)} before handing it to the underlying pool.
 *
 * <p>{@link #THREAD_LOCAL} is the shared {@link TransmittableThreadLocal} that callers set on the
 * submitting thread and read inside the task.
 */
public class ThreadLocalThreadPoolExecutor extends ThreadPoolExecutor {

    /** Handler used by the constructors that do not take an explicit one. */
    private static final RejectedExecutionHandler DEFAULT_HANDLER = new AbortPolicy();

    /** Shared holder for the value callers want to carry from the submitting thread into pool tasks. */
    public static final TransmittableThreadLocal<Long> THREAD_LOCAL = new TransmittableThreadLocal<Long>();

    /**
     * Creates a pool with {@link Executors#defaultThreadFactory()} and an {@code AbortPolicy}
     * rejection handler.
     */
    public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(),
                DEFAULT_HANDLER);
    }

    /** Creates a pool with the given thread factory and an {@code AbortPolicy} rejection handler. */
    public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, DEFAULT_HANDLER);
    }

    /** Creates a pool with {@link Executors#defaultThreadFactory()} and the given rejection handler. */
    public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(),
                handler);
    }

    /**
     * Canonical constructor; all other constructors delegate here.
     *
     * @param corePoolSize    number of threads to keep in the pool
     * @param maximumPoolSize maximum number of threads allowed in the pool
     * @param keepAliveTime   idle time after which excess threads terminate
     * @param unit            time unit of {@code keepAliveTime}
     * @param workQueue       queue that holds tasks before they are executed
     * @param threadFactory   factory used to create pool threads
     * @param handler         handler invoked when a task cannot be accepted
     */
    public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        // Pass corePoolSize through: the previous code passed maximumPoolSize twice, which
        // silently turned every pool built via this constructor into a fixed-size pool.
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * Wraps {@code command} with {@link TtlRunnable#get(Runnable)} and delegates to
     * {@link ThreadPoolExecutor#execute(Runnable)}.
     */
    @Override
    public void execute(Runnable command) {
        super.execute(TtlRunnable.get(command));
    }
}
啟動時加入插件
// Register the custom concurrency strategy as a Hystrix plugin when the application starts.
HystrixPlugins.getInstance().registerConcurrencyStrategy(new ThreadLocalHystrixConcurrencyStrategy());
使用方法:調用feign client服務之前,設置線程變量
// Set the value on the calling thread before invoking the Feign client; 10086L is a sample value.
ThreadLocalThreadPoolExecutor.THREAD_LOCAL.set(10086L);
在FeignAuthConfiguration里,調用appTokenHolder.get();之前加入設置租戶id
// Read the value from the shared thread-local and store it as the tenant id on DefaultAppTokenHolder.
// NOTE(review): per the surrounding text this belongs in FeignAuthConfiguration, before appTokenHolder.get() — confirm.
Long tenantId = ThreadLocalThreadPoolExecutor.THREAD_LOCAL.get(); DefaultAppTokenHolder.TENANT_FOR_NO_SESSION.set(tenantId);
使用線程變量三種方式測試:
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import com.alibaba.ttl.TransmittableThreadLocal; import com.alibaba.ttl.TtlRunnable; public class Test { public static void main(String[] args) throws InterruptedException, ExecutionException { // testThreadLocal1(); // testThreadLocal2(); testThreadLocal3(); } private static void testThreadLocal1() throws InterruptedException, ExecutionException { final ThreadLocal<String> local = new java.lang.InheritableThreadLocal<String>(); ExecutorService executorService = Executors.newFixedThreadPool(1); for (int i = 0; i < 20; i++) { local.set(i + ""); System.out.println(local.get()); Future<?> future = executorService.submit(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + ":" + local.get()); local.set(null); } }); future.get(); System.out.println(local.get()); local.set(null); } } private static void testThreadLocal2() throws InterruptedException, ExecutionException { ThreadLocal<String> local = new java.lang.InheritableThreadLocal<String>(); ExecutorService executorService = Executors.newFixedThreadPool(1); for (int i = 0; i < 20; i++) { local.set(i + ""); System.out.println(local.get()); Future<?> future = executorService.submit(new ParamRunnable(i + "")); future.get(); System.out.println(local.get()); local.set(null); } } private static void testThreadLocal3() throws InterruptedException, ExecutionException { final TransmittableThreadLocal<String> context = new TransmittableThreadLocal<String>(); ExecutorService executorService = Executors.newFixedThreadPool(1); for (int i = 0; i < 20; i++) { context.set(i + ""); System.out.println(context.get()); Future<?> future = executorService.submit(TtlRunnable.get(new Runnable() { public void run() { System.out.println(Thread.currentThread().getName() + ":" + context.get()); context.set(null); } })); 
future.get(); System.out.println(context.get()); context.set(null); } } private static class ParamRunnable implements Runnable { private String param; public ParamRunnable(String param) { this.param = param; } @Override public void run() { System.out.println(Thread.currentThread().getName() + ":" + param); } } }