TransmittableThreadLocal使用踩坑-(主線程set,異步線程get)


背景:為了獲取相關字段方便,項目里使用了TransmittableThreadLocal上下文,在異步邏輯中get值時發現並非當前請求的值,且是偶發狀況(並發問題)。

發現:TransmittableThreadLocal是阿里開源的可以實現父子線程值傳遞的工具,其子線程必須使用TtlRunnable\TtlCallable修飾或者線程池使用TtlExecutors修飾(防止數據“污染”),如果沒有使用裝飾后的線程池,那么使用TransmittableThreadLocal上下文,就有可能出現線程不安全的問題。

話不多說上代碼:

封裝的上下文,成員變量RequestHeader

package org.example.ttl.threadLocal;

import com.alibaba.ttl.TransmittableThreadLocal;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.commons.lang3.ObjectUtils;

/**
 * description:
 * author: JohnsonLiu
 * create at:  2021/12/24  23:19
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RequestContext {

    private static final ThreadLocal<RequestContext> transmittableThreadLocal = new TransmittableThreadLocal();


    private static final RequestContext INSTANCE = new RequestContext();
    private RequestHeader requestHeader;


    public static void create(RequestHeader requestHeader) {
        transmittableThreadLocal.set(new RequestContext(requestHeader));
    }

    public static RequestContext current() {
        return ObjectUtils.defaultIfNull(transmittableThreadLocal.get(), INSTANCE);
    }

    public static RequestHeader get() {
        return current().getRequestHeader();
    }

    public static void remove() {
        transmittableThreadLocal.set(null);
    }


    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @ToString
    static class RequestHeader {
        private String requestUrl;
        private String requestType;
    }
}

獲取上下文內容的case:

 
         
package org.example.ttl.threadLocal;

import com.alibaba.ttl.threadpool.TtlExecutors;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* description: TransmittableThreadLocal正確使用
* author: JohnsonLiu
* create at: 2021/12/24 22:24
* 驗證結論:
* 1.線程池必須使用TtlExecutors修飾,或者Runnable\Callable必須使用TtlRunnable\TtlCallable修飾
* ---->原因:子線程復用,子線程擁有的上下文內容會對下次使用造成“污染”,而修飾后的子線程在執行run方法后會進行“回放”,防止污染
*/
public class TransmittableThreadLocalCase2 {

// 為達到線程100%復用便於測試,線程池核心數1

private static final Executor TTL_EXECUTOR = TtlExecutors.getTtlExecutor(new ThreadPoolExecutor(1, 1, 1000, TimeUnit.MICROSECONDS, new LinkedBlockingQueue<>(1000)));

// 如果使用一般的線程池或者Runnable\Callable時,會存在線程“污染”,比如線程池中線程會復用,復用的線程會“污染”該線程執行下一次任務
private static final Executor EXECUTOR = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.MICROSECONDS, new LinkedBlockingQueue<>(1000));


public static void main(String[] args) {

RequestContext.create(new RequestContext.RequestHeader("url", "get"));
System.out.println(Thread.currentThread().getName() + " 子線程(rm之前 同步):" + RequestContext.get());
// 模擬另一個線程修改上下文內容
EXECUTOR.execute(() -> {
RequestContext.create(new RequestContext.RequestHeader("url", "put"));
});

// 保證上面子線程修改成功
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}

// 異步獲取上下文內容
TTL_EXECUTOR.execute(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 子線程(rm之前 異步):" + RequestContext.get());
});

// 主線程修改上下文內容
RequestContext.create(new RequestContext.RequestHeader("url", "post"));
System.out.println(Thread.currentThread().getName() + " 子線程(rm之前 同步<reCreate>):" + RequestContext.get());

// 主線程remove
RequestContext.remove();

// 子線程獲取remove后的上下文內容
TTL_EXECUTOR.execute(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 子線程(rm之后 異步):" + RequestContext.get());
});
}
}
 
使用一般線程池結果:
使用修飾后的線程池結果:

 

這種問題的解決辦法:

