Netty高性能組件——FastThreadLocal源碼解析(細微處見真章)


1. 前言

netty自行封裝了FastThreadLocal以替換jdk提供的ThreadLocal,結合封裝的FastThreadLocalThread,在多線程環境下的變量提高了ThreadLocal對象的查詢以及更新效率.
下文,將通過對比ThreadLocalFastThreadLocal,通過源碼解析,探究FastThreadLocalFastThreadLocalThread的搭配使用后性能的奧秘.

2. ThreadLocalMap

ThreadLocalMapTharedLocal中定義的靜態類,其作用是保存Thared中引用的ThreadLocal對象.
jdk中,每一個Thread對象中均會包含以下兩個變量:

public
class Thread implements Runnable {

    // 此處省略若干代碼

    // 存儲ThreadLocal變量,通過每個Thread存儲一個ThreadLocalMap,實現了變量的線程隔離
    ThreadLocal.ThreadLocalMap threadLocals = null;

    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
}

編程實踐中,線程中可能包含多個ThreadLocal去進行引用,它們均保存在ThreadLocal.ThreadLocalMap threadLocals中(每個線程中均包含自己的ThreadLocalMap,避免多線程爭用).

 static class ThreadLocalMap {

        // 需要注意,此處Entry使用WeakReference
        (軟引用),這樣在資源緊張的時候可以回收部分不再引用的ThreadLocal變量
        static class Entry extends WeakReference<ThreadLocal<?>> {
            /** The value associated with this ThreadLocal. */
            Object value;

            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }
        
        // ThreadLocal對象存儲數組的初始化長度
        private static final int INITIAL_CAPACITY = 16;
        
        // ThreadLocal對象存儲數組
        private Entry[] table;
        
        // 初始化ThreadLocalMap,使用數組存放ThreadLocal資源,使用ThreadLocal對象的threadLocalHashCode進行hash得到索引
        // 此處使用對象數組存放ThreadLocal對象,操作類似於HashMap,感興趣的讀者可以查看HashMap的源碼進行比較
        ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
            table = new Entry[INITIAL_CAPACITY];
            int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
            table[i] = new Entry(firstKey, firstValue);
            size = 1;
            setThreshold(INITIAL_CAPACITY);
        }
        
        // 獲取ThreadLocal對象,此處需要根據threadLocalHashCode進行hash操作得到索引
        private Entry getEntry(ThreadLocal<?> key) {
            int i = key.threadLocalHashCode & (table.length - 1);
            Entry e = table[i];
            if (e != null && e.get() == key)
                return e;
            else
                return getEntryAfterMiss(key, i, e);
        }
    }

由以上代碼可知,在ThreadLocalMap初始化時,會創建一個對象數組.
對象數組的初始長度為16,在后續的擴張中,數組長度會保持在2^n級別,以便進行hash操作確定ThradLocal對象的索引.
在每次獲取ThreadLocal對象的時候,會根據對象的threadLocalHashCode與對象數組長度減一的求與值,確定對象索引,從而快速獲取value.

使用hash確定數組下標,存在以下幾個問題:

  • 解決hash沖突;
  • 對象數組擴容帶來的rehash.

ThreadLocaljdk提供的通用類,在大部分場景下,線程中的ThreadLocal變量較少,因此hash沖突以及rehash較少.
即使,偶爾發生的hash沖突以及rehash,也不會給應用程序帶來較大的性能損耗.

3. FastThreadLocalThread

NettyThreadLocal改造為FastThreadLocal,以應對自身的大並發量,數據吞吐量大的應用場景.
為了更好的使用,Netty亦繼承Thread,構建了FastThreadLocalThread.
當且僅當FastThreadLocalFastThreadLocalThread合並使用,方能真正起到提速的作用.

// 限於篇幅,省略較多函數
public class FastThreadLocalThread extends Thread {

    // 相對於Thread中使用ThreadLocal.ThreadLocalMap存放ThreadLocal資源,FastThreadLocalThread使用InternalThreadLocalMap存放ThreadLocal資源
    private InternalThreadLocalMap threadLocalMap;

    public final InternalThreadLocalMap threadLocalMap() {
        return threadLocalMap;
    }

    public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) {
        this.threadLocalMap = threadLocalMap;
    }
    
    @UnstableApi
    public boolean willCleanupFastThreadLocals() {
        return cleanupFastThreadLocals;
    }

    @UnstableApi
    public static boolean willCleanupFastThreadLocals(Thread thread) {
        return thread instanceof FastThreadLocalThread &&
                ((FastThreadLocalThread) thread).willCleanupFastThreadLocals();
    }
}

由以上代碼可以看出,相對於Thread,FastThreadLocalThread添加了threadLocalMap對象,以及threadLocalMap的清理標志獲取函數.

ThreadLocal即使使用了WeakReference以保證資源釋放,但是仍會存在內存泄漏可能.
FastThreadLocalThreadFastThreadLocal均為Netty定制,可以在線程任務執行后,強制執行InternalThreadLocalMap的清理函數removeAll(詳情見下文).

4. FastThreadLocal

4.1 InternalThreadLocalMap

前情提要:

FastThreadLocalThread中聲明了InternalThreadLocalMap對象threadLocalMap.

public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap{
    
}

從以上代碼可知,InternalThreadLocalMap繼承於UnpaddedInternalThreadLocalMap.
因此,我們需要先探究下UnpaddedInternalThreadLocalMap的定義.

//
class UnpaddedInternalThreadLocalMap {

    // 如果在`Thread`中使用`FastThreadLocal`,則實際上使用`ThreadLocal`存放資源
    static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
    // 資源索引,每一個FastThreadLocal對象都會有對應的ID,即通過nextIndex自增得到
    static final AtomicInteger nextIndex = new AtomicInteger();

    // FastThreadLocal的資源存放地址,ThreadLocal中是通過ThreadLocalMap存放資源,索引是ThreadLocal對象的threadLocalHashCode進行hash得到
    // FastThreadLocal使用Object[]數組,使用通過nextIndex自增得到的數值作為索引,保證每次查詢數值都是O(1)操作
    // 需要注意,FastThreadLocal對象為了避免偽共享帶來的性能損耗,使用padding使得FastThreadLocal的對象大小超過128byte
    // 避免偽共享的情況下,indexedVariables的多個連續數值在不更新的前提下可以被緩存至cpu chache line中,這樣大大的提高了查詢效率
    Object[] indexedVariables;

    // Core thread-locals
    int futureListenerStackDepth;
    int localChannelReaderStackDepth;
    Map<Class<?>, Boolean> handlerSharableCache;
    IntegerHolder counterHashCode;
    ThreadLocalRandom random;
    Map<Class<?>, TypeParameterMatcher> typeParameterMatcherGetCache;
    Map<Class<?>, Map<String, TypeParameterMatcher>> typeParameterMatcherFindCache;

    // String-related thread-locals
    StringBuilder stringBuilder;
    Map<Charset, CharsetEncoder> charsetEncoderCache;
    Map<Charset, CharsetDecoder> charsetDecoderCache;

    // ArrayList-related thread-locals
    ArrayList<Object> arrayList;

    // 構造函數,后續需要關注
    UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {
        this.indexedVariables = indexedVariables;
    }
}

以上代碼中,需要注意:

    static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();

聲明slowThreadLocalMap的原因在於,用戶可能在Thread而非FastThreadLocalThread中調用FastThreadLocal.
因此,為了保證程序的兼容性,聲明此變量保存普通的ThreadLocal相關變量(具體使用詳見后面說明).


// 出於篇幅考慮,刪除部分函數
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {

    private static final int DEFAULT_ARRAY_LIST_INITIAL_CAPACITY = 8;
    
    // 資源未賦值變質量
    public static final Object UNSET = new Object();

    // 獲取ThreadLocal對象,此處會判斷當前調用線程的類型分別調用不同的資源
    public static InternalThreadLocalMap getIfSet() {
        Thread thread = Thread.currentThread();
        if (thread instanceof FastThreadLocalThread) {
            return ((FastThreadLocalThread) thread).threadLocalMap();
        }
        return slowThreadLocalMap.get();
    }

    // 獲取ThreadLocal對象,此處會判斷當前調用線程的類型,從而判斷調用fastGet或是slowGet
    public static InternalThreadLocalMap get() {
        Thread thread = Thread.currentThread();
        if (thread instanceof FastThreadLocalThread) {
            return fastGet((FastThreadLocalThread) thread);
        } else {
            return slowGet();
        }
    }

    // 如果當前調用FastThreadLocal對象的是FastThreadLocalThread,則調用FastThreadLocalThread的threadLocalMap對象獲取相關資源
    private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
        InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
        if (threadLocalMap == null) {
            thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
        }
        return threadLocalMap;
    }

    // 如果當前調用FastThreadLocal對象的是Thread,則調用slowThreadLocalMap對象獲取相關資源(slowThreadLocalMap其實是調用jdk提供的ThreadLocalMap)
    private static InternalThreadLocalMap slowGet() {
        ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
        InternalThreadLocalMap ret = slowThreadLocalMap.get();
        if (ret == null) {
            ret = new InternalThreadLocalMap();
            slowThreadLocalMap.set(ret);
        }
        return ret;
    }

    // 保證FastThreadLocal的實體對象大小超過128byte,以避免偽共享發生
    // 如果資源能夠避免偽共享,則FastThreadLocal的實體對象能夠部分緩存至L1緩存,通過提高緩存命中率加快查詢速度(查詢L1緩存的速度要遠快於查詢主存速度)
    // 更多解釋,詳見
    public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9;

    private InternalThreadLocalMap() {
        super(newIndexedVariableTable());
    }

    // 初始化資源,初始化的長度為32,並初始化為UNSET
    private static Object[] newIndexedVariableTable() {
        Object[] array = new Object[32];
        Arrays.fill(array, UNSET);
        return array;
    }
}

以上代碼為InternalThreadLocalMap的主要實現,對於使用者來說,需要關注以下幾個函數:

  • getIfSet();
  • get();
  • fastGet();
  • slowGet();

存在以下兩種情況:

(1) 在Thread中調用FastThreadLocal;
(2) 在FastThreadLocalThread中調用FastThreadLocal.

因為存在以上兩種調用場景,在獲取InternalThreadLocalMap時,會使用instanceof進行判斷,如下所示:

        if (thread instanceof FastThreadLocalThread) {
            // 對應fastGet等操作
        } else {
            // 對應slowGet等操作
        }

如果調用線程是

  • Thread: 調用UnpaddedInternalThreadLocalMap中的slowThreadLocalMap變量;
  • FastThreadLocalThread: 調用FastThreadLocalThread中的threadLocalMap變量.

因為InternalThreadLocalMap構造函數為私有函數,所以在getIfSet/fastGet函數中均是獲取FastThreadLocalThreadthreadLocalMap變量.若變量為空,則調用私有構造函數進行賦值操作.

    // Cache line padding (must be public)
    // With CompressedOops enabled, an instance of this class should occupy at least 128 bytes.
    public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9;

    private InternalThreadLocalMap() {
        super(newIndexedVariableTable());
    }

    private static Object[] newIndexedVariableTable() {
        Object[] array = new Object[32];
        Arrays.fill(array, UNSET);
        return array;
    }

構造函數,會創建一個Object數組(初始化長度為32),並逐個初始化數值為UNSET,為后續的賦值操作提供判斷依據(詳見removeIndexedVariable以及isIndexedVariableSet函數).

Tips:

構造函數存在一段代碼public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9;.
此段代碼無實際實用意義,其存在是為了保證InternalThreadLocalMap的實例大小超過128字節(以上long變量72字節,InternalThreadLocalMap的基類UnpaddedInternalThreadLocalMap亦存在若干變量).
cpu cache line的大小一般為64k或者128k,變量的大小超過128byte,則會極大的減少偽共享情況.
(當前Netty的版本號是4.1.38InternalThreadLocalMap的實例大小是136byte,這是因為在Netty的4.0.33版本后,引入了cleanerFlags 以及arrayList變量,忘記去除rp9變量導致的).
關於偽共享,可關注JAVA 拾遺 — CPU Cache 與緩存行一文.

4.2 FastThreadLocal初始化

public class FastThreadLocal<V> {
    
    private final int index;

    // 原子變量自增,獲取ID,作為FastThreadLocal的存放索引
    // public static int nextVariableIndex() {
    //     int index = nextIndex.getAndIncrement();
    //     if (index < 0) {
    //         nextIndex.decrementAndGet();
    //         throw new IllegalStateException("too many thread-local indexed variables");
    //     }
    //     return index;
    // }
    public FastThreadLocal() {
        index = InternalThreadLocalMap.nextVariableIndex();
    }
    
    // 設置FastThreadLocal資源
    public final void set(V value) {
        if (value != InternalThreadLocalMap.UNSET) {
            InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
            setKnownNotUnset(threadLocalMap, value);
        } else {
            // 如果設置的資源為UNSET,則銷毀當前FastThreadLocal對應的資源對象
            remove();
        }
    }
    
    // 設置資源,並將設置好的FastThreadLocal變量添加至待銷毀資源列表中,待后續進行銷毀操作
    private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
        if (threadLocalMap.setIndexedVariable(index, value)) {
            addToVariablesToRemove(threadLocalMap, this);
        }
    }
    
    // 根據FastThreadLocal初始化的index,確定其在資源列表中的位置,后續查詢資源就可以根據索引快速確定位置
    public boolean setIndexedVariable(int index, Object value) {
        Object[] lookup = indexedVariables;
        if (index < lookup.length) {
            Object oldValue = lookup[index];
            lookup[index] = value;
            return oldValue == UNSET;
        } else {
            expandIndexedVariableTableAndSet(index, value);
            return true;
        }
    }
    
    // 按照2的倍數,擴張資源池數組長度
    private void expandIndexedVariableTableAndSet(int index, Object value) {
        Object[] oldArray = indexedVariables;
        final int oldCapacity = oldArray.length;
        int newCapacity = index;
        newCapacity |= newCapacity >>>  1;
        newCapacity |= newCapacity >>>  2;
        newCapacity |= newCapacity >>>  4;
        newCapacity |= newCapacity >>>  8;
        newCapacity |= newCapacity >>> 16;
        newCapacity ++;

        Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
        Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
        newArray[index] = value;
        indexedVariables = newArray;
    }
}

以上是FastThreadLocal的部分函數節選.
由構造函數可知,FastThreadLocal在初始化的時候,會使用InternalThreadLocalMapnextVariableIndex獲取一個唯一ID.
ID為原子變量自增獲取,后續對此變量的更新或者刪除操作,均是通過此index進行操作.
在設置變量的時候,存在indexedVariables空間不足的情況(初始化長度為32),則會對此數組通過expandIndexedVariableTableAndSet進行擴容操作(>>>為無符號右移即若該數為正,則高位補0,而若該數為負數,則右移后高位同樣補0).通過這樣的位移操作,每次數組均會乘2(保持2^n).
因為使用常數索引index,因此Netty中查詢FastThreadLocal變量的速度為O(1),擴容時采用Arrays.Copy也很簡單(相較於jdkThreadLocalrehash操作).

4.3 FastThreadLocal變量獲取及刪除

public class FastThreadLocal<V> {

    private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
    

    // 在線程執行完資源之后,需要根據業務場景,確定是否調用此函數以銷毀線程中存在的FastThreadLocal資源
    public static void removeAll() {
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
        if (threadLocalMap == null) {
            return;
        }

        try {
            Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
            if (v != null && v != InternalThreadLocalMap.UNSET) {
                @SuppressWarnings("unchecked")
                Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
                FastThreadLocal<?>[] variablesToRemoveArray =
                        variablesToRemove.toArray(new FastThreadLocal[0]);
                for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
                    tlv.remove(threadLocalMap);
                }
            }
        } finally {
            // 實際上僅僅是將FastThreadLocalThread中的threadLocalMap置為null,或者是將slowThreadLocalMap銷毀
            InternalThreadLocalMap.remove();
        }
    }
    
    @SuppressWarnings("unchecked")
    public final V get(InternalThreadLocalMap threadLocalMap) {
        Object v = threadLocalMap.indexedVariable(index);
        if (v != InternalThreadLocalMap.UNSET) {
            return (V) v;
        }

        // 如果當前待獲取資源為空,則進行初始操作,返回相應資源
        return initialize(threadLocalMap);
    }

    // 根據用戶重載的initialValue函數,初始化待獲取資源
    private V initialize(InternalThreadLocalMap threadLocalMap) {
        V v = null;
        try {
            v = initialValue();
        } catch (Exception e) {
            PlatformDependent.throwException(e);
        }

        threadLocalMap.setIndexedVariable(index, v);
        addToVariablesToRemove(threadLocalMap, this);
        return v;
    }
    
    // 將FastThreadLocal變量,添加至待刪除的資源列表中
    @SuppressWarnings("unchecked")
    private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
        Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
        Set<FastThreadLocal<?>> variablesToRemove;
        // 如果待刪除資源列表為空,則初始化待刪除資源列表(Set)
        if (v == InternalThreadLocalMap.UNSET || v == null) {
            variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
            threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
        } else {
            variablesToRemove = (Set<FastThreadLocal<?>>) v;
        }

        variablesToRemove.add(variable);
    }
    

    @SuppressWarnings("unchecked")
    public final void remove(InternalThreadLocalMap threadLocalMap) {
        if (threadLocalMap == null) {
            return;
        }

        Object v = threadLocalMap.removeIndexedVariable(index);
        removeFromVariablesToRemove(threadLocalMap, this);
    
        // FastThreadLocal變量已經被賦值,則需要調用用戶重載的onRemoval函數,銷毀資源
        if (v != InternalThreadLocalMap.UNSET) {
            try {
                onRemoval((V) v);
            } catch (Exception e) {
                PlatformDependent.throwException(e);
            }
        }
    }
    
    // 確定資源的初始化函數(如果用戶不進行重載,則返回null)
    protected V initialValue() throws Exception {
        return null;
    }

    // 用戶需要重載次函數,以便銷毀申請的資源
    protected void onRemoval(@SuppressWarnings("UnusedParameters") V value) throws Exception { }
}

用戶在使用FastThreadLocal時,需要繼承initialValue以及onRemoval函數(FastThreadLocal對象的初始化及銷毀交由用戶控制).

  • initialValue: 在獲取FastThreadLocal對象時,若對象未設置,則調用initialValue初始化資源(get等函數中判斷對象為空,則調用initialize初始化資源);
  • onRemoval: 在FastThreadLocal更新對象或最終銷毀資源時,調用onRemoval銷毀資源(set等函數中判斷待設置對象已被設置過,則調用onRemoval銷毀資源).
    this.threadLocal = new FastThreadLocal<Recycler.Stack<T>>() {
        protected Recycler.Stack<T> initialValue() {
            return new Recycler.Stack(Recycler.this, Thread.currentThread(), Recycler.this.maxCapacityPerThread, Recycler.this.maxSharedCapacityFactor, Recycler.this.ratioMask, Recycler.this.maxDelayedQueuesPerThread);
        }

        protected void onRemoval(Recycler.Stack<T> value) {
            if (value.threadRef.get() == Thread.currentThread() && Recycler.DELAYED_RECYCLED.isSet()) {
                ((Map)Recycler.DELAYED_RECYCLED.get()).remove(value);
            }

        }
    };

以上代碼,就是Recycler調用FastThreadLocal的使用示范(RecyclerNetty的輕量級對象池).
需要注意,在FastThreadLocal中,存在一個靜態變量variablesToRemoveIndex,其作用是在對象池中占據一個固定位置,存放一個集合Set<FastThreadLocal<?>> variablesToRemove.
每次初始化變量的時候,均會將對應的FastThreadLocal存放至variablesToRemove中,在更新對象的時候(set等函數)或者清理FastThreadLocalThread中的變量時(removeAll函數)時,程序就會根據variablesToRemove進行相應的清理工作.
這樣,用戶在使用FastThreadLocalThread時,就無須花費過多的經理關注線程安全問題(在Netty中,線程池的生命周期較長,無需過多的關注內存清理,然而如果用戶在線程池等場景使用FastThreadLocalThread,就需要在執行完任務后,清理FastThreadLocal參數,以免對后續的業務產生影響).

總結

通過以上源碼分析,可以得知Netty為了提升ThreadLocal性能,做了很多改善操作.

  • 定制FastThreadLocalThread以及FastThreadLocal;
  • 使用padding手段擴充FastThreadLocal的實例大小,避免偽共享;
  • 使用原子變量自增獲取的ID作為常數索引,優化查詢速度至O(1),避免了hash沖突以及擴容導致的rehash操作;
  • 提供initialValue以及onRemoval函數,用戶可以自行重載函數,實現FastThreadLocal資源的高度定制化操作;
  • FastThreadLocal對象數組的擴容(expandIndexedVariableTableAndSet)采用位操作,計算數組長度;
  • 針對在Thread中調用FastThreadLocal以及在FastThreadLocalThread中調用FastThreadLocal,分別采用不同的獲取方式,增強了兼容性.
  • 更多細節,讀者可以自己參照源碼進行進一步分析.

對於采用Object[]數組存放FastThreadLocal變量,是否存在犧牲空間換取性能,個人理解如下:
Netty的默認啟動線程是2 * cpu core,也就是兩倍cpu核數,且此線程組會在Netty的生命周期中持續存在.
Netty不存在創建過多線程導致內存占用過多的現象(用戶手動調節Nettyboss group以及worker group線程數量都會很慎重).
此外,Netty中對於FastThreadLocal存在較大的讀取以及更新需求量,確實存在優化ThreadLocal的需求.
因此,適當的浪費一些空間,換取查詢和更新的性能提升,是恰當的操作.

PS:
如果您覺得我的文章對您有幫助,請關注我的微信公眾號,謝謝!
程序員打怪之路


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM