Passing thread-local variables across thread pools with Alibaba's transmittable-thread-local


https://blog.csdn.net/gududedabai/article/details/83059226?depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromBaidu-4&utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromBaidu-4

https://blog.csdn.net/gududedabai/article/details/83059381

Add the following dependency to pom.xml:

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>transmittable-thread-local</artifactId>
    <version>2.2.0</version>
</dependency>

  

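Reposted: how to customize the Hystrix thread pool

Customizing the thread pool

The original article first described modifying the thread itself by creating an identically named Java class that shadows the implementation in the jar, which feels like a hack. There is no need to go to that trouble: Hystrix ships with the HystrixPlugins class, which lets users supply their own thread pool. Here is how to use it.

Register the custom implementation before the application starts: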

HystrixPlugins.getInstance().registerConcurrencyStrategy(new ThreadLocalHystrixConcurrencyStrategy());

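ThreadLocalHystrixConcurrencyStrategy is our custom class for creating thread pools. It must extend HystrixConcurrencyStrategy; as mentioned earlier, stepping through the code in a debugger shows that the thread pool is ultimately obtained in HystrixConcurrencyStrategy.

We only need to override getThreadPool to customize the pool. TtlExecutors can only decorate ExecutorService and Executor, whereas HystrixConcurrencyStrategy returns a ThreadPoolExecutor, so we wrap ThreadPoolExecutor in a subclass and decorate each submitted Runnable in its execute method. The effect is the same as customizing the thread pool.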

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
import com.netflix.hystrix.strategy.properties.HystrixProperty;
import com.netflix.hystrix.util.PlatformSpecific;

public class ThreadLocalHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
	private final static Logger logger = LoggerFactory.getLogger(ThreadLocalHystrixConcurrencyStrategy.class);

	@Override
	public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize,
			HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
			BlockingQueue<Runnable> workQueue) {
		final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);

		final int dynamicCoreSize = corePoolSize.get();
		final int dynamicMaximumSize = maximumPoolSize.get();

		if (dynamicCoreSize > dynamicMaximumSize) {
			logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name()
					+ " is trying to set coreSize = " + dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize
					+ ".  Maximum size will be set to " + dynamicCoreSize
					+ ", the coreSize value, since it must be equal to or greater than the coreSize value");
			return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit,
					workQueue, threadFactory);
		} else {
			return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit,
					workQueue, threadFactory);
		}
	}

	@Override
	public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
			HystrixThreadPoolProperties threadPoolProperties) {
		final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);

		final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties
				.getAllowMaximumSizeToDivergeFromCoreSize().get();
		final int dynamicCoreSize = threadPoolProperties.coreSize().get();
		final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
		final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
		final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);

		if (allowMaximumSizeToDivergeFromCoreSize) {
			final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
			if (dynamicCoreSize > dynamicMaximumSize) {
				logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name()
						+ " is trying to set coreSize = " + dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize
						+ ".  Maximum size will be set to " + dynamicCoreSize
						+ ", the coreSize value, since it must be equal to or greater than the coreSize value");
				return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime,
						TimeUnit.MINUTES, workQueue, threadFactory);
			} else {
				return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime,
						TimeUnit.MINUTES, workQueue, threadFactory);
			}
		} else {
			return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES,
					workQueue, threadFactory);
		}
	}

	private static ThreadFactory getThreadFactory(final HystrixThreadPoolKey threadPoolKey) {
		if (!PlatformSpecific.isAppEngineStandardEnvironment()) {
			return new ThreadFactory() {
				private final AtomicInteger threadNumber = new AtomicInteger(0);

				@Override
				public Thread newThread(Runnable r) {
					Thread thread = new Thread(r,
							"hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());
					thread.setDaemon(true);
					return thread;
				}

			};
		} else {
			return PlatformSpecific.getAppEngineThreadFactory();
		}
	}
}

The code for ThreadLocalThreadPoolExecutor:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.TtlRunnable;

public class ThreadLocalThreadPoolExecutor extends ThreadPoolExecutor {
	private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

	// Shared holder for the value to transmit; final so it cannot be reassigned by accident.
	public static final TransmittableThreadLocal<Long> THREAD_LOCAL = new TransmittableThreadLocal<Long>();

	public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
			BlockingQueue<Runnable> workQueue) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
	}

	public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
			BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
	}

	public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
			BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
	}

	public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
			BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
		// Pass corePoolSize through unchanged (the original passed maximumPoolSize twice).
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
	}

	@Override
	public void execute(Runnable command) {
		super.execute(TtlRunnable.get(command));
	}
}
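
Overriding execute alone is enough here, because submit and invokeAll in AbstractExecutorService hand their tasks to execute, so the wrapping always happens on the submitting thread. To make the idea behind the TtlRunnable.get wrapper concrete, here is a minimal JDK-only sketch of capture-and-replay. It is an illustration under stated assumptions, not the library's implementation: CapturingExecutor, CONTEXT and demo are hypothetical names, a plain ThreadLocal stands in for TransmittableThreadLocal, and only one variable is handled, whereas the library tracks every TransmittableThreadLocal instance.

```java
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CapturingExecutor extends ThreadPoolExecutor {
    // Hypothetical context holder; a plain ThreadLocal stands in for TransmittableThreadLocal.
    public static final ThreadLocal<Long> CONTEXT = new ThreadLocal<>();

    public CapturingExecutor(int poolSize) {
        super(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    public void execute(Runnable command) {
        Objects.requireNonNull(command);
        // Runs on the submitting thread: snapshot its current value.
        final Long captured = CONTEXT.get();
        super.execute(() -> {
            // Runs on the pooled worker: install the snapshot, run, then restore.
            final Long backup = CONTEXT.get();
            CONTEXT.set(captured);
            try {
                command.run();
            } finally {
                if (backup == null) {
                    CONTEXT.remove();
                } else {
                    CONTEXT.set(backup);
                }
            }
        });
    }

    // Submits one task per value to a single reused worker and records what each task saw.
    public static long[] demo(long... values) throws Exception {
        CapturingExecutor pool = new CapturingExecutor(1);
        long[] seen = new long[values.length];
        try {
            for (int i = 0; i < values.length; i++) {
                final int idx = i;
                CONTEXT.set(values[i]);
                pool.submit(() -> { seen[idx] = CONTEXT.get(); }).get();
            }
        } finally {
            CONTEXT.remove();
            pool.shutdown();
        }
        return seen;
    }
}
```

Each task sees the value that was current when it was submitted, even though the single worker thread is reused, and the worker's own state is restored after every task.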

 

Register the plugin at startup:

HystrixPlugins.getInstance().registerConcurrencyStrategy(new ThreadLocalHystrixConcurrencyStrategy());

Usage: set the thread variable before calling the Feign client service:

ThreadLocalThreadPoolExecutor.THREAD_LOCAL.set(10086L);

In FeignAuthConfiguration, set the tenant id just before the call to appTokenHolder.get();

Long tenantId = ThreadLocalThreadPoolExecutor.THREAD_LOCAL.get();
DefaultAppTokenHolder.TENANT_FOR_NO_SESSION.set(tenantId);
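
One thing worth guarding against: the calling thread is often a pooled request thread itself, so a value that is set and never cleared can still be visible to the next, unrelated request. A common precaution is to scope the value with try/finally. The helper below is a sketch of that pattern using a plain ThreadLocal<Long> so that it runs without extra dependencies. TenantScope, TENANT and callWithTenant are hypothetical names; in the setup above the field would be ThreadLocalThreadPoolExecutor.THREAD_LOCAL.

```java
import java.util.function.Supplier;

public class TenantScope {
    // Stand-in for ThreadLocalThreadPoolExecutor.THREAD_LOCAL (hypothetical name).
    public static final ThreadLocal<Long> TENANT = new ThreadLocal<>();

    // Sets the tenant id for the duration of the call, then restores the previous state.
    public static <T> T callWithTenant(long tenantId, Supplier<T> call) {
        final Long previous = TENANT.get();
        TENANT.set(tenantId);
        try {
            return call.get();
        } finally {
            if (previous == null) {
                TENANT.remove();
            } else {
                TENANT.set(previous);
            }
        }
    }
}
```

The Feign call then goes inside the supplier, for example TenantScope.callWithTenant(10086L, () -> client.query()), where client.query() is a placeholder for whatever Feign method is being invoked.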

  

  

 

Testing three ways of passing a thread variable:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.TtlRunnable;

public class Test {
	public static void main(String[] args) throws InterruptedException, ExecutionException {
		// Enable one call at a time to compare the three approaches.
		// testThreadLocal1();
		// testThreadLocal2();
		testThreadLocal3();
	}

	private static void testThreadLocal1() throws InterruptedException, ExecutionException {
		final ThreadLocal<String> local = new java.lang.InheritableThreadLocal<String>();
		ExecutorService executorService = Executors.newFixedThreadPool(1);
		for (int i = 0; i < 20; i++) {
			local.set(i + "");
			System.out.println(local.get());
			Future<?> future = executorService.submit(new Runnable() {

				@Override
				public void run() {
					System.out.println(Thread.currentThread().getName() + ":" + local.get());
					local.set(null);
				}
			});
			future.get();
			System.out.println(local.get());
			local.set(null);
		}
		executorService.shutdown(); // pool threads are non-daemon; shut down so the JVM can exit
	}

	private static void testThreadLocal2() throws InterruptedException, ExecutionException {
		ThreadLocal<String> local = new java.lang.InheritableThreadLocal<String>();
		ExecutorService executorService = Executors.newFixedThreadPool(1);
		for (int i = 0; i < 20; i++) {
			local.set(i + "");
			System.out.println(local.get());
			Future<?> future = executorService.submit(new ParamRunnable(i + ""));
			future.get();
			System.out.println(local.get());
			local.set(null);
		}
		executorService.shutdown(); // pool threads are non-daemon; shut down so the JVM can exit
	}

	private static void testThreadLocal3() throws InterruptedException, ExecutionException {
		final TransmittableThreadLocal<String> context = new TransmittableThreadLocal<String>();
		ExecutorService executorService = Executors.newFixedThreadPool(1);
		for (int i = 0; i < 20; i++) {
			context.set(i + "");
			System.out.println(context.get());
			Future<?> future = executorService.submit(TtlRunnable.get(new Runnable() {
				public void run() {
					System.out.println(Thread.currentThread().getName() + ":" + context.get());
					context.set(null);
				}
			}));
			future.get();
			System.out.println(context.get());
			context.set(null);
		}
		executorService.shutdown(); // pool threads are non-daemon; shut down so the JVM can exit
	}

	private static class ParamRunnable implements Runnable {

		private String param;

		public ParamRunnable(String param) {
			this.param = param;
		}

		@Override
		public void run() {
			System.out.println(Thread.currentThread().getName() + ":" + param);
		}

	}

}
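
For context on why the first approach is unreliable with a pool: InheritableThreadLocal copies the parent's value only once, when the child thread is constructed. With a single-thread pool the worker is created during the first submit, so later tasks keep seeing whatever that worker already holds rather than the caller's current value. The JDK-only sketch below isolates that behaviour; StaleInheritDemo and observe are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StaleInheritDemo {
    // The caller sets "0", "1", "2", ... before each submit; returns what each task observed.
    public static List<String> observe(int tasks) throws Exception {
        final InheritableThreadLocal<String> local = new InheritableThreadLocal<>();
        final List<String> seen = new ArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            for (int i = 0; i < tasks; i++) {
                local.set(String.valueOf(i));
                // The single worker is created during the first submit and copies "0" then.
                pool.submit(() -> { seen.add(local.get()); }).get();
            }
        } finally {
            local.remove();
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(observe(3)); // prints [0, 0, 0]
    }
}
```

Every task reports "0" because the reused worker never picks up the caller's later values. That is the gap TransmittableThreadLocal together with TtlRunnable is meant to close in the third test.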

 

