ThreadLocal系列(三)-TransmittableThreadLocal的使用及原理解析


 上一篇:ThreadLocal系列(二)-InheritableThreadLocal的使用及原理解析

一、基本使用

首先,TTL是用來解決ITL解決不了的問題而誕生的,所以TTL一定是支持父線程的本地變量傳遞給子線程這種基本操作的,ITL也可以做到,但是前面有講過,ITL在線程池的模式下,就沒辦法再正確傳遞了,所以TTL做出的改進就是即便是在線程池模式下,也可以很好的將父線程本地變量傳遞下去,先來看個例子:


// 需要注意的是,使用TTL的時候,要想傳遞的值不出問題,線程池必須得用TTL加一層代理(下面會講這樣做的目的)
    private static ExecutorService executorService = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(2));

    private static ThreadLocal tl = new TransmittableThreadLocal<>(); //這里采用TTL的實現

    public static void main(String[] args) {

        new Thread(() -> {

            String mainThreadName = "main_01";

            tl.set(1);

            executorService.execute(() -> {
                sleep(1L);
                System.out.println(String.format("本地變量改變之前(1), 父線程名稱-%s, 子線程名稱-%s, 變量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
            });

            executorService.execute(() -> {
                sleep(1L);
                System.out.println(String.format("本地變量改變之前(1), 父線程名稱-%s, 子線程名稱-%s, 變量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
            });

            executorService.execute(() -> {
                sleep(1L);
                System.out.println(String.format("本地變量改變之前(1), 父線程名稱-%s, 子線程名稱-%s, 變量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
            });

            sleep(1L); //確保上面的會在tl.set執行之前執行
            tl.set(2); // 等上面的線程池第一次啟用完了,父線程再給自己賦值

            executorService.execute(() -> {
                sleep(1L);
                System.out.println(String.format("本地變量改變之后(2), 父線程名稱-%s, 子線程名稱-%s, 變量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
            });

            executorService.execute(() -> {
                sleep(1L);
                System.out.println(String.format("本地變量改變之后(2), 父線程名稱-%s, 子線程名稱-%s, 變量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
            });

            executorService.execute(() -> {
                sleep(1L);
                System.out.println(String.format("本地變量改變之后(2), 父線程名稱-%s, 子線程名稱-%s, 變量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
            });

            System.out.println(String.format("線程名稱-%s, 變量值=%s", Thread.currentThread().getName(), tl.get()));

        }).start();


        new Thread(() -> {

            String mainThreadName = "main_02";

            tl.set(3);

            executorService.execute(() -> {
                sleep(1L);
                System.out.println(String.format("本地變量改變之前(3), 父線程名稱-%s, 子線程名稱-%s, 變量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
            });

            executorService.execute(() -> {
                sleep(1L);
                System.out.println(String.format("本地變量改變之前(3), 父線程名稱-%s, 子線程名稱-%s, 變量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
            });

            executorService.execute(() -> {
                sleep(1L);
                System.out.println(String.format("本地變量改變之前(3), 父線程名稱-%s, 子線程名稱-%s, 變量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
            });

            sleep(1L); //確保上面的會在tl.set執行之前執行
            tl.set(4); // 等上面的線程池第一次啟用完了,父線程再給自己賦值

            executorService.execute(() -> {
                sleep(1L);
                System.out.println(String.format("本地變量改變之后(4), 父線程名稱-%s, 子線程名稱-%s, 變量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
            });

            executorService.execute(() -> {
                sleep(1L);
                System.out.println(String.format("本地變量改變之后(4), 父線程名稱-%s, 子線程名稱-%s, 變量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
            });

            executorService.execute(() -> {
                sleep(1L);
                System.out.println(String.format("本地變量改變之后(4), 父線程名稱-%s, 子線程名稱-%s, 變量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
            });

            System.out.println(String.format("線程名稱-%s, 變量值=%s", Thread.currentThread().getName(), tl.get()));

        }).start();

    }

    private static void sleep(long time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

運行結果:


線程名稱-Thread-2, 變量值=4
本地變量改變之前(3), 父線程名稱-main_02, 子線程名稱-pool-1-thread-1, 變量值=3
線程名稱-Thread-1, 變量值=2
本地變量改變之前(1), 父線程名稱-main_01, 子線程名稱-pool-1-thread-2, 變量值=1
本地變量改變之前(1), 父線程名稱-main_01, 子線程名稱-pool-1-thread-1, 變量值=1
本地變量改變之前(3), 父線程名稱-main_02, 子線程名稱-pool-1-thread-2, 變量值=3
本地變量改變之前(3), 父線程名稱-main_02, 子線程名稱-pool-1-thread-2, 變量值=3
本地變量改變之前(1), 父線程名稱-main_01, 子線程名稱-pool-1-thread-1, 變量值=1
本地變量改變之后(2), 父線程名稱-main_01, 子線程名稱-pool-1-thread-2, 變量值=2
本地變量改變之后(4), 父線程名稱-main_02, 子線程名稱-pool-1-thread-1, 變量值=4
本地變量改變之后(4), 父線程名稱-main_02, 子線程名稱-pool-1-thread-1, 變量值=4
本地變量改變之后(4), 父線程名稱-main_02, 子線程名稱-pool-1-thread-2, 變量值=4
本地變量改變之后(2), 父線程名稱-main_01, 子線程名稱-pool-1-thread-1, 變量值=2
本地變量改變之后(2), 父線程名稱-main_01, 子線程名稱-pool-1-thread-2, 變量值=2

程序有些啰嗦,為了說明問題,加了很多說明,但至少通過上面的例子,不難發現,兩個主線程里都使用線程池異步,而且值在主線程里還發生過改變,測試結果展示一切正常,由此可以知道TTL在使用線程池的情況下,也可以很好的完成傳遞,而且不會發生錯亂。

那么是不是對普通線程異步也有這么好的支撐呢?

改造下上面的測試代碼:


private static ThreadLocal tl = new TransmittableThreadLocal<>();

    public static void main(String[] args) {

        new Thread(() -> {

            String mainThreadName = "main_01";

            tl.set(1);

            new Thread(() -> {
                sleep(1L);
                System.out.println(String.format("本地變量改變之前(1), 父線程名稱-%s, 子線程名稱-%s, 變量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
            }).start();

            new Thread(() -> {
                sleep(1L);
                System.out.println(String.format("本地變量改變之前(1), 父線程名稱-%s, 子線程名稱-%s, 變量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
            }).start();

            new Thread(() -> {
                sleep(1L);
                System.out.println(String.format("本地變量改變之前(1), 父線程名稱-%s, 子線程名稱-%s, 變量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
            }).start();

            sleep(1L); //確保上面的會在tl.set執行之前執行
            tl.set(2); // 等上面的線程池第一次啟用完了,父線程再給自己賦值

            new Thread(() -> {
                sleep(1L);
                System.out.println(String.format("本地變量改變之后(2), 父線程名稱-%s, 子線程名稱-%s, 變量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
            }).start();

            new Thread(() -> {
                sleep(1L);
                System.out.println(String.format("本地變量改變之后(2), 父線程名稱-%s, 子線程名稱-%s, 變量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
            }).start();

            new Thread(() -> {
                sleep(1L);
                System.out.println(String.format("本地變量改變之后(2), 父線程名稱-%s, 子線程名稱-%s, 變量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
            }).start();

            System.out.println(String.format("線程名稱-%s, 變量值=%s", Thread.currentThread().getName(), tl.get()));

        }).start();


        new Thread(() -> {

            String mainThreadName = "main_02";

            tl.set(3);

            new Thread(() -> {
                sleep(1L);
                System.out.println(String.format("本地變量改變之前(3), 父線程名稱-%s, 子線程名稱-%s, 變量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
            }).start();

            new Thread(() -> {
                sleep(1L);
                System.out.println(String.format("本地變量改變之前(3), 父線程名稱-%s, 子線程名稱-%s, 變量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
            }).start();

            new Thread(() -> {
                sleep(1L);
                System.out.println(String.format("本地變量改變之前(3), 父線程名稱-%s, 子線程名稱-%s, 變量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
            }).start();

            sleep(1L); //確保上面的會在tl.set執行之前執行
            tl.set(4); // 等上面的線程池第一次啟用完了,父線程再給自己賦值

            new Thread(() -> {
                sleep(1L);
                System.out.println(String.format("本地變量改變之后(4), 父線程名稱-%s, 子線程名稱-%s, 變量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
            }).start();

            new Thread(() -> {
                sleep(1L);
                System.out.println(String.format("本地變量改變之后(4), 父線程名稱-%s, 子線程名稱-%s, 變量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
            }).start();

            new Thread(() -> {
                sleep(1L);
                System.out.println(String.format("本地變量改變之后(4), 父線程名稱-%s, 子線程名稱-%s, 變量值=%s", mainThreadName, Thread.currentThread().getName(), tl.get()));
            }).start();

            System.out.println(String.format("線程名稱-%s, 變量值=%s", Thread.currentThread().getName(), tl.get()));

        }).start();

    }

相比第一段測試代碼,這一段的異步全都是普通異步,未采用線程池的方式進行異步,看下運行結果:


本地變量改變之后(4), 父線程名稱-main_02, 子線程名稱-Thread-14, 變量值=4
本地變量改變之前(1), 父線程名稱-main_01, 子線程名稱-Thread-5, 變量值=1
線程名稱-Thread-1, 變量值=2
本地變量改變之前(1), 父線程名稱-main_01, 子線程名稱-Thread-3, 變量值=1
本地變量改變之后(2), 父線程名稱-main_01, 子線程名稱-Thread-11, 變量值=2
本地變量改變之前(3), 父線程名稱-main_02, 子線程名稱-Thread-6, 變量值=3
本地變量改變之后(4), 父線程名稱-main_02, 子線程名稱-Thread-12, 變量值=4
本地變量改變之后(4), 父線程名稱-main_02, 子線程名稱-Thread-10, 變量值=4
本地變量改變之前(3), 父線程名稱-main_02, 子線程名稱-Thread-8, 變量值=3
本地變量改變之前(3), 父線程名稱-main_02, 子線程名稱-Thread-4, 變量值=3
本地變量改變之前(1), 父線程名稱-main_01, 子線程名稱-Thread-7, 變量值=1
線程名稱-Thread-2, 變量值=4
本地變量改變之后(2), 父線程名稱-main_01, 子線程名稱-Thread-9, 變量值=2
本地變量改變之后(2), 父線程名稱-main_01, 子線程名稱-Thread-13, 變量值=2

ok,可以看到,達到了跟第一個測試一致的結果。

到這里,通過上述兩個例子,TTL的基本使用,以及其解決的問題,我們已經有了初步的了解,下面我們來解析一下其內部原理,看看TTL是怎么完成對ITL的優化的。

二、原理分析

先來看TTL里面的幾個重要屬性及方法

TTL定義:


public class TransmittableThreadLocal extends InheritableThreadLocal

可以看到,TTL繼承了ITL,意味着TTL首先具備ITL的功能。

再來看看一個重要屬性holder:


   /**
     * 這是一個ITL類型的對象,持有一個全局的WeakMap(weakMap的key是弱引用,同TL一樣,也是為了解決內存泄漏的問題),里面存放了TTL對象
     * 並且重寫了initialValue和childValue方法,尤其是childValue,可以看到在即將異步時父線程的屬性是直接作為初始化值賦值給子線程的本地變量對象(TTL)的
     */
    private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder =
            new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() {
                @Override
                protected Map<TransmittableThreadLocal<?>, ?> initialValue() {
                    return new WeakHashMap<TransmittableThreadLocal<?>, Object>();
                }

                @Override
                protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) {
                    return new WeakHashMap<TransmittableThreadLocal<?>, Object>(parentValue);
                }
            };

再來看下set和get:

//下面的方法均屬於TTL類
@Override
    public final void set(T value) {
        super.set(value);
        if (null == value) removeValue();
        else addValue();
    }

    @Override
    public final T get() {
        T value = super.get();
        if (null != value) addValue();
        return value;
    }
    
    private void removeValue() {
        holder.get().remove(this); //從holder持有的map對象中移除
    }

    private void addValue() {
        if (!holder.get().containsKey(this)) {
            holder.get().put(this, null); //從holder持有的map對象中添加
        }
    }

TTL里先了解上述的幾個方法及對象,可以看出,單純的使用TTL是達不到支持線程池本地變量的傳遞的,通過第一部分的例子,可以發現,除了要啟用TTL,還需要通過TtlExecutors.getTtlExecutorService包裝一下線程池才可以,那么,下面就來看看在程序即將通過線程池異步的時候,TTL幫我們做了哪些操作(這一部分是TTL支持線程池傳遞的核心部分):

首先打開包裝類,看下execute方法在執行時做了些什么。

// 此方法屬於線程池包裝類ExecutorTtlWrapper
@Override
    public void execute(@Nonnull Runnable command) {
        executor.execute(TtlRunnable.get(command)); //這里會把Rannable包裝一層,這是關鍵,有些邏輯處理,需要在run之前執行
    }

    // 對應上面的get方法,返回一個TtlRunnable對象,屬於TtLRannable包裝類
    @Nullable
    public static TtlRunnable get(@Nullable Runnable runnable) {
        return get(runnable, false, false);
    }

    // 對應上面的get方法
    @Nullable
    public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
        if (null == runnable) return null;

        if (runnable instanceof TtlEnhanced) { // 若發現已經是目標類型了(說明已經被包裝過了)直接返回
            // avoid redundant decoration, and ensure idempotency
            if (idempotent) return (TtlRunnable) runnable;
            else throw new IllegalStateException("Already TtlRunnable!");
        }
        return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun); //最終初始化
    }

    // 對應上面的TtlRunnable方法
    private TtlRunnable(@Nonnull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
        this.capturedRef = new AtomicReference<Object>(capture()); //這里將捕獲后的父線程本地變量存儲在當前對象的capturedRef里
        this.runnable = runnable;
        this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
    }

    // 對應上面的capture方法,用於捕獲當前線程(父線程)里的本地變量,此方法屬於TTL的靜態內部類Transmitter
    @Nonnull
    public static Object capture() {
        Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>();
        for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) { // holder里目前存放的k-v里的key,就是需要傳給子線程的TTL對象
            captured.put(threadLocal, threadLocal.copyValue());
        }
        return captured; // 這里返回的這個對象,就是當前將要使用線程池異步出來的子線程,所繼承的本地變量合集
    }

    // 對應上面的copyValue,簡單的將TTL對象里的值返回(結合之前的源碼可以知道get方法其實就是獲取當前線程(父線程)里的值,調用super.get方法)
    private T copyValue() {
        return copy(get());
    }
    protected T copy(T parentValue) {
        return parentValue;
    }

結合上述代碼,大致知道了在線程池異步之前需要做的事情,其實就是把當前父線程里的本地變量取出來,然后賦值給Rannable包裝類里的capturedRef屬性,到此為止,下面會發生什么,我們大致上可以猜出來了,接下來大概率會在run方法里,將這些捕獲到的值賦給子線程的holder賦對應的TTL值,那么我們繼續往下看Rannable包裝類里的run方法是怎么實現的:

//run方法屬於Rannable的包裝類TtlRunnable
@Override public void run() { Object captured = capturedRef.get(); // 獲取由之前捕獲到的父線程變量集 if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) { throw new IllegalStateException("TTL value reference is released after run!"); } /** * 重點方法replay,此方法用來給當前子線程賦本地變量,返回的backup是此子線程原來就有的本地變量值(原生本地變量,下面會詳細講), * backup用於恢復數據(如果任務執行完畢,意味着該子線程會歸還線程池,那么需要將其原生本地變量屬性恢復) */ Object backup = replay(captured); try { runnable.run(); // 執行異步邏輯 } finally { restore(backup); // 結合上面對於replay的解釋,不難理解,這個方法就是用來恢復原有值的 } }

根據上述代碼,我們看到了TTL在異步任務執行前,會先進行賦值操作(就是拿着異步發生時捕獲到的父線程的本地變量,賦給自己),當任務執行完,就恢復原生的自己本身的線程變量值。

下面來具體看這倆方法:

//下面的方法均屬於TTL的靜態內部類Transmittable
@Nonnull public static Object replay(@Nonnull Object captured) { @SuppressWarnings("unchecked") Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured; //使用此線程異步時捕獲到的父線程里的本地變量值 Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>(); //當前線程原生的本地變量,用於使用完線程后恢復用 //注意:這里循環的是當前子線程原生的本地變量集合,與本方法相反,restore方法里循環這個holder是指:該線程運行期間產生的變量+父線程繼承來的變量 for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator(); iterator.hasNext(); ) { Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next(); TransmittableThreadLocal<?> threadLocal = next.getKey(); backup.put(threadLocal, threadLocal.get()); // 所有原生的本地變量都暫時存儲在backup里,用於之后恢復用 /** * 檢查,如果捕獲到的線程變量里,不包含當前原生變量值,則從當前原生變量里清除掉,對應的線程本地變量也清掉 * 這就是為什么會將原生變量保存在backup里的原因,為了恢復原生值使用 * 那么,為什么這里要清除掉呢?因為從使用這個子線程做異步那里,捕獲到的本地變量並不包含原生的變量,當前線程 * 在做任務時的首要目標,是將父線程里的變量完全傳遞給任務,如果不清除這個子線程原生的本地變量, * 意味着很可能會影響到任務里取值的准確性。 * * 打個比方,有ttl對象tl,這個tl在線程池的某個子線程里存在對應的值2,當某個主線程使用該子線程做異步任務時 * tl這個對象在當前主線程里沒有值,那么如果不進行下面這一步的操作,那么在使用該子線程做的任務里就可以通過 * 該tl對象取到值2,不符合預期 */ if (!capturedMap.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } // 這一步就是直接把父線程本地變量賦值給當前線程了(這一步起就刷新了holder里的值了,具體往下看該方法,在異步線程運行期間,還可能產生別的本地變量,比如在真正的run方法內的業務代碼,再用一個tl對象設置一個值) setTtlValuesTo(capturedMap); // 這個方法屬於擴展方法,ttl本身支持重寫異步任務執行前后的操作,這里不再具體贅述 doExecuteCallback(true); return backup; } // 結合之前Rannable包裝類的run方法來看,這個方法就是使用上面replay記錄下的原生線程變量做恢復用的 public static void restore(@Nonnull Object backup) { @SuppressWarnings("unchecked") Map<TransmittableThreadLocal<?>, Object> backupMap = (Map<TransmittableThreadLocal<?>, Object>) backup; // call afterExecute callback doExecuteCallback(false); // 注意,這里的holder取出來的,實際上是replay方法設置進去的關於父線程里的所有變量(結合上面來看,就是:該線程運行期間產生的變量+父線程繼承來的變量) for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator(); iterator.hasNext(); ) { Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next(); TransmittableThreadLocal<?> threadLocal = next.getKey(); /** * 同樣的,如果子線程原生變量不包含某個父線程傳來的對象,那么就刪除,可以思考下,這里的清除跟上面replay里的有什么不同? * 這里會把不屬於原生變量的對象給刪除掉(這里被刪除掉的可能是父線程繼承下來的,也可能是異步任務在執行時產生的新值) */ if (!backupMap.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } // 同樣調用這個方法,進行值的恢復 setTtlValuesTo(backupMap); } // 真正給當前子線程賦值的方法,對應上面的setTtlValuesTo方法 private static void setTtlValuesTo(@Nonnull Map<TransmittableThreadLocal<?>, Object> ttlValues) { for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : ttlValues.entrySet()) { @SuppressWarnings("unchecked") TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey(); threadLocal.set(entry.getValue()); //賦值,注意,從這里開始,子線程的holder里的值會被重新賦值刷新,可以參照上面ttl的set方法的實現 } }

ok,到這里基本上把TTL比較核心的代碼看完了,下面整理下整個流程,這是官方給出的時序圖:

圖2-1

上圖第一行指的是類名稱,下面的流程指的是類所做的事情,根據上面羅列出來的源碼,結合這個時序圖,可以比較直觀一些的理解整個流程。

三、TTL中線程池子線程原生變量的產生

這一節是為了驗證上面replay和restore,現在通過一個例子來驗證下,先把源碼down下來,在源碼的restore和replay上分別加上輸出語句,遍歷holder:


//replay前后打印holder里面的值
public static Object replay(@Nonnull Object captured) {
            @SuppressWarnings("unchecked")
            Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured;
            Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>();
            System.out.println("--------------------replay前置,當前拿到的holder里的TTL列表");
            for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
                 iterator.hasNext(); ) {
                Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
                TransmittableThreadLocal<?> threadLocal = next.getKey();
                System.out.println(String.format("replay前置里拿到原生的ttl_k=%s, ttl_value=%s", threadLocal.hashCode(), threadLocal.get()));
            }

            for...//代碼省略,具體看上面
            
            setTtlValuesTo(capturedMap);

            doExecuteCallback(true);

            System.out.println("--------------------reply后置,當前拿到的holder里的TTL列表");
            for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
                 iterator.hasNext(); ) {
                Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
                TransmittableThreadLocal<?> threadLocal = next.getKey();
                System.out.println(String.format("replay后置里拿到原生的ttl_k=%s, ttl_value=%s", threadLocal.hashCode(), threadLocal.get()));
            }

            return backup;
        }

//restore前后打印holder里面的值
public static void restore(@Nonnull Object backup) {
            @SuppressWarnings("unchecked")
            Map<TransmittableThreadLocal<?>, Object> backupMap = (Map<TransmittableThreadLocal<?>, Object>) backup;
            // call afterExecute callback
            doExecuteCallback(false);

            System.out.println("--------------------restore前置,當前拿到的holder里的TTL列表");
            for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
                 iterator.hasNext(); ) {
                Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
                TransmittableThreadLocal<?> threadLocal = next.getKey();
                System.out.println(String.format("restore前置里拿到當前線程內變量,ttl_k=%s, ttl_value=%s", threadLocal.hashCode(), threadLocal.get()));
            }

            for...//省略代碼,具體具體看上面

            setTtlValuesTo(backupMap);

            System.out.println("--------------------restore后置,當前拿到的holder里的TTL列表");
            for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
                 iterator.hasNext(); ) {
                Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
                TransmittableThreadLocal<?> threadLocal = next.getKey();
                System.out.println(String.format("restore后置里拿到當前線程內變量,ttl_k=%s, ttl_value=%s", threadLocal.hashCode(), threadLocal.get()));
            }
        }

代碼這樣做的目的,就是要說明線程池所謂的原生本地變量是怎么產生的,以及replay和restore是怎么設置和恢復的,下面來看個簡單的例子:


private static ExecutorService executorService = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(1));

    private static ThreadLocal tl = new TransmittableThreadLocal();
    private static ThreadLocal tl2 = new TransmittableThreadLocal();

    public static void main(String[] args) throws InterruptedException {

        tl.set(1);
        tl2.set(2);

        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

運行結果如下:


--------------------replay前置,當前拿到的holder里的TTL列表
replay前置里拿到原生的ttl_k=1259475182, ttl_value=2
replay前置里拿到原生的ttl_k=929338653, ttl_value=1
--------------------reply后置,當前拿到的holder里的TTL列表
replay后置里拿到原生的ttl_k=1259475182, ttl_value=2
replay后置里拿到原生的ttl_k=929338653, ttl_value=1
--------------------restore前置,當前拿到的holder里的TTL列表
restore前置里拿到當前線程內變量,ttl_k=1259475182, ttl_value=2
restore前置里拿到當前線程內變量,ttl_k=929338653, ttl_value=1
--------------------restore后置,當前拿到的holder里的TTL列表
restore后置里拿到當前線程內變量,ttl_k=1259475182, ttl_value=2
restore后置里拿到當前線程內變量,ttl_k=929338653, ttl_value=1

我們會發現,原生值產生了,從異步開始,就確定了線程池里的線程具備了1和2的值,那么,再來改動下上面的測試代碼:


public static void main(String[] args) throws InterruptedException {

        tl.set(1);

        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        Thread.sleep(1000L);

        tl2.set(2);//較第一次換下位置,換到第一次使用線程池后執行(這意味着下面這次異步不會再觸發Thread的init方法了)

        System.out.println("---------------------------------------------------------------------------------");
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

運行結果為:


--------------------replay前置,當前拿到的holder里的TTL列表
replay前置里拿到原生的ttl_k=929338653, ttl_value=1
--------------------reply后置,當前拿到的holder里的TTL列表
replay后置里拿到原生的ttl_k=929338653, ttl_value=1
--------------------restore前置,當前拿到的holder里的TTL列表
restore前置里拿到當前線程內變量,ttl_k=929338653, ttl_value=1
--------------------restore后置,當前拿到的holder里的TTL列表
restore后置里拿到當前線程內變量,ttl_k=929338653, ttl_value=1
---------------------------------------------------------------------------------
--------------------replay前置,當前拿到的holder里的TTL列表
replay前置里拿到原生的ttl_k=929338653, ttl_value=1
--------------------reply后置,當前拿到的holder里的TTL列表
replay后置里拿到原生的ttl_k=1020371697, ttl_value=2
replay后置里拿到原生的ttl_k=929338653, ttl_value=1
--------------------restore前置,當前拿到的holder里的TTL列表
restore前置里拿到當前線程內變量,ttl_k=1020371697, ttl_value=2
restore前置里拿到當前線程內變量,ttl_k=929338653, ttl_value=1
--------------------restore后置,當前拿到的holder里的TTL列表
restore后置里拿到當前線程內變量,ttl_k=929338653, ttl_value=1

可以發現,第一次異步時,只有一個值被傳遞了下去,然后第二次異步,新加了一個tl2的值,但是看第二次異步的打印,會發現,restore恢復后,仍然是第一次異步發生時放進去的那個tl的值。

通過上面的例子,基本可以確認,所謂線程池內線程的本地原生變量,其實是第一次使用線程時被傳遞進去的值,我們之前有說過TTL是繼承至ITL的,之前的文章也說過,線程池第一次啟用時是會觸發Thread的init方法的,也就是說,在第一次異步時拿到的主線程的變量會被傳遞給子線程,作為子線程的原生本地變量保存起來,后續是replay操作和restore操作也是圍繞着這個原生變量(即原生holder里的值)來進行設置恢復的,設置的是當前父線程捕獲到的本地變量,恢復的是子線程原生本地變量。

holder里持有的可以理解就是當前線程內的所有本地變量,當子線程將異步任務執行完畢后會執行restore進行恢復原生本地變量,具體參照上面的代碼和測試代碼。

四、總結

到這里基本上確認了TTL是如何進行線程池傳值的,以及被包裝的run方法執行異步任務之前,會使用replay進行設置父線程里的本地變量給當前子線程,任務執行完畢,會調用restore恢復該子線程原生的本地變量(目前原生本地變量的產生,就只碰到上述測試代碼中的這一種情況,即線程第一次使用時通過ITL屬性以及Thread的init方法傳給子線程,還不太清楚有沒有其他方式設置)。

其實,正常程序里想要完成線程池上下文傳遞,使用TL就足夠了,我們可以效仿TTL包裝線程池對象的原理,進行值傳遞,異步任務結束后,再remove,以此類推來完成線程池值傳遞,不過這種方式過於單純,且要求上下文為只讀對象,否則子線程存在寫操作,就會發生上下文污染。

TTL項目地址(可以詳細了解下它的其他特性和用法):https://github.com/alibaba/transmittable-thread-local


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM