Spark DataFrame基礎


Spark創建DataFrame的不同方式

本文介紹了使用Scala示例在Spark中創建DataFrame(createDataFrame)的不同方法。

首先,讓我們導入Spark需要的隱式函數,如.toDF()函數,並為示例創建數據。

import spark.implicits._
val columns = Seq("language", "users_count")
val data = Seq(("Java", "20000"), ("Python", "10000"), ("Scala", "30000"))

1. Create Spark DataFrame from RDD

首先,調用SparkContext中的parallelize()函數從集合Seq創建RDD。對於下面的所有示例,都需要這個rdd對象。

val rdd = spark.SparkContext.parallelize(data)

1. a) 使用toDF()函數

一旦創建了一個RDD,可以使用toDF()來創建一個DataFrame。默認情況下,假如數據集每一行有兩列,創建的DF時候的列名就是"_1"和"_2"。

val dfFromRDD1 = rdd.toDF()
dfFromRDD1.printSchema()

root
|-- _1: string (nullable = true)
|-- _2: string (nullable = true)

toDF()具有另一個簽名,該簽名自定義列名稱參數,如下所示:

val dfFromRDD1 = rdd.toDF("language", "users_count")
dfFromRDD1.printSchema()

root
|-- language: string (nullable = true)
|-- users: string (nullable = true)

默認情況下,這些列的數據類型是通過推斷列的數據類型來判斷的。我們可以通過提供模式來更改此行為,我們可以在其中為每個字段/列指定列名,數據類型和可為空。

1.b) 使用SparkSession的creatDataFrame()函數

使用SparkSession中的createDataFrame()是另一種創建方法,它以rdd對象作為參數。使用toDF()來指定列的名稱。

dfFromRDD2 = spark.createDataFrame(rdd).toDF(columns:_*)

1.c)對行類型使用createDataFrame()

createDataFrame()有另一個簽名,它將列名的RDD[Row]類型和模式作為參數。首先,我們需要將rdd對象從RDD[T]轉換為RDD[Row]類型。

val schema = StructType(columns.map(fieldName => StructField(fieldName, StringType, nullable = true)))
val rowRDD = rdd.map(attributes => Row(attributes._1, attributs._2))
val dfFromRDD3 = spark.createDataFrame(rowRdd.schema)

2. 從List和Seq集合中創建Spark DataFrame

在本節中,我們將看到從集合Seq[T]或List[T]創建Spark DataFrame的幾種方法。這些示例與我們上面的RDD部分看到的類型,但是我們使用的是數據對象而不是RDD對象。

2.a) List或者Seq使用toDF()

val dfFromData1 = data.toDF()

2.b) 使用SparkSession的createDataFrame()方法

var dfFromData2 = spark.createDataFrame(data).toDF(columns:_*)

2.c) 使用Row type的createDataFrame()方法

import scala.collection.JavaConversions._
val rowData = data.map(attributes => Row(attributes._1, attributes._2))
var dfFromData3 = spark.createDataFrame(rowData, schema)

3. 從CSV文件創建Spark DataFrame

val df2 = spark.read.csv("/src/resources/file.csv")

4. 從text文件創建

val df2 = spark.read.text("/src/resources/file.txt")

5. 從JSON文件創建

val df2 = spark.read.json("/src/resources/file.json")

6. 從XML文件創建

從xml解析DataFrame,我們應該使用數據源:com.databricks.spark.xml

<dependency>
     <groupId>com.databricks</groupId>
     <artifactId>spark-xml_2.11</artifactId>
     <version>0.6.0</version>
 </dependency>
val df = spark.read.format("com.databricks.spark.xml")
        .option("rowTag", "person")
        .xml("src/main/resources/persons.xml")

7. 從Hive創建

val hiveContext = new org.apache.spark.sql.hive.HiveContext(spark.sparkContext)
val hiveDF = hiveContext.sql("select * from emp")

8. 從RDBMS創建

8.a) Mysql table

確保在pom.xml文件或類路徑中的MySQL jars中都具有Mysql庫作為依賴項

val df_mysql = spark.read.format("jdbc")
    .option("url", "jdbc:mysql://localhost:port/db")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", "tablename")
    .option("user", "user")
    .option("password", "password")
    .load()

8.b) DB2

確保在pom.xml文件或類路徑中的DB2 jar中將DB2庫作為依賴項。

val df_db2 = spark.read.format(“jdbc”)
   .option(“url”, “jdbc:db2://localhost:50000/dbname”)
   .option(“driver”, “com.ibm.db2.jcc.DB2Driver”)
   .option(“dbtable”, “tablename”) 
   .option(“user”, “user”) 
   .option(“password”, “password”) 
   .load()

9. 從HBase創建DataFrame

要從HBase表創建Spark DataFrame,我們應該使用Spark HBase連接器中定義的數據源。

 val hbaseDF = sparkSession.read
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .load()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM