創建DataFrame的兩個途徑


轉自:https://www.shiyanlou.com/courses/543/labs/1835/document

https://www.shiyanlou.com/courses/536/labs/1818/document

 

一、從 RDD 創建 DataFrame:

方法一 由反射機制推斷出模式:
Step 1:引用必要的類。
import org.apache.spark.sql._ import sqlContext.implicits._ //idea中此處導入應在sqlContext 創建之后,否則報錯,不知道為什么。。?? // 在使用Spark Shell時,下面這句不是必需的。 
// Spark Shell已默認為你啟用了SQL context,在需要時可直接使用sqlContext。 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
Step 2:創建RDD。
//導入CSV文件並處理逗號分隔的數據 val sfpdRDD = sc.textFile("/home/shiyanlou/SFPD.csv").map(inc => inc.split(","))

 Step 3:定義 case class 。

case class Incidents(incidentnum:String, category:String, description:String, dayofweek:String, date:String, time:String, pddistrict:String, resolution:String, address:String, x:String, y:String, location:String, pdid:String)

Step 4:將 RDD 轉換為含有 case 對象的 RDD 。

val sfpdCase = sfpdRDD.map(inc => Incidents(inc(0), inc(1), inc(2), inc(3), inc(4), inc(5), inc(6), inc(7), inc(8), inc(9), inc(10), inc(11), inc(12)))

Step 5:隱式轉換會將含有 case 對象的 RDD 轉換為 DataFrame ,將 DataFrame 的一些操作和函數應用於這個 DataFrame 中。

val sfpdDF = sfpdCase.toDF()

 

方法二 通過編程方式構建模式:

這種方式適用於列和類型在運行時不可知的情況,我們就需要手動地去構建 DataFrame 的模式。通常 DataFrame 的模式在動態變化時才會使用這種方式。

注意:該方式在 case class 在不能被提前定義時,或者使用 Scala 語言的項目中 case class 超過22個字段時,才會用到。

 Step 1:引入必要的類。

import sqlContext.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

Step 2:由原始 RDD 創建一個 Row RDD 。

val rowRDD = sc.textFile("/home/shiyanlou/data.txt").map(x => x.split(" ")).map( p => Row(p(0), p(2), p(4)))

Step 3:使用 StructType 和 StructField 分別創建模式。其中, StructType 對應於 table (表),StructField 對應於 field (字段)。

val testSchema = StructType(Array(StructField("IncNum", StringType, true), StructField("Date", StringType, true), StructField("District", StringType, true)))

Step 4:使用 SQLContext 提供的方法,將模式應用於 Row RDD 上,以創建 DataFrame。

val testDF = sqlContext.createDataFrame(rowRDD, testSchema) // 將DataFrame注冊為表
testDF.registerTempTable("test") val incs = sql("SELECT * FROM test")

 

二、從數據源創建 DataFrame:

現有的大數據應用通常需要搜集和分析來自不同的數據源的數據。而 DataFrame 支持 JSON 文件、 Parquet 文件、 Hive 表等數據格式。它能從本地文件系統、分布式文件系統(HDFS)、雲存儲(Amazon S3)和外部的關系數據庫系統(通過JDBC,在Spark 1.4版本起開始支持)等地方讀取數據。另外,通過 Spark SQL 的外部數據源 API ,DataFrame 能夠被擴展,以支持第三方的數據格式或數據源。

csv:

主要是 com.databricks_spark-csv_2.11-1.1.0 這個庫,用於支持 CSV 格式文件的讀取和操作。

step 1:

在終端中輸入命令: wget http://labfile.oss.aliyuncs.com/courses/610/spark_csv.tar.gz 下載相關的 jar 包。

將該壓縮文件解壓至 /home/shiyanlou/.ivy2/jars/ 目錄中,確保該目錄含有如圖所示的以下三個 jar 包。

step 2 導入包:

spark-shell --packages com.databricks:spark-csv_2.11:1.1.0

step 3 直接將 CSV 文件讀入為 DataFrame :

val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("/home/shiyanlou/1987.csv") // 此處的文件路徑請根據實際情況修改

step 4 根據需要修改字段類型:

def convertColumn(df: org.apache.spark.sql.DataFrame, name:String, newType:String) = { val df_1 = df.withColumnRenamed(name, "swap") df_1.withColumn(name, df_1.col("swap").cast(newType)).drop("swap") }

//例如
val df_3 = convertColumn(df_2, "ArrDelay", "int")
val df_4 = convertColumn(df_2, "DepDelay", "int")

json:

sqlContext.read.json(filePath)

 

擴展閱讀:

由於數據格式和數據源眾多,這里暫不一一展開講解。在實際應用中,如果需要使用某種格式的數據或者某個數據源,應查詢其官方文檔。通常官方文檔(特別是 API 手冊)都提供了詳細的集成方法和指導。

在 Spark 中,默認的數據源被設定為 Parquet ,所以通用的加載方式為:

sqlContext.load("/home/shiyanlou/data.parquet")

 

如果是其他格式,則需要手動地指定格式:

sqlContext.load("/home/shiyanlou/data", "json")

 

下面給出了其他的加載指定數據源的方法:

  • sqlContext.jdbc:從數據庫表中加載 DataFrame
  • sqlContext.jsonFile:從 JSON 文件中加載 DataFrame
  • sqlContext.jsonRDD:從包含 JSON 對象的 RDD 中加載 DataFrame
  • sqlContext.parquetFile:從 parquet 文件中加載 DataFrame

需要注意的是,在 Spark 1.4 及之后的版本中,加載數據源的方法為:

// 默認格式parquet文件的加載方法,需要給出文件的路徑
sqlContext.read.load("/home/shiyanlou/data.parquet") // 加載其他格式的文件,需要在format方法中指明格式
sqlContext.read.format("json").load("/home/shiyanlou/data.json")

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM