接前一篇rpc框架之HA/負載均衡構架設計 繼續,寫了一個簡單的thrift 連接池:
先做點准備工作:
package yjmyzz;
public class ServerInfo {
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
private String host;
private int port;
public ServerInfo(String host, int port) {
this.host = host;
this.port = port;
}
public String toString() {
return "host:" + host + ",port:" + port;
}
}
上面這個類,用來封裝服務端的基本信息,主機名+端口號,連接時需要用到。
package yjmyzz;
import org.apache.thrift.transport.TTransport;
import java.text.SimpleDateFormat;
import java.util.Date;
public class TransfortWrapper {
private TTransport transport;
/**
* 是否正忙
*/
private boolean isBusy = false;
/**
* 是否已經掛
*/
private boolean isDead = false;
/**
* 最后使用時間
*/
private Date lastUseTime;
/**
* 服務端Server主機名或IP
*/
private String host;
/**
* 服務端Port
*/
private int port;
public TransfortWrapper(TTransport transport, String host, int port, boolean isOpen) {
this.lastUseTime = new Date();
this.transport = transport;
this.host = host;
this.port = port;
if (isOpen) {
try {
transport.open();
} catch (Exception e) {
//e.printStackTrace();
System.err.println(host + ":" + port + " " + e.getMessage());
isDead = true;
}
}
}
public TransfortWrapper(TTransport transport, String host, int port) {
this(transport, host, port, false);
}
public boolean isBusy() {
return isBusy;
}
public void setIsBusy(boolean isBusy) {
this.isBusy = isBusy;
}
public boolean isDead() {
return isDead;
}
public void setIsDead(boolean isDead) {
this.isDead = isDead;
}
public TTransport getTransport() {
return transport;
}
public void setTransport(TTransport transport) {
this.transport = transport;
}
/**
* 當前transport是否可用
*
* @return
*/
public boolean isAvailable() {
return !isBusy && !isDead && transport.isOpen();
}
public Date getLastUseTime() {
return lastUseTime;
}
public void setLastUseTime(Date lastUseTime) {
this.lastUseTime = lastUseTime;
}
public String getHost() {
return host;
}
public int getPort() {
return port;
}
public String toString() {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return "hashCode:" + hashCode() + "," +
host + ":" + port + ",isBusy:" + isBusy + ",isDead:" + isDead + ",isOpen:" +
transport.isOpen() + ",isAvailable:" + isAvailable() + ",lastUseTime:" + format.format(lastUseTime);
}
}
這是對TTransport的封裝,主要增加了一些輔助信息,直接看代碼注釋即可。
下面才是連接池的主要內容:
package yjmyzz;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* Thrift連接池
*
* @author : 菩提樹下的楊過(http://yjmyzz.cnblogs.com/)
* @version : 0.1 BETA
* @since : 2015-09-27(中秋)
*/
public class ThriftTransportPool {
Semaphore access = null;
TransfortWrapper[] pool = null;
int poolSize = 1;//連接池大小
int minSize = 1;//池中保持激活狀態的最少連接個數
int maxIdleSecond = 300;//最大空閑時間(秒),超過該時間的空閑時間的連接將被關閉
int checkInvervalSecond = 60;//每隔多少秒,檢測一次空閑連接(默認60秒)
List<ServerInfo> serverInfos;
boolean allowCheck = true;
Thread checkTread = null;
public int getCheckInvervalSecond() {
return checkInvervalSecond;
}
public void setCheckInvervalSecond(int checkInvervalSecond) {
this.checkInvervalSecond = checkInvervalSecond;
}
/**
* 連接池構造函數
*
* @param poolSize 連接池大小
* @param minSize 池中保持激活的最少連接數
* @param maxIdleSecond 單個連接最大空閑時間,超過此值的連接將被斷開
* @param checkInvervalSecond 每隔多少秒檢查一次空閑連接
* @param serverList 服務器列表
*/
public ThriftTransportPool(int poolSize, int minSize, int maxIdleSecond, int checkInvervalSecond, List<ServerInfo> serverList) {
if (poolSize <= 0) {
poolSize = 1;
}
if (minSize > poolSize) {
minSize = poolSize;
}
if (minSize <= 0) {
minSize = 0;
}
this.maxIdleSecond = maxIdleSecond;
this.minSize = minSize;
this.poolSize = poolSize;
this.serverInfos = serverList;
this.allowCheck = true;
this.checkInvervalSecond = checkInvervalSecond;
init();
check();
}
/**
* 連接池構造函數(默認最大空閑時間300秒)
*
* @param poolSize 連接池大小
* @param minSize 池中保持激活的最少連接數
* @param serverList 服務器列表
*/
public ThriftTransportPool(int poolSize, int minSize, List<ServerInfo> serverList) {
this(poolSize, minSize, 300, 60, serverList);
}
public ThriftTransportPool(int poolSize, List<ServerInfo> serverList) {
this(poolSize, 1, 300, 60, serverList);
}
public ThriftTransportPool(List<ServerInfo> serverList) {
this(serverList.size(), 1, 300, 60, serverList);
}
/**
* 檢查空閑連接
*/
private void check() {
checkTread =
new Thread(new Runnable() {
public void run() {
while (allowCheck) {
//System.out.println("--------------");
System.out.println("開始檢測空閑連接...");
for (int i = 0; i < pool.length; i++) {
//if (pool[i] == null) {
// System.out.println("pool[" + i + "]為null");
//}
//if (pool[i].getTransport() == null) {
// System.out.println("pool[" + i + "].getTransport()為null");
//}
if (pool[i].isAvailable() && pool[i].getLastUseTime() != null) {
long idleTime = new Date().getTime() - pool[i].getLastUseTime().getTime();
//超過空閑閥值的連接,主動斷開,以減少資源消耗
if (idleTime > maxIdleSecond * 1000) {
if (getActiveCount() > minSize) {
pool[i].getTransport().close();
pool[i].setIsBusy(false);
System.out.println(pool[i].hashCode() + "," + pool[i].getHost() + ":" + pool[i].getPort() + " 超過空閑時間閥值被斷開!");
}
}
}
}
System.out.println("當前活動連接數:" + getActiveCount());
try {
Thread.sleep(checkInvervalSecond * 1000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
checkTread.start();
}
/**
* 連接池初始化
*/
private void init() {
access = new Semaphore(poolSize);
pool = new TransfortWrapper[poolSize];
for (int i = 0; i < pool.length; i++) {
int j = i % serverInfos.size();
TSocket socket = new TSocket(serverInfos.get(j).getHost(),
serverInfos.get(j).getPort());
if (i < minSize) {
pool[i] = new TransfortWrapper(socket, serverInfos.get(j).getHost(), serverInfos.get(j).getPort(), true);
} else {
pool[i] = new TransfortWrapper(socket, serverInfos.get(j).getHost(), serverInfos.get(j).getPort());
}
}
}
/**
* 從池中取一個可用連接
* @return
*/
public TTransport get() {
try {
if (access.tryAcquire(3, TimeUnit.SECONDS)) {
synchronized (this) {
for (int i = 0; i < pool.length; i++) {
if (pool[i].isAvailable()) {
pool[i].setIsBusy(true);
pool[i].setLastUseTime(new Date());
return pool[i].getTransport();
}
}
//嘗試激活更多連接
for (int i = 0; i < pool.length; i++) {
if (!pool[i].isBusy() && !pool[i].isDead()
&& !pool[i].getTransport().isOpen()) {
try {
pool[i].getTransport().open();
pool[i].setIsBusy(true);
pool[i].setLastUseTime(new Date());
return pool[i].getTransport();
} catch (Exception e) {
//e.printStackTrace();
System.err.println(pool[i].getHost() + ":" + pool[i].getPort() + " " + e.getMessage());
pool[i].setIsDead(true);
}
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("can not get available client");
}
throw new RuntimeException("all client is too busy");
}
/**
* 客戶端調用完成后,必須手動調用此方法,將TTransport恢復為可用狀態
*
* @param client
*/
public void release(TTransport client) {
boolean released = false;
synchronized (this) {
for (int i = 0; i < pool.length; i++) {
if (client == pool[i].getTransport() && pool[i].isBusy()) {
pool[i].setIsBusy(false);
released = true;
break;
}
}
}
if (released) {
access.release();
}
}
public void destory() {
if (pool != null) {
for (int i = 0; i < pool.length; i++) {
pool[i].getTransport().close();
}
}
allowCheck = false;
checkTread = null;
System.out.print("連接池被銷毀!");
}
/**
* 獲取當前已經激活的連接數
*
* @return
*/
public int getActiveCount() {
int result = 0;
for (int i = 0; i < pool.length; i++) {
if (!pool[i].isDead() && pool[i].getTransport().isOpen()) {
result += 1;
}
}
return result;
}
/**
* 獲取當前繁忙狀態的連接數
*
* @return
*/
public int getBusyCount() {
int result = 0;
for (int i = 0; i < pool.length; i++) {
if (!pool[i].isDead() && pool[i].isBusy()) {
result += 1;
}
}
return result;
}
/**
* 獲取當前已"掛"掉連接數
*
* @return
*/
public int getDeadCount() {
int result = 0;
for (int i = 0; i < pool.length; i++) {
if (pool[i].isDead()) {
result += 1;
}
}
return result;
}
public String toString() {
return "poolsize:" + pool.length +
",minSize:" + minSize +
",maxIdleSecond:" + maxIdleSecond +
",checkInvervalSecond:" + checkInvervalSecond +
",active:" + getActiveCount() +
",busy:" + getBusyCount() +
",dead:" + getDeadCount();
}
public String getWrapperInfo(TTransport client) {
for (int i = 0; i < pool.length; i++) {
if (pool[i].getTransport() == client) {
return pool[i].toString();
}
}
return "";
}
}
主要思路:
1.構造器里,傳入 連接池大小,最小連接數,連接最大空閑時間,空間連接檢測時間間隔,服務端列表等基本信息
2.然后調用init方法進行初始化,初始化時把pool[]數組填滿,不過在填充的時候,要根據minsize決定激活多少連接(換句話講,連接實例都都建好了,只是連不連的問題),另外初始化的時候,還要考慮到某個服務器宕機的可能,如果服務端掛了,將對應的實例設置為isDead=true的狀態
3.新開一個線程定時檢查是否有空閑連接,如果空閑時間太長,主動斷開,以節省開銷。
4.get()方法從數組中撈一個可用的連接出來,取的時候要考慮到喚醒"沉睡"連接的情況,即如果當前池中只有2個活動連接,這時又來了請求,沒有活動連接了,要從池中把斷開的連接叫醒一個。
5.要控制並發控制,多個線程同時調用get()想從池中取可用連接時,可用Semaphore+Lock的機制來加以控制,可參考上一篇內容。
測試:
package yjmyzz;
import org.apache.thrift.transport.TSocket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
public class PoolTest {
public static void main(String[] args) throws Exception {
//初始化一個連接池(poolsize=15,minsize=1,maxIdleSecond=5,checkInvervalSecond=10)
final ThriftTransportPool pool = new ThriftTransportPool(15, 1, 5, 10, getServers());
//模擬客戶端調用
createClients(pool);
//等候清理空閑連接
Thread.sleep(30000);
//再模擬一批客戶端,驗證連接是否會重新增加
createClients(pool);
System.out.println("輸入任意鍵退出...");
System.in.read();
//銷毀連接池
pool.destory();
}
private static void createClients(final ThriftTransportPool pool) throws Exception {
//模擬5個client端
int clientCount = 5;
Thread thread[] = new Thread[clientCount];
FutureTask<String> task[] = new FutureTask[clientCount];
for (int i = 0; i < clientCount; i++) {
task[i] = new FutureTask<String>(new Callable<String>() {
public String call() throws Exception {
TSocket scoket = (TSocket) pool.get();//從池中取一個可用連接
//模擬調用RPC會持續一段時間
System.out.println(Thread.currentThread().getName() + " => " + pool.getWrapperInfo(scoket));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
pool.release(scoket);//記得每次用完,要將連接釋放(恢復可用狀態)
return Thread.currentThread().getName() + " done.";
}
});
thread[i] = new Thread(task[i], "Thread" + i);
}
//啟用所有client線程
for (int i = 0; i < clientCount; i++) {
thread[i].start();
Thread.sleep(10);
}
System.out.println("--------------");
//等待所有client調用完成
for (int i = 0; i < clientCount; i++) {
System.out.println(task[i].get());
System.out.println(pool);
System.out.println("--------------");
thread[i] = null;
}
}
private static List<ServerInfo> getServers() {
List<ServerInfo> servers = new ArrayList<ServerInfo>();
servers.add(new ServerInfo("localhost", 9000));
servers.add(new ServerInfo("localhost", 9001));
servers.add(new ServerInfo("localhost", 1002));//這一個故意寫錯的,模擬服務器掛了,連接不上的情景
return servers;
}
}
輸出:
****************************
開始檢測空閑連接...
當前活動連接數:1
Thread1 => hashCode:919192718,localhost:9001,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:09:59
Thread0 => hashCode:1510835162,localhost:9000,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:09:59
localhost:1002 java.net.ConnectException: Connection refused
Thread2 => hashCode:1466719669,localhost:9000,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:00
Thread3 => hashCode:2080503518,localhost:9001,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:00
localhost:1002 java.net.ConnectException: Connection refused
Thread4 => hashCode:411724643,localhost:9000,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:00
--------------
Thread0 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:3,dead:2
--------------
Thread1 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:3,dead:2
--------------
Thread2 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:2,dead:2
--------------
Thread3 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:1,dead:2
--------------
Thread4 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:0,dead:2
--------------
開始檢測空閑連接...
1510835162,localhost:9000 超過空閑時間閥值被斷開!
919192718,localhost:9001 超過空閑時間閥值被斷開!
1466719669,localhost:9000 超過空閑時間閥值被斷開!
2080503518,localhost:9001 超過空閑時間閥值被斷開!
當前活動連接數:1
開始檢測空閑連接...
當前活動連接數:1
開始檢測空閑連接...
當前活動連接數:1
Thread0 => hashCode:411724643,localhost:9000,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:31
Thread1 => hashCode:1510835162,localhost:9000,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:31
Thread2 => hashCode:919192718,localhost:9001,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:31
Thread3 => hashCode:1466719669,localhost:9000,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:31
Thread4 => hashCode:2080503518,localhost:9001,isBusy:true,isDead:false,isOpen:true,isAvailable:false,lastUseTime:2015-09-27 16:10:31
--------------
Thread0 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:4,dead:2
--------------
Thread1 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:3,dead:2
--------------
Thread2 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:2,dead:2
--------------
Thread3 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:1,dead:2
--------------
Thread4 done.
poolsize:15,minSize:1,maxIdleSecond:5,checkInvervalSecond:10,active:5,busy:0,dead:2
--------------
輸入任意鍵退出...
q
連接池被銷毀!
***********************
注意上面高亮顏色的部分,2080503518 連接創建后,后來被check方法主動檢測到空閑斷開,然后第二輪調用時,又重新激活。411724643 則幸免於難,一直戰斗到最后。另外由於故意寫錯了一個server地址,池中始終有二個dead的實例。
值得改進的地方:
主要是公平性的問題,在初始化的時候,如果服務器有3台,而指定的連接池大小為4,目前的做法是,用4對3取模,所以第1、4個連接實例都是連接到服務器1,get取可用連接的時候也有類似情況,是按pool數組從前向后遍歷的,撈到第1個可用的連接就完事了,這樣永遠是排在List前面的服務器壓力會大一些,這樣有點不太符合負載"均衡"的本意。
不過,這個問題也很好解決,有一個很簡單有效的技巧,實際應用中,服務器列表是從zk上取回來的,取回來后,先對數組做隨機排序,這樣整體看來下,多個連接池總體的連接分布情況就比較平均了。
