Spark3學習【基於Java】3. Spark-Sql常用API


學習一門開源技術一般有兩種入門方法,一種是去看官網文檔,比如Getting Started - Spark 3.2.0 Documentation (apache.org),另一種是去看官網的例子,也就是%SPARK_HOME%\examples下面的代碼。打開IDEA,選擇File-Open...

跟前面文章中方法一樣導入jars目錄到classpath。

Spark解析json字符串

第一個例子是讀取並解析Json。這個例子的結果讓我有些震驚,先上代碼:

  1. public static void main(String[] args) {
  2.     SparkSession session = SparkSession.builder().master("local[1]").appName("SparkSqlApp").getOrCreate();
  3.  
  4.     Dataset<Row> json = session.read().json("spark-core/src/main/resources/people.json");
  5.     json.show();
  6. }

讓我驚訝的是文件的內容。例子里面的文件是三個大括號並列,文件擴展名是.json,由於沒有中括號,所以格式是錯的:

  1. {"name":"Michael"}
  2. {"name":"Andy", "age":30}
  3. {"name":"Justin", "age":19}

但是spark解析出來了:

於是我把文件改成下面這樣向看下結果

  1. [{"name":"Michael"},
  2. {"name":"Andy", "age":30},
  3. {"name":"Justin", "age":19}
  4. ]

你猜輸出是什么?

顯然,spark沒有解析出第一行,而且把第4行也解析了。這也說明了為什么樣例的文件可以解析:首先跟文件擴展名是沒啥關系的,另外spark是按行解析,只要考慮這一行是否符合解析要求就可以,行末可以有逗號。所以把文件改成下面也是可以的

  1. {"name":"Michael"},
  2. {"name":"Andy", "age":30},..
  3. {"name":"Justin", "age":19}

第一行后面有逗號,第二行后面還有兩個點。

SQL 查詢

在之前的例子中,讀取文件返回的是Dataset<String>,因為之前確實是讀取的文件內容。現在使用json()方法返回的是DataFrame,數據是經過spark處理過的。

DataFrame提供了一些好用的方法,用的最多的就是show()。它主要用於調試,可以把數據以表格形式打印。spark確實給DataFrame生成了表結構,可以通過printSchema()方法查看

不但有字段名,還有字段類型,還有是否可空(好像都能空)。

DF還提供了類似於sql查詢的方法,比如select()/groupBy(),和where類似的filter()等:

這里我們首先給年齡字段+1,並通過別名(相等於SQL里的AS)讓他覆蓋之前的字段,然后查詢比19大的記錄,最后根據年齡分組匯總。

如果我們把新字段不覆蓋原字段呢?你猜是執行報錯還是啥結果?

That's all?當然不是,Spark提供了更強大的SQL操作:視圖

View

視圖分臨時視圖和全局視圖。臨時視圖時會話級別的,會話結束了視圖就沒了;全局視圖時應用級別的,只要Spark應用不停,視圖就可以跨會話使用。

可見臨時視圖和全局視圖可以叫一樣的名字,它們的內容互不干擾。因為要訪問全局視圖需要通過global_temp庫。不信你可以這樣試一下

  1. Dataset<Row> group = json.select(col("name"), col("age").plus(1).alias("age1"))
  2.         .filter(col("age").gt(19))
  3.         .groupBy("age1")
  4.         .count();
  5.  
  6. group.createOrReplaceTempView("people");
  7. json.createOrReplaceGlobalTempView("people");
  8. Dataset<Row> temp = session.sql("select * from people");
  9. Dataset<Row> global = session.sql("select * from global_temp.people");
  10. Dataset<Row> global1 = session.newSession().sql("select * from global_temp.people");
  11. temp.show();
  12. global.show();
  13. global1.show();

Dataset

我們已經跟Dataset打過不少交道了,這里再稍晚多說一點點。實際上如果你是自己摸索而不是完全看我寫的,下面這些內容估計都已經探索出來了。

1 轉換自DF

DF是無類型的,Dataset是有類型的。如果要把無類型的轉成有類型的,就需要提供一個類型定義,就像mysql表和Java的PO一樣。

先來定義Java類:

  1. public class Person implements Serializable {
  2.   private String name;
  3.   private long age;
  4.  
  5.   public String getName() {
  6.     return name;
  7.   }
  8.  
  9.   public void setName(String name) {
  10.     this.name = name;
  11.   }
  12.  
  13.   public long getAge() {
  14.     return age;
  15.   }
  16.  
  17.   public void setAge(long age) {
  18.     this.age = age;
  19.   }
  20. }

這個類必須實現序列化接口,原因在前面也說過了。

接下來把讀入json的DataFrame轉成Dataset:

之前都是使用Encoders內置的編碼器,這里通過bean()方法生成我們自定義類的編碼器,然后傳給DF的as()方法就轉成了Dataset。

既然轉成了強類型的Dataset,那能把每一個對象拿出來嗎?給Person類增加toString方法,然后遍歷Dataset:

結果報錯了竟然:已經生成了集合,卻不能訪問元素?

報錯原因很簡單:我們類中的age是原始數據類型,但是實際數據有一個null。把long age改成Long age即可:

但是為什么會這樣呢?!~我猜是因為as方法用的編碼器(序列化工具)和foreach用到的解碼器不匹配,spark的編碼器不要求數據符合Java編譯規則。

來自Java集合

目前我們掌握了通過讀取文件(textFile(path))、轉化其他Dataset(map/flatMap)和轉換DF來生成Dataset,如果已經有一堆數據了,也可以直接創建。

SparkSession重載了大量根據數據集生成Dataset和DataFrame的方法,可以自由選擇:

所以我們創建一個List來生成,只能是List,不能是Collection

神奇的是原本應該一樣的代碼,執行的時候有一個報錯。這個算Java實現的BUG吧,原因參考Java中普通lambda表達式和方法引用本質上有什么區別? - RednaxelaFX的回答 - 知乎

https://www.zhihu.com/question/51491241/answer/126232275

轉自RDD

RDD 在Java環境下叫JavaRDD。它也是數據集,可以和Dataset/DataFrame互轉。這里不說了,有興趣可以探索。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM