通過Thrift訪問HDFS分布式文件系統的性能瓶頸分析


引言

  Hadoop提供的HDFS布式文件存儲系統,提供了基於thrift的客戶端訪問支持,但是因為Thrift自身的訪問特點,在高並發的訪問情況下,thrift自身結構可能將會成為HDFS文件存儲系統的一個性能瓶頸。我們先來看一下一不使用Thrfit方式訪問HDFS文件系統的業務流程。

一、HDFS文件讀取流程

  

流程說明:

  1. 使用HDFS提供的客戶端開發庫Client,向遠程的Namenode發起RPC請求;
  2. Namenode會視情況返回文件的部分或者全部block列表,對於每個block,Namenode都會返回有該block拷貝的DataNode地址;
  3. 客戶端開發庫Client會選取離客戶端最接近的DataNode來讀取block;如果客戶端本身就是DataNode,那么將從本地直接獲取數據.
  4. 讀取完當前block的數據后,關閉與當前的DataNode連接,並為讀取下一個block尋找最佳的DataNode;
  5. 當讀完列表的block后,且文件讀取還沒有結束,客戶端開發庫會繼續向Namenode獲取下一批的block列表。
  6. 讀取完一個block都會進行checksum驗證,如果讀取datanode時出現錯誤,客戶端會通知Namenode,然后再從下一個擁有該block拷貝的datanode繼續讀。

二、HDFS文件寫入流程

流程說明:

  1. 使用HDFS提供的客戶端開發庫Client,向遠程的Namenode發起RPC請求;
  2. Namenode會檢查要創建的文件是否已經存在,創建者是否有權限進行操作,成功則會為文件創建一個記錄,否則會讓客戶端拋出異常;
  3. 當 客戶端開始寫入文件的時候,開發庫會將文件切分成多個packets,並在內部以數據隊列"data queue"的形式管理這些packets,並向Namenode申請新的blocks,獲取用來存儲replicas的合適的datanodes列表, 列表的大小根據在Namenode中對replication的設置而定。
  4. 開始以pipeline(管道)的形式將packet寫入所 有的replicas中。開發庫把packet以流的方式寫入第一個datanode,該datanode把該packet存儲之后,再將其傳遞給在此 pipeline中的下一個datanode,直到最后一個datanode,這種寫數據的方式呈流水線的形式。
  5. 最后一個datanode成功存儲之后會返回一個ack packet,在pipeline里傳遞至客戶端,在客戶端的開發庫內部維護着"ack queue",成功收到datanode返回的ack packet后會從"ack queue"移除相應的packet。
  6. 如 果傳輸過程中,有某個datanode出現了故障,那么當前的pipeline會被關閉,出現故障的datanode會從當前的pipeline中移除, 剩余的block會繼續剩下的datanode中繼續以pipeline的形式傳輸,同時Namenode會分配一個新的datanode,保持 replicas設定的數量。

三、關鍵詞

  HDFSClient通過文件IO操作最終實現是通過直接訪問DataNode進行。

四、Thrift的訪問流程:猜測版

  

流程說明:

1.ThriftClient客戶端將操作命令傳給ThriftServer。

2.ThriftServer調用HDFSClient接口API實現HDFS讀寫操作,操作流程如和三所示。

五、疑問

  與DataNode發生數據交換的到底是ThriftServer還是ThriftClient,如果是ThriftServer,那么多個ThriftClient並行訪問時,ThriftServer必將成為HDFS訪問的性能瓶頸;如果是ThriftClient直接訪問DataNode,那么理論依據何在呢?

六、示例程序

  下面是一個基於Thrift實現的HDFS客戶端程序,實現了文件的訪問和創建和讀取

  1 // HdfsDemo.cpp : Defines the entry point for the console application.
  2 //
  3 
  4 #include "stdafx.h"
  5 #include <iostream>
  6 #include <string>
  7 #include <boost/lexical_cast.hpp>
  8 #include <protocol/TBinaryProtocol.h>
  9 #include <transport/TSocket.h>
 10 #include <transport/TTransportUtils.h>
 11 #include "ThriftHadoopFileSystem.h"
 12 
 13 #ifndef _WIN32_WINNT
 14 #define _WIN32_WINNT 0x0500
 15 #endif 
 16 using namespace std;
 17 using namespace apache::thrift;
 18 using namespace apache::thrift::protocol;
 19 using namespace apache::thrift::transport;
 20 
 21 int _tmain(int argc, _TCHAR* argv[])
 22 {
 23     if (argc < 3) 
 24     {
 25         std::cerr << "Invalid arguments!\n" << "Usage: DemoClient host port" << std::endl;
 26         //return -1;
 27     }
 28     boost::shared_ptr<TTransport> socket(new TSocket("192.168.230.133", 55952));//boost::lexical_cast<int>(argv[2])));
 29     boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
 30     boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
 31     ThriftHadoopFileSystemClient client(protocol);
 32     try 
 33     {
 34         transport->open();
 35         Pathname path;
 36         //01_create directory
 37         path.__set_pathname("/user/hadoop");
 38         if(client.exists(path) == true)
 39         {
 40             printf("path is exists.\r\n");
 41         }
 42         else
 43         {
 44             printf("path is not exists.");
 45             //return 0;
 46         }
 47         //02_put file
 48         Pathname filepath;
 49         filepath.__set_pathname("/user/hadoop/in/test1.txt");
 50         /*
 51         FILE* localfile = fopen("E:\\project\\Hadoop\\HdfsDemo\\Debug\\hello.txt","rb");
 52         if (localfile == NULL)
 53         {
 54             transport->close();
 55             return 0;
 56         }
 57  ThriftHandle hdl;  58  client.create(hdl,filepath);  59         while (true)
 60         {
 61             char data[1024];
 62             memset(data,0x00,sizeof(data));
 63             size_t Num = fread(data,1,1024,localfile);
 64             if (Num <= 0)
 65             {
 66                 break;
 67             }
 68  client.write(hdl,data);  69         }
 70         fclose(localfile);
 71  client.close(hdl);  72         */
 73         //03_get file
 74         /*
 75         ThriftHandle hd2;
 76         FileStatus stat1;
 77         client.open(hd2,filepath);
 78         client.stat(stat1,filepath);
 79         int index = 0;
 80         while(true)
 81         {
 82             string data;
 83             if (stat1.length <= index)
 84             {
 85                 break;
 86             }
 87             client.read(data,hd2,index,1024);
 88 
 89             index += data.length();
 90             printf("==%s\r\n",data.c_str());
 91         }
 92         client.close(hd2);
 93         */
 94 
 95         //04_list files
 96         std::vector<FileStatus> vFileStatus;
 97         client.listStatus(vFileStatus,path);
 98         for (int i=0;i<vFileStatus.size();i++)
 99         {
100             printf("i=%d file=%s\r\n",i,vFileStatus[i].path.c_str());
101         }
102         transport->close();
103     } catch (const TException &tx) {
104     std::cerr << "ERROR: " << tx.what() << std::endl;
105     }
106     getchar();
107     return 0;
108 }

 七、源碼分析

  1.文件創建:

 1     /**
 2       * Create a file and open it for writing, delete file if it exists
 3       */
 4     public ThriftHandle createFile(Pathname path, 
 5                                    short mode,
 6                                    boolean  overwrite,
 7                                    int bufferSize,
 8                                    short replication,
 9                                    long blockSize) throws ThriftIOException {
10       try {
11         now = now();
12         HadoopThriftHandler.LOG.debug("create: " + path +
13                                      " permission: " + mode +
14                                      " overwrite: " + overwrite +
15                                      " bufferSize: " + bufferSize +
16                                      " replication: " + replication +
17                                      " blockSize: " + blockSize);
18         FSDataOutputStream out = fs.create(new Path(path.pathname), 
19                                            new FsPermission(mode),
20                                            overwrite,
21                                            bufferSize,
22                                            replication,
23                                            blockSize,
24                                            null); // progress
25         long id = insert(out);
26         ThriftHandle obj = new ThriftHandle(id);
27         HadoopThriftHandler.LOG.debug("created: " + path + " id: " + id);
28         return obj;
29       } catch (IOException e) {
30         throw new ThriftIOException(e.getMessage());
31       }
32     }

  ThriftHandle的兩端到底是誰呢?是ThriftClient和DataNode?還是ThriftServer與DataNode?

  2.文件寫入

 1     public boolean write(ThriftHandle tout, String data) throws ThriftIOException {
 2       try {
 3         now = now();
 4         HadoopThriftHandler.LOG.debug("write: " + tout.id);
 5         FSDataOutputStream out = (FSDataOutputStream)lookup(tout.id);
 6         byte[] tmp = data.getBytes("UTF-8");
 7         out.write(tmp, 0, tmp.length);
 8         HadoopThriftHandler.LOG.debug("wrote: " + tout.id);
 9         return true;
10       } catch (IOException e) {
11         throw new ThriftIOException(e.getMessage());
12       }
13     }

  寫入時依賴的還是ThriftHandle?  

  3.文件讀取

 1     /**
 2      * read from a file
 3      */
 4     public String read(ThriftHandle tout, long offset,
 5                        int length) throws ThriftIOException {
 6       try {
 7         now = now();
 8         HadoopThriftHandler.LOG.debug("read: " + tout.id +
 9                                      " offset: " + offset +
10                                      " length: " + length);
11         FSDataInputStream in = (FSDataInputStream)lookup(tout.id);
12         if (in.getPos() != offset) {
13           in.seek(offset);
14         }
15         byte[] tmp = new byte[length];
16         int numbytes = in.read(offset, tmp, 0, length);
17         HadoopThriftHandler.LOG.debug("read done: " + tout.id);
18         return new String(tmp, 0, numbytes, "UTF-8");
19       } catch (IOException e) {
20         throw new ThriftIOException(e.getMessage());
21       }
22     }

 八、遺留問題

  ThriftHandle可以看做是Socket連接句柄,但是他的兩端到底是誰呢?如果是ThriftClient代表的客戶端則一切OK,那么我該如何證明呢?存疑待考!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM