FileSystem實例化過程


HDFS案例代碼

// Build a Configuration (loads core-default.xml / core-site.xml) and obtain a
// FileSystem handle for the given HDFS URI.
Configuration configuration = new Configuration();
FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop000:8020"), configuration);
    
// Download example: open an HDFS input stream and copy it into a local file.
InputStream in = fileSystem.open(new Path(HDFS_PATH+"/hdfsapi/test/log4j.properties"));
OutputStream out = new FileOutputStream(new File("log4j_download.properties"));
IOUtils.copyBytes(in, out, 4096, true); //last argument: close both the input and output streams once the copy finishes

 

FileSystem.java

// Process-wide cache of FileSystem instances shared by all FileSystem.get calls.
static final Cache CACHE = new Cache();

// Static entry point: extracts the scheme/authority from the URI and delegates
// to the shared CACHE, which returns a cached or newly created instance.
public static FileSystem get(URI uri, Configuration conf) throws IOException {
    String scheme = uri.getScheme();   //hdfs
    String authority = uri.getAuthority();  //hadoop000:8020

    return CACHE.get(uri, conf);
}

// Cache.get: builds the cache key from (uri, conf) and looks it up.
FileSystem get(URI uri, Configuration conf) throws IOException{
    Key key = new Key(uri, conf);
    return getInternal(uri, conf, key);
}

// Simplified excerpt: first probes the cache under the lock; on a miss it
// creates a new instance outside the lock, then re-checks and stores it.
private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
    FileSystem fs;
    synchronized (this) {
        fs = map.get(key);
    }
    
    //Resolve the FileSystem instance for this URI: if a cached instance exists
    //it is returned; otherwise createFileSystem builds a new one.
    if (fs != null) { 
        return fs;
    }
    
    fs = createFileSystem(uri, conf);
    synchronized (this) { 
        FileSystem oldfs = map.get(key);
        ... //put the newly created instance into the CACHE (elided in this excerpt)
        return fs;
    }
}

// Reflectively instantiates the concrete FileSystem class for the URI's scheme.
private static FileSystem createFileSystem(URI uri, Configuration conf) throws IOException {
    Class<?> clazz = getFileSystemClass(uri.getScheme(), conf); // for "hdfs" this returns org.apache.hadoop.hdfs.DistributedFileSystem
    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
    fs.initialize(uri, conf); //initialize the DistributedFileSystem (see below)
    return fs;
}

// Maps a URI scheme to its FileSystem implementation class: an explicit
// "fs.<scheme>.impl" configuration entry wins; otherwise the ServiceLoader
// registry populated by loadFileSystems() is consulted.
public static Class<? extends FileSystem> getFileSystemClass(String scheme,Configuration conf) throws IOException {
    if (!FILE_SYSTEMS_LOADED) { //whether implementations have been discovered yet; false on the first call
 loadFileSystems();
    }
    Class<? extends FileSystem> clazz = null;
    if (conf != null) {
        clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null); //looks up "fs.hdfs.impl"; in this walkthrough the property is set in neither core-default.xml nor core-site.xml
    }
    if (clazz == null) {
        clazz = SERVICE_FILE_SYSTEMS.get(scheme); //class org.apache.hadoop.hdfs.DistributedFileSystem
    }
    if (clazz == null) {
        throw new IOException("No FileSystem for scheme: " + scheme);
    }
    return clazz;
}


// Discovers every FileSystem implementation on the classpath via the JDK
// ServiceLoader mechanism and registers each one under its scheme.
// Double-checks FILE_SYSTEMS_LOADED inside the class-level lock so the scan
// runs at most once per JVM.
private static void loadFileSystems() {
    synchronized (FileSystem.class) {
        if (!FILE_SYSTEMS_LOADED) {
            ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
            for (FileSystem fs : serviceLoader) {
                SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
            }
            FILE_SYSTEMS_LOADED = true; //mark the implementations as loaded
        }
    }
}

loadFileSystems后SERVICE_FILE_SYSTEMS存在如下值:

file=class org.apache.hadoop.fs.LocalFileSystem, 
ftp=class org.apache.hadoop.fs.ftp.FTPFileSystem, 
hdfs=class org.apache.hadoop.hdfs.DistributedFileSystem, 
hftp=class org.apache.hadoop.hdfs.web.HftpFileSystem, 
webhdfs=class org.apache.hadoop.hdfs.web.WebHdfsFileSystem, 
s3n=class org.apache.hadoop.fs.s3native.NativeS3FileSystem, 
viewfs=class org.apache.hadoop.fs.viewfs.ViewFileSystem, 
swebhdfs=class org.apache.hadoop.hdfs.web.SWebHdfsFileSystem, 
har=class org.apache.hadoop.fs.HarFileSystem, 
s3=class org.apache.hadoop.fs.s3.S3FileSystem, 
hsftp=class org.apache.hadoop.hdfs.web.HsftpFileSystem

 

DistributedFileSystem.java

DFSClient dfs; //key field: client-side operations go through this DFSClient to reach the server

// Called by FileSystem.createFileSystem right after reflective construction;
// wires up the DFSClient for the given NameNode URI.
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    setConf(conf);

    String host = uri.getHost();  //hadoop000

    // Creates the DFSClient, then normalizes this.uri to scheme://authority.
    this.dfs = new DFSClient(uri, conf, statistics); this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
    this.workingDir = getHomeDirectory();
}

 

DFSClient.java

final ClientProtocol namenode; //key field: the RPC interface the client uses to talk to the NameNode

// Simplified constructor excerpt: obtains the NameNode proxy (and its
// delegation-token service) through NameNodeProxies.createProxy.
public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, FileSystem.Statistics stats)throws IOException {
    
    NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,ClientProtocol.class); this.dtService = proxyInfo.getDelegationTokenService();
    this.namenode = proxyInfo.getProxy(); //org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB
}

 

NameNodeProxies.java

// Excerpt follows the non-HA path: resolves the NameNode address from the URI
// and builds a proxy for the requested interface as the current user.
public static <T> ProxyAndInfo<T> createProxy(Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException {
    Class<FailoverProxyProvider<T>> failoverProxyProviderClass = getFailoverProxyProviderClass(conf, nameNodeUri, xface);
    return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,UserGroupInformation.getCurrentUser(), true);
}

// Dispatches on the requested interface; for ClientProtocol it builds the
// client-to-NameNode RPC proxy and bundles it with the token service.
public static <T> ProxyAndInfo<T> createNonHAProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
    UserGroupInformation ugi, boolean withRetries) throws IOException {
    Text dtService = SecurityUtil.buildTokenService(nnAddr);

    T proxy;
    if (xface == ClientProtocol.class) {
      proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi,withRetries);
    } ... //branches for other interface types elided in this excerpt
    return new ProxyAndInfo<T>(proxy, dtService);
}

// Builds the protobuf-level RPC proxy to the NameNode, optionally wraps it with
// retry logic, and adapts it to ClientProtocol via the translator.
private static ClientProtocol createNNProxyWithClientProtocol(
    InetSocketAddress address, Configuration conf, UserGroupInformation ugi,boolean withRetries) throws IOException {
 
    //RPC interface between the client and the NameNode
    final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
    ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
        ClientNamenodeProtocolPB.class, version, address, ugi, conf,
        NetUtils.getDefaultSocketFactory(conf),
        org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy)
            .getProxy();

    if (withRetries) { 
        //wrap the proxy using a JDK dynamic proxy that retries failed calls
        proxy = (ClientNamenodeProtocolPB) RetryProxy.create(
          ClientNamenodeProtocolPB.class,new DefaultFailoverProxyProvider<ClientNamenodeProtocolPB>(
              ClientNamenodeProtocolPB.class, proxy),methodNameToPolicyMap,defaultPolicy);
    }
    return new ClientNamenodeProtocolTranslatorPB(proxy);
}

 

RetryProxy.java

/**
 * Wraps the supplied proxy provider in a JDK dynamic proxy whose invocation
 * handler re-issues failed calls according to {@code retryPolicy}.
 *
 * @param iface         the interface the returned proxy implements
 * @param proxyProvider supplies the underlying proxy instance(s)
 * @param retryPolicy   decides whether a failed invocation is retried
 * @return a dynamic proxy implementing {@code iface} with retry behavior
 */
public static <T> Object create(Class<T> iface,FailoverProxyProvider<T> proxyProvider, RetryPolicy retryPolicy) {
    ClassLoader loader = proxyProvider.getInterface().getClassLoader();
    Class<?>[] interfaces = new Class<?>[] { iface };
    RetryInvocationHandler<T> handler = new RetryInvocationHandler<T>(proxyProvider, retryPolicy);
    return Proxy.newProxyInstance(loader, interfaces, handler);
}

 

 獲取FileSystem實例源碼分析總結:

1、FileSystem.get通過反射實例化了一個DistributedFileSystem;

2、DistributedFileSystem中new DFSClient()把它作為自己的成員變量;

3、在DFSClient構造方法里面,調用了createProxy使用RPC機制得到了一個NameNode的代理對象,就可以和NameNode進行通信;

4、整個流程:FileSystem.get()--> DistributedFileSystem.initialize() --> DFSClient(RPC.getProtocolProxy()) --> NameNode的代理。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM