Scenario: a large number of data files are pushed over, each only 10-30 MB in size.
Spark usually reads HDFS with textFile(), but in this situation textFile() by default creates one partition per file, spawning a huge number of tasks.
For such small files, Spark provides a dedicated API, wholeTextFiles(), which is designed for processing large numbers of small files. Its source is as follows:
/**
 * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
 * Hadoop-supported file system URI. Each file is read as a single record and returned in a
 * key-value pair, where the key is the path of each file, the value is the content of each file.
 *
 * <p> For example, if you have the following files:
 * {{{
 *   hdfs://a-hdfs-path/part-00000
 *   hdfs://a-hdfs-path/part-00001
 *   ...
 *   hdfs://a-hdfs-path/part-nnnnn
 * }}}
 *
 * Do `val rdd = sparkContext.wholeTextFile("hdfs://a-hdfs-path")`,
 *
 * <p> then `rdd` contains
 * {{{
 *   (a-hdfs-path/part-00000, its content)
 *   (a-hdfs-path/part-00001, its content)
 *   ...
 *   (a-hdfs-path/part-nnnnn, its content)
 * }}}
 *
 * @note Small files are preferred, large file is also allowable, but may cause bad performance.
 * @note On some filesystems, `.../path/*` can be a more efficient way to read all files
 *       in a directory rather than `.../path/` or `.../path`
 * @note Partitioning is determined by data locality. This may result in too few partitions
 *       by default.
 *
 * @param path Directory to the input data files, the path can be comma separated paths as the
 *             list of inputs.
 * @param minPartitions A suggestion value of the minimal splitting number for input data.
 * @return RDD representing tuples of file path and the corresponding file content
 */
def wholeTextFiles(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
  assertNotStopped()
  val job = NewHadoopJob.getInstance(hadoopConfiguration)
  // Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
  // comma separated files as input. (see SPARK-7155)
  NewFileInputFormat.setInputPaths(job, path)
  val updateConf = job.getConfiguration
  new WholeTextFileRDD(
    this,
    classOf[WholeTextFileInputFormat],
    classOf[Text],
    classOf[Text],
    updateConf,
    minPartitions).map(record => (record._1.toString, record._2.toString)).setName(path)
}
wholeTextFiles takes a path as input, and the path can actually be multiple paths separated by commas. For each file it reads, wholeTextFiles produces a Tuple2: the first element is the file's full path, and the second element is the file's text content. For example, a file with two lines:
jack,1011,shanghai
kevin,2022,beijing
is returned as a single string in which the source lines are separated by the newline character \n, that is: jack,1011,shanghai\nkevin,2022,beijing
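Because each file arrives as one string, per-line processing means splitting on \n yourself. A minimal plain-Java sketch, using the two-line sample above:

```java
public class SplitContent {
    // Recover the original lines of a wholeTextFiles record value
    static String[] splitLines(String content) {
        return content.split("\n");
    }

    public static void main(String[] args) {
        // The value wholeTextFiles would return for the two-line file above
        String content = "jack,1011,shanghai\nkevin,2022,beijing";
        String[] lines = splitLines(content);
        System.out.println(lines.length); // 2
        System.out.println(lines[0]);     // jack,1011,shanghai
    }
}
```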
The number of partitions can be customized; if it is not explicitly specified, the default is defined as:
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
In other words, when no partition count is specified, the data is in most cases processed with 2 partitions.
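The formula can be sanity-checked in plain Java (the defaultParallelism values below are hypothetical):

```java
public class DefaultMinPartitions {
    // Mirrors Spark's defaultMinPartitions: math.min(defaultParallelism, 2)
    static int defaultMinPartitions(int defaultParallelism) {
        return Math.min(defaultParallelism, 2);
    }

    public static void main(String[] args) {
        // With parallelism 1 (e.g. local[1]), the default is 1 partition
        System.out.println(defaultMinPartitions(1)); // 1
        // With parallelism >= 2, the default is capped at 2
        System.out.println(defaultMinPartitions(8)); // 2
    }
}
```

To get more partitions than this default, pass minPartitions explicitly via the overload, e.g. sc.wholeTextFiles(path, 16); note the scaladoc says the value is only a suggestion.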
Sample code:
The processing logic can be understood as follows: each small file holds all the road data for one district of a city (of course the real data is not this; no city has tens or hundreds of thousands of districts). The file name is the district name, and the file content is road names plus related data; we append the district name to every line of road data.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

public class TestWholeTextFiles {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        SparkSession spark = SparkSession
            .builder()
            .appName("TestWholeTextFiles")
            .master("local")
            .config(conf)
            .enableHiveSupport()
            .getOrCreate();
        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        JavaPairRDD<String, String> javaPairRDD =
            sc.wholeTextFiles("hdfs://master01.xx.xx.cn:8020/kong/capacityLusunData_bak");
        System.out.println("javaPairRDD partitions: " + javaPairRDD.getNumPartitions()); // 2

        JavaRDD<String> map = javaPairRDD.map((Function<Tuple2<String, String>, String>) v1 -> {
            // The key is the full file path; take the file name without its
            // extension as the district name
            int index = v1._1.lastIndexOf("/");
            String road_id = v1._1.substring(index + 1).split("\\.")[0];
            // Tag every line with "\|<district>" by rewriting each newline
            return v1._2.replace("\n", "\\|" + road_id + "\n");
        });
        System.out.println("mapRDD partitions: " + map.getNumPartitions()); // 2

        map.saveAsTextFile("hdfs://master01.xx.xx.cn:8020/kong/data/testwholetextfiles/out");
    }
}
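The per-record transformation inside the map function can be exercised without a Spark cluster. A sketch with a hypothetical path and content, showing both the path parsing and the line tagging:

```java
public class TagLines {
    // Same logic as the map function above: extract the district name from the
    // file path (file name without extension) and tag every line with it
    static String tag(String path, String content) {
        int index = path.lastIndexOf("/");
        String district = path.substring(index + 1).split("\\.")[0];
        return content.replace("\n", "\\|" + district + "\n");
    }

    public static void main(String[] args) {
        String path = "hdfs://host:8020/kong/xuhui.txt"; // hypothetical file
        String content = "roadA,12\nroadB,7\n";          // note the trailing newline
        System.out.print(tag(path, content));
        // roadA,12\|xuhui
        // roadB,7\|xuhui
    }
}
```

One caveat worth noting: because the tag is inserted by rewriting newline characters, a file whose content does not end with \n would leave its last line untagged.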