SparkSQL學習案例:使用DataFrame和Dataset操作json數據


一、測試數據集(奧特曼.json)

 

 

二、源代碼及代碼分析

 

 1  Scala版本
 2 
 3 import org.apache.spark.sql.SparkSession
 4 
 5 //根據分析數據寫出相應的樣例類
 6 case class Ultraman(name: String, age: BigInt, address: Array[String])
 7 
 8 object DatasetAndDataFrameExample {
 9 
10   def main(args: Array[String]): Unit = {
11 
12     //實例化SparkSession
13     val spark = SparkSession
14       .builder()
15       .master("local[*]")
16       .appName("DatasetAndDataFrameExample")
17       .getOrCreate()
18 
19     //為避免影響查看輸出結果,設置日志打印等級為"WARN"
20     spark.sparkContext.setLogLevel("WARN")
21 
22     import spark.implicits._
23     //使用SparkSession讀取輸入路徑的json數據文件
24     val df1 = spark.read.json("/home/liuyin/IdeaProjects/Spark/src/main/scala/chap07_SparkSQL/奧特曼.json")
25     //打印當前表格內容
26     df1.show()
27     //篩選出符合條件的數據並打印出來
28     df1.filter($"address" === Array("M78")).filter($"age" > 10000).show()
29 
30     val ds1 = spark.read.json("/home/liuyin/IdeaProjects/Spark/src/main/scala/chap07_SparkSQL/奧特曼.json").as[Ultraman]
31     ds1.show()
32     ds1.filter(_.name == "迪迦").show()
33 
34     spark.stop()
35   }
36 }

 

 

相關的細節

 

 (1)28行的$"address" === Array("M78")是SQLContext中的判斷表達式,"==="是Column類中的一個方法,這個表達式也可以寫成$"address".===(Array("M78"))

 

(2)使用"==="等表達式需要導入SOark隱式轉換包,如22行所示

(3)第6行中的樣例類是為了創建Dataset實例后,Dataset能識別出輸入文件的每行數據各個元素的類型,樣例類的屬性名稱應與輸入數據的字段名一樣

(4)第30行中,在調用.as[Ultraman]之前生成的還是一個DataFrame實例,調用之后會生成Dataset實例(其中的元素類型就是我們之前寫的樣例類Ultraman),as方法具體描述如下圖

(5)樣例類Ultraman在編譯后會自動繼承Product特質(類似於Java的接口),成為Product的子類,這么做的原因是as方法中的Ultraman類型必須是Encoder類型或其子類型,正巧隱式轉換還有一個作用是將Dataset中的Product數據類型轉換成Encoder類型的對象(調用了特質LowPrioritySQLImplicits里面的newProductEncoder方法),若樣例類Ultraman在編譯后不是Product類或子類則無法滿足類型匹配

 可以看出Ultraman在編譯后已經繼承了Product

 as方法的詳細描述

 newProductEncoder方法的源代碼

三、輸出結果


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM