Netty中FastThreadLocal源碼分析


Netty中使用FastThreadLocal替代JDK中的ThreadLocal【JAVA】ThreadLocal源碼分析,其用法和ThreadLocal 一樣,只不過從名字FastThreadLocal來看,其處理效率要比JDK中的ThreadLocal要高

在類加載的時候,先初始化了一個靜態成員:

1 private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();

實際上FastThreadLocal的操作都是通過對InternalThreadLocalMap的操作來實現的,

而InternalThreadLocalMap是UnpaddedInternalThreadLocalMap的子類,UnpaddedInternalThreadLocalMap的定義比較簡單:

 1 class UnpaddedInternalThreadLocalMap {
 2     static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal();
 3     static final AtomicInteger nextIndex = new AtomicInteger();
 4     Object[] indexedVariables;
 5     int futureListenerStackDepth;
 6     int localChannelReaderStackDepth;
 7     Map<Class<?>, Boolean> handlerSharableCache;
 8     IntegerHolder counterHashCode;
 9     ThreadLocalRandom random;
10     Map<Class<?>, TypeParameterMatcher> typeParameterMatcherGetCache;
11     Map<Class<?>, Map<String, TypeParameterMatcher>> typeParameterMatcherFindCache;
12     StringBuilder stringBuilder;
13     Map<Charset, CharsetEncoder> charsetEncoderCache;
14     Map<Charset, CharsetDecoder> charsetDecoderCache;
15     ArrayList<Object> arrayList;
16 
17     UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {
18         this.indexedVariables = indexedVariables;
19     }
20 }

可以看到在類加載時,會初始化一個泛型為InternalThreadLocalMap的JDK的ThreadLocal對象作為其靜態成員slowThreadLocalMap ,還有一個原子化的Integer靜態成員nextIndex

InternalThreadLocalMap的定義如下:

1 public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
2     private static final InternalLogger logger = InternalLoggerFactory.getInstance(InternalThreadLocalMap.class);
3     private static final int DEFAULT_ARRAY_LIST_INITIAL_CAPACITY = 8;
4     private static final int STRING_BUILDER_INITIAL_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalMap.stringBuilder.initialSize", 1024);
5     private static final int STRING_BUILDER_MAX_SIZE;
6     public static final Object UNSET = new Object();
7     private BitSet cleanerFlags;

InternalThreadLocalMap的nextVariableIndex方法:

1 public static int nextVariableIndex() {
2     int index = nextIndex.getAndIncrement();
3     if (index < 0) {
4         nextIndex.decrementAndGet();
5         throw new IllegalStateException("too many thread-local indexed variables");
6     } else {
7         return index;
8     }
9 }

這是一個CAS滯后自增操作,獲取nextIndex自增前的值,那么variablesToRemoveIndex初始化時就是0,且恆為0,nextIndex此時變成了1

FastThreadLocal對象的初始化:

1 private final int index = InternalThreadLocalMap.nextVariableIndex();
2 
3 public FastThreadLocal() {
4 }

由上面可知,index成員恆等於nextVariableIndex的返回值,nextIndex 的CAS操作保障了每個FastThreadLocal對象的index是不同的

首先看到set方法:

 1 public final void set(V value) {
 2     if (value != InternalThreadLocalMap.UNSET) {
 3         InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
 4         if (this.setKnownNotUnset(threadLocalMap, value)) {
 5             this.registerCleaner(threadLocalMap);
 6         }
 7     } else {
 8         this.remove();
 9     }
10 
11 }

只要set的value不是InternalThreadLocalMap.UNSET,會先調用InternalThreadLocalMap的get方法:

1 public static InternalThreadLocalMap get() {
2     Thread thread = Thread.currentThread();
3     return thread instanceof FastThreadLocalThread ? fastGet((FastThreadLocalThread)thread) : slowGet();
4 }

判斷當前線程是否是FastThreadLocalThread,是則調用fastGet,否則調用slowGet
FastThreadLocalThread是經過包裝后的Thread:

 1 public class FastThreadLocalThread extends Thread {
 2     private final boolean cleanupFastThreadLocals;
 3     private InternalThreadLocalMap threadLocalMap;
 4 
 5     public FastThreadLocalThread() {
 6         this.cleanupFastThreadLocals = false;
 7     }
 8 
 9     public FastThreadLocalThread(Runnable target) {
10         super(FastThreadLocalRunnable.wrap(target));
11         this.cleanupFastThreadLocals = true;
12     }
13 
14     public FastThreadLocalThread(ThreadGroup group, Runnable target) {
15         super(group, FastThreadLocalRunnable.wrap(target));
16         this.cleanupFastThreadLocals = true;
17     }
18 
19     public FastThreadLocalThread(String name) {
20         super(name);
21         this.cleanupFastThreadLocals = false;
22     }
23 
24     public FastThreadLocalThread(ThreadGroup group, String name) {
25         super(group, name);
26         this.cleanupFastThreadLocals = false;
27     }
28 
29     public FastThreadLocalThread(Runnable target, String name) {
30         super(FastThreadLocalRunnable.wrap(target), name);
31         this.cleanupFastThreadLocals = true;
32     }
33 
34     public FastThreadLocalThread(ThreadGroup group, Runnable target, String name) {
35         super(group, FastThreadLocalRunnable.wrap(target), name);
36         this.cleanupFastThreadLocals = true;
37     }
38 
39     public FastThreadLocalThread(ThreadGroup group, Runnable target, String name, long stackSize) {
40         super(group, FastThreadLocalRunnable.wrap(target), name, stackSize);
41         this.cleanupFastThreadLocals = true;
42     }
43 
44     public final InternalThreadLocalMap threadLocalMap() {
45         return this.threadLocalMap;
46     }
47 
48     public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) {
49         this.threadLocalMap = threadLocalMap;
50     }
51 
52     public boolean willCleanupFastThreadLocals() {
53         return this.cleanupFastThreadLocals;
54     }
55 
56     public static boolean willCleanupFastThreadLocals(Thread thread) {
57         return thread instanceof FastThreadLocalThread && ((FastThreadLocalThread)thread).willCleanupFastThreadLocals();
58     }
59 }

如果看過我之前寫的ThreadLocal源碼分析,看到這就明白,JDK的ThreadLocal中很重要的一點是在Thread類中有一個ThreadLocalMap類型的成員,每個線程都維護這一張ThreadLocalMap,通過ThreadLocalMap來和ThreadLocal對象產生映射關系;而這里和JDK同理綁定的就是InternalThreadLocalMap。

fastGet方法:

1 private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
2    InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
3     if (threadLocalMap == null) {
4         thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
5     }
6 
7     return threadLocalMap;
8 }

這里也和JDK的ThreadLocal類似,判斷FastThreadLocalThread 線程的threadLocalMap成員是否為null,若是null,則先創建一個InternalThreadLocalMap實例:

1 private InternalThreadLocalMap() {
2     super(newIndexedVariableTable());
3 }

先調用newIndexedVariableTable方法:

1 private static Object[] newIndexedVariableTable() {
2     Object[] array = new Object[32];
3     Arrays.fill(array, UNSET);
4     return array;
5 }

創建了一個大小為32的數組,並且用UNSET這個Object填充了整個數組,然后調用UnpaddedInternalThreadLocalMap的構造,令indexedVariables成員保存該數組

再來看slowGet方法:

 1 private static InternalThreadLocalMap slowGet() {
 2     ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
 3     InternalThreadLocalMap ret = (InternalThreadLocalMap)slowThreadLocalMap.get();
 4     if (ret == null) {
 5         ret = new InternalThreadLocalMap();
 6         slowThreadLocalMap.set(ret);
 7     }
 8 
 9     return ret;
10 }

可以看到,其實這里為了提高效率,並沒有直接使用JDK的ThreadLocal,而是給當前非FastThreadLocalThread線程綁定了一個ThreadLocal<InternalThreadLocalMap>對象,避免直接使用JDK的ThreadLocal效率低。

回到FastThreadLocal的set方法,在取得到了當前線程的InternalThreadLocalMap成員后,調用setKnownNotUnset方法:

1 private boolean setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
2     if (threadLocalMap.setIndexedVariable(this.index, value)) {
3         addToVariablesToRemove(threadLocalMap, this);
4         return true;
5     } else {
6         return false;
7     }
8 }

首先調用了InternalThreadLocalMap的setIndexedVariable方法:

 1 public boolean setIndexedVariable(int index, Object value) {
 2     Object[] lookup = this.indexedVariables;
 3     if (index < lookup.length) {
 4         Object oldValue = lookup[index];
 5         lookup[index] = value;
 6         return oldValue == UNSET;
 7     } else {
 8         this.expandIndexedVariableTableAndSet(index, value);
 9         return true;
10     }
11 }

因為index是不可更改的常量,所以這里有兩種情況:
當indexedVariables這個Object數組的長度大於index時,直接將value放在indexedVariables數組下標為index的位置,返回oldValue是否等於UNSET,若是不等於UNSET,說明已經set過了,直進行替換,若是等於UNSET,還要進行后續的registerCleaner
當indexedVariables這個Object數組的長度小於等於index時,調用expandIndexedVariableTableAndSet方法擴容

expandIndexedVariableTableAndSet方法:

 1 private void expandIndexedVariableTableAndSet(int index, Object value) {
 2     Object[] oldArray = this.indexedVariables;
 3     int oldCapacity = oldArray.length;
 4     int newCapacity = index | index >>> 1;
 5     newCapacity |= newCapacity >>> 2;
 6     newCapacity |= newCapacity >>> 4;
 7     newCapacity |= newCapacity >>> 8;
 8     newCapacity |= newCapacity >>> 16;
 9     ++newCapacity;
10     Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
11     Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
12     newArray[index] = value;
13     this.indexedVariables = newArray;
14 }

如果讀過HashMap源碼的話對上述的位運算操作因該不陌生,這個位運算產生的newCapacity的值是大於oldCapacity的最小的二的整數冪(【Java】HashMap中的tableSizeFor方法

然后申請一個newCapacity大小的數組,將原數組的內容拷貝到新數組,並且用UNSET填充剩余部分,還是將value放在下標為index的位置,用indexedVariables保存新數組。

setIndexedVariable成立后,setKnownNotUnset繼續調用addToVariablesToRemove方法:

 1 private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
 2     Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
 3     Set variablesToRemove;
 4     if (v != InternalThreadLocalMap.UNSET && v != null) {
 5         variablesToRemove = (Set)v;
 6     } else {
 7         variablesToRemove = Collections.newSetFromMap(new IdentityHashMap());
 8         threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
 9     }
10 
11     variablesToRemove.add(variable);
12 }

上面說過variablesToRemoveIndex恆為0,調用InternalThreadLocalMap的indexedVariable方法:

1 public Object indexedVariable(int index) {
2     Object[] lookup = this.indexedVariables;
3     return index < lookup.length ? lookup[index] : UNSET;
4 }

由於variablesToRemoveIndex恆等於0,所以這里判斷indexedVariables這個Object數組是否為空,若是為空,則返回第0個元素,若不是則返回UNSET

在addToVariablesToRemove中,接着對indexedVariables的返回值進行了判斷,
判斷不是UNSET,並且不等於null,則說明是set過的,然后將剛才的返回值強轉為Set類型
若上述條件不成立,創建一個IdentityHashMap,將其包裝成Set賦值給variablesToRemove,然后調用InternalThreadLocalMap的setIndexedVariable方法,這里就和上面不一樣了,上面是將value放在下標為index的位置,而這里是將Set放在下標為0的位置。

看到這,再結合上面來看,其實已經有一個大致的想法了,一開始在set時,是將value放在InternalThreadLocalMap的Object數組下標為index的位置,然后在這里獲取下標為0的Set,說明value是暫時放在下標為index的位置,然后判斷下標為0的位置有沒有Set,若是有,取出這個Set ,將當前FastThreadLocal對象放入Set中,則說明這個Set中存放的是FastThreadLocal集合
那么就有如下關系:

回到FastThreadLocal的set方法,在setKnownNotUnset成立后,調用registerCleaner方法:

1 private void registerCleaner(InternalThreadLocalMap threadLocalMap) {
2     Thread current = Thread.currentThread();
3     if (!FastThreadLocalThread.willCleanupFastThreadLocals(current) && !threadLocalMap.isCleanerFlagSet(this.index)) {
4         threadLocalMap.setCleanerFlag(this.index);
5     }
6 }

willCleanupFastThreadLocals的返回值在前面FastThreadLocalThread的初始化時就確定了,看到isCleanerFlagSet方法:

1 public boolean isCleanerFlagSet(int index) {
2     return this.cleanerFlags != null && this.cleanerFlags.get(index);
3 }

cleanerFlags 是一個BitSet對象,在InternalThreadLocalMap初始化時是null,
若不是第一次的set操作,則根據index,獲取index在BitSet對應位的值

這里使用BitSet,使其持有的位和indexedVariables這個Object數組形成了一一對應關系,每一位都是0和1代表當前indexedVariables的對應下標位置的使用情況,0表示沒有使用對應UNSET,1則代表有value

在上面條件成立的情況下,調用setCleanerFlag方法:

1 public void setCleanerFlag(int index) {
2     if (this.cleanerFlags == null) {
3         this.cleanerFlags = new BitSet();
4     }
5 
6     this.cleanerFlags.set(index);
7 }

邏輯比較簡單,判斷cleanerFlags是否初始化,若沒有,則立即初始化,再將cleanerFlags中對應index位的值設為1;

這里通過registerCleaner直接標記了所有set了value的下標可,為以后的removeAll 清除提高效率。

下來看FastThreadLocal的get方法:

 1 public final V get() {
 2     InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
 3     Object v = threadLocalMap.indexedVariable(this.index);
 4     if (v != InternalThreadLocalMap.UNSET) {
 5         return v;
 6     } else {
 7         V value = this.initialize(threadLocalMap);
 8         this.registerCleaner(threadLocalMap);
 9         return value;
10     }
11 }

和上面一樣,先取得當前線程持有的InternalThreadLocalMap ,調用indexedVariable方法,根據當前FastThreadLocal的index定位,判斷是否是UNSET(set過),若沒有set過則和JDK一樣調用initialize先set:

 1 private V initialize(InternalThreadLocalMap threadLocalMap) {
 2     Object v = null;
 3 
 4     try {
 5         v = this.initialValue();
 6     } catch (Exception var4) {
 7         PlatformDependent.throwException(var4);
 8     }
 9 
10     threadLocalMap.setIndexedVariable(this.index, v);
11     addToVariablesToRemove(threadLocalMap, this);
12     return v;
13 }

initialValue()方法就是對外提供的,需要手動覆蓋:

1 protected V initialValue() throws Exception {
2     return null;
3 }

后面的操作就和set的邏輯一樣。

 

remove方法:

1 public final void remove() {
2     this.remove(InternalThreadLocalMap.getIfSet());
3 }

getIfSet方法:

1 public static InternalThreadLocalMap getIfSet() {
2     Thread thread = Thread.currentThread();
3     return thread instanceof FastThreadLocalThread ? ((FastThreadLocalThread)thread).threadLocalMap() : (InternalThreadLocalMap)slowThreadLocalMap.get();
4 }

和上面的get方法思路相似,只不過在這里如果獲取不到不會創建
然后調用remove重載:

 1 public final void remove(InternalThreadLocalMap threadLocalMap) {
 2     if (threadLocalMap != null) {
 3         Object v = threadLocalMap.removeIndexedVariable(this.index);
 4         removeFromVariablesToRemove(threadLocalMap, this);
 5         if (v != InternalThreadLocalMap.UNSET) {
 6             try {
 7                 this.onRemoval(v);
 8             } catch (Exception var4) {
 9                 PlatformDependent.throwException(var4);
10             }
11         }
12 
13     }
14 }

先檢查threadLocalMap是否存在,若存在才進行后續操作:
調用removeIndexedVariable方法:

 1 public Object removeIndexedVariable(int index) {
 2     Object[] lookup = this.indexedVariables;
 3     if (index < lookup.length) {
 4         Object v = lookup[index];
 5         lookup[index] = UNSET;
 6         return v;
 7     } else {
 8         return UNSET;
 9     }
10 }

和之前的setIndexedVariable邏輯相似,只不過現在是把index位置的元素設置為UNSET

接着調用removeFromVariablesToRemove方法:

1 private static void removeFromVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
2     Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
3     if (v != InternalThreadLocalMap.UNSET && v != null) {
4         Set<FastThreadLocal<?>> variablesToRemove = (Set)v;
5         variablesToRemove.remove(variable);
6     }
7 }

之前說過variablesToRemoveIndex恆為0,在Object數組中下標為0存儲的Set<FastThreadLocal<?>>集合(不為UNSET情況下),從集合中,將當前FastThreadLocal移除掉
最后調用了onRemoval方法,該方法需要由用戶去覆蓋:

1 protected void onRemoval(V value) throws Exception {
2 }


removeAll方法,是一個靜態方法:

 1 public static void removeAll() {
 2     InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
 3     if (threadLocalMap != null) {
 4         try {
 5             Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
 6             if (v != null && v != InternalThreadLocalMap.UNSET) {
 7                 Set<FastThreadLocal<?>> variablesToRemove = (Set)v;
 8                 FastThreadLocal<?>[] variablesToRemoveArray = (FastThreadLocal[])variablesToRemove.toArray(new FastThreadLocal[0]);
 9                 FastThreadLocal[] var4 = variablesToRemoveArray;
10                 int var5 = variablesToRemoveArray.length;
11 
12                 for(int var6 = 0; var6 < var5; ++var6) {
13                     FastThreadLocal<?> tlv = var4[var6];
14                     tlv.remove(threadLocalMap);
15                 }
16             }
17         } finally {
18             InternalThreadLocalMap.remove();
19         }
20 
21     }
22 }

首先獲取當前線程的InternalThreadLocalMap,若是存在繼續后續操作:
通過indexedVariable方法,取出Object數組中下標為0的Set集合(如果不是UNSET情況下),將其轉換為FastThreadLocal數組,遍歷這個數組調用上面的remove方法。

FastThreadLocal源碼分析到此結束。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM