Spark RDD、DataFrame原理及操作詳解


RDD是什么?

  RDD (resilientdistributed dataset),指的是一個只讀的,可分區的分布式數據集,這個數據集的全部或部分可以緩存在內存中,在多次計算間重用。

  RDD內部可以有許多分區(partitions),每個分區又擁有大量的記錄(records)。

五個特征:

  dependencies:建立RDD的依賴關系,主要rdd之間是寬窄依賴的關系,具有窄依賴關系的rdd可以在同一個stage中進行計算。

  partition:一個rdd會有若干個分區,分區的大小決定了對這個rdd計算的粒度,每個rdd的分區的計算都在一個單獨的任務中進行。

  preferedlocations:按照“移動數據不如移動計算”原則,在spark進行任務調度的時候,優先將任務分配到數據塊存儲的位置

  compute:spark中的計算都是以分區為基本單位的,compute函數只是對迭代器進行復合,並不保存單次計算的結果。

  partitioner:只存在於(K,V)類型的rdd中,非(K,V)類型的partitioner的值就是None。

 

  rdd的算子action會觸發真正的作業提交,而transformation算子是不會立即觸發作業提交的。

  在Spark中,所有RDD的轉換都是是惰性求值的。RDD的轉換操作transformation會生成新的RDD,新的RDD的數據依賴於原來的RDD的數據,每個RDD又包含多個分區。那么一段程序實際上就構造了一個由相互依賴的多個RDD組成的有向無環圖(DAG)。並通過在RDD上執行action動作將這個有向無環圖作為一個Job提交給Spark執行

  在DAG中又進行stage的划分,划分的依據是依賴算子是否是shuffle(如reduceByKey,Join等)的,每個stage又可以划分成若干task。接下來的事情就是driver發送task到executor,executor自己的線程池去執行這些task,完成之后將結果返回給driver。action算子是划分不同job的依據。

  Spark對於有向無環圖Job進行調度,確定階段(Stage),分區(Partition),流水線(Pipeline),任務(Task)和緩存(Cache),進行優化,並在Spark集群上運行Job。RDD之間的依賴分為寬依賴(依賴多個分區)和窄依賴(只依賴一個分區),在確定階段時,需要根據寬依賴shuffle划分階段。根據分區划分任務。

  Spark支持故障恢復的方式也不同,提供兩種方式,Linage,通過數據的血緣關系,再執行一遍前面的處理,Checkpoint,將數據集存儲到持久存儲中。  Spark為迭代式數據處理提供更好的支持。每次迭代的數據可以保存在內存中,而不是寫入文件

 

這里注意兩個算子coalesce()和repartition()

coalesce

def coalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T] 
該函數用於將RDD進行重分區,使用HashPartitioner。 
第一個參數為重分區的數目,第二個為是否進行shuffle,默認為false。

repartition

def repartition(numPartitions: Int): RDD[T] 
該函數其實就是coalesce函數第二個參數為true的實現。

使用注意

他們兩個都是RDD的分區進行重新划分,repartition只是coalesce接口中shuffle為true的簡易實現,(假設RDD有N個分區,需要重新划分成M個分區) 
  1)N < M。一般情況下N個分區有數據分布不均勻的狀況,利用HashPartitioner函數將數據重新分區為M個,這時需要將shuffle設置為true。 
  2)如果N > M並且N和M相差不多,(假如N是1000,M是100)那么就可以將N個分區中的若干個分區合並成一個新的分區,最終合並為M個分區,這時可以將shuff設置為false,在shuffl為false的情況下,如果M>N時,coalesce為無效的,不進行shuffle過程,父RDD和子RDD之間是窄依賴關系。 
  3)如果N > M並且兩者相差懸殊,這時如果將shuffle設置為false,父子RDD是窄依賴關系,他們同處在一個stage中,就可能造成Spark程序的並行度不夠,從而影響性能,如果在M為1的時候,為了使coalesce之前的操作有更好的並行度,可以講shuffle設置為true。

  總之:如果shuff為false時,如果傳入的參數大於現有的分區數目,RDD的分區數不變,也就是說不經過shuffle,是無法將RDDde分區數變多的

參考:

Spark算子:RDD基本轉換操作(2)–coalesce、repartition

更多RDD算子內容推薦參考

    Spark函數詳解系列之RDD基本轉換

    Spark常用函數講解之鍵值RDD轉換

    Spark常用函數講解之Action操作

 

Spark的RDD原理以及2.0特性的介紹

 

 

窄依賴和寬依賴

  shuffle 是划分 DAG 中 stage 的標識,同時影響 Spark 執行速度的關鍵步驟
  RDD 的 Transformation 函數中,又分為窄依賴(narrow dependency)和寬依賴(wide dependency)的操作.窄依賴跟寬依賴的區別是是否發生 shuffle(洗牌) 操作.寬依賴會發生 shuffle 操作. 窄依賴是子 RDD的各個分片(partition)不依賴於其他分片,能夠獨立計算得到結果,寬依賴指子 RDD 的各個分片會依賴於父RDD 的多個分片,所以會造成父 RDD 的各個分片在集群中重新分片  

如下圖所示:map就是一種窄依賴,而join則會導致寬依賴

 

如上面的map,filter,union屬於第一類窄依賴,而join with inputs co-partitioned(對輸入進行協同划分的join操作,也就是說先按照key分組然后shuffle write的時候一個父分區對應一個子分區)則為第二類窄依賴

 groupByKey和對輸入未協同划分的join操作就是寬依賴,這是shuffle類操作。

細說:

  首先,窄依賴允許在單個集群節點上流水線式執行,這個節點可以計算所有父級分區。例如,可以逐個元素地依次執行filter操作和map操作。相反,寬依賴需要所有的父RDD數據可用並且數據已經通過類MapReduce的操作shuffle完成。 
  其次,在窄依賴中,節點失敗后的恢復更加高效。因為只有丟失的父級分區需要重新計算,並且這些丟失的父級分區可以並行地在不同節點上重新計算。與此相反,在寬依賴的繼承關系中,單個失敗的節點可能導致一個RDD的所有先祖RDD中的一些分區丟失,導致計算的重新執行。

 

// Map: "cat" -> c, cat
val rdd1 = rdd.Map(x => (x.charAt(0), x))
// groupby same key and count
val rdd2 = rdd1.groupBy(x => x._1).
                Map(x => (x._1, x._2.toList.length))

 

第一個 Map 操作將 RDD 里的各個元素進行映射, RDD 的各個數據元素之間不存在依賴,可以在集群的各個內存中獨立計算,也就是並行化,第二個 groupby 之后的 Map 操作,為了計算相同 key 下的元素個數,需要把相同 key 的元素聚集到同一個 partition 下,所以造成了數據在內存中的重新分布,即 shuffle 操作.shuffle 操作是 spark 中最耗時的操作,應盡量避免不必要的 shuffle.

根據是否發生 shuffle 操作能夠將其分成如下的 stage 類型

(join 需要針對同一個 key 合並,所以需要 shuffle) 
  運行到每個 stage 的邊界時,數據在父 stage 中按照 Task 寫到磁盤上,而在子 stage 中通過網絡從上一個 Task 中去讀取數據。這些操作會導致很嚴重的網絡傳輸以及磁盤的I/O,所以 stage 的邊界是非常占資源的,在編寫 Spark 程序的時候需要盡量避免的 。父 stage 中 partition 個數與子 stage 的 partition 個數可能不同,所以那些產生 stage 邊界的 Transformation 常常需要接受一個 numPartition 的參數來覺得子 stage 中的數據將被切分為多少個 partition。 
PS:shuffle 操作的時候可以用 combiner 壓縮數據,減少 IO 的消耗

 

參考:那些年我們對Spark RDD的理解

 

DataFrame是什么?
  在Spark中,DataFrame是一種以RDD為基礎的分布式數據集,類似於傳統數據庫中的二維表格。DataFrame與RDD的主要區別在於,前者帶有schema元信息,即DataFrame所表示的二維表數據集的每一列都帶有名稱和類型。這使得Spark SQL得以洞察更多的結構信息,從而對藏於DataFrame背后的數據源以及作用於DataFrame之上的變換進行了針對性的優化,最終達到大幅提升運行時效率的目標。反觀RDD,由於無從得知所存數據元素的具體內部結構,Spark Core只能在stage層面進行簡單、通用的流水線優化。
更多參考

 

一:DataFrame創建

SparkSQL可以以其他RDD對象、parquet文件、json文件、hive表,以及通過JDBC連接到其他關系型數據庫作為數據源來生成DataFrame對象。

1)jdbc

【讀】

postgresUrl="jdbc:postgresql://127.0.0.1:5432/testdb" dimDF = sqlContext.read.format('jdbc').options(url=postgresUrl,dbtable=tableName,user="root",password="root") .load() dimDF.registerTempTable(tmpTableName)

 【寫】

 
         
self.postgresURL = str(self.postgresIP) + ":" + str(self.postgresPort) + "/" + str(self.postgresDB)
self.postgresqlDatasource = {
"url" : "jdbc:postgresql://" + self.postgresURL,
"user" : self.postgresUser,
"password" : self.postgresPwd
}
resultDF.coalesce(int(partitionNum)).write.jdbc(url=postgresqlDatasource["url"], table=reportTable, mode='append', properties=postgresqlDatasource)

 

2)parquet

【讀】

telematicFilePath = "/user/spark/test/telematic.parquet/key=" + handleRecordDateStr if( common.fileExist(telematicFilePath, self.sc) ): df = self.sqlContext.read.schema(TELEMATIC_PARQUET_SCHEMA).parquet(telematicFilePath).coalesce(int(self.partitionNum))
# schema for /user/spark/test/telematic.parquet
TELEMATIC_PARQUET_SCHEMA = SQLType.StructType([
  SQLType.StructField('dm_transct_date_hr_key', SQLType.LongType(), True), SQLType.StructField('dm_vehicle_dim_key', SQLType.IntegerType(), True), SQLType.StructField('dm_driver_dim_key', SQLType.IntegerType(), True), SQLType.StructField('dm_company_dim_key', SQLType.IntegerType(), True), SQLType.StructField('deviceId', SQLType.StringType(), True), SQLType.StructField('companyId', SQLType.StringType(), True)])

 【寫】

df.write.parquet(parquetPath, mode="overwrite")

 

3)json 

df = sqlContext.read.json(path)

 

4)list列表

dataList = resultDF.collect()
resultDF = self.sqlContext.createDataFrame(dataList)

 

5)Rdd

if rddSchema is None:
    df = sqlContext.createDataFrame(rdd)
else:
    df = sqlContext.createDataFrame(rdd, rddSchema)
rdd = sc.parallelize(resultList)
df = self.sqlContext.createDataFrame(rdd)

 

二:Transform操作

三:Action操作

1、 collect() ,返回一個數組,包括dataframe集合所有的行

df = sqlContext.createDataFrame(parquetRecordList, PARQUET_FILE_SCHEMA) for key in df.rdd.map(lambda x: x["key"]).distinct().collect(): filePath = "/user/spark/test.parquet/key=20171110" df.filter("key="+str(key)).drop("key").write.parquet(filePath, mode="append")

 

2、 collectAsList() 返回值是一個java類型的數組,返回dataframe集合所有的行
3、 count() 返回一個number類型的,返回dataframe集合的行數
4、 toJson
5、 first() 返回第一行 ,類型是row類型
6、 head() 返回第一行 ,類型是row類型
7、 head(n:Int)返回n行  ,類型是row 類型
8、 show()返回dataframe集合的值 默認是20行,返回類型是unit
9、 show(n:Int)返回n行,,返回值類型是unit
10、table(n:Int) 返回n行  ,類型是row 類型

 

dataframe的基本操作
1、 cache()同步數據的內存

 

data = self.sqlContext.sql(queryStr).toJSON().cache().collect()


2、 columns 返回一個string類型的數組,返回值是所有列的名字
3、 dtypes返回一個string類型的二維數組,返回值是所有列的名字以及類型
4、 explan()打印執行計划  物理的
5、 toJSON 轉換為json格式數據
6、 isLocal 返回值是Boolean類型,如果允許模式是local返回true 否則返回false
7、 persist(newlevel:StorageLevel) 返回一個dataframe.this.type 輸入存儲模型類型

稍后詳解
8、 printSchema() 打印出字段名稱和類型 按照樹狀結構來打印
9、 registerTempTable(tablename:String) 返回Unit ,將df的對象只放在一張表里面,這個表隨着對象的刪除而刪除了
10、 schema 返回structType 類型,將字段名稱和類型按照結構體類型返回
11、 toDF()返回一個新的dataframe類型的
12、 toDF(colnames:String*)將參數中的幾個字段返回一個新的dataframe類型的,
13、 unpersist() 返回dataframe.this.type 類型,去除模式中的數據
14、 unpersist(blocking:Boolean)返回dataframe.this.type類型 true 和unpersist是一樣的作用false 是去除RDD

 

集成查詢:
1、 agg(expers:column*) 返回dataframe類型 ,按每個device分組查最小時間

df = sqlContext.createDataFrame(tensRdd)
resultDF = df.groupBy("device_id").agg({RegularDataEtlConstants.TIME: 'min'})
resultDF.repartition(self._partitionNum).foreachPartition(lambda iterator: self.__saveToHBase(iterator))

 

startTime = df.filter((df.startTime != "") & (df.startTime >= minStartTimeCurrent)).agg({"startTime": "min"}).collect()[0][0]


4、 apply(colName: String) 返回column類型,捕獲輸入進去列的對象
5、 as(alias: String) 返回一個新的dataframe類型,就是原來的一個別名
6、 col(colName: String)  返回column類型,捕獲輸入進去列的對象
7、 cube(col1: String, cols: String*) 返回一個GroupedData類型,根據某些字段來匯總
8、 distinct 去重 返回一個dataframe類型
9、 drop(col: Column) 刪除某列 返回dataframe類型

 

columnList = ['key', 'type', 'timestamp', 'data']
df = sqlContext.createDataFrame(dataList[index], columnList)
for key in df.rdd.map(lambda x: x["key"]).distinct().collect():
   parquetPath = parquetList[index] + "/key=" + str(key)
   df.filter("key="+str(key)).drop("key").write.parquet(parquetPath, mode="append", partitionBy="type")


10、 dropDuplicates(colNames: Array[String]) 刪除相同的列 返回一個dataframe
11、 except(other: DataFrame) 返回一個dataframe,返回在當前集合存在的在其他集合不存在的


12、 explode[A, B](inputColumn: String, outputColumn: String)行轉列

根據c3字段中的空格將字段內容進行分割,分割的內容存儲在新的字段c3_中
jdbcDF.explode( "c3" , "c3_" ){time: String => time.split( " " )}

 


13、 filter(conditionExpr: String): 刷選部分數據,返回dataframe類型

df.filter("age>10").show(); 
df.filter(df("age")>10).show();  
df.where(df("age")>10).show();

 

14、 groupBy(col1: String, cols: String*) 分組  

dfgroupBy("age").avg().show();
15、 intersect(other: DataFrame) 返回一個dataframe,在2個dataframe都存在的元素
16、 join(right: DataFrame, joinExprs: Column, joinType: String)
一個是關聯的dataframe,第二個關聯的條件,第三個關聯的類型:inner, outer, left_outer, right_outer, leftsemi
df.join(ds,df("name")===ds("name") and  df("age")===ds("age"),"outer").show();
17、 limit(n: Int) 返回dataframe類型  去n 條數據出來
18、 na: DataFrameNaFunctions ,可以調用dataframenafunctions的功能區做過濾 df.na.drop().show(); 刪除為空的行
19、 orderBy(sortExprs: Column*) 做alise排序
20、 select(cols:string*) dataframe 做字段的刷選 df.select($"colA", $"colB" + 1)
21、 selectExpr(exprs: String*) 做字段的刷選 df.selectExpr("name","name as names","upper(name)","age+1").show();
22、 sort(sortExprs: Column*) 排序 df.sort(df("age").desc).show(); 默認是asc
23、 unionAll(other:Dataframe) 合並 

 

df = df.unionAll(dfTemp).coalesce(int(self.partitionNum))


24、 withColumnRenamed(existingName: String, newName: String) 修改列表 df.withColumnRenamed("name","names").show();
25、 withColumn(colName: String, col: Column) 增加一列

df中新增一個名為aa的列,值與列name的一樣

 

df.withColumn("aa",df("name")).show(); 
將該列時間值計算加上時區偏移值 mergeDF
= mergeDF.withColumn("dm_transct_date_hr_key", functions.lit(self.__datehandle(mergeDF["dm_transct_date_hr_key"], self.timezoneOffset)))

 

Spark-SQL之DataFrame操作大全

http://blog.csdn.net/mtj66/article/details/52064827

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM