Hadoop源碼學習筆記(6)
——從ls命令一路解剖
Hadoop幾個模塊的程序我們大致有了點了解,現在我們得細看一下這個程序是如何處理命令的。 我們就從原頭開始,然后一步步追查。
我們先選中ls命令,這是一個列出分面式文件系統中的目錄結構。傳入一個查閱地址,如果沒有則是根目錄。啟動NameNode和DataNode服務。然后在命令行中輸入ls :
換成程序,如果寫呢,我們新建一個ClientEnter類。之前章節中,我們就知道,在命令行中輸入的dfs命令,指向到org.apache.hadoop.fs.FsShell,則這個類入口是一個main函數。所以先直接用,在clientEnter類的main函數中加入下面代碼:
FsShell.main(new String[]{"-ls"});
然后執行,看到下面結果:
這個結果,與控制台中的是一樣的,所以說調用正確。注意,這里多加了幾行初使化用的,先寫上,后面會用。
動刀解剖,我們進入FsShell的main函數中,看看里面是怎么實現的:
-
public static void main(String argv[]) throws Exception {
-
FsShell shell = new FsShell();
-
int res;
-
try {
-
res = ToolRunner.run(shell, argv);
-
} finally {
-
shell.close();
-
}
-
System.exit(res);
-
}
它這里,通過ToolRunner繞了一下,目的是把argv進行了下處理,但最終回到了FsShell的run函數中,進之:
發現,里面有大量的if else 條件都是cmd判斷是否等於某個命令,於是找到ls命令,發現其調了FsShell下的ls函數,進之:
-
private int ls(String srcf, boolean recursive) throws IOException {
-
Path srcPath = new Path(srcf);
-
FileSystem srcFs = srcPath.getFileSystem(this.getConf());
-
FileStatus[] srcs = srcFs.globStatus(srcPath);
-
if (srcs==null || srcs.length==0) {
-
throw new FileNotFoundException("Cannot access " + srcf +
-
": No such file or directory.");
-
}
-
-
boolean printHeader = (srcs.length == 1) ? true: false;
-
int numOfErrors = 0;
-
for(int i=0; i<srcs.length; i++) {
-
numOfErrors += ls(srcs[i], srcFs, recursive, printHeader);
-
}
-
return numOfErrors == 0 ? 0 : -1;
-
}
這個ls函數行數不多,容易看清,參數是第一個是文件路徑,第二個是遞歸。
然后前2~4行,就可以看到,獲取到了文件狀態,第13行,是打印顯示查到的文件狀態結果。這里就不進入看了。
於是提到出2~4行代碼,加入到我們的測試程序中。同時把之前的調用注釋掉:
-
// 列出文件(原)
-
//FsShell.main(new String[]{"-ls"});
-
FileSystem srcFs = FileSystem.get(conf);
-
FileStatus[] items = srcFs.listStatus(new Path("hdfs://localhost:9000/user/zjf"));
-
for (int i = 0; i < items.length; i++)
-
System.out.println( items[i].getPath().toUri().getPath());
小步快走,運行之,發現文件夾是對的,但打印內容少了,沒錯,因為我們沒從FileStatus中全打出來。
這里可以看到,ls命令的核心是FileSystem中的listStatus函數了。
繼續動刀。
觀察FileSystem的listStatus函數,發現是個虛函數,同時FileSystem也是個一個抽象類,說明具體的listStatus實現,必然還在其它類中。於是監視srcFs類,發現其類型為DistributedFileSystem於是,把代碼轉換一下:
DistributedFileSystem srcFs = (DistributedFileSystem)FileSystem.get(conf);
運行程序,沒有變化。說明路走對了。
繼續,想拋棄這個FileSystem類呢,就行看看它如何在get方法中創建DistributedFileSystem這個類的,一級級查,發現,最終是在這里創建:
-
private static FileSystem createFileSystem(URI uri, Configuration conf ) throws IOException {
-
Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);
-
if (clazz == null) {
-
throw new IOException("No FileSystem for scheme: " + uri.getScheme());
-
}
-
FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
-
fs.initialize(uri, conf);
-
return fs;
-
}
這里應用了反射,可以看出,作者還是很牛的,這函數跟據傳入的uri,的scheme(就是地址的前綴),來根據配置,創建相應的實現類,是一個工廠模式。
這里我們傳入的uri是hdfs:// 所以查詢參數:fs.hadfs.impl。於是到core-default.xml中查:
-
<property>
-
<name>fs.hdfs.impl</name>
-
<value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
-
<description>The FileSystem for hdfs: uris.</description>
-
</property>
發現,此處配置文件,正是我們的DistributeFileSystem類。
然后看到,創建對象完后,就調用了initialize函數。於是我們把程序進一步改造:
-
DistributedFileSystem srcFs = new DistributedFileSystem();
-
srcFs.initialize(uri, conf);
-
FileStatus[] items = srcFs.listStatus(new Path(
-
......
運行程序,保持正確結果。
此時,可以殺入DistributedFileSystem類的listStatus函數了。
-
public FileStatus[] listStatus(Path p) throws IOException {
-
FileStatus[] infos = dfs.listPaths(getPathName(p));
-
if (infos == null) return null;
-
FileStatus[] stats = new FileStatus[infos.length];
-
for (int i = 0; i < infos.length; i++) {
-
stats[i] = makeQualified(infos[i]);
-
}
-
return stats;
-
}
進入后,發現主要由這個dfs對象訪問,於是尋找dfs對象的創建:
this.dfs = new DFSClient(namenode, conf, statistics);
於是繼續把我們的程序改造:
-
Statistics statistics = new Statistics(uri.getScheme());
-
InetSocketAddress namenode = NameNode.getAddress(uri.getAuthority());
-
DFSClient dfs = new DFSClient(namenode, conf, statistics);
-
FileStatus[] items = dfs.listPaths("/user/zjf");
-
for (int i = 0; i < items.length; i++)
-
System.out.println(items[i].getPath().toUri().getPath());
改造后,運行,結果與之前相同,說明路走對了。
繼續前進,進入listPath函數,發現又是調用了ClientProtocol類的getListing函數。而這個ClientProtocol接口,好象有點熟悉了,在之前講RPC調用時,這個是一個給客戶端使用的接口,而具體的實現,在服務器端。
所以拋開DFSClient類,我們自己也可以創建這個RPC接口,然后自己調用。改造成如下:
-
InetSocketAddress nameNodeAddr = NameNode.getAddress(uri.getAuthority());
-
ClientProtocol namenode = (ClientProtocol)RPC.getProxy(ClientProtocol.class,
-
ClientProtocol.versionID, nameNodeAddr, UnixUserGroupInformation.login(conf, true), conf,
-
NetUtils.getSocketFactory(conf, ClientProtocol.class));
-
FileStatus[] items = namenode.getListing("/user/zjf");
-
for (int i = 0; i < items.length; i++)
-
System.out.println(items[i].getPath().toUri().getPath());
此時,我們在Client這端已經走到了盡頭,RPC調用getListing接口,具體實現是在nameNode服務器端了。
即然RPC在客戶端調用的接口,具體是在服務器端實現。那么,我們如果直接創建服務器端的實現類,調用相應類的函數,不也能產出相同的結果么。
於是直接使用NameNode來創建實例,來調用:
-
UserGroupInformation.setCurrentUser(UnixUserGroupInformation.login(conf, true));
-
NameNode namenode = new NameNode(conf);
-
FileStatus[] items = namenode.getListing("/user/zjf");
-
for (int i = 0; i < items.length; i++)
-
System.out.println(items[i].getPath().toUri().getPath());
運行該程序時,需要把之前的NameNode程序停止掉,因為我們不再需要這個服務,而是直接call這個服務中的方法了。
到目前為止,我們已經通過ls這個命令,從client端一殺到了namenode端。然后分析其它幾個命令(delete,mkdir,getFileInfo)與ls相同。所以要想再深入看這些命令的處理,得在namenode中進一步研究。
namenode我們知道,主要存放的是文件目錄信息,那利用上述這些命令,就可以進一步研究其目錄的存儲方式。這一塊將在下一章中為進一步探討。