前言
在做分布式鏈路追蹤系統的時候,需要解決異步調用透傳上下文的需求,特別是傳遞traceId,本文就線程池透傳幾種方式進行分析。
其他典型場景例子:
-
分布式跟蹤系統 或 全鏈路壓測(即鏈路打標)
-
日志收集記錄系統上下文
-
Session
級Cache
-
應用容器或上層框架跨應用代碼給下層
SDK
傳遞信息
1、JDK對跨線程傳遞ThreadLocal的支持
首先看一個最簡單場景,也是一個錯誤的例子。
void testThreadLocal(){
ThreadLocal<Object> threadLocal = new ThreadLocal<>();
threadLocal.set("not ok");
new Thread(()->{
System.out.println(threadLocal.get());
}).start();
}
java中的threadlocal,是綁定在線程上的。你在一個線程中set的值,在另外一個線程是拿不到的。
上面的輸出是:
null
1.1 InheritableThreadLocal 例子
JDK考慮了這種場景,實現了InheritableThreadLocal ,不要高興太早,這個只是支持父子線程,線程池會有問題。
我們看下InheritableThreadLocal的例子:
InheritableThreadLocal<String> itl = new InheritableThreadLocal<>();
itl.set("father");
new Thread(()->{
System.out.println("subThread:" + itl.get());
itl.set("son");
System.out.println(itl.get());
}).start();
Thread.sleep(500);//等待子線程執行完
System.out.println("thread:" + itl.get());
上面的輸出是:
subThread:father
//子線程可以拿到父線程的變量
son
thread:father
//子線程修改不影響父線程的變量
1.2 InheritableThreadLocal的實現原理
有同學可能想知道InheritableThreadLocal的實現原理,其實特別簡單。就是Thread類里面分開記錄了ThreadLocal、InheritableThreadLocal的ThreadLocalMap,初始化的時候,會拿到parent.InheritableThreadLocal。直接上代碼可以看的很清楚。
class Thread {
...
/* ThreadLocal values pertaining to this thread. This map is maintained
* by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;
/*
* InheritableThreadLocal values pertaining to this thread. This map is
* maintained by the InheritableThreadLocal class.
*/
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
...
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
}
JDK
的InheritableThreadLocal
類可以完成父線程到子線程的值傳遞。但對於使用線程池等會池化復用線程的執行組件的情況,線程由線程池創建好,並且線程是池化起來反復使用的;這時父子線程關系的ThreadLocal
值傳遞已經沒有意義,應用需要的實際上是把 任務提交給線程池時的ThreadLocal
值傳遞到 任務執行時。
2、日志MDC/Opentracing的實現
如果你的應用實現了Opentracing的規范,比如通過skywalking
的agent對線程池做了攔截,那么自定義Scope實現類,可以跨線程傳遞MDC,然后你的義務可以通過設置MDC的值,傳遞給子線程。
代碼如下:
this.scopeManager = scopeManager;
this.wrapped = wrapped;
this.finishOnClose = finishOnClose;
this.toRestore = (OwlThreadLocalScope)scopeManager.tlsScope.get();
scopeManager.tlsScope.set(this);
if (wrapped instanceof JaegerSpan) {
this.insertMDC(((JaegerSpan)wrapped).context());
} else if (wrapped instanceof JaegerSpanWrapper) {
this.insertMDC(((JaegerSpanWrapper)wrapped).getDelegated().context());
}
3、阿里transmittable-thread-local
github地址:https://github.com/alibaba/transmittable-thread-local
TransmittableThreadLocal(TTL)是框架/中間件缺少的Java™std lib(簡單和0依賴),提供了增強的InheritableThreadLocal,即使使用線程池組件也可以在線程之間傳輸值。
3.1 transmittable-thread-local 官方readme參考:
使用類TransmittableThreadLocal
來保存值,並跨線程池傳遞。
TransmittableThreadLocal
繼承InheritableThreadLocal
,使用方式也類似。相比InheritableThreadLocal
,添加了
-
copy
方法
用於定制 任務提交給線程池時 的ThreadLocal
值傳遞到 任務執行時 的拷貝行為,缺省傳遞的是引用。
注意:如果跨線程傳遞了對象引用因為不再有線程封閉,與InheritableThreadLocal.childValue
一樣,使用者/業務邏輯要注意傳遞對象的線程 -
protected
的beforeExecute
/afterExecute
方法
執行任務(Runnable
/Callable
)的前/后的生命周期回調,缺省是空操作。
3.2 transmittable-thread-local 代碼例子
方式一:TtlRunnable封裝:
ExecutorService executorService = Executors.newCachedThreadPool();
TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
// =====================================================
// 在父線程中設置
context.set("value-set-in-parent");
// 額外的處理,生成修飾了的對象ttlRunnable
Runnable ttlRunnable = TtlRunnable.get(() -> {
System.out.println(context.get());
});
executorService.submit(ttlRunnable);
方式二:ExecutorService封裝:
ExecutorService executorService = ...
// 額外的處理,生成修飾了的對象executorService
executorService = TtlExecutors.getTtlExecutorService(executorService);
方式三:使用java agent,無代碼入侵
這種方式,實現線程池的傳遞是透明的,業務代碼中沒有修飾Runnable
或是線程池的代碼。即可以做到應用代碼 無侵入。
ExecutorService executorService = Executors.newCachedThreadPool();
TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
// =====================================================
// 在父線程中設置
context.set("value-set-in-parent");
executorService.submit(() -> {
System.out.println(context.get());
});
4、grpc的實現
grpc是一種分布式調用協議和實現,也封裝了一套跨線程傳遞上下文的實現。
io.grpc.Context
表示上下文,用來在一次grpc請求鏈路中傳遞用戶登錄信息、tracing信息等。
Context常用用法如下。首先獲取當前context,這個一般是作為參數傳過來的,或通過current()獲取當前的已有context。
然后通過attach方法,綁定到當前線程上,並且返回當前線程
public Runnable wrap(final Runnable r) {
return new Runnable() {
@Override
public void run() {
Context previous = attach();
try {
r.run();
} finally {
detach(previous);
}
}
};
}
Context的主要方法如下
- attach() attach Context自己,從而進入到一個新的scope中,新的scope以此Context實例作為current,並且返回之前的current context
- detach(Context toDetach) attach()方法的反向方法,退出當前Context並且detach到toDetachContext,每個attach方法要對應一個detach,所以一般通過try finally代碼塊或wrap模板方法來使用。
- static storage() 獲取storage,Storage是用來attach和detach當前context用的。
線程池傳遞實現:
ExecutorService executorService = Executors.newCachedThreadPool();
Context.withValue("key","value");
execute(Context.current().wrap(() -> {
System.out.println(Context.current().getValue("key"));
}));
5、總結
以上總結的四種實現跨線程傳遞的方法,最簡單的就是自己定義一個Runnable,添加屬性傳遞即可。如果考慮通用型,需要中間件封裝一個Executor對象,類似transmittable-thread-local的實現,或者直接使用transmittable-thread-local。
實踐的項目中,考慮周全,要支持span、MDC、rpc上下文、業務自定義上下文,可以參考以上方法封裝。
參考資料
[grpc源碼分析1-context] https://www.codercto.com/a/66559.html
[threadlocal變量透傳,這些問題你都遇到過嗎?]https://cloud.tencent.com/developer/article/1492379
掃描二維碼,關注公眾號“猿必過”
回復 “面試題” 自行領取吧。
微信群交流討論,請添加微信號:zyhui98,備注:面試題加群
本文由猿必過 YBG 發布
禁止未經授權轉載,違者依法追究相關法律責任
如需授權可聯系:zhuyunhui@yuanbiguo.com