1. 前言
netty
自行封裝了FastThreadLocal
以替換jdk
提供的ThreadLocal
,結合封裝的FastThreadLocalThread
,在多線程環境下的變量提高了ThreadLocal
對象的查詢以及更新效率.
下文,將通過對比ThreadLocal
與FastThreadLocal
,通過源碼解析,探究FastThreadLocal
與FastThreadLocalThread
的搭配使用后性能的奧秘.
2. ThreadLocalMap
ThreadLocalMap
是TharedLocal
中定義的靜態類,其作用是保存Thared
中引用的ThreadLocal
對象.
jdk
中,每一個Thread
對象中均會包含以下兩個變量:
public
class Thread implements Runnable {
// 此處省略若干代碼
// 存儲ThreadLocal變量,通過每個Thread存儲一個ThreadLocalMap,實現了變量的線程隔離
ThreadLocal.ThreadLocalMap threadLocals = null;
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
}
編程實踐中,線程中可能包含多個ThreadLocal
去進行引用,它們均保存在ThreadLocal.ThreadLocalMap threadLocals
中(每個線程中均包含自己的ThreadLocalMap
,避免多線程爭用).
static class ThreadLocalMap {
// 需要注意,此處Entry使用WeakReference
(軟引用),這樣在資源緊張的時候可以回收部分不再引用的ThreadLocal變量
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
// ThreadLocal對象存儲數組的初始化長度
private static final int INITIAL_CAPACITY = 16;
// ThreadLocal對象存儲數組
private Entry[] table;
// 初始化ThreadLocalMap,使用數組存放ThreadLocal資源,使用ThreadLocal對象的threadLocalHashCode進行hash得到索引
// 此處使用對象數組存放ThreadLocal對象,操作類似於HashMap,感興趣的讀者可以查看HashMap的源碼進行比較
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY];
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY);
}
// 獲取ThreadLocal對象,此處需要根據threadLocalHashCode進行hash操作得到索引
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
if (e != null && e.get() == key)
return e;
else
return getEntryAfterMiss(key, i, e);
}
}
由以上代碼可知,在ThreadLocalMap
初始化時,會創建一個對象數組.
對象數組的初始長度為16,在后續的擴張中,數組長度會保持在2^n
級別,以便進行hash
操作確定ThradLocal
對象的索引.
在每次獲取ThreadLocal
對象的時候,會根據對象的threadLocalHashCode
與對象數組長度減一的求與值,確定對象索引,從而快速獲取value
.
使用hash
確定數組下標,存在以下幾個問題:
- 解決
hash
沖突; - 對象數組擴容帶來的
rehash
.
ThreadLocal
是jdk
提供的通用類,在大部分場景下,線程中的ThreadLocal
變量較少,因此hash
沖突以及rehash
較少.
即使,偶爾發生的hash
沖突以及rehash
,也不會給應用程序帶來較大的性能損耗.
3. FastThreadLocalThread
Netty
對ThreadLocal
改造為FastThreadLocal
,以應對自身的大並發量,數據吞吐量大的應用場景.
為了更好的使用,Netty
亦繼承Thread
,構建了FastThreadLocalThread
.
當且僅當FastThreadLocal
與FastThreadLocalThread
合並使用,方能真正起到提速的作用.
// 限於篇幅,省略較多函數
public class FastThreadLocalThread extends Thread {
// 相對於Thread中使用ThreadLocal.ThreadLocalMap存放ThreadLocal資源,FastThreadLocalThread使用InternalThreadLocalMap存放ThreadLocal資源
private InternalThreadLocalMap threadLocalMap;
public final InternalThreadLocalMap threadLocalMap() {
return threadLocalMap;
}
public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) {
this.threadLocalMap = threadLocalMap;
}
@UnstableApi
public boolean willCleanupFastThreadLocals() {
return cleanupFastThreadLocals;
}
@UnstableApi
public static boolean willCleanupFastThreadLocals(Thread thread) {
return thread instanceof FastThreadLocalThread &&
((FastThreadLocalThread) thread).willCleanupFastThreadLocals();
}
}
由以上代碼可以看出,相對於Thread
,FastThreadLocalThread
添加了threadLocalMap
對象,以及threadLocalMap
的清理標志獲取函數.
ThreadLocal
即使使用了WeakReference
以保證資源釋放,但是仍會存在內存泄漏可能.
FastThreadLocalThread
與FastThreadLocal
均為Netty
定制,可以在線程任務執行后,強制執行InternalThreadLocalMap
的清理函數removeAll
(詳情見下文).
4. FastThreadLocal
4.1 InternalThreadLocalMap
前情提要:
FastThreadLocalThread
中聲明了InternalThreadLocalMap
對象threadLocalMap
.
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap{
}
從以上代碼可知,InternalThreadLocalMap
繼承於UnpaddedInternalThreadLocalMap
.
因此,我們需要先探究下UnpaddedInternalThreadLocalMap
的定義.
//
class UnpaddedInternalThreadLocalMap {
// 如果在`Thread`中使用`FastThreadLocal`,則實際上使用`ThreadLocal`存放資源
static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
// 資源索引,每一個FastThreadLocal對象都會有對應的ID,即通過nextIndex自增得到
static final AtomicInteger nextIndex = new AtomicInteger();
// FastThreadLocal的資源存放地址,ThreadLocal中是通過ThreadLocalMap存放資源,索引是ThreadLocal對象的threadLocalHashCode進行hash得到
// FastThreadLocal使用Object[]數組,使用通過nextIndex自增得到的數值作為索引,保證每次查詢數值都是O(1)操作
// 需要注意,FastThreadLocal對象為了避免偽共享帶來的性能損耗,使用padding使得FastThreadLocal的對象大小超過128byte
// 避免偽共享的情況下,indexedVariables的多個連續數值在不更新的前提下可以被緩存至cpu chache line中,這樣大大的提高了查詢效率
Object[] indexedVariables;
// Core thread-locals
int futureListenerStackDepth;
int localChannelReaderStackDepth;
Map<Class<?>, Boolean> handlerSharableCache;
IntegerHolder counterHashCode;
ThreadLocalRandom random;
Map<Class<?>, TypeParameterMatcher> typeParameterMatcherGetCache;
Map<Class<?>, Map<String, TypeParameterMatcher>> typeParameterMatcherFindCache;
// String-related thread-locals
StringBuilder stringBuilder;
Map<Charset, CharsetEncoder> charsetEncoderCache;
Map<Charset, CharsetDecoder> charsetDecoderCache;
// ArrayList-related thread-locals
ArrayList<Object> arrayList;
// 構造函數,后續需要關注
UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {
this.indexedVariables = indexedVariables;
}
}
以上代碼中,需要注意:
static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
聲明slowThreadLocalMap
的原因在於,用戶可能在Thread
而非FastThreadLocalThread
中調用FastThreadLocal
.
因此,為了保證程序的兼容性,聲明此變量保存普通的ThreadLocal
相關變量(具體使用詳見后面說明).
// 出於篇幅考慮,刪除部分函數
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
private static final int DEFAULT_ARRAY_LIST_INITIAL_CAPACITY = 8;
// 資源未賦值變質量
public static final Object UNSET = new Object();
// 獲取ThreadLocal對象,此處會判斷當前調用線程的類型分別調用不同的資源
public static InternalThreadLocalMap getIfSet() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
return ((FastThreadLocalThread) thread).threadLocalMap();
}
return slowThreadLocalMap.get();
}
// 獲取ThreadLocal對象,此處會判斷當前調用線程的類型,從而判斷調用fastGet或是slowGet
public static InternalThreadLocalMap get() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
return fastGet((FastThreadLocalThread) thread);
} else {
return slowGet();
}
}
// 如果當前調用FastThreadLocal對象的是FastThreadLocalThread,則調用FastThreadLocalThread的threadLocalMap對象獲取相關資源
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
if (threadLocalMap == null) {
thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
}
return threadLocalMap;
}
// 如果當前調用FastThreadLocal對象的是Thread,則調用slowThreadLocalMap對象獲取相關資源(slowThreadLocalMap其實是調用jdk提供的ThreadLocalMap)
private static InternalThreadLocalMap slowGet() {
ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
InternalThreadLocalMap ret = slowThreadLocalMap.get();
if (ret == null) {
ret = new InternalThreadLocalMap();
slowThreadLocalMap.set(ret);
}
return ret;
}
// 保證FastThreadLocal的實體對象大小超過128byte,以避免偽共享發生
// 如果資源能夠避免偽共享,則FastThreadLocal的實體對象能夠部分緩存至L1緩存,通過提高緩存命中率加快查詢速度(查詢L1緩存的速度要遠快於查詢主存速度)
// 更多解釋,詳見
public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9;
private InternalThreadLocalMap() {
super(newIndexedVariableTable());
}
// 初始化資源,初始化的長度為32,並初始化為UNSET
private static Object[] newIndexedVariableTable() {
Object[] array = new Object[32];
Arrays.fill(array, UNSET);
return array;
}
}
以上代碼為InternalThreadLocalMap
的主要實現,對於使用者來說,需要關注以下幾個函數:
- getIfSet();
- get();
- fastGet();
- slowGet();
存在以下兩種情況:
(1) 在Thread
中調用FastThreadLocal
;
(2) 在FastThreadLocalThread
中調用FastThreadLocal
.
因為存在以上兩種調用場景,在獲取InternalThreadLocalMap
時,會使用instanceof
進行判斷,如下所示:
if (thread instanceof FastThreadLocalThread) {
// 對應fastGet等操作
} else {
// 對應slowGet等操作
}
如果調用線程是
Thread
: 調用UnpaddedInternalThreadLocalMap
中的slowThreadLocalMap
變量;FastThreadLocalThread
: 調用FastThreadLocalThread
中的threadLocalMap
變量.
因為InternalThreadLocalMap
構造函數為私有函數,所以在getIfSet/fastGet
函數中均是獲取FastThreadLocalThread
的threadLocalMap
變量.若變量為空,則調用私有構造函數進行賦值操作.
// Cache line padding (must be public)
// With CompressedOops enabled, an instance of this class should occupy at least 128 bytes.
public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9;
private InternalThreadLocalMap() {
super(newIndexedVariableTable());
}
private static Object[] newIndexedVariableTable() {
Object[] array = new Object[32];
Arrays.fill(array, UNSET);
return array;
}
構造函數,會創建一個Object
數組(初始化長度為32),並逐個初始化數值為UNSET
,為后續的賦值操作提供判斷依據(詳見removeIndexedVariable
以及isIndexedVariableSet
函數).
Tips:
構造函數存在一段代碼
public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9;
.
此段代碼無實際實用意義,其存在是為了保證InternalThreadLocalMap
的實例大小超過128
字節(以上long
變量72字節,InternalThreadLocalMap
的基類UnpaddedInternalThreadLocalMap
亦存在若干變量).
cpu cache line
的大小一般為64k
或者128k
,變量的大小超過128byte
,則會極大的減少偽共享情況.
(當前Netty
的版本號是4.1.38
,InternalThreadLocalMap
的實例大小是136byte
,這是因為在Netty
的4.0.33版本后,引入了cleanerFlags
以及arrayList
變量,忘記去除rp9
變量導致的).
關於偽共享,可關注JAVA 拾遺 — CPU Cache 與緩存行一文.
4.2 FastThreadLocal初始化
public class FastThreadLocal<V> {
private final int index;
// 原子變量自增,獲取ID,作為FastThreadLocal的存放索引
// public static int nextVariableIndex() {
// int index = nextIndex.getAndIncrement();
// if (index < 0) {
// nextIndex.decrementAndGet();
// throw new IllegalStateException("too many thread-local indexed variables");
// }
// return index;
// }
public FastThreadLocal() {
index = InternalThreadLocalMap.nextVariableIndex();
}
// 設置FastThreadLocal資源
public final void set(V value) {
if (value != InternalThreadLocalMap.UNSET) {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
setKnownNotUnset(threadLocalMap, value);
} else {
// 如果設置的資源為UNSET,則銷毀當前FastThreadLocal對應的資源對象
remove();
}
}
// 設置資源,並將設置好的FastThreadLocal變量添加至待銷毀資源列表中,待后續進行銷毀操作
private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
if (threadLocalMap.setIndexedVariable(index, value)) {
addToVariablesToRemove(threadLocalMap, this);
}
}
// 根據FastThreadLocal初始化的index,確定其在資源列表中的位置,后續查詢資源就可以根據索引快速確定位置
public boolean setIndexedVariable(int index, Object value) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object oldValue = lookup[index];
lookup[index] = value;
return oldValue == UNSET;
} else {
expandIndexedVariableTableAndSet(index, value);
return true;
}
}
// 按照2的倍數,擴張資源池數組長度
private void expandIndexedVariableTableAndSet(int index, Object value) {
Object[] oldArray = indexedVariables;
final int oldCapacity = oldArray.length;
int newCapacity = index;
newCapacity |= newCapacity >>> 1;
newCapacity |= newCapacity >>> 2;
newCapacity |= newCapacity >>> 4;
newCapacity |= newCapacity >>> 8;
newCapacity |= newCapacity >>> 16;
newCapacity ++;
Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
newArray[index] = value;
indexedVariables = newArray;
}
}
以上是FastThreadLocal
的部分函數節選.
由構造函數可知,FastThreadLocal
在初始化的時候,會使用InternalThreadLocalMap
的nextVariableIndex
獲取一個唯一ID
.
此ID
為原子變量自增獲取,后續對此變量的更新或者刪除操作,均是通過此index
進行操作.
在設置變量的時候,存在indexedVariables
空間不足的情況(初始化長度為32),則會對此數組通過expandIndexedVariableTableAndSet
進行擴容操作(>>>
為無符號右移即若該數為正,則高位補0,而若該數為負數,則右移后高位同樣補0).通過這樣的位移操作,每次數組均會乘2(保持2^n
).
因為使用常數索引index
,因此Netty
中查詢FastThreadLocal
變量的速度為O(1)
,擴容時采用Arrays.Copy
也很簡單(相較於jdk
的ThreadLocal
的rehash
操作).
4.3 FastThreadLocal變量獲取及刪除
public class FastThreadLocal<V> {
private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
// 在線程執行完資源之后,需要根據業務場景,確定是否調用此函數以銷毀線程中存在的FastThreadLocal資源
public static void removeAll() {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
if (threadLocalMap == null) {
return;
}
try {
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
if (v != null && v != InternalThreadLocalMap.UNSET) {
@SuppressWarnings("unchecked")
Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
FastThreadLocal<?>[] variablesToRemoveArray =
variablesToRemove.toArray(new FastThreadLocal[0]);
for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
tlv.remove(threadLocalMap);
}
}
} finally {
// 實際上僅僅是將FastThreadLocalThread中的threadLocalMap置為null,或者是將slowThreadLocalMap銷毀
InternalThreadLocalMap.remove();
}
}
@SuppressWarnings("unchecked")
public final V get(InternalThreadLocalMap threadLocalMap) {
Object v = threadLocalMap.indexedVariable(index);
if (v != InternalThreadLocalMap.UNSET) {
return (V) v;
}
// 如果當前待獲取資源為空,則進行初始操作,返回相應資源
return initialize(threadLocalMap);
}
// 根據用戶重載的initialValue函數,初始化待獲取資源
private V initialize(InternalThreadLocalMap threadLocalMap) {
V v = null;
try {
v = initialValue();
} catch (Exception e) {
PlatformDependent.throwException(e);
}
threadLocalMap.setIndexedVariable(index, v);
addToVariablesToRemove(threadLocalMap, this);
return v;
}
// 將FastThreadLocal變量,添加至待刪除的資源列表中
@SuppressWarnings("unchecked")
private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
Set<FastThreadLocal<?>> variablesToRemove;
// 如果待刪除資源列表為空,則初始化待刪除資源列表(Set)
if (v == InternalThreadLocalMap.UNSET || v == null) {
variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
} else {
variablesToRemove = (Set<FastThreadLocal<?>>) v;
}
variablesToRemove.add(variable);
}
@SuppressWarnings("unchecked")
public final void remove(InternalThreadLocalMap threadLocalMap) {
if (threadLocalMap == null) {
return;
}
Object v = threadLocalMap.removeIndexedVariable(index);
removeFromVariablesToRemove(threadLocalMap, this);
// FastThreadLocal變量已經被賦值,則需要調用用戶重載的onRemoval函數,銷毀資源
if (v != InternalThreadLocalMap.UNSET) {
try {
onRemoval((V) v);
} catch (Exception e) {
PlatformDependent.throwException(e);
}
}
}
// 確定資源的初始化函數(如果用戶不進行重載,則返回null)
protected V initialValue() throws Exception {
return null;
}
// 用戶需要重載次函數,以便銷毀申請的資源
protected void onRemoval(@SuppressWarnings("UnusedParameters") V value) throws Exception { }
}
用戶在使用FastThreadLocal
時,需要繼承initialValue
以及onRemoval
函數(FastThreadLocal
對象的初始化及銷毀交由用戶控制).
initialValue
: 在獲取FastThreadLocal
對象時,若對象未設置,則調用initialValue
初始化資源(get
等函數中判斷對象為空,則調用initialize
初始化資源);onRemoval
: 在FastThreadLocal
更新對象或最終銷毀資源時,調用onRemoval
銷毀資源(set
等函數中判斷待設置對象已被設置過,則調用onRemoval
銷毀資源).
this.threadLocal = new FastThreadLocal<Recycler.Stack<T>>() {
protected Recycler.Stack<T> initialValue() {
return new Recycler.Stack(Recycler.this, Thread.currentThread(), Recycler.this.maxCapacityPerThread, Recycler.this.maxSharedCapacityFactor, Recycler.this.ratioMask, Recycler.this.maxDelayedQueuesPerThread);
}
protected void onRemoval(Recycler.Stack<T> value) {
if (value.threadRef.get() == Thread.currentThread() && Recycler.DELAYED_RECYCLED.isSet()) {
((Map)Recycler.DELAYED_RECYCLED.get()).remove(value);
}
}
};
以上代碼,就是Recycler
調用FastThreadLocal
的使用示范(Recycler
是Netty
的輕量級對象池).
需要注意,在FastThreadLocal
中,存在一個靜態變量variablesToRemoveIndex
,其作用是在對象池中占據一個固定位置,存放一個集合Set<FastThreadLocal<?>> variablesToRemove
.
每次初始化變量的時候,均會將對應的FastThreadLocal
存放至variablesToRemove
中,在更新對象的時候(set
等函數)或者清理FastThreadLocalThread
中的變量時(removeAll
函數)時,程序就會根據variablesToRemove
進行相應的清理工作.
這樣,用戶在使用FastThreadLocalThread
時,就無須花費過多的經理關注線程安全問題(在Netty
中,線程池的生命周期較長,無需過多的關注內存清理,然而如果用戶在線程池等場景使用FastThreadLocalThread
,就需要在執行完任務后,清理FastThreadLocal
參數,以免對后續的業務產生影響).
總結
通過以上源碼分析,可以得知Netty
為了提升ThreadLocal
性能,做了很多改善操作.
- 定制
FastThreadLocalThread
以及FastThreadLocal
; - 使用
padding
手段擴充FastThreadLocal
的實例大小,避免偽共享; - 使用原子變量自增獲取的ID作為常數索引,優化查詢速度至
O(1)
,避免了hash沖突
以及擴容導致的rehash
操作; - 提供
initialValue
以及onRemoval
函數,用戶可以自行重載函數,實現FastThreadLocal
資源的高度定制化操作; FastThreadLocal
對象數組的擴容(expandIndexedVariableTableAndSet
)采用位操作,計算數組長度;- 針對在
Thread
中調用FastThreadLocal
以及在FastThreadLocalThread
中調用FastThreadLocal
,分別采用不同的獲取方式,增強了兼容性. - 更多細節,讀者可以自己參照源碼進行進一步分析.
對於采用Object[]數組存放
FastThreadLocal
變量,是否存在犧牲空間換取性能,個人理解如下:
Netty
的默認啟動線程是2 * cpu core
,也就是兩倍cpu
核數,且此線程組會在Netty
的生命周期中持續存在.
Netty
不存在創建過多線程導致內存占用過多的現象(用戶手動調節Netty
的boss group
以及worker group
線程數量都會很慎重).
此外,Netty
中對於FastThreadLocal
存在較大的讀取以及更新需求量,確實存在優化ThreadLocal
的需求.
因此,適當的浪費一些空間,換取查詢和更新的性能提升,是恰當的操作.
PS:
如果您覺得我的文章對您有幫助,請關注我的微信公眾號,謝謝!