ThreadLocal原理


先看一下大體結構

ThreadLocal(線程本地變量),作用是讓每個線程都維護一份獨立的變量副本,解決了變量並發訪問沖突的問題。表面上看,變量是存儲在ThreadLocal里面的,實則不然:
1. ThreadLocal只是個“工具類”,對外暴露了get、set、remove接口;
2. 內部實現:變量其實是保存在當前線程Thread類里,准確來說是保存在Thread類中由ThreadLocal實現的ThreadLocal.ThreadLocalMap成員變量里;

先易后難,先看入口方法

set

public void set(T value) {
    // 獲取當前線程
    Thread t = Thread.currentThread();
    // 嘗試獲取當前線程內部的ThreadLocalMap
    ThreadLocalMap map = getMap(t);
    // map不為空,就正常set值
    if (map != null)
        map.set(this, value);
    else
        // 否則就初始化Map
        createMap(t, value);
}

ThreadLocalMap getMap(Thread t) {
    // 可以看出,ThreadLocalMap是存儲在線程對象里的
    return t.threadLocals;
}

void createMap(Thread t, T firstValue) {
    // new個ThreadLocalMap,key和value分別為當前ThreadLocal對象已經傳入的值
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}

get

public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    // 如果map不為空,就嘗試獲取;
    if (map != null) {
        // 以當前ThreadLocal對象為key,獲取對應的值
        ThreadLocalMap.Entry e = map.getEntry(this);
        // 不為空就返回,否則返回默認值
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    // 否則初始化並返回默認值
    return setInitialValue();
}

private T setInitialValue() {
    // 獲得默認值
    T value = initialValue();
    // 以下過程和set方法一樣
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
    return value;
}
// 這里可以看出,這個方法可以由子類實現,默認返回null
protected T initialValue() {
    return null;
}

remove

public void remove() {
    // 嘗試獲取ThreadLocalMap
    ThreadLocalMap m = getMap(Thread.currentThread());
    if (m != null)
        // map不為空,則移除key為當前ThreadLocal對象的Entry
        m.remove(this);
}

小結論:ThreadLocalMap存儲在Thread對象里,但卻是在ThreadLocal對象里進行初始化,ThreadLocal對外暴露的接口實際上都是交給ThreadLocalMap進行處理,所以ThreadLocalMap是核心部分。

=======================================================

ThreadLocalMap里有個Entry對象

static class Entry extends WeakReference<ThreadLocal<?>> {
    /** 和當前ThreadLocal有關聯的值 */
    Object value;

    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}

這個Entry是弱引用的,擴展一下:

強引用:在代碼里普遍存在,比如Object obj = new Object();。當內存空間不足,Java虛擬機寧願拋出OutOfMemoryError錯誤使程序異常終止,也不會靠隨意回收具有"強引用"的對象來解決內存不足問題。

軟引用:如果內存空間足夠,垃圾回收器就不會回收它,如果內存空間不足了,就會回收這些對象的內存。軟引用可用來實現內存敏感的高速緩存。

弱引用:被弱引用關聯的對象只能生存到下一次垃圾收集發生之前。在垃圾回收過程中,一旦發現了只具有弱引用的對象,不管當前內存空間足夠與否,都會回收它的內存。

虛引用:如果一個對象僅持有虛引用,那么它就和沒有任何引用一樣,在任何時候都可能被垃圾回收。虛引用主要用來跟蹤對象被垃圾回收的活動。也就是說,持有虛引用的對象能在這個對象被回收時收到一個系統通知。

既然是弱引用,那么就會有個問題:如果key被回收了,就存在一個null-value鍵值對,這個value既無法被訪問到,同時如果線程生命周期很長(比如線程池里),那么這些null key的強引用關系:Thread --> ThreadLocalMap-->Entry-->Value導致Value不會回收,造成內存泄漏。

官方團隊也加入了解決辦法,當調用set、get、remove方法的時候會去掃描key為null的Entry並清除(Entry=null)。但是這個並不是100%保證不出問題,如果這個Entry過期了,但是線程沒有調用set、get或者remove,這個null key的Entry依然會存在,依然是內存泄漏了。所以還是要規范,不用了就調用remove清除。

一個例子就是線程池使用ThreadLocal

import java.util.*;
import java.util.concurrent.*;

public class Main {
    private static ThreadLocal<Integer> local = new ThreadLocal<>();

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        for (int i = 0; i < 10; i++){
            executor.execute(()->{
                String name = Thread.currentThread().getName();
                // 正常來說,每個線程先讀取得到的值應該是一樣的初始值(同一變量的副本)。如果讀到了其它線程修改之后的值,則證明出問題了。
                Integer init = local.get();
                // 修改自己變量的值
                local.set(new Random().nextInt(100));
                // 讀取修改之后的值
                Integer data = local.get();
                System.out.println(name + " | init:" + init + " | data:" + data);
            });
        }
        executor.shutdown();
    }
}

輸出:(結果顯而易見,由於沒有清理自己的變量,導致當前線程復用到其它任務的時候,仍然保留着上一家的數據,如果先讀取就會出錯)

pool-1-thread-3 | init:null | data:84
pool-1-thread-1 | init:null | data:33
pool-1-thread-2 | init:null | data:85
pool-1-thread-1 | init:33 | data:96
pool-1-thread-3 | init:84 | data:82
pool-1-thread-1 | init:96 | data:83
pool-1-thread-2 | init:85 | data:51
pool-1-thread-1 | init:83 | data:48
pool-1-thread-3 | init:82 | data:17
pool-1-thread-2 | init:51 | data:58

正確做法

import java.util.*;
import java.util.concurrent.*;

public class Main {
    private static ThreadLocal<Integer> local = new ThreadLocal<>();

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        for (int i = 0; i < 10; i++){
            executor.execute(()->{
                try {
                    String name = Thread.currentThread().getName();
                    // 正常來說,每個線程先讀取得到的值應該是一樣的初始值(同一變量的副本)。如果讀到了其它線程修改之后的值,則證明出問題了。
                    Integer init = local.get();
                    // 修改自己變量的值
                    local.set(new Random().nextInt(100));
                    // 讀取修改之后的值
                    Integer data = local.get();
                    System.out.println(name + " | init:" + init + " | data:" + data);
                } finally {
                    // 最終清除數據
                    local.remove();
                }
            });
        }
        executor.shutdown();
    }
}

輸出:

pool-1-thread-1 | init:null | data:6
pool-1-thread-1 | init:null | data:93
pool-1-thread-2 | init:null | data:93
pool-1-thread-3 | init:null | data:41
pool-1-thread-2 | init:null | data:37
pool-1-thread-1 | init:null | data:54
pool-1-thread-2 | init:null | data:61
pool-1-thread-3 | init:null | data:76
pool-1-thread-2 | init:null | data:95
pool-1-thread-1 | init:null | data:68

所以,千萬記得remove

源碼

private final int threadLocalHashCode = nextHashCode();
private static int nextHashCode() {
    return nextHashCode.getAndAdd(HASH_INCREMENT);  // HASH_INCREMENT = 0x61c88647
}

0x61c88647是斐波那契散列乘數,它的優點是通過它散列(hash)出來的結果分布會比較均勻,可以很大程度上避免hash沖突。

set

private void set(ThreadLocal<?> key, Object value) {
    // 指向當前數組
    Entry[] tab = table;
    // 當前數組長度
    int len = tab.length;
    // 計算下標
    int i = key.threadLocalHashCode & (len-1);

    // 遍歷table
    for (Entry e = tab[i];  // 從計算的下標開始
         e != null; // 直到遇到空槽
         e = tab[i = nextIndex(i, len)]) {  // 指向下一個位置的元素
        // 獲取當前位置的key
        ThreadLocal<?> k = e.get();

        // 如果傳入的key已存在,則覆蓋舊值
        if (k == key) {
            e.value = value;
            return;
        }
        // 如果當前位置i的key為null,
        if (k == null) {
            // 此方法:1. 用指定key-value的新Entry替換set操作期間遇到的過期Entry(key==null)2. 如果遇到已存在的key,則用新值覆蓋舊值。3. 清除兩個空槽之間過期的Entry
            replaceStaleEntry(key, value, i);
            return;
        }
    }

    // 如果前面沒找到已存在的key,則新創建一個Entry放在此位置
    tab[i] = new Entry(key, value);
    int sz = ++size;
    // 1. 啟發式地掃描並清除過期的Entry。2. 如果沒有需要清除的並且需要擴容,則進行擴容
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
        rehash();
}

set方法做的事情:從計算得到的下標開始,遇到空槽為止進行掃描。遇到相同的key則覆蓋;遇到key為null的Entry則直接new一個新Entry替換無效Entry;否則在下標處new一個新的Entry。最后掃描並清理無效槽位,如果滿足擴容條件即擴容。

已經確認,只有set的時候可能調用replaceStaleEntry方法,而這種情況下當前位置i(staleSolt)是個過期位置(key==null)

private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) {
    // 指向當前table
    Entry[] tab = table;
    // table長度
    int len = tab.length;
    // 指向每一個遍歷的數組對象
    Entry e;

    // 記錄需要刪除的槽位。開始的時候等於傳入的位置。
    int slotToExpunge = staleSlot;
    // 向前找直到槽位為空,如果遇到key為空的Entry,則僅僅記錄下最后一個。
    for (int i = prevIndex(staleSlot, len); // 找到當前位置的前一位:((i - 1 >= 0) ? i - 1 : len - 1)
         (e = tab[i]) != null;  // 結束條件是當前位置為null(空槽)
         i = prevIndex(i, len)) // 繼續尋找前一位
        if (e.get() == null)
            // 如果當前位置的key是null,則記錄下此位置
            slotToExpunge = i;

    // 查找key或者空槽,以最先出現的為准
    for (int i = nextIndex(staleSlot, len); // 當前位置的下一個開始:((i + 1 < len) ? i + 1 : 0)
         (e = tab[i]) != null;      // 遇到空槽結束
         i = nextIndex(i, len)) {   // 下一個
        // 當前key
        ThreadLocal<?> k = e.get();

        // 如果當前key和傳入的key相同,那么我們需要將它與過期槽位的內容進行交換,以保持哈希表的順序。
        // 然后可以將新過期的槽或上面遇到的任何其他過期槽的位置發送到expungeStaleEntry,以刪除或重新散列運行中的所有其他Entry。
        if (k == key) {
            e.value = value;
            // 當前位置
            tab[i] = tab[staleSlot];
            tab[staleSlot] = e;

            // 如果slotToExpunge == staleSlot,則證明上一步向前找的過程中沒有遇到key==null的Entry。此種情況,把slotToExpunge記錄為當前位置i
            if (slotToExpunge == staleSlot)
                slotToExpunge = i;
            cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
            return;
        }

        // 1. 當前key為null,則此位置是個過期Entry
        // 2. 如果此時slotToExpunge == staleSlot,則證明上一步向前找的過程中沒有遇到key==null的Entry、並且向后找的過程也沒有遇到相同的key(因為前面如果遇到了相同key,則已經退出了循環)
        // 滿足兩個條件,則將當前過期槽位的位置記錄下來
        if (k == null && slotToExpunge == staleSlot)
            slotToExpunge = i;
    }

    // 如果相同的key沒有找到,則把新的Entry放在過期的槽位
    tab[staleSlot].value = null;
    tab[staleSlot] = new Entry(key, value);

    // 如果還存在其它過期槽位,刪除之
    if (slotToExpunge != staleSlot)
        cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}

圖解

 

情況示例一:向前尋找有個過期槽位,向后尋找沒有沖突key。

情況示例二: 向前尋找有個過期槽位,向后尋找發現沖突key。

 

.

再來看expungeStaleEntry,顧名思義,刪除過期的Entry。

// 在staleSlot和下一個空槽之間:1. 重新哈希任何可能碰撞的Entry。2. 刪除過期的Entry。
private int expungeStaleEntry(int staleSlot) {
    Entry[] tab = table;
    int len = tab.length;

    // 刪除位於staleSlot的Entry
    tab[staleSlot].value = null;
    tab[staleSlot] = null;
    size--;

    // Rehash 直到遇到空槽
    Entry e;
    int i;
    // 從staleSlot的下一個開始遍歷
    for (i = nextIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = nextIndex(i, len)) {
        // 當前槽位的key
        ThreadLocal<?> k = e.get();
        // key為null,刪除之
        if (k == null) {
            e.value = null;
            tab[i] = null;
            size--;
        } else {
            // rehash
            int h = k.threadLocalHashCode & (len - 1);
            // 如果新計算出的位置不等於當前位置,則:
            if (h != i) {
                // 1. 先把當前位置置為空
                tab[i] = null;

                // Unlike Knuth 6.4 Algorithm R, we must scan until
                // null because multiple entries could have been stale.
                // 2. 從h開始找直到遇到空槽。然后把其中的內容移到找到的空槽里。
                while (tab[h] != null)
                    h = nextIndex(h, len);
                tab[h] = e;
            }
        }
    }
    // 返回空槽的位置
    return i;
}

圖解

看cleanSomeSlots

// 啟發式地掃描並清除過期的Entry。
private boolean cleanSomeSlots(int i, int n) {
    boolean removed = false;
    Entry[] tab = table;
    int len = tab.length;
    do {
        // 從位置i的下一個開始搜索
        i = nextIndex(i, len);
        Entry e = tab[i];
        // 如果遇到過期Entry,清除
        if (e != null && e.get() == null) {
            n = len;
            removed = true;
            i = expungeStaleEntry(i);
        }
    } while ( (n >>>= 1) != 0);
    // 只要有過期Entry被移除就會返回true
    return removed;
}

.

最后看下擴容

private void rehash() {
    // 這個方法從0開始遍歷table,遇到key==null的就執行expungeStaleEntry方法
    expungeStaleEntries();

    // Use lower threshold for doubling to avoid hysteresis
    if (size >= threshold - threshold / 4)
        // 如果當前size達到擴容閾值的75%,則擴容
        resize();
}

private void resize() {
    // 舊table
    Entry[] oldTab = table;
    // 舊容量
    int oldLen = oldTab.length;
    // 新容量=舊容量*2(舊容量的2倍)
    int newLen = oldLen * 2;
    // 按照新容量new個新的Entry數組
    Entry[] newTab = new Entry[newLen];
    int count = 0;

    // 遍歷舊table
    for (int j = 0; j < oldLen; ++j) {
        Entry e = oldTab[j];
        if (e != null) {
            ThreadLocal<?> k = e.get();
            // 遇到過期Entry,處理之
            if (k == null) {
                e.value = null; // Help the GC
            } else {
                // 正常的Entry做Rehash操作,放到計算得到的新位置h之后的第一個空槽里
                int h = k.threadLocalHashCode & (newLen - 1);
                while (newTab[h] != null)
                    h = nextIndex(h, newLen);
                newTab[h] = e;
                count++;
            }
        }
    }

    // 設置新的閾值
    setThreshold(newLen);
    size = count;
    table = newTab;
}

擴容很簡單:達到擴容閾值的75%,即擴容,新容量是老容量的2倍,遇到過期的Entry刪除,其它Entry做Rehash操作放到新位置。

以上是set核心方法,下面來看get涉及的方法getEntry

private Entry getEntry(ThreadLocal<?> key) {
    // 計算下標
    int i = key.threadLocalHashCode & (table.length - 1);
    Entry e = table[i];
    // 有值則返回
    if (e != null && e.get() == key)
        return e;
    else
        // 沒有匹配的則進行清理工作(這個方法也體現了,調用get方法不一定會進行清理過期Entry工作)
        return getEntryAfterMiss(key, i, e);
}

private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
    Entry[] tab = table;
    int len = tab.length;

    // 從給定的位置i開始遍歷
    while (e != null) {
        ThreadLocal<?> k = e.get();
        // 如果key的地址相同,證明不需要清理,直接返回即可
        if (k == key)
            return e;
        // 如果key是null,則清理
        if (k == null)
            expungeStaleEntry(i);
        else
            // 否則移到下一個
            i = nextIndex(i, len);
        e = tab[i];
    }
    return null;
}

最后是remove

private void remove(ThreadLocal<?> key) {
    // 當前table
    Entry[] tab = table;
    // 當前長度
    int len = tab.length;
    // 根據key計算下標
    int i = key.threadLocalHashCode & (len-1);
    // 從計算得到的位置開始清理,直到遇到空槽停止
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        // 如果找到目標key,清理后返回
        if (e.get() == key) {
            e.clear();
            expungeStaleEntry(i);
            return;
        }
    }
}

最后兩個問題(我在讀的過程中也充滿了疑惑)

源碼里replaceStaleEntry有個向前找向后找的過程,如果循環之內的條件一直不滿足,則只能依靠循環條件((e = tab[i]) != null)來結束循環;

類似的expungeStaleEntry中Rehash過程有個尋找新位置的過程,結束條件也是while (tab[h] != null)

我就想了,如果沒有空槽呢?豈不是死循環了。

而實際上是不會存在這種情況的,因為擴容啊,每次達到擴容閾值的75%就擴容了,所以空槽是肯定一直存在的。

我們了解到底層的Map是一個Entry數組,那么問題來了:通常我們使用ThreadLocal都是存儲當前線程的私有變量,也就是只存一個值,為什么還需要一個可以存多個值的Entry數組呢?

ThreadLocal可以定義多個,每個都有自己的線程私有變量。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM