spark sql


  • 什么是spark sql
    • spark sql是為了處理結構化數據的一個spark 模塊。
    • 底層依賴於rdd,把sql語句轉換成一個個rdd,運行在不同的worker節點上
    • 特點:
      • 容易集成:SQL,對於不同的數據源,提供統一的訪問方式(DataFrame:表)
      • 兼容Hive
  • DataFrame,是Spark sql對結構化數據的抽象集合,表現形式:RDD
    • 表 = 表結構+數據
    • DataFrame = schema+RDD
  • 創建dataframe
    • 跟關系數據庫的表(Table)一樣,DataFrame是Spark中對帶模式(schema)行列數據的抽象。DateFrame廣泛應用於使用SQL處理大數據的各種場景。創建DataFrame有很多種方法,比如從本地List創建、從RDD創建或者從源數據創建,下面簡要介紹創建DataFrame的三種方法。

      方法一,Spark中使用toDF函數創建DataFrame

      通過導入(importing)Spark sql implicits, 就可以將本地序列(seq), 數組或者RDD轉為DataFrame。只要這些數據的內容能指定、

      數據類型即可。

      本地seq + toDF創建DataFrame示例:

      import sqlContext.implicits._val df = Seq(

        (1, "First Value", java.sql.Date.valueOf("2010-01-01")),

        (2, "Second Value", java.sql.Date.valueOf("2010-02-01"))

      ).toDF("int_column", "string_column", "date_column")

      注意:如果直接用toDF()而不指定列名字,那么默認列名為"_1", "_2", ...

      通過case class + toDF創建DataFrame的示例

      // sc is an existing SparkContext.val sqlContext = new org.apache.spark.sql.SQLContext(sc)// this is used to implicitly convert an RDD to a DataFrame.import sqlContext.implicits._

      // Define the schema using a case class.// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,// you can use custom classes that implement the Product interface.case class Person(name: String, age: Int)

      // Create an RDD of Person objects and register it as a table.val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()

      people.registerTempTable("people")

      // 使用 sqlContext 執行 sql 語句.val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

      // 注:sql()函數的執行結果也是DataFrame,支持各種常用的RDD操作.// The columns of a row in the result can be accessed by ordinal.

      teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

      方法二,Spark中使用createDataFrame函數創建DataFrame

      在SqlContext中使用createDataFrame也可以創建DataFrame。跟toDF一樣,這里創建DataFrame的數據形態也可以是本地數組或者RDD。

      通過row+schema創建示例

      import org.apache.spark.sql.types._val schema = StructType(List(

          StructField("integer_column", IntegerType, nullable = false),

          StructField("string_column", StringType, nullable = true),

          StructField("date_column", DateType, nullable = true)

      ))

      val rdd = sc.parallelize(Seq(

        Row(1, "First Value", java.sql.Date.valueOf("2010-01-01")),

        Row(2, "Second Value", java.sql.Date.valueOf("2010-02-01"))

      ))val df = sqlContext.createDataFrame(rdd, schema)

      方法三,通過文件直接創建DataFrame

      使用parquet文件創建

      val df = sqlContext.read.parquet("hdfs:/path/to/file")

      使用json文件創建

      val df = spark.read.json("examples/src/main/resources/people.json")

      // Displays the content of the DataFrame to stdout

      df.show()// +----+-------+// | age|   name|// +----+-------+// |null|Michael|// |  30|   Andy|// |  19| Justin|// +----+-------+

      使用csv文件,spark2.0+之后的版本可用

      //首先初始化一個SparkSession對象val spark = org.apache.spark.sql.SparkSession.builder

              .master("local")

              .appName("Spark CSV Reader")

              .getOrCreate;

      //然后使用SparkSessions對象加載CSV成為DataFrameval df = spark.read

              .format("com.databricks.spark.csv")

              .option("header", "true") //reading the headers

              .option("mode", "DROPMALFORMED")

              .load("csv/file/path"); //.csv("csv/file/path") //spark 2.0 api

      df.show()

  • 操作dataframe
    • http://blog.csdn.net/dabokele/article/details/52802150  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM