PySpark關於HDFS文件(目錄)輸入、數據格式的探討


背景
 
平台HDFS數據存儲規則是按照“數據集/天目錄/小時目錄/若干文件”進行的,其中數據集是依據產品線或業務划分的。
 
用戶分析數據時,可能需要處理以下五個場景:
 
(一)分析指定數據集、指定日期、指定小時、指定文件的數據;
(二)分析指定數據集、指定日期、指定小時的數據;
(三)分析指定數據集、指定日期的數據(24個小時目錄的數據);
(四)分析多個數據集、多個日期或多個小時的數據;
(五)多種存儲格式(textfile、sequencefile、rcfile等)。
 
目前我們平台提供給用戶的分析工具為PySpark(Spark、Spark SQL、Python),本文討論的就是使用PySpark如果應對上述場景。
 
示例
 
假設HDFS存在一個wordcount目錄,包含三個子目錄:data1、data2、data3,
 
 
這三個子目錄下均含有一個文本文件words,各自的內容如下:
 
 
解決方案
 
這里我們暫時只考慮文本文件。
 
1. 分析指定數據集、指定日期、指定小時、指定文件的數據;
 
2. 分析指定數據集、指定日期、指定小時的數據;
 
SparkContext textFile默認情況下可以接收一個文本文件路徑或者僅僅包含文本文件的目錄,使用該方法可以應對場景(一)、(二)。
 
(1)分析指定文本文件的數據,這里我們假設待分析的數據為“/user/hdfs/yurun/wordcount/data1/words”:
 
 
可以得到如下輸出:
 
 
(2)分析指定目錄的數據,且該目錄僅包含文本文件,這里我們假設待分析的數據為“/user/hdfs/yurun/wordcount/data1/”:
 
 
運行上述代碼會得到與(1)相同的結果。
 
3. 分析指定數據集、指定日期的數據(24個小時目錄的數據);
 
針對我們的示例,也就是需要“遞歸”分析wordcount目錄下三個子目錄data1、data2、data3中的數據。我們嘗試將上面示例中的輸入路徑修改為“/user/hdfs/yurun/wordcount/”,
 
 
運行上述代碼會出現異常:
 
 
根據1、2、3中的示例,如果我們指定的輸入路徑是一個目錄,而這個目錄中存在子目錄就會出現上述異常。究其原因,實際上與FileInputFormat(org.apache.hadoop.mapred.FileInputFormat)的“某個”配置屬性值有關。
 
注意:源碼中有兩個模塊中都包含這個類,
 
 
這里使用的是模塊hadoop-mapreduce-client中的類。
 
根據拋出的異常信息,我們可以在FileInputFormat的源碼中找到如下代碼塊:
 
 
可以看出,如果files中的任何一個FileStatus實例(由file表示)為目錄,便會引發“Not a file”的異常,而files來源於方法listStatus,
 
 
我們需要注意兩個很重要的變量:
 
(1)dirs
 
dirs為我們指定的輸入文件或目錄,它是由getInputPaths計算而來的,
 
 
可以看出,我們通過SparkContext textfile指定的輸入路徑實際是保存在Configuration中的,以屬性FileInputFormat.INPUT_DIR(mapreduce.input.fileinputformat.inputdir)表示,從代碼邏輯可知,它的值可以為多個以“,”分隔的字符串。
 
與就是說,理論上我們是可以指定多個輸入文件或目錄的(SparkContext textfile僅支持單個文件路徑或目錄路徑)。
 
(2)recursive
 
如果我們指定的是一個目錄路徑,recursive表示着是否允許在后續的切片計算過程中“遞歸”處理該路徑中的子目錄,它的值由屬性INPUT_DIR_RECURSIVE(mapreduce.input.fileinputformat.input.dir.recursive)決定,默認值為false。
 
也就是說,默認情況下,FileInputFormat是不會以“遞歸”的形式處理指定目錄中的子目錄的,這也是引發上述異常的根本原因。
 
如果我們需要處理“遞歸”目錄的場景,可以采用下述兩個方法:
 
(1)在Hadoop的配置文件mapred-site.xml中添加屬性mapreduce.input.fileinputformat.input.dir.recursive,並指定值為true;
 
 
僅僅需要在Spark Client(提交Spark Application的機器)相關機器上操作即可,不需要修改Hadoop集群配置。
 
修改配置文件后,再次提交上述程序,即可正常執行。
 
(2)使用hadoopRDD;
 
 
這種方式不需要修改配置文件,而是在代碼中通過Hadoop Configuration(hadoopConf)直接指定相關屬性:
 
mapreduce.input.fileinputformat.inputdir:hdfs://dip.cdh5.dev:8020/user/hdfs/yurun/wordcount/;
mapreduce.input.fileinputformat.input.dir.recursive:true;
 
此外還需要注意hadoopRDD的幾個參數:
 
inputFormatClass:org.apache.hadoop.mapred.TextInputFormat
keyClass:org.apache.hadoop.io.LongWritable
valueClass:org.apache.hadoop.io.Text
 
這僅僅是針對textfile設置的參數值,對於其它的數據格式會有所不同,后面會討論。
 
使用hadoopRDD的運行結果會有所不同:
 
 
這是因為SparkContext textfile省略了TextInputFormat中的“key”,它表示每一行文本在各自文件中的起始偏移量。
 
4. 分析多個數據集、多個日期或多個小時的數據;
 
這種場景要求我們能夠指定多個目錄或文件,其中還可能需要“遞歸”處理子目錄,SparkContext textfile只能接收一個目錄或文件,此時我們只能使用hadoopRDD。
 
前面提到過,“mapreduce.input.fileinputformat.inputdir”可以以“,”分隔的形式接收多個目錄或文件路徑。假設我們需要分析的數據為“hdfs://dip.cdh5.dev:8020/user/hdfs/yurun/wordcount/data1”、“hdfs://dip.cdh5.dev:8020/user/hdfs/yurun/wordcount/data2”、“hdfs://dip.cdh5.dev:8020/user/hdfs/yurun/wordcount/data3/words”,代碼示例如下:
 
 
運行結果同上。
 
5. 多種存儲格式;
 
1-4的討論僅僅局限於textfile,textfile易於人的閱讀和分析,但存儲開銷很大,即使采用相應的壓縮,效果也並是很理想。在實踐中我們發現rcfile采用列式壓縮效果顯著,因此也需要考慮如何使得PySpark支持rcfile。
 
為什么這個地方要有專門的“考慮”?
 
簡單來講,Hadoop是使用Java構建的,Spark是使用Scala構建的,而我們現在使用的開發語言為Python,這就帶來一個問題:Java/Scala中的數據類型如何轉換為相應的Python數據類型?
 
如TextInputFormat返回的鍵值對類型為LongWritable、Text,可以被“自動”轉換為Python中的int、str(基本數據類型均可以被“自動”轉換),RCFileInputFormat返回的鍵值對類型為LongWritable、BytesRefArrayWritable,BytesRefArrayWritable不是基本數據類型,它應該如何被轉換呢?
 
我們查看SparkContext hadoopRDD的文檔可知,
 
 
keyConverter、valueConverter就是用來負責完成鍵值類型的轉換的。
 
假設我們有一個RCFile格式的文件:
 
 
RCFileInputFormat的鍵類型為LongWritable,可以自動被轉換;
RCFileInputFormat的值類型為BytesRefArrayWritable,無法被自動轉換,需要一個Converter,這里我們把每一個BytesRefArrayWritable實例轉換為一個Text實例,其中三列數據以空格分隔。
 
我們將Converter定義為BytesRefArrayWritableToStringConverter(com.sina.dip.spark.converter.BytesRefArrayWritableToStringConverter),代碼如下:
 
 
其實Converter的邏輯非常簡單,就是將BytesRefArrayWritable中的數據提取、轉換為基本數據類型Text。
 
將上述代碼編譯打包為converter.jar。
 
PySpark代碼如下:
 
 
重點注意幾個參數值:
 
mapreduce.input.fileinputformat.inputdir:hdfs://dip.cdh5.dev:8020/user/hdfs/yurun/rcfile/data
inputFormatClass:org.apache.hadoop.hive.ql.io.RCFileInputFormat
keyClass:org.apache.hadoop.io.LongWritable
valueClass:org.apache.hadoop.io.Text
valueConverter:com.sina.dip.spark.converter.BytesRefArrayWritableToStringConverter
 
執行命令:
 
 
結果輸出:
 
 
三行數據,每行數據均為字符串輸出,且以空格分隔,可見數據得到正常轉換。
 
通過上述方式,我們可以通過SparkContext hadoopRDD支持多種數據格式數據的分析。
 
總結
 
本文通過五種常見應用場景的討論,可以得出使用PySpark可以支持靈活的數據輸入路徑,還可以根據需求擴展支持多種數據格式。
 
 
 
 
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM