Spark SQL
-
簡介
- SparkSQL 的前身是 Shark, SparkSQL 產生的根本原因是其完全脫離了 Hive 的限制。(Shark 底層依賴於 Hive 的解析器, 查詢優化器)
- SparkSQL 支持查詢原生的 RDD。
- 能夠在 scala/java 中寫 SQL 語句。 支持簡單的 SQL 語法檢查, 能夠在 Scala 中 寫Hive 語句訪問 Hive 數據, 並將結果取回作為RDD使用
- SparkSQL 的前身是 Shark, SparkSQL 產生的根本原因是其完全脫離了 Hive 的限制。(Shark 底層依賴於 Hive 的解析器, 查詢優化器)
-
Spark on Hive 和 Hive on Spark
- Spark on Hive: Hive 只作為儲存角色, Spark負責 sql 解析優化, 執行。
- Hive on Spark: Hive 即作為存儲又負責 sql 的解析優化, Spark 負責執行。
-
Dataset 與 DataFrame
- Dataset 是一個分布式數據容器,與 RDD 類似, 然而 DataSet 更像 傳統數據庫的二維表格, 除了數據以外, 還掌握的結構信息, 即schema。
- 同時, 與 Hive 類似, Dataset 也支持嵌套數據類型 (struct、array 和 map)。
- 從 API 易用性角度上看, DataSet API 提供的是一套高層的關系操作, 比函數式的 RDD API 更加友好, 門檻更低。
- Dataset 的底層封裝的是RDD, 當 RDD 的泛型是 Row 類型的時候, 我們可以可以稱它為 DataFrame。即 Dataset
= DataFrame
-
SparkSQL 的數據源
-
SparkSQL的數據源可以是JSON類型的字符串, JDBC, Parquent, Hive, HDFS 等。
-
-
SparkSQL 底層架構
-
首先拿到 sql 后解析一批未被解決的邏輯計划, 再經過分析得到分析后的邏輯計划, 再經過一批優化規則轉換成一批最佳優化的邏輯計划, 再經過一批優化規則轉換成一批最佳優化的邏輯計划, 再經過 SparkPlanner 測策略轉化成一批物理計划, 隨后經過消費模型轉換成一個個的Spark任務執行。
-
-
謂詞下推 (predicate Pushdown)
- 從關系型數據庫借鑒而來, 關系型數據中謂詞下推到外部數據庫用以減少數據傳輸
- 基本思想: 盡可能早的處理表達式
- 屬於邏輯優化, 優化器將謂詞過濾下推到數據源, 使物理執行跳過無關數據
- 參數打開設置: hive.optimize.ppd=true
創建 Dataset 的幾種方式
-
讀取 json 格式的文件創建 DataSet
-
注意事項:
-
json 文件中的 json 數據不能嵌套 json 格式數據。
-
Dataset 是一個一個 Row 類型的 RDD, ds.rdd()/ds.javaRDD()。
-
可以兩種方式讀取json格式的文件。
-
df.show() 默認顯示前 20 行數據。
-
Dataset 原生 API 可以操作 Dataset(不方便)。
-
注冊成臨時表時, 表中的列默認按 ascii 順序顯示列。
-
案例:
-
json:
{"name":"burning","age": 18} {"name":"atme"} {"name":"longdd","age":18} {"name":"yyf","age":28} {"name":"zhou","age":20} {"name":"blaze"} {"name":"ocean","age":18} {"name":"xiaoliu","age":28} {"name":"zhangsan","age":28} {"name":"lisi"} {"name":"wangwu","age":18}
-
Java代碼:
package com.ronnie.java.json; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; public class ReadJson { public static void main(String[] args) { SparkSession sparkSession = SparkSession.builder().appName("jsonFile").master("local").getOrCreate(); /** * Dataset的底層是一個個的 RDD, RDD的泛型是Row類型。 * 以下兩種方式都可以讀取json格式的文件 */ Dataset<Row> ds = sparkSession.read().format("json").load("./resources/json"); // Dataset<Row> ds = sparkSession.read().json("data/json"); ds.show(); /** * +----+--------+ * | age| name| * +----+--------+ * | 18| burning| * |null| atme| * | 18| longdd| * | 28| yyf| * | 20| zhou| * |null| blaze| * | 18| ocean| * | 28| xiaoliu| * | 28|zhangsan| * |null| lisi| * | 18| wangwu| * +----+--------+ */ /** * Dataset 轉換為 RDD */ JavaRDD<Row> javaRDD = ds.javaRDD(); /** * 顯示 Dataset 中的內容, 默認顯示前20行. 如果顯示多行要指定多少行show(行數) * 注意: 當有多個列時, 顯示的列先后順序是按列的ascii碼順序先后顯示 */ /** * 樹形的形式顯示schema信息 */ ds.printSchema(); /** * root * |-- age: long (nullable = true) * |-- name: string (nullable = true) */ /** * Dataset自帶的API 操作Dataset */ // select name from table ds.select("name").show(); /** * +--------+ * | name| * +--------+ * | burning| * | atme| * | longdd| * | yyf| * | zhou| * | blaze| * | ocean| * | xiaoliu| * |zhangsan| * | lisi| * | wangwu| * +--------+ */ // select name age+10 as addage from table ds.select(ds.col("name"),ds.col("age").plus(10).alias("addage")).show(); /** * +--------+------+ * | name|addage| * +--------+------+ * | burning| 28| * | atme| null| * | longdd| 28| * | yyf| 38| * | zhou| 30| * | blaze| null| * | ocean| 28| * | xiaoliu| 38| * |zhangsan| 38| * | lisi| null| * | wangwu| 28| * +--------+------+ */ // select name, age from table where age > 19 ds.select(ds.col("name"),ds.col("age")).where(ds.col("age").gt(19)).show(); /** * +--------+---+ * | name|age| * +--------+---+ * | yyf| 28| * | zhou| 20| * | xiaoliu| 28| * |zhangsan| 28| * +--------+---+ */ // select count(*) from table group by age ds.groupBy(ds.col("age")).count().show(); /** * +----+-----+ * | age|count| * +----+-----+ * |null| 3| * | 28| 3| * | 18| 4| * | 20| 1| * +----+-----+ */ /** * 將Dataset 注冊成臨時的一張表, 這張表臨時注冊到內存中, 是邏輯上的表, 不會霧化到磁盤 */ ds.createOrReplaceTempView("jtable"); Dataset<Row> result = sparkSession.sql("select age, count(*) as gege from jtable group by age"); result.show(); /** * +----+----+ * | age|gege| * +----+----+ * |null| 3| * | 28| 3| * | 18| 4| * | 20| 1| * +----+----+ */ sparkSession.stop(); } }
-
-
-
-
通過 json 格式的 RDD 創建 DataSet
package com.ronnie.java.json; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import java.util.Arrays; public class CreateDatasetFromJsonRDD { public static void main(String[] args) { SparkSession sparkSession = SparkSession.builder().appName("jsonrdd").master("local").getOrCreate(); SparkContext sc = sparkSession.sparkContext(); JavaSparkContext jsc = new JavaSparkContext(sc); JavaRDD<String> nameRDD = jsc.parallelize(Arrays.asList( "{'name':'hao','age':\"24\"}", "{\"name\":\"mu\",\"age\":\"26\"}", "{\"name\":\"xiao8\",\"age\":\"27\"}" )); JavaRDD<String> scoreRDD = jsc.parallelize(Arrays.asList( "{\"name\":\"zhangsan\",\"score\":\"100\"}", "{\"name\":\"mu\",\"score\":\"200\"}", "{\"name\":\"wangwu\",\"score\":\"300\"}" )); Dataset<Row> nameds = sparkSession.read().json(nameRDD); Dataset<Row> scoreds = sparkSession.read().json(scoreRDD); nameds.createOrReplaceTempView("nameTable"); scoreds.createOrReplaceTempView("scoreTable"); Dataset<Row> result = sparkSession.sql("select nameTable.name, nameTable.age, scoreTable.score " + "from nameTable join scoreTable " + "on nameTable.name = scoreTable.name"); result.show(); /** * +----+---+-----+ * |name|age|score| * +----+---+-----+ * | mu| 26| 200| * +----+---+-----+ */ sparkSession.stop(); } }
-
非 json 格式的 RDD 創建 DataSet
-
通過反射的方式將非 json 格式的RDD轉換成 Dataset
- 自定義類要可序列化
- 自定義類的訪問級別是 Public
- RDD 轉成 Dataset 后會根據映射將字段 ASCII 碼排序
- 將 Dataset 轉換成 RDD時獲取字段的兩種方式:
- ds.getInt(0) 下標獲取(不推薦使用)
- ds.getAs("列名") 獲取(推薦使用)
- person.txt
1,longdd,27 2,yyf,26 3,zhou,27 4,burning,30 5,atme,21
- Person.java
package com.ronnie.java.entity; import java.io.Serializable; public class Person implements Serializable { private static final long serialVersionUID = 1L; private String id ; private String name; private Integer age; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Integer getAge() { return age; } public void setAge(Integer age) { this.age = age; } @Override public String toString() { return "Person{" + "id='" + id + '\'' + ", name='" + name + '\'' + ", age=" + age + '}'; } }
-
CreateDatasetRDDWithReflect
package com.ronnie.java.json; import com.ronnie.java.entity.Person; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; public class CreateDatasetFromRDDWithReflect { public static void main(String[] args) { SparkSession sparkSession = SparkSession.builder().appName("reflect").master("local").getOrCreate(); SparkContext sc = sparkSession.sparkContext(); JavaSparkContext jsc = new JavaSparkContext(sc); JavaRDD<String> lineRDD = jsc.textFile("./resources/person.txt"); JavaRDD<Person> personRDD = lineRDD.map(new Function<String, Person>() { private static final long serialVersionUID = 1L; @Override public Person call(String line) throws Exception { Person p = new Person(); p.setId(line.split(",")[0]); p.setName(line.split(",")[1]); p.setAge(Integer.valueOf(line.split(",")[2])); return p; } }); /** * 傳入進去Person.class的時候,sqlContext是通過反射的方式創建DataFrame * 在底層通過反射的方式獲得Person的所有field,結合RDD本身,就生成了DataFrame */ Dataset<Row> dataFrame = sparkSession.createDataFrame(personRDD, Person.class); dataFrame.show(); /** * +---+---+-------+ * |age| id| name| * +---+---+-------+ * | 27| 1| longdd| * | 26| 2| yyf| * | 27| 3| zhou| * | 30| 4|burning| * | 21| 5| atme| * +---+---+-------+ */ dataFrame.printSchema(); /** * root * |-- age: integer (nullable = true) * |-- id: string (nullable = true) * |-- name: string (nullable = true) */ dataFrame.registerTempTable("person"); Dataset<Row> sql = sparkSession.sql("select name, id, age from person where id = 2"); sql.show(); /** * +----+---+---+ * |name| id|age| * +----+---+---+ * | yyf| 2| 26| * +----+---+---+ */ /** * 將Dataset轉成JavaRDD * 注意: * 1.可以使用row.getInt(0),row.getString(1)...通過下標獲取返回Row類型的數據,但是要注意列順序問題---不常用 * 2.可以使用row.getAs("列名")來獲取對應的列值。 */ JavaRDD<Row> javaRDD = dataFrame.javaRDD(); JavaRDD<Person> map = javaRDD.map(new Function<Row, Person>() { private static final long serialVersionUID = 1L; @Override public Person call(Row row) throws Exception { Person p = new Person(); p.setId(row.getAs("id")); p.setName(row.getAs("name")); p.setAge(row.getAs("age")); return p; } }); map.foreach(x -> System.out.println(x)); /** * Person{id='1', name='longdd', age=27} * Person{id='2', name='yyf', age=26} * Person{id='3', name='zhou', age=27} * Person{id='4', name='burning', age=30} * Person{id='5', name='atme', age=21} */ sc.stop(); } }
-
動態創建 Schema 將非 json 格式的 RDD 轉換成 Dataset
package com.ronnie.java.json; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import java.util.Arrays; import java.util.List; public class CreateDatasetFromRDDWithStruct { public static void main(String[] args) { SparkSession sparkSession = SparkSession.builder().appName("schema").master("local").getOrCreate(); SparkContext sc = sparkSession.sparkContext(); JavaSparkContext jsc = new JavaSparkContext(sc); JavaRDD<String> lineRDD = jsc.textFile("./resources/person.txt"); /** * 轉換成Row類型的RDD */ final JavaRDD<Row> rowRDD = lineRDD.map(new Function<String, Row>() { private static final long serialVersionUID = 1L; @Override public Row call(String line) throws Exception { return RowFactory.create( line.split(",")[0], line.split(",")[1], Integer.valueOf(line.split(",")[2]) ); } }); /** * 動態構建DataFrame中的元數據,一般來說這里的字段可以來源自字符串,也可以來源於外部數據庫 */ List<StructField> asList = Arrays.asList( DataTypes.createStructField("id", DataTypes.StringType, true), DataTypes.createStructField("name", DataTypes.StringType, true), DataTypes.createStructField("age", DataTypes.IntegerType, true) ); StructType schema = DataTypes.createStructType(asList); Dataset<Row> df = sparkSession.createDataFrame(rowRDD, schema); df.printSchema(); /** * root * |-- id: string (nullable = true) * |-- name: string (nullable = true) * |-- age: integer (nullable = true) */ df.show(); /** * +---+-------+---+ * | id| name|age| * +---+-------+---+ * | 1| longdd| 27| * | 2| yyf| 26| * | 3| zhou| 27| * | 4|burning| 30| * | 5| atme| 21| * +---+-------+---+ */ sc.stop(); } }
-
-
讀取 parquet 文件創建 DataSet
- SaveMode: 指文件保存時的模式
- Overwrite: 覆蓋
- Append: 追加
- getOrCreate: 獲取或創建
package com.ronnie.java.parquet; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; public class CreateDataFrameFromParquet { public static void main(String[] args) { SparkSession sparkSession = SparkSession.builder().appName("parquet").master("local").getOrCreate(); Dataset<Row> df = sparkSession.read().json("./resources/json"); df.show(); /** * +----+--------+ * | age| name| * +----+--------+ * | 18| burning| * |null| atme| * | 18| longdd| * | 28| yyf| * | 20| zhou| * |null| blaze| * | 18| ocean| * | 28| xiaoliu| * | 28|zhangsan| * |null| lisi| * | 18| wangwu| * +----+--------+ */ /** * 將Dataset保存成parquet文件, * SaveMode指定存儲文件時的保存模式: * Overwrite:覆蓋 * Append:追加 * ErrorIfExists:如果存在就報錯 * Ignore:如果存在就忽略 * 保存成parquet文件有以下兩種方式: */ // df.write().mode(SaveMode.Overwrite).format("parquet").save("./resources/parquet"); df.write().mode(SaveMode.Overwrite).parquet("./resources/parquet"); /** * { * "type" : "struct", * "fields" : [ { * "name" : "age", * "type" : "long", * "nullable" : true, * "metadata" : { } * }, { * "name" : "name", * "type" : "string", * "nullable" : true, * "metadata" : { } * } ] * } * and corresponding Parquet message type: * message spark_schema { * optional int64 age; * optional binary name (UTF8); * } */ /** * 加載parquet文件成Dataset * 加載parquet文件有以下兩種方式: */ Dataset<Row> load = sparkSession.read().format("parquet").load("./resources/parquet"); load.show(); /** * +----+--------+ * | age| name| * +----+--------+ * | 18| burning| * |null| atme| * | 18| longdd| * | 28| yyf| * | 20| zhou| * |null| blaze| * | 18| ocean| * | 28| xiaoliu| * | 28|zhangsan| * |null| lisi| * | 18| wangwu| * +----+--------+ */ sparkSession.stop(); } }
-
讀取 JDBC 中的數據創建 DataSet
package com.ronnie.java.jdbc; import org.apache.spark.sql.*; import java.util.HashMap; import java.util.Map; import java.util.Properties; public class CreateDatasetFromMysql { public static void main(String[] args) { SparkSession sparkSession = SparkSession.builder().appName("jdbc").master("local").getOrCreate(); /** * 第一種方式讀取MySql數據庫表,加載為Dataset */ Map<String, String> options = new HashMap<>(); options.put("url", "jdbc:mysql://localhost:3306/spark"); options.put("driver", "com.mysql.jdbc.Driver"); options.put("user", "root"); options.put("password", "123456"); options.put("dbtable", "person"); Dataset<Row> person = sparkSession.read().format("jdbc").options(options).load(); person.show(); /** * +---+------+---+ * | id| name|age| * +---+------+---+ * | 1| slark| 70| * | 2| pom| 40| * | 3|huskar| 60| * | 4| axe| 80| * +---+------+---+ */ person.createOrReplaceTempView("person"); /** * 第二種方式讀取MySql數據表加載為Dataset */ DataFrameReader reader = sparkSession.read().format("jdbc"); reader.option("url", "jdbc:mysql://localhost:3306/spark"); reader.option("driver", "com.mysql.jdbc.Driver"); reader.option("user", "root"); reader.option("password", "123456"); reader.option("dbtable", "score"); Dataset<Row> score = reader.load(); score.show(); /** * +---+------+-----+ * | id| name|score| * +---+------+-----+ * | 1|dragon| 80| * | 2| axe| 99| * | 3| slark| 81| * +---+------+-----+ */ score.createOrReplaceTempView("score"); Dataset<Row> result = sparkSession.sql("select person.id,person.name,person.age,score.score " + "from person,score " + "where person.name = score.name and score.score> 82"); result.show(); /** * +---+----+---+-----+ * | id|name|age|score| * +---+----+---+-----+ * | 4| axe| 80| 99| * +---+----+---+-----+ */ result.registerTempTable("result"); /** * 將Dataset結果保存到Mysql中 */ // Properties properties = new Properties(); properties.setProperty("user", "root"); properties.setProperty("password", "123456"); /** * SaveMode: * Overwrite:覆蓋 * Append:追加 * ErrorIfExists:如果存在就報錯 * Ignore:如果存在就忽略 * */ result.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://127.0.0.1:3306/spark", "result", properties); // System.out.println("----Finish----"); sparkSession.stop(); } }
-
Hive 中的數據創建 DataSet
package com.ronnie.java.hive; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; public class CreateDatasetFromHive { public static void main(String[] args) { SparkSession sparkSession = SparkSession .builder() .appName("hive") //開啟hive的支持,接下來就可以操作hive表了 // 前提需要是需要開啟hive metastore 服務 .enableHiveSupport() .getOrCreate(); sparkSession.sql("USE spark"); sparkSession.sql("DROP TABLE IF EXISTS student_infos"); //在hive中創建student_infos表 sparkSession.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING,age INT) row format delimited fields terminated by '\t' "); sparkSession.sql("load data local inpath '/root/student_infos' into table student_infos"); //注意:此種方式,程序需要能讀取到數據(如/root/student_infos),同時也要能讀取到 metastore服務的配置信息。 sparkSession.sql("DROP TABLE IF EXISTS student_scores"); sparkSession.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING, score INT) row format delimited fields terminated by '\t'"); sparkSession.sql("LOAD DATA " + "LOCAL INPATH '/root/student_scores'" + "INTO TABLE student_scores"); // Dataset<Row> df = hiveContext.table("student_infos");//讀取Hive表加載Dataset方式 /** * 查詢表生成Dataset */ Dataset<Row> goodStudentsDF = sparkSession.sql("SELECT si.name, si.age, ss.score " + "FROM student_infos si " + "JOIN student_scores ss " + "ON si.name=ss.name " + "WHERE ss.score>=80"); goodStudentsDF.registerTempTable("goodstudent"); Dataset<Row> result = sparkSession.sql("select * from goodstudent"); result.show(); /** * 將結果保存到hive表 good_student_infos */ sparkSession.sql("DROP TABLE IF EXISTS good_student_infos"); goodStudentsDF.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos"); sparkSession.stop(); } }
序列化問題
- Java 中以下幾種情況下不被序列化的問題:
- 反序列化時 serializable 版本號不一致導致不能反序列化
- 子類中實現了serializable 接口, 但父類中沒有實現, 父類中的變量不能被序列化, 序列化后父類中的變量會得到 null。
- 被關鍵字 transient 修飾的變量不能被序列化。
- 靜態變量不能被序列化, 屬於類, 不屬於方法和對象, 所以不能被序列化。
儲存 DataSet
- 將 DataSet 存儲為 parquet 文件。
- 將 DataSet 存儲到 JDBC 數據庫。
- 將DataSet 存儲到 Hive 表
自定義函數 UDP 和 UDAF
-
UDF(User Defined Function): 用戶自定義函數
package com.ronnie.java.udf_udaf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.api.java.UDF2; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import java.util.ArrayList; import java.util.Arrays; public class UDF { public static void main(String[] args) { SparkSession sparkSession = SparkSession.builder().appName("udf").master("local").getOrCreate(); JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext()); JavaRDD<String> parallelize = jsc.parallelize(Arrays.asList("atme", "maybe", "chalice")); JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() { private static final long serialVersionUID = 1L; @Override public Row call(String s) throws Exception { return RowFactory.create(s); } }); /** * 動態創建Schema方式加載DF */ ArrayList<StructField> fields = new ArrayList<>(); fields.add(DataTypes.createStructField("name", DataTypes.StringType,true)); StructType schema = DataTypes.createStructType(fields); Dataset<Row> df = sparkSession.createDataFrame(rowRDD, schema); df.registerTempTable("user"); /** * 根據UDF函數參數的個數來決定是實現哪一個UDF UDF1,UDF2。。。。UDF1xxx */ sparkSession.udf().register("StrLen", new UDF2<String, Integer, Integer>(){ private static final long serialVersionUID = 1L; @Override public Integer call(String t1, Integer t2) throws Exception { return t1.length() + t2; } }, DataTypes.IntegerType); sparkSession.sql("select name ,StrLen(name,100) as length from user").show(); /** * +-------+------+ * | name|length| * +-------+------+ * | atme| 104| * | maybe| 105| * |chalice| 107| * +-------+------+ */ sparkSession.stop(); } }
-
UDAF(User Defined Aggregate Function): 用戶自定義聚合函數
-
實現UDAF函數如果要自定義類要實現UserDefinedAggregateFunction
package com.ronnie.java.udf_udaf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.expressions.MutableAggregationBuffer; import org.apache.spark.sql.expressions.UserDefinedAggregateFunction; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import java.util.ArrayList; import java.util.Arrays; import java.util.List; public class UDAF { public static void main(String[] args) { SparkSession sparkSession = SparkSession.builder().appName("udaf").master("local").getOrCreate(); JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext()); JavaRDD<String> parallelize = sc.parallelize( Arrays.asList("zeus", "lina", "wind ranger", "zeus", "zeus", "lina","zeus", "lina", "wind ranger", "zeus", "zeus", "lina"),2); JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() { @Override public Row call(String s) throws Exception { return RowFactory.create(s); } }); List<StructField> fields = new ArrayList<>(); fields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); StructType schema = DataTypes.createStructType(fields); Dataset<Row> df = sparkSession.createDataFrame(rowRDD, schema); df.registerTempTable("user"); /** * 注冊一個UDAF函數,實現統計相同值得個數 * 注意:這里可以自定義一個類繼承UserDefinedAggregateFunction類也是可以的 * 數據: * zeus * zeus * lina * lina * * select count(*) from user group by name */ sparkSession.udf().register("StringCount", new UserDefinedAggregateFunction() { /** * 指定輸入字段的字段及類型 * @return */ @Override public StructType inputSchema() { return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, true))); } /** * 指定UDAF函數計算后返回的結果類型 * @return */ @Override public DataType dataType() { return DataTypes.IntegerType; } /** * 確保一致性 一般用true,用以標記針對給定的一組輸入,UDAF是否總是生成相同的結果。 * @return */ @Override public boolean deterministic() { return true; } /** * 更新 可以認為一個一個地將組內的字段值傳遞進來 實現拼接的邏輯 * buffer.getInt(0)獲取的是上一次聚合后的值 * 相當於map端的combiner,combiner就是對每一個map task的處理結果進行一次小聚合 * 大聚和發生在reduce端. * 這里即是:在進行聚合的時候,每當有新的值進來,對分組后的聚合如何進行計算 * @param buffer * @param input */ @Override public void update(MutableAggregationBuffer buffer, Row input) { buffer.update(0, buffer.getInt(0) + 1); System.out.println("update......buffer" + buffer.toString() + " | row" + input); } /** * 在進行聚合操作的時候所要處理的數據的結果的類型 * @return */ @Override public StructType bufferSchema() { return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("buffer", DataTypes.IntegerType, true))); } /** * 合並 update操作,可能是針對一個分組內的部分數據,在某個節點上發生的 但是可能一個分組內的數據,會分布在多個節點上處理 * 此時就要用merge操作,將各個節點上分布式拼接好的串,合並起來 * buffer1.getInt(0) : 大聚合的時候 上一次聚合后的值 * buffer2.getInt(0) : 這次計算傳入進來的update的結果 * 這里即是:最后在分布式節點完成后需要進行全局級別的Merge操作 */ @Override public void merge(MutableAggregationBuffer buffer1, Row buffer2) { /* 2 3 4 5 6 7 0 + 2 = 2 2 + 3 = 5 5 + 4 = 9 */ buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0)); System.out.println("merge.....buffer : " + buffer1.toString() + "| row" + buffer2.toString()); } /** * 初始化一個內部的自己定義的值,在Aggregate之前每組數據的初始化結果 * @param buffer */ @Override public void initialize(MutableAggregationBuffer buffer) { buffer.update(0, 0); System.out.println("init ......" + buffer.get(0)); } /** * 最后返回一個和DataType的類型要一致的類型,返回UDAF最后的計算結果 * @param row * @return */ @Override public Object evaluate(Row row) { return row.getInt(0); } }); sparkSession.sql("select name, StringCount(name) as number from user group by name").show(); /** * +-----------+------+ * | name|number| * +-----------+------+ * |wind ranger| 2| * | lina| 4| * | zeus| 6| * +-----------+------+ */ sc.stop(); } }
-
開窗函數
package com.ronnie.java.windowFun;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
* row_number()開窗函數:
* 主要是按照某個字段分組,然后取另一字段的前幾個的值,相當於 分組取topN
* row_number() over (partition by xxx order by xxx desc) xxx
*
*/
public class RowNumberWindowFun {
//-Xms800m -Xmx800m -XX:PermSize=64M -XX:MaxNewSize=256m -XX:MaxPermSize=128m
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName("window")
.master("local")
//開啟hive的支持,接下來就可以操作hive表了
// 前提需要是需要開啟hive metastore 服務
.enableHiveSupport()
.getOrCreate();
sparkSession.sql("use spark");
sparkSession.sql("drop table if exists sales");
sparkSession.sql("create table if not exists sales (riqi string,leibie string,jine Int) "
+ "row format delimited fields terminated by '\t'");
sparkSession.sql("load data local inpath './data/sales.txt' into table sales");
/**
* 開窗函數格式:
* 【 row_number() over (partition by XXX order by XXX) as rank】
* 注意:rank 從1開始
*/
/**
* 以類別分組,按每種類別金額降序排序,顯示 【日期,種類,金額】 結果,如:
*
* 1 A 100
* 2 B 200
* 3 A 300
* 4 B 400
* 5 A 500
* 6 B 600
*
* 排序后:
* 5 A 500 --rank 1
* 3 A 300 --rank 2
* 1 A 100 --rank 3
* 6 B 600 --rank 1
* 4 B 400 --rank 2
* 2 B 200 --rank 3
*
* 2018 A 400 1
* 2017 A 500 2
* 2016 A 550 3
*
*
* 2016 A 550 1
* 2017 A 500 2
* 2018 A 400 3
*
*/
Dataset<Row> result = sparkSession.sql("select riqi,leibie,jine,rank "
+ "from ("
+ "select riqi,leibie,jine,"
+ "row_number() over (partition by leibie order by jine desc) rank "
+ "from sales) t "
+ "where t.rank<=3");
result.show(100);
/**
* 將結果保存到hive表sales_result
*/
// result.write().mode(SaveMode.Overwrite).saveAsTable("sales_result");
sparkSession.stop();
}
}