[Spark] This post should give you a rough idea of how to build and work with DataFrames through the Scala API



Requirement overview

There are two main ways to convert an RDD into a DataFrame: using the reflection mechanism (letting a case class infer the schema), or programmatically building a schema (a StructType) and applying it to the RDD.
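For reference, both examples below read a small text file, person.txt, which is assumed to contain one record per line with space-separated id, name, and age fields (the values here are reconstructed from the console output shown further down):

1 zhangsan 108
2 lisi 28
3 wangwu 58
4 zhaoliu 24
5 zhuqi 35
6 qiuba 28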

Steps

1. Create a Maven project and add the dependencies

<properties>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.2.0</spark.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.5</version>
    </dependency>
</dependencies>
<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
                <!-- <verbose>true</verbose> -->
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass></mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

2. Method 1: build the DataFrame via reflection with a case class

Code
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Define the Person case class
case class Person(id: Int, name: String, age: Int)

object SparkDF {
  def main(args: Array[String]): Unit = {
    // Get a SparkSession
    val sparkSession: SparkSession = SparkSession.builder().appName("SparkDF").master("local[2]").config("spark.driver.host", "localhost").getOrCreate()
    // Get the SparkContext
    val sparkContext: SparkContext = sparkSession.sparkContext
    // Reduce log noise
    sparkContext.setLogLevel("WARN")
    // Read the input file
    val fileRDD: RDD[String] = sparkContext.textFile("/Users/zhaozhuang/Desktop/4、Spark/Spark第三天/Spark第三天教案/資料/person.txt")
    // Split each line on the space delimiter
    val splitRDD: RDD[Array[String]] = fileRDD.map(x => x.split(" "))
    // Map each record onto the Person case class
    val personRDD: RDD[Person] = splitRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))

    // Convert personRDD into a DataFrame
    // The implicits import is required for toDF()
    import sparkSession.implicits._
    val personDF: DataFrame = personRDD.toDF()
    // At this point the DataFrame has been built via reflection

    // Next, verify the DataFrame with both the DSL syntax and the SQL syntax
    println("****************************************DSL syntax test start****************************************")

    // Show the full table; show() displays the first 20 rows by default
    personDF.show()
    // Select specific columns
    personDF.select($"name", $"age").show()
    // Print the schema
    personDF.printSchema()
    // Filter rows
    personDF.filter($"age" > 25).show()

    println("****************************************DSL syntax test end****************************************")

    println("****************************************SQL syntax test start****************************************")
    // Register the DataFrame as a table; there are three ways
    // 1. registerTempTable (deprecated)
    personDF.registerTempTable("person1")
    // 2. createTempView
    personDF.createTempView("person2")
    // 3. createOrReplaceTempView (recommended)
    personDF.createOrReplaceTempView("person3")

    // Query the three tables
    sparkSession.sql("select * from person1").show()
    sparkSession.sql("select * from person2").show()
    sparkSession.sql("select * from person3").show()

    // A left join example
    sparkSession.sql("select * from person1 p1 left join person2 p2 on p1.id = p2.id").show()

    println("****************************************SQL syntax test end****************************************")

    sparkContext.stop()
    sparkSession.close()
  }
}
Console output

****************************************DSL syntax test start****************************************
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|zhangsan|108|
|  2|    lisi| 28|
|  3|  wangwu| 58|
|  4| zhaoliu| 24|
|  5|   zhuqi| 35|
|  6|   qiuba| 28|
+---+--------+---+

+--------+---+
|    name|age|
+--------+---+
|zhangsan|108|
|    lisi| 28|
|  wangwu| 58|
| zhaoliu| 24|
|   zhuqi| 35|
|   qiuba| 28|
+--------+---+

root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)

+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|zhangsan|108|
|  2|    lisi| 28|
|  3|  wangwu| 58|
|  5|   zhuqi| 35|
|  6|   qiuba| 28|
+---+--------+---+

****************************************DSL syntax test end****************************************
****************************************SQL syntax test start****************************************
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|zhangsan|108|
|  2|    lisi| 28|
|  3|  wangwu| 58|
|  4| zhaoliu| 24|
|  5|   zhuqi| 35|
|  6|   qiuba| 28|
+---+--------+---+

+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|zhangsan|108|
|  2|    lisi| 28|
|  3|  wangwu| 58|
|  4| zhaoliu| 24|
|  5|   zhuqi| 35|
|  6|   qiuba| 28|
+---+--------+---+

+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|zhangsan|108|
|  2|    lisi| 28|
|  3|  wangwu| 58|
|  4| zhaoliu| 24|
|  5|   zhuqi| 35|
|  6|   qiuba| 28|
+---+--------+---+

+---+--------+---+---+--------+---+
| id|    name|age| id|    name|age|
+---+--------+---+---+--------+---+
|  1|zhangsan|108|  1|zhangsan|108|
|  6|   qiuba| 28|  6|   qiuba| 28|
|  3|  wangwu| 58|  3|  wangwu| 58|
|  5|   zhuqi| 35|  5|   zhuqi| 35|
|  4| zhaoliu| 24|  4| zhaoliu| 24|
|  2|    lisi| 28|  2|    lisi| 28|
+---+--------+---+---+--------+---+

****************************************SQL syntax test end****************************************

Process finished with exit code 0
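As a side note, the SQL left join above can also be expressed in DSL form. A minimal sketch, assuming the personDF and the sparkSession.implicits._ import from the code above (the p1/p2 aliases are only there to disambiguate the columns of the self-join):

// DSL equivalent of the SQL left join above: self-join of personDF on id
personDF.as("p1")
  .join(personDF.as("p2"), $"p1.id" === $"p2.id", "left")
  .show()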

3. Method 2: build the DataFrame with a StructType and Row

Code
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object SparkDF2 {
  def main(args: Array[String]): Unit = {
    // Get a SparkSession
    val sparkSession: SparkSession = SparkSession.builder().appName("SparkDF2").master("local[2]").config("spark.driver.host", "localhost").getOrCreate()
    // Get the SparkContext
    val sparkContext: SparkContext = sparkSession.sparkContext
    // Reduce log noise
    sparkContext.setLogLevel("WARN")
    // Read the input file
    val fileRDD: RDD[String] = sparkContext.textFile("/Users/zhaozhuang/Desktop/4、Spark/Spark第三天/Spark第三天教案/資料/person.txt")
    // Split each line on the space delimiter
    val arrayRDD: RDD[Array[String]] = fileRDD.map(x => x.split(" "))
    // Convert arrayRDD into an RDD of Row
    val rowRDD: RDD[Row] = arrayRDD.map(x => Row(x(0).toInt, x(1), x(2).toInt))

    // Build the StructType that describes the schema
    val structType = new StructType().add("id", IntegerType).add("name", StringType).add("age", IntegerType)

    // createDataFrame takes two arguments: an RDD[Row] and a StructType
    val personDF: DataFrame = sparkSession.createDataFrame(rowRDD, structType)

    // From here on, the DSL and SQL operations are the same as before
    personDF.printSchema()

    sparkContext.stop()
    sparkSession.close()
  }
}
Console output

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)


Process finished with exit code 0
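Since the DSL and SQL operations work the same way as in the first example, here is a minimal sketch of registering this programmatically-built DataFrame as a temp view and querying it (the view name person and the query are only illustrations, not part of the original code):

// Register the DataFrame built from RDD[Row] + StructType and query it with SQL
personDF.createOrReplaceTempView("person")
sparkSession.sql("select name, age from person where age > 25").show()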

