Netty中使用FastThreadLocal替代JDK中的ThreadLocal【JAVA】ThreadLocal源碼分析,其用法和ThreadLocal 一樣,只不過從名字FastThreadLocal來看,其處理效率要比JDK中的ThreadLocal要高
在類加載的時候,先初始化了一個靜態成員:
1 private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
實際上FastThreadLocal的操作都是通過對InternalThreadLocalMap的操作來實現的,
而InternalThreadLocalMap是UnpaddedInternalThreadLocalMap的子類,UnpaddedInternalThreadLocalMap的定義比較簡單:
1 class UnpaddedInternalThreadLocalMap { 2 static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal(); 3 static final AtomicInteger nextIndex = new AtomicInteger(); 4 Object[] indexedVariables; 5 int futureListenerStackDepth; 6 int localChannelReaderStackDepth; 7 Map<Class<?>, Boolean> handlerSharableCache; 8 IntegerHolder counterHashCode; 9 ThreadLocalRandom random; 10 Map<Class<?>, TypeParameterMatcher> typeParameterMatcherGetCache; 11 Map<Class<?>, Map<String, TypeParameterMatcher>> typeParameterMatcherFindCache; 12 StringBuilder stringBuilder; 13 Map<Charset, CharsetEncoder> charsetEncoderCache; 14 Map<Charset, CharsetDecoder> charsetDecoderCache; 15 ArrayList<Object> arrayList; 16 17 UnpaddedInternalThreadLocalMap(Object[] indexedVariables) { 18 this.indexedVariables = indexedVariables; 19 } 20 }
可以看到在類加載時,會初始化一個泛型為InternalThreadLocalMap的JDK的ThreadLocal對象作為其靜態成員slowThreadLocalMap ,還有一個原子化的Integer靜態成員nextIndex
InternalThreadLocalMap的定義如下:
1 public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap { 2 private static final InternalLogger logger = InternalLoggerFactory.getInstance(InternalThreadLocalMap.class); 3 private static final int DEFAULT_ARRAY_LIST_INITIAL_CAPACITY = 8; 4 private static final int STRING_BUILDER_INITIAL_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalMap.stringBuilder.initialSize", 1024); 5 private static final int STRING_BUILDER_MAX_SIZE; 6 public static final Object UNSET = new Object(); 7 private BitSet cleanerFlags;
InternalThreadLocalMap的nextVariableIndex方法:
1 public static int nextVariableIndex() { 2 int index = nextIndex.getAndIncrement(); 3 if (index < 0) { 4 nextIndex.decrementAndGet(); 5 throw new IllegalStateException("too many thread-local indexed variables"); 6 } else { 7 return index; 8 } 9 }
這是一個CAS滯后自增操作,獲取nextIndex自增前的值,那么variablesToRemoveIndex初始化時就是0,且恆為0,nextIndex此時變成了1
FastThreadLocal對象的初始化:
1 private final int index = InternalThreadLocalMap.nextVariableIndex(); 2 3 public FastThreadLocal() { 4 }
由上面可知,index成員恆等於nextVariableIndex的返回值,nextIndex 的CAS操作保障了每個FastThreadLocal對象的index是不同的
首先看到set方法:
1 public final void set(V value) { 2 if (value != InternalThreadLocalMap.UNSET) { 3 InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); 4 if (this.setKnownNotUnset(threadLocalMap, value)) { 5 this.registerCleaner(threadLocalMap); 6 } 7 } else { 8 this.remove(); 9 } 10 11 }
只要set的value不是InternalThreadLocalMap.UNSET,會先調用InternalThreadLocalMap的get方法:
1 public static InternalThreadLocalMap get() { 2 Thread thread = Thread.currentThread(); 3 return thread instanceof FastThreadLocalThread ? fastGet((FastThreadLocalThread)thread) : slowGet(); 4 }
判斷當前線程是否是FastThreadLocalThread,是則調用fastGet,否則調用slowGet
FastThreadLocalThread是經過包裝后的Thread:
1 public class FastThreadLocalThread extends Thread { 2 private final boolean cleanupFastThreadLocals; 3 private InternalThreadLocalMap threadLocalMap; 4 5 public FastThreadLocalThread() { 6 this.cleanupFastThreadLocals = false; 7 } 8 9 public FastThreadLocalThread(Runnable target) { 10 super(FastThreadLocalRunnable.wrap(target)); 11 this.cleanupFastThreadLocals = true; 12 } 13 14 public FastThreadLocalThread(ThreadGroup group, Runnable target) { 15 super(group, FastThreadLocalRunnable.wrap(target)); 16 this.cleanupFastThreadLocals = true; 17 } 18 19 public FastThreadLocalThread(String name) { 20 super(name); 21 this.cleanupFastThreadLocals = false; 22 } 23 24 public FastThreadLocalThread(ThreadGroup group, String name) { 25 super(group, name); 26 this.cleanupFastThreadLocals = false; 27 } 28 29 public FastThreadLocalThread(Runnable target, String name) { 30 super(FastThreadLocalRunnable.wrap(target), name); 31 this.cleanupFastThreadLocals = true; 32 } 33 34 public FastThreadLocalThread(ThreadGroup group, Runnable target, String name) { 35 super(group, FastThreadLocalRunnable.wrap(target), name); 36 this.cleanupFastThreadLocals = true; 37 } 38 39 public FastThreadLocalThread(ThreadGroup group, Runnable target, String name, long stackSize) { 40 super(group, FastThreadLocalRunnable.wrap(target), name, stackSize); 41 this.cleanupFastThreadLocals = true; 42 } 43 44 public final InternalThreadLocalMap threadLocalMap() { 45 return this.threadLocalMap; 46 } 47 48 public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) { 49 this.threadLocalMap = threadLocalMap; 50 } 51 52 public boolean willCleanupFastThreadLocals() { 53 return this.cleanupFastThreadLocals; 54 } 55 56 public static boolean willCleanupFastThreadLocals(Thread thread) { 57 return thread instanceof FastThreadLocalThread && ((FastThreadLocalThread)thread).willCleanupFastThreadLocals(); 58 } 59 }
如果看過我之前寫的ThreadLocal源碼分析,看到這就明白,JDK的ThreadLocal中很重要的一點是在Thread類中有一個ThreadLocalMap類型的成員,每個線程都維護這一張ThreadLocalMap,通過ThreadLocalMap來和ThreadLocal對象產生映射關系;而這里和JDK同理綁定的就是InternalThreadLocalMap。
fastGet方法:
1 private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) { 2 InternalThreadLocalMap threadLocalMap = thread.threadLocalMap(); 3 if (threadLocalMap == null) { 4 thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap()); 5 } 6 7 return threadLocalMap; 8 }
這里也和JDK的ThreadLocal類似,判斷FastThreadLocalThread 線程的threadLocalMap成員是否為null,若是null,則先創建一個InternalThreadLocalMap實例:
1 private InternalThreadLocalMap() { 2 super(newIndexedVariableTable()); 3 }
先調用newIndexedVariableTable方法:
1 private static Object[] newIndexedVariableTable() { 2 Object[] array = new Object[32]; 3 Arrays.fill(array, UNSET); 4 return array; 5 }
創建了一個大小為32的數組,並且用UNSET這個Object填充了整個數組,然后調用UnpaddedInternalThreadLocalMap的構造,令indexedVariables成員保存該數組
再來看slowGet方法:
1 private static InternalThreadLocalMap slowGet() { 2 ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap; 3 InternalThreadLocalMap ret = (InternalThreadLocalMap)slowThreadLocalMap.get(); 4 if (ret == null) { 5 ret = new InternalThreadLocalMap(); 6 slowThreadLocalMap.set(ret); 7 } 8 9 return ret; 10 }
可以看到,其實這里為了提高效率,並沒有直接使用JDK的ThreadLocal,而是給當前非FastThreadLocalThread線程綁定了一個ThreadLocal<InternalThreadLocalMap>對象,避免直接使用JDK的ThreadLocal效率低。
回到FastThreadLocal的set方法,在取得到了當前線程的InternalThreadLocalMap成員后,調用setKnownNotUnset方法:
1 private boolean setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) { 2 if (threadLocalMap.setIndexedVariable(this.index, value)) { 3 addToVariablesToRemove(threadLocalMap, this); 4 return true; 5 } else { 6 return false; 7 } 8 }
首先調用了InternalThreadLocalMap的setIndexedVariable方法:
1 public boolean setIndexedVariable(int index, Object value) { 2 Object[] lookup = this.indexedVariables; 3 if (index < lookup.length) { 4 Object oldValue = lookup[index]; 5 lookup[index] = value; 6 return oldValue == UNSET; 7 } else { 8 this.expandIndexedVariableTableAndSet(index, value); 9 return true; 10 } 11 }
因為index是不可更改的常量,所以這里有兩種情況:
當indexedVariables這個Object數組的長度大於index時,直接將value放在indexedVariables數組下標為index的位置,返回oldValue是否等於UNSET,若是不等於UNSET,說明已經set過了,直進行替換,若是等於UNSET,還要進行后續的registerCleaner
當indexedVariables這個Object數組的長度小於等於index時,調用expandIndexedVariableTableAndSet方法擴容
expandIndexedVariableTableAndSet方法:
1 private void expandIndexedVariableTableAndSet(int index, Object value) { 2 Object[] oldArray = this.indexedVariables; 3 int oldCapacity = oldArray.length; 4 int newCapacity = index | index >>> 1; 5 newCapacity |= newCapacity >>> 2; 6 newCapacity |= newCapacity >>> 4; 7 newCapacity |= newCapacity >>> 8; 8 newCapacity |= newCapacity >>> 16; 9 ++newCapacity; 10 Object[] newArray = Arrays.copyOf(oldArray, newCapacity); 11 Arrays.fill(newArray, oldCapacity, newArray.length, UNSET); 12 newArray[index] = value; 13 this.indexedVariables = newArray; 14 }
如果讀過HashMap源碼的話對上述的位運算操作因該不陌生,這個位運算產生的newCapacity的值是大於oldCapacity的最小的二的整數冪(【Java】HashMap中的tableSizeFor方法)
然后申請一個newCapacity大小的數組,將原數組的內容拷貝到新數組,並且用UNSET填充剩余部分,還是將value放在下標為index的位置,用indexedVariables保存新數組。
setIndexedVariable成立后,setKnownNotUnset繼續調用addToVariablesToRemove方法:
1 private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) { 2 Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); 3 Set variablesToRemove; 4 if (v != InternalThreadLocalMap.UNSET && v != null) { 5 variablesToRemove = (Set)v; 6 } else { 7 variablesToRemove = Collections.newSetFromMap(new IdentityHashMap()); 8 threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove); 9 } 10 11 variablesToRemove.add(variable); 12 }
上面說過variablesToRemoveIndex恆為0,調用InternalThreadLocalMap的indexedVariable方法:
1 public Object indexedVariable(int index) { 2 Object[] lookup = this.indexedVariables; 3 return index < lookup.length ? lookup[index] : UNSET; 4 }
由於variablesToRemoveIndex恆等於0,所以這里判斷indexedVariables這個Object數組是否為空,若是為空,則返回第0個元素,若不是則返回UNSET
在addToVariablesToRemove中,接着對indexedVariables的返回值進行了判斷,
判斷不是UNSET,並且不等於null,則說明是set過的,然后將剛才的返回值強轉為Set類型
若上述條件不成立,創建一個IdentityHashMap,將其包裝成Set賦值給variablesToRemove,然后調用InternalThreadLocalMap的setIndexedVariable方法,這里就和上面不一樣了,上面是將value放在下標為index的位置,而這里是將Set放在下標為0的位置。
看到這,再結合上面來看,其實已經有一個大致的想法了,一開始在set時,是將value放在InternalThreadLocalMap的Object數組下標為index的位置,然后在這里獲取下標為0的Set,說明value是暫時放在下標為index的位置,然后判斷下標為0的位置有沒有Set,若是有,取出這個Set ,將當前FastThreadLocal對象放入Set中,則說明這個Set中存放的是FastThreadLocal集合
那么就有如下關系:
回到FastThreadLocal的set方法,在setKnownNotUnset成立后,調用registerCleaner方法:
1 private void registerCleaner(InternalThreadLocalMap threadLocalMap) { 2 Thread current = Thread.currentThread(); 3 if (!FastThreadLocalThread.willCleanupFastThreadLocals(current) && !threadLocalMap.isCleanerFlagSet(this.index)) { 4 threadLocalMap.setCleanerFlag(this.index); 5 } 6 }
willCleanupFastThreadLocals的返回值在前面FastThreadLocalThread的初始化時就確定了,看到isCleanerFlagSet方法:
1 public boolean isCleanerFlagSet(int index) { 2 return this.cleanerFlags != null && this.cleanerFlags.get(index); 3 }
cleanerFlags 是一個BitSet對象,在InternalThreadLocalMap初始化時是null,
若不是第一次的set操作,則根據index,獲取index在BitSet對應位的值
這里使用BitSet,使其持有的位和indexedVariables這個Object數組形成了一一對應關系,每一位都是0和1代表當前indexedVariables的對應下標位置的使用情況,0表示沒有使用對應UNSET,1則代表有value
在上面條件成立的情況下,調用setCleanerFlag方法:
1 public void setCleanerFlag(int index) { 2 if (this.cleanerFlags == null) { 3 this.cleanerFlags = new BitSet(); 4 } 5 6 this.cleanerFlags.set(index); 7 }
邏輯比較簡單,判斷cleanerFlags是否初始化,若沒有,則立即初始化,再將cleanerFlags中對應index位的值設為1;
這里通過registerCleaner直接標記了所有set了value的下標可,為以后的removeAll 清除提高效率。
下來看FastThreadLocal的get方法:
1 public final V get() { 2 InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); 3 Object v = threadLocalMap.indexedVariable(this.index); 4 if (v != InternalThreadLocalMap.UNSET) { 5 return v; 6 } else { 7 V value = this.initialize(threadLocalMap); 8 this.registerCleaner(threadLocalMap); 9 return value; 10 } 11 }
和上面一樣,先取得當前線程持有的InternalThreadLocalMap ,調用indexedVariable方法,根據當前FastThreadLocal的index定位,判斷是否是UNSET(set過),若沒有set過則和JDK一樣調用initialize先set:
1 private V initialize(InternalThreadLocalMap threadLocalMap) { 2 Object v = null; 3 4 try { 5 v = this.initialValue(); 6 } catch (Exception var4) { 7 PlatformDependent.throwException(var4); 8 } 9 10 threadLocalMap.setIndexedVariable(this.index, v); 11 addToVariablesToRemove(threadLocalMap, this); 12 return v; 13 }
initialValue()方法就是對外提供的,需要手動覆蓋:
1 protected V initialValue() throws Exception { 2 return null; 3 }
后面的操作就和set的邏輯一樣。
remove方法:
1 public final void remove() { 2 this.remove(InternalThreadLocalMap.getIfSet()); 3 }
getIfSet方法:
1 public static InternalThreadLocalMap getIfSet() { 2 Thread thread = Thread.currentThread(); 3 return thread instanceof FastThreadLocalThread ? ((FastThreadLocalThread)thread).threadLocalMap() : (InternalThreadLocalMap)slowThreadLocalMap.get(); 4 }
和上面的get方法思路相似,只不過在這里如果獲取不到不會創建
然后調用remove重載:
1 public final void remove(InternalThreadLocalMap threadLocalMap) { 2 if (threadLocalMap != null) { 3 Object v = threadLocalMap.removeIndexedVariable(this.index); 4 removeFromVariablesToRemove(threadLocalMap, this); 5 if (v != InternalThreadLocalMap.UNSET) { 6 try { 7 this.onRemoval(v); 8 } catch (Exception var4) { 9 PlatformDependent.throwException(var4); 10 } 11 } 12 13 } 14 }
先檢查threadLocalMap是否存在,若存在才進行后續操作:
調用removeIndexedVariable方法:
1 public Object removeIndexedVariable(int index) { 2 Object[] lookup = this.indexedVariables; 3 if (index < lookup.length) { 4 Object v = lookup[index]; 5 lookup[index] = UNSET; 6 return v; 7 } else { 8 return UNSET; 9 } 10 }
和之前的setIndexedVariable邏輯相似,只不過現在是把index位置的元素設置為UNSET
接着調用removeFromVariablesToRemove方法:
1 private static void removeFromVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) { 2 Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); 3 if (v != InternalThreadLocalMap.UNSET && v != null) { 4 Set<FastThreadLocal<?>> variablesToRemove = (Set)v; 5 variablesToRemove.remove(variable); 6 } 7 }
之前說過variablesToRemoveIndex恆為0,在Object數組中下標為0存儲的Set<FastThreadLocal<?>>集合(不為UNSET情況下),從集合中,將當前FastThreadLocal移除掉
最后調用了onRemoval方法,該方法需要由用戶去覆蓋:
1 protected void onRemoval(V value) throws Exception { 2 }
removeAll方法,是一個靜態方法:
1 public static void removeAll() { 2 InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet(); 3 if (threadLocalMap != null) { 4 try { 5 Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); 6 if (v != null && v != InternalThreadLocalMap.UNSET) { 7 Set<FastThreadLocal<?>> variablesToRemove = (Set)v; 8 FastThreadLocal<?>[] variablesToRemoveArray = (FastThreadLocal[])variablesToRemove.toArray(new FastThreadLocal[0]); 9 FastThreadLocal[] var4 = variablesToRemoveArray; 10 int var5 = variablesToRemoveArray.length; 11 12 for(int var6 = 0; var6 < var5; ++var6) { 13 FastThreadLocal<?> tlv = var4[var6]; 14 tlv.remove(threadLocalMap); 15 } 16 } 17 } finally { 18 InternalThreadLocalMap.remove(); 19 } 20 21 } 22 }
首先獲取當前線程的InternalThreadLocalMap,若是存在繼續后續操作:
通過indexedVariable方法,取出Object數組中下標為0的Set集合(如果不是UNSET情況下),將其轉換為FastThreadLocal數組,遍歷這個數組調用上面的remove方法。
FastThreadLocal源碼分析到此結束。