ThreadLocal
定義
ThreadLocal很容易讓人望文生義,想當然地認為是一個“本地線程”。
其實,ThreadLocal並不是一個Thread,而是Thread的局部變量,也許把它命名為ThreadLocalVariable更容易讓人理解一些。
各個線程的ThreadLocal關聯的實例互不干擾。特征:
- ThreadLocal表示線程的"局部變量",它確保每個線程的ThreadLocal變量都是各自獨立的
- ThreadLocal適合在一個線程的處理流程中保持上下文(避免了同一參數在所有方法中傳遞)
- 使用ThreadLocal要用try ... finally結構,並在finally中清除
常用方法
- set:為當前線程設置變量,當前ThreadLocal作為索引
- get:獲取當前線程變量,當前ThreadLocal作為索引
- initialValue:(需要子類實現,默認null)執行get時,發現線程本地變量為null,就會執行initialValue的內容
- remove:清空當前線程的ThreadLocal索引與映射的元素
底層結構及邏輯
Thread對象的屬性
public class Thread implements Runnable {
// .....
ThreadLocal.ThreadLocalMap threadLocals = null;
// .....
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
}
ThreadLocalMap對象
public class ThreadLocal<T> {
// .....
static class ThreadLocalMap {
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
/**
* The initial capacity -- MUST be a power of two.
*/
private static final int INITIAL_CAPACITY = 16;
/**
* The table, resized as necessary.
* table.length MUST always be a power of two.
*/
private Entry[] table;
/**
* The number of entries in the table.
*/
private int size = 0;
/**
* The next size value at which to resize.
*/
private int threshold; // Default to 0
/**
* Set the resize threshold to maintain at worst a 2/3 load factor.
*/
private void setThreshold(int len) {
threshold = len * 2 / 3;
}
/**
* Increment i modulo len.
*/
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}
/**
* Decrement i modulo len.
*/
private static int prevIndex(int i, int len) {
return ((i - 1 >= 0) ? i - 1 : len - 1);
}
}
}
set值流程
源碼摘要:
// java.lang.ThreadLocal#set
public void set(T value) {
Thread t = Thread.currentThread();
// map惰性創建
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
重點來關注下 java.lang.ThreadLocal.ThreadLocalMap#set 方法
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
// 根據ThreadLocal對象的hash值,定位到table中的位置i
int i = key.threadLocalHashCode & (len - 1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
// 如果位置i不為空,且這個Entry對象的key正好是即將設置的key,那么就覆蓋Entry中的value
if (k == key) {
e.value = value;
return;
}
// 如果當前位置是空的,就初始化一個Entry對象放在位置i上
if (k == null) {
// 里面會調到 expungeStaleEntry
replaceStaleEntry(key, value, i);
return;
}
// 如果位置i的不為空,而且key不等於entry,那就找下一個空位置,直到為空為止
}
tab[i] = new Entry(key, value);
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
結合代碼,set的過程如下圖
沖突解決
線性探測的方式解決hash沖突的問題,如果沒有找到空閑的slot,就不斷往后嘗試,直到找到一個空閑的位置,插入entry
get流程
源碼摘要:
// java.lang.ThreadLocal#get
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
T result = (T)e.value;
return result;
}
}
return setInitialValue();// 調用initialValue方法
}
// java.lang.ThreadLocal.ThreadLocalMap#getEntry
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
if (e != null && e.get() == key)
return e;
else
// 可能是沒有,或者hash沖突了
return getEntryAfterMiss(key, i, e);
}
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;
// get的時候一樣是根據ThreadLocal獲取到table的i值,然后查找數據拿到后會對比key是否相等
while (e != null) {
ThreadLocal<?> k = e.get();
// 相等就直接返回,不相等就繼續查找,找到相等位置。
if (k == key)
return e;
if (k == null)
// 清理回收無效value、entry
expungeStaleEntry(i);
else
i = nextIndex(i, len);
e = tab[i];
}
return null;
}
弱引用
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
為什么使用弱引用?
弱引用的特點:弱引用的對象擁有更短暫的生命周期。垃圾回收器線程掃描的時候,一旦發現了只具有弱引用的對象,不管當前內存空間足夠與否,都會回收它的內存。
結合到這里的場景,當ThreadLocal在沒有外部強引用的時候,一旦發生gc,key就會被回收。
內存泄露問題
因為有了弱引用,可以確保Entry的key會被內存回收掉。但是Entry的value和Entry對象本身還是沒有得到回收。
如果ThreadLocal的線程一直保持運行,那么這個Entry對象中的value就有可能一直得不到回收,發生內存泄露。
解決辦法:在finally里面調用remove方法
擴展
InheritableThreadLocal
InheritableThreadLocal
是 JDK 本身自帶的一種線程傳遞解決方案,以完成父線程到子線程的值傳遞。在創建子線程的時候,就把父線程的ThreadLocal的內容復制過去。
// java.lang.Thread#init(java.lang.ThreadGroup, java.lang.Runnable, java.lang.String, long, java.security.AccessControlContext, boolean)
private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {
// ...
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
// 復制父線程的InheritableThreadLocal內容
this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
// ...
}
// java.lang.ThreadLocal#createInheritedMap
static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
return new ThreadLocalMap(parentMap);
}
private ThreadLocalMap(ThreadLocalMap parentMap) {
Entry[] parentTable = parentMap.table;
int len = parentTable.length;
setThreshold(len);
table = new Entry[len];
for (int j = 0; j < len; j++) {
Entry e = parentTable[j];
if (e != null) {
ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
if (key != null) {
// 這里的value 是同一個對象
Object value = key.childValue(e.value);
Entry c = new Entry(key, value);
int h = key.threadLocalHashCode & (len - 1);
while (table[h] != null)
h = nextIndex(h, len);
table[h] = c;
size++;
}
}
}
}
不過,子線程ThreadLocalMap里的Entry.value指向的對象和父線程是同一個。
特殊場景下的缺陷
在線程池的場景下,線程由線程池創建好,並且線程是池化起來反復使用的;這時父子線程關系的ThreadLocal值傳遞已經沒有意義。比如:
public static void main(String[] args) throws InterruptedException {
// 線程池提前創建好
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 提前創建了一個子線程 [pool-1-thread-1]
executorService.submit(() -> {
System.out.println(Thread.currentThread().getName());
});
Thread.sleep(1000);
InheritableThreadLocal<String> threadLocal = new InheritableThreadLocal();
threadLocal.set("start");
System.out.println(threadLocal.get());
// 后續,[pool-1-thread-1]線程的ThreadLocal值永遠是null
executorService.submit(() -> {
System.out.println(threadLocal.get() + " -> " + Thread.currentThread().getName());
});
executorService.submit(() -> {
System.out.println(threadLocal.get() + " -> " + Thread.currentThread().getName());
});
executorService.submit(() -> {
System.out.println(threadLocal.get() + " -> " + Thread.currentThread().getName());
});
Thread.sleep(100);
System.out.println(threadLocal.get());
executorService.shutdown();
}
// 輸出結果
pool-1-thread-1
start
start -> pool-1-thread-2
start -> pool-1-thread-2
null -> pool-1-thread-1
start
尤其是現在都是基於框架開發,線程池一般在項目啟動的時候,就創建好了。業務代碼提交執行任務的時候,如果復用之前的線程,那么值就沒傳到子線程中去!
像這種情況,我們至少要求 把任務提交給線程池時 的ThreadLocal值傳遞到執行線程中。TransmittableThreadLocal的出現就是為了解決這個問題。
TransmittableThreadLocal
TransmittableThreadLocal是Alibaba開源的一個類,它繼承了InheritableThreadLocal。能實現在線程池和主線程之間傳遞,需要配合TtlRunnable 和 TtlCallable使用。
使用示例
public static void main(String[] args) throws InterruptedException {
// 線程池提前創建好
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 提前創建了一個子線程 [pool-1-thread-1]
executorService.submit(() -> {
System.out.println(Thread.currentThread().getName());
});
Thread.sleep(1000);
TransmittableThreadLocal<String> threadLocal = new TransmittableThreadLocal();
threadLocal.set("start");
System.out.println(threadLocal.get());
// 每次提交時都需要通過修飾操作(即TtlRunnable.get(task))以抓取這次提交時的TransmittableThreadLocal上下文的值
executorService.submit(TtlRunnable.get(() -> {
System.out.println(threadLocal.get() + " -> " + Thread.currentThread().getName());
}));
executorService.submit(TtlRunnable.get(() -> {
System.out.println(threadLocal.get() + " -> " + Thread.currentThread().getName());
}));
executorService.submit(TtlRunnable.get(() -> {
System.out.println(threadLocal.get() + " -> " + Thread.currentThread().getName());
}));
Thread.sleep(100);
System.out.println(threadLocal.get());
executorService.shutdown();
}
// 輸出結果
pool-1-thread-1
start
start -> pool-1-thread-1
start -> pool-1-thread-2
start -> pool-1-thread-1
start
整個過程的完整時序圖
修飾線程池
使用TTL的時候,每次提交任務時,都需要用TtlRunnable 或者 TtlCallable對任務修飾一下。這個修飾邏輯可以再線程池中完成。
通過工具類com.alibaba.ttl.threadpool.TtlExecutors
完成,有下面的方法:
getTtlExecutor
:修飾接口Executor
getTtlExecutorService
:修飾接口ExecutorService
getTtlScheduledExecutorService
:修飾接口ScheduledExecutorService
示例代碼:
ExecutorService executorService = ...
// 額外的處理,生成修飾了的對象executorService
executorService = TtlExecutors.getTtlExecutorService(executorService);
TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
// =====================================================
// 在父線程中設置
context.set("value-set-in-parent");
Runnable task = new RunnableTask();
Callable call = new CallableTask();
executorService.submit(task);
executorService.submit(call);
// =====================================================
// Task或是Call中可以讀取,值是"value-set-in-parent"
String value = context.get();
FastThreadLocal
前面分析了ThreadLocal的get和set,當遇到hash沖突的時候,會以nextIndex計算下一個位置的方式來解決hash沖突。
使用線性探測的方式解決hash沖突的問題,如果沒有找到空閑的slot,就不斷往后嘗試,直到找到一個空閑的位置,插入entry,這種方式在經常遇到hash沖突時,影響效率。
鑒於此,netty提供了FastThreadLocal。與之配套的還有FastThreadLocalThread和FastThreadLocalRunnable。
創建FastThreadLocal對象的時候,直接把位置index(使用AtomicInteger實現)確定下來。每個FastThreadLocal都能獲取到一個不重復的下標
public FastThreadLocal() {
index = InternalThreadLocalMap.nextVariableIndex();
}
public static int nextVariableIndex() {
int index = nextIndex.getAndIncrement();
if (index < 0) {
nextIndex.decrementAndGet();
throw new IllegalStateException("too many thread-local indexed variables");
}
return index;
}
不過,FastThreadLocal需要配合FastThreadLocalThread使用,才能發揮它的效率。
public final void set(V value) {
if (value != InternalThreadLocalMap.UNSET) {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
setKnownNotUnset(threadLocalMap, value);
} else {
remove();
}
}
public final V get() {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
Object v = threadLocalMap.indexedVariable(index);// 直接定位
if (v != InternalThreadLocalMap.UNSET) {
return (V) v;
}
return initialize(threadLocalMap);
}
public Object indexedVariable(int index) {
Object[] lookup = indexedVariables;
return index < lookup.length? lookup[index] : UNSET;
}
// InternalThreadLocalMap.get()
public static InternalThreadLocalMap get() {
Thread thread = Thread.currentThread();
// 判斷當前Thread類型
if (thread instanceof FastThreadLocalThread) {
return fastGet((FastThreadLocalThread) thread);
} else {
return slowGet();
}
}
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
// FastThreadLocalThread繼承Thread,額外有InternalThreadLocalMap類型屬性
InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
if (threadLocalMap == null) {
thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
}
return threadLocalMap;
}
private static InternalThreadLocalMap slowGet() {
// 普通Thread無InternalThreadLocalMap,但有ThreadLocal屬性,在它里面存InternalThreadLocalMap等於間接有了InternalThreadLocalMap
ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
InternalThreadLocalMap ret = slowThreadLocalMap.get();
if (ret == null) {
ret = new InternalThreadLocalMap();
slowThreadLocalMap.set(ret);
}
return ret;
}
也就是說,如果是普通Thread使用FastThreadLocal,則需要先拿到ThreadLocal對象,然后再get到里面存的InternalThreadLocalMap。這一get過程完全是ThreadLocal的get,也需要執行hash碰撞&getEntryAfterMiss等邏輯。(有的地方稱之為退化)