設置ZooKeeper服務器地址列表源碼解析及擴展
ZooKeeper zooKeeper = new ZooKeeper("192.168.109.130:2181",SESSION_TIMEOUT,new ZooKeeperFirstBlood());
在創建zk連接的時候,必須要獲取到zk服務器集群的地址,最簡單的方式是在構造函數中傳入ip:port,ip2:port2,...,ipn:portn的形式,優勢是簡單,劣勢也很明顯,擴展性不強,一旦zk集群發生變動,整個就gg了。
本文主要先分析zk是如何去解析服務器地址,然后會給去使用動態獲取zk服務器集群的方式。
zk最簡單的解析
zk在構造函數對傳入的connectString進行解析構造了 ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
對象,ConnectStringParser是一個服務器地址列表解析器,主要作用是解析根目錄和獲取zk server地址
-
定義根目錄
相當於Linux的chroot命令,如果客戶端設置了這個屬性,那么所有的操作都會被限制在該命名空間下。具體使用方式:
new Zookeeper("192.168.0.1:2181,192.168.0.2:2181/zktest/wpr")
-
解析zk地址
String hostsList[] = connectString.split(","); for (String host : hostsList) { int port = DEFAULT_PORT; int pidx = host.lastIndexOf(':'); if (pidx >= 0) { // otherwise : is at the end of the string, ignore if (pidx < host.length() - 1) { port = Integer.parseInt(host.substring(pidx + 1)); } host = host.substring(0, pidx); } serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
-
生成地址列表管理器HostProvider
在ConnectStringParser解析器中將字符串封裝為List
對象,經過處理后地址列表會被進一步封裝到StaticHostPorvider類中。
/**
* The next host to try to connect to.
*
* For a spinDelay of 0 there should be no wait.
*
* 用於返回一個InetSocketAddress地址,這個方法的調用必須返回一個InetSocketAddress,不能為null和報錯
* @param spinDelay
* Milliseconds to wait if all hosts have been tried once.
*/
public InetSocketAddress next(long spinDelay);
/**
* Notify the HostProvider of a successful connection.
*
* The HostProvider may use this notification to reset it's inner state.
*/
public void onConnected();
環形解析很多書已經解釋過了,不是本文的重點
問題:
- zk服務器一旦遷移或者個別機器變更,會導致大批客戶端應用變更。
- 部分情況下,為了增進系統的穩定性和容災特性,需要配置一些特殊的規則,原來的環形解析肯定是不滿足需求的
針對上面,主流的解決方案:地址列表管理器能夠定時從DNS或者一個配置管理中心上解析出Zk服務器值列表,如果這個列表變更了,可以同時更新到serverAddress集合,這樣在下次獲取服務器地址的時候(調用next),可以獲取到最新的服務器地址。
整體結構:
核心代碼
本次主要是從配置中心去獲取zk服務器的地址,在創建ZooKeeper對象的時候,僅僅使用一個HTTP地址,建立一個HTTP的長連接,去這個服務器上獲取zk地址列表,定時去獲取更新地址
public class DynamicHTTPHostProvider implements HostProvider {
/**
* zk服務器列表
*/
private final List<InetSocketAddress> domainAddresses = new ArrayList(5);
public DynamicHTTPHostProvider(String domainURL) {
ADDRESS_SERVER_URL = domainURL;
start();
}
public synchronized void start(){
//這個其實可以重新寫到一個類里面
if(isStart){
log.warn("DynamicHTTPHostProvider already run");
return ;
}
GetServerListTask getServersTask = new GetServerListTask(ADDRESS_SERVER_URL);
for (int i = 0; i < 3 && domainAddresses.isEmpty(); ++i) {
getServersTask.run();
try {
Thread.sleep(100L);
} catch (Exception e) {
}
}
if (domainAddresses.isEmpty()) {
log.error("DynamicHTTPHostProvider-0001|cannnot get zookeeper address");
throw new RuntimeException("fail to get zk-server serverlist! env:" + ADDRESS_SERVER_URL);
}
TimerService.scheduleWithFixedDelay(getServersTask, 0L, 30L, TimeUnit.SECONDS);
isStart = true;
}
在getServerListTask的線程中,去獲取並解析地址
class GetServerListTask implements Runnable {
final String url;
GetServerListTask(String url) {
this.url = url;
}
public void run() {
//獲取服務器地址
List<String> result = getZkServerList();
updateIfChanged(result);
}
private synchronized void updateIfChanged(List<String> result) {
//更新domainAddresses
}
備注
詳細代碼可以查看 https://github.com/wpr7280/zktest