TransmittableThreadLocal解決線程池變量傳遞以及原理解析


TransmittableThreadLocal解決線程池變量傳遞以及原理解析

介紹

TransmittableThreadLocal是alibaba提供的一個工具包中的類,主要作用就是解決線程池場景下的變量傳遞問題。繼承自InheritableThreadLocal,我們知道
InheritableThreadLocal解決了主線程與子線程之間的變量傳遞問題,但是在遇到線程池以及線程復用的情況下,就無能為力了,TransmittableThreadLocal通過對InheritableThreadLocal以及線程池的增強,解決了這個問題。

主要用途:

  1. 就是應用中對於連接池中的全局鏈路追蹤。
  2. 解決例如Hystrix中出現的ThreadLocal無法傳遞的問題。

使用

依賴

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>transmittable-thread-local</artifactId>
    <version>2.11.2</version>
</dependency>

示例代碼

public class TestTTL {
    //定義一個線程池執行ttl,這里必須要用TTL線程池封裝
    private static ExecutorService TTLExecutor = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(5));
    //定義另外一個線程池循環執行,模擬業務場景下多Http請求調用的情況
    private static ExecutorService loopExecutor = Executors.newFixedThreadPool(5);
    private static AtomicInteger i=new AtomicInteger(0);
    //TTL的ThreadLocal
    private static ThreadLocal tl = new TransmittableThreadLocal<>(); //這里采用TTL的實現
    public static void main(String[] args) {

        while (true) {
            /*
             這里就是循環執行10次,每次對數值加1並設置到threadlocal中,然后再使用TTL去執行來打印這個值。
             這里外部為什么使用線程池,是為了證明TTL確實可以達到我們想要的效果:即線程池中多任務帶着
             父線程各自的ThreadLocal運行互不影響
            */
            loopExecutor.execute( () -> {
                if(i.get()<10){
                    tl.set(i.getAndAdd(1));
                    TTLExecutor.execute(() -> {
                        System.out.println(String.format("子線程名稱-%s, 變量值=%s", Thread.currentThread().getName(), tl.get()));
                    });
                }
            });
        }
    }
}

執行結果:

原理

TransmittableThreadLocal執行的關鍵原理在於以下幾個類做了幾件事:

TransmittableThreadLocal

TransmittableThreadLocal本身增加一個靜態的holderMap,里面保存了所有使用過的TransmittableThreadLocal作為key的引用,這樣在復制TransmittableThreadLocal的值到線程本身的ThreadLocal時,就可以通過該holder遍歷到所有的TransmittableThreadLocal。

/*
    可以看到,在TransmittableThreadLocal調用get和set方法時,都會將自己作為key放入holder,以便后續復制時遍歷,
    這個holder其實就是一個儲存全局TransmittableThreadLocal的集合,不過他不是通過手動add的,
    而是通過耦合到TransmittableThreadLocal的方法中自動的去增加。
*/
public final void set(T value) {
        super.set(value);
        if (null == value) {
            this.removeValue();
        } else {
            this.addValue();
        }
}
private void addValue() {
        if (!((WeakHashMap)holder.get()).containsKey(this)) {
            ((WeakHashMap)holder.get()).put(this, (Object)null);
        }

}

Transmitter和Snapshot

這兩個都是TransmittableThreadLocal的內部類,前者主要是一些工具方法,后者包含了2個map,ttl2Value和threadLocal2Value,分別存儲ttl設置的ThreadLocal值和父線程中其他的ThreadLocal值。其中Snapshot是一個快照,用作備份還原現場使用。

//Transmitter
/*
    可以看到這里就是通過holder來遍歷所有TTLocal,以此來復制值到下面的Runable中。
*/
 private static WeakHashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
            WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new WeakHashMap();
            Iterator var1 = ((WeakHashMap)TransmittableThreadLocal.holder.get()).keySet().iterator();

            while(var1.hasNext()) {
                TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal)var1.next();
                ttl2Value.put(threadLocal, threadLocal.copyValue());
            }

            return ttl2Value;
}
//這個方法會復制一個快照返回,實際調用會將調用線程的2類ThreadLocal值復制為一個快照給Runable使用
@NonNull
public static Object capture() {
    return new TransmittableThreadLocal.Transmitter.Snapshot(captureTtlValues(), captureThreadLocalValues());
}

