Spark讀取文本文件並轉換為DataFrame


本文將介紹spark讀取多列txt文件后轉成DataFrame的兩種方法。

數據是Spark中自帶的:sample_movielens_ratings.txt

//形式如下面所示
0::2::3::1424380312
0::3::1::1424380312
0::5::2::1424380312
0::9::4::1424380312
0::11::1::1424380312
0::12::2::1424380312
0::15::1::1424380312
0::17::1::1424380312
0::19::1::1424380312
0::21::1::1424380312
0::23::1::1424380312

 

一、通過反射機制將RDD轉為DataFrame

  Scala由於其具有隱式轉換的特性,所以Spark SQL的Scala接口,是支持自動將包含了case class的RDD轉換為DataFrame的。case class就定義了元數據。Spark SQL會通過反射讀取傳遞給case class的參數的名稱,然后將其作為列名。

import org.apache.spark.ml.linalg.Vectors
import spark.implicits._ 

case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)

val rdd = sc.textFile("/data/mllib/als/sample_movielens_ratings.txt")
def parseRating(str: String): Rating = {
  val fields = str.split("::")
  assert(fields.size == 4)
  Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
}

val ratings = spark.read.textFile("/data/mllib/als/sample_movielens_ratings.txt")
  .map(parseRating)
  .toDF()
ratings.printSchema
ratings.show()

二、通過動態編程的方式將RDD轉為DataFrame

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val rdd = sc.textFile("/data/mllib/als/sample_movielens_ratings.txt")

 val schema = StructType(Array(
    StructField("userId", IntegerType, true),
    StructField("movieId", IntegerType, true),
    StructField("rating", FloatType, true),
    StructField("timestamp", LongType, true)
))

// 對每一行的數據進行處理
val rowRDD = rdd.map(_.split("::")).map(p => Row(p(0).toInt,p(1).toInt,p(2).toFloat,p(3).toLong))
val data = spark.createDataFrame(rowRDD, schema)
data.printSchema
data.createOrReplaceTempView("test")
spark.sql("select *from test").show()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM