commons-pool和commons-pool2是用來建立對象池的框架,提供了一些將對象池化必須要實現的接口和一些默認動作。對象池化之后可以通過pool的概念去管理其生命周期,例如對象的創建,使用,銷毀等。例如我們通常使用的連接池,連接池可以有效管理連接的數量和狀態,保證連接資源的合理使用,並且避免並發場景下連接的頻繁建立和釋放。
我們這里來講述如何使用commons-pool2來池化對象。我們以池化hadoop連接為例。
1、先解決依賴
<dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.3</version> </dependency>
2、如何使用連接池
我們是在springboot框架中池化hadoop集群連接,先看一下池化之后的效果。
下面是我們池化之后的hadoop集群客戶端。可以看到我們可以通過連接池的方式管理hadoop集群的連接。
1)配置連接池
最大連接數maxTotal
最大空閑連接數maxIdle
最小空閑連接數minIdle
獲取連接的最大等待時間maxWaitMillis(毫秒)
可以看到傳入這些配置的時候我們使用了一個config對象JHadoopPoolConfig,后面我們將說明這個config對象如何實現。
2)管理連接池
我們以三個函數說明了如何去連接池中申請連接,使用連接和釋放鏈接資源。
申請資源pool.getResource()
釋放資源pool.returnBrokenResource()和pool.returnResource()
這里要注意的是,一定要在catch和finally中成功釋放資源,不然會導致could not get a Resource from the Pool的異常
package com.xiaoju.dqa.jazz.hadoop.client;

import org.apache.hadoop.fs.FileStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/**
 * Pooled HDFS client facade.
 *
 * <p>Wraps a {@link JHadoopPool} and exposes a few file-system operations
 * ({@link #exists}, {@link #getModificationTime}, {@link #getPathSize}).
 * Each call borrows a {@link JHadoop} connection from the pool, uses it and
 * returns it; a connection that failed mid-call is handed back through
 * {@code returnBrokenResource} so the pool can discard it.
 *
 * <p>Lifecycle: configure via the setters, then call {@link #init()};
 * call {@link #stop()} on shutdown (wired as Spring initMethod/destroyMethod
 * by the surrounding configuration class).
 */
public class JHadoopClient {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    // Cached once: DateTimeFormatter is thread-safe, unlike SimpleDateFormat,
    // which the previous version re-created on every call.
    private static final DateTimeFormatter MODIFY_TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    private JHadoopPool jHadoopPool;
    private String coreResource;   // location of core-site.xml
    private String hdfsResource;   // location of hdfs-site.xml
    private int maxTotal;          // pool: max total connections
    private int maxIdle;           // pool: max idle connections
    private int minIdle;           // pool: min idle connections kept ready
    private int maxWaitMillis;     // pool: max borrow wait, in milliseconds

    public String getCoreResource() {
        return coreResource;
    }

    public void setCoreResource(String coreResource) {
        this.coreResource = coreResource;
    }

    public String getHdfsResource() {
        return hdfsResource;
    }

    public void setHdfsResource(String hdfsResource) {
        this.hdfsResource = hdfsResource;
    }

    public int getMaxTotal() {
        return maxTotal;
    }

    public void setMaxTotal(int maxTotal) {
        this.maxTotal = maxTotal;
    }

    public int getMaxIdle() {
        return maxIdle;
    }

    public void setMaxIdle(int maxIdle) {
        this.maxIdle = maxIdle;
    }

    public int getMaxWaitMillis() {
        return maxWaitMillis;
    }

    public void setMaxWaitMillis(int maxWaitMillis) {
        this.maxWaitMillis = maxWaitMillis;
    }

    public int getMinIdle() {
        return minIdle;
    }

    public void setMinIdle(int minIdle) {
        this.minIdle = minIdle;
    }

    /**
     * Builds the pool configuration and creates the underlying pool.
     * Failures are logged and swallowed (keeping the original contract),
     * leaving the pool unset; later calls then fail fast via {@link #pool()}.
     */
    public void init() {
        try {
            JHadoopPoolConfig conf = new JHadoopPoolConfig();
            conf.setMaxTotal(maxTotal);
            conf.setMaxIdle(maxIdle);
            conf.setMinIdle(minIdle);
            conf.setMaxWaitMillis(maxWaitMillis);
            jHadoopPool = new JHadoopPool(conf, coreResource, hdfsResource);
            logger.info("[HDFS]初始化JHadoopClient成功");
        } catch (Exception ex) {
            logger.error("[HDFS]初始化JHadoopClient失敗", ex);
        }
    }

    /** Destroys the pool. Errors are now logged instead of silently swallowed. */
    public void stop() {
        try {
            if (null != jHadoopPool) {
                jHadoopPool.destroy();
            }
        } catch (Exception e) {
            // Previously an empty catch block: record why shutdown failed.
            logger.error("[HDFS]銷毀JHadoopPool失敗", e);
        }
    }

    /** Returns the pool, failing fast with a clear message if init() did not succeed. */
    private JHadoopPool pool() {
        if (null == jHadoopPool) {
            throw new IllegalStateException(
                    "JHadoopClient is not initialized; call init() first");
        }
        return jHadoopPool;
    }

    /**
     * Checks whether the given HDFS path exists.
     *
     * @param path HDFS path string
     * @return true when the path exists
     * @throws Exception when the borrowed connection fails; the connection is
     *         then returned as broken so the pool destroys it
     */
    public boolean exists(String path) throws Exception {
        JHadoop jHadoop = null;
        boolean broken = false;
        try {
            jHadoop = pool().getResource();
            return jHadoop.exists(path);
        } catch (Exception e) {
            broken = true;
            // Guard added: if getResource() itself threw, jHadoop is still
            // null and must not be handed back to the pool.
            if (null != jHadoop) {
                pool().returnBrokenResource(jHadoop);
            }
            logger.error("[HDFS]判斷文件是否存在失敗", e);
            throw e;
        } finally {
            if (null != jHadoop && !broken) {
                pool().returnResource(jHadoop);
            }
        }
    }

    /**
     * Returns the last-modification time of a path, formatted as
     * {@code yyyyMMddHHmmss} in the JVM's default time zone (matching the
     * previous SimpleDateFormat behavior).
     *
     * @param path HDFS path string
     * @return formatted modification timestamp
     * @throws Exception when the borrowed connection fails
     */
    public String getModificationTime(String path) throws Exception {
        JHadoop jHadoop = null;
        boolean broken = false;
        try {
            jHadoop = pool().getResource();
            FileStatus fileStatus = jHadoop.getFileStatus(path);
            long modifyTimestamp = fileStatus.getModificationTime();
            return MODIFY_TIME_FORMAT.format(
                    Instant.ofEpochMilli(modifyTimestamp).atZone(ZoneId.systemDefault()));
        } catch (Exception e) {
            broken = true;
            if (null != jHadoop) {
                pool().returnBrokenResource(jHadoop);
            }
            logger.error("[HDFS]獲取最近修改時間失敗", e);
            throw e;
        } finally {
            if (null != jHadoop && !broken) {
                pool().returnResource(jHadoop);
            }
        }
    }

    /**
     * Returns the total length in bytes under a path.
     *
     * <p>NOTE(review): JHadoop.getContentSummary returns null when the path
     * does not exist, which makes this throw a NullPointerException — callers
     * should check {@link #exists} first; confirm whether that is intended.
     *
     * @param path HDFS path string
     * @return content length in bytes
     * @throws Exception when the borrowed connection fails
     */
    public long getPathSize(String path) throws Exception {
        JHadoop jHadoop = null;
        boolean broken = false;
        try {
            jHadoop = pool().getResource();
            return jHadoop.getContentSummary(path).getLength();
        } catch (Exception e) {
            broken = true;
            if (null != jHadoop) {
                pool().returnBrokenResource(jHadoop);
            }
            logger.error("[HDFS]獲取路徑大小失敗", e);
            throw e;
        } finally {
            if (null != jHadoop && !broken) {
                pool().returnResource(jHadoop);
            }
        }
    }
}
3)注冊成bean
通過配置文件傳入連接池相應的配置。
package com.xiaoju.dqa.jazz.hadoop.configuration;

import com.xiaoju.dqa.jazz.hadoop.client.JHadoopClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that assembles the pooled HDFS client from
 * externalized properties and registers it as a bean whose lifecycle
 * Spring manages: init() is invoked on startup, stop() on shutdown.
 */
@Configuration
public class HadoopConfig {

    @Value("${hadoop.core.resource}")
    private String coreResource;

    @Value("${hadoop.hdfs.resource}")
    private String hdfsResource;

    @Value("${hadoop.pool.maxTotal}")
    private int maxTotal;

    @Value("${hadoop.pool.maxIdle}")
    private int maxIdle;

    @Value("${hadoop.pool.minIdle}")
    private int minIdle;

    @Value("${hadoop.pool.maxWaitMillis}")
    private int maxWaitMillis;

    /** Builds the client; all values come from the application properties. */
    @Bean(initMethod = "init", destroyMethod = "stop")
    public JHadoopClient jHadoopClient() {
        JHadoopClient client = new JHadoopClient();
        client.setCoreResource(coreResource);
        client.setHdfsResource(hdfsResource);
        client.setMaxTotal(maxTotal);
        client.setMaxIdle(maxIdle);
        client.setMinIdle(minIdle);
        client.setMaxWaitMillis(maxWaitMillis);
        return client;
    }
}
4)config對象如何實現
我們這里要說明一下下面這些參數的含義:
1)setTestWhileIdle - 在空閑時檢查有效性, 默認false
2)setMinEvictableIdleTimeMillis - 逐出連接的最小空閑時間
3)setTimeBetweenEvictionRunsMillis - 逐出掃描的時間間隔(毫秒) 如果為負數則不運行逐出線程,默認-1
4)setNumTestsPerEvictionRun - 每次逐出檢查時逐出的最大數目
package com.xiaoju.dqa.jazz.hadoop.client;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

/**
 * Pool configuration with fixed eviction defaults for JHadoop connections:
 * idle connections are validated, evicted after one minute of idleness,
 * the evictor runs every 30 seconds, and a negative test count means the
 * evictor examines a fraction of the idle objects per run.
 */
public class JHadoopPoolConfig extends GenericObjectPoolConfig {

    private static final long MIN_EVICTABLE_IDLE_MILLIS = 60000L;
    private static final long EVICTION_RUN_INTERVAL_MILLIS = 30000L;
    private static final int TESTS_PER_EVICTION_RUN = -1;

    public JHadoopPoolConfig() {
        setTestWhileIdle(true);
        setMinEvictableIdleTimeMillis(MIN_EVICTABLE_IDLE_MILLIS);
        setTimeBetweenEvictionRunsMillis(EVICTION_RUN_INTERVAL_MILLIS);
        setNumTestsPerEvictionRun(TESTS_PER_EVICTION_RUN);
    }
}
3、連接池JHadoopPool
這個類繼承了Pool<JHadoop>,用來初始化連接池對象。而JHadoop是Pool要管理的連接對象。
可以看到JHadoopPool在初始化的時候傳入了一個JHadoopFactory的實例。這個實例將會以工廠模式來創建實際的JHadoop
JHadoopPool代碼
package com.xiaoju.dqa.jazz.hadoop.client;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

/**
 * Connection pool of {@link JHadoop} objects.
 *
 * <p>Both constructors only delegate to the {@code Pool} base class
 * (defined elsewhere in this project, not visible here), supplying a
 * {@link JHadoopFactory} that creates the actual connections.
 */
public class JHadoopPool extends Pool<JHadoop> {

    /**
     * Builds a pool whose connections load the given Hadoop config resources.
     *
     * @param poolConfig   commons-pool2 sizing/eviction settings
     * @param coreResource location of core-site.xml passed to the factory
     * @param hdfsResource location of hdfs-site.xml passed to the factory
     */
    public JHadoopPool(GenericObjectPoolConfig poolConfig, String coreResource, String hdfsResource) {
        super(poolConfig, new JHadoopFactory(coreResource, hdfsResource));
    }

    /** Builds a pool using the factory's default config resource names. */
    public JHadoopPool(GenericObjectPoolConfig poolConfig) {
        super(poolConfig, new JHadoopFactory());
    }
}
JHadoop代碼
JHadoop實現了hadoop.fs中的方法調用。
我這里只給出了幾個函數的簡單封裝,你可以根據具體的需要進行增加。
package com.xiaoju.dqa.jazz.hadoop.client;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * A single poolable HDFS connection: a thin wrapper around Hadoop's
 * {@link FileSystem} exposing only the operations this project needs.
 * Instances are created/validated/destroyed by {@link JHadoopFactory}.
 */
public class JHadoop {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    private FileSystem fs;
    private String coreResource;   // location of core-site.xml
    private String hdfsResource;   // location of hdfs-site.xml

    public JHadoop(String coreResource, String hdfsResource) {
        this.coreResource = coreResource;
        this.hdfsResource = hdfsResource;
    }

    public String getCoreResource() {
        return coreResource;
    }

    public void setCoreResource(String coreResource) {
        this.coreResource = coreResource;
    }

    public String getHdfsResource() {
        return hdfsResource;
    }

    public void setHdfsResource(String hdfsResource) {
        this.hdfsResource = hdfsResource;
    }

    /**
     * Opens the FileSystem using the two configured resource files.
     *
     * <p>NOTE(review): failures are logged and swallowed, leaving {@code fs}
     * null, so the pool may hand out a dead connection until validation runs.
     * {@link #isConnected()} now reports false for that state instead of
     * throwing a NullPointerException.
     */
    public void open() {
        try {
            Configuration conf = new Configuration();
            conf.addResource(coreResource);
            conf.addResource(hdfsResource);
            fs = FileSystem.get(conf);
            logger.info("[JHadoop]創建實例成功");
        } catch (Exception e) {
            logger.error("[JHadoop]創建實例失敗", e);
        }
    }

    /** Closes the underlying FileSystem, logging (not propagating) failures. */
    public void close() {
        try {
            if (null != fs) {
                fs.close();
                logger.info("[JHadoop]關閉實例成功");
            }
        } catch (Exception e) {
            logger.error("[JHadoop]關閉實例失敗", e);
        }
    }

    /**
     * Liveness probe used by the pool's validateObject().
     *
     * @return false when open() never succeeded; true when the probe call
     *         completes
     * @throws IOException when the connection is broken — the factory maps
     *         this to "invalid"
     */
    public boolean isConnected() throws IOException {
        // Guard added: open() swallows failures, so fs may have never been set.
        if (null == fs) {
            return false;
        }
        // The boolean result of exists() is deliberately ignored: the probe
        // path does not need to exist. A live connection returns normally,
        // a dead one throws IOException.
        fs.exists(new Path("/forTest"));
        return true;
    }

    public boolean exists(String path) throws IOException {
        Path hdfsPath = new Path(path);
        return fs.exists(hdfsPath);
    }

    public FileStatus getFileStatus(String path) throws IOException {
        Path hdfsPath = new Path(path);
        return fs.getFileStatus(hdfsPath);
    }

    /**
     * Returns the ContentSummary of a path, or null when the path does not
     * exist — callers must handle the null case.
     */
    public ContentSummary getContentSummary(String path) throws IOException {
        ContentSummary contentSummary = null;
        Path hdfsPath = new Path(path);
        if (fs.exists(hdfsPath)) {
            contentSummary = fs.getContentSummary(hdfsPath);
        }
        return contentSummary;
    }
}
4、連接工廠類JHadoopFactory
JHadoopFactory這個類管理着連接對象的創建,銷毀,驗證等動作
package com.xiaoju.dqa.jazz.hadoop.client; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.PooledObjectFactory; import org.apache.commons.pool2.impl.DefaultPooledObject; public class JHadoopFactory implements PooledObjectFactory<JHadoop> { private final String coreResource; private final String hdfsResource; public JHadoopFactory() { this.coreResource = "core-site.xml"; this.hdfsResource = "hdfs-site.xml"; } public JHadoopFactory(String coreResource, String hdfsResource) { this.coreResource = coreResource; this.hdfsResource = hdfsResource; } @Override public PooledObject<JHadoop> makeObject() throws Exception { JHadoop jHadoop = new JHadoop(coreResource, hdfsResource); jHadoop.open(); return new DefaultPooledObject<JHadoop>(jHadoop); } @Override public void destroyObject(PooledObject<JHadoop> pooledJHadoop) throws Exception { JHadoop jHadoop = pooledJHadoop.getObject(); jHadoop.close(); } @Override public boolean validateObject(PooledObject<JHadoop> pooledJHadoop) { JHadoop jHadoop = pooledJHadoop.getObject(); try { return jHadoop.isConnected(); } catch (Exception e) { return false; } } @Override public void activateObject(PooledObject<JHadoop> pooledObject) throws Exception { } @Override public void passivateObject(PooledObject<JHadoop> pooledObject) throws Exception { } }