//方法名就是重放,就是將快照的值,copy到執行線程中。
@NonNull
public static Object replay(@NonNull Object captured) {
    TransmittableThreadLocal.Transmitter.Snapshot capturedSnapshot = (TransmittableThreadLocal.Transmitter.Snapshot)captured;
    return new TransmittableThreadLocal.Transmitter.Snapshot(replayTtlValues(capturedSnapshot.ttl2Value), replayThreadLocalValues(capturedSnapshot.threadLocal2Value));
}
//還原現場,在執行實際runnable之后,將執行前的備份,copy回執行線程
public static void restore(@NonNull Object backup) {
        TransmittableThreadLocal.Transmitter.Snapshot backupSnapshot = (TransmittableThreadLocal.Transmitter.Snapshot)backup;
        restoreTtlValues(backupSnapshot.ttl2Value);
        restoreThreadLocalValues(backupSnapshot.threadLocal2Value);
    }
//一個快照類,保存ttl和原生threadlocal的值
private static class Snapshot {
        final WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value;
        final WeakHashMap<ThreadLocal<Object>, Object> threadLocal2Value;

        private Snapshot(WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value, WeakHashMap<ThreadLocal<Object>, Object> threadLocal2Value) {
            this.ttl2Value = ttl2Value;
            this.threadLocal2Value = threadLocal2Value;
        }
}

TtlRunnable

這個類就是前面使用TTL封裝線程池的意義,TTL封裝線程池,重寫了其中的sumbit和execute等方法,使得提交到線程池中的實際Runable是封裝過后的TtlRunnable,並且該類完成了復制ThreadLocal值和還原現場等操作。

// ExecutorServiceTtlWrapper,封裝線程池
/*
    這里面的TtlCallable.get就會return一個new的TtlRunnable
*/
@NonNull
    public <T> Future<T> submit(@NonNull Callable<T> task) {
        return this.executorService.submit(TtlCallable.get(task));
    }

    @NonNull
    public <T> Future<T> submit(@NonNull Runnable task, T result) {
        return this.executorService.submit(TtlRunnable.get(task), result);
    }

    @NonNull
    public Future<?> submit(@NonNull Runnable task) {
        return this.executorService.submit(TtlRunnable.get(task));
    }
// TtlRunnable,封裝線程池
public final class TtlRunnable implements Runnable, TtlEnhanced, TtlAttachments {
    /* 
        這個變量是最核心的變量,在初始化時,會調用Transmitter,
        將父線程或者說調用線程的TTLThreadLocal和原生ThreadLocal中的值copy出一個快照,
        即上面的SnapShot,並且在這里持有那個SnapShot的引用,
        注意,這里的構造函數初始化,是在調用線程里執行的,所以拿到的就是調用線程的ThreadLocal值。
    */
    private final AtomicReference<Object> capturedRef = new AtomicReference(Transmitter.capture());
    //其他代碼省略

    //實際執行函數
    public void run() {
        //獲取上面的那個快照值,此時已經在線程池中執行了
        Object captured = this.capturedRef.get();
        if (captured != null && (!this.releaseTtlValueReferenceAfterRun || this.capturedRef.compareAndSet(captured, (Object)null))) {
            //調用工具類,此時我們有了快照,並且已經在當前線程池中執行了,所以把快照的值全部賦值到當前線程池中的ThreadLocal中去,下一步被包裝的Runable執行run時,就是無感的了。
            Object backup = Transmitter.replay(captured);

            try {
                this.runnable.run();
            } finally {
                //執行完成后,我們需要將該線程的ThreadLocal還原回之前的快照,因為在線程池中,線程可能復用,為了防止在runnable執行中對該線程的ThreadLocal產生了污染,然后該線程被復用去執行其他Runable時該值已被修改,不再是調用線程的值了,所以需要還原現場。
                Transmitter.restore(backup);
            }

        } else {
            throw new IllegalStateException("TTL value reference is released after run!");
        }
    }
}

用兩張圖可以更加詳細的說明執行的過程。我們給前面的兩個線程池中的線程分別命個名,然后debug代碼。

我們debug到TtlRunnable初始化,可以看到此時初始化的線程是loopExecutor,也就是調用線程,理所當然此時可以制作一個調用線程的ThreadLocal快照。

然后我們debug到runnable執行時走到的replay方法,此時執行的線程就是TTL線程,也就是線程池中的線程了,此時我們當然也可以將之前保存的快照,來賦值到線程池中該線程了,此后的還原也是同理。

總結

該工具類解決了特定場景下的需求,實現方式核心就是封裝+快照。非常值得學習。不過在實際中也不建議在ThreadLocal值過多或者較大時頻繁使用,因為會產生過多的SnapShot臨時對象增加gc負擔,並且每次線程執行都會帶來更多的copy和還原負擔。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM