HDFS datanode源碼分析


datanode的介紹

一個典型的HDFS系統包括一個NameNode和多個DataNode。DataNode是hdfs文件系統中真正存儲數據的節點。

每個DataNode周期性和唯一的NameNode通信,還時不時和hdfs客戶端代碼以及其他datanode通信。

 

datanode維護一個重要的表:

  塊=>字節流

這些存儲在本地磁盤,DataNode在啟動時,還有啟動后周期性報告給NameNode,這個表的內容。

DataNodes周期性請求NameNode詢問命令操作,NameNode不能直接連接DataNode,NameNode在DataNode調用時,簡單返回值。

DataNodes還維護一個開放的socket服務器,讓客戶端代碼和其他DataNode通過它可以讀寫數據,這個服務器的host/port會匯報給NameNode。

 

datanode啟動流程

在命令行啟動datanode的方法是:bin/hadoop datanode

查看bin/hadoop腳本,可以看到最后執行的java類是:org.apache.hadoop.hdfs.server.datanode.DataNode

DataNode的骨架成員如下:

public class DataNode extends Configured implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants,Runnable, DataNodeMXBean {
  public DatanodeProtocol namenode = null;//與NameNode通信的ipc客戶端類
  public FSDatasetInterface data = null;//管理一系列的數據塊,每個塊在本地磁盤上都有唯一的名字和擴展名。所有和數據塊相關的操作,都在FSDataset相關的類中進行處理。
  public DatanodeRegistration dnRegistration = null;//DataNode向NameNode的注冊信息,包含名字(datanode機器名:dfs.datanode.address端口),info的http端口,ipc的端口等

  volatile boolean shouldRun = true;//DataNode循環運行標志,為true就一直運行
  private LinkedList<Block> receivedBlockList = new LinkedList<Block>();//已經接收的數據塊,定期通知namenode接收完畢時,會移除
  private final Map<Block, Block> ongoingRecovery = new HashMap<Block, Block>();//存放正在從本地塊恢復到其他DataNode的數據塊,恢復完畢后移除,在其他DataNode的數據塊副本損壞或丟失時會使用
  private LinkedList<String> delHints = new LinkedList<String>(); //需要刪除的塊,一般是被替換時才會被刪除,也是在定期通知namenode后,會移除

  Daemon dataXceiverServer = null;//用於讀寫數據的服務器,接收客戶端和其他DataNode的請求,它不用於內部hadoop ipc機制,端口是dfs.datanode.address
  public Server ipcServer; //內部datanode調用的ipc服務器,用於客戶端,端口是dfs.datanode.ipc.address
  
  long blockReportInterval;//數據塊報告周期,默認是60*60秒,即一個小時
  long lastBlockReport = 0;//記錄最近的數據塊報告時間,與blockReportInterval聯合使用


  long lastHeartbeat = 0;//記錄最近和namenode的心跳時間
  long heartBeatInterval;//和namenode的心跳周期,默認是3s
  private DataStorage storage = null;//DataStorage提供了format方法,用於創建DataNode上的Storage,對DataNode的升級/回滾/提交過程,就是對DataStorage的doUpgrade/doRollback/doFinalize分析得到的。同時,利用StorageDirectory,DataStorage管理存儲系統的狀態。
  private HttpServer infoServer = null;//查看DataNode狀態信息的http服務器,端口是dfs.datanode.http.address
  
  public DataBlockScanner blockScanner = null;//檢測它所管理的所有Block數據塊的一致性,因此,對已DataNode節點上的每一個Block,它都會每隔scanPeriod ms(默認三個星期)利用Block對應的校驗和文件來檢測該Block一次,看看這個Block的數據是否已經損壞。
  public Daemon blockScannerThread = null;
  
}

 

DataNode的初始化和啟動:

public class DataNode extends Configured implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants,Runnable, DataNodeMXBean {
  //main方法,DataNode的入口點
  public static void main(String args[]) {
    secureMain(args, null);
  }
  
  //主線程阻塞,讓DataNode的任務循環執行
  public static void secureMain(String [] args, SecureResources resources) {
    try {
      ...
      DataNode datanode = createDataNode(args, null, resources);
      if (datanode != null)
        datanode.join();
    }
    ...
  }
  