如果大家跟我一樣存在這樣的使用,那么也會低概率存在這樣的問題,正確的使用方式是:

子線程必須使用TtlRunnable\TtlCallable修飾或者線程池使用TtlExecutors修飾,這一點很容易被遺漏,比如上下文和異步邏輯不是同一個人開發的,那么異步邏輯的開發者就很可能直接在異步邏輯中使用上下文,而忽略裝飾線程池,造成線程復用時的“數據污染”。

 

另外還有一種不同於上面的上下文用法,同樣使用不當也會存在線程安全問題:

上代碼樣例

package org.example.ttl.threadLocal;

import com.alibaba.ttl.TransmittableThreadLocal;

import java.util.LinkedHashMap;
import java.util.Map;

/**
 * description: TransmittableThreadLocal正確使用
 * author: JohnsonLiu
 * create at:  2021/12/24  23:19
 */
public class ServiceContext {

    private static final ThreadLocal<Map<Integer, Integer>> transmittableThreadLocal = new TransmittableThreadLocal() {
        /**
         * 如果使用的是TtlExecutors裝飾的線程池或者TtlRunnable、TtlCallable裝飾的任務
         * 重寫copy方法且重新賦值給新的LinkedHashMap,不然會導致父子線程都是持有同一個引用,只要有修改取值都會變化。引用值線程不安全
         * parentValue是父線程執行子任務那個時刻的快照值,后續父線程再次set值也不會影響子線程get,因為已經不是同一個引用
         * @param parentValue
         * @return
         */
        @Override
        public Object copy(Object parentValue) {
            if (parentValue instanceof Map) {
                System.out.println("copy");
                return new LinkedHashMap<Integer, Integer>((Map) parentValue);
            }
            return null;
        }

        /**
         * 如果使用普通線程池執行異步任務,重寫childValue即可實現子線程獲取的是父線程執行任務那個時刻的快照值,重新賦值給新的LinkedHashMap,父線程修改不會影響子線程(非共享)
         * 但是如果使用的是TtlExecutors裝飾的線程池或者TtlRunnable、TtlCallable裝飾的任務,此時就會變成引用共享,必須得重寫copy方法才能實現非共享
         * @param parentValue
         * @return
         */
        @Override
        protected Object childValue(Object parentValue) {
            if (parentValue instanceof Map) {
                System.out.println("childValue");
                return new LinkedHashMap<Integer, Integer>((Map) parentValue);
            }
            return null;
        }

        /**
         * 初始化,每次get時都會進行初始化
         * @return
         */
        @Override
        protected Object initialValue() {
            System.out.println("initialValue");
            return new LinkedHashMap<Integer, Integer>();
        }
    };

    public static void set(Integer key, Integer value) {
        transmittableThreadLocal.get().put(key, value);
    }

    public static Map<Integer, Integer> get() {
        return transmittableThreadLocal.get();
    }

    public static void remove() {
        transmittableThreadLocal.remove();
    }
}

使用case:

package org.example.ttl.threadLocal;

import com.alibaba.ttl.threadpool.TtlExecutors;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * description: TransmittableThreadLocal正確使用
 * author: JohnsonLiu
 * create at:  2021/12/24  22:24
 */
public class TransmittableThreadLocalCase {


    private static final Executor executor = TtlExecutors.getTtlExecutor(new ThreadPoolExecutor(1, 1, 1000, TimeUnit.MICROSECONDS, new LinkedBlockingQueue<>(1000)));
//    private static final Executor executor = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.MICROSECONDS, new LinkedBlockingQueue<>(1000));

    static int i = 0;

    public static void main(String[] args) {

        ServiceContext.set(++i, i);

        executor.execute(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " 子線程(rm之前):" + ServiceContext.get());
        });

        ServiceContext.set(++i, i);
        ServiceContext.remove();

        executor.execute(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " 子線程(rm之后):" + ServiceContext.get());
        });
    }
}

 

代碼中已有詳細的說明,大家可自行copy代碼驗證。

 

以上僅代表個人理解,如有錯誤之處,還請留言指教,進行更正!

 

 

 

 

 

 

 
       


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM