java對象池commons-pool-1.6詳解(一)


自己的項目中用到了 對象池 commons-pool:

package com.sankuai.qcs.regulation.protocol.client;

import com.dianping.cat.Cat;
import com.dianping.cat.message.Transaction;
import com.sankuai.qcs.regulation.constant.CatConstant;
import com.sankuai.qcs.regulation.exception.QcsSysErrorEnum;
import com.sankuai.qcs.regulation.exception.QcsSysException;
import lombok.Data;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPSClient;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
 
public class PoolingFTPSClient implements FileClient {

    private static final Logger LOGGER = LoggerFactory.getLogger(PoolingFTPSClient.class);

    private Config config;

    private GenericObjectPool<FTPSClient> ftpsClientPool;

    public PoolingFTPSClient(Config config) {
        this.config = config;
        ftpsClientPool = new GenericObjectPool<>(new FTPSClientFactory(config), config.poolConfig);
    }


    @Override
    public void storeFile(String fileName, byte[] data) throws Exception {
        Transaction transaction = Cat.newTransaction(CatConstant.TRANSACTION_FTPS_REQUEST.getKey(), CatConstant.TRANSACTION_FTPS_REQUEST.getValue());

        FTPSClient client = null;
        try {
            client = ftpsClientPool.borrowObject();
            if (!client.storeFile(fileName, new ByteArrayInputStream(data))) {
                String errorInfo = String.format("上傳文件:%s 到%s失敗.", fileName, config.getName());
                LOGGER.error(errorInfo);
                throw QcsSysErrorEnum.SYS_ERROR.formException(errorInfo);
            }

            if (client.rename(fileName, String.format("%s/%s", config.getFinalFileDir(), fileName))) {
                String errorInfo = String.format("%s文件:%s 移動操作失敗.", config.getName(), fileName);
                LOGGER.error(errorInfo);
                throw QcsSysErrorEnum.SYS_ERROR.formException(errorInfo);
            }

            transaction.setSuccessStatus();
        } catch (Exception e) {
            transaction.setStatus(e);
            LOGGER.error("PoolingFTPSClient.storeFile error.(config:{},fileName:{})", config.toString(), fileName, e);
            throw e;
        } finally {
            if (client != null) {
                ftpsClientPool.returnObject(client);
            }
            transaction.complete();
        }
    }

    public void destroy() {
        if (!ftpsClientPool.isClosed()) {
            ftpsClientPool.close();
        }
    }

    private static final class FTPSClientFactory implements PooledObjectFactory<FTPSClient> {

        private Config config;

        FTPSClientFactory(Config config) {
            this.config = config;
        }

        @Override
        public PooledObject<FTPSClient> makeObject() throws Exception {
            FTPSClient client = new FTPSClient();
            client.setControlEncoding("utf-8");

            try {
                client.connect(config.serverIp, config.serverPort);
                boolean isOk = client.login(config.user, config.pwd);
                if (!isOk) {
                    String errorInfo = String.format("ftps服務登錄失敗:config:%s", config);
                    LOGGER.error(errorInfo);
                    throw QcsSysErrorEnum.SYS_ERROR.formException(errorInfo);
                }

                if (!client.changeWorkingDirectory(config.tempFileDir)) {
                    client.makeDirectory(config.tempFileDir);

                    if (!client.changeWorkingDirectory(config.tempFileDir)) {
                        String errorInfo = String.format("切換%s的工作目錄失敗", config.getName());
                        LOGGER.error(errorInfo);
                        throw QcsSysErrorEnum.SYS_ERROR.formException(errorInfo);
                    }
                }
                client.setFileType(FTP.BINARY_FILE_TYPE);
                return new DefaultPooledObject(client);
            } catch (Exception e) {
                client.disconnect();
                if (e instanceof QcsSysException) {
                    throw e;
                } else {
                    throw QcsSysErrorEnum.SYS_ERROR.formException(String.format("構造FTPSClient異常,errorInfo:%s,config:%s", e.getMessage(), config));
                }
            }

        }

        @Override
        public void destroyObject(PooledObject<FTPSClient> pooledObject) throws Exception {
            FTPSClient client = pooledObject.getObject();
            if (client.isConnected()) {
                /* 不需要再調用 退出 指令 */
                client.disconnect();
            }

            LOGGER.warn("{}的一個ftps連接已經關閉.", config.getName());
        }

        @Override
        public boolean validateObject(PooledObject<FTPSClient> pooledObject) {
            FTPSClient client = pooledObject.getObject();
            try {
                if (client.sendNoOp()) {
                    return true;
                }
            } catch (IOException e) {
                LOGGER.warn("{}的一個ftps連接驗證失敗.", config.getName(), e);
                return false;
            }

            return false;
        }

        @Override
        public void activateObject(PooledObject<FTPSClient> pooledObject) {
            //TODO:待定
        }

        @Override
        public void passivateObject(PooledObject<FTPSClient> pooledObject) {
            //TODO:待定
        }
    }

    @Data
    public static final class Config {
        private String name;
        private String serverIp;
        private int serverPort;
        private String user;
        private String pwd;
        private String tempFileDir;
        private String finalFileDir;
        private GenericObjectPoolConfig poolConfig;
    }
}

 

 

對象的創建和銷毀在一定程度上會消耗系統的資源,雖然jvm的性能在近幾年已經得到了很大的提高,對於多數對象來說,沒有必要利用對象池技術來進行對象的創建和管理。但是對於有些對象來說,其創建的代價還是比較昂貴的,比如線程、tcp連接、rpc連接、數據庫連接等對象,因此對象池技術還是有其存在的意義。

Apache-commons-pool-1.6提供的對象池主要有兩種:一種是帶Key的對象池,這種帶Key的對象池是把相同的池對象放在同一個池中,也就是說有多少個key就有多少個池;另一種是不帶Key的對象池,這種對象池是把生產完全一致的對象放在同一個池中,但是有時候,單用對池內所有對象一視同仁的對象池,並不能解決的問題。例如:對於一組某些參數設置不同的同類對象——比如一堆指向不同地址的 java.net.URL對象或者一批代表不同語句的java.sql.PreparedStatement對象,用這樣的方法池化,就有可能取出不合用的對象。

 

1、對象池:

1)對象池接口介紹:

如果讓我們去設計一個對象池接口,會給用戶提供哪些核心的方法呢?

borrowObject(),returnObject()是兩個核心方法,一個是’借’,一個是’還’。那么我們有可能需要對一個已經借到的對象置為失效(比如當我們的遠程連接關閉或產生異常,這個連接不可用需要失效掉),invalidateObject()也是必不可少的。對象池剛剛創建的時候,我們可能需要預熱一部分對象,而不是采用懶加載模式以避免系統啟動時候的抖動,因此addObject()提供給用戶,以進行對象池的預熱。有創建就有銷毀,clear()和close()就是用來清空對象池(覺得叫purge()可能更好一點)。除此之外,我們可能還需要一些簡單的統計,比如getNumIdle()獲得空閑對象個數和getNumActive()獲得活動對象(被借出對象)的個數。如下表:


方法名 作用
borrowObject() 從池中借對象
returnObject() 還回池中
invalidateObject() 失效一個對象
addObject() 池中增加一個對象
clear() 清空對象池
close() 關閉對象池
getNumIdle() 獲得空閑對象數量
getNumActive() 獲得被借出對象數量
2)在commons-pool中有兩類對象池接口(帶key和不帶key),一個是ObjectPool,另一個是KeyedObjectPool;此外,為了方便他們分別還對應了ObjectPoolFactory、KeyedObjectPoolFactory兩個接口(這兩個接口在功能上和他們都一樣,只是使用形式上不一樣)
3)對象池空間划分:

一個對象存儲到對象池中,其位置不是一成不變的。空間的划分可以分為兩種,一種是物理空間划分,一種是邏輯空間划分。不同的實現可能采用不同的技術手段,Commons Pool實際上采用了邏輯划分。如下圖所示:

 

從整體上來講,可以將空間分為池外空間和池內空間,池外空間是指被’出借’的對象所在的空間(邏輯空間)。池內空間進一步可以划分為idle空間,abandon空間和invalid空間。idle空間就是空閑對象所在的空間,空閑對象之間是有一定的組織結構的(詳見后文)。abandon空間又被稱作放逐空間,用於放逐被出借超時的對象。invalid空間其實就是對象的垃圾場,這些對象將不會在被使用,而是等待被gc處理掉。

4)對象池的放逐與驅逐策略:

下面我們會多次提到驅逐(eviction)和放逐(abandon),這兩個概念是對象池設計的核心。

       先來看驅逐,我們知道對象池的一個重要的特性就是伸縮性,所謂伸縮性是指對象池能夠根據當前池中空閑對象的數量(maxIdle和minIdle配置)自動進行調整,進而避免內存的浪費。自動伸縮,這是驅逐所需要達到的目標,他是如何實現的呢?實際上在對象池內部,我們可以維護一個驅逐定時器(EvictionTimer),由timeBetweenEvictionRunsMillis參數對定時器的間隔加以控制,每次達到驅逐時間后,我們就選定一批對象(由numTestsPerEvictionRun參數進行控制)進行驅逐測試,這個測試可以采用策略模式,比如Commons Pool的DefaultEvictionPolicy,代碼如下:

@Override
public boolean evict(EvictionConfig config, PooledObject<T> underTest,
int idleCount) {
if ((config.getIdleSoftEvictTime() < underTest.getIdleTimeMillis() &&
config.getMinIdle() < idleCount) ||
config.getIdleEvictTime() < underTest.getIdleTimeMillis()) {
return true;
}
return false;
}

 


對於符合驅逐條件的對象,將會被對象池無情的驅逐出空閑空間,並丟棄到invalid空間。之后對象池還需要保證內部空閑對象數量需要至少達到minIdle的控制要求。
       我們在看來放逐,對象出借時間太長(由removeAbandonedTimeout控制),我們就把他們稱作流浪對象,這些對象很有可能是那些用完不還的壞蛋們的傑作,也有可能是對象使用者出現了什么突發狀況,比如網絡連接超時時間設置長於放逐時間。總之,被放逐的對象是不允許再次回歸到對象池中的,他們會被擱置到abandon空間,進而進入invalid空間再被gc掉以完成他們的使命。放逐由removeAbandoned()方法實現,分為標記過程和放逐過程,代碼實現並不難,有興趣的可以直接翻翻源代碼。

驅逐是由內而外將對象驅逐出境,放逐則是由外而內,將對象流放。他們一內一外,正是整個對象池形成閉環的核心要素。

5)對象池有效性探測:

用過數據庫連接池的同學可能對類似testOnBorrow的配置比較熟悉。除了testOnBorrow,對象池還提供了testOnCreate, testOnReturn, testWhileIdle,其中testWhileIdle是當對象處於空閑狀態的時候所進行的測試,當測試通過則繼續留在對象池中,如果失效,則棄置到invalid空間。所謂testOnBorrow其實就是當對象出借前進行測試,測試什么?當然是有效性測試,在測試之前我們需要調用factory.activateObject()以激活對象,在調用factory.validateObject(p)對准備出借的對象做有有效性檢查,如果這個對象無效則可能有拋出異常的行為,或者返回空對象,這全看具體實現了。testOnCreate表示當對象創建之后,再進行有效性測試,這並不適用於頻繁創建和銷毀對象的對象池,他與testOnBorrow的行為類似。testOnReturn是在對象還回到池子之前鎖進行的測試,與出借的測試不同,testOnReturn無論是測試成功還是失敗,我們都需要保證池子中的對象數量是符合配置要求的()ensureIdle()方法就是做這個事情),並且如果測試失敗了,我們可以直接swallow這個異常,因為用戶根本不需要關心池子的狀態。

6)對象池的常見配置一覽:


配置參數 意義 默認值
maxTotal 對象總數 8
maxIdle 最大空閑對象數 8
minIdle 最小空閑對象書 0
lifo 對象池借還是否采用lifo true
fairness 對於借對象的線程阻塞恢復公平性 false
maxWaitMillis 借對象阻塞最大等待時間 -1
minEvictableIdleTimeMillis 最小驅逐空閑時間 30分鍾
numTestsPerEvictionRun 每次驅逐數量 3
testOnCreate 創建后有效性測試 false
testOnBorrow 出借前有效性測試 false
testOnReturn 還回前有效性測試 false
testWhileIdle 空閑有效性測試 false
timeBetweenEvictionRunsMillis 驅逐定時器周期 false
blockWhenExhausted 對象池耗盡是否block true
2、池化對象:

1)池化對象接口:(被池化的對象需要實現該接口)

池化對象就是對象池中所管理的基本單元。我們可以思考一下,如果直接將我們的原始對象放到對象池中是否可以?答案當然是可以,但是不好,因為如果那樣做,我們的對象池就退化成了容器Collection了,之所以需要將原始對象wrapper成池對象,是因為我們需要提供額外的管理功能,比如生命周期管理。commons pool采用了PooledObject<T>接口和KeyedPooledObject<T>接口用於表達池對象,它主要抽象了池對象的狀態管理和一些諸如狀態變遷時所產生的統計指標,這些指標可以配合對象池做更精准的管理操作。

2)池化對象狀態:

說到對池對象的管理,最重要的當屬對狀態的管理。對於狀態管理,我們熟知的模型就是狀態機模型了。池對象當然也有一套自己的狀態機,我們先來看看commons pool所定義的池對象都有哪些狀態:


狀態 解釋
IDLE 空閑狀態
ALLOCATED 已出借狀態
EVICTION 正在進行驅逐測試
EVICTION_RETURN_TO_HEAD 驅逐測試通過對象放回到頭部
VALIDATION 空閑校驗中
VALIDATION_PREALLOCATED 出借前校驗中
VALIDATION_RETURN_TO_HEAD 校驗通過后放回頭部
INVALID 無效對象
ABANDONED 放逐中
RETURNING 換回對象池中
這里只需知道:放逐(ABANDONED)指的是不在對象池中的對象超時流放;驅逐(EVICTION)指的是空閑對象超時銷毀;VALIDATION是有效性校驗,主要校驗空閑對象的有效性。注意與驅逐和放逐之間的區別。我們通過一張圖來看看狀態之間的變遷。


我們看到上圖的’圓圈’表示的就是池對象,其中中間的英文簡寫是其對應的狀態。虛線外框則表示瞬時狀態。比如RETURNING和ABANDONED。這里我們省略了VALIDATION_RETURN_TO_HEAD,VALIDATION_PREALLOCATED,EVICTION_RETURN_TO_HEAD,因為這對於我們理解池對象狀態變遷並沒有太多幫助。針對上圖,我們重點關注四個方面:

IDLE->ALLOCATED 即上圖的borrow操作,除了需要將狀態置為已分配,我們還需要考慮如果對象池耗盡了怎么辦?是繼續阻塞還是直接異常退出?如果阻塞是阻塞多久?
ALLOCATED->IDLE 即上圖的return操作,我們需要考慮的是,如果池對象還回到對象池,此時對象池空閑數已經達到上界或該對象已經無效,我們是否需要進行特殊處理?
IDLE->EVICTION 與 ALLOCATED->ABANDONED 請參考后文
IDLE->VALIDATION 是testWhileIdle的有效性測試所需要經歷的狀態變遷,他是指每隔一段時間對池中所有的idle對象進行有效性檢查,以排除那些已經失效的對象。失效的對象將會棄置到invalid空間。
3)池化對象生命周期控制:

只搞清楚了池化對象的狀態和狀態轉移是不夠的,我們還應該能夠對池對象生命周期施加影響。Commons Pool通過PooledObjectFactory<T>接口和KeyedPooledObjectFactory<T>對對象生命周期進行控制。該接口有如下方法:


方法 解釋
makeObject 創建對象
destroyObject 銷毀對象
validateObject 校驗對象
activateObject 重新初始化對象
passivateObject 反初始化對象
我們需要注意,池對象必須經過創建(makeObject())和初始化過程(activateObject())后才能夠被我們使用。我們看一看這些方法能夠影響哪些狀態變遷。


4)池對象組織結構:

池中的對象,並不是雜亂無章的,他們得有一定的組織結構。不同的組織結構可能會從整體影響對象池的使用。Apache Commons提供了兩種組織結構,其一是有界阻塞雙端隊列(LinkedBlockingDeque),其二是key桶。


有界阻塞隊列能夠提供阻塞特性,當池中對象exhausted后,新申請對象的線程將會阻塞,這是典型的生產者/消費者模型,通過這種雙端的阻塞隊列,我們能夠實現池對象的lifo或fifo。如下代碼:

 

if (getLifo()) {
idleObjects.addFirst(p);
} else {
idleObjects.addLast(p);
}

 


因為是帶有阻塞性質的隊列,我們能夠通過fairness參數控制線程獲得鎖的公平性,這里我們可以參考AQS實現,不說了。下面我們再來看一看key桶的數據結構:

 


從上圖我們可以看出,每一個key對應一個的雙端阻塞隊列ObjectDeque,ObjectDeque實際上就是包裝了LinkedBlockingDeque,采用這種結構我們能夠對池對象進行一定的划分,從而更加靈活的使用對象池。Commons Pool采用了KeyedObjectPool<K,V>用以表示采用這種數據結構的對象池。當我們borrow和return的時候,都需要指定對應的key空間。

參考:探索對象池技術

對象池源碼:https://github.com/apache/commons-pool

對象池wiki:https://commons.apache.org/proper/commons-pool/
參考:java對象池commons-pool-1.6詳解(一)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM