Spark提供了HDFS上一般的文件文件讀取接口 sc.textFile(),但在某些情況下HDFS中需要存儲自定義格式的文件,需要更加靈活的讀取方式。
使用KeyValueTextInputFormat
Hadoop的MapReduce框架下提供了一些InputFormat的實現,其中MapReduce2的接口(org.apache.hadoop.mapreduce下)與先前MapReduce1(org.apache.hadoop.mapred下)有區別,對應於newAPIHadoopFile函數。
使用KeyValueTextInputFormat的文件讀取如下
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat
import org.apache.hadoop.io.Text
val hFile = sc.newAPIHadoopFile("hdfs://hadoopmaster:9000/user/sparkl/README.md",
classOf[KeyValueTextInputFormat], classOf[Text], classOf[Text])
hFile.collect
使用自定義InputFormat
InputFormat是MapReduce框架下將輸入的文件解析成字符串的組件,Spark對HDFS中的文件實現自定義讀寫需要通過InputFormat的子類實現。下面只寫簡單的思路,具體的可以參考InputFormat和MapReduce相關資料。
InputFormat的修改可以參考TextInputFormat,繼承FileInputFormat后,重載createRecordReader返回一個新的繼承RecordReader的類,通過新的RecordReader讀取數據返回鍵值對。
打包后注意上傳時將jar包一起上傳:
`./spark-shell –jars newInputFormat.jar
運行的代碼和上面差不多,import相關的包后
val hFile = sc.newAPIHadoopFile("hdfs://hadoopmaster:9000/user/sparkl/README.md",
classOf[NewTextInputFormat], classOf[Text], classOf[Text])
一些坑
序列化問題
在讀取文件后使用first或者collect時,出現下面的錯誤
ERROR scheduler.TaskSetManager: Task 0.0 in stage 2.0 (TID 18) had a not serializable result: org.apache.hadoop.io.IntWritable Serialization stack: - object not serializable (class: org.apache.hadoop.io.IntWritable, value: 35) - element of array (index: 0) - array (class [Lorg.apache.hadoop.io.IntWritable;, size 1); not retrying 18/12/15 10:40:10 ERROR scheduler.TaskSetManager: Task 2.0 in stage 2.0 (TID 21) had a not serializable result: org.apache.hadoop.io.IntWritable Serialization stack: - object not serializable (class: org.apache.hadoop.io.IntWritable, value: 35) - element of array (index: 0) - array (class [Lorg.apache.hadoop.io.IntWritable;, size 1); not retrying
當鍵值對是其它的類型時,還可能出現類似的:
ERROR scheduler.TaskSetManager: Task 0.0 in stage 2.0 (TID 18) had a not serializable result: org.apache.hadoop.io.LongWritable ERROR scheduler.TaskSetManager: Task 0.0 in stage 2.0 (TID 18) had a not serializable result: org.apache.hadoop.io.Text
此問題略奇怪,都實現了Hadoop的Writable接口,卻不能被序列化。某些地方提到Hadoop與Spark沒有使用同一套序列化機制,需要在Spark的序列化框架下注冊才能使用。
一般更建議在drive程序上收集信息時,首先轉換成基本的數據類型:
hFile.filter(k => k._1.toString.contains(“a”)).collect
java.lang.IllegalStateException: unread block data
ERROR executor.Executor: Exception in task 0.3 in stage 0.0 (TID 3) java.lang.IllegalStateException: unread block data at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
一個很坑的錯誤,spark-shell下只出現這個,並未表明真正的錯誤在哪。在spark的webUI上能夠看到相關的運行日志,上面的異常前還有一個異常寫的是我重寫的InputSplit沒有實現Writable接口。此處的坑,InputFormat中用的InputSplit如果需要重寫需要實現Writable接口,在MapReduce下使用貌似沒有這一要求。
補上之后上傳到集群的nodemanager即可。注意,當nodemanager和spark-shell上傳的jar包中有相同的類時,nodemanager優先使用了自身的類。