Spark SQL 筆記


Spark SQL

  1. 簡介

    • SparkSQL 的前身是 Shark, SparkSQL 產生的根本原因是其完全脫離了 Hive 的限制。(Shark 底層依賴於 Hive 的解析器, 查詢優化器)
      • SparkSQL 支持查詢原生的 RDD。
      • 能夠在 scala/java 中寫 SQL 語句。 支持簡單的 SQL 語法檢查, 能夠在 Scala 中 寫Hive 語句訪問 Hive 數據, 並將結果取回作為RDD使用
  2. Spark on Hive 和 Hive on Spark

    • Spark on Hive: Hive 只作為儲存角色, Spark負責 sql 解析優化, 執行。
    • Hive on Spark: Hive 即作為存儲又負責 sql 的解析優化, Spark 負責執行。
  3. Dataset 與 DataFrame

    • Dataset 是一個分布式數據容器,與 RDD 類似, 然而 DataSet 更像 傳統數據庫的二維表格, 除了數據以外, 還掌握的結構信息, 即schema。
    • 同時, 與 Hive 類似, Dataset 也支持嵌套數據類型 (struct、array 和 map)。
    • 從 API 易用性角度上看, DataSet API 提供的是一套高層的關系操作, 比函數式的 RDD API 更加友好, 門檻更低。
    • Dataset 的底層封裝的是RDD, 當 RDD 的泛型是 Row 類型的時候, 我們可以可以稱它為 DataFrame。即 Dataset = DataFrame
  4. SparkSQL 的數據源

    • SparkSQL的數據源可以是JSON類型的字符串, JDBC, Parquent, Hive, HDFS 等。

      image-20191028082149132

  5. SparkSQL 底層架構

    • 首先拿到 sql 后解析一批未被解決的邏輯計划, 再經過分析得到分析后的邏輯計划, 再經過一批優化規則轉換成一批最佳優化的邏輯計划, 再經過一批優化規則轉換成一批最佳優化的邏輯計划, 再經過 SparkPlanner 測策略轉化成一批物理計划, 隨后經過消費模型轉換成一個個的Spark任務執行。

      image-20191028082638317

  6. 謂詞下推 (predicate Pushdown)

    image-20191028083121073

    • 從關系型數據庫借鑒而來, 關系型數據中謂詞下推到外部數據庫用以減少數據傳輸
    • 基本思想: 盡可能早的處理表達式
    • 屬於邏輯優化, 優化器將謂詞過濾下推到數據源, 使物理執行跳過無關數據
    • 參數打開設置: hive.optimize.ppd=true

創建 Dataset 的幾種方式

  1. 讀取 json 格式的文件創建 DataSet

    • 注意事項:

      • json 文件中的 json 數據不能嵌套 json 格式數據。

      • Dataset 是一個一個 Row 類型的 RDD, ds.rdd()/ds.javaRDD()。

      • 可以兩種方式讀取json格式的文件。

      • df.show() 默認顯示前 20 行數據。

      • Dataset 原生 API 可以操作 Dataset(不方便)。

      • 注冊成臨時表時, 表中的列默認按 ascii 順序顯示列。

      • 案例:

        • json:

          {"name":"burning","age": 18}
          {"name":"atme"}
          {"name":"longdd","age":18}
          {"name":"yyf","age":28}
          {"name":"zhou","age":20}
          {"name":"blaze"}
          {"name":"ocean","age":18}
          {"name":"xiaoliu","age":28}
          {"name":"zhangsan","age":28}
          {"name":"lisi"}
          {"name":"wangwu","age":18}
          
        • Java代碼:

        package com.ronnie.java.json;
        
        import org.apache.spark.api.java.JavaRDD;
        import org.apache.spark.sql.Dataset;
        import org.apache.spark.sql.Row;
        import org.apache.spark.sql.SparkSession;
        
        public class ReadJson {
        
            public static void main(String[] args) {
                SparkSession sparkSession = SparkSession.builder().appName("jsonFile").master("local").getOrCreate();
        
                /**
                 *  Dataset的底層是一個個的 RDD, RDD的泛型是Row類型。
                 *  以下兩種方式都可以讀取json格式的文件
                 */
        
                Dataset<Row> ds = sparkSession.read().format("json").load("./resources/json");
        //        Dataset<Row> ds = sparkSession.read().json("data/json");
                ds.show();
                /**
                 * +----+--------+
                 * | age|    name|
                 * +----+--------+
                 * |  18| burning|
                 * |null|    atme|
                 * |  18|  longdd|
                 * |  28|     yyf|
                 * |  20|    zhou|
                 * |null|   blaze|
                 * |  18|   ocean|
                 * |  28| xiaoliu|
                 * |  28|zhangsan|
                 * |null|    lisi|
                 * |  18|  wangwu|
                 * +----+--------+
                 */
        
                /**
                 *  Dataset 轉換為 RDD
                 */
        
                 JavaRDD<Row> javaRDD = ds.javaRDD();
        
                /**
                 *  顯示 Dataset 中的內容, 默認顯示前20行. 如果顯示多行要指定多少行show(行數)
                 *  注意: 當有多個列時, 顯示的列先后順序是按列的ascii碼順序先后顯示
                 */
        
                /**
                 *  樹形的形式顯示schema信息
                 */
                 ds.printSchema();
        
                /**
                 * root
                 *  |-- age: long (nullable = true)
                 *  |-- name: string (nullable = true)
                 */
        
                /**
                 * Dataset自帶的API 操作Dataset
                 */
        
                // select name from table
                ds.select("name").show();
                /**
                 *  +--------+
                 * |    name|
                 * +--------+
                 * | burning|
                 * |    atme|
                 * |  longdd|
                 * |     yyf|
                 * |    zhou|
                 * |   blaze|
                 * |   ocean|
                 * | xiaoliu|
                 * |zhangsan|
                 * |    lisi|
                 * |  wangwu|
                 * +--------+
                 */
        
                // select name age+10 as addage from table
                ds.select(ds.col("name"),ds.col("age").plus(10).alias("addage")).show();
                /**
                 * +--------+------+
                 * |    name|addage|
                 * +--------+------+
                 * | burning|    28|
                 * |    atme|  null|
                 * |  longdd|    28|
                 * |     yyf|    38|
                 * |    zhou|    30|
                 * |   blaze|  null|
                 * |   ocean|    28|
                 * | xiaoliu|    38|
                 * |zhangsan|    38|
                 * |    lisi|  null|
                 * |  wangwu|    28|
                 * +--------+------+
                 */
        
                // select name, age from table where age > 19
                ds.select(ds.col("name"),ds.col("age")).where(ds.col("age").gt(19)).show();
        
                /**
                 * +--------+---+
                 * |    name|age|
                 * +--------+---+
                 * |     yyf| 28|
                 * |    zhou| 20|
                 * | xiaoliu| 28|
                 * |zhangsan| 28|
                 * +--------+---+
                 */
        
                // select count(*) from table group by age
                ds.groupBy(ds.col("age")).count().show();
                /**
                 * +----+-----+
                 * | age|count|
                 * +----+-----+
                 * |null|    3|
                 * |  28|    3|
                 * |  18|    4|
                 * |  20|    1|
                 * +----+-----+
                 */
        
                /**
                 *  將Dataset 注冊成臨時的一張表, 這張表臨時注冊到內存中, 是邏輯上的表, 不會霧化到磁盤
                 */
        
                ds.createOrReplaceTempView("jtable");
        
                Dataset<Row> result = sparkSession.sql("select age, count(*) as gege from jtable group by age");
        
                result.show();
                
                /**
                 * +----+----+
                 * | age|gege|
                 * +----+----+
                 * |null|   3|
                 * |  28|   3|
                 * |  18|   4|
                 * |  20|   1|
                 * +----+----+
                 */
                sparkSession.stop();
            }
        }
        
  2. 通過 json 格式的 RDD 創建 DataSet

    package com.ronnie.java.json;
    
    import org.apache.spark.SparkContext;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    
    import java.util.Arrays;
    
    
    public class CreateDatasetFromJsonRDD {
    
        public static void main(String[] args) {
            SparkSession sparkSession = SparkSession.builder().appName("jsonrdd").master("local").getOrCreate();
            SparkContext sc = sparkSession.sparkContext();
            JavaSparkContext jsc = new JavaSparkContext(sc);
    
            JavaRDD<String> nameRDD = jsc.parallelize(Arrays.asList(
                    "{'name':'hao','age':\"24\"}",
                    "{\"name\":\"mu\",\"age\":\"26\"}",
                    "{\"name\":\"xiao8\",\"age\":\"27\"}"
            ));
    
            JavaRDD<String> scoreRDD = jsc.parallelize(Arrays.asList(
                    "{\"name\":\"zhangsan\",\"score\":\"100\"}",
                    "{\"name\":\"mu\",\"score\":\"200\"}",
                    "{\"name\":\"wangwu\",\"score\":\"300\"}"
            ));
    
            Dataset<Row> nameds = sparkSession.read().json(nameRDD);
            Dataset<Row> scoreds = sparkSession.read().json(scoreRDD);
            nameds.createOrReplaceTempView("nameTable");
            scoreds.createOrReplaceTempView("scoreTable");
    
            Dataset<Row> result = sparkSession.sql("select nameTable.name, nameTable.age, scoreTable.score "
                                                            + "from nameTable join scoreTable "
                                                            + "on nameTable.name = scoreTable.name");
            result.show();
            /**
             * +----+---+-----+
             * |name|age|score|
             * +----+---+-----+
             * |  mu| 26|  200|
             * +----+---+-----+
             */
            sparkSession.stop();
        }
    }
    
    
  3. 非 json 格式的 RDD 創建 DataSet

    • 通過反射的方式將非 json 格式的RDD轉換成 Dataset

      • 自定義類要可序列化
      • 自定義類的訪問級別是 Public
      • RDD 轉成 Dataset 后會根據映射將字段 ASCII 碼排序
      • 將 Dataset 轉換成 RDD時獲取字段的兩種方式:
        • ds.getInt(0) 下標獲取(不推薦使用)
        • ds.getAs("列名") 獲取(推薦使用)
      • person.txt
      
      1,longdd,27
      2,yyf,26
      3,zhou,27
      4,burning,30
      5,atme,21
      
      • Person.java
      package com.ronnie.java.entity;
      import	java.io.Serializable;
      
      public class Person implements Serializable {
          private static final long serialVersionUID = 1L;
      
          private String id ;
          private String name;
          private Integer age;
      
      
          public String getId() {
              return id;
          }
      
          public void setId(String id) {
              this.id = id;
          }
      
          public String getName() {
              return name;
          }
      
          public void setName(String name) {
              this.name = name;
          }
      
          public Integer getAge() {
              return age;
          }
      
          public void setAge(Integer age) {
              this.age = age;
          }
      
          @Override
          public String toString() {
              return "Person{" +
                      "id='" + id + '\'' +
                      ", name='" + name + '\'' +
                      ", age=" + age +
                      '}';
          }
      }
      
      • CreateDatasetRDDWithReflect

        package com.ronnie.java.json;
        
        
        import com.ronnie.java.entity.Person;
        import org.apache.spark.SparkContext;
        import org.apache.spark.api.java.JavaRDD;
        import org.apache.spark.api.java.JavaSparkContext;
        import org.apache.spark.api.java.function.Function;
        import org.apache.spark.sql.Dataset;
        import org.apache.spark.sql.Row;
        import org.apache.spark.sql.SparkSession;
        
        public class CreateDatasetFromRDDWithReflect {
        
            public static void main(String[] args) {
                SparkSession sparkSession = SparkSession.builder().appName("reflect").master("local").getOrCreate();
        
                SparkContext sc = sparkSession.sparkContext();
        
                JavaSparkContext jsc = new JavaSparkContext(sc);
        
                JavaRDD<String> lineRDD = jsc.textFile("./resources/person.txt");
        
                JavaRDD<Person> personRDD = lineRDD.map(new Function<String, Person>() {
                    private static final long serialVersionUID = 1L;
        
                    @Override
                    public Person call(String line) throws Exception {
                        Person p = new Person();
                        p.setId(line.split(",")[0]);
                        p.setName(line.split(",")[1]);
                        p.setAge(Integer.valueOf(line.split(",")[2]));
        
                        return p;
                    }
                });
                /**
                 * 傳入進去Person.class的時候,sqlContext是通過反射的方式創建DataFrame
                 * 在底層通過反射的方式獲得Person的所有field,結合RDD本身,就生成了DataFrame
                 */
                Dataset<Row> dataFrame = sparkSession.createDataFrame(personRDD, Person.class);
                dataFrame.show();
                /**
                 * +---+---+-------+
                 * |age| id|   name|
                 * +---+---+-------+
                 * | 27|  1| longdd|
                 * | 26|  2|    yyf|
                 * | 27|  3|   zhou|
                 * | 30|  4|burning|
                 * | 21|  5|   atme|
                 * +---+---+-------+
                 */
                dataFrame.printSchema();
                /**
                 * root
                 *  |-- age: integer (nullable = true)
                 *  |-- id: string (nullable = true)
                 *  |-- name: string (nullable = true)
                 */
                dataFrame.registerTempTable("person");
                Dataset<Row> sql = sparkSession.sql("select name, id, age from person where id = 2");
                sql.show();
                /**
                 * +----+---+---+
                 * |name| id|age|
                 * +----+---+---+
                 * | yyf|  2| 26|
                 * +----+---+---+
                 */
        
                /**
                 * 將Dataset轉成JavaRDD
                 * 注意:
                 * 1.可以使用row.getInt(0),row.getString(1)...通過下標獲取返回Row類型的數據,但是要注意列順序問題---不常用
                 * 2.可以使用row.getAs("列名")來獲取對應的列值。
                 */
                JavaRDD<Row> javaRDD = dataFrame.javaRDD();
        
                JavaRDD<Person> map = javaRDD.map(new Function<Row, Person>() {
        
                    private static final long serialVersionUID = 1L;
        
                    @Override
                    public Person call(Row row) throws Exception {
                        Person p = new Person();
        
                        p.setId(row.getAs("id"));
                        p.setName(row.getAs("name"));
                        p.setAge(row.getAs("age"));
        
                        return p;
                    }
                });
                map.foreach(x -> System.out.println(x));
                /**
                 * Person{id='1', name='longdd', age=27}
                 * Person{id='2', name='yyf', age=26}
                 * Person{id='3', name='zhou', age=27}
                 * Person{id='4', name='burning', age=30}
                 * Person{id='5', name='atme', age=21}
                 */
        
                sc.stop();
            }
        }
        
        
    • 動態創建 Schema 將非 json 格式的 RDD 轉換成 Dataset

      package com.ronnie.java.json;
      
      import org.apache.spark.SparkContext;
      import org.apache.spark.api.java.JavaRDD;
      import org.apache.spark.api.java.JavaSparkContext;
      import org.apache.spark.api.java.function.Function;
      import org.apache.spark.sql.Dataset;
      import org.apache.spark.sql.Row;
      import org.apache.spark.sql.RowFactory;
      import org.apache.spark.sql.SparkSession;
      import org.apache.spark.sql.types.DataTypes;
      import org.apache.spark.sql.types.StructField;
      import org.apache.spark.sql.types.StructType;
      
      import java.util.Arrays;
      import java.util.List;
      
      public class CreateDatasetFromRDDWithStruct {
          public static void main(String[] args) {
              SparkSession sparkSession = SparkSession.builder().appName("schema").master("local").getOrCreate();
      
              SparkContext sc = sparkSession.sparkContext();
      
              JavaSparkContext jsc = new JavaSparkContext(sc);
      
              JavaRDD<String> lineRDD = jsc.textFile("./resources/person.txt");
      
              /**
               *  轉換成Row類型的RDD
               */
      
              final JavaRDD<Row> rowRDD = lineRDD.map(new Function<String, Row>() {
      
                  private static final long serialVersionUID = 1L;
      
                  @Override
                  public Row call(String line) throws Exception {
                      return RowFactory.create(
                              line.split(",")[0],
                              line.split(",")[1],
                              Integer.valueOf(line.split(",")[2])
                      );
                  }
              });
      
      
              /**
               * 動態構建DataFrame中的元數據,一般來說這里的字段可以來源自字符串,也可以來源於外部數據庫
               */
      
              List<StructField> asList = Arrays.asList(
                      DataTypes.createStructField("id", DataTypes.StringType, true),
                      DataTypes.createStructField("name", DataTypes.StringType, true),
                      DataTypes.createStructField("age", DataTypes.IntegerType, true)
              );
      
              StructType schema = DataTypes.createStructType(asList);
      
              Dataset<Row> df = sparkSession.createDataFrame(rowRDD, schema);
      
              df.printSchema();
              /**
               * root
               *  |-- id: string (nullable = true)
               *  |-- name: string (nullable = true)
               *  |-- age: integer (nullable = true)
               */
              df.show();
              /**
               * +---+-------+---+
               * | id|   name|age|
               * +---+-------+---+
               * |  1| longdd| 27|
               * |  2|    yyf| 26|
               * |  3|   zhou| 27|
               * |  4|burning| 30|
               * |  5|   atme| 21|
               * +---+-------+---+
               */
      
              sc.stop();
              
          }
      }
      
  4. 讀取 parquet 文件創建 DataSet

    • SaveMode: 指文件保存時的模式
    • Overwrite: 覆蓋
    • Append: 追加
    • getOrCreate: 獲取或創建
    package com.ronnie.java.parquet;
    
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;
    
    public class CreateDataFrameFromParquet {
        public static void main(String[] args) {
            SparkSession sparkSession = SparkSession.builder().appName("parquet").master("local").getOrCreate();
    
            Dataset<Row> df = sparkSession.read().json("./resources/json");
    
            df.show();
            /**
             * +----+--------+
             * | age|    name|
             * +----+--------+
             * |  18| burning|
             * |null|    atme|
             * |  18|  longdd|
             * |  28|     yyf|
             * |  20|    zhou|
             * |null|   blaze|
             * |  18|   ocean|
             * |  28| xiaoliu|
             * |  28|zhangsan|
             * |null|    lisi|
             * |  18|  wangwu|
             * +----+--------+
             */
    
            /**
             * 將Dataset保存成parquet文件,
             * SaveMode指定存儲文件時的保存模式:
             * 		Overwrite:覆蓋
             * 		Append:追加
             * 		ErrorIfExists:如果存在就報錯
             * 		Ignore:如果存在就忽略
             * 保存成parquet文件有以下兩種方式:
             */
    
            // df.write().mode(SaveMode.Overwrite).format("parquet").save("./resources/parquet");
            df.write().mode(SaveMode.Overwrite).parquet("./resources/parquet");
            /**
             * {
             *   "type" : "struct",
             *   "fields" : [ {
             *     "name" : "age",
             *     "type" : "long",
             *     "nullable" : true,
             *     "metadata" : { }
             *   }, {
             *     "name" : "name",
             *     "type" : "string",
             *     "nullable" : true,
             *     "metadata" : { }
             *   } ]
             * }
             * and corresponding Parquet message type:
             * message spark_schema {
             *   optional int64 age;
             *   optional binary name (UTF8);
             * }
             */
    
            /**
             * 加載parquet文件成Dataset
             * 加載parquet文件有以下兩種方式:
             */
    
            Dataset<Row> load = sparkSession.read().format("parquet").load("./resources/parquet");
    
            load.show();
            /**
             * +----+--------+
             * | age|    name|
             * +----+--------+
             * |  18| burning|
             * |null|    atme|
             * |  18|  longdd|
             * |  28|     yyf|
             * |  20|    zhou|
             * |null|   blaze|
             * |  18|   ocean|
             * |  28| xiaoliu|
             * |  28|zhangsan|
             * |null|    lisi|
             * |  18|  wangwu|
             * +----+--------+
             */
    
            sparkSession.stop();
        }
    }
    
  5. 讀取 JDBC 中的數據創建 DataSet

    package com.ronnie.java.jdbc;
    
    import org.apache.spark.sql.*;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    
    public class CreateDatasetFromMysql {
    
        public static void main(String[] args) {
            SparkSession sparkSession = SparkSession.builder().appName("jdbc").master("local").getOrCreate();
    
            /**
             * 第一種方式讀取MySql數據庫表,加載為Dataset
             */
    
            Map<String, String> options = new HashMap<>();
    
            options.put("url", "jdbc:mysql://localhost:3306/spark");
            options.put("driver", "com.mysql.jdbc.Driver");
            options.put("user", "root");
            options.put("password", "123456");
            options.put("dbtable", "person");
    
            Dataset<Row> person = sparkSession.read().format("jdbc").options(options).load();
    
            person.show();
            /**
             * +---+------+---+
             * | id|  name|age|
             * +---+------+---+
             * |  1| slark| 70|
             * |  2|   pom| 40|
             * |  3|huskar| 60|
             * |  4|   axe| 80|
             * +---+------+---+
             */
    
            person.createOrReplaceTempView("person");
    
            /**
             * 第二種方式讀取MySql數據表加載為Dataset
             */
            DataFrameReader reader = sparkSession.read().format("jdbc");
            reader.option("url", "jdbc:mysql://localhost:3306/spark");
            reader.option("driver", "com.mysql.jdbc.Driver");
            reader.option("user", "root");
            reader.option("password", "123456");
            reader.option("dbtable", "score");
    
            Dataset<Row> score = reader.load();
            score.show();
            /**
             * +---+------+-----+
             * | id|  name|score|
             * +---+------+-----+
             * |  1|dragon|   80|
             * |  2|   axe|   99|
             * |  3| slark|   81|
             * +---+------+-----+
             */
    
            score.createOrReplaceTempView("score");
            Dataset<Row>  result =
                    sparkSession.sql("select person.id,person.name,person.age,score.score "
                            + "from person,score "
                            + "where person.name = score.name  and score.score> 82");
            result.show();
            /**
             * +---+----+---+-----+
             * | id|name|age|score|
             * +---+----+---+-----+
             * |  4| axe| 80|   99|
             * +---+----+---+-----+
             */
    
            result.registerTempTable("result");
    
            /**
             * 將Dataset結果保存到Mysql中
             */
    //
            Properties properties = new Properties();
            properties.setProperty("user", "root");
            properties.setProperty("password", "123456");
    
            /**
             * SaveMode:
             * Overwrite:覆蓋
             * Append:追加
             * ErrorIfExists:如果存在就報錯
             * Ignore:如果存在就忽略
             *
             */
    
            result.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://127.0.0.1:3306/spark", "result", properties);
    //        System.out.println("----Finish----");
            sparkSession.stop();
        }
    }
    
  6. Hive 中的數據創建 DataSet

    package com.ronnie.java.hive;
    
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;
    
    public class CreateDatasetFromHive {
    
        public static void main(String[] args) {
            SparkSession sparkSession = SparkSession
                    .builder()
                    .appName("hive")
                    //開啟hive的支持,接下來就可以操作hive表了
                    // 前提需要是需要開啟hive metastore 服務
                    .enableHiveSupport()
                    .getOrCreate();
    
            sparkSession.sql("USE spark");
            sparkSession.sql("DROP TABLE IF EXISTS student_infos");
            //在hive中創建student_infos表
            sparkSession.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING,age INT) row format delimited fields terminated by '\t' ");
            sparkSession.sql("load data local inpath '/root/student_infos' into table student_infos");
            //注意:此種方式,程序需要能讀取到數據(如/root/student_infos),同時也要能讀取到 metastore服務的配置信息。
    
            sparkSession.sql("DROP TABLE IF EXISTS student_scores");
            sparkSession.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING, score INT) row format delimited fields terminated by '\t'");
            sparkSession.sql("LOAD DATA "
                    + "LOCAL INPATH '/root/student_scores'"
                    + "INTO TABLE student_scores");
    
            //		Dataset<Row> df = hiveContext.table("student_infos");//讀取Hive表加載Dataset方式
            /**
             * 查詢表生成Dataset
             */
            Dataset<Row> goodStudentsDF = sparkSession.sql("SELECT si.name, si.age, ss.score "
                    + "FROM student_infos si "
                    + "JOIN student_scores ss "
                    + "ON si.name=ss.name "
                    + "WHERE ss.score>=80");
    
            goodStudentsDF.registerTempTable("goodstudent");
            Dataset<Row> result = sparkSession.sql("select * from goodstudent");
            result.show();
    
            /**
             * 將結果保存到hive表 good_student_infos
             */
            sparkSession.sql("DROP TABLE IF EXISTS good_student_infos");
            goodStudentsDF.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos");
    
            sparkSession.stop();
        }
    
    }
    

序列化問題

  • Java 中以下幾種情況下不被序列化的問題:
    • 反序列化時 serializable 版本號不一致導致不能反序列化
    1. 子類中實現了serializable 接口, 但父類中沒有實現, 父類中的變量不能被序列化, 序列化后父類中的變量會得到 null。
    2. 被關鍵字 transient 修飾的變量不能被序列化。
    3. 靜態變量不能被序列化, 屬於類, 不屬於方法和對象, 所以不能被序列化。

儲存 DataSet

  • 將 DataSet 存儲為 parquet 文件。
  • 將 DataSet 存儲到 JDBC 數據庫。
  • 將DataSet 存儲到 Hive 表

自定義函數 UDP 和 UDAF

  • UDF(User Defined Function): 用戶自定義函數

    package com.ronnie.java.udf_udaf;
    
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.api.java.UDF2;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    
    public class UDF {
        public static void main(String[] args) {
            SparkSession sparkSession = SparkSession.builder().appName("udf").master("local").getOrCreate();
    
            JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());
    
            JavaRDD<String> parallelize = jsc.parallelize(Arrays.asList("atme", "maybe", "chalice"));
    
            JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Row call(String s) throws Exception {
                    return RowFactory.create(s);
                }
            });
    
            /**
             * 動態創建Schema方式加載DF
             */
    
            ArrayList<StructField> fields = new ArrayList<>();
            fields.add(DataTypes.createStructField("name", DataTypes.StringType,true));
    
            StructType schema = DataTypes.createStructType(fields);
    
            Dataset<Row> df = sparkSession.createDataFrame(rowRDD, schema);
    
    
            df.registerTempTable("user");
    
            /**
             * 根據UDF函數參數的個數來決定是實現哪一個UDF  UDF1,UDF2。。。。UDF1xxx
             */
    
            sparkSession.udf().register("StrLen", new UDF2<String, Integer, Integer>(){
    
                private static final long serialVersionUID = 1L;
    
                @Override
                public Integer call(String t1, Integer t2) throws Exception {
                    return t1.length() + t2;
                }
            }, DataTypes.IntegerType);
            sparkSession.sql("select name ,StrLen(name,100) as length from user").show();
            /**
             * +-------+------+
             * |   name|length|
             * +-------+------+
             * |   atme|   104|
             * |  maybe|   105|
             * |chalice|   107|
             * +-------+------+
             */
    
            sparkSession.stop();
        }
    }
    
  • UDAF(User Defined Aggregate Function): 用戶自定義聚合函數

    • 實現UDAF函數如果要自定義類要實現UserDefinedAggregateFunction

      package com.ronnie.java.udf_udaf;
      
      
      import org.apache.spark.api.java.JavaRDD;
      import org.apache.spark.api.java.JavaSparkContext;
      import org.apache.spark.api.java.function.Function;
      import org.apache.spark.sql.Dataset;
      import org.apache.spark.sql.Row;
      import org.apache.spark.sql.RowFactory;
      import org.apache.spark.sql.SparkSession;
      import org.apache.spark.sql.expressions.MutableAggregationBuffer;
      import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
      import org.apache.spark.sql.types.DataType;
      import org.apache.spark.sql.types.DataTypes;
      import org.apache.spark.sql.types.StructField;
      import org.apache.spark.sql.types.StructType;
      
      import java.util.ArrayList;
      import java.util.Arrays;
      import java.util.List;
      
      public class UDAF {
      
          public static void main(String[] args) {
              SparkSession sparkSession = SparkSession.builder().appName("udaf").master("local").getOrCreate();
      
      
              JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext());
      
              JavaRDD<String> parallelize = sc.parallelize(
                      Arrays.asList("zeus", "lina", "wind ranger", "zeus", "zeus", "lina","zeus", "lina", "wind ranger", "zeus", "zeus", "lina"),2);
      
              JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {
                  @Override
                  public Row call(String s) throws Exception {
                      return RowFactory.create(s);
                  }
              });
      
              List<StructField> fields = new ArrayList<>();
              fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
      
              StructType schema = DataTypes.createStructType(fields);
      
              Dataset<Row> df = sparkSession.createDataFrame(rowRDD, schema);
      
              df.registerTempTable("user");
      
              /**
               * 注冊一個UDAF函數,實現統計相同值得個數
               * 注意:這里可以自定義一個類繼承UserDefinedAggregateFunction類也是可以的
               * 數據:
               *     zeus
               *     zeus
               *     lina
               *     lina
               *
               *     select count(*)  from user group by name
               */
      
              sparkSession.udf().register("StringCount", new UserDefinedAggregateFunction() {
      
                  /**
                   * 指定輸入字段的字段及類型
                   * @return
                   */
                  @Override
                  public StructType inputSchema() {
                      return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, true)));
                  }
      
                  /**
                   * 指定UDAF函數計算后返回的結果類型
                   * @return
                   */
                  @Override
                  public DataType dataType() {
                      return DataTypes.IntegerType;
                  }
      
                  /**
                   * 確保一致性 一般用true,用以標記針對給定的一組輸入,UDAF是否總是生成相同的結果。
                   * @return
                   */
                  @Override
                  public boolean deterministic() {
                      return true;
                  }
      
                  /**
                   * 更新 可以認為一個一個地將組內的字段值傳遞進來 實現拼接的邏輯
                   * buffer.getInt(0)獲取的是上一次聚合后的值
                   * 相當於map端的combiner,combiner就是對每一個map task的處理結果進行一次小聚合
                   * 大聚和發生在reduce端.
                   * 這里即是:在進行聚合的時候,每當有新的值進來,對分組后的聚合如何進行計算
                   * @param buffer
                   * @param input
                   */
                  @Override
                  public void update(MutableAggregationBuffer buffer, Row input) {
                      buffer.update(0, buffer.getInt(0) + 1);
                      System.out.println("update......buffer" + buffer.toString() + " | row" + input);
                  }
      
                  /**
                   * 在進行聚合操作的時候所要處理的數據的結果的類型
                   * @return
                   */
                  @Override
                  public StructType bufferSchema() {
                      return DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("buffer", DataTypes.IntegerType, true)));
                  }
      
                  /**
                   * 合並 update操作,可能是針對一個分組內的部分數據,在某個節點上發生的 但是可能一個分組內的數據,會分布在多個節點上處理
                   * 此時就要用merge操作,將各個節點上分布式拼接好的串,合並起來
                   * buffer1.getInt(0) : 大聚合的時候 上一次聚合后的值
                   * buffer2.getInt(0) : 這次計算傳入進來的update的結果
                   * 這里即是:最后在分布式節點完成后需要進行全局級別的Merge操作
                   */
                  @Override
                  public void merge(MutableAggregationBuffer buffer1,  Row buffer2) {
                      /* 2 3  4  5  6  7
                         0 + 2 = 2
                         2 + 3 = 5
                         5 + 4  = 9 */
      
                      buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0));
                      System.out.println("merge.....buffer : "  + buffer1.toString() + "| row" + buffer2.toString());
                  }
      
                  /**
                   * 初始化一個內部的自己定義的值,在Aggregate之前每組數據的初始化結果
                   * @param buffer
                   */
                  @Override
                  public void initialize(MutableAggregationBuffer buffer) {
                      buffer.update(0, 0);
      
                      System.out.println("init ......" + buffer.get(0));
                  }
      
                  /**
                   * 最后返回一個和DataType的類型要一致的類型,返回UDAF最后的計算結果
                   * @param row
                   * @return
                   */
                  @Override
                  public Object evaluate(Row row) {
                      return row.getInt(0);
                  }
              });
      
              sparkSession.sql("select name, StringCount(name) as number from user group by name").show();
              /**
               * +-----------+------+
               * |       name|number|
               * +-----------+------+
               * |wind ranger|     2|
               * |       lina|     4|
               * |       zeus|     6|
               * +-----------+------+
               */
      
              sc.stop();
          }
      }
      
      

開窗函數

package com.ronnie.java.windowFun;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * row_number()開窗函數:
 * 主要是按照某個字段分組,然后取另一字段的前幾個的值,相當於 分組取topN
 * row_number() over (partition by xxx order by xxx desc) xxx
 *
 */
public class RowNumberWindowFun {
    //-Xms800m -Xmx800m  -XX:PermSize=64M -XX:MaxNewSize=256m -XX:MaxPermSize=128m
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession
                .builder()
                .appName("window")
                .master("local")
                //開啟hive的支持,接下來就可以操作hive表了
                // 前提需要是需要開啟hive metastore 服務
                .enableHiveSupport()
                .getOrCreate();

        sparkSession.sql("use spark");
        sparkSession.sql("drop table if exists sales");
        sparkSession.sql("create table if not exists sales (riqi string,leibie string,jine Int) "
                + "row format delimited fields terminated by '\t'");
        sparkSession.sql("load data local inpath './data/sales.txt' into table sales");
        /**
         * 開窗函數格式:
         * 【 row_number() over (partition by XXX order by XXX) as rank】
         * 注意:rank 從1開始
         */
        /**
         * 以類別分組,按每種類別金額降序排序,顯示 【日期,種類,金額】 結果,如:
         *
         * 1 A 100
         * 2 B 200
         * 3 A 300
         * 4 B 400
         * 5 A 500
         * 6 B 600
         *
         * 排序后:
         * 5 A 500  --rank 1
         * 3 A 300  --rank 2
         * 1 A 100  --rank 3
         * 6 B 600  --rank 1
         * 4 B 400	--rank 2
         * 2 B 200  --rank 3
         *
         * 2018 A 400     1
         * 2017 A 500     2
         * 2016 A 550     3
         *
         *
         * 2016 A 550     1
         * 2017 A 500     2
         * 2018 A 400     3
         *
         */

        Dataset<Row> result = sparkSession.sql("select riqi,leibie,jine,rank "
                + "from ("
                + "select riqi,leibie,jine,"
                + "row_number() over (partition by leibie order by jine desc) rank "
                + "from sales) t "
                + "where t.rank<=3");

        result.show(100);
        /**
         * 將結果保存到hive表sales_result
         */
//		result.write().mode(SaveMode.Overwrite).saveAsTable("sales_result");
        sparkSession.stop();
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM