池化技術簡介
在我們使用數據庫的過程中,我們往往使用數據庫連接池而不是直接使用數據庫連接進行操作,這是因為每一個數據庫連接的創建和銷毀的代價是昂貴的,而池化技術則預先創建了資源,這些資源是可復用的,這樣就保證了在多用戶情況下只能使用指定數目的資源,避免了一個用戶創建一個連接資源,造成程序運行開銷過大。關於Java並發編程的總結和思考
連接池實現原理
這里只實現一個簡易的連接池,更多復雜的需求可根據該連接池進行改進,該連接池主要參數如下:
- 一個繁忙隊列busy
- 一個空閑隊列idle
- 連接池最大活動連接數maxActive
- 連接池最大等待時間maxWait
- 連接池的活動連接數activeSize
程序流程圖如下:
代碼實現
泛型接口ConnectionPool.java
public interface ConnectionPool<T> {
/**
* 初始化池資源
* @param maxActive 池中最大活動連接數
* @param maxWait 最大等待時間
*/
void init(Integer maxActive, Long maxWait);
/**
* 從池中獲取資源
* @return 連接資源
*/
T getResource() throws Exception;
/**
* 釋放連接
* @param connection 正在使用的連接
*/
void release(T connection) throws Exception;
/**
* 釋放連接池資源
*/
void close();
}
以zookeeper為例,實現zookeeper連接池,ZookeeperConnectionPool.java
public class ZookeeperConnectionPool implements ConnectionPool<ZooKeeper> {
//最大活動連接數
private Integer maxActive;
//最大等待時間
private Long maxWait;
//空閑隊列
private LinkedBlockingQueue<ZooKeeper> idle = new LinkedBlockingQueue<>();
//繁忙隊列
private LinkedBlockingQueue<ZooKeeper> busy = new LinkedBlockingQueue<>();
//連接池活動連接數
private AtomicInteger activeSize = new AtomicInteger(0);
//連接池關閉標記
private AtomicBoolean isClosed = new AtomicBoolean(false);
//總共獲取的連接記數
private AtomicInteger createCount = new AtomicInteger(0);
//等待zookeeper客戶端創建完成的計數器
private static ThreadLocal<CountDownLatch> latchThreadLocal = ThreadLocal.withInitial(() -> new CountDownLatch(1));
public ZookeeperConnectionPool(Integer maxActive, Long maxWait) {
this.init(maxActive, maxWait);
}
@Override
public void init(Integer maxActive, Long maxWait) {
this.maxActive = maxActive;
this.maxWait = maxWait;
}
@Override
public ZooKeeper getResource() throws Exception {
ZooKeeper zooKeeper;
Long nowTime = System.currentTimeMillis();
final CountDownLatch countDownLatch = latchThreadLocal.get();
//空閑隊列idle是否有連接
if ((zooKeeper = idle.poll()) == null) {
//判斷池中連接數是否小於maxActive
if (activeSize.get() < maxActive) {
//先增加池中連接數后判斷是否小於等於maxActive
if (activeSize.incrementAndGet() <= maxActive) {
//創建zookeeper連接
zooKeeper = new ZooKeeper("localhost", 5000, (watch) -> {
if (watch.getState() == Watcher.Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
});
countDownLatch.await();
System.out.println("Thread:" + Thread.currentThread().getId() + "獲取連接:" + createCount.incrementAndGet() + "條");
busy.offer(zooKeeper);
return zooKeeper;
} else {
//如增加后發現大於maxActive則減去增加的
activeSize.decrementAndGet();
}
}
//若活動線程已滿則等待busy隊列釋放連接
try {
System.out.println("Thread:" + Thread.currentThread().getId() + "等待獲取空閑資源");
Long waitTime = maxWait - (System.currentTimeMillis() - nowTime);
zooKeeper = idle.poll(waitTime, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new Exception("等待異常");
}
//判斷是否超時
if (zooKeeper != null) {
System.out.println("Thread:" + Thread.currentThread().getId() + "獲取連接:" + createCount.incrementAndGet() + "條");
busy.offer(zooKeeper);
return zooKeeper;
} else {
System.out.println("Thread:" + Thread.currentThread().getId() + "獲取連接超時,請重試!");
throw new Exception("Thread:" + Thread.currentThread().getId() + "獲取連接超時,請重試!");
}
}
//空閑隊列有連接,直接返回
busy.offer(zooKeeper);
return zooKeeper;
}
@Override
public void release(ZooKeeper connection) throws Exception {
if (connection == null) {
System.out.println("connection 為空");
return;
}
if (busy.remove(connection)){
idle.offer(connection);
} else {
activeSize.decrementAndGet();
throw new Exception("釋放失敗");
}
}
@Override
public void close() {
if (isClosed.compareAndSet(false, true)) {
idle.forEach((zooKeeper) -> {
try {
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
busy.forEach((zooKeeper) -> {
try {
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
測試用例
這里創建20個線程並發測試連接池,Test.java
public class Test {
public static void main(String[] args) throws Exception {
int threadCount = 20;
Integer maxActive = 10;
Long maxWait = 10000L;
ZookeeperConnectionPool pool = new ZookeeperConnectionPool(maxActive, maxWait);
CountDownLatch countDownLatch = new CountDownLatch(20);
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
countDownLatch.countDown();
try {
countDownLatch.await();
ZooKeeper zooKeeper = pool.getResource();
Thread.sleep(2000);
pool.release(zooKeeper);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
while (true){
}
}
}