datanode的介紹
一個典型的HDFS系統包括一個NameNode和多個DataNode。DataNode是hdfs文件系統中真正存儲數據的節點。
每個DataNode周期性和唯一的NameNode通信,還時不時和hdfs客戶端代碼以及其他datanode通信。
datanode維護一個重要的表:
塊=>字節流
這些存儲在本地磁盤,DataNode在啟動時,還有啟動后周期性報告給NameNode,這個表的內容。
DataNodes周期性請求NameNode詢問命令操作,NameNode不能直接連接DataNode,NameNode在DataNode調用時,簡單返回值。
DataNodes還維護一個開放的socket服務器,讓客戶端代碼和其他DataNode通過它可以讀寫數據,這個服務器的host/port會匯報給NameNode。
datanode啟動流程
在命令行啟動datanode的方法是:bin/hadoop datanode
查看bin/hadoop腳本,可以看到最后執行的java類是:org.apache.hadoop.hdfs.server.datanode.DataNode
DataNode的骨架成員如下:
public class DataNode extends Configured implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants,Runnable, DataNodeMXBean { public DatanodeProtocol namenode = null;//與NameNode通信的ipc客戶端類 public FSDatasetInterface data = null;//管理一系列的數據塊,每個塊在本地磁盤上都有唯一的名字和擴展名。所有和數據塊相關的操作,都在FSDataset相關的類中進行處理。 public DatanodeRegistration dnRegistration = null;//DataNode向NameNode的注冊信息,包含名字(datanode機器名:dfs.datanode.address端口),info的http端口,ipc的端口等 volatile boolean shouldRun = true;//DataNode循環運行標志,為true就一直運行 private LinkedList<Block> receivedBlockList = new LinkedList<Block>();//已經接收的數據塊,定期通知namenode接收完畢時,會移除 private final Map<Block, Block> ongoingRecovery = new HashMap<Block, Block>();//存放正在從本地塊恢復到其他DataNode的數據塊,恢復完畢后移除,在其他DataNode的數據塊副本損壞或丟失時會使用 private LinkedList<String> delHints = new LinkedList<String>(); //需要刪除的塊,一般是被替換時才會被刪除,也是在定期通知namenode后,會移除 Daemon dataXceiverServer = null;//用於讀寫數據的服務器,接收客戶端和其他DataNode的請求,它不用於內部hadoop ipc機制,端口是dfs.datanode.address public Server ipcServer; //內部datanode調用的ipc服務器,用於客戶端,端口是dfs.datanode.ipc.address long blockReportInterval;//數據塊報告周期,默認是60*60秒,即一個小時 long lastBlockReport = 0;//記錄最近的數據塊報告時間,與blockReportInterval聯合使用 long lastHeartbeat = 0;//記錄最近和namenode的心跳時間 long heartBeatInterval;//和namenode的心跳周期,默認是3s private DataStorage storage = null;//DataStorage提供了format方法,用於創建DataNode上的Storage,對DataNode的升級/回滾/提交過程,就是對DataStorage的doUpgrade/doRollback/doFinalize分析得到的。同時,利用StorageDirectory,DataStorage管理存儲系統的狀態。 private HttpServer infoServer = null;//查看DataNode狀態信息的http服務器,端口是dfs.datanode.http.address public DataBlockScanner blockScanner = null;//檢測它所管理的所有Block數據塊的一致性,因此,對已DataNode節點上的每一個Block,它都會每隔scanPeriod ms(默認三個星期)利用Block對應的校驗和文件來檢測該Block一次,看看這個Block的數據是否已經損壞。 public Daemon blockScannerThread = null; }
DataNode的初始化和啟動:
public class DataNode extends Configured implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants,Runnable, DataNodeMXBean { //main方法,DataNode的入口點 public static void main(String args[]) { secureMain(args, null); } //主線程阻塞,讓DataNode的任務循環執行 public static void secureMain(String [] args, SecureResources resources) { try { ... DataNode datanode = createDataNode(args, null, resources); if (datanode != null) datanode.join(); } ... } public static DataNode createDataNode(String args[],Configuration conf, SecureResources resources) throws IOException { DataNode dn = instantiateDataNode(args, conf, resources); runDatanodeDaemon(dn);//DataNode類作為一個Thread運行 return dn; } public static DataNode instantiateDataNode(String args[],Configuration conf, SecureResources resources) throws IOException { ... String[] dataDirs = conf.getStrings(DATA_DIR_KEY);//獲取DataNode管理的本地目錄集合 return makeInstance(dataDirs, conf, resources); } //檢查本地目錄集合的合法性 public static DataNode makeInstance(String[] dataDirs, Configuration conf, SecureResources resources) throws IOException { ... ArrayList<File> dirs = new ArrayList<File>(); FsPermission dataDirPermission = new FsPermission(conf.get(DATA_DIR_PERMISSION_KEY, DEFAULT_DATA_DIR_PERMISSION)); for (String dir : dataDirs) { ... DiskChecker.checkDir(localFS, new Path(dir), dataDirPermission); dirs.add(new File(dir)); ... } if (dirs.size() > 0) return new DataNode(conf, dirs, resources); return null; } //實例化DataNode DataNode(final Configuration conf,final AbstractList<File> dataDirs, SecureResources resources) throws IOException { super(conf); ... try { startDataNode(conf, dataDirs, resources); } catch (IOException ie) { shutdown(); throw ie; } } void startDataNode(Configuration conf, AbstractList<File> dataDirs, SecureResources resources) throws IOException { InetSocketAddress nameNodeAddr = NameNode.getServiceAddress(conf, true); InetSocketAddress socAddr = DataNode.getStreamingAddr(conf);//獲取DataNode的數據塊流的讀寫的端口 int tmpPort = socAddr.getPort(); storage = new DataStorage();//管理數據目錄的類,完成格式化,升級,回滾等功能 // construct registration this.dnRegistration = new DatanodeRegistration(machineName + ":" + tmpPort); //與namenode通信的客戶端類 this.namenode = (DatanodeProtocol) RPC.waitForProxy(DatanodeProtocol.class,DatanodeProtocol.versionID,nameNodeAddr, conf); //從NameNode獲取版本和id信息 NamespaceInfo nsInfo = handshake(); if (simulatedFSDataset) { ... } else { // real storage // read storage info, lock data dirs and transition fs state if necessary storage.recoverTransitionRead(nsInfo, dataDirs, startOpt); // adjust this.dnRegistration.setStorageInfo(storage); // initialize data node internal structure this.data = new FSDataset(storage, conf);//一切數據塊讀寫的實際操作類 } ... this.dataXceiverServer = new Daemon(threadGroup, new DataXceiverServer(ss, conf, this));//初始化數據塊的流讀寫服務器 ... //初始化數據塊報告周期,默認是一個小時 this.blockReportInterval = conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL); ... //初始化與namenode心跳周期,默認是3秒 this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L; ... if ( reason == null ) { blockScanner = new DataBlockScanner(this, (FSDataset)data, conf);//初始化數據塊一致性檢測類 } ... //DataNode的狀態信息查詢的http服務器地址 InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf); ... //初始化DataNode的狀態信息查詢的http服務器 this.infoServer = (secureResources == null) ? new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0, conf, SecurityUtil.getAdminAcls(conf, DFSConfigKeys.DFS_ADMIN)) : new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0, conf, SecurityUtil.getAdminAcls(conf, DFSConfigKeys.DFS_ADMIN), secureResources.getListener()); ... //添加infoServer一些Servlet的映射url和類 ... this.infoServer.start(); ... //初始化內部hadoop ipc服務器 InetSocketAddress ipcAddr = NetUtils.createSocketAddr( conf.get("dfs.datanode.ipc.address")); ipcServer = RPC.getServer(this, ipcAddr.getHostName(), ipcAddr.getPort(), conf.getInt("dfs.datanode.handler.count", 3), false, conf, blockTokenSecretManager); dnRegistration.setIpcPort(ipcServer.getListenerAddress().getPort()); ... }
DataNode的服務:
//運行DataNode的后台線程 public static void runDatanodeDaemon(DataNode dn) throws IOException { if (dn != null) { //register datanode dn.register(); dn.dataNodeThread = new Thread(dn, dnThreadName); dn.dataNodeThread.setDaemon(true); dn.dataNodeThread.start(); } } //啟動數據塊的流讀寫服務器,內部hadoop ipc服務器 public void run() { ... dataXceiverServer.start(); ipcServer.start(); while (shouldRun) { try { startDistributedUpgradeIfNeeded();//檢測是否需要升級hadoop文件系統 offerService();//DataNode提供服務,定時發送心跳給NameNode,響應NameNode返回的命令並執行 } ... } } //DataNode提供服務,定時發送心跳給NameNode,響應NameNode返回的命令並執行,通知namenode接收完畢的數據塊和刪除的數據塊,定時上報數據塊 public void offerService() throws Exception { ... while (shouldRun) { try { long startTime = now(); ... if (startTime - lastHeartbeat > heartBeatInterval) { lastHeartbeat = startTime; //定期發送心跳給NameNode DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration, data.getCapacity(), data.getDfsUsed(), data.getRemaining(), xmitsInProgress.get(), getXceiverCount()); ... //響應namenode返回的命令做處理 if (!processCommand(cmds)) continue; } synchronized(receivedBlockList) { synchronized(delHints) { blockArray = receivedBlockList.toArray(new Block[numBlocks]); delHintArray = delHints.toArray(new String[numBlocks]); } } } if (blockArray != null) { //通知NameNode已經接收完畢的塊,以及刪除的塊 namenode.blockReceived(dnRegistration, blockArray, delHintArray); synchronized (receivedBlockList) { synchronized (delHints) { for(int i=0; i<blockArray.length; i++) { receivedBlockList.remove(blockArray[i]);//清空保存接收完畢的塊 delHints.remove(delHintArray[i]);//清空保存刪除完畢的塊 } } } } if (startTime - lastBlockReport > blockReportInterval) { if (data.isAsyncBlockReportReady()) { // Create block report ... Block[] bReport = data.retrieveAsyncBlockReport(); ... //向NameNode上報數據塊信息 DatanodeCommand cmd = namenode.blockReport(dnRegistration, BlockListAsLongs.convertToArrayLongs(bReport)); ... processCommand(cmd); } else { //請求異步准備好數據塊上報信息 data.requestAsyncBlockReport(); ... } } } } // while (shouldRun) } // offerService }
以上就是DataNode的啟動流程和服務流程,都以作適當刪減,留下主干,加上注釋。
DataNode的相關重要類
FSDataset:所有和數據塊相關的操作,都在FSDataset相關的類。詳細分析參考 http://caibinbupt.iteye.com/blog/284365
DataXceiverServer:處理數據塊的流讀寫的的服務器,處理邏輯由DataXceiver完成。詳細分析參考 http://caibinbupt.iteye.com/blog/284979
DataXceiver:處理數據塊的流讀寫的線程。詳細分析參考 http://caibinbupt.iteye.com/blog/284979
還有處理非讀寫的非主流的流程。詳細分析參考 http://caibinbupt.iteye.com/blog/286533
BlockReceiver:完成數據塊的流寫操作。詳細分析參考 http://caibinbupt.iteye.com/blog/286259
BlockSender:完成數據塊的流讀操作。
DataBlockScanner:用於定時對數據塊文件進行校驗。詳細分析參考http://caibinbupt.iteye.com/blog/286650
總結
上面講了DataNode相關的核心類的成員和初始化流程,並做了代碼的刪減,留下主干,加上注釋,讓初學者可以概覽DataNode的源碼,快速入門。