Spark SQL 之 DataFrame
轉載請注明出處:http://www.cnblogs.com/BYRans/
概述(Overview)
Spark SQL是Spark的一個組件,用於結構化數據的計算。Spark SQL提供了一個稱為DataFrames的編程抽象,DataFrames可以充當分布式SQL查詢引擎。
DataFrames
DataFrame是一個分布式的數據集合,該數據集合以命名列的方式進行整合。DataFrame可以理解為關系數據庫中的一張表,也可以理解為R/Python中的一個data frame。DataFrames可以通過多種數據構造,例如:結構化的數據文件、hive中的表、外部數據庫、Spark計算過程中生成的RDD等。
DataFrame的API支持4種語言:Scala、Java、Python、R。
入口:SQLContext(Starting Point: SQLContext)
Spark SQL程序的主入口是SQLContext類或它的子類。創建一個基本的SQLContext,你只需要SparkContext,創建代碼示例如下:
- Scala
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
- Java
JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
除了基本的SQLContext,也可以創建HiveContext。SQLContext和HiveContext區別與聯系為:
- SQLContext現在只支持SQL語法解析器(SQL-92語法)
- HiveContext現在支持SQL語法解析器和HiveSQL語法解析器,默認為HiveSQL語法解析器,用戶可以通過配置切換成SQL語法解析器,來運行HiveSQL不支持的語法。
- 使用HiveContext可以使用Hive的UDF,讀寫Hive表數據等Hive操作。SQLContext不可以對Hive進行操作。
- Spark SQL未來的版本會不斷豐富SQLContext的功能,做到SQLContext和HiveContext的功能容和,最終可能兩者會統一成一個Context
HiveContext包裝了Hive的依賴包,把HiveContext單獨拿出來,可以在部署基本的Spark的時候就不需要Hive的依賴包,需要使用HiveContext時再把Hive的各種依賴包加進來。
SQL的解析器可以通過配置spark.sql.dialect參數進行配置。在SQLContext中只能使用Spark SQL提供的”sql“解析器。在HiveContext中默認解析器為”hiveql“,也支持”sql“解析器。
創建DataFrames(Creating DataFrames)
使用SQLContext,spark應用程序(Application)可以通過RDD、Hive表、JSON格式數據等數據源創建DataFrames。下面是基於JSON文件創建DataFrame的示例:
- Scala
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("examples/src/main/resources/people.json")
// Displays the content of the DataFrame to stdout
df.show()
- Java
JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");
// Displays the content of the DataFrame to stdout
df.show();
DataFrame操作(DataFrame Operations)
DataFrames支持Scala、Java和Python的操作接口。下面是Scala和Java的幾個操作示例:
- Scala
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Create the DataFrame
val df = sqlContext.read.json("examples/src/main/resources/people.json")
// Show the content of the DataFrame
df.show()
// age name
// null Michael
// 30 Andy
// 19 Justin
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show()
// name
// Michael
// Andy
// Justin
// Select everybody, but increment the age by 1
df.select(df("name"), df("age") + 1).show()
// name (age + 1)
// Michael null
// Andy 31
// Justin 20
// Select people older than 21
df.filter(df("age") > 21).show()
// age name
// 30 Andy
// Count people by age
df.groupBy("age").count().show()
// age count
// null 1
// 19 1
// 30 1
- Java
JavaSparkContext sc // An existing SparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Create the DataFrame
DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");
// Show the content of the DataFrame
df.show();
// age name
// null Michael
// 30 Andy
// 19 Justin
// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show();
// name
// Michael
// Andy
// Justin
// Select everybody, but increment the age by 1
df.select(df.col("name"), df.col("age").plus(1)).show();
// name (age + 1)
// Michael null
// Andy 31
// Justin 20
// Select people older than 21
df.filter(df.col("age").gt(21)).show();
// age name
// 30 Andy
// Count people by age
df.groupBy("age").count().show();
// age count
// null 1
// 19 1
// 30 1
詳細的DataFrame API請參考 API Documentation。
除了簡單列引用和表達式,DataFrames還有豐富的library,功能包括string操作、date操作、常見數學操作等。詳細內容請參考 DataFrame Function Reference。
運行SQL查詢程序(Running SQL Queries Programmatically)
Spark Application可以使用SQLContext的sql()方法執行SQL查詢操作,sql()方法返回的查詢結果為DataFrame格式。代碼如下:
- Scala
val sqlContext = ... // An existing SQLContext
val df = sqlContext.sql("SELECT * FROM table")
- Java
SQLContext sqlContext = ... // An existing SQLContext
DataFrame df = sqlContext.sql("SELECT * FROM table")
DataFrames與RDDs的相互轉換(Interoperating with RDDs)
Spark SQL支持兩種RDDs轉換為DataFrames的方式:
- 使用反射獲取RDD內的Schema
- 當已知類的Schema的時候,使用這種基於反射的方法會讓代碼更加簡潔而且效果也很好。
- 通過編程接口指定Schema
- 通過Spark SQL的接口創建RDD的Schema,這種方式會讓代碼比較冗長。
- 這種方法的好處是,在運行時才知道數據的列以及列的類型的情況下,可以動態生成Schema
使用反射獲取Schema(Inferring the Schema Using Reflection)
Spark SQL支持將JavaBean的RDD自動轉換成DataFrame。通過反射獲取Bean的基本信息,依據Bean的信息定義Schema。當前Spark SQL版本(Spark 1.5.2)不支持嵌套的JavaBeans和復雜數據類型(如:List、Array)。創建一個實現Serializable接口包含所有屬性getters和setters的類來創建一個JavaBean。通過調用createDataFrame並提供JavaBean的Class object,指定一個Schema給一個RDD。示例如下:
public static class Person implements Serializable {
private String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
// Load a text file and convert each line to a JavaBean.
JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map(
new Function<String, Person>() {
public Person call(String line) throws Exception {
String[] parts = line.split(",");
Person person = new Person();
person.setName(parts[0]);
person.setAge(Integer.parseInt(parts[1].trim()));
return person;
}
});
// Apply a schema to an RDD of JavaBeans and register it as a table.
DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
schemaPeople.registerTempTable("people");
// SQL can be run over RDDs that have been registered as tables.
DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
public String call(Row row) {
return "Name: " + row.getString(0);
}
}).collect();
通過編程接口指定Schema(Programmatically Specifying the Schema)
當JavaBean不能被預先定義的時候,編程創建DataFrame分為三步:
- 從原來的RDD創建一個Row格式的RDD
- 創建與RDD
中Rows結構匹配的StructType,通過該StructType創建表示RDD
的Schema
- 通過SQLContext提供的createDataFrame方法創建DataFrame,方法參數為RDD
的Schema
示例如下:
import org.apache.spark.api.java.function.Function;
// Import factory methods provided by DataTypes.
import org.apache.spark.sql.types.DataTypes;
// Import StructType and StructField
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructField;
// Import Row.
import org.apache.spark.sql.Row;
// Import RowFactory.
import org.apache.spark.sql.RowFactory;
// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
// Load a text file and convert each line to a JavaBean.
JavaRDD<String> people = sc.textFile("examples/src/main/resources/people.txt");
// The schema is encoded in a string
String schemaString = "name age";
// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<StructField>();
for (String fieldName: schemaString.split(" ")) {
fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
}
StructType schema = DataTypes.createStructType(fields);
// Convert records of the RDD (people) to Rows.
JavaRDD<Row> rowRDD = people.map(
new Function<String, Row>() {
public Row call(String record) throws Exception {
String[] fields = record.split(",");
return RowFactory.create(fields[0], fields[1].trim());
}
});
// Apply the schema to the RDD.
DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);
// Register the DataFrame as a table.
peopleDataFrame.registerTempTable("people");
// SQL can be run over RDDs that have been registered as tables.
DataFrame results = sqlContext.sql("SELECT name FROM people");
// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> names = results.javaRDD().map(new Function<Row, String>() {
public String call(Row row) {
return "Name: " + row.getString(0);
}
}).collect();