背景:為了獲取相關字段方便,項目里使用了TransmittableThreadLocal上下文,在異步邏輯中get值時發現並非當前請求的值,且是偶發狀況(並發問題)。
發現:TransmittableThreadLocal是阿里開源的可以實現父子線程值傳遞的工具,其子線程必須使用TtlRunnable\TtlCallable修飾或者線程池使用TtlExecutors修飾(防止數據“污染”),如果沒有使用裝飾后的線程池,那么使用TransmittableThreadLocal上下文,就有可能出現線程不安全的問題。
話不多說上代碼:
封裝的上下文,成員變量RequestHeader
package org.example.ttl.threadLocal; import com.alibaba.ttl.TransmittableThreadLocal; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import lombok.ToString; import org.apache.commons.lang3.ObjectUtils; /** * description: * author: JohnsonLiu * create at: 2021/12/24 23:19 */ @Data @AllArgsConstructor @NoArgsConstructor public class RequestContext { private static final ThreadLocal<RequestContext> transmittableThreadLocal = new TransmittableThreadLocal(); private static final RequestContext INSTANCE = new RequestContext(); private RequestHeader requestHeader; public static void create(RequestHeader requestHeader) { transmittableThreadLocal.set(new RequestContext(requestHeader)); } public static RequestContext current() { return ObjectUtils.defaultIfNull(transmittableThreadLocal.get(), INSTANCE); } public static RequestHeader get() { return current().getRequestHeader(); } public static void remove() { transmittableThreadLocal.set(null); } @Data @AllArgsConstructor @NoArgsConstructor @ToString static class RequestHeader { private String requestUrl; private String requestType; } }
獲取上下文內容的case:
package org.example.ttl.threadLocal;
import com.alibaba.ttl.threadpool.TtlExecutors;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* description: TransmittableThreadLocal正確使用
* author: JohnsonLiu
* create at: 2021/12/24 22:24
* 驗證結論:
* 1.線程池必須使用TtlExecutors修飾,或者Runnable\Callable必須使用TtlRunnable\TtlCallable修飾
* ---->原因:子線程復用,子線程擁有的上下文內容會對下次使用造成“污染”,而修飾后的子線程在執行run方法后會進行“回放”,防止污染
*/
public class TransmittableThreadLocalCase2 {
// 為達到線程100%復用便於測試,線程池核心數1
private static final Executor TTL_EXECUTOR = TtlExecutors.getTtlExecutor(new ThreadPoolExecutor(1, 1, 1000, TimeUnit.MICROSECONDS, new LinkedBlockingQueue<>(1000)));
// 如果使用一般的線程池或者Runnable\Callable時,會存在線程“污染”,比如線程池中線程會復用,復用的線程會“污染”該線程執行下一次任務
private static final Executor EXECUTOR = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.MICROSECONDS, new LinkedBlockingQueue<>(1000));
public static void main(String[] args) {
RequestContext.create(new RequestContext.RequestHeader("url", "get"));
System.out.println(Thread.currentThread().getName() + " 子線程(rm之前 同步):" + RequestContext.get());
// 模擬另一個線程修改上下文內容
EXECUTOR.execute(() -> {
RequestContext.create(new RequestContext.RequestHeader("url", "put"));
});
// 保證上面子線程修改成功
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 異步獲取上下文內容
TTL_EXECUTOR.execute(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 子線程(rm之前 異步):" + RequestContext.get());
});
// 主線程修改上下文內容
RequestContext.create(new RequestContext.RequestHeader("url", "post"));
System.out.println(Thread.currentThread().getName() + " 子線程(rm之前 同步<reCreate>):" + RequestContext.get());
// 主線程remove
RequestContext.remove();
// 子線程獲取remove后的上下文內容
TTL_EXECUTOR.execute(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 子線程(rm之后 異步):" + RequestContext.get());
});
}
}
使用一般線程池結果:

使用修飾后的線程池結果:
這種問題的解決辦法:
如果大家跟我一樣存在這樣的使用,那么也會低概率存在這樣的問題,正確的使用方式是:
子線程必須使用TtlRunnable\TtlCallable修飾或者線程池使用TtlExecutors修飾,這一點很容易被遺漏,比如上下文和異步邏輯不是同一個人開發的,那么異步邏輯的開發者就很可能直接在異步邏輯中使用上下文,而忽略裝飾線程池,造成線程復用時的“數據污染”。
另外還有一種不同於上面的上下文用法,同樣使用不當也會存在線程安全問題:
上代碼樣例
package org.example.ttl.threadLocal; import com.alibaba.ttl.TransmittableThreadLocal; import java.util.LinkedHashMap; import java.util.Map; /** * description: TransmittableThreadLocal正確使用 * author: JohnsonLiu * create at: 2021/12/24 23:19 */ public class ServiceContext { private static final ThreadLocal<Map<Integer, Integer>> transmittableThreadLocal = new TransmittableThreadLocal() { /** * 如果使用的是TtlExecutors裝飾的線程池或者TtlRunnable、TtlCallable裝飾的任務 * 重寫copy方法且重新賦值給新的LinkedHashMap,不然會導致父子線程都是持有同一個引用,只要有修改取值都會變化。引用值線程不安全 * parentValue是父線程執行子任務那個時刻的快照值,后續父線程再次set值也不會影響子線程get,因為已經不是同一個引用 * @param parentValue * @return */ @Override public Object copy(Object parentValue) { if (parentValue instanceof Map) { System.out.println("copy"); return new LinkedHashMap<Integer, Integer>((Map) parentValue); } return null; } /** * 如果使用普通線程池執行異步任務,重寫childValue即可實現子線程獲取的是父線程執行任務那個時刻的快照值,重新賦值給新的LinkedHashMap,父線程修改不會影響子線程(非共享) * 但是如果使用的是TtlExecutors裝飾的線程池或者TtlRunnable、TtlCallable裝飾的任務,此時就會變成引用共享,必須得重寫copy方法才能實現非共享 * @param parentValue * @return */ @Override protected Object childValue(Object parentValue) { if (parentValue instanceof Map) { System.out.println("childValue"); return new LinkedHashMap<Integer, Integer>((Map) parentValue); } return null; } /** * 初始化,每次get時都會進行初始化 * @return */ @Override protected Object initialValue() { System.out.println("initialValue"); return new LinkedHashMap<Integer, Integer>(); } }; public static void set(Integer key, Integer value) { transmittableThreadLocal.get().put(key, value); } public static Map<Integer, Integer> get() { return transmittableThreadLocal.get(); } public static void remove() { transmittableThreadLocal.remove(); } }
使用case:
package org.example.ttl.threadLocal; import com.alibaba.ttl.threadpool.TtlExecutors; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * description: TransmittableThreadLocal正確使用 * author: JohnsonLiu * create at: 2021/12/24 22:24 */ public class TransmittableThreadLocalCase { private static final Executor executor = TtlExecutors.getTtlExecutor(new ThreadPoolExecutor(1, 1, 1000, TimeUnit.MICROSECONDS, new LinkedBlockingQueue<>(1000))); // private static final Executor executor = new ThreadPoolExecutor(1, 1, 1000, TimeUnit.MICROSECONDS, new LinkedBlockingQueue<>(1000)); static int i = 0; public static void main(String[] args) { ServiceContext.set(++i, i); executor.execute(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " 子線程(rm之前):" + ServiceContext.get()); }); ServiceContext.set(++i, i); ServiceContext.remove(); executor.execute(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " 子線程(rm之后):" + ServiceContext.get()); }); } }
代碼中已有詳細的說明,大家可自行copy代碼驗證。
以上僅代表個人理解,如有錯誤之處,還請留言指教,進行更正!