1.概述
這篇博客接着《Hadoop2源碼分析-RPC機制初識》來講述,前面我們對MapReduce、序列化、RPC進行了分析和探索,對Hadoop V2的這些模塊都有了大致的了解,通過對這些模塊的研究,我們明白了MapReduce的運行流程以及內部的實現機制,Hadoop的序列化以及它的通信機制(RPC)。今天我們來研究另一個核心的模塊,那就是Hadoop的分布式文件存儲系統——HDFS,下面是今天分享的內容目錄:
- HDFS簡述
- NameNode
- DataNode
接下來,我們開始今天的分享內容。
2.HDFS簡述
HDFS全稱Hadoop Distributed File System,在HDFS中有幾個基本的概念,首先是它的數據塊(Block),HDFS的設計是用於支持大文件的。運行在HDFS上的程序也是用於處理大數據集的。這些程序僅寫一次數據,一次或多次讀數據請求,並且這些讀操作要求滿足流式傳輸速度。HDFS支持文件的一次寫多次讀操作。HDFS中典型的塊大小是64MB,一個HDFS文件可以被被切分成多個64MB大小的塊,如果需要,每一個塊可以分布在不同的數據節點上。HDFS 中,如果一個文件小於一個數據塊的大小,並不占用整個數據塊存儲空間。
HDFS提供了一個可操作文件系統的抽象類org.apache.hadoop.fs.FileSystem,該類被划分在Hadoop-Common部分,其源碼地址為:hadoop-2.6.0-src/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java,如下是FileSystem的部分源碼,如下所示:
@InterfaceAudience.Public @InterfaceStability.Stable public abstract class FileSystem extends Configured implements Closeable { // 代碼內容省略 // ... }
我們可以使用着抽象類,去操作HDFS系統上的內容,實現代碼如下所示:
private static void dfs() { FileSystem fs = null; try { fs = FileSystem.get(conf);// get file object FileStatus[] list = fs.listStatus(new Path("/"));// file status list for (FileStatus file : list) { LOGGER.info(file.getPath().getName());// print file names } } catch (IOException e) { e.printStackTrace(); LOGGER.error("Get hdfs path has error,msg is " + e.getMessage()); } finally { try { if (fs != null) { fs.close(); } } catch (IOException e) { e.printStackTrace(); LOGGER.error("Close fs object has error,msg is " + e.getMessage()); } } }
下面,我們來看另一個概念是元數據節點(Namenode)和數據節點(datanode),這2個是HDFS的核心模塊,下面我們分別來看看這2個核心模塊。
3.NameNode
NN節點用來管理文件系統的NameSpace,將所有的文件和文件夾的Meta保存在一個文件系統中,是HDFS中文件目錄和文件分配的管理者,保存的重要信息如下所示:

在HDFS集群上可能包含成百上千個DataNode(簡稱DN)節點,這些DN節點定時和NameNode(簡稱NN)節點保持通信,接受NN節點的一些指令,為了減小NN的壓力,NN上並不永久存儲那個DN上報的數據塊信息,而是通過DN上報的狀態來更新NN上的映射表信息。DN和NN建立連接后,會和NN保持心跳,心跳返回的信息包含了NN對DN的一些指令信息,如刪除數據,復制數據到其他的DN節點。值得注意的是NN不會主動去請求DN,這是一個嚴格意義上的C/S架構模型,同時,客戶端在操作HDFS集群時,DN節點會互相配合,保證數據的一致性。
NN節點信息存儲,部分截圖信息如下所示:


4.DataNode
下面我們來分析一下DN的實現,DN的實現包含以下部分,一部分是對本地Block的管理,另一部分就是和其他的Entity進行數據交互。首先,我們先看本地的Block管理部分。我們在搭建Hadoop集群時,會指定Block的存儲路徑,我們可以找到配置的存儲路徑,在hdfs-site.xml文件下,內容路徑如下所示:
<property> <name>dfs.datanode.data.dir</name> <value>/home/hadoop/data/dfs/data</value> </property>
然后,我們進入到DN節點上,找到對應的存儲目錄,如下圖所示:

這里面in_use.lock的作用是做一個排斥操作,在對應的應用上面加鎖。然后current目錄存放的是當前有效的Block,進入到current目錄后,出現如下圖所示的目錄:
VERSION存放着一些文件的Meta,接着還有一系列的Block文件和Meta文件,Block文件是存儲了HDFS中的數據的。存儲的Block,一個Block在多個DN節點上有備份,其備份參數可以調節,在hdfs-site.xml文件中,屬性設置如下所示:
<property> <name>dfs.replication</name> <value>3</value> </property>
首先,我們來看DateNode的類,部分代碼如下所示:
@VisibleForTesting @InterfaceAudience.Private public static DataNode createDataNode(String args[], Configuration conf, SecureResources resources) throws IOException { DataNode dn = instantiateDataNode(args, conf, resources);// init dn if (dn != null) { dn.runDatanodeDaemon();// register to nn and back to dn thread } return dn; }
/** Instantiate a single datanode object, along with its secure resources. * This must be run by invoking{@link DataNode#runDatanodeDaemon()} * subsequently. */ public static DataNode instantiateDataNode(String args [], Configuration conf, SecureResources resources) throws IOException { if (conf == null) conf = new HdfsConfiguration(); if (args != null) { // parse generic hadoop options GenericOptionsParser hParser = new GenericOptionsParser(conf, args); args = hParser.getRemainingArgs(); } if (!parseArguments(args, conf)) { printUsage(System.err); return null; } Collection<StorageLocation> dataLocations = getStorageLocations(conf); UserGroupInformation.setConfiguration(conf); SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY, DFS_DATANODE_KERBEROS_PRINCIPAL_KEY); return makeInstance(dataLocations, conf, resources); }
static DataNode makeInstance(Collection<StorageLocation> dataDirs, Configuration conf, SecureResources resources) throws IOException { LocalFileSystem localFS = FileSystem.getLocal(conf); FsPermission permission = new FsPermission( conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY, DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT)); DataNodeDiskChecker dataNodeDiskChecker = new DataNodeDiskChecker(permission); List<StorageLocation> locations = checkStorageLocations(dataDirs, localFS, dataNodeDiskChecker); DefaultMetricsSystem.initialize("DataNode"); assert locations.size() > 0 : "number of data directories should be > 0"; return new DataNode(conf, locations, resources);// create dn obejct }
public void runDatanodeDaemon() throws IOException { blockPoolManager.startAll(); // start dataXceiveServer dataXceiverServer.start(); if (localDataXceiverServer != null) { localDataXceiverServer.start(); } ipcServer.start(); startPlugins(conf); }
public static void secureMain(String args[], SecureResources resources) { int errorCode = 0; try { StringUtils.startupShutdownMessage(DataNode.class, args, LOG); DataNode datanode = createDataNode(args, null, resources); if (datanode != null) { datanode.join(); } else { errorCode = 1; } } catch (Throwable e) { LOG.fatal("Exception in secureMain", e); terminate(1, e); } finally { // We need to terminate the process here because either shutdown was called // or some disk related conditions like volumes tolerated or volumes required // condition was not met. Also, In secure mode, control will go to Jsvc // and Datanode process hangs if it does not exit. LOG.warn("Exiting Datanode"); terminate(errorCode); } }
- Main函數入口
下面給出DN類的Main函數入口,代碼片段如下所示:
public static void main(String args[]) { if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) { System.exit(0); } secureMain(args, null); }
5.總結
在研究HDFS的相關模塊時,這里需要明白各個模塊的功能及作用,這里為大家介紹了DN類的部分代碼片段,以及給代碼片段重要部分添加了代碼注釋,若是大家需要了解詳細的相關流程及代碼,可以閱讀Hadoop的HDFS部分的源代碼。
6.結束語
這篇博客就和大家分享到這里,如果大家在研究學習的過程當中有什么問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!
