前提
最近一兩個月花了很大的功夫做UCloud
服務和中間件遷移到阿里雲的工作,沒什么空閑時間擼文。想起很早之前寫過ThreadLocal
的源碼分析相關文章,里面提到了ThreadLocal
存在一個不能向預先創建的線程中進行變量傳遞的局限性,剛好有一位HSBC
的技術大牛前同事提到了團隊引入了transmittable-thread-local解決了此問題。借着這個契機,順便clone
了transmittable-thread-local
源碼進行分析,這篇文章會把ThreadLocal
和InheritableThreadLocal
的局限性分析完畢,並且從一些基本原理以及設計模式的運用分析transmittable-thread-local
(下文簡稱為TTL
)整套框架的實現。
如果對線程池和ThreadLocal
不熟悉的話,可以先參看一下前置文章:
- 《JUC同步器框架AbstractQueuedSynchronizer源碼圖文分析》
- 《JUC線程池ThreadPoolExecutor源碼分析》
- 《ThreadLocal源碼分析-黃金分割數的使用》
這篇文章前后花了兩周時間編寫,行文比價干硬,文字比較多(接近5W
字),希望帶着耐心閱讀。
父子線程的變量傳遞
在Java
中沒有明確給出一個API
可以基於子線程實例獲取其父線程實例,有一個相對可行的方案就是在創建子線程Thread
實例的時候獲取當前線程的實例,用到的API
是Thread#currentThread()
:
public class Thread implements Runnable {
// 省略其他代碼
@HotSpotIntrinsicCandidate
public static native Thread currentThread();
// 省略其他代碼
}
Thread#currentThread()
方法是一個靜態本地方法,它是由JVM
實現,這是在JDK
中唯一可以獲取父線程實例的API
。一般而言,如果想在子線程實例中得到它的父線程實例,那么需要像如下這樣操作:
public class InheritableThread {
public static void main(String[] args) throws Exception{
// 父線程就是main線程
Thread parentThread = Thread.currentThread();
Thread childThread = new Thread(()-> {
System.out.println("Parent thread is:" + parentThread.getName());
},"childThread");
childThread.start();
TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
}
}
// 輸出結果:
Parent thread is:main
類似地,如果我們想把一個父子線程共享的變量實例傳遞,也可以這樣做:
public class InheritableVars {
public static void main(String[] args) throws Exception {
// 父線程就是main線程
Thread parentThread = Thread.currentThread();
final Var var = new Var();
var.setValue1("var1");
var.setValue2("var2");
Thread childThread = new Thread(() -> {
System.out.println("Parent thread is:" + parentThread.getName());
methodFrame1(var);
}, "childThread");
childThread.start();
TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
}
private static void methodFrame1(Var var) {
methodFrame2(var);
}
private static void methodFrame2(Var var) {
}
@Data
private static class Var {
private Object value1;
private Object value2;
}
}
這種做法其實是可行的,子線程調用的方法棧中的所有方法都必須顯示傳入需要從父線程傳遞過來的參數引用Var
實例,這樣就會產生硬編碼問題,既不靈活也導致方法不能復用,所以才衍生出線程本地變量Thread Local
,具體的實現有ThreadLocal
和InheritableThreadLocal
。它們兩者的基本原理是類似的,實際上所有的變量實例是緩存在線程實例的變量ThreadLocal.ThreadLocalMap
中,線程本地變量實例都只是線程實例獲取ThreadLocal.ThreadLocalMap
的一道橋梁:
public class Thread implements Runnable {
// 省略其他代碼
// KEY為ThreadLocal實例,VALUE為具體的值
ThreadLocal.ThreadLocalMap threadLocals = null;
// KEY為InheritableThreadLocal實例,VALUE為具體的值
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
// 省略其他代碼
}
ThreadLocal
和InheritableThreadLocal
之間的區別可以結合源碼分析一下(見下一小節)。前面的分析聽起來如果覺得抽象的話,可以自己寫幾個類推敲一下,假如線程其實叫ThrowableThread
,而線程本地變量叫ThrowableThreadLocal
,那么它們之間的關系如下:
public class Actor {
static ThrowableThreadLocal THREAD_LOCAL = new ThrowableThreadLocal();
public static void main(String[] args) throws Exception {
ThrowableThread throwableThread = new ThrowableThread() {
@Override
public void run() {
methodFrame1();
}
};
throwableThread.start();
}
private static void methodFrame1() {
THREAD_LOCAL.set("throwable");
methodFrame2();
}
private static void methodFrame2() {
System.out.println(THREAD_LOCAL.get());
}
/**
* 這個類暫且認為是java.lang.Thread
*/
private static class ThrowableThread implements Runnable {
ThrowableThreadLocal.ThrowableThreadLocalMap threadLocalMap;
@Override
public void run() {
}
// 這里模擬VM的實現,返回ThrowableThread自身,大家先認為不是返回NULL
public static ThrowableThread getCurrentThread() {
// return new ThrowableThread();
return null; // <--- 假設這里在VM的實現里面返回的不是NULL而是當前的ThrowableThread
}
public void start() {
run();
}
}
private static class ThrowableThreadLocal {
public ThrowableThreadLocal() {
}
public void set(Object value) {
ThrowableThread currentThread = ThrowableThread.getCurrentThread();
assert null != currentThread;
ThrowableThreadLocalMap threadLocalMap = currentThread.threadLocalMap;
if (null == threadLocalMap) {
threadLocalMap = currentThread.threadLocalMap = new ThrowableThreadLocalMap();
}
threadLocalMap.put(this, value);
}
public Object get() {
ThrowableThread currentThread = ThrowableThread.getCurrentThread();
assert null != currentThread;
ThrowableThreadLocalMap threadLocalMap = currentThread.threadLocalMap;
if (null == threadLocalMap) {
return null;
}
return threadLocalMap.get(this);
}
// 這里其實在ThreadLocal中用的是WeakHashMap
public static class ThrowableThreadLocalMap extends HashMap<ThrowableThreadLocal, Object> {
}
}
}
上面的代碼不能運行,只是通過一個自定義的實現說明一下其中的原理和關系。
ThreadLocal和InheritableThreadLocal的局限性
InheritableThreadLocal
是ThreadLocal
的子類,它們之間的聯系是:兩者都是線程Thread
實例獲取ThreadLocal.ThreadLocalMap
的一個中間變量。區別是:兩者控制ThreadLocal.ThreadLocalMap
創建的時機和通過Thread
實例獲取ThreadLocal.ThreadLocalMap
在Thread
實例中對應的屬性並不一樣,導致兩者的功能有一點差別。通俗來說兩者的功能聯系和區別是:
ThreadLocal
:單個線程生命周期強綁定,只能在某個線程的生命周期內對ThreadLocal
進行存取,不能跨線程存取。
public class ThreadLocalMain {
private static ThreadLocal<String> TL = new ThreadLocal<>();
public static void main(String[] args) throws Exception {
new Thread(() -> {
methodFrame1();
}, "childThread").start();
TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
}
private static void methodFrame1() {
TL.set("throwable");
methodFrame2();
}
private static void methodFrame2() {
System.out.println(TL.get());
}
}
// 輸出結果:
throwable
InheritableThreadLocal
:(1)可以無感知替代ThreadLocal
的功能,當成ThreadLocal
使用。(2)明確父-子線程關系的前提下,繼承(拷貝)父線程的線程本地變量緩存過的變量,而這個拷貝的時機是子線程Thread
實例化時候進行的,也就是子線程實例化完畢后已經完成了InheritableThreadLocal
變量的拷貝,這是一個變量傳遞的過程。
public class InheritableThreadLocalMain {
// 此處可以嘗試替換為ThreadLocal,最后會輸出null
static InheritableThreadLocal<String> ITL = new InheritableThreadLocal<>();
public static void main(String[] args) throws Exception {
new Thread(() -> {
// 在父線程中設置變量
ITL.set("throwable");
new Thread(() -> {
methodFrame1();
}, "childThread").start();
}, "parentThread").start();
TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
}
private static void methodFrame1() {
methodFrame2();
}
private static void methodFrame2() {
System.out.println(ITL.get());
}
}
// 輸出結果:
throwable
上面提到的兩點可以具體參看ThreadLocal
、InheritableThreadLocal
和Thread
三個類的源碼,這里筆者把一些必要的注釋和源碼段貼出:
// --> java.lang.Thread類的源碼片段
public class Thread implements Runnable {
// 省略其他代碼
// 這是Thread最基本的構造函數
private Thread(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {
// 省略其他代碼
Thread parent = currentThread();
this.group = g;
this.daemon = parent.isDaemon();
this.priority = parent.getPriority();
if (security == null || isCCLOverridden(parent.getClass()))
this.contextClassLoader = parent.getContextClassLoader();
else
this.contextClassLoader = parent.contextClassLoader;
this.inheritedAccessControlContext =
acc != null ? acc : AccessController.getContext();
this.target = target;
setPriority(priority);
// inheritThreadLocals一般情況下為true
// 當前子線程實例拷貝父線程的inheritableThreadLocals屬性,創建一個新的ThreadLocal.ThreadLocalMap實例賦值到自身的inheritableThreadLocals屬性
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
this.stackSize = stackSize;
this.tid = nextThreadID();
}
// 省略其他代碼
}
// --> java.lang.ThreadLocal源碼片段
public class ThreadLocal<T> {
// 省略其他代碼
public void set(T value) {
Thread t = Thread.currentThread();
// 通過當前線程獲取線程實例中的threadLocals
ThreadLocalMap map = getMap(t);
// 線程實例中的threadLocals為NULL,實例則創建一個ThreadLocal.ThreadLocalMap實例添加當前ThreadLocal->VALUE到ThreadLocalMap中,如果已經存在ThreadLocalMap則進行覆蓋對應的Entry
if (map != null) {
map.set(this, value);
} else {
createMap(t, value);
}
}
// 通過線程實例獲取該線程的threadLocals實例,其實是ThreadLocal.ThreadLocalMap類型的屬性
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
public T get() {
Thread t = Thread.currentThread();
// 通過當前線程獲取線程實例中的threadLocals,再獲取ThreadLocal.ThreadLocalMap中匹配上KEY為當前ThreadLocal實例的Entry對應的VALUE
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
// 找不到則嘗試初始化ThreadLocal.ThreadLocalMap
return setInitialValue();
}
// 如果不存在ThreadLocal.ThreadLocalMap,則通過初始化initialValue()方法的返回值,構造一個ThreadLocal.ThreadLocalMap
private T setInitialValue() {
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
return value;
}
// 省略其他代碼
}
// --> java.lang.InheritableThreadLocal源碼 - 太簡單,全量貼出
public class InheritableThreadLocal<T> extends ThreadLocal<T> {
// 這個方法使用在線程Thread的構造函數里面ThreadLocal.createInheritedMap(),基於父線程InheritableThreadLocal的屬性創建子線程的InheritableThreadLocal屬性,它的返回值決定了拷貝父線程的屬性時候傳入子線程的值
protected T childValue(T parentValue) {
return parentValue;
}
// 覆蓋獲取線程實例中的綁定的ThreadLocalMap為Thread#inheritableThreadLocals,這個方法其實是覆蓋了ThreadLocal中對應的方法,應該加@Override注解
ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
}
// 覆蓋創建ThreadLocalMap的邏輯,賦值到線程實例中的inheritableThreadLocals,而不是threadLocals,這個方法其實是覆蓋了ThreadLocal中對應的方法,應該加@Override注解
void createMap(Thread t, T firstValue) {
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}
}
一定要注意,這里的setInitialValue()
方法很重要,一個新的線程Thread
實例在初始化(對於InheritableThreadLocal
而言繼承父線程的線程本地變量)或者是首次調用ThreadLocal#set()
,會通過此setInitialValue()
方法去構造一個全新的ThreadLocal.ThreadLocalMap
,會直接使用createMap()
方法。
以前面提到的兩個例子,貼一個圖加深理解:
Example-1
:
Example-2
:
ThreadLocal
、InheritableThreadLocal
的最大局限性就是:無法為預先創建好(未投入使用)的線程實例傳遞變量(准確來說是首次傳遞某些場景是可行的,而后面由於線程池中的線程是復用的,無法進行更新或者修改變量的傳遞值),泛線程池Executor
體系、TimerTask
和ForkJoinPool
等一般會預先創建(核心)線程,也就它們都是無法在線程池中由預創建的子線程執行的Runnable
任務實例中使用。例如下面的方式會導致參數傳遞失敗:
public class InheritableThreadForExecutor {
static final InheritableThreadLocal<String> ITL = new InheritableThreadLocal<>();
static final Executor EXECUTOR = Executors.newFixedThreadPool(1);
public static void main(String[] args) throws Exception {
ITL.set("throwable");
EXECUTOR.execute(() -> {
System.out.println(ITL.get());
});
ITL.set("doge");
EXECUTOR.execute(() -> {
System.out.println(ITL.get());
});
TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
}
}
// 輸出結果:
throwable
throwable # <--- 可見此處參數傳遞出現異常
首次變量傳遞成功是因為線程池中的所有子線程都是派生自main
線程。
TTL的簡單使用
TTL
的使用方式在它的項目README.md
或者項目中的單元測試有十分詳細的介紹,先引入依賴com.alibaba:transmittable-thread-local:2.11.4
,這里演示一個例子:
// 父-子線程
public class TtlSample1 {
static TransmittableThreadLocal<String> TTL = new TransmittableThreadLocal<>();
public static void main(String[] args) throws Exception {
new Thread(() -> {
// 在父線程中設置變量
TTL.set("throwable");
new Thread(TtlRunnable.get(() -> {
methodFrame1();
}), "childThread").start();
}, "parentThread").start();
TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
}
private static void methodFrame1() {
methodFrame2();
}
private static void methodFrame2() {
System.out.println(TTL.get());
}
}
// 輸出:
throwable
// 線程池
public class TtlSample2 {
static TransmittableThreadLocal<String> TTL = new TransmittableThreadLocal<>();
static final Executor EXECUTOR = Executors.newFixedThreadPool(1);
public static void main(String[] args) throws Exception {
TTL.set("throwable");
EXECUTOR.execute(TtlRunnable.get(() -> {
System.out.println(TTL.get());
}));
TTL.set("doge");
EXECUTOR.execute(TtlRunnable.get(() -> {
System.out.println(TTL.get());
}));
TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
}
}
// 輸出:
throwable
doge
TTL實現的基本原理
TTL
設計上使用了大量的委托(Delegate
),委托是C#
里面的說法,對標Java
的設計模式就是代理模式。舉個簡單的例子:
@Slf4j
public class StaticDelegate {
public static void main(String[] args) throws Exception {
new RunnableDelegate(() -> log.info("Hello World!")).run();
}
@Slf4j
@RequiredArgsConstructor
private static final class RunnableDelegate implements Runnable {
private final Runnable runnable;
@Override
public void run() {
try {
log.info("Before run...");
runnable.run();
log.info("After run...");
} finally {
log.info("Finally run...");
}
}
}
}
// 輸出結果:
23:45:27.763 [main] INFO club.throwable.juc.StaticDelegate$RunnableDelegate - Before run...
23:45:27.766 [main] INFO club.throwable.juc.StaticDelegate - Hello World!
23:45:27.766 [main] INFO club.throwable.juc.StaticDelegate$RunnableDelegate - After run...
23:45:27.766 [main] INFO club.throwable.juc.StaticDelegate$RunnableDelegate - Finally run...
委托如果使用純熟的話,可以做出很多十分有用的功能,例如可以基於Micrometer
去統計任務的執行時間,上報到Prometheus
,然后用Grafana
做監控和展示:
// 需要引入io.micrometer:micrometer-core:${version}
@Slf4j
public class MeterDelegate {
public static void main(String[] args) throws Exception {
Executor executor = Executors.newFixedThreadPool(1);
Runnable task = () -> {
try {
// 模擬耗時
Thread.sleep(1000);
} catch (Exception ignore) {
}
};
Map<String, String> tags = new HashMap<>(8);
tags.put("_class", "MeterDelegate");
executor.execute(new MicrometerDelegate(task, "test-task", tags));
TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
}
@Slf4j
@RequiredArgsConstructor
private static final class MicrometerDelegate implements Runnable {
private final Runnable runnable;
private final String taskType;
private final Map<String, String> tags;
@Override
public void run() {
long start = System.currentTimeMillis();
try {
runnable.run();
} finally {
long end = System.currentTimeMillis();
List<Tag> tagsList = Lists.newArrayList();
Optional.ofNullable(tags).ifPresent(x -> x.forEach((k, v) -> {
tagsList.add(Tag.of(k, v));
}));
Metrics.summary(taskType, tagsList).record(end - start);
}
}
}
}
委托理論上只要不線程棧溢出,可以無限層級地包裝,有點像洋蔥的結構,原始的目標方法會被包裹在最里面並且最后執行:
public static void main(String[] args) throws Exception {
Runnable target = () -> log.info("target");
Delegate level1 = new Delegate(target);
Delegate level2 = new Delegate(level1);
Delegate level3 = new Delegate(level2);
// ......
}
@RequiredArgsConstructor
static class Delegate implements Runnable{
private final Runnable runnable;
@Override
public void run() {
runnable.run();
}
}
當然,委托的層級越多,代碼結構就會越復雜,不利於理解和維護。多層級委托這個洋蔥結構,再配合Java
反射API
剝離對具體方法調用的依賴,就是Java
中切面編程的普遍原理,spring-aop
就是這樣實現的。委托如果再結合Agent
和字節碼增強(使用ASM
、Javassist
等),可以實現類加載時期替換對應的Runnable
、Callable
或者一般接口的實現,這樣就能無感知完成了增強功能。此外,TTL
中還使用了模板方法模式,如:
@Slf4j
public class TemplateMethod {
public static void main(String[] args) throws Exception {
Runnable runnable = () -> log.info("Hello World!");
Template template = new Template(runnable) {
@Override
protected void beforeExecute() {
log.info("BeforeExecute...");
}
@Override
protected void afterExecute() {
log.info("AfterExecute...");
}
};
template.run();
}
@RequiredArgsConstructor
static abstract class Template implements Runnable {
private final Runnable runnable;
protected void beforeExecute() {
}
@Override
public void run() {
beforeExecute();
runnable.run();
afterExecute();
}
protected void afterExecute() {
}
}
}
// 輸出結果:
00:25:32.862 [main] INFO club.throwable.juc.TemplateMethod - BeforeExecute...
00:25:32.865 [main] INFO club.throwable.juc.TemplateMethod - Hello World!
00:25:32.865 [main] INFO club.throwable.juc.TemplateMethod - AfterExecute...
分析了兩種設計模式,下面簡單理解一下TTL
實現的偽代碼:
# TTL extends InheritableThreadLocal
# Holder of TTL -> InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> [? => NULL]
(1)創建一個全局的Holder,用於保存父線程(或者明確了父線程的子線程)的TTL對象,這里注意,是TTL對象,Holder是當作Set使用
(2)(父)線程A中使用了TTL,則所有設置的變量會被TTL捕獲
(3)(子)線程B使用了TtlRunnable(Runnable的TTL實現,使用了前面提到的委托,像Callable的實現是TtlCallable),會重放所有存儲在TTL中的,來自於線程A的存儲變量
(4)線程B重放完畢后,清理線程B獨立產生的ThreadLocal變量,歸還變TTL的變量
主要就是這幾步,里面的話術有點抽象,后面一節分析源碼的時候會詳細講解。
TTL的源碼分析
主要分析:
- 框架的骨架。
- 核心類
TransmittableThreadLocal
。 - 發射器
Transmitter
。 - 捕獲、重放和復原。
Agent
模塊。
TTL框架骨架
TTL
是一個十分精悍的框架,它依賴少量的類實現了比較強大的功能,除了提供給用戶使用的API
,還提供了基於Agent
和字節碼增強實現了無感知增強泛線程池對應類的功能,這一點是比較驚艷的。這里先分析編程式的API
,再簡單分析Agent
部分的實現。筆者閱讀TTL
框架的時間是2020
年五一勞動節前后,當前的最新發行版本為2.11.4
。TTL
的項目結構很簡單:
- transmittable-thread-local
- com.alibaba.ttl
- spi SPI接口和一些實現
- threadpool 線程池增強,包括ThreadFactory和線程池的Wrapper等
- agent 線程池的Agent實現相關
最外層的包有一些Wrapper的實現和TTL
先看spi
包:
- spi
TtlAttachments
TtlAttachmentsDelegate
TtlEnhanced
TtlWrapper
TtlEnhanced
是TTL
的標識接口(空接口),標識具體的組件被TTL
增強:
public interface TtlEnhanced {
}
通過instanceof
關鍵字就可以判斷具體的實現是否TTL
增強過的組件。TtlWrapper
接口繼承自接口TtlEnhanced
,用於標記實現類可以解包裝獲得原始實例:
public interface TtlWrapper<T> extends TtlEnhanced {
// 返回解包裝實例,實際是就是原始實例
@NonNull
T unwrap();
}
TtlAttachments
接口也是繼承自接口TtlEnhanced
,用於為TTL
添加K-V
結構的附件,TtlAttachmentsDelegate
是其實現類,K-V
的存儲實際上是委托給ConcurrentHashMap
:
public interface TtlAttachments extends TtlEnhanced {
// 添加K-V附件
void setTtlAttachment(@NonNull String key, Object value);
// 通過KEY獲取值
<T> T getTtlAttachment(@NonNull String key);
// 標識自動包裝的KEY,Agent模式會使用自動包裝,這個時候會傳入一個附件的K-V,其中KEY就是KEY_IS_AUTO_WRAPPER
String KEY_IS_AUTO_WRAPPER = "ttl.is.auto.wrapper";
}
// TtlAttachmentsDelegate
public class TtlAttachmentsDelegate implements TtlAttachments {
private final ConcurrentMap<String, Object> attachments = new ConcurrentHashMap<String, Object>();
@Override
public void setTtlAttachment(@NonNull String key, Object value) {
attachments.put(key, value);
}
@Override
@SuppressWarnings("unchecked")
public <T> T getTtlAttachment(@NonNull String key) {
return (T) attachments.get(key);
}
}
因為TTL
的實現覆蓋了泛線程池Executor
、ExecutorService
、ScheduledExecutorService
、ForkJoinPool
和TimerTask
(在TTL
中組件已經標記為過期,推薦使用ScheduledExecutorService
),范圍比較廣,短篇幅無法分析所有的源碼,而且它們的實現思路是基本一致的,筆者下文只會挑選Executor
的實現路線進行分析。
核心類TransmittableThreadLocal
TransmittableThreadLocal
是TTL
的核心類,TTL
框架就是用這個類來命名的。先看它的構造函數和關鍵屬性:
// 函數式接口,TTL拷貝器
@FunctionalInterface
public interface TtlCopier<T> {
// 拷貝父屬性
T copy(T parentValue);
}
public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> implements TtlCopier<T> {
// 日志句柄,使用的不是SLF4J的接口,而是java.util.logging的實現
private static final Logger logger = Logger.getLogger(TransmittableThreadLocal.class.getName());
// 是否禁用忽略NULL值的語義
private final boolean disableIgnoreNullValueSemantics;
// 默認是false,也就是不禁用忽略NULL值的語義,也就是忽略NULL值,也就是默認的話,NULL值傳入不會覆蓋原來已經存在的值
public TransmittableThreadLocal() {
this(false);
}
// 可以通過手動設置,去覆蓋IgnoreNullValue的語義,如果設置為true,則是支持NULL值的設置,設置為true的時候,與ThreadLocal的語義一致
public TransmittableThreadLocal(boolean disableIgnoreNullValueSemantics) {
this.disableIgnoreNullValueSemantics = disableIgnoreNullValueSemantics;
}
// 先忽略其他代碼
}
disableIgnoreNullValueSemantics
屬性相關可以查看Issue157,下文分析方法的時候也會說明具體的場景。TransmittableThreadLocal
繼承自InheritableThreadLocal
,本質就是ThreadLocal
,那它到底怎么樣保證變量可以在線程池中的線程傳遞?接着分析其他所有方法:
public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> implements TtlCopier<T> {
// 拷貝器的拷貝方法實現
public T copy(T parentValue) {
return parentValue;
}
// 模板方法,留給子類實現,在TtlRunnable或者TtlCallable執行前回調
protected void beforeExecute() {
}
// 模板方法,留給子類實現,在TtlRunnable或者TtlCallable執行后回調
protected void afterExecute() {
}
// 獲取值,直接從InheritableThreadLocal#get()獲取
@Override
public final T get() {
T value = super.get();
// 如果值不為NULL 或者 禁用了忽略空值的語義(也就是和ThreadLocal語義一致),則重新添加TTL實例自身到存儲器
if (disableIgnoreNullValueSemantics || null != value) addThisToHolder();
return value;
}
@Override
public final void set(T value) {
// 如果不禁用忽略空值的語義,也就是需要忽略空值,並且設置的入參值為空,則做一次徹底的移除,包括從存儲器移除TTL自身實例,TTL(ThrealLocalMap)中也移除對應的值
if (!disableIgnoreNullValueSemantics && null == value) {
// may set null to remove value
remove();
} else {
// TTL(ThrealLocalMap)中設置對應的值
super.set(value);
// 添加TTL實例自身到存儲器
addThisToHolder();
}
}
// 從存儲器移除TTL自身實例,從TTL(ThrealLocalMap)中移除對應的值
@Override
public final void remove() {
removeThisFromHolder();
super.remove();
}
// 從TTL(ThrealLocalMap)中移除對應的值
private void superRemove() {
super.remove();
}
// 拷貝值,主要是拷貝get()的返回值
private T copyValue() {
return copy(get());
}
// 存儲器,本身就是一個InheritableThreadLocal(ThreadLocal)
// 它的存放對象是WeakHashMap<TransmittableThreadLocal<Object>, ?>類型,而WeakHashMap的VALUE總是為NULL,這里當做Set容器使用,WeakHashMap支持NULL值
private static InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder =
new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
@Override
protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {
return new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
}
@Override
protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {
// 注意這里的WeakHashMap總是拷貝父線程的值
return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue);
}
};
// 添加TTL自身實例到存儲器,不存在則添加策略
@SuppressWarnings("unchecked")
private void addThisToHolder() {
if (!holder.get().containsKey(this)) {
holder.get().put((TransmittableThreadLocal<Object>) this, null); // WeakHashMap supports null value.
}
}
// 從存儲器移除TTL自身的實例
private void removeThisFromHolder() {
holder.get().remove(this);
}
// 執行目標方法,isBefore決定回調beforeExecute還是afterExecute,注意此回調方法會吞掉所有的異常只打印日志
private static void doExecuteCallback(boolean isBefore) {
for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
try {
if (isBefore) threadLocal.beforeExecute();
else threadLocal.afterExecute();
} catch (Throwable t) {
if (logger.isLoggable(Level.WARNING)) {
logger.log(Level.WARNING, "TTL exception when " + (isBefore ? "beforeExecute" : "afterExecute") + ", cause: " + t.toString(), t);
}
}
}
}
// DEBUG模式下打印TTL里面的所有值
static void dump(@Nullable String title) {
if (title != null && title.length() > 0) {
System.out.printf("Start TransmittableThreadLocal[%s] Dump...%n", title);
} else {
System.out.println("Start TransmittableThreadLocal Dump...");
}
for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
System.out.println(threadLocal.get());
}
System.out.println("TransmittableThreadLocal Dump end!");
}
// DEBUG模式下打印TTL里面的所有值
static void dump() {
dump(null);
}
// 省略靜態類Transmitter的實現代碼
}
這里一定要記住holder
是全局靜態的,並且它自身也是一個InheritableThreadLocal
(get()
方法也是線程隔離的),它實際上就是父線程管理所有TransmittableThreadLocal
的橋梁。這里可以考慮一個單線程的例子來說明TransmittableThreadLocal
的存儲架構:
public class TtlSample3 {
static TransmittableThreadLocal<String> TTL1 = new TransmittableThreadLocal<>();
static TransmittableThreadLocal<String> TTL2 = new TransmittableThreadLocal<>();
static TransmittableThreadLocal<String> TTL3 = new TransmittableThreadLocal<>();
public static void main(String[] args) throws Exception {
TTL1.set("VALUE-1");
TTL2.set("VALUE-2");
TTL3.set("VALUE-3");
}
}
這里簡化了例子,只演示了單線程的場景,圖中的一些對象的哈希碼有可能每次啟動JVM
實例都不一樣,這里只是做示例:
注釋里面也提到,holder
里面的WeakHashMap
是當成Set
容器使用,映射的值都是NULL
,每次遍歷它的所有KEY
就能獲取holder
里面的所有的TransmittableThreadLocal
實例,它是一個全局的存儲器,但是本身是一個InheritableThreadLocal
,多線程共享后的映射關系會相對復雜:
再聊一下disableIgnoreNullValueSemantics
的作用,默認情況下disableIgnoreNullValueSemantics=false
,TTL
如果設置NULL
值,會直接從holder
移除對應的TTL
實例,在TTL#get()
方法被調用的時候,如果原來持有的屬性不為NULL
,該TTL
實例會重新加到holder
。如果設置disableIgnoreNullValueSemantics=true
,則set(null)
的語義和ThreadLocal
一致。見下面的例子:
public class TtlSample4 {
static TransmittableThreadLocal<Integer> TL1 = new TransmittableThreadLocal<Integer>(false) {
@Override
protected Integer initialValue() {
return 5;
}
@Override
protected Integer childValue(Integer parentValue) {
return 10;
}
};
static TransmittableThreadLocal<Integer> TL2 = new TransmittableThreadLocal<Integer>(true) {
@Override
protected Integer initialValue() {
return 5;
}
@Override
protected Integer childValue(Integer parentValue) {
return 10;
}
};
public static void main(String[] args) throws Exception {
TL1.set(null);
TL2.set(null);
Thread t1 = new Thread(TtlRunnable.get(() -> {
System.out.println(String.format("Thread:%s,value:%s", Thread.currentThread().getName(), TL1.get()));
}), "T1");
Thread t2 = new Thread(TtlRunnable.get(() -> {
System.out.println(String.format("Thread:%s,value:%s", Thread.currentThread().getName(), TL2.get()));
}), "T2");
t1.start();
t2.start();
TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
}
}
// 輸出結果:
Thread:T2,value:null
Thread:T1,value:5
這是因為框架的設計者不想把NULL
作為有狀態的值,如果真的有需要保持和ThreadLocal
一致的用法,可以在構造TransmittableThreadLocal
實例的時候傳入true
。
發射器Transmitter
發射器Transmitter
是TransmittableThreadLocal
的一個公有靜態類,它的核心功能是傳輸所有的TransmittableThreadLocal
實例和提供靜態方法注冊當前線程的變量到其他線程。按照筆者閱讀源碼的習慣,先看構造函數和關鍵屬性:
// # TransmittableThreadLocal#Transmitter
public static class Transmitter {
// 保存手動注冊的ThreadLocal->TtlCopier映射,這里是因為部分API提供了TtlCopier給用戶實現
private static volatile WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>> threadLocalHolder = new WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>>();
// threadLocalHolder更變時候的監視器
private static final Object threadLocalHolderUpdateLock = new Object();
// 標記WeakHashMap中的ThreadLocal的對應值為NULL的屬性,便於后面清理
private static final Object threadLocalClearMark = new Object();
// 默認的拷貝器,影子拷貝,直接返回父值
private static final TtlCopier<Object> shadowCopier = new TtlCopier<Object>() {
@Override
public Object copy(Object parentValue) {
return parentValue;
}
};
// 私有構造,說明只能通過靜態方法提供外部調用
private Transmitter() {
throw new InstantiationError("Must not instantiate this class");
}
// 私有靜態類,快照,保存從holder中捕獲的所有TransmittableThreadLocal和外部手動注冊保存在threadLocalHolder的ThreadLocal的K-V映射快照
private static class Snapshot {
final WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value;
final WeakHashMap<ThreadLocal<Object>, Object> threadLocal2Value;
private Snapshot(WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value, WeakHashMap<ThreadLocal<Object>, Object> threadLocal2Value) {
this.ttl2Value = ttl2Value;
this.threadLocal2Value = threadLocal2Value;
}
}
}
Transmitter
在設計上是一個典型的工具類,外部只能調用其公有靜態方法。接着看其他靜態方法:
// # TransmittableThreadLocal#Transmitter
public static class Transmitter {
//######################################### 捕獲 ###########################################################
// 捕獲當前線程綁定的所有的TransmittableThreadLocal和已經注冊的ThreadLocal的值 - 使用了用時拷貝快照的策略
// 筆者注:它一般在構造任務實例的時候被調用,因此當前線程相對於子線程或者線程池的任務就是父線程,其實本質是捕獲父線程的所有線程本地變量的值
@NonNull
public static Object capture() {
return new Snapshot(captureTtlValues(), captureThreadLocalValues());
}
// 新建一個WeakHashMap,遍歷TransmittableThreadLocal#holder中的所有TransmittableThreadLocal的Entry,獲取K-V,存放到這個新的WeakHashMap返回
private static WeakHashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
ttl2Value.put(threadLocal, threadLocal.copyValue());
}
return ttl2Value;
}
// 新建一個WeakHashMap,遍歷threadLocalHolder中的所有ThreadLocal的Entry,獲取K-V,存放到這個新的WeakHashMap返回
private static WeakHashMap<ThreadLocal<Object>, Object> captureThreadLocalValues() {
final WeakHashMap<ThreadLocal<Object>, Object> threadLocal2Value = new WeakHashMap<ThreadLocal<Object>, Object>();
for (Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry : threadLocalHolder.entrySet()) {
final ThreadLocal<Object> threadLocal = entry.getKey();
final TtlCopier<Object> copier = entry.getValue();
threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get()));
}
return threadLocal2Value;
}
//######################################### 重放 ###########################################################
// 重放capture()方法中捕獲的TransmittableThreadLocal和手動注冊的ThreadLocal中的值,本質是重新拷貝holder中的所有變量,生成新的快照
// 筆者注:重放操作一般會在子線程或者線程池中的線程的任務執行的時候調用,因此此時的holder#get()拿到的是子線程的原來就存在的本地線程變量,重放操作就是把這些子線程原有的本地線程變量備份
@NonNull
public static Object replay(@NonNull Object captured) {
final Snapshot capturedSnapshot = (Snapshot) captured;
return new Snapshot(replayTtlValues(capturedSnapshot.ttl2Value), replayThreadLocalValues(capturedSnapshot.threadLocal2Value));
}
// 重放所有的TTL的值
@NonNull
private static WeakHashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> captured) {
// 新建一個新的備份WeakHashMap,其實也是一個快照
WeakHashMap<TransmittableThreadLocal<Object>, Object> backup = new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
// 這里的循環針對的是子線程,用於獲取的是子線程的所有線程本地變量
for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
TransmittableThreadLocal<Object> threadLocal = iterator.next();
// 拷貝holder當前線程(子線程)綁定的所有TransmittableThreadLocal的K-V結構到備份中
backup.put(threadLocal, threadLocal.get());
// 清理所有的非捕獲快照中的TTL變量,以防有中間過程引入的額外的TTL變量(除了父線程的本地變量)影響了任務執行后的重放操作
// 簡單來說就是:移除所有子線程的不包含在父線程捕獲的線程本地變量集合的中所有子線程本地變量和對應的值
/**
* 這個問題可以舉個簡單的例子:
* static TransmittableThreadLocal<Integer> TTL = new TransmittableThreadLocal<>();
*
* 線程池中的子線程C中原來初始化的時候,在線程C中綁定了TTL的值為10087,C線程是核心線程不會主動銷毀。
*
* 父線程P在沒有設置TTL值的前提下,調用了線程C去執行任務,那么在C線程的Runnable包裝類中通過TTL#get()就會獲取到10087,顯然是不符合預期的
*
* 所以,在C線程的Runnable包裝類之前之前,要從C線程的線程本地變量,移除掉不包含在父線程P中的所有線程本地變量,確保Runnable包裝類執行期間只能拿到父線程中捕獲到的線程本地變量
*
* 下面這個判斷和移除做的就是這個工作
*/
if (!captured.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// 重新設置TTL的值到捕獲的快照中
// 其實真實的意圖是:把從父線程中捕獲的所有線程本地變量重寫設置到TTL中,本質上,子線程holder里面的TTL綁定的值會被刷新
setTtlValuesTo(captured);
// 回調模板方法beforeExecute
doExecuteCallback(true);
return backup;
}
// 提取WeakHashMap中的KeySet,遍歷所有的TransmittableThreadLocal,重新設置VALUE
private static void setTtlValuesTo(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> ttlValues) {
for (Map.Entry<TransmittableThreadLocal<Object>, Object> entry : ttlValues.entrySet()) {
TransmittableThreadLocal<Object> threadLocal = entry.getKey();
// 重新設置TTL值,本質上,當前線程(子線程)holder里面的TTL綁定的值會被刷新
threadLocal.set(entry.getValue());
}
}
// 重放所有的手動注冊的ThreadLocal的值
private static WeakHashMap<ThreadLocal<Object>, Object> replayThreadLocalValues(@NonNull WeakHashMap<ThreadLocal<Object>, Object> captured) {
// 新建備份
final WeakHashMap<ThreadLocal<Object>, Object> backup = new WeakHashMap<ThreadLocal<Object>, Object>();
// 注意這里是遍歷捕獲的快照中的ThreadLocal
for (Map.Entry<ThreadLocal<Object>, Object> entry : captured.entrySet()) {
final ThreadLocal<Object> threadLocal = entry.getKey();
// 添加到備份中
backup.put(threadLocal, threadLocal.get());
final Object value = entry.getValue();
// 如果值為清除標記則綁定在當前線程的變量進行remove,否則設置值覆蓋
if (value == threadLocalClearMark) threadLocal.remove();
else threadLocal.set(value);
}
return backup;
}
// 從relay()或者clear()方法中恢復TransmittableThreadLocal和手工注冊的ThreadLocal的值對應的備份
// 筆者注:恢復操作一般會在子線程或者線程池中的線程的任務執行的時候調用
public static void restore(@NonNull Object backup) {
final Snapshot backupSnapshot = (Snapshot) backup;
restoreTtlValues(backupSnapshot.ttl2Value);
restoreThreadLocalValues(backupSnapshot.threadLocal2Value);
}
private static void restoreTtlValues(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> backup) {
// 回調模板方法afterExecute
doExecuteCallback(false);
// 這里的循環針對的是子線程,用於獲取的是子線程的所有線程本地變量
for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
TransmittableThreadLocal<Object> threadLocal = iterator.next();
// 如果子線程原來就綁定的線程本地變量的值,如果不包含某個父線程傳來的對象,那么就刪除
// 這一步可以結合前面reply操作里面的方法段一起思考,如果不刪除的話,就相當於子線程的原來存在的線程本地變量綁定值被父線程對應的值污染了
if (!backup.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// 重新設置TTL的值到捕獲的快照中
// 其實真實的意圖是:把子線程的線程本地變量恢復到reply()的備份(前面的循環已經做了父線程捕獲變量的判斷),本質上,等於把holder中綁定於子線程本地變量的部分恢復到reply操作之前的狀態
setTtlValuesTo(backup);
}
// 恢復所有的手動注冊的ThreadLocal的值
private static void restoreThreadLocalValues(@NonNull WeakHashMap<ThreadLocal<Object>, Object> backup) {
for (Map.Entry<ThreadLocal<Object>, Object> entry : backup.entrySet()) {
final ThreadLocal<Object> threadLocal = entry.getKey();
threadLocal.set(entry.getValue());
}
}
}
這里三個核心方法,看起來比較抽象,要結合多線程的場景和一些空間想象進行推敲才能比較容易地理解:
capture()
:捕獲操作,父線程原來就存在的線程本地變量映射和手動注冊的線程本地變量映射捕獲,得到捕獲的快照值captured
。reply()
:重放操作,子線程原來就存在的線程本地變量映射和手動注冊的線程本地變量生成備份backup
,刷新captured
的所有值到子線程在全局存儲器holder
中綁定的值。restore()
:復原操作,子線程原來就存在的線程本地變量映射和手動注冊的線程本地變量恢復成backup
。
setTtlValuesTo()
這個方法比較隱蔽,要特別要結合多線程和空間思維去思考,例如當入參是captured
,本質是從父線程捕獲到的綁定在父線程的所有線程本地變量,調用的時機在reply()
和restore()
,這兩個方法只會在子線程中調用,setTtlValuesTo()
里面拿到的TransmittableThreadLocal
實例調用set()
方法相當於把綁定在父線程的所有線程本地變量的值全部刷新到子線程當前綁定的TTL
中的線程本地變量的值,更深層次地想,是基於外部的傳入值刷新了子線程綁定在全局存儲器holder
里面綁定到該子線程的線程本地變量的值。
Transmitter
還有不少靜態工具方法,這里不做展開,可以參考項目里面的測試demo
和README.md
進行調試。
捕獲、重放和復原
其實上面一節已經介紹了Transmitter
提供的捕獲、重放和復原的API
,這一節主要結合分析TtlRunnable
中的相關邏輯。TtlRunnable
的源碼如下:
public final class TtlRunnable implements Runnable, TtlWrapper<Runnable>, TtlEnhanced, TtlAttachments {
// 存放從父線程捕獲得到的線程本地變量映射的備份
private final AtomicReference<Object> capturedRef;
// 原始的Runable實例
private final Runnable runnable;
// 執行之后是否釋放TTL值引用
private final boolean releaseTtlValueReferenceAfterRun;
private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
// 這里關鍵點:TtlRunnable實例化的時候就已經進行了線程本地變量的捕獲,所以一定是針對父線程的,因為此時任務還沒提交到線程池
this.capturedRef = new AtomicReference<Object>(capture());
this.runnable = runnable;
this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}
@Override
public void run() {
// 獲取父線程捕獲到的線程本地變量映射的備份,做一些前置判斷
Object captured = capturedRef.get();
if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
throw new IllegalStateException("TTL value reference is released after run!");
}
// 重放操作
Object backup = replay(captured);
try {
// 真正的Runnable調用
runnable.run();
} finally {
// 復原操作
restore(backup);
}
}
@Nullable
public static TtlRunnable get(@Nullable Runnable runnable) {
return get(runnable, false, false);
}
@Nullable
public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
if (null == runnable) return null;
if (runnable instanceof TtlEnhanced) {
// avoid redundant decoration, and ensure idempotency
if (idempotent) return (TtlRunnable) runnable;
else throw new IllegalStateException("Already TtlRunnable!");
}
return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun);
}
// 省略其他不太重要的方法
}
其實關注點只需要放在構造函數、run()
方法,其他都是基於此做修飾或者擴展。構造函數的源碼說明,capture()
在TtlRunnable
實例化的時候已經被調用,實例化它的一般就是父線程,所以整體的執行流程如下:
Agent模塊
啟用Agent
功能,需要在Java
的啟動參數添加:-javaagent:path/to/transmittable-thread-local-x.yzx.jar
。原理是通過Instrumentation
回調激發ClassFileTransformer
實現目標類的字節碼增強,使用到javassist
,被增強的類主要是泛線程池的類:
Executor
體系:主要包括ThreadPoolExecutor
和ScheduledThreadPoolExecutor
,對應的字節碼增強類實現是TtlExecutorTransformlet
。ForkJoinPool
:對應的字節碼增強類實現是TtlForkJoinTransformlet
。TimerTask
:對應的字節碼增強類實現是TtlTimerTaskTransformlet
。
Agent
的入口類是TtlAgent
,這里查看對應的源碼:
public final class TtlAgent {
public static void premain(String agentArgs, @NonNull Instrumentation inst) {
kvs = splitCommaColonStringToKV(agentArgs);
Logger.setLoggerImplType(getLogImplTypeFromAgentArgs(kvs));
final Logger logger = Logger.getLogger(TtlAgent.class);
try {
logger.info("[TtlAgent.premain] begin, agentArgs: " + agentArgs + ", Instrumentation: " + inst);
final boolean disableInheritableForThreadPool = isDisableInheritableForThreadPool();
// 裝載所有的JavassistTransformlet
final List<JavassistTransformlet> transformletList = new ArrayList<JavassistTransformlet>();
transformletList.add(new TtlExecutorTransformlet(disableInheritableForThreadPool));
transformletList.add(new TtlForkJoinTransformlet(disableInheritableForThreadPool));
if (isEnableTimerTask()) transformletList.add(new TtlTimerTaskTransformlet());
final ClassFileTransformer transformer = new TtlTransformer(transformletList);
inst.addTransformer(transformer, true);
logger.info("[TtlAgent.premain] addTransformer " + transformer.getClass() + " success");
logger.info("[TtlAgent.premain] end");
ttlAgentLoaded = true;
} catch (Exception e) {
String msg = "Fail to load TtlAgent , cause: " + e.toString();
logger.log(Level.SEVERE, msg, e);
throw new IllegalStateException(msg, e);
}
}
}
List<JavassistTransformlet>
作為參數傳入ClassFileTransformer
的實現類TtlTransformer
中,其中的轉換方法為:
public class TtlTransformer implements ClassFileTransformer {
private final List<JavassistTransformlet> transformletList = new ArrayList<JavassistTransformlet>();
TtlTransformer(List<? extends JavassistTransformlet> transformletList) {
for (JavassistTransformlet transformlet : transformletList) {
this.transformletList.add(transformlet);
logger.info("[TtlTransformer] add Transformlet " + transformlet.getClass() + " success");
}
}
@Override
public final byte[] transform(@Nullable final ClassLoader loader, @Nullable final String classFile, final Class<?> classBeingRedefined,
final ProtectionDomain protectionDomain, @NonNull final byte[] classFileBuffer) {
try {
// Lambda has no class file, no need to transform, just return.
if (classFile == null) return NO_TRANSFORM;
final String className = toClassName(classFile);
ClassInfo classInfo = new ClassInfo(className, classFileBuffer, loader);
// 這里做變量,如果字節碼被修改,則跳出循環返回
for (JavassistTransformlet transformlet : transformletList) {
transformlet.doTransform(classInfo);
if (classInfo.isModified()) return classInfo.getCtClass().toBytecode();
}
} catch (Throwable t) {
String msg = "Fail to transform class " + classFile + ", cause: " + t.toString();
logger.log(Level.SEVERE, msg, t);
throw new IllegalStateException(msg, t);
}
return NO_TRANSFORM;
}
}
這里挑選TtlExecutorTransformlet
的部分方法來看:
@Override
public void doTransform(@NonNull final ClassInfo classInfo) throws IOException, NotFoundException, CannotCompileException {
// 如果當前加載的類包含java.util.concurrent.ThreadPoolExecutor或者java.util.concurrent.ScheduledThreadPoolExecutor
if (EXECUTOR_CLASS_NAMES.contains(classInfo.getClassName())) {
final CtClass clazz = classInfo.getCtClass();
// 遍歷所有的方法進行增強
for (CtMethod method : clazz.getDeclaredMethods()) {
updateSubmitMethodsOfExecutorClass_decorateToTtlWrapperAndSetAutoWrapperAttachment(method);
}
// 省略其他代碼
}
// 省略其他代碼
}
private void updateSubmitMethodsOfExecutorClass_decorateToTtlWrapperAndSetAutoWrapperAttachment(@NonNull final CtMethod method) throws NotFoundException, CannotCompileException {
final int modifiers = method.getModifiers();
if (!Modifier.isPublic(modifiers) || Modifier.isStatic(modifiers)) return;
// 這里主要在java.lang.Runnable構造時候調用com.alibaba.ttl.TtlRunnable#get()包裝為com.alibaba.ttl.TtlRunnable
// 在java.util.concurrent.Callable構造時候調用com.alibaba.ttl.TtlCallable#get()包裝為com.alibaba.ttl.TtlCallable
// 並且設置附件K-V為ttl.is.auto.wrapper=true
CtClass[] parameterTypes = method.getParameterTypes();
StringBuilder insertCode = new StringBuilder();
for (int i = 0; i < parameterTypes.length; i++) {
final String paramTypeName = parameterTypes[i].getName();
if (PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.containsKey(paramTypeName)) {
String code = String.format(
// decorate to TTL wrapper,
// and then set AutoWrapper attachment/Tag
"$%d = %s.get($%d, false, true);"
+ "\ncom.alibaba.ttl.threadpool.agent.internal.transformlet.impl.Utils.setAutoWrapperAttachment($%<d);",
i + 1, PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.get(paramTypeName), i + 1);
logger.info("insert code before method " + signatureOfMethod(method) + " of class " + method.getDeclaringClass().getName() + ": " + code);
insertCode.append(code);
}
}
if (insertCode.length() > 0) method.insertBefore(insertCode.toString());
}
上面分析的方法的功能,就是讓java.util.concurrent.ThreadPoolExecutor
和java.util.concurrent.ScheduledThreadPoolExecutor
的字節碼被增強,提交的java.lang.Runnable
類型的任務會被包裝為TtlRunnable
,提交的java.util.concurrent.Callable
類型的任務會被包裝為TtlCallable
,實現了無入侵無感知地嵌入TTL
的功能。
小結
TTL
在使用線程池等會池化復用線程的執行組件情況下,提供ThreadLocal
值的傳遞功能,解決異步執行時上下文傳遞的問題。 它是一個Java標准庫,為框架/中間件設施開發提供的標配能力,項目代碼精悍,只依賴了javassist
做字節碼增強,實現Agent
模式下的近乎無入侵提供TTL
功能的特性。TTL
能在業務代碼中實現透明/自動完成所有異步執行上下文的可定制、規范化的捕捉/傳遞,如果恰好碰到異步執行時上下文傳遞的問題,建議可以嘗試此庫。
參考資料:
JDK11
相關源碼- TTL源碼
個人博客
(本文完 c-14-d e-a-20200502)
技術公眾號(《Throwable》),不定期推送筆者原創技術文章(絕不抄襲或者轉載):