轉自:https://www.shiyanlou.com/courses/543/labs/1835/document
https://www.shiyanlou.com/courses/536/labs/1818/document
一、從 RDD 創建 DataFrame:
方法一 由反射機制推斷出模式:
Step 1:引用必要的類。
import org.apache.spark.sql._ import sqlContext.implicits._ //idea中此處導入應在sqlContext 創建之后,否則報錯,不知道為什么。。?? // 在使用Spark Shell時,下面這句不是必需的。
// Spark Shell已默認為你啟用了SQL context,在需要時可直接使用sqlContext。
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
Step 2:創建RDD。
//導入CSV文件並處理逗號分隔的數據 val sfpdRDD = sc.textFile("/home/shiyanlou/SFPD.csv").map(inc => inc.split(","))
Step 3:定義 case class 。
case class Incidents(incidentnum:String, category:String, description:String, dayofweek:String, date:String, time:String, pddistrict:String, resolution:String, address:String, x:String, y:String, location:String, pdid:String)
Step 4:將 RDD 轉換為含有 case 對象的 RDD 。
val sfpdCase = sfpdRDD.map(inc => Incidents(inc(0), inc(1), inc(2), inc(3), inc(4), inc(5), inc(6), inc(7), inc(8), inc(9), inc(10), inc(11), inc(12)))
Step 5:隱式轉換會將含有 case 對象的 RDD 轉換為 DataFrame ,將 DataFrame 的一些操作和函數應用於這個 DataFrame 中。
val sfpdDF = sfpdCase.toDF()
方法二 通過編程方式構建模式:
這種方式適用於列和類型在運行時不可知的情況,我們就需要手動地去構建 DataFrame 的模式。通常 DataFrame 的模式在動態變化時才會使用這種方式。
注意:該方式在 case class 在不能被提前定義時,或者使用 Scala 語言的項目中 case class 超過22個字段時,才會用到。
Step 1:引入必要的類。
import sqlContext.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
Step 2:由原始 RDD 創建一個 Row RDD 。
val rowRDD = sc.textFile("/home/shiyanlou/data.txt").map(x => x.split(" ")).map( p => Row(p(0), p(2), p(4)))
Step 3:使用 StructType 和 StructField 分別創建模式。其中, StructType 對應於 table (表),StructField 對應於 field (字段)。
val testSchema = StructType(Array(StructField("IncNum", StringType, true), StructField("Date", StringType, true), StructField("District", StringType, true)))
Step 4:使用 SQLContext 提供的方法,將模式應用於 Row RDD 上,以創建 DataFrame。
val testDF = sqlContext.createDataFrame(rowRDD, testSchema) // 將DataFrame注冊為表
testDF.registerTempTable("test") val incs = sql("SELECT * FROM test")
二、從數據源創建 DataFrame:
現有的大數據應用通常需要搜集和分析來自不同的數據源的數據。而 DataFrame 支持 JSON 文件、 Parquet 文件、 Hive 表等數據格式。它能從本地文件系統、分布式文件系統(HDFS)、雲存儲(Amazon S3)和外部的關系數據庫系統(通過JDBC,在Spark 1.4版本起開始支持)等地方讀取數據。另外,通過 Spark SQL 的外部數據源 API ,DataFrame 能夠被擴展,以支持第三方的數據格式或數據源。
csv:
主要是 com.databricks_spark-csv_2.11-1.1.0
這個庫,用於支持 CSV 格式文件的讀取和操作。
step 1:
在終端中輸入命令: wget http://labfile.oss.aliyuncs.com/courses/610/spark_csv.tar.gz
下載相關的 jar 包。
將該壓縮文件解壓至 /home/shiyanlou/.ivy2/jars/
目錄中,確保該目錄含有如圖所示的以下三個 jar 包。
step 2 導入包:
spark-shell --packages com.databricks:spark-csv_2.11:1.1.0
step 3 直接將 CSV 文件讀入為 DataFrame :
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("/home/shiyanlou/1987.csv") // 此處的文件路徑請根據實際情況修改
step 4 根據需要修改字段類型:
def convertColumn(df: org.apache.spark.sql.DataFrame, name:String, newType:String) = { val df_1 = df.withColumnRenamed(name, "swap") df_1.withColumn(name, df_1.col("swap").cast(newType)).drop("swap") }
//例如
val df_3 = convertColumn(df_2, "ArrDelay", "int")
val df_4 = convertColumn(df_2, "DepDelay", "int")
json:
sqlContext.read.json(filePath)
擴展閱讀:
CSV
:逗號分隔值( Comma-Separated Values ),其文件以純文本形式存儲表格數據。請參考:http://commons.apache.org/proper/commons-csv/ 。Apache Avro
: 一個數據序列化的系統,相當於基於二進制數據傳輸高性能的中間件。請參考:https://avro.apache.org/docs/current/ 。Elasticsearch
:一個基於 Lucene 的搜索服務器。提供了一個基於 RESTful web 接口的分布式全文搜索引擎。請參考:https://www.elastic.co/products/elasticsearch/ 。Apache Cassandra
:一套開源分布式NoSQL數據庫系統。請參考:http://cassandra.apache.org/ 。
由於數據格式和數據源眾多,這里暫不一一展開講解。在實際應用中,如果需要使用某種格式的數據或者某個數據源,應查詢其官方文檔。通常官方文檔(特別是 API 手冊)都提供了詳細的集成方法和指導。
在 Spark 中,默認的數據源被設定為 Parquet ,所以通用的加載方式為:
sqlContext.load("/home/shiyanlou/data.parquet")
如果是其他格式,則需要手動地指定格式:
sqlContext.load("/home/shiyanlou/data", "json")
下面給出了其他的加載指定數據源的方法:
sqlContext.jdbc
:從數據庫表中加載 DataFramesqlContext.jsonFile
:從 JSON 文件中加載 DataFramesqlContext.jsonRDD
:從包含 JSON 對象的 RDD 中加載 DataFramesqlContext.parquetFile
:從 parquet 文件中加載 DataFrame
需要注意的是,在 Spark 1.4 及之后的版本中,加載數據源的方法為:
// 默認格式parquet文件的加載方法,需要給出文件的路徑
sqlContext.read.load("/home/shiyanlou/data.parquet") // 加載其他格式的文件,需要在format方法中指明格式
sqlContext.read.format("json").load("/home/shiyanlou/data.json")