最近在學多線程並發的知識,發現好像ThreadLoca還挺重要,決定看看源碼以及查找各方資料來學習一下。
ThreadLocal能夠提供線程的局部變量,讓每個線程都可以通過set/get來對這個局部變量進行操作,不會和其它線程的局部變量進行沖突,實現了線程的數據隔離。
首先是ThreadLocal的結構:
每個Thread維護一個ThreadLocalMap,這個Map的的key就是ThreadLocal本身,value才是真正要存儲的變量。所以這個變量當然是線程私有的。
相比於早期的結構,早期結構式Thread和ThreadLocal換了一下。好處就是:
1.當並發量夠大時,如果時早期結構,那么意味着所有的線程都會去操作同一個map,map的體積可能會很大導致訪問性能的下降。也就是說現在的設計會讓每個map存儲的entry數量變少,因為實際運用中,往往ThreadLocal的數量是少於Thread的數量。之前的存儲數量是由Thread的數量決定,現在是由ThreadLocal的數量決定。
2.當Thread銷毀之后,對應的ThreadLocalMap也會隨之銷毀,能夠減少內存的使用。
接下來講解一下ThreadLocal的核心方法
set方法:
public void set(T value) {
//獲得當前線程 Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t);//得到實際存儲的map if (map != null)如果map已經存在,那么就存入 map.set(this, value);//this就是當前ThreadLocal else createMap(t, value);//如果map不存在,那么創建map再set }
所以代碼的執行流程就是:
首先獲取當前線程,並根據當前線程獲取一個Map,如果map存在,就直接set,如果不存在,就先創建map,再set。
get方法:
/**
返回當前線程中保存ThreadLocal的值,
如果當前線程沒有此ThreadLocal變量,
則會通過調用setInitialValue方法進行初始化值。
*/
public T get() { Thread t = Thread.currentThread();//獲得當前線程對象 ThreadLocalMap map = getMap(t);//獲得當前map if (map != null) {如果map存在 ThreadLocalMap.Entry e = map.getEntry(this);//以當前的ThreadLocal為key,獲得存儲實體Entry類型的e if (e != null) {//如果e不為空 @SuppressWarnings("unchecked") T result = (T)e.value;//獲得e中對應的value值。並返回 return result; } }
//會有兩種情況執行當前代碼
1.map不存在,
2.map存在,但是沒有與當前ThreadLocal關聯的entry。 return setInitialValue(); }
private T setInitialValue() { T value = initialValue();//調用initialValue獲取初始化的值,此方法可以被子類重寫,如果不重寫默認返回null Thread t = Thread.currentThread();//獲取當前線程對象 ThreadLocalMap map = getMap(t);//獲得map if (map != null)如果map存在,那么直接set,則對應上面的第二種情況 map.set(this, value); else//對應上面的第一種情況 createMap(t, value);//那么對map初始化創建,將t(當前線程)和value作為第一個entry存放到map中。 return value; }
代碼流程:首先獲得當前線程,根據當前線程獲取一個map。如果map不為空,則再map中以ThreadLocal的引用作為key來再map中獲取對應的entry e。如果e不為null,則返回e.value,否則map為空或者e為空,則通過setInitialValue函數獲取初始值value。然后用ThreadLocal的引用和value作為firstKey和firstValue創建一個新的map。
總結就是先獲取當前線程的ThreadLocalMap變量,如果存在則返回值,不存在則創建並返回初始值。
remove方法:
刪除當前線程中保存的ThreadLocal對應的實體entry
public void remove() {
//獲取當前線程對象中維護的ThreadLocalMap對象 ThreadLocalMap m = getMap(Thread.currentThread()); if (m != null)//如果此map存在,則刪除。 m.remove(this); }
private void remove(ThreadLocal<?> key) { Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1);//計算索引 for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {//進行線性探索,查找正確的key if (e.get() == key) { e.clear();//調用弱引用的claer()清除引用, expungeStaleEntry(i);//然后連續段清除。 return; } } }
接下來講解ThreadLocalMap的源碼
再上述的createMap方法中,
void createMap(Thread t, T firstValue) { t.threadLocals = new ThreadLocalMap(this, firstValue); }
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) { table = new Entry[INITIAL_CAPACITY]; int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1); table[i] = new Entry(firstKey, firstValue); size = 1; setThreshold(INITIAL_CAPACITY); }
這里就采用了一個延遲初始化,在第一次調用get()或者set()方法的時候才會進行初始化。計算索引的時候是采用&長度-1,這其實就是%(2^n),也就是對2的冪進行取模,這也解釋了為什么map長度一直為2的次方數。
ThreadLocalMap中的set()方法:
它使用線性探測法來解決哈希沖突,就是如果計算出下標是i,如果沖突了i=i+1,如果到了數組的最后一位,還是沖突,那么就從數組0位置再開始遍歷。
private static int nextIndex(int i, int len) { return ((i + 1 < len) ? i + 1 : 0); } /** * Decrement i modulo len. */ private static int prevIndex(int i, int len) { return ((i - 1 >= 0) ? i - 1 : len - 1); }
private void set(ThreadLocal<?> key, Object value) { // We don't use a fast path as with get() because it is at // least as common to use set() to create new entries as // it is to replace existing ones, in which case, a fast // path would fail more often than not. Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1);//計算索引位置 for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {//根據獲取到的索引進行循環,如果當前索引上的tab[i]不為空,在沒有retuen的情況下,就使用nextIndex()獲取下一個。也就是線性探測法 ThreadLocal<?> k = e.get();//這也就是tab[i]的key if (k == key) {判斷是否與方法參數key相同,如果相同就替換value,然后return e.value = value; return; } if (k == null) {//key為null,但是值不為null,說明之前的ThreadLocal對象已經被回收了,那么當前數組中的Entry是一個陳舊的元素 replaceStaleEntry(key, value, i);//用新元素替換陳舊的元素,這個方法進行了不少的垃圾清理動作,防止內存泄露。 return; } } tab[i] = new Entry(key, value);//ThreadLocal對應的key不存在並且沒有找到陳舊的元素,則在空元素的位置創建一個新的Entry。 int sz = ++size;
// cleanSomeSlots用於清除那些e.get()==null的元素,
// 這種數據key關聯的對象已經被回收,所以這個Entry(table[index])可以被置null。
// 如果沒有清除任何entry,並且當前使用量達到了負載因子所定義(長度的2/3),那么進行 if (!cleanSomeSlots(i, sz) && sz >= threshold) rehash(); }
總結:1.先通過key的hash值計算索引,然后根據獲取到的索引i進行循環,循環結束的條件為tab[i]!=null。
1.1在循環里會進行判斷,tab[i].get,就是table[i]的key,是否與方法參數key相同,相同就替換value,然后return
1.2如果不相同再判斷entry的key是否為null,如果是null的話說明這個位置被回收了,那么調用replaceStaleEntry(key,value,i)方法,也就是替換無效的entry(那么再這個無效的table[i]處可以用新的key-value進行替換,並清楚其他無效的entry)。然后return。
2.如果循環結束了,說明當前table[i]為null,那就直接在這個位置放entry就ok了,然后size++;
3.最后進行判斷,如果沒有清楚任何一個entry並且當前size已經大於擴容因子了,也就是數組的2/3,那就需要rehash。
下面就講解replaceStaleEntry(key, value, i);方法。
private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) { Entry[] tab = table;//entry數組 int len = tab.length; Entry e;//entry // Back up to check for prior stale entry in current run. // We clean out whole runs at a time to avoid continual // incremental rehashing due to garbage collector freeing // up refs in bunches (i.e., whenever the collector runs). int slotToExpunge = staleSlot;//之后用於清理的起點 for (int i = prevIndex(staleSlot, len);//這里是向staleSlot前掃描,時刻記住此時的staleSlot是一個無效的entry。 (e = tab[i]) != null; i = prevIndex(i, len)) if (e.get() == null)//向前掃描找到了第一個無效的entry。那么起點就是這個無效的entry,否則起點就是最開始的staleSlot slotToExpunge = i; // Find either the key or trailing null slot of run, whichever // occurs first for (int i = nextIndex(staleSlot, len);//接着向后掃描 (e = tab[i]) != null; i = nextIndex(i, len)) { ThreadLocal<?> k = e.get(); // If we find key, then we need to swap it // with the stale entry to maintain hash table order. // The newly stale slot, or any other stale slot // encountered above it, can then be sent to expungeStaleEntry // to remove or rehash all of the other entries in run. if (k == key) {//如果相等,那么更新value即可 e.value = value;這時候e就是一個有效的entry, tab[i] = tab[staleSlot];//然后這時候把無效的賦值到當前i位置 tab[staleSlot] = e;//再把這個entry賦值給最開始傳入這個方法的位置處。也就是交換了位置。讓無效的entry盡可能靠后。 // Start expunge at preceding stale entry if it exists if (slotToExpunge == staleSlot)//如果向前找沒有找到無效的entry,那么開始的起點就是i。也就是交換后的無效的位置。 slotToExpunge = i; cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); return; } // If we didn't find stale entry on backward scan, the // first stale entry seen while scanning for key is the // first still present in the run. if (k == null && slotToExpunge == staleSlot)//這里就是如果向前查找沒有無效的entry,然后當前向后掃描的entry無效,則更新清理起點。 slotToExpunge = i; } // If key not found, put new entry in stale slot tab[staleSlot].value = null;//上面的k==key判斷沒有經歷到的話,那么說明沒有找到key,有也就是說key之前不存在,那么直接再最開始的無效entry,也就是tab[stableSlot]上新增即可 tab[staleSlot] = new Entry(key, value); // If there are any other stale entries in run, expunge them if (slotToExpunge != staleSlot)//經過上面的for循環之后到這,說明存在其他的無效entry需要進行清理。 cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); }
總結一下:上面的目的就是兩個,先把有效entry放在盡可能靠前的位置,然后從第一個無效entry的位置向后清理。
接下來就是expungeStaleEntry(slotToExpunge)方法:
private int expungeStaleEntry(int staleSlot) {//連續段清除 Entry[] tab = table; int len = tab.length; // expunge entry at staleSlot tab[staleSlot].value = null;//清理無效entry,置空 tab[staleSlot] = null; size--;//size減1,置空后table的被使用量減1 // Rehash until we encounter null Entry e; int i; for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {//從staleSlot開始向后掃描一段連續的entry ThreadLocal<?> k = e.get(); if (k == null) {//如果遇到key為null,表示無效entry,進行清理 e.value = null; tab[i] = null; size--; } else {//如果key不為null,計算索引 int h = k.threadLocalHashCode & (len - 1); if (h != i) {計算出來的索引h與當前所在位置的索引i不一致,那么就置空當前的tab[i], tab[i] = null; // Unlike Knuth 6.4 Algorithm R, we must scan until // null because multiple entries could have been stale. while (tab[h] != null)//然后從h開始向后線性探測到第一個空的slot,把e賦值過去。 h = nextIndex(h, len); tab[h] = e; } } } return i;//下一個為空的slot索引。 }
總結:從第一個無效entry向后遍歷連續entry,清理每一個無效entry,對有效的entry重新計算其數組位置,如果和當前位置不符就將其移動到重新計算的位置,如果存在沖突就采用線性探測,最后返回連續entry后的那個下標。這個下標對應的是tab[i]==null。
接下來就是cleanSomeSlots方法
//啟發式的掃描清楚,掃描次數由傳入的參數n決定。
//從i開始向后掃描,(不包括i,因為上面已經說了,i所對應的entry是null)
//n控制掃描次數,正常情況下為log2(n),如果找到了無效entry,會將n重置為table的長度len,然后再調用上面的方法進行連續段清除。
private boolean cleanSomeSlots(int i, int n) { boolean removed = false; Entry[] tab = table; int len = tab.length; do { i = nextIndex(i, len); Entry e = tab[i]; if (e != null && e.get() == null) { n = len;//這里就是找到了一個無效的entry,那么重置n,並段清除。 removed = true; i = expungeStaleEntry(i); } } while ( (n >>>= 1) != 0);//無符號的右移動,可以用於控制掃描次數在log2(n) return removed; }
接下來講解rehash()方法:
private void rehash() { expungeStaleEntries();//全清理 // Use lower threshold for doubling to avoid hysteresis
//threshold = 2/3*len,所以-threshold / 4=len/2.這里主要是因為上面做了一次全清理所以減少,需要進行判斷。判斷的時候把閾值減少了。 if (size >= threshold - threshold / 4) resize(); }
private void expungeStaleEntries() { Entry[] tab = table; int len = tab.length; for (int j = 0; j < len; j++) { Entry e = tab[j]; if (e != null && e.get() == null) expungeStaleEntry(j); } }
private void resize() { Entry[] oldTab = table; int oldLen = oldTab.length; int newLen = oldLen * 2;//擴容,擴為原來的兩倍,這樣保證了長度為2的冪 Entry[] newTab = new Entry[newLen]; int count = 0; for (int j = 0; j < oldLen; ++j) { Entry e = oldTab[j]; if (e != null) { ThreadLocal<?> k = e.get(); if (k == null) { e.value = null; // Help the GC//雖然做過一次清理,但在擴容的時候可能會又存在key==null的情況 } else { int h = k.threadLocalHashCode & (newLen - 1);//同樣用線性探測法來設置每個位置。 while (newTab[h] != null) h = nextIndex(h, newLen); newTab[h] = e; count++; } } } setThreshold(newLen);//設置新的閾值 size = count; table = newTab; }
接下來講ThreadLocalMap中的getEntry()方法
private Entry getEntry(ThreadLocal<?> key) { int i = key.threadLocalHashCode & (table.length - 1);//根據key計算索引,獲取entry Entry e = table[i]; if (e != null && e.get() == key)//如果這個table[i]不為null且其key等於key,就返回entry return e; else return getEntryAfterMiss(key, i, e);//如果不是,那就執行這個函數 }
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) { Entry[] tab = table; int len = tab.length; while (e != null) { ThreadLocal<?> k = e.get(); if (k == key) return e; if (k == null) expungeStaleEntry(i);//清除無效的entry else i = nextIndex(i, len);//基於線性探測法向后掃描 e = tab[i]; } return null;//如果都沒有就返回null }
最后就講解一下內存泄露的問題
首先,內存泄漏跟entry中使用了弱引用沒有關系。
先說內存泄漏的概念:內存泄漏值程序中已動態分配的堆內存由於某種原因程序未釋放或者無法釋放,造成系統內存的浪費,導致程序運行速度減慢什么系統崩潰等嚴重后果。
弱引用:垃圾回收器一旦發現了只有弱引用的對象,不管當前內存空間足夠與否,都會回收它的內存。
強引用:平時的引用一般都是強引用,只要對象沒有被置為null,在GC時就不會被回收。
如果key使用了強引用,那么會內存泄漏嗎
那么當棧中的ThreadLocalref引用斷開,那么在ThreadLocalref就被回收了。但是因為entry強引用了threadLocal,造成ThreadLocal無法被回收。在沒有手動刪除這個Entry以及CurrentThread依然運行的前提下,始終有強引用鏈 threadRef->currentThread->threadLocalMap->entry,Entry就不會被回收(Entry中包括了ThreadLocal實例和value),導致Entry內存泄漏。
也就是說,ThreadLocalMap中的key使用了強引用, 是無法完全避免內存泄漏的。
如果使用弱引用:
那么同樣的代碼中使用完了ThreadLocal,ThreadLocal Ref被回收了。
同時,由於entry指向的ThreadLocal是弱引用,所以ThreadLocal可以被順利回收。也就是key為null。但是沒有手動刪除這個entry以及thread仍然運行的情況下,依然有ThreadRef-Thread-ThreadLocalMap-Entry value-Object這條引用存在。value不會被回收,那么就會導致內存泄漏。也就是說使用了弱引用。也有可能內存泄漏。
所以出現內存泄漏的真實原因:
1.沒有手動刪除這個Entry
2.CurrentThread依然運行。
第一點就是使用完ThreadLocal,調用其remove方法刪除對應的Entry,就能避免內存泄漏
第二點就是ThreadLocalMap是Thread的一個樹形,被當前線程所引用,所以它的生命周期跟Thread一樣長,如果使用完ThreadLocal之后,如果當前Thread也隨之執行結束,ThreadLocalMap自然也會被gc回收,從根源上避免內存泄漏。
那么為啥還要使用弱引用呢
剛剛直到要避免內存泄漏有兩種方式
1.使用完ThreadLocal,調用其remove方法刪除對應的Entry
2.使用完ThreadLocal,當前Thread也隨之運行結束。
但是如果是線程池的話,那么線程結束時不會銷毀的,只是返回線程池。
也就是說,只要記得在使用完ThreadLocal之后及時調用remove。無論key時強引用還是弱引用都不會有問題。那么使用key為弱引用的原因是為啥呢?
通過上述源碼分析我們知道,在ThreadLocalMap中的set/get方法中,會對key為null進行判斷。如果為null的話,那么是會對value置為null的。也就是清除。
這也就意味着使用完ThreadLocal,Thread依然運行的前提下,就算忘記調用remove方法,弱引用也會比強引用多一層保障:弱引用的ThreadLocal會被回收,對應的value在下一次ThreadLocalMap調用set,get,remove中的任一方法的時候都會清除,從而避免內存泄漏。