First, a mind map summarizing what I learned from the official documentation.
Overview
Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine.
Getting Started with Spark SQL
The entry point into all functionality in Spark SQL is the SQLContext class, or one of its subclasses. To create a basic SQLContext, all you need is a SparkContext.
In addition to the basic SQLContext, you can also create a HiveContext, which provides a superset of the functionality of the basic SQLContext. The additional features include the ability to write queries using the more complete HiveQL parser, access to Hive UDFs, and the ability to read data from Hive tables. Hive is not explored further here (only a one-line construction is sketched after the snippet below); it is left for later study.
- SparkConf conf = new SparkConf();
- conf.set("spark.testing.memory", "2147480000"); // workaround: the JVM cannot obtain enough memory otherwise
- JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
- System.out.println(sc);
- // sc is an existing JavaSparkContext.
- SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
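For reference, a HiveContext is constructed in the same way; this is only a minimal sketch, and it assumes the spark-hive module is on the classpath (sc.sc() returns the underlying SparkContext of the JavaSparkContext):
- // Sketch: a HiveContext provides a superset of SQLContext functionality
- org.apache.spark.sql.hive.HiveContext hiveContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());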
DataFrames
A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations. DataFrames can be constructed from many sources, such as structured data files, Hive tables, external databases, or existing RDDs.
The DataFrame API is available in Scala, Java, and Python.
All of the examples on this page use sample data included in the Spark distribution and can be run in the spark-shell or the pyspark shell.
Creating DataFrames
With a SQLContext, applications can create DataFrames from an existing RDD, from a Hive table, or from data sources.
As an example, the following creates a DataFrame from a JSON file:
- package com.tg.spark.sql;
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.sql.DataFrame;
- import org.apache.spark.sql.SQLContext;
- /**
- * Creates a DataFrame from a JSON file
- * @author 湯高
- *
- */
- public class DataFrameOps {
- public static void main(String[] args) {
- SparkConf conf=new SparkConf();
- conf.set("spark.testing.memory", "2147480000"); // workaround: the JVM cannot obtain enough memory otherwise
- //JavaSparkContext sc = new JavaSparkContext("spark://192.168.52.140:7077", "First Spark App",conf);
- JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);
- SQLContext sqlContext = new SQLContext(sc);
- DataFrame df = sqlContext.read().json("hdfs://master:9000/testFile/people.json");
- // Displays the content of the DataFrame to stdout
- df.show();
- // age name
- // null Michael
- // 30 Andy
- // 19 Justin
- // Print the schema in a tree format
- df.printSchema();
- // root
- // |-- age: long (nullable = true)
- // |-- name: string (nullable = true)
- // Select only the "name" column
- df.select("name").show();
- // name
- // Michael
- // Andy
- // Justin
- // Select everybody, but increment the age by 1
- df.select(df.col("name"), df.col("age").plus(1)).show();
- // name (age + 1)
- // Michael null
- // Andy 31
- // Justin 20
- // Select people older than 21
- df.filter(df.col("age").gt(21)).show();
- // age name
- // 30 Andy
- // Count people by age
- df.groupBy("age").count().show();
- // age count
- // null 1
- // 19 1
- // 30 1
- }
- }
The sql function on a SQLContext enables applications to run SQL queries programmatically and returns the result as a DataFrame. Concrete examples are given later.
Spark SQL supports two different methods for converting existing RDDs into DataFrames. The first method uses reflection to infer the schema of an RDD that contains specific types of objects. This reflection-based approach leads to more concise code and works well when you already know the schema while writing your Spark application.
The second method for creating DataFrames is through a programmatic interface that lets you construct a schema and then apply it to an existing RDD. While this method is more verbose, it allows you to construct DataFrames when the columns and their types are not known until runtime.
Inferring the Schema Using Reflection
This applies when the structure of the RDD is known in advance.
The JavaBean class defines the schema of the table; the property names of the JavaBean are read via reflection and become the names of the columns. JavaBean classes can also be nested or contain complex types such as Sequences or Arrays.
The RDD can be converted to a DataFrame and registered as a table, and the table can then be used in subsequent SQL statements. Spark SQL supports automatically converting an RDD of JavaBeans into a DataFrame.
Steps:
1. Define the table schema with a JavaBean class.
2. Create a SQLContext.
3. Apply the schema to the existing RDD by calling createDataFrame and providing the Class object of the JavaBean; this converts the RDD into a DataFrame.
4. Register the resulting DataFrame as a table.
5. Query the table with SQL statements using the sql method provided by the SQLContext. The result of a query is a DataFrame, which supports all the normal RDD operations.
First, write a JavaBean class that implements Serializable and provides getters and setters:
- package com.tg.spark.sql;
- import java.io.Serializable;
- public class Person implements Serializable {
- /**
- *
- */
- private static final long serialVersionUID = 727694963564948838L;
- private String name;
- private int age;
- public String getName() {
- return name;
- }
- public void setName(String name) {
- this.name = name;
- }
- public int getAge() {
- return age;
- }
- public void setAge(int age) {
- this.age = age;
- }
- }
- package com.tg.spark.sql;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.Function;
- import org.apache.spark.sql.DataFrame;
- import org.apache.spark.sql.Row;
- import org.apache.spark.sql.SQLContext;
- import org.apache.spark.storage.StorageLevel;
- import java.util.List;
- import org.apache.spark.SparkConf;
- public class CreateDataFrame1 {
- public static void main(String[] args) {
- SparkConf conf=new SparkConf();
- conf.set("spark.testing.memory", "2147480000"); // workaround: the JVM cannot obtain enough memory otherwise
- JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);
- System.out.println(sc);
- // sc is an existing JavaSparkContext.
- SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
- // Load a text file and convert each line to a JavaBean.
- JavaRDD<Person> people = sc.textFile("hdfs://master:9000/testFile/people.txt").map(
- new Function<String, Person>() {
- public Person call(String line) throws Exception {
- String[] parts = line.split(",");
- Person person = new Person();
- person.setName(parts[0]);
- person.setAge(Integer.parseInt(parts[1].trim()));
- return person;
- }
- });
- //A schema can be applied to an existing RDD by calling createDataFrame and providing the Class object for the JavaBean.
- // Apply a schema to an RDD of JavaBeans and register it as a table.
- DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
- schemaPeople.registerTempTable("people");
- // SQL can be run over RDDs that have been registered as tables.
- DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
- // The results of SQL queries are DataFrames and support all the normal RDD operations.
- // The columns of a row in the result can be accessed by ordinal.
- List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
- public String call(Row row) {
- return "Name: " + row.getString(0);
- }
- }).collect();
- teenagers.persist(StorageLevel.MEMORY_ONLY());
- System.out.println(teenagerNames);
- }
- }
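For reference, people.txt here is the sample file shipped with the Spark distribution (examples/src/main/resources/people.txt), uploaded to HDFS; it contains three comma-separated lines:
- Michael, 29
- Andy, 30
- Justin, 19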
In the code above, the line
- DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
again shows that the sql function on SQLContext lets an application run a SQL query programmatically and returns the result as a DataFrame.
Programmatically Specifying the Schema
This applies when the columns of the RDD and their types are not known in advance.
Steps:
1. Create an RDD of Rows from the original RDD.
2. Create the schema represented by a StructType that matches the structure of the Rows in the RDD created in step 1.
3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SQLContext.
- package com.tg.spark.sql;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.Function;
- import org.apache.spark.sql.DataFrame;
- import org.apache.spark.sql.Row;
- import org.apache.spark.sql.RowFactory;
- import org.apache.spark.sql.SQLContext;
- import org.apache.spark.sql.types.DataTypes;
- import org.apache.spark.sql.types.StructField;
- import org.apache.spark.sql.types.StructType;
- import org.apache.spark.storage.StorageLevel;
- import java.util.ArrayList;
- import java.util.List;
- import org.apache.spark.SparkConf;
- public class CreateDataFrame2 {
- public static void main(String[] args) {
- SparkConf conf = new SparkConf();
- conf.set("spark.testing.memory", "2147480000"); // workaround: the JVM cannot obtain enough memory otherwise
- JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
- System.out.println(sc);
- // sc is an existing JavaSparkContext.
- SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
- // Load a text file and convert each line to a JavaBean.
- JavaRDD<String> people = sc.textFile("hdfs://master:9000/testFile/people.txt");
- // Convert records of the RDD (people) to Rows.
- JavaRDD<Row> rowRDD = people.map(new Function<String, Row>() {
- public Row call(String record) throws Exception {
- String[] fields = record.split(",");
- return RowFactory.create(fields[0], fields[1].trim());
- }
- });
- // The schema is encoded in a string
- String schemaString = "name age";
- // Generate the schema based on the string of schema
- List<StructField> fields = new ArrayList<StructField>();
- for (String fieldName : schemaString.split(" ")) {
- // true means the column is nullable
- fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
- }
- StructType schema = DataTypes.createStructType(fields);
- // Apply the schema to the RDD.
- DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);
- // Register the DataFrame as a table.
- peopleDataFrame.registerTempTable("people");
- // SQL can be run over RDDs that have been registered as tables.
- DataFrame results = sqlContext.sql("SELECT name FROM people");
- // The results of SQL queries are DataFrames and support all the normal
- // RDD operations.
- // The columns of a row in the result can be accessed by ordinal.
- List<String> names = results.javaRDD().map(new Function<Row, String>() {
- public String call(Row row) {
- return "Name: " + row.getString(0);
- }
- }).collect();
- results.persist(StorageLevel.MEMORY_ONLY());
- System.out.println(names);
- }
- }
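The schema above declares both columns as StringType, so age ends up stored as a string. When the types are known, a mixed-type schema can be built the same way; the following is only a sketch of that variation (the Row values must then carry matching Java types):
- // Sketch: build the schema with explicit per-column types instead of all strings
- List<StructField> typedFields = new ArrayList<StructField>();
- typedFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
- typedFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
- StructType typedSchema = DataTypes.createStructType(typedFields);
- // rows would then be created as RowFactory.create(fields[0], Integer.valueOf(fields[1].trim()))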
Data Sources
Spark SQL supports operating on a variety of data sources through the DataFrame interface. A DataFrame can be operated on as a normal RDD and can also be registered as a temporary table. Registering a DataFrame as a temporary table allows you to run SQL queries over its data. This section describes the general methods for loading and saving data using the Spark data sources, then covers the specific options available for the built-in data sources.
Generic Load/Save Functions
In the simplest form, the default data source (parquet, unless otherwise configured by spark.sql.sources.default) is used for all operations (a sketch of changing this setting follows the example below).
- package com.tg.spark.sql;
- import java.util.List;
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.Function;
- import org.apache.spark.sql.DataFrame;
- import org.apache.spark.sql.Row;
- import org.apache.spark.sql.SQLContext;
- import org.apache.spark.sql.SaveMode;
- import org.apache.spark.storage.StorageLevel;
- /**
- * Loads the default data source format and saves the result
- * First read style: xxxFile(path)
- * @author Administrator
- *
- */
- public class DataSource {
- public static void main(String[] args) {
- SparkConf conf = new SparkConf();
- conf.set("spark.testing.memory", "2147480000"); // workaround: the JVM cannot obtain enough memory otherwise
- JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
- System.out.println(sc);
- // sc is an existing JavaSparkContext.
- SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
- DataFrame df = sqlContext.read().load("hdfs://master:9000/testFile/users.parquet");
- df.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
- // specify the save mode
- //df.select("name", "favorite_color").write().mode(SaveMode.Overwrite).save("namesAndFavColors.parquet");
- // first read style
- DataFrame parquetFile = sqlContext.parquetFile("namesAndFavColors.parquet");
- parquetFile.registerTempTable("people");
- // SQL can be run over RDDs that have been registered as tables.
- DataFrame teenagers = sqlContext.sql("SELECT name FROM people ");
- // The results of SQL queries are DataFrames and support all the normal RDD operations.
- // The columns of a row in the result can be accessed by ordinal.
- List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
- public String call(Row row) {
- return "Name: " + row.getString(0);
- }
- }).collect();
- teenagers.persist(StorageLevel.MEMORY_ONLY());
- System.out.println(teenagerNames);
- }
- }
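The default format mentioned above can be switched away from parquet by setting spark.sql.sources.default; a minimal sketch (the json value is only an illustration):
- // Sketch: change the default data source used by load()/save()
- sqlContext.setConf("spark.sql.sources.default", "json");
- DataFrame jsonDf = sqlContext.read().load("hdfs://master:9000/testFile/people.json");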
Manually Specifying Options
You can also manually specify the data source to use, together with any extra options you want to pass to it. Data sources are specified by their fully qualified name (e.g. org.apache.spark.sql.parquet), but for the built-in sources you can also use their short names (json, parquet, jdbc). DataFrames of any type can be converted into other types using this syntax:
- package com.tg.spark.sql;
- import java.util.List;
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.Function;
- import org.apache.spark.sql.DataFrame;
- import org.apache.spark.sql.Row;
- import org.apache.spark.sql.SQLContext;
- import org.apache.spark.storage.StorageLevel;
- /**
- * Loads a specified data source format and saves the result
- * Second read style: sqlContext.read().XXX(path)
- * @author Administrator
- *
- */
- public class DataSource2 {
- public static void main(String[] args) {
- SparkConf conf = new SparkConf();
- conf.set("spark.testing.memory", "2147480000"); // workaround: the JVM cannot obtain enough memory otherwise
- JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
- System.out.println(sc);
- // sc is an existing JavaSparkContext.
- SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
- DataFrame df = sqlContext.read().format("json").load("hdfs://master:9000/testFile/people.json");
- df.select("name", "age").write().format("parquet").save("people.parquet");
- DataFrame parquetFile = sqlContext.read().parquet("people.parquet");
- // Parquet files can also be registered as tables and then used in SQL statements.
- parquetFile.registerTempTable("parquetFile");
- DataFrame teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
- List<String> teenagerNames = teenagers.javaRDD().map(new Function<Row, String>() {
- public String call(Row row) {
- return "Name: " + row.getString(0);
- }
- }).collect();
- teenagers.persist(StorageLevel.MEMORY_ONLY());
- System.out.println(teenagerNames);
- }
- }
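The jdbc short name mentioned above works the same way through read().format("jdbc"). The following is only a sketch: the URL, table name, credentials and driver class are illustrative placeholders, and the JDBC driver jar is assumed to be on the classpath:
- // Sketch: reading a table from a relational database via the jdbc data source
- // (connection details below are placeholders, not from the original post)
- DataFrame jdbcDF = sqlContext.read().format("jdbc")
-         .option("url", "jdbc:mysql://localhost:3306/test")
-         .option("dbtable", "people")
-         .option("user", "root")
-         .option("password", "123456")
-         .option("driver", "com.mysql.jdbc.Driver")
-         .load();
- jdbcDF.show();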
Save Modes
Save operations can optionally take a SaveMode, which specifies how to handle existing data if it is already present. It is important to realize that these save modes do not use any locking and are not atomic. It is therefore not safe to have multiple writers trying to write to the same location. Additionally, when performing an Overwrite, the existing data is deleted before the new data is written out.
df.select("name", "favorite_color").write().mode(SaveMode.Overwrite).save("namesAndFavColors.parquet");
| Scala/Java | Python | Meaning |
| --- | --- | --- |
| SaveMode.ErrorIfExists (default) | "error" (default) | When saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown. |
| SaveMode.Append | "append" | When saving a DataFrame to a data source, if data/table already exists, the contents of the DataFrame are expected to be appended to the existing data. |
| SaveMode.Overwrite | "overwrite" | Overwrite mode means that when saving a DataFrame to a data source, if data/table already exists, the existing data is expected to be overwritten by the contents of the DataFrame. |
| SaveMode.Ignore | "ignore" | Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected not to save the contents of the DataFrame and not to change the existing data. This is similar to a `CREATE TABLE IF NOT EXISTS` in SQL. |
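Besides the Overwrite example shown above, the mode can be set with either the enum or its string form before calling save; a brief sketch reusing the df from the Generic Load/Save example:
- // Sketch: other save modes, using the enum and the string form
- df.select("name", "favorite_color").write().mode(SaveMode.Append).save("namesAndFavColors.parquet");
- df.select("name", "favorite_color").write().mode("ignore").save("namesAndFavColors.parquet");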
Parquet Files
Parquet is a columnar storage format supported by many other data processing systems. Spark SQL supports both reading and writing Parquet files, automatically preserving the schema of the original data.
The full code for this was already shown once above (the DataSource class under Generic Load/Save Functions).
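A condensed sketch of just the Parquet round trip, reusing the sqlContext and df from that example:
- // Sketch: write a DataFrame as Parquet, read it back, and query it with SQL
- df.select("name", "favorite_color").write().parquet("namesAndFavColors.parquet");
- DataFrame parquetFile = sqlContext.read().parquet("namesAndFavColors.parquet");
- parquetFile.registerTempTable("people");
- sqlContext.sql("SELECT name FROM people").show();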
JSON Datasets
Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame. This conversion can be done with either of the following two methods on a SQLContext (both sketched briefly after the list):
• jsonFile - loads data from a directory of JSON files in which each line of the files is a JSON object.
• jsonRDD - loads data from an existing RDD, where each element of the RDD is a string containing a JSON object.
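A minimal sketch of those two calls, assuming the same sqlContext as above (anotherPeopleRDD is the JavaRDD<String> built in the full example below); in later 1.x releases both are superseded by sqlContext.read().json(...):
- // Sketch: the two older-style SQLContext methods named above
- DataFrame fromFile = sqlContext.jsonFile("hdfs://master:9000/testFile/people.json");
- DataFrame fromRDD = sqlContext.jsonRDD(anotherPeopleRDD);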
Most of the code below has already appeared earlier; the full example uses the newer read().json() API:
- package com.tg.spark.sql;
- import java.util.Arrays;
- import java.util.List;
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.sql.DataFrame;
- import org.apache.spark.sql.SQLContext;
- public class DataSource3 {
- public static void main(String[] args) {
- SparkConf conf = new SparkConf();
- conf.set("spark.testing.memory", "2147480000"); // workaround: the JVM cannot obtain enough memory otherwise
- JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
- System.out.println(sc);
- // sc is an existing JavaSparkContext.
- SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
- // A JSON dataset is pointed to by path.
- // The path can be either a single text file or a directory storing text files.
- DataFrame people = sqlContext.read().json("hdfs://master:9000/testFile/people.json");
- //DataFrame people = sqlContext.jsonFile("hdfs://master:9000/testFile/people.json");
- // The inferred schema can be visualized using the printSchema() method.
- people.printSchema();
- // root
- // |-- age: long (nullable = true)
- // |-- name: string (nullable = true)
- // Register this DataFrame as a table.
- people.registerTempTable("people");
- // SQL statements can be run by using the sql methods provided by sqlContext.
- DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
- // Alternatively, a DataFrame can be created for a JSON dataset represented by
- // an RDD[String] storing one JSON object per string.
- List<String> jsonData = Arrays.asList(
- "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
- JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData);
- DataFrame anotherPeople = sqlContext.read().json(anotherPeopleRDD);
- anotherPeople.show();
- }
- }
Datasets
Datasets are a new interface added in version 1.6. They aim to provide the benefits of RDDs (strong typing, the ability to use powerful lambda functions); a Dataset can be constructed from JVM objects and then manipulated with functional transformations (map, flatMap, filter, etc.).
The unified Dataset API can be used both in Scala and Java. Python does not yet have support for the Dataset API, but due to its dynamic nature many of the benefits are already available (i.e. you can access the field of a row by name naturally row.columnName). Full python support will be added in a future release.
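On the Java side, the unified API looks roughly as follows; this is only a minimal sketch based on the 1.6-era Encoders/createDataset API, not code from the original post, and it assumes the same sqlContext as in the earlier examples:
- // Sketch (Spark 1.6 Java API): create a Dataset from a list of integers and map over it
- // needs: import java.util.Arrays;
- //        import org.apache.spark.api.java.function.MapFunction;
- //        import org.apache.spark.sql.Dataset;
- //        import org.apache.spark.sql.Encoder;
- //        import org.apache.spark.sql.Encoders;
- Encoder<Integer> intEncoder = Encoders.INT();
- Dataset<Integer> primitiveDS = sqlContext.createDataset(Arrays.asList(1, 2, 3), intEncoder);
- Dataset<Integer> transformedDS = primitiveDS.map(new MapFunction<Integer, Integer>() {
-     public Integer call(Integer value) throws Exception {
-         return value + 1;
-     }
- }, intEncoder);
- System.out.println(transformedDS.collectAsList()); // [2, 3, 4]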
How to use Spark to work with Hive and other databases will be studied later.
Writing this up takes effort; if you repost it, please cite the source: http://blog.csdn.net/tanggao1314/article/details/51594942
For more details, see the official Spark SQL programming guide.