要完成用SPARK将hadoop的文件数据转换为hive的表。首先,要安装好hadoop,hive,spark;其次,文本数据是结构化的文本,可以直接映射到表的如csv格式的。
我们的文本数据集由五个字段组成的,用tab符号隔开,存放在hadoop的hdfs:///data/source/tmpdataset.txt目录下。
在hive新建一张要存放导入数据的表,hive用的版本是1.2.1版本的。sql建表语句如下:
CREATE TABLE tmp_table (field1 VARCHAR(32), field2 VARCHAR(32), field3 VARCHAR(64), field4 VARCHAR(32), field5 VARCHAR(32)) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS PARQUET;
启动spark的spark-shell;spark用的是spark2.0的版本。
首先,获取hdfs:///data/source/tmpdataset.txt的数据集
val TmpDataList = spark.sparkContext.textFile("hdfs:///data/source/tmpdataset.txt")
创建DataFrames所用到的映射schema
val schemaString = "field1,field2,field3,field4,field5" import org.apache.spark.sql.types._ val fields = schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema = StructType(fields)
将TmpDataList的数据集映射成RDD的格式
val TmpDataListRDD = TmpDataList.map(_.split("\t")).map(p => Row(p(0),p(1),p(2),p(3),p(4)))
创建数据集的DataFrames格式
val TmpDataListDF = spark.createDataFrame(TmpDataListRDD, schema)
将数据集的DataFrames格式映射到临时表
TmpDataListRDD.createOrReplaceTempView("TmpData")
用sql语句将临时表的数据导入hive的tmp_table表中
sql("insert overwrite table tmp_table select * from TmpData")
这样就将数据导入到hive中。为什么要用spark去将hadoop的数据转换为hive的格式?当你所拥有的数据集非常大,而且又是结构化的时候。用hive直接导入是非常慢的,用spark导入非常快。
这里讲了如何导入数据,以后可能会讲到如何用spark的ml和mllib里面的一些类去清理数据,对特征向量进行清洗和转换;同时,用里面的机器学习算法去测试一下数据集。