1.前言
hdfs , Hadoop Distributed File System。Hadoop的分布式文件系統,安全行和擴展性沒得說。
訪問HDFS的方式有以下幾種:
- 命令行方式:FS Shell
- 編程方式:FileSystem Java API,libhdfs(c語言)
- REST API : WebHDFS, HttpFs
- 把HDFS mount成本地文件目錄
使用python訪問HDFS比較容易:
- python 封裝FS Shell, 這種方式本地必須部署了Hadoop 的 FS Shell。
- 使用python調用libhdfs,如果善於python和C語言可以采用此種方式,libhdfs是標准API的子集,有一些功能不能實現,網上有人說libhdfs比較坑
- python 封裝REST API,支持windows環境
- 已有模塊phdfs (封裝了WebHDFS),支持windows環境,類似的python模塊還有HdfsCLI、webpyhdfs,pyhdfs
- snakebite,純python hdfs client,使用了protobuf 和 hadoop rpc。
這里主要介紹使用hdfs 訪問HDFS,支持python 2.7和 python 3
文檔地址: hdfs 2.1.0
2.環境建立
- Hadoop 使用已有的CDH 5.6.1 環境
- 安裝 hdfs,最新版本是0.2.1。
pip install hdfs
3. ipython, 可選但強烈建議。
3.示例代碼
- 創建目錄並寫文件
#!c:\python27\python.exe # -*- coding: UTF-8 -*- import traceback from hdfs import InsecureClient import time import sys from numpy import true_divide reload(sys) sys.setdefaultencoding("utf-8") try: root_path = "/" #獲取客戶端,並且指定登陸用戶,如果使用Client創建客戶端就不能指定用戶了 c = InsecureClient(url="http://172.16.21.22:50070",user='hdfs',root=root_path) #創建目錄 c.makedirs('/user/root/pyhdfs') #寫文件 #第一個參數:文件路徑 #第二個參數:文件內容 #第三個參數:是否覆蓋已有的文件,如果不覆蓋,且文件已經存在會拋出異常 c.write('/user/root/pyhdfs/1.log',time.asctime(time.localtime(time.time())) + '\n',True) #下載文件 #第一個參數,hdfs路徑 #第二個參數,本地路徑 #第三個參數,是否覆蓋本地文件 c.download('/user/root/pyhdfs/1.log', '.', True) #上傳文件 #第一個參數,hdfs路徑 #第二個參數,本地路徑 #第三個參數,是否覆蓋hdfs已有的文件 c.upload('/user/root/pyhdfs/', './pyhdfs_example.py', True) #獲取目錄下的的文件列表,第一個參數:hdfs路徑,第二個參數是否獲取文件的狀態數據 #另外pyhdfs 有一個walk函數 c.walk(hdfs_path, depth, status) #用法和os.walk類似,遍歷目錄非常好用 hdfs_files = c.list('/user/root/pyhdfs', True) for f in hdfs_files: print f #輸出結果如下 #(u'1.log', {u'group': u'supergroup', u'permission': u'755', u'blockSize': 134217728, u'accessTime': 1519459768533L, u'pathSuffix': u'1.log', u'modificationTime': 1519459768569L, u'replication': 1, u'length': 25, u'childrenNum': 0, u'owner': u'hdfs', u'storagePolicy': 0, u'type': u'FILE', u'fileId': 33037}) #(u'pyhdfs_example.py', {u'group': u'supergroup', u'permission': u'755', u'blockSize': 134217728, u'accessTime': 1519459768683L, u'pathSuffix': u'pyhdfs_example.py', u'modificationTime': 1519459768711L, u'replication': 1, u'length': 1624, u'childrenNum': 0, u'owner': u'hdfs', u'storagePolicy': 0, u'type': u'FILE', u'fileId': 33038}) #返回文件的summary信息,算法是MD5-of-0MD5-of-512CRC32C #可以使用 #返回結果:{u'spaceConsumed': 2567, u'quota': -1, u'spaceQuota': -1, u'length': 2567, u'directoryCount': 0, u'fileCount': 1} print c.content('/user/root/pyhdfs/pyhdfs_example.py') #返回的是md5的checksum #結果:{u'length': 28, u'bytes': u'0000020000000000000000005506d1dae3da4073662038f2392c08b700000000', u'algorithm': u'MD5-of-0MD5-of-512CRC32C'} #本地文件的checksum生成:hadoop fs -checksum /abc.txt print c.checksum('/user/root/pyhdfs/pyhdfs_example.py') #刪除文件,第一個參數hdfs路徑,第二參數,是否遞歸刪除 c.delete('/user/root/pyhdfs/', True) except Exception , ex: print traceback.format_exc()
4.其他hdfs Client api
- resolve(hdfs_path) , 返回hdfs絕對路徑
- status(hdfs_path,strict=True) 返回文件或者目錄的狀態
- acl_status(hdfs_path, strict=True), 返回acl的狀態
- set_acl(hdfs_path, acl_spec, stict=True), 設置acl
- parts(hdfs_path, parts=None, status=False) ?
- read(hdfs_path, offset=0, length=None, buffer_size=None)
- rename(hdfs_src_path, hdfs_dest_path), 重命名
- set_owner(hdfs_paht, owner=None, group=None)
- set_permission(hdfs_path, permission)
- set_times(hdfs_path, access_time=None, modification_time=None)
- set_replication(hdfs_path, replication), 設置文件的副本數量, 只能設置文件,不能設置目錄,否則會由異常
- walk(hdfs_path, depth=0, status=False), 類似 os.walk
上面都是基於 HDFS WEBAPI 進行的操作,記錄一下。
Reference:
https://zhuanlan.zhihu.com/p/33983161 python訪問HDFS