commons-pool and commons-pool2 connection pools (pooling Hadoop connections)


commons-pool and commons-pool2 are frameworks for building object pools. They provide the interfaces an object must implement in order to be pooled, plus sensible default behavior. Once objects are pooled, the pool manages their lifecycle: creation, use, and destruction. A familiar example is the connection pool, which caps the number of connections, tracks their state, protects the connection resources, and avoids the cost of repeatedly opening and closing connections under concurrency.
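Before diving into the Hadoop example, the core commons-pool2 workflow can be sketched with a minimal, self-contained example; the class and object names here are illustrative and not part of the article's code. A factory tells the pool how to create objects, and callers borrow and return them:

```java
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

public class PoolDemo {
    // A factory that tells the pool how to create and wrap pooled objects.
    static class StringBufferFactory extends BasePooledObjectFactory<StringBuilder> {
        @Override
        public StringBuilder create() {
            return new StringBuilder();
        }

        @Override
        public PooledObject<StringBuilder> wrap(StringBuilder buf) {
            return new DefaultPooledObject<StringBuilder>(buf);
        }
    }

    public static void main(String[] args) throws Exception {
        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
        config.setMaxTotal(4); // at most 4 objects alive at once
        GenericObjectPool<StringBuilder> pool =
                new GenericObjectPool<StringBuilder>(new StringBufferFactory(), config);

        StringBuilder buf = pool.borrowObject(); // take an object from the pool
        try {
            buf.append("hello");
            System.out.println(buf);
        } finally {
            pool.returnObject(buf); // always give it back
        }
        pool.close();
    }
}
```

A real factory would also reset an object's state in passivateObject() before it is returned to the pool; the Hadoop example below follows this same borrow/use/return shape.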

This article shows how to pool objects with commons-pool2, using a Hadoop connection pool as the example.

 

1. Add the dependency

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.3</version>
</dependency>

 

2. Using the connection pool

We pool the Hadoop cluster connection inside a Spring Boot application. Let's look at the result of pooling first.

Below is the pooled Hadoop cluster client. As you can see, connections to the Hadoop cluster are managed through the pool.

1) Configuring the pool

maxTotal - the maximum number of connections

maxIdle - the maximum number of idle connections

minIdle - the minimum number of idle connections

maxWaitMillis - the maximum time (in milliseconds) to wait when borrowing a connection

Note that these settings are passed in through a config object, JHadoopPoolConfig; its implementation is described later.

2) Managing the pool

Three methods below illustrate how to borrow a connection from the pool, use it, and release it.

Borrow a resource: pool.getResource()

Release a resource: pool.returnBrokenResource() for a broken connection, pool.returnResource() otherwise

Note that you must reliably release the resource in the catch and finally blocks; otherwise connections leak and you will eventually hit a "could not get a Resource from the Pool" error.

package com.xiaoju.dqa.jazz.hadoop.client;

import org.apache.hadoop.fs.FileStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.util.Date;


public class JHadoopClient {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    private JHadoopPool jHadoopPool;
    private String coreResource;
    private String hdfsResource;
    private int maxTotal;
    private int maxIdle;
    private int minIdle;
    private int maxWaitMillis;

    public String getCoreResource() {
        return coreResource;
    }
    public void setCoreResource(String coreResource) {
        this.coreResource = coreResource;
    }
    public String getHdfsResource() {
        return hdfsResource;
    }
    public void setHdfsResource(String hdfsResource) {
        this.hdfsResource = hdfsResource;
    }
    public int getMaxTotal() {
        return maxTotal;
    }
    public void setMaxTotal(int maxTotal) {
        this.maxTotal = maxTotal;
    }
    public int getMaxIdle() {
        return maxIdle;
    }
    public void setMaxIdle(int maxIdle) {
        this.maxIdle = maxIdle;
    }
    public int getMaxWaitMillis() {
        return maxWaitMillis;
    }
    public void setMaxWaitMillis(int maxWaitMillis) {
        this.maxWaitMillis = maxWaitMillis;
    }
    public int getMinIdle() {
        return minIdle;
    }
    public void setMinIdle(int minIdle) {
        this.minIdle = minIdle;
    }

    public void init() {
        try {
            JHadoopPoolConfig conf = new JHadoopPoolConfig();
            conf.setMaxTotal(maxTotal);
            conf.setMaxIdle(maxIdle);
            conf.setMinIdle(minIdle);
            conf.setMaxWaitMillis(maxWaitMillis);
            jHadoopPool = new JHadoopPool(conf, coreResource, hdfsResource);
            logger.info("[HDFS] JHadoopClient initialized successfully");
        } catch (Exception ex) {
            logger.error("[HDFS] failed to initialize JHadoopClient", ex);
        }
    }

    public void stop() {
        try {
            jHadoopPool.destroy();
        } catch (Exception e) {
            logger.error("[HDFS] failed to destroy the pool", e);
        }
    }

    public boolean exists(String path) throws Exception {
        JHadoop jHadoop = null;
        boolean broken = false;
        try {
            jHadoop = jHadoopPool.getResource();
            return jHadoop.exists(path);
        } catch (Exception e) {
            broken = true;
            jHadoopPool.returnBrokenResource(jHadoop);
            logger.error("[HDFS] exists() check failed", e);
            throw e;
        } finally {
            if (null != jHadoop && !broken) {
                jHadoopPool.returnResource(jHadoop);
            }
        }
    }

    public String getModificationTime(String path) throws Exception {
        JHadoop jHadoop = null;
        boolean broken = false;
        try {
            jHadoop = jHadoopPool.getResource();
            FileStatus fileStatus = jHadoop.getFileStatus(path);
            long modifyTimestamp = fileStatus.getModificationTime();
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
            Date date = new Date(modifyTimestamp);
            return simpleDateFormat.format(date);
        } catch (Exception e) {
            broken = true;
            jHadoopPool.returnBrokenResource(jHadoop);
            logger.error("[HDFS] failed to get the modification time", e);
            throw e;
        } finally {
            if (null != jHadoop && !broken) {
                jHadoopPool.returnResource(jHadoop);
            }
        }
    }

    public long getPathSize(String path) throws Exception {
        JHadoop jHadoop = null;
        boolean broken = false;
        try {
            jHadoop = jHadoopPool.getResource();
            return jHadoop.getContentSummary(path).getLength();
        } catch (Exception e) {
            broken = true;
            jHadoopPool.returnBrokenResource(jHadoop);
            logger.error("[HDFS] failed to get the path size", e);
            throw e;
        } finally {
            if (null != jHadoop && !broken) {
                jHadoopPool.returnResource(jHadoop);
            }
        }
    }

}

 

3) Registering the client as a bean

The pool settings are injected from the configuration file.

package com.xiaoju.dqa.jazz.hadoop.configuration;

import com.xiaoju.dqa.jazz.hadoop.client.JHadoopClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class HadoopConfig {

    @Value("${hadoop.core.resource}")
    private String coreResource;
    @Value("${hadoop.hdfs.resource}")
    private String hdfsResource;
    @Value("${hadoop.pool.maxTotal}")
    private int maxTotal;
    @Value("${hadoop.pool.maxIdle}")
    private int maxIdle;
    @Value("${hadoop.pool.minIdle}")
    private int minIdle;
    @Value("${hadoop.pool.maxWaitMillis}")
    private int maxWaitMillis;

    @Bean(initMethod = "init", destroyMethod = "stop")
    public JHadoopClient jHadoopClient() {
        JHadoopClient jHadoopClient = new JHadoopClient();
        jHadoopClient.setMaxTotal(maxTotal);
        jHadoopClient.setMaxIdle(maxIdle);
        jHadoopClient.setMinIdle(minIdle);
        jHadoopClient.setMaxWaitMillis(maxWaitMillis);
        jHadoopClient.setCoreResource(coreResource);
        jHadoopClient.setHdfsResource(hdfsResource);
        return jHadoopClient;
    }
}
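The @Value placeholders above read from the application configuration; a hypothetical application.properties might look like this (the keys match the placeholders, the values are purely illustrative):

```properties
hadoop.core.resource=core-site.xml
hadoop.hdfs.resource=hdfs-site.xml
hadoop.pool.maxTotal=10
hadoop.pool.maxIdle=5
hadoop.pool.minIdle=1
hadoop.pool.maxWaitMillis=10000
```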

 

 

4) Implementing the config object

The following parameters deserve explanation:

1) setTestWhileIdle - validate connections while they sit idle; default false

2) setMinEvictableIdleTimeMillis - the minimum time a connection must sit idle before it becomes eligible for eviction

3) setTimeBetweenEvictionRunsMillis - the interval (in milliseconds) between eviction scans; if negative, the evictor thread does not run; default -1

4) setNumTestsPerEvictionRun - the maximum number of connections examined per eviction run; -1 means examine all idle connections
package com.xiaoju.dqa.jazz.hadoop.client;


import org.apache.commons.pool2.impl.GenericObjectPoolConfig;


public class JHadoopPoolConfig extends GenericObjectPoolConfig {
    public JHadoopPoolConfig() {
        this.setTestWhileIdle(true);                   // validate connections while idle
        this.setMinEvictableIdleTimeMillis(60000L);    // idle 60s before eligible for eviction
        this.setTimeBetweenEvictionRunsMillis(30000L); // run the evictor every 30s
        this.setNumTestsPerEvictionRun(-1);            // -1: test every idle connection per run
    }
}

 

 

 

3. The connection pool: JHadoopPool

This class extends Pool<JHadoop> and is used to initialize the pool object; JHadoop is the connection object the pool manages.

Note that JHadoopPool is constructed with a JHadoopFactory instance, which serves as the factory that creates the actual JHadoop connections.

JHadoopPool code:

package com.xiaoju.dqa.jazz.hadoop.client;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

public class JHadoopPool extends Pool<JHadoop> {
    public JHadoopPool(GenericObjectPoolConfig poolConfig, String coreResource, String hdfsResource) {
        super(poolConfig, new JHadoopFactory(coreResource, hdfsResource));
    }

    public JHadoopPool(GenericObjectPoolConfig poolConfig) {
        super(poolConfig, new JHadoopFactory());
    }

}
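The generic Pool<T> base class referenced above is never shown in the article; it is presumably a thin, Jedis-style wrapper around commons-pool2's GenericObjectPool. A minimal sketch of what such a wrapper might look like follows; the method names match those used by JHadoopClient (getResource, returnResource, returnBrokenResource, destroy), but everything else is an assumption:

```java
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

// Hypothetical base class: a thin wrapper over GenericObjectPool that
// converts checked exceptions into runtime exceptions.
public class Pool<T> {
    protected final GenericObjectPool<T> internalPool;

    public Pool(GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {
        this.internalPool = new GenericObjectPool<T>(factory, poolConfig);
    }

    // Borrow an object; blocks up to maxWaitMillis before failing.
    public T getResource() {
        try {
            return internalPool.borrowObject();
        } catch (Exception e) {
            throw new RuntimeException("Could not get a resource from the pool", e);
        }
    }

    // Return a healthy object so it can be reused.
    public void returnResource(T resource) {
        try {
            internalPool.returnObject(resource);
        } catch (Exception e) {
            throw new RuntimeException("Could not return the resource to the pool", e);
        }
    }

    // Discard a broken object; the pool will create a replacement on demand.
    public void returnBrokenResource(T resource) {
        try {
            internalPool.invalidateObject(resource);
        } catch (Exception e) {
            throw new RuntimeException("Could not invalidate the broken resource", e);
        }
    }

    // Shut the pool down, destroying all pooled objects.
    public void destroy() {
        internalPool.close();
    }
}
```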

 

JHadoop code:

JHadoop wraps the method calls from hadoop.fs.

Only a few simple wrappers are shown here; add more as your needs require.

package com.xiaoju.dqa.jazz.hadoop.client;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;


public class JHadoop {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    private FileSystem fs;
    private String coreResource;
    private String hdfsResource;

    public JHadoop(String coreResource, String hdfsResource) {
        this.coreResource = coreResource;
        this.hdfsResource = hdfsResource;
    }

    public String getCoreResource() {
        return coreResource;
    }
    public void setCoreResource(String coreResource) {
        this.coreResource = coreResource;
    }
    public String getHdfsResource() {
        return hdfsResource;
    }
    public void setHdfsResource(String hdfsResource) {
        this.hdfsResource = hdfsResource;
    }


    public void open() {
        try {
            Configuration conf = new Configuration();
            conf.addResource(coreResource);
            conf.addResource(hdfsResource);
            fs = FileSystem.get(conf);
            logger.info("[JHadoop] instance created");
        } catch (Exception e) {
            logger.error("[JHadoop] failed to create instance", e);
        }
    }

    public void close() {
        try {
            if (null != fs) {
                fs.close();
                logger.info("[JHadoop] instance closed");
            }
        } catch(Exception e) {
            logger.error("[JHadoop] failed to close instance", e);
        }
    }

    public boolean isConnected() throws IOException {
        // Probe the filesystem; this throws an IOException if the connection is broken.
        fs.exists(new Path("/forTest"));
        return true;
    }

    public boolean exists(String path) throws IOException {
        Path hdfsPath = new Path(path);
        return fs.exists(hdfsPath);
    }

    public FileStatus getFileStatus(String path) throws IOException {
        Path hdfsPath = new Path(path);
        return fs.getFileStatus(hdfsPath);
    }

    public ContentSummary getContentSummary(String path) throws IOException {
        ContentSummary contentSummary = null;
        Path hdfsPath = new Path(path);
        if (fs.exists(hdfsPath)) {
            contentSummary = fs.getContentSummary(hdfsPath);
        }
        return contentSummary;
    }

}

 

 

4. The connection factory: JHadoopFactory

JHadoopFactory manages the creation, destruction, and validation of the connection objects.

package com.xiaoju.dqa.jazz.hadoop.client;

import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;


public class JHadoopFactory implements PooledObjectFactory<JHadoop> {
    private final String coreResource;
    private final String hdfsResource;

    public JHadoopFactory() {
        this.coreResource = "core-site.xml";
        this.hdfsResource = "hdfs-site.xml";
    }
    public JHadoopFactory(String coreResource, String hdfsResource) {
        this.coreResource = coreResource;
        this.hdfsResource = hdfsResource;
    }

    @Override
    public PooledObject<JHadoop> makeObject() throws Exception {
        JHadoop jHadoop = new JHadoop(coreResource, hdfsResource);
        jHadoop.open();
        return new DefaultPooledObject<JHadoop>(jHadoop);
    }

    @Override
    public void destroyObject(PooledObject<JHadoop> pooledJHadoop) throws Exception {
        JHadoop jHadoop = pooledJHadoop.getObject();
        jHadoop.close();
    }

    @Override
    public boolean validateObject(PooledObject<JHadoop> pooledJHadoop) {
        JHadoop jHadoop = pooledJHadoop.getObject();
        try {
            return jHadoop.isConnected();
        } catch (Exception e) {
            return false;
        }
    }

    @Override
    public void activateObject(PooledObject<JHadoop> pooledObject) throws Exception {

    }

    @Override
    public void passivateObject(PooledObject<JHadoop> pooledObject) throws Exception {

    }
}

 

