Spark SQL 之 DataFrame


Spark SQL 之 DataFrame


轉載請注明出處:http://www.cnblogs.com/BYRans/

概述(Overview)

Spark SQL是Spark的一個組件,用於結構化數據的計算。Spark SQL提供了一個稱為DataFrames的編程抽象,DataFrames可以充當分布式SQL查詢引擎。

DataFrames

DataFrame是一個分布式的數據集合,該數據集合以命名列的方式進行整合。DataFrame可以理解為關系數據庫中的一張表,也可以理解為R/Python中的一個data frame。DataFrames可以通過多種數據構造,例如:結構化的數據文件、hive中的表、外部數據庫、Spark計算過程中生成的RDD等。
DataFrame的API支持4種語言:Scala、Java、Python、R。

入口:SQLContext(Starting Point: SQLContext)

Spark SQL程序的主入口是SQLContext類或它的子類。創建一個基本的SQLContext,你只需要SparkContext,創建代碼示例如下:

  • Scala
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  • Java
JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

除了基本的SQLContext,也可以創建HiveContext。SQLContext和HiveContext區別與聯系為:

  • SQLContext現在只支持SQL語法解析器(SQL-92語法)
  • HiveContext現在支持SQL語法解析器和HiveSQL語法解析器,默認為HiveSQL語法解析器,用戶可以通過配置切換成SQL語法解析器,來運行HiveSQL不支持的語法。
  • 使用HiveContext可以使用Hive的UDF,讀寫Hive表數據等Hive操作。SQLContext不可以對Hive進行操作。
  • Spark SQL未來的版本會不斷豐富SQLContext的功能,做到SQLContext和HiveContext的功能容和,最終可能兩者會統一成一個Context

HiveContext包裝了Hive的依賴包,把HiveContext單獨拿出來,可以在部署基本的Spark的時候就不需要Hive的依賴包,需要使用HiveContext時再把Hive的各種依賴包加進來。

SQL的解析器可以通過配置spark.sql.dialect參數進行配置。在SQLContext中只能使用Spark SQL提供的”sql“解析器。在HiveContext中默認解析器為”hiveql“,也支持”sql“解析器。

創建DataFrames(Creating DataFrames)

使用SQLContext,spark應用程序(Application)可以通過RDD、Hive表、JSON格式數據等數據源創建DataFrames。下面是基於JSON文件創建DataFrame的示例:

  • Scala
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
  • Java
JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();

DataFrame操作(DataFrame Operations)

DataFrames支持Scala、Java和Python的操作接口。下面是Scala和Java的幾個操作示例:

  • Scala
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create the DataFrame
val df = sqlContext.read.json("examples/src/main/resources/people.json")

// Show the content of the DataFrame
df.show()
// age  name
// null Michael
// 30   Andy
// 19   Justin

// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// name
// Michael
// Andy
// Justin

// Select everybody, but increment the age by 1
df.select(df("name"), df("age") + 1).show()
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// Select people older than 21
df.filter(df("age") > 21).show()
// age name
// 30  Andy

// Count people by age
df.groupBy("age").count().show()
// age  count
// null 1
// 19   1
// 30   1
  • Java
JavaSparkContext sc // An existing SparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create the DataFrame
DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");

// Show the content of the DataFrame
df.show();
// age  name
// null Michael
// 30   Andy
// 19   Justin

// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show();
// name
// Michael
// Andy
// Justin

// Select everybody, but increment the age by 1
df.select(df.col("name"), df.col("age").plus(1)).show();
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// Select people older than 21
df.filter(df.col("age").gt(21)).show();
// age name
// 30  Andy

// Count people by age
df.groupBy("age").count().show();
// age  count
// null 1
// 19   1
// 30   1

詳細的DataFrame API請參考 API Documentation

除了簡單列引用和表達式,DataFrames還有豐富的library,功能包括string操作、date操作、常見數學操作等。詳細內容請參考 DataFrame Function Reference

運行SQL查詢程序(Running SQL Queries Programmatically)

Spark Application可以使用SQLContext的sql()方法執行SQL查詢操作,sql()方法返回的查詢結果為DataFrame格式。代碼如下:

  • Scala
val sqlContext = ...  // An existing SQLContext
val df = sqlContext.sql("SELECT * FROM table")
  • Java
SQLContext sqlContext = ...  // An existing SQLContext
DataFrame df = sqlContext.sql("SELECT * FROM table")

DataFrames與RDDs的相互轉換(Interoperating with RDDs)

Spark SQL支持兩種RDDs轉換為DataFrames的方式:

  • 使用反射獲取RDD內的Schema
    • 當已知類的Schema的時候,使用這種基於反射的方法會讓代碼更加簡潔而且效果也很好。
  • 通過編程接口指定Schema
    • 通過Spark SQL的接口創建RDD的Schema,這種方式會讓代碼比較冗長。
    • 這種方法的好處是,在運行時才知道數據的列以及列的類型的情況下,可以動態生成Schema

使用反射獲取Schema(Inferring the Schema Using Reflection)

Spark SQL支持將JavaBean的RDD自動轉換成DataFrame。通過反射獲取Bean的基本信息,依據Bean的信息定義Schema。當前Spark SQL版本(Spark 1.5.2)不支持嵌套的JavaBeans和復雜數據類型(如:List、Array)。創建一個實現Serializable接口包含所有屬性getters和setters的類來創建一個JavaBean。通過調用createDataFrame並提供JavaBean的Class object,指定一個Schema給一個RDD。示例如下:

public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public int getAge() {
    return age;
  }

  public void setAge(int age) {
    this.age = age;
  }
}
// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

// Load a text file and convert each line to a JavaBean.
JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map(
  new Function<String, Person>() {
    public Person call(String line) throws Exception {
      String[] parts = line.split(",");

      Person person = new Person();
      person.setName(parts[0]);
      person.setAge(Integer.parseInt(parts[1].trim()));

      return person;
    }
  });

// Apply a schema to an RDD of JavaBeans and register it as a table.
DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
schemaPeople.registerTempTable("people");

// SQL can be run over RDDs that have been registered as tables.
DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();

通過編程接口指定Schema(Programmatically Specifying the Schema)

當JavaBean不能被預先定義的時候,編程創建DataFrame分為三步:

  • 從原來的RDD創建一個Row格式的RDD
  • 創建與RDD 中Rows結構匹配的StructType,通過該StructType創建表示RDD 的Schema
  • 通過SQLContext提供的createDataFrame方法創建DataFrame,方法參數為RDD 的Schema

示例如下:

import org.apache.spark.api.java.function.Function;
// Import factory methods provided by DataTypes.
import org.apache.spark.sql.types.DataTypes;
// Import StructType and StructField
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructField;
// Import Row.
import org.apache.spark.sql.Row;
// Import RowFactory.
import org.apache.spark.sql.RowFactory;

// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

// Load a text file and convert each line to a JavaBean.
JavaRDD<String> people = sc.textFile("examples/src/main/resources/people.txt");

// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<StructField>();
for (String fieldName: schemaString.split(" ")) {
  fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
}
StructType schema = DataTypes.createStructType(fields);

// Convert records of the RDD (people) to Rows.
JavaRDD<Row> rowRDD = people.map(
  new Function<String, Row>() {
    public Row call(String record) throws Exception {
      String[] fields = record.split(",");
      return RowFactory.create(fields[0], fields[1].trim());
    }
  });

// Apply the schema to the RDD.
DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);

// Register the DataFrame as a table.
peopleDataFrame.registerTempTable("people");

// SQL can be run over RDDs that have been registered as tables.
DataFrame results = sqlContext.sql("SELECT name FROM people");

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> names = results.javaRDD().map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM