GenericObjectPool源碼分析


apache提供了三種對象池:GenericKeyedObjectPool,SoftReferenceObjectPool和GenericObjectPool,其中GenericObjectPool是我們最常用的對象池,內部實現也最復雜,本文講解其實現原理。

GenericObjectPool實現了ObjectPool<T>接口,而ObjectPool<T>中有以下方法:

  • Object borrowObject() // 從池中獲得一個對象  
  • void returnObject(Object obj) // 返回一個對象給池  
  • void invalidateObject(Object obj) // 使對象實效,不再受池管轄(必須是已經從池中獲得的對象)  
  • void addObject() // 生成一個對象(通過工程或其他實現方式),並將其放入空閑隊列中  
  • int getNumIdle() // 獲得空閑對象的數量  
  • int getNumActive() // 獲得活動對象的數量  
  • void clear() // 清空池中空閑對象,釋放相關資源  
  • void close() // 關閉池,釋放所有與它相關資源  
  • void setFactory(PoolableObjectFactory factory) // 設置池對象工廠

其中,前四個方法比較重要,本文重點研究這四個方法的碼源實現,講解這四個方法前,先了解下部分重要屬性的含義。

  • CursorableLinkedList<ObjectTimestampPair<T>>  _pool: 隊列,用於保存空閑object,ObjectTimestampPair的value值即為真實的object
  • LinkedList<Latch<T>> _allocationQueue: 隊列,用於保存線程borrow object的請求。
  • PoolableObjectFactory<T> _factory:用於生產object的工廠類
  • _maxActive: 鏈接池中最大連接數,默認為8.
  • _whenExhaustedAction: 當“連接池”中active數量達到閥值時,即“鏈接”資源耗盡時,連接池需要采取的手段, 默認為1:
  • -> 0 : 拋出異常,
  • -> 1 : 阻塞,直到有可用鏈接資源,這里如果設置了maxWait值,則在阻塞了maxWait時間后拋出異常
  • -> 2 : 強制創建新的鏈接資源
  • _maxWait: 當連接池資源耗盡時,調用者最大阻塞的時間,超時將跑出異常。單位,毫秒數;默認為-1.表示永不超時.
  • _maxIdle: 鏈接池中最大空閑的連接數,默認為8.該參數一般盡量與_maxActive相同,以提高並發數
  • _minIdle: 連接池中最少空閑的連接數,默認為0.
  • _testOnBorrow: 向調用者輸出“鏈接”資源時,是否檢測是有有效,如果無效則從連接池中移除,並嘗試獲取繼續獲取。默認為false。建議保持默認值.
  • _testOnReturn:  向連接池“歸還”鏈接時,是否檢測“鏈接”對象的有效性。默認為false。建議保持默認值.
  • _timeBetweenEvictionRunsMillis:  “空閑鏈接”檢測線程,檢測的周期,毫秒數。如果為負值,表示不運行“檢測線程”。默認為-1.該值非-1時下面的參數才有效
  • _numTestsPerEvictionRun:檢測線程一次運行檢查多少條“鏈接”
  • _minEvictableIdleTimeMillis: 連接空閑的最小時間,達到此值后空閑連接將可能會被移除。負值(-1)表示不移除
  • _testWhileIdle:  向調用者輸出“鏈接”對象時,是否檢測它的空閑超時;默認為false。如果“鏈接”空閑超時,將會被移除。建議保持默認值.
  • _softMinEvictableIdleTimeMillis: 連接空閑的最小時間,達到此值后空閑鏈接將會被移除,且保留“minIdle”個空閑連接數。默認為-1.
  • lifo:false為隊列,true為棧,表示object 的出借方式

構造方法:工廠方法用於創建object,config主要配置pool的一些屬性(上面屬性中第四個到最后)。其他的構造方法基本一致,都會傳factory,pool屬性配置可以不傳,GenericObjectPool有默認屬性可設置

public GenericObjectPool(PoolableObjectFactory<T> factory, GenericObjectPool.Config config) {
        this(factory, config.maxActive, config.whenExhaustedAction, config.maxWait, config.maxIdle, config.minIdle,
                config.testOnBorrow, config.testOnReturn, config.timeBetweenEvictionRunsMillis,
                config.numTestsPerEvictionRun, config.minEvictableIdleTimeMillis, config.testWhileIdle,
                config.softMinEvictableIdleTimeMillis, config.lifo);

}

主要四個方法:

1.borrowObject()

 public T borrowObject() throws Exception {
    第一步:創建請求latch放入分配隊列,設置相關屬性,並執行一次分配動作
        long starttime = System.currentTimeMillis();
        Latch<T> latch = new Latch<T>(); // 保存object的基本單位
        byte whenExhaustedAction;
        long maxWait;
        synchronized (this) {                        
            // Get local copy of current config. Can't sync when used later as
            // it can result in a deadlock. Has the added advantage that config
            // is consistent for entire method execution
            whenExhaustedAction = _whenExhaustedAction; //設置阻塞方式
            maxWait = _maxWait;//阻塞時最大等待時間

            // Add this request to the queue
            _allocationQueue.add(latch);  //將borrow請求加入到分配隊列
        }
        // Work the allocation queue, allocating idle instances and
        // instance creation permits in request arrival order
        allocate();   執行一次分配動作,嘗試給上面的latch分配object


    第二步是一個大循環,里面又分為2步,第一步用於判斷是否獲取到object並根據阻塞方式操作,第二步是為latch分配或創建object。
    1. 第一步中如果沒有分配到object,並且不能創建新的object時,switch pool設置的阻塞方式:
         a:whenExhaustedAction=2,強制創建一個object,此時同步pool,如果未獲取且不能創建(說明未分配到object),則從分配隊列移除latch,正在創建數量+1,break跳出switch,進入第2步
         b:whenExhaustedAction=0,直接拋出異常,如果已獲取或允許創建,則break出switch,進入第2步,否則從分配隊列移除latch,拋出異常,跳出borrowObject方法
         c:whenExhaustedAction=1,阻塞一定時間拋出異常,同步latch,如果已獲取或允許創建,break跳出switch,進入第2步;否則,如果maxWait<0,則一直阻塞,maxWait>0,計算阻塞時間waitTime並阻塞。
            c2:這一段都是c中的異常邏輯,如果拋出中斷異常,同步pool並判斷latch狀態,1、未獲取且不能創建,則從分配隊列移除latch 2、未獲取但允許創建,正在創建數量減一,創建標志為true  3、已獲取object,正在創建數量減一,已borrow數量加一,調用returnObject(object)歸還object ; 然后如果創建標志為true,調用分配方法(這里異常可能占用一個創建的機會,需調用分配方法顯式讓其他線程獲取得創建標志),然后中斷當前線程,拋出異常,跳出borrowObject方法。  
            c3:然后,如果已超時, 繼續判斷latch,未創建且不能創建,則從分配隊列中移除latch,然后拋出異常,跳出borrowObject;已創建或可以創建,則break,進入第2步。未超時的情況下,繼續循環,從第1步開始。
        d:默認 阻塞方式 屬性不能識別

    2.如果latch未獲取object,則通過factory創建一個object賦給latch,設置新創建標志為true;有異常時,如果標志位不是新創建的,則將正在創建數量減一(這里減一對應分配方法創建新的 和a 中 給新創建數量加1的邏輯,因為只有強制創建a,或者分配方法中才有創建加1的邏輯),並再分配。
      然后激活object,檢測object有效性,無效進入異常邏輯。有效使創建數量減1,已borrow數量加一,返回object,跳出borrowObject方法。 如果出現異常,工廠毀滅object,並將正在創建減一,如果不是新創建的Object(失效的空閑object),latch重置,並加入分配隊列,再分配, 如果是新創建的,拋出異常,不是則繼續循環。

        for(;;) {
            synchronized (this) {
                assertOpen();//父類的方法,確認 pool沒有被關閉,如果關閉了調用該方法會拋異常
            }
       第1步:
            // If no object was allocated from the pool above
            if(latch.getPair() == null) {    沒有從pool中分配到object(沒有空閑的)
                // check if we were allowed to create one
                if(latch.mayCreate()) {   如果設置了可以嘗試創建新object
                    // allow new object to be created
                } else {
               pool設置了取不到object時的動作
                    // the pool is exhausted
                    switch(whenExhaustedAction) {
                        case WHEN_EXHAUSTED_GROW:   強制創造一個object分配的情況
                            // allow new object to be created   
                            synchronized (this) {
                //防止其他
                                // Make sure another thread didn't allocate us an object
                                // or permit a new object to be created
                                if (latch.getPair() == null && !latch.mayCreate()) {
                                    _allocationQueue.remove(latch); 確保未分配到object,分配隊列中刪除請求
                                    _numInternalProcessing++;
                                }
                            }
                            break;
                        case WHEN_EXHAUSTED_FAIL:   直接拋異常的情況
                            synchronized (this) {
                                // Make sure allocate hasn't already assigned an object
                                // in a different thread or permitted a new object to be created
                                if (latch.getPair() != null || latch.mayCreate()) {  如果分配到object或獲取到創建object權限,則跳出switch,進入創建邏輯代碼
                                    break;    
                                }
                                _allocationQueue.remove(latch);
                            }
                            throw new NoSuchElementException("Pool exhausted");
                        case WHEN_EXHAUSTED_BLOCK:   阻塞maxWait秒 拋異常  的情況
                            try {
                                synchronized (latch) {
                                    // Before we wait, make sure another thread didn't allocate us an object
                                    // or permit a new object to be created
                                    if (latch.getPair() == null && !latch.mayCreate()) {  //未分配到object並且沒有權限創建
                                        if(maxWait <= 0) {
                                            latch.wait(); maxWait小於0 就一直阻塞
                                        } else {
                                            // this code may be executed again after a notify then continue cycle
                                            // so, need to calculate the amount of time to wait
                                            final long elapsed = (System.currentTimeMillis() - starttime);
                                            final long waitTime = maxWait - elapsed; //計算需要阻塞的時間
                                            if (waitTime > 0)
                                            {
                                                latch.wait(waitTime);
                                            }
                                        }
                                    } else {
                                        break;
                                    }
                                }
                                // see if we were awakened by a closing pool
                                if(isClosed() == true) {
                                    throw new IllegalStateException("Pool closed");
                                }
                            } catch(InterruptedException e) {
                                boolean doAllocate = false;
                                synchronized(this) {
                                    // Need to handle the all three possibilities
                                    if (latch.getPair() == null && !latch.mayCreate()) {
                    還是沒有分配到object 在分配隊列中,直接移除
                                        // Case 1: latch still in allocation queue      
                                        // Remove latch from the allocation queue
                                        _allocationQueue.remove(latch);
                                    } else if (latch.getPair() == null && latch.mayCreate()) {
                    可以創建一個object,需要將 正在創建的數量-1,設置允許創建標志位
                                        // Case 2: latch has been given permission to create
                                        //         a new object
                                        _numInternalProcessing--;
                                        doAllocate = true;   允許創建的標志
                                    } else {這種情況是 已分配得到object
                    被分配到對象   正創建數量 -1   已borrow數量+1,並且object歸還給pool
                                        // Case 3: An object has been allocated
                                        _numInternalProcessing--;
                                        _numActive++;
                                        returnObject(latch.getPair().getValue());
                                    }
                                }
                                if (doAllocate) {
                                    allocate();//如果是可以創建,則顯示調用分配方法,將該機會分配出去
                                }
                                Thread.currentThread().interrupt();
                                throw e;
                            }
                            if(maxWait > 0 && ((System.currentTimeMillis() - starttime) >= maxWait)) {
                                synchronized(this) {    如果阻塞超時,並且沒有獲得object、不能創建,則在分配隊列中去除latch,否則跳出switch,可以進入創建邏輯
                                    // Make sure allocate hasn't already assigned an object
                                    // in a different thread or permitted a new object to be created
                                    if (latch.getPair() == null && !latch.mayCreate()) { 同上面
                                        // Remove latch from the allocation queue
                                        _allocationQueue.remove(latch);
                                    } else {
                                        break;
                                    }
                                }
                                超時並且沒有分配到object並且沒有創建權限,則拋異常
                                throw new NoSuchElementException("Timeout waiting for idle object");
                            } else {
                                continue; // keep looping
                            }
                        default:
                            throw new IllegalArgumentException("WhenExhaustedAction property " + whenExhaustedAction +
                                    " not recognized.");
                    }
                }
            }
       //第2步,這一步是創建過程的邏輯,說明是允許創建或者已分配到object,  其他的阻塞超時之類的上面直接拋出異常跳出方法
            boolean newlyCreated = false;
            if(null == latch.getPair()) { 未分配到object時創建object
                try {
                    T obj = _factory.makeObject();  工廠類創建object
                    latch.setPair(new ObjectTimestampPair<T>(obj));
                    newlyCreated = true;
                } finally {
                    if (!newlyCreated) {  //  如果不是新創建的,說明創建出現異常,創建失敗,需要把正在創建中數量-1
                        // object cannot be created
                        synchronized (this) {
                            _numInternalProcessing--;
                            // No need to reset latch - about to throw exception
                        }
                        allocate(); //分配 方法,將創建失敗的機會嘗試分配給其他線程
                    }
                }
            }
            // activate & validate the object
            try {
                _factory.activateObject(latch.getPair().value);  激活object
                if(_testOnBorrow &&
                        !_factory.validateObject(latch.getPair().value)) {
                    throw new Exception("ValidateObject failed”);      如果設置了testOnBorrow參數並且對象失效拋異常
                }
                synchronized(this) {
                    _numInternalProcessing--;     將正在創建數量轉為已borrow數量
                    _numActive++;
                }
                return latch.getPair().value;
            }
            catch (Throwable e) {
                PoolUtils.checkRethrow(e);
                // object cannot be activated or is invalid
                try {
                    _factory.destroyObject(latch.getPair().value);   將已經該失效的object毀滅
                } catch (Throwable e2) {
                    PoolUtils.checkRethrow(e2);
                    // cannot destroy broken object
                }
                synchronized (this) {
                    _numInternalProcessing--; 拋出異常說明創建失敗,將正在創建減一
                    if (!newlyCreated) {  如果是object是從pool中獲取的空閑object,失效后需要reset,將請求命令重新放入分配隊列中,再次嘗試獲取新的object
                        latch.reset();
                        _allocationQueue.add(0, latch);   插入到隊列中第一位
                    }
                }
                allocate();   // 顯示為創建失敗的請求 分配object
                如果object是新創建的,則拋異常,說明再創建也有問題,直接拋異常
                if(newlyCreated) {
                    throw new NoSuchElementException("Could not create a validated object, cause: " + e.getMessage());
                }
                else {
                    continue; // keep looping
                }
            }
        }
    }

對分配隊列中的請求分配object方法
private synchronized void allocate() {
        if (isClosed()) return;

        // First use any objects in the pool to clear the queue
     1.嘗試從pool中向需要分配的隊列分配  空閑的對象! 直對象池空或者分配隊列空才break
        for (;;) {
            if (!_pool.isEmpty() && !_allocationQueue.isEmpty()) {
                Latch<T> latch = _allocationQueue.removeFirst();  //Latch 代表borrowObject的一個命令請求
                latch.setPair( _pool.removeFirst()); //將池中的pair分配出去
                _numInternalProcessing++;  //指還在分配過程中的數量,不屬於已經borrow、空閑的對象的數
                synchronized (latch) {
                    latch.notify();//喚醒borrow中的阻塞的線程
                }
            } else {
                break;
            }
        }
    
        // Second utilise any spare capacity to create new objects
        for(;;) {
        2.如果分配隊列還有需要分配對象的請求,並且 pool大小小於0(這個邏輯不理解,有懂的人麻煩說下)  或者  已borrow的數量+正在分配的數量 小於pool的大小  ,這個時候可以讓pool創建新的object,否則break
            if((!_allocationQueue.isEmpty()) && (_maxActive < 0 || (_numActive + _numInternalProcessing) < _maxActive)) {
                Latch<T> latch = _allocationQueue.removeFirst();
                latch.setMayCreate(true);  //設置為創建新的對象
                _numInternalProcessing++;
                synchronized (latch) {
                    latch.notify();
                }
            } else {
                break;
            }
        }
    }

2.returnObject(T obj),將對象返回給pool,這時object已經不受pool管理,如果有異常,讓其失效,jvm會處理
public void returnObject(T obj) throws Exception {
        try {
            addObjectToPool(obj, true); //將對象添加到pool true 表示歸還的對象,false表示新創建的
        } catch (Exception e) {
            if (_factory != null) {
                try {
                    _factory.destroyObject(obj);   添加到pool時異常,需將對象毀滅,將borrow數量減一
                } catch (Exception e2) {
                    // swallowed
                }
                // TODO: Correctness here depends on control in addObjectToPool.
                // These two methods should be refactored, removing the
                // "behavior flag", decrementNumActive, from addObjectToPool.
                synchronized(this) {
                    _numActive--;
                }
                allocate();//將異常時歸還object的機會分配 給其他線程
            }
        }
    }

private void addObjectToPool(T obj, boolean decrementNumActive) throws Exception {
        boolean success = true;
        if(_testOnReturn && !(_factory.validateObject(obj))) { //檢測對象是否有效
            success = false;   對象失效
        } else {
            _factory.passivateObject(obj);   //對象鈍化
        }

        boolean shouldDestroy = !success;  是否應該毀來

        // Add instance to pool if there is room and it has passed validation
        // (if testOnreturn is set)
        boolean doAllocate = false;
        synchronized (this) {
            if (isClosed()) {   //pool關閉,所有object應該被 毀滅
                shouldDestroy = true;
            } else {
                if((_maxIdle >= 0) && (_pool.size() >= _maxIdle)) {
                    shouldDestroy = true;    最大空閑數量大於0      並且 pool的空閑數量大於最大空閑數量時,需將其毀滅
                } else if(success) {
                    // borrowObject always takes the first element from the queue,
                    // so for LIFO, push on top, FIFO add to end   按照設定的模式將對象加入pool
                    if (_lifo) {
                        _pool.addFirst(new ObjectTimestampPair<T>(obj));
                    } else {
                        _pool.addLast(new ObjectTimestampPair<T>(obj));
                    }
                    if (decrementNumActive) { 如果是舊對象歸還的操作
                        _numActive--;    
                    }
                    doAllocate = true; 再分配
                }
            }
        }
        if (doAllocate) {
            allocate(); //有可分配的object,所以執行一次
        }

        // Destroy the instance if necessary
        if(shouldDestroy) {
            try {
                _factory.destroyObject(obj); 毀滅
            } catch(Exception e) {
                // ignored
            }
            // Decrement active count *after* destroy if applicable
            if (decrementNumActive) {  如果是歸還的對象
                synchronized(this) {
                    _numActive--;
                }
                allocate();
            }
        }

    }

3.使對象實效,不再受池管轄(必須是已經從池中獲得的對象)提供給開發者調用,在其拋出異常時可使用     由jvm自動清理
public void invalidateObject(T obj) throws Exception {
        try {
            if (_factory != null) {
                _factory.destroyObject(obj);   毀滅object ,將borrow數減一
            }
        } finally {
            synchronized (this) {
                _numActive--;   不管毀滅成功失敗,都要將borrow數量減一
            }
            allocate();  將返回的可borrow 分配出去
        }
    }
4.添加一個object到pool中,一般開發者不會調用,用於pool維護 最小空閑object數量
public void addObject() throws Exception {
        assertOpen(); pool是否打開
        if (_factory == null) {
            throw new IllegalStateException("Cannot add objects without a factory.");
        }
        T obj = _factory.makeObject();  工廠創造object
        try {
            assertOpen();    pool是否打開
            addObjectToPool(obj, false);    將object添加到pool,設置false為表示新創建的object
        } catch (IllegalStateException ex) { // Pool closed
            try {
                _factory.destroyObject(obj);添加失敗就毀滅object
            } catch (Exception ex2) {
                // swallow
            }
            throw ex;
        }
    }

 

后續研究再補充吧,有研究這塊源碼的朋友可以分享下,網上資料確實太少,轉載請注明原地址,謝謝。

 

 
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM