TransmittableThreadLocal解決線程池變量傳遞以及原理解析
介紹
TransmittableThreadLocal是alibaba提供的一個工具包中的類,主要作用就是解決線程池場景下的變量傳遞問題。繼承自InheritableThreadLocal,我們知道
InheritableThreadLocal解決了主線程與子線程之間的變量傳遞問題,但是在遇到線程池以及線程復用的情況下,就無能為力了,TransmittableThreadLocal通過對InheritableThreadLocal以及線程池的增強,解決了這個問題。
主要用途:
- 就是應用中對於連接池中的全局鏈路追蹤。
- 解決例如Hystrix中出現的ThreadLocal無法傳遞的問題。
使用
依賴
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.11.2</version>
</dependency>
示例代碼
public class TestTTL {
//定義一個線程池執行ttl,這里必須要用TTL線程池封裝
private static ExecutorService TTLExecutor = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(5));
//定義另外一個線程池循環執行,模擬業務場景下多Http請求調用的情況
private static ExecutorService loopExecutor = Executors.newFixedThreadPool(5);
private static AtomicInteger i=new AtomicInteger(0);
//TTL的ThreadLocal
private static ThreadLocal tl = new TransmittableThreadLocal<>(); //這里采用TTL的實現
public static void main(String[] args) {
while (true) {
/*
這里就是循環執行10次,每次對數值加1並設置到threadlocal中,然后再使用TTL去執行來打印這個值。
這里外部為什么使用線程池,是為了證明TTL確實可以達到我們想要的效果:即線程池中多任務帶着
父線程各自的ThreadLocal運行互不影響
*/
loopExecutor.execute( () -> {
if(i.get()<10){
tl.set(i.getAndAdd(1));
TTLExecutor.execute(() -> {
System.out.println(String.format("子線程名稱-%s, 變量值=%s", Thread.currentThread().getName(), tl.get()));
});
}
});
}
}
}
執行結果:
原理
TransmittableThreadLocal執行的關鍵原理在於以下幾個類做了幾件事:
TransmittableThreadLocal
TransmittableThreadLocal本身增加一個靜態的holderMap,里面保存了所有使用過的TransmittableThreadLocal作為key的引用,這樣在復制TransmittableThreadLocal的值到線程本身的ThreadLocal時,就可以通過該holder遍歷到所有的TransmittableThreadLocal。
/*
可以看到,在TransmittableThreadLocal調用get和set方法時,都會將自己作為key放入holder,以便后續復制時遍歷,
這個holder其實就是一個儲存全局TransmittableThreadLocal的集合,不過他不是通過手動add的,
而是通過耦合到TransmittableThreadLocal的方法中自動的去增加。
*/
public final void set(T value) {
super.set(value);
if (null == value) {
this.removeValue();
} else {
this.addValue();
}
}
private void addValue() {
if (!((WeakHashMap)holder.get()).containsKey(this)) {
((WeakHashMap)holder.get()).put(this, (Object)null);
}
}
Transmitter和Snapshot
這兩個都是TransmittableThreadLocal的內部類,前者主要是一些工具方法,后者包含了2個map,ttl2Value和threadLocal2Value,分別存儲ttl設置的ThreadLocal值和父線程中其他的ThreadLocal值。其中Snapshot是一個快照,用作備份還原現場使用。
//Transmitter
/*
可以看到這里就是通過holder來遍歷所有TTLocal,以此來復制值到下面的Runable中。
*/
private static WeakHashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new WeakHashMap();
Iterator var1 = ((WeakHashMap)TransmittableThreadLocal.holder.get()).keySet().iterator();
while(var1.hasNext()) {
TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal)var1.next();
ttl2Value.put(threadLocal, threadLocal.copyValue());
}
return ttl2Value;
}
//這個方法會復制一個快照返回,實際調用會將調用線程的2類ThreadLocal值復制為一個快照給Runable使用
@NonNull
public static Object capture() {
return new TransmittableThreadLocal.Transmitter.Snapshot(captureTtlValues(), captureThreadLocalValues());
}
//方法名就是重放,就是將快照的值,copy到執行線程中。
@NonNull
public static Object replay(@NonNull Object captured) {
TransmittableThreadLocal.Transmitter.Snapshot capturedSnapshot = (TransmittableThreadLocal.Transmitter.Snapshot)captured;
return new TransmittableThreadLocal.Transmitter.Snapshot(replayTtlValues(capturedSnapshot.ttl2Value), replayThreadLocalValues(capturedSnapshot.threadLocal2Value));
}
//還原現場,在執行實際runnable之后,將執行前的備份,copy回執行線程
public static void restore(@NonNull Object backup) {
TransmittableThreadLocal.Transmitter.Snapshot backupSnapshot = (TransmittableThreadLocal.Transmitter.Snapshot)backup;
restoreTtlValues(backupSnapshot.ttl2Value);
restoreThreadLocalValues(backupSnapshot.threadLocal2Value);
}
//一個快照類,保存ttl和原生threadlocal的值
private static class Snapshot {
final WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value;
final WeakHashMap<ThreadLocal<Object>, Object> threadLocal2Value;
private Snapshot(WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value, WeakHashMap<ThreadLocal<Object>, Object> threadLocal2Value) {
this.ttl2Value = ttl2Value;
this.threadLocal2Value = threadLocal2Value;
}
}
TtlRunnable
這個類就是前面使用TTL封裝線程池的意義,TTL封裝線程池,重寫了其中的sumbit和execute等方法,使得提交到線程池中的實際Runable是封裝過后的TtlRunnable,並且該類完成了復制ThreadLocal值和還原現場等操作。
// ExecutorServiceTtlWrapper,封裝線程池
/*
這里面的TtlCallable.get就會return一個new的TtlRunnable
*/
@NonNull
public <T> Future<T> submit(@NonNull Callable<T> task) {
return this.executorService.submit(TtlCallable.get(task));
}
@NonNull
public <T> Future<T> submit(@NonNull Runnable task, T result) {
return this.executorService.submit(TtlRunnable.get(task), result);
}
@NonNull
public Future<?> submit(@NonNull Runnable task) {
return this.executorService.submit(TtlRunnable.get(task));
}
// TtlRunnable,封裝線程池
public final class TtlRunnable implements Runnable, TtlEnhanced, TtlAttachments {
/*
這個變量是最核心的變量,在初始化時,會調用Transmitter,
將父線程或者說調用線程的TTLThreadLocal和原生ThreadLocal中的值copy出一個快照,
即上面的SnapShot,並且在這里持有那個SnapShot的引用,
注意,這里的構造函數初始化,是在調用線程里執行的,所以拿到的就是調用線程的ThreadLocal值。
*/
private final AtomicReference<Object> capturedRef = new AtomicReference(Transmitter.capture());
//其他代碼省略
//實際執行函數
public void run() {
//獲取上面的那個快照值,此時已經在線程池中執行了
Object captured = this.capturedRef.get();
if (captured != null && (!this.releaseTtlValueReferenceAfterRun || this.capturedRef.compareAndSet(captured, (Object)null))) {
//調用工具類,此時我們有了快照,並且已經在當前線程池中執行了,所以把快照的值全部賦值到當前線程池中的ThreadLocal中去,下一步被包裝的Runable執行run時,就是無感的了。
Object backup = Transmitter.replay(captured);
try {
this.runnable.run();
} finally {
//執行完成后,我們需要將該線程的ThreadLocal還原回之前的快照,因為在線程池中,線程可能復用,為了防止在runnable執行中對該線程的ThreadLocal產生了污染,然后該線程被復用去執行其他Runable時該值已被修改,不再是調用線程的值了,所以需要還原現場。
Transmitter.restore(backup);
}
} else {
throw new IllegalStateException("TTL value reference is released after run!");
}
}
}
用兩張圖可以更加詳細的說明執行的過程。我們給前面的兩個線程池中的線程分別命個名,然后debug代碼。
我們debug到TtlRunnable初始化,可以看到此時初始化的線程是loopExecutor,也就是調用線程,理所當然此時可以制作一個調用線程的ThreadLocal快照。
然后我們debug到runnable執行時走到的replay方法,此時執行的線程就是TTL線程,也就是線程池中的線程了,此時我們當然也可以將之前保存的快照,來賦值到線程池中該線程了,此后的還原也是同理。
總結
該工具類解決了特定場景下的需求,實現方式核心就是封裝+快照。非常值得學習。不過在實際中也不建議在ThreadLocal值過多或者較大時頻繁使用,因為會產生過多的SnapShot臨時對象增加gc負擔,並且每次線程執行都會帶來更多的copy和還原負擔。