前言
在spark中通過hdfs的java接口並發寫文件出現了數據丟失的問題,一頓操作后發現原來是FileSystem的緩存機制。補一課先
FileSystem實例化
FileSystem.get(config)是如何創建一個hadoop的FileSystem。
分為3個步驟。
1. 初始化所有支持的FileSystem(沒有實例話,只是緩存類)
2. 通過uri的scheme拿到相應FileSystem
3. 緩存機制(如果不關閉的話,默認是開啟)
下面詳細分析一下各步驟流程
1. 初始化
通過java提供的ServiceLoader來錄入所有可能的FileSystem,就像這樣
ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class); for (FileSystem fs : serviceLoader) { SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass()); }
待初始化的類通過配置文件聲明,配置可以在hadoop-hdfs.jar里找到
捎帶一嘴,java提供的ServiceLoader有點像乞丐版spring的依賴反轉。
2.scheme
通過對Uri的解析來判斷創建一個什么FileSystem,
例如
hdfs://master:9200/test的scheme就是hdfs。
然后通過scheme和已經緩存好的FileSystem映射,找到需要實例化的類。
例如scheme是hdfs,那么就會創建一個DistributedFileSystem。
3. 緩存
FileSystem類中有一個Cache內部類,用於緩存已經被實例化的FileSystem。注意這個跟連接池還是有區別的,Cache中的緩存只是一個map,可以被多個線程拿到。這就會有一個問題,當你多線程同時get FileSystem的時候,可能返回的是同一個對象。所以切記,在多線程場景中,不要隨意調用FileSystem.close,你關的連接可能會影響到其他正在使用的線程。
注意: 當你在其他框架上拿fileSystem對象需要額外注意,例如在spark上進行 FileSystem.get(),如果你想自定義某些配置,設置hdfs的副本數(dfs.replication) 之類,你必須在configuration中關閉FileSystem的緩存機制,也就是設置
configuration.set("fs.hdfs.impl.disable.cache","true")
這很重要,因為你不確定spark是否在你之前創建了一個FileSystem,而你得到的可能不是你想要的。
參考資料
// 遇到的相同問題