撰寫本文的目的:對於sparksql,網上有大量的詳細文檔,本人針對常用的操作進行一個整理,當然有大多數都是從其他地方搬過來的,包括官方文檔以及其他網友的一些分享,一來是通過此次整理加強自己的記憶,二來如果有幸幫到某位網友,那是本人莫大的榮幸,先感謝您的閱讀,廢話不多說,進入正文:
下文所涉及到的相關軟件版本分別為:
spark版本:v2.2.0
hive : v1.2.1
hadoop : v2.7.6
前言:
Spark sql是spark處理結構化數據的一個模塊,它的前身是shark,與基礎的spark rdd不同,spark sql提供了結構化數據及計算結果等信息的接口,在內部,spark sql使用這個額外的信息去執行額外的優化,有幾種方式可以跟spark sql進行交互,包括sql和dataset api,使用相同的執行引擎進行計算的時候,無論是使用哪一種計算引擎都可以一快速的計算。
Dataset and DataFrames
RDD:在spark剛開始的時候,引入RDD(彈性分布式數據集)
優點:
1)編譯時類型安全,編譯時就能檢查出類型錯誤
2)面向對象的編程分格,直接通過類名點的方式來操作數據
例如:idAge.filter(_.age > "") //編譯時直接報錯
idAgeRDDPerson.filter(_.age > 25) //直接操作一個個的person對象
缺點:
1)序列化和反序列化的性能開銷,無論是集群間的通信還是IO操作,都需要對對象的結果和數據進行序列化和反序列化
2)GC的性能開銷,頻繁的創建和銷毀對象,勢必會增加GC
DataFrame:spark1.3的時候引入了DataFrmae,是一個列方式組織的分布式數據集
優點:
1)引入了Schema,包含了一ROW位單位的每行數據的列信息,spark通過Schema就能夠讀懂數據,因此在通信和IO時就只需要序列化和反序列化數據,而結構的部分就可以省略了;
2)off-heap:spark能夠以二進制的形式序列化數據(不包括結構)到off-heap(堆外內存),當要操作數據時,就直接操作off-heap內存,off-heap類似於地盤,schema類似於地圖,Spark有了地圖又有了自己地盤了,就可以自己說了算,不再受JVM的限制,也就不再受GC的困擾了,通過Schema和off-heap,DataFrame克服了RDD的缺點。對比RDD提升了計算效率,減少了數據的讀取,底層計算優化
3)引入了新的引擎:Tungsten
4)引入了新的語法解析框架:Catalyst
缺點:
DataFrame客服了RDD 的缺點,但是丟失了RDD的優點,DataFrame不是類型安全的,API也不是面向對象分格的。
1)API不是面向對象的
idAgeDF.filter(idAgeDF.col("age") > 22)
2)DataFrame不是編譯時類型安全的,下面這種情況下不會報錯
idAgeDF.filter(idAgeDF.col("age") > "")
DataSet:到spark1.6的時候引入了DataSet,Encoder分布式數據集,是一個被添加的新接口,它提供了RDD 的優點(強類型化,能夠使用強大的lambda函數)
/** * @groupname basic Basic Dataset functions * @groupname action Actions * @groupname untypedrel Untyped transformations * @groupname typedrel Typed transformations * * @since 1.6.0 */ @InterfaceStability.Stable class Dataset[T] private[sql]( @transient val sparkSession: SparkSession, @DeveloperApi @InterfaceStability.Unstable @transient val queryExecution: QueryExecution, encoder: Encoder[T]) extends Serializable {
DataSet是一個類,其中包含了三個參數:
-
-
- SparkSession:環境信息
- QueryExecution:包含數據和執行邏輯
- Encoder:數據結構編碼信息(包含序列化,schema,數據類型)
-
核心:Encoder
優點:
1)一個DataSet可以從JVM對象來構造並且使用轉換功能(map,flatMap,filter...)
2)編譯時的類型安全檢查。性能極大地提升,內存使用極大降低,減少GC。極大地較少網絡數據的傳輸、極大地減少scala和java之間代碼的差異性
3)DataFrame每一行對應一個Row。而DataSet的定義更加寬松,每一個record對應了一個任意的類型。DataFrame只是DataSet的一種特例:type DataFrame = Dataset[Row]
4)不同的Row是一個泛華的無類型的JVM object,Dataset是有一系列的強類型的JVM object組成的,Scala的case class或者java class定義。因此Dataset可以在編譯時進行類型檢查
5)Dataset一Catalyst邏輯執行計划表示,並且數據以編碼的二進制形式被存儲,不需要反序列化就可以執行sorting,shuffle等操作
6)Dataset創建需要一個顯示的Encoder,把對象序列化為二進制
7)在scala API中,DataFrame僅僅是一個DataSet[Row]類型的別名,然而,在Java API中,用戶需要使用Dataset<Row> 去代表一個DataFrame。
sparkSession
Spark2.0中開始引入了SparkSession的概念,它為用戶提供了一個統一的切入點來使用Spark的各項功能,包括SQLContext和HiveContext的組合(未來可能會加上StreamingContext),用戶不但可以使用DataFrame和Dataset的各種API,大大降低了spark的學習難度
創建sparkSession:
scala:
import org.apache.spark.sql.SparkSession
val ssc = new SparkSession .Builder() .appName("name_in_webUI") //這里的這個名字隨便起,只要是自己能認識,最終這個是要顯示在weiUI界面的 .enableHiveSupport() //如果需要訪問hive,這一步不能少,如果只是讀取本地文件這一句可以省去 .getOrCreate()
//創建之后可以設置運行參數
ssc.conf.set("spark.sql.shuffle.partitions",4)
ssc.conf.set("spark.executor.memory","2g")
java:
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession .builder() .appName("name_in_webUI") .getOrCreate();
spark.conf().set("spark.sql.shuffle.partitions",4);
spark.conf().set("spark.executor.memory","2g");
Row
Row是一個泛華的無類型的JVM object
Row的訪問方式:
import org.apache.spark.sql.Row val row = Row(1,'asd',3.3) //Row的訪問方式 row(0) row(1) row(2) row.getInt(0) row.getString(1) row.getDouble(2) row.getAs[Int](0) row.getAs[String](1) row.getAs[Double](2)
DataFrame:
DataFrame即是帶有schema信息的RDD,spark直接可通過Schema就可以讀懂信息。
schema:
DataFrame中提供了詳細的數據信息,從而使得sparkSql可以清楚的知道數據集中包含了哪些列,每列的名稱和類型是什么?DataFrame的結構信息即為schema。
schema的定義方式:
import org.apache.spark.sql.types._
1、來自官網文檔
val schema1 = StructType( StructField("name", StringType, false) :: StructField("age", IntegerType, false) :: StructField("height", IntegerType, false) :: Nil) val schema2 = StructType( Seq(StructField("name", StringType, false), StructField("age", IntegerType, false), StructField("height", IntegerType, false))) val schema3 = StructType( List(StructField("name", StringType, false), StructField("age", IntegerType, false), StructField("height", IntegerType, false)))
2、來自spark源碼
val schema4 = (new StructType). add(StructField("name", StringType, false)). add(StructField("age", IntegerType, false)). add(StructField("height", IntegerType, false)) val schema5 = (new StructType). add("name", StringType, true, "comment1"). add("age", IntegerType, false, "comment2"). add("height", IntegerType, true, "comment3")
3、最便捷的方式
val schema6 = (new StructType). add("name", "string", false). add("age", "integer", false). add("height", "integer", false)
RDD、DataFrame和DataSet的共性與區別
共性:
- 三者都是spark平台下的分布式彈性數據集,為處理超大型數據提供便利
- 三者都有惰性機制。在創建時、轉換時(如map)不會立即執行,只有在遇到action算子的時候(比如foreach),才開始進行觸發計算。極端情況下,如果代碼中只有創建、轉換,但是沒有在后面的action中使用對應的結果,在執行時會被跳過。
- 三者都有partition的概念,都有緩存(cache)的操作,還可以進行檢查點操作(checkpoint)
- 三者都有許多共同的函數(如map、filter,sorted等等)。
- 在對DataFrame和DataSet操作的時候,大多數情況下需要引入隱式轉換(ssc.implicits._)
不同:
DataFrame:
DataFrame是DataSet的特例,也就是說DataSet[Row]的別名
DataFrame = RDD + schema
-
- DataFrame的每一行的固定類型為Row,只有通過解析才能獲得各個字段的值
- DataFrame與DataSet通常與spark ml同時使用
- DataFrame與DataSet均支持sparkSql操作,比如select,groupby等,也可以注冊成臨時表,進行sql語句操作
- DataFrame與DateSet支持一些方便的保存方式,比如csv,可以帶上表頭,這樣每一列的字段名就可以一目了然
DataSet:
DataSet = RDD + case class
-
- DataSet與DataFrame擁有相同的成員函數,區別只是只是每一行的數據類型不同。
- DataSet的每一行都是case class,在自定義case class之后可以很方便的獲取每一行的信息
DataFrame和DataSet的基本操作
DataFrame和DataSet的創建
DataFrame
1、集合轉DataFrame
val ssc = SparkSession().Builder.master("test").appName("test").getOrCreate val seq1 = Seq(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165)) val df1 = ssc.createDataFrame(seq1).withColumnRenamed("_1", "name1"). withColumnRenamed("_2", "age1").withColumnRenamed("_3", "height1") df1.orderBy(desc("age1")).show(10) import ssc.implicit._ val df2 = ssc.createDataFrame(seq1).toDF("name", "age", "height")
2、RDD轉DataFrame
import org.apache.spark.sql.Row import org.apache.spark.sql.types._ val arr = Array(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165)) val rdd1 = sc.makeRDD(arr).map(f=>Row(f._1, f._2, f._3)) val schema = StructType( StructField("name", StringType, false) :: StructField("age", IntegerType, false) :: StructField("height", IntegerType, false) :: Nil)
// false:說明該字段不允許為null true:說明該字段可以為null val rddToDF = spark.createDataFrame(rdd1, schema) rddToDF.orderBy(desc("name")).show(false)
DataSet
1、由range生成DataSet
val numDS = spark.range(5,100,5) numDS.orderBy(desc("id")).show(5) numDS.describe().show
2、由集合生成DS
case class Person(name:String, age:Int, height:Int) val seq1 = Seq(Person("Jack", 28, 184), Person("Tom", 10, 144), Person("Andy", 16, 165)) val spark:SparkSession = SparkSession.Builder.... val ds1 = spark.createDataset(seq1) ds1.show val seq2 = Seq(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165)) val ds2 = spark.createDataset(seq2) ds2.show
3、由RDD進行轉換
import org.apache.spark.sql.types._ import org.apache.spark.sql.Row val arr = Array(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165)) val rdd2 = sc.makeRDD(arr).map(f=>Row(f._1, f._2, f._3))
val rdd3 = sc.makeRDD(arr).map(f=>Person(r._1,f._2,f._3)) val ds3 = sc.createDataset(rdd2)
val ds4 = rdd3.toDS() ds3.show(10)
通過SparkSession讀取文件
import org.apache.spark.sql.types._ val schema2 = StructType( StructField("name", StringType, false) :: StructField("age", IntegerType, false) :: StructField("height", IntegerType, false) :: Nil) val df7 = ssc.read.options(Map(("delimiter", ","), ("header", "false"))).schema(schema2).csv("file:///home/spark/t01.csv") // 讀取本地文件 df7.show()
DataSet的基礎函數
import org.apache.spark.storage.StorageLevel import org.apache.spark.sql.types._ case class Person(name:String, age:Int, height:Int) spark.sparkContext.setCheckpointDir("hdfs://node1:8020/checkpoint") // 1 DataSet存儲類型 val seq1 = Seq(Person("Jack", 28, 184), Person("Tom", 10, 144), Person("Andy", 16, 165)) val ds1 = spark.createDataset(seq1) ds1.show() ds1.checkpoint() ds1.cache() ds1.persist(StorageLevel.MEMORY_ONLY) ds1.count() ds1.show() ds1.unpersist(true) // 2 DataSet結構屬性 ds1.columns ds1.dtypes ds1.explain() ds1.col("name") ds1.printSchema // 常用 // 3 DataSet rdd數據互轉 val rdd1 = ds1.rdd val ds2 = rdd1.toDS() ds2.show() val df2 = rdd1.toDF() df2.show() // 4 Dataset保存文件 ds1.select("name", "age", "height").write.format("csv").save("data/sql1/my01.csv") // 讀取保存的文件 val schema2 = StructType( StructField("name", StringType, false) :: StructField("age", IntegerType, false) :: StructField("height", IntegerType, false) :: Nil) val out = spark.read. options(Map(("delimiter", ","), ("header", "false"))). schema(schema2).csv("data/sql1/*") out.show(10)
DataSet的Action操作
// 1 顯示數據集 val seq1 = Seq(Person("Jack", 28, 184), Person("Tom", 10, 144), Person("Andy", 16, 165)) val ds1 = spark.createDataset(seq1) // 缺省顯示20行 ds1.show() // 顯示2行 ds1.show(2) // 顯示20行,不截斷字符 ds1.show(20, false) // 2 獲取數據集 // collect返回的是數組 val c1 = ds1.collect() // collectAsList返回的是List val c2 = ds1.collectAsList() val h1 = ds1.head() val h2 = ds1.head(3) val f1 = ds1.first() val f2 = ds1.take(2) val t2 = ds1.takeAsList(2) ds.limit(10).show // 取10行數據生成新的DataSet // 3 統計數據集 ds1.count() // 返回全部列的統計(count、mean、stddev、min、max) ds1.describe().show // 返回指定列的統計(count、mean、stddev、min、max) ds1.describe("age").show ds1.describe("age", "height").show // 4 聚集 ds1.reduce{ (f1, f2) => Person("sum", f1.age+f2.age, f1.height+f2.height) }