背景
應用需要對兩個集群中的同一目錄下的HDFS文件個數和文件總大小進行比對,在測試環境中發現,即使兩邊HDFS目錄下的數據不一樣,應用日志顯示兩邊始終比對一致,分下下來發現,應用連的一直是同一個集群。大數據集群:CDH6.2.1
定位分析
應用代碼片段
Configuration mainconf = new Configuration();
mainconf.addResource(new Path(main_prefix+"/core-site.xml"));
mainconf.addResource(new Path(main_prefix+"/hdfs-site.xml"));
//Main集群hdfs操作,獲取指定目錄下文件
getHdfsDirectoryTailList("指定目錄",mainConf)
Configuration slaveconf=new Configuration();
slaveconf.addResource(new Path(slave_prefix+"/core-site.xml"));
slaveconf.addResource(new Path(slave_prefix+"/hdfs-site.xml"));
//Slave集群hdfs操作,獲取指定目錄下文件
getHdfsDirectoryTailList("指定目錄",slaveConf)
public static List<String> getHdfsDirectoryTailList(String path,Configuration conf) {
List<String> tailList = new ArrayList<String>();
try {
FileSystem hdfs = FileSystem.get(URI.create(path), conf);
if(!hdfs.exists(new Path(path))){
return tailList;
}
FileStatus[] fs = hdfs.listStatus(new Path(path));
Path[] listPath = FileUtil.stat2Paths(fs);
for (Path p : listPath) {
String[] tailSplit = p.toString().split("\\/");
String tail = tailSplit[tailSplit.length - 1];
if (tail.equals("_SUCCESS")) {
continue;
}
tailList.add(tail);
}
} catch (IOException e) {
logger.error("Extract: getHdfsDirectoryTailList exception", e);
throw e;
}
return tailList;
}
檢查兩個集群配置及應用代碼,確認沒有問題
fs.hdfs.impl.disable.cache參數
fs.hdfs.impl.disable.cache參數之前又遇到過,默認值為false,表示使用cache,懷疑又是cache的問題,所以FileSystem.get(URI.create(path), conf),第一次獲取的是master集群,第二次使用了cache,所以一直連的是master集群。測試方法,在core-site.xml里加上下面配置,表示不使用cache,加上之后,應用能正常連接兩個集群了。
<property>
<name>fs.hdfs.impl.disable.cache</name>
<value>true</value>
</property>
FileSystem.get源碼分析
那么明明使用了兩個集群,為什么會使用到Cache呢,分析FileSystem.get源碼便知道原因了
public static FileSystem get(URI uri, Configuration conf) throws IOException {
String scheme = uri.getScheme();
String authority = uri.getAuthority();
if (scheme == null && authority == null) { // use default FS
return get(conf);
}
if (scheme != null && authority == null) { // no authority
URI defaultUri = getDefaultUri(conf);
if (scheme.equals(defaultUri.getScheme()) // if scheme matches default
&& defaultUri.getAuthority() != null) { // & default has authority
return get(defaultUri, conf); // return default
}
}
String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
if (conf.getBoolean(disableCacheName, false)) {
LOGGER.debug("Bypassing cache to create filesystem {}", uri);
return createFileSystem(uri, conf);
}
return CACHE.get(uri, conf);
}
應用在獲取FileSystem時,提供了完整的hdfs目錄,同時沒有設置fs.hdfs.impl.disable.cache為true,所以創建slave集群的filesystem對象時,會使用CACHE.get(uri, conf)獲取,Cache內部使用一個HashMap來維護filesystem對象,很容易想到,當HashMap的key相同時,便返回了同一個filesystem對象,那么Cache中的key是什么樣的呢,代碼如下:
FileSystem get(URI uri, Configuration conf) throws IOException{
Key key = new Key(uri, conf);
return getInternal(uri, conf, key);
}
static class Key {
final String scheme;
final String authority;
final UserGroupInformation ugi;
final long unique; // an artificial way to make a key unique
Key(URI uri, Configuration conf) throws IOException {
this(uri, conf, 0);
}
Key(URI uri, Configuration conf, long unique) throws IOException {
scheme = uri.getScheme()==null ?
"" : StringUtils.toLowerCase(uri.getScheme());
authority = uri.getAuthority()==null ?
"" : StringUtils.toLowerCase(uri.getAuthority());
this.unique = unique;
this.ugi = UserGroupInformation.getCurrentUser();
}
@Override
public int hashCode() {
return (scheme + authority).hashCode() + ugi.hashCode() + (int)unique;
}
static boolean isEqual(Object a, Object b) {
return a == b || (a != null && a.equals(b));
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj instanceof Key) {
Key that = (Key)obj;
return isEqual(this.scheme, that.scheme)
&& isEqual(this.authority, that.authority)
&& isEqual(this.ugi, that.ugi)
&& (this.unique == that.unique);
}
return false;
}
@Override
public String toString() {
return "("+ugi.toString() + ")@" + scheme + "://" + authority;
}
}
}
可以看到Key由四個要素構成,其中前2個跟URI相關,我們兩集群的fs.defaultFS值均為CDH高可用集群創建時的默認值hdfs://nameservice1,應用比對的是兩邊集群的相同目錄,ugi為安全認證的用戶,應用使用的是同一個,unique為0,因此Key相同,第二次獲取filesystem對象時,直接返回了第一次創建的filesystem對象,最終造成了應用雖然使用了不同的集群配置文件,但最中獲取的是同一個filesystem對象。
解決
fs.hdfs.impl.disable.cache參數本身不建議修改,修改集群的fs.defaultFS,使不同集群的fs.defaultFS不一樣