引言
Hadoop提供的HDFS布式文件存儲系統,提供了基於thrift的客戶端訪問支持,但是因為Thrift自身的訪問特點,在高並發的訪問情況下,thrift自身結構可能將會成為HDFS文件存儲系統的一個性能瓶頸。我們先來看一下一不使用Thrfit方式訪問HDFS文件系統的業務流程。
一、HDFS文件讀取流程
流程說明:
- 使用HDFS提供的客戶端開發庫Client,向遠程的Namenode發起RPC請求;
- Namenode會視情況返回文件的部分或者全部block列表,對於每個block,Namenode都會返回有該block拷貝的DataNode地址;
- 客戶端開發庫Client會選取離客戶端最接近的DataNode來讀取block;如果客戶端本身就是DataNode,那么將從本地直接獲取數據.
- 讀取完當前block的數據后,關閉與當前的DataNode連接,並為讀取下一個block尋找最佳的DataNode;
- 當讀完列表的block后,且文件讀取還沒有結束,客戶端開發庫會繼續向Namenode獲取下一批的block列表。
- 讀取完一個block都會進行checksum驗證,如果讀取datanode時出現錯誤,客戶端會通知Namenode,然后再從下一個擁有該block拷貝的datanode繼續讀。
二、HDFS文件寫入流程
流程說明:
- 使用HDFS提供的客戶端開發庫Client,向遠程的Namenode發起RPC請求;
- Namenode會檢查要創建的文件是否已經存在,創建者是否有權限進行操作,成功則會為文件創建一個記錄,否則會讓客戶端拋出異常;
- 當 客戶端開始寫入文件的時候,開發庫會將文件切分成多個packets,並在內部以數據隊列"data queue"的形式管理這些packets,並向Namenode申請新的blocks,獲取用來存儲replicas的合適的datanodes列表, 列表的大小根據在Namenode中對replication的設置而定。
- 開始以pipeline(管道)的形式將packet寫入所 有的replicas中。開發庫把packet以流的方式寫入第一個datanode,該datanode把該packet存儲之后,再將其傳遞給在此 pipeline中的下一個datanode,直到最后一個datanode,這種寫數據的方式呈流水線的形式。
- 最后一個datanode成功存儲之后會返回一個ack packet,在pipeline里傳遞至客戶端,在客戶端的開發庫內部維護着"ack queue",成功收到datanode返回的ack packet后會從"ack queue"移除相應的packet。
- 如 果傳輸過程中,有某個datanode出現了故障,那么當前的pipeline會被關閉,出現故障的datanode會從當前的pipeline中移除, 剩余的block會繼續剩下的datanode中繼續以pipeline的形式傳輸,同時Namenode會分配一個新的datanode,保持 replicas設定的數量。
三、關鍵詞
HDFSClient通過文件IO操作最終實現是通過直接訪問DataNode進行。
四、Thrift的訪問流程:猜測版
流程說明:
1.ThriftClient客戶端將操作命令傳給ThriftServer。
2.ThriftServer調用HDFSClient接口API實現HDFS讀寫操作,操作流程如二和三所示。
五、疑問
與DataNode發生數據交換的到底是ThriftServer還是ThriftClient,如果是ThriftServer,那么多個ThriftClient並行訪問時,ThriftServer必將成為HDFS訪問的性能瓶頸;如果是ThriftClient直接訪問DataNode,那么理論依據何在呢?
六、示例程序
下面是一個基於Thrift實現的HDFS客戶端程序,實現了文件的訪問和創建和讀取
1 // HdfsDemo.cpp : Defines the entry point for the console application. 2 // 3 4 #include "stdafx.h" 5 #include <iostream> 6 #include <string> 7 #include <boost/lexical_cast.hpp> 8 #include <protocol/TBinaryProtocol.h> 9 #include <transport/TSocket.h> 10 #include <transport/TTransportUtils.h> 11 #include "ThriftHadoopFileSystem.h" 12 13 #ifndef _WIN32_WINNT 14 #define _WIN32_WINNT 0x0500 15 #endif 16 using namespace std; 17 using namespace apache::thrift; 18 using namespace apache::thrift::protocol; 19 using namespace apache::thrift::transport; 20 21 int _tmain(int argc, _TCHAR* argv[]) 22 { 23 if (argc < 3) 24 { 25 std::cerr << "Invalid arguments!\n" << "Usage: DemoClient host port" << std::endl; 26 //return -1; 27 } 28 boost::shared_ptr<TTransport> socket(new TSocket("192.168.230.133", 55952));//boost::lexical_cast<int>(argv[2]))); 29 boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket)); 30 boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport)); 31 ThriftHadoopFileSystemClient client(protocol); 32 try 33 { 34 transport->open(); 35 Pathname path; 36 //01_create directory 37 path.__set_pathname("/user/hadoop"); 38 if(client.exists(path) == true) 39 { 40 printf("path is exists.\r\n"); 41 } 42 else 43 { 44 printf("path is not exists."); 45 //return 0; 46 } 47 //02_put file 48 Pathname filepath; 49 filepath.__set_pathname("/user/hadoop/in/test1.txt"); 50 /* 51 FILE* localfile = fopen("E:\\project\\Hadoop\\HdfsDemo\\Debug\\hello.txt","rb"); 52 if (localfile == NULL) 53 { 54 transport->close(); 55 return 0; 56 } 57 ThriftHandle hdl; 58 client.create(hdl,filepath); 59 while (true) 60 { 61 char data[1024]; 62 memset(data,0x00,sizeof(data)); 63 size_t Num = fread(data,1,1024,localfile); 64 if (Num <= 0) 65 { 66 break; 67 } 68 client.write(hdl,data); 69 } 70 fclose(localfile); 71 client.close(hdl); 72 */ 73 //03_get file 74 /* 75 ThriftHandle hd2; 76 FileStatus stat1; 77 client.open(hd2,filepath); 78 client.stat(stat1,filepath); 79 int index = 0; 80 while(true) 81 { 82 string data; 83 if (stat1.length <= index) 84 { 85 break; 86 } 87 client.read(data,hd2,index,1024); 88 89 index += data.length(); 90 printf("==%s\r\n",data.c_str()); 91 } 92 client.close(hd2); 93 */ 94 95 //04_list files 96 std::vector<FileStatus> vFileStatus; 97 client.listStatus(vFileStatus,path); 98 for (int i=0;i<vFileStatus.size();i++) 99 { 100 printf("i=%d file=%s\r\n",i,vFileStatus[i].path.c_str()); 101 } 102 transport->close(); 103 } catch (const TException &tx) { 104 std::cerr << "ERROR: " << tx.what() << std::endl; 105 } 106 getchar(); 107 return 0; 108 }
七、源碼分析
1.文件創建:
1 /** 2 * Create a file and open it for writing, delete file if it exists 3 */ 4 public ThriftHandle createFile(Pathname path, 5 short mode, 6 boolean overwrite, 7 int bufferSize, 8 short replication, 9 long blockSize) throws ThriftIOException { 10 try { 11 now = now(); 12 HadoopThriftHandler.LOG.debug("create: " + path + 13 " permission: " + mode + 14 " overwrite: " + overwrite + 15 " bufferSize: " + bufferSize + 16 " replication: " + replication + 17 " blockSize: " + blockSize); 18 FSDataOutputStream out = fs.create(new Path(path.pathname), 19 new FsPermission(mode), 20 overwrite, 21 bufferSize, 22 replication, 23 blockSize, 24 null); // progress 25 long id = insert(out); 26 ThriftHandle obj = new ThriftHandle(id); 27 HadoopThriftHandler.LOG.debug("created: " + path + " id: " + id); 28 return obj; 29 } catch (IOException e) { 30 throw new ThriftIOException(e.getMessage()); 31 } 32 }
ThriftHandle的兩端到底是誰呢?是ThriftClient和DataNode?還是ThriftServer與DataNode?
2.文件寫入
1 public boolean write(ThriftHandle tout, String data) throws ThriftIOException { 2 try { 3 now = now(); 4 HadoopThriftHandler.LOG.debug("write: " + tout.id); 5 FSDataOutputStream out = (FSDataOutputStream)lookup(tout.id); 6 byte[] tmp = data.getBytes("UTF-8"); 7 out.write(tmp, 0, tmp.length); 8 HadoopThriftHandler.LOG.debug("wrote: " + tout.id); 9 return true; 10 } catch (IOException e) { 11 throw new ThriftIOException(e.getMessage()); 12 } 13 }
寫入時依賴的還是ThriftHandle?
3.文件讀取
1 /** 2 * read from a file 3 */ 4 public String read(ThriftHandle tout, long offset, 5 int length) throws ThriftIOException { 6 try { 7 now = now(); 8 HadoopThriftHandler.LOG.debug("read: " + tout.id + 9 " offset: " + offset + 10 " length: " + length); 11 FSDataInputStream in = (FSDataInputStream)lookup(tout.id); 12 if (in.getPos() != offset) { 13 in.seek(offset); 14 } 15 byte[] tmp = new byte[length]; 16 int numbytes = in.read(offset, tmp, 0, length); 17 HadoopThriftHandler.LOG.debug("read done: " + tout.id); 18 return new String(tmp, 0, numbytes, "UTF-8"); 19 } catch (IOException e) { 20 throw new ThriftIOException(e.getMessage()); 21 } 22 }
八、遺留問題
ThriftHandle可以看做是Socket連接句柄,但是他的兩端到底是誰呢?如果是ThriftClient代表的客戶端則一切OK,那么我該如何證明呢?存疑待考!