Spark創建DataFrame的不同方式
本文介紹了使用Scala示例在Spark中創建DataFrame(createDataFrame)的不同方法。
首先,讓我們導入Spark需要的隱式函數,如.toDF()函數,並為示例創建數據。
import spark.implicits._
val columns = Seq("language", "users_count")
val data = Seq(("Java", "20000"), ("Python", "10000"), ("Scala", "30000"))
1. Create Spark DataFrame from RDD
首先,調用SparkContext中的parallelize()函數從集合Seq創建RDD。對於下面的所有示例,都需要這個rdd對象。
val rdd = spark.SparkContext.parallelize(data)
1. a) 使用toDF()函數
一旦創建了一個RDD,可以使用toDF()來創建一個DataFrame。默認情況下,假如數據集每一行有兩列,創建的DF時候的列名就是"_1"和"_2"。
val dfFromRDD1 = rdd.toDF()
dfFromRDD1.printSchema()
root
|-- _1: string (nullable = true)
|-- _2: string (nullable = true)
toDF()具有另一個簽名,該簽名自定義列名稱參數,如下所示:
val dfFromRDD1 = rdd.toDF("language", "users_count")
dfFromRDD1.printSchema()
root
|-- language: string (nullable = true)
|-- users: string (nullable = true)
默認情況下,這些列的數據類型是通過推斷列的數據類型來判斷的。我們可以通過提供模式來更改此行為,我們可以在其中為每個字段/列指定列名,數據類型和可為空。
1.b) 使用SparkSession的creatDataFrame()函數
使用SparkSession中的createDataFrame()是另一種創建方法,它以rdd對象作為參數。使用toDF()來指定列的名稱。
dfFromRDD2 = spark.createDataFrame(rdd).toDF(columns:_*)
1.c)對行類型使用createDataFrame()
createDataFrame()有另一個簽名,它將列名的RDD[Row]類型和模式作為參數。首先,我們需要將rdd對象從RDD[T]轉換為RDD[Row]類型。
val schema = StructType(columns.map(fieldName => StructField(fieldName, StringType, nullable = true)))
val rowRDD = rdd.map(attributes => Row(attributes._1, attributs._2))
val dfFromRDD3 = spark.createDataFrame(rowRdd.schema)
2. 從List和Seq集合中創建Spark DataFrame
在本節中,我們將看到從集合Seq[T]或List[T]創建Spark DataFrame的幾種方法。這些示例與我們上面的RDD部分看到的類型,但是我們使用的是數據對象而不是RDD對象。
2.a) List或者Seq使用toDF()
val dfFromData1 = data.toDF()
2.b) 使用SparkSession的createDataFrame()方法
var dfFromData2 = spark.createDataFrame(data).toDF(columns:_*)
2.c) 使用Row type的createDataFrame()方法
import scala.collection.JavaConversions._
val rowData = data.map(attributes => Row(attributes._1, attributes._2))
var dfFromData3 = spark.createDataFrame(rowData, schema)
3. 從CSV文件創建Spark DataFrame
val df2 = spark.read.csv("/src/resources/file.csv")
4. 從text文件創建
val df2 = spark.read.text("/src/resources/file.txt")
5. 從JSON文件創建
val df2 = spark.read.json("/src/resources/file.json")
6. 從XML文件創建
從xml解析DataFrame,我們應該使用數據源:com.databricks.spark.xml
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-xml_2.11</artifactId>
<version>0.6.0</version>
</dependency>
val df = spark.read.format("com.databricks.spark.xml")
.option("rowTag", "person")
.xml("src/main/resources/persons.xml")
7. 從Hive創建
val hiveContext = new org.apache.spark.sql.hive.HiveContext(spark.sparkContext)
val hiveDF = hiveContext.sql("select * from emp")
8. 從RDBMS創建
8.a) Mysql table
確保在pom.xml文件或類路徑中的MySQL jars中都具有Mysql庫作為依賴項
val df_mysql = spark.read.format("jdbc")
.option("url", "jdbc:mysql://localhost:port/db")
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", "tablename")
.option("user", "user")
.option("password", "password")
.load()
8.b) DB2
確保在pom.xml文件或類路徑中的DB2 jar中將DB2庫作為依賴項。
val df_db2 = spark.read.format(“jdbc”)
.option(“url”, “jdbc:db2://localhost:50000/dbname”)
.option(“driver”, “com.ibm.db2.jcc.DB2Driver”)
.option(“dbtable”, “tablename”)
.option(“user”, “user”)
.option(“password”, “password”)
.load()
9. 從HBase創建DataFrame
要從HBase表創建Spark DataFrame,我們應該使用Spark HBase連接器中定義的數據源。
val hbaseDF = sparkSession.read
.options(Map(HBaseTableCatalog.tableCatalog -> catalog))
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()