  public static DataNode createDataNode(String args[],Configuration conf, SecureResources resources) throws IOException {
    DataNode dn = instantiateDataNode(args, conf, resources);
    runDatanodeDaemon(dn);//DataNode類作為一個Thread運行
    return dn;
  }
  
  public static DataNode instantiateDataNode(String args[],Configuration conf, SecureResources resources) throws IOException {
    ...
    String[] dataDirs = conf.getStrings(DATA_DIR_KEY);//獲取DataNode管理的本地目錄集合
    return makeInstance(dataDirs, conf, resources);
  }
  
  //檢查本地目錄集合的合法性
  public static DataNode makeInstance(String[] dataDirs, Configuration conf, SecureResources resources) throws IOException {
    ...
    ArrayList<File> dirs = new ArrayList<File>();
    FsPermission dataDirPermission = new FsPermission(conf.get(DATA_DIR_PERMISSION_KEY, DEFAULT_DATA_DIR_PERMISSION));
    for (String dir : dataDirs) {
      ...
        DiskChecker.checkDir(localFS, new Path(dir), dataDirPermission);
        dirs.add(new File(dir));
      ...
    }
    if (dirs.size() > 0) 
      return new DataNode(conf, dirs, resources);
    return null;
  }
  
  //實例化DataNode
  DataNode(final Configuration conf,final AbstractList<File> dataDirs, SecureResources resources) throws IOException {
    super(conf);
    ...
    try {
      startDataNode(conf, dataDirs, resources);
    } catch (IOException ie) {
      shutdown();
      throw ie;
    }   
  }
  
   void startDataNode(Configuration conf, AbstractList<File> dataDirs, SecureResources resources) throws IOException {
    
    InetSocketAddress nameNodeAddr = NameNode.getServiceAddress(conf, true);

    InetSocketAddress socAddr = DataNode.getStreamingAddr(conf);//獲取DataNode的數據塊流的讀寫的端口
    int tmpPort = socAddr.getPort();
    storage = new DataStorage();//管理數據目錄的類,完成格式化,升級,回滾等功能
    // construct registration
    this.dnRegistration = new DatanodeRegistration(machineName + ":" + tmpPort);

    //與namenode通信的客戶端類
    this.namenode = (DatanodeProtocol) RPC.waitForProxy(DatanodeProtocol.class,DatanodeProtocol.versionID,nameNodeAddr, conf);
    //從NameNode獲取版本和id信息
    NamespaceInfo nsInfo = handshake();

    if (simulatedFSDataset) {
        ...
    } else { // real storage
      // read storage info, lock data dirs and transition fs state if necessary
      storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);
      // adjust
      this.dnRegistration.setStorageInfo(storage);
      // initialize data node internal structure
      this.data = new FSDataset(storage, conf);//一切數據塊讀寫的實際操作類
    }
      
    ...
    this.dataXceiverServer = new Daemon(threadGroup, new DataXceiverServer(ss, conf, this));//初始化數據塊的流讀寫服務器
    ...
    //初始化數據塊報告周期,默認是一個小時
    this.blockReportInterval = conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);
    ...
    //初始化與namenode心跳周期,默認是3秒
    this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
    ...
    if ( reason == null ) {
      blockScanner = new DataBlockScanner(this, (FSDataset)data, conf);//初始化數據塊一致性檢測類
    } 
    ...

    //DataNode的狀態信息查詢的http服務器地址
    InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);
    ...
    //初始化DataNode的狀態信息查詢的http服務器
    this.infoServer = (secureResources == null) 
       ? new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0, 
           conf, SecurityUtil.getAdminAcls(conf, DFSConfigKeys.DFS_ADMIN))
       : new HttpServer("datanode", infoHost, tmpInfoPort, tmpInfoPort == 0,
           conf, SecurityUtil.getAdminAcls(conf, DFSConfigKeys.DFS_ADMIN),
           secureResources.getListener());
    ...
    //添加infoServer一些Servlet的映射url和類
    ...
    this.infoServer.start();
    ...
    //初始化內部hadoop ipc服務器
    InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
        conf.get("dfs.datanode.ipc.address"));
    ipcServer = RPC.getServer(this, ipcAddr.getHostName(), ipcAddr.getPort(), 
        conf.getInt("dfs.datanode.handler.count", 3), false, conf,
        blockTokenSecretManager);
    dnRegistration.setIpcPort(ipcServer.getListenerAddress().getPort());
    ...
  }

 

DataNode的服務:

//運行DataNode的后台線程
  public static void runDatanodeDaemon(DataNode dn) throws IOException {
    if (dn != null) {
      //register datanode
      dn.register();
      dn.dataNodeThread = new Thread(dn, dnThreadName);
      dn.dataNodeThread.setDaemon(true); 
      dn.dataNodeThread.start();
    }
  }
  //啟動數據塊的流讀寫服務器,內部hadoop ipc服務器
  public void run() {
    ...
    dataXceiverServer.start();
    ipcServer.start();
        
    while (shouldRun) {
      try {
        startDistributedUpgradeIfNeeded();//檢測是否需要升級hadoop文件系統
        offerService();//DataNode提供服務,定時發送心跳給NameNode,響應NameNode返回的命令並執行
      } 
      ...
    }
  }
  
  //DataNode提供服務,定時發送心跳給NameNode,響應NameNode返回的命令並執行,通知namenode接收完畢的數據塊和刪除的數據塊,定時上報數據塊
  public void offerService() throws Exception {
    ...
    while (shouldRun) {
      try {
        long startTime = now();
        ...
        if (startTime - lastHeartbeat > heartBeatInterval) {
          lastHeartbeat = startTime;
          //定期發送心跳給NameNode
          DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,
                                                       data.getCapacity(),
                                                       data.getDfsUsed(),
                                                       data.getRemaining(),
                                                       xmitsInProgress.get(),
                                                       getXceiverCount());
          ...
          //響應namenode返回的命令做處理
          if (!processCommand(cmds))
            continue;
        }
        
        synchronized(receivedBlockList) {
          synchronized(delHints) {
              blockArray = receivedBlockList.toArray(new Block[numBlocks]);
              delHintArray = delHints.toArray(new String[numBlocks]);
            }
          }
        }
        if (blockArray != null) {
          //通知NameNode已經接收完畢的塊,以及刪除的塊
          namenode.blockReceived(dnRegistration, blockArray, delHintArray);
          synchronized (receivedBlockList) {
            synchronized (delHints) {
              for(int i=0; i<blockArray.length; i++) {
                receivedBlockList.remove(blockArray[i]);//清空保存接收完畢的塊
                delHints.remove(delHintArray[i]);//清空保存刪除完畢的塊
              }
            }
          }
        }
        
       if (startTime - lastBlockReport > blockReportInterval) {
          if (data.isAsyncBlockReportReady()) {
            // Create block report
            ...
            Block[] bReport = data.retrieveAsyncBlockReport();
            ...
            //向NameNode上報數據塊信息
            DatanodeCommand cmd = namenode.blockReport(dnRegistration,
                    BlockListAsLongs.convertToArrayLongs(bReport));
            ...
            processCommand(cmd);
          } else {
            //請求異步准備好數據塊上報信息
            data.requestAsyncBlockReport();
            ...
            }
          }
        }
        
    } // while (shouldRun)
  } // offerService
}

以上就是DataNode的啟動流程和服務流程,都以作適當刪減,留下主干,加上注釋。

 

DataNode的相關重要類

FSDataset:所有和數據塊相關的操作,都在FSDataset相關的類。詳細分析參考 http://caibinbupt.iteye.com/blog/284365

DataXceiverServer:處理數據塊的流讀寫的的服務器,處理邏輯由DataXceiver完成。詳細分析參考 http://caibinbupt.iteye.com/blog/284979

DataXceiver:處理數據塊的流讀寫的線程。詳細分析參考 http://caibinbupt.iteye.com/blog/284979

                  還有處理非讀寫的非主流的流程。詳細分析參考 http://caibinbupt.iteye.com/blog/286533

BlockReceiver:完成數據塊的流寫操作。詳細分析參考 http://caibinbupt.iteye.com/blog/286259

BlockSender:完成數據塊的流讀操作。

DataBlockScanner:用於定時對數據塊文件進行校驗。詳細分析參考http://caibinbupt.iteye.com/blog/286650

 

總結

上面講了DataNode相關的核心類的成員和初始化流程,並做了代碼的刪減,留下主干,加上注釋,讓初學者可以概覽DataNode的源碼,快速入門。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM