【Spark】【RDD】初次學習RDD 筆記 匯總


RDD

Author:萌狼藍天

【嗶哩嗶哩】萌狼藍天

【博客】https://mllt.cc

【博客園】萌狼藍天 - 博客園

【微信公眾號】mllt9920

【學習交流QQ群】238948804

@萌狼藍天

【!】啟動spark集群

【!】啟動spark-shell

image-20211022094302469

spark2.0將spark context 和hive context集成到了spark session

spark也可以作為程序入口

spark用scala編程

image-20211022094516412

特點

它是集群節點上的不可改變的、已分區的集合對象;

  • 通過並行轉換的方式來創建如(map、filter、join等);
  • 失敗自動重建;
  • 可以控制存儲級別(內存、磁盤等)來進行重用;
  • 必須是可序列化的;在內存不足時可自動降級為磁盤存儲,把RDD存儲於磁盤上,這時性能有大的下降但不會差於現在的MapReduce;
  • 對於丟失部分數據分區只需要根據它的lineage就可重新計算出來,而不需要做特定的checkpoint;

創建

從內存中創建RDD

啟動spark-shell

val list = List(1,2,3)
var rdd = sc.parallelize(list)
rdd.partitions.size

image-20211025011852137

從外部存儲創建RDD

1.創建本地文件

cd /home
mkdir data
touch a.txt
  1. 不一定非要在家目錄創建
  2. 可以使用vim在a.txt中添加一些內容

2.啟動spark-shell

3.從本地文件系統中讀取

val localrdd = sc.textFile("file:///home/用戶名/data/a.txt")

image-20211022100856238

路徑前面加 file:// 表示從本地文件系統讀取

localrdd.collect//返回RDD中所有的元素

注意:若在完全分布式spark-shell模式下,該文件需要在所有節點的相同位置保存才可以被讀取,否則會報錯“文件不存在”

image-20211025015531777

從HDFS創建RDD

1.在HDFS根目錄下創建目錄(姓名學號)

hdfs dfs -mkdir /zwj25
hdfs dfs -ls /

訪問 http://[IP]:50070

image-20211025002555544

image-20211025002652308

image-20211025002943200

image-20211025002957999

2.上傳本地文件到HDFS

hdfs dfs -put file.txt /zwj25

image-20211025003059248

3.進入spark4-shell

var hdfsrdd=sc.textFile("/zwj25/file.txt")
hdfsrdd.collect
hdfsrdd.partitions
hdfsrdd.partitions.size

sc.defaultMinPartitions=min(sc.defaultParallelism,2)

rdd分區數=max(hdfs文件的block數目,sc.defaultMinPartitions)

image-20211025003157234

image-20211025003305459

從其他RDD創建

算子

map(func)

類型:Transformation類型算子

map: 將原來RDD的每個數據項通過map中的用戶自定義函數f轉換成一個新的RDD,map操作不會改變RDD的分區數目

filter 過濾

filter(func)

Transformation類型算子

保留通過函數func,返回值為true的元素,組成新的RDD

eg:過濾掉data RDD中元素小於或等於2的元素

val data =sc.parallelize(List(1,2,3,4))
val result = data.filter(x=>x>2)
result.collect

image-20211031033144678

flatMap(func) 分割單詞

類型:Transformation類型算子
flatMap:對集合中的每個元素進行map操作再扁平化

val data = sc.parallelize(List("I am Meng Lang Lan Tian","my wechat is mllt9920"))
data.map(x=>x.split(" ")).collect
data.flatMap(x=>x.split(" ")).collect

image-20211031033813179

sortBy 排序

sortBy(f:(T) => K, ascending, numPartitions)

類型:Transformation類型算子
作用:對標准RDD進行排序


sortBy()可接受三個參數:
f:(T) => K:左邊是要被排序對象中的每一個元素,右邊返回的值是元素中要進行排序的值。
ascending:決定排序后RDD中的元素是升序還是降序,默認是true,也就是升序,false為降序排序。

numPartitions:該參數決定排序后的RDD的分區個數,默認排序后的分區個數和排序之前的個數相等。

eg:按照每個元素的第二個值進行降序排序,將得到的結果存放到RDD "data2" 中

val data1 = sc.parallelize(List((1,3),(2,4),(5,7),(6,8)))
val data2 = data1.sortBy(x=>x._2,false,1)
val data3 = data1.sortBy(x=>x._1,false,1)

image-20211026093311567

image-20211026093321690

image-20211026093419570

image-20211026093601556

distinct 去重復

distinct([numPartitions]))

類型:Transformation類型算子
作用:去重。針對RDD中重復的元素,只保留一個元素

eg:

val data1 = sc.parallelize(List(1,2,3,3,3,4,4))
data1.collect
data1.distinct.collect
data1.collect

image-20211026094207580

union 合並

union(otherDataset)

作用:合並RDD,需要保證兩個RDD元素類型一致

eg:合並rdd1和rdd2

val rdd1 = sc.parallelize(List(1,2,3))
val rdd2 = sc.parallelize(List(4,5,6))
rdd1.union(rdd2).collect

image-20211026094545962

注意:union兩個RDD元素類型要一致

intersection 交集

intersection(otherDataset)

作用:找出兩個RDD的共同元素,也就是找出兩個RDD的交集

eg:找出c_rdd1和c_rdd2中相同的元素

val c_rdd1 = sc.parallelize(List(('a',1),('b',2),('a',1),('c',1)))
val c_rdd2 = sc.parallelize(List(('a',1),('b',1),('d',1),('e',1)))
c_rdd1.intersection(c_rdd2).collect

image-20211026100237219

subtract 差集

subtract (otherDataset)

作用:獲取兩個RDD之間的差集

eg:找出rdd1與rdd2之間的差集

val rdd1 = sc.parallelize(Array("A","B","C","D"))
val rdd2 = sc.parallelize(Array("C","D","E","F"))
val subtractRDD = rdd1.subtract(rdd2)
subtractRDD.collect

image-20211026095710838

cartesian

cartesian(otherDataset)

名稱:笛卡爾積

作用:將兩個集合的元素兩兩組合成一組

eg:

val rdd01 = sc.makeRDD(List(1,3,5,3))
val rdd02 = sc.makeRDD(List(2,4,5,1))
rdd01.cartesian(rdd02).collect

image-20211026095149825

take(num)

返回RDD前面num條記錄

val data = sc.parallelize(List(1,2,3,4))
data.take(2)

image-20211031033437628

鍵值對RDD

mapValues

val rdd = sc.parallelize(List("a","b","c","d"))
//通過map創建鍵值對
var rddp = rdd.map(x=>(x,1))
rddp.collect
rddp.keys.collect
rddp.values.collect
//通過mapValues讓所有Value值加一
rddp.mapValues(x=>x+1).collect

image-20211026162050337

image-20211028202752080

val rdd1 = sc.parallelize(List("I am a student","Hello word","Just Play"))
val rdd2 = rdd1.map(x=>(x,992))
rdd2.collect
rdd2.keys.collect
rdd2.values
rdd2.values.collect

image-20211026162407135

val rdd3 = sc.parallelize(List("I am a student","Hello word","Just Play"))
val rdd4 = rdd1.map(x=>x.split(" "))
rdd4.collect
val p1=rdd4.map(x=>(x.split(" "),x))
p1.collect

join按鍵內連接

val rdd = sc.parallelize(List("a","b","c","d"))
//通過map創建鍵值對
var rddp = rdd.map(x=>(x,1))
//通過mapValues讓所有Value值加一
var rdd1 = rddp.mapValues(x=>x+1)
//同理得到rdd2
val rdd2 = sc.parallelize(List("a","b","c","d","e")).map(x=>(x,1))
rdd1.collect
rdd2.collect
//使用join將rdd1和rdd2連接起來
rdd1.join(rdd2).collect
rdd2.join(rdd1).collect

image-20211028204433345

image-20211028204610894

leftOuterJoin和rightOuterJoin和fullOuterJoin

rightOuterJoin 右外連接。第二個RDD的鍵必須存在

leftOuterJoin 左外連接。第一個RDD的鍵必須存在

fullOuterJoin 全外連接。兩個鍵都要有

//rdd1和rdd2延用上方的
rdd1.collect
rdd2.collect
//右外連接
rdd1.rightOuterJoin(rdd2).collect
//左外連接
rdd1.leftOuterJoin(rdd2).collect
//全外連接
rdd1.fullOuterJoin(rdd2).collect

image-20211028211750459

zip

作用:組合兩個RDD為鍵值對RDD

  • 兩個RDD的分區數必須相同(查詢分區數rdd.partitions.size)
  • 兩個RDD的元素個數必須相同
val rdd1  = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(List("a","b","c"))
rdd1.collect
rdd2.collect
rdd2.zip(rdd1).collect
rdd1.zip(rdd2).collect
rdd1.partitions.size
rdd2.partitions.size
val rdd3  = sc.parallelize(1 to 3,3)//3是指的分區數
val rdd4 = sc.parallelize(List("a","b","c"),3)//3是指的分區數
rdd3.partitions.size
rdd4.partitions.size

image-20211028213651663

image-20211028213947300

CombineByKey

合並相同鍵的值,合並后值的類型可以不同

目標:想將值轉換為List類型

groupByKey([numPartitions])

按鍵分組,在(K,V)對組成的RDD上調用時,返回(K,Iterable )對組成的新的RDD。

val rdd1 = sc.parallelize(List("A","B","C","C","C","D","D")).map(x=>(x,1))
rdd1.groupByKey().collect
rdd1.groupByKey().collect()

image-20211029201616432

reduceByKey(func, [numPartitions])

將鍵值對RDD按鍵分組后進行聚合(Key相同,則只保留一個Key,值+1)

  • 當在(K,V)類型的鍵值對組成的RDD上調用時,返回一個(K,V)類型鍵值對組成的新RDD
  • 其中新RDD每個鍵的值使用給定的reduce函數func進行聚合,該函數必須是(V,V)=>V類型
  • 可用來統計每個鍵出現的次數
val rdd1 = sc.parallelize(List("A","B","C","C","C","D","D")).map(x=>(x,1))
rdd1.reduceByKey((x,y)=>x+y).collect
rdd1.reduceByKey((x,y)=>x+y).collect()

image-20211029195335339

文件讀取與存儲

結構名稱 結構化 描述
文本文件 普通文本文件,每一行一條記錄
SequenceFile 用於鍵值對數據的常見Hadoop文件格式
rdd.partitions.size

saveAsTextFile(Path:String)

把RDD保存到HDFS中

val rdd1 = sc.parallelize(List(1,2,3,4))
rdd1.saveAsTextFile("/302Spark/savetext")
//輸入IP:50070 查詢

image-20211029091516499

image-20211029091510913

image-20211029091526666

saveAsSequenceFile and sc.sequenceFile

saveAsSequenceFile(Path:String)

序列化文件,僅支持鍵值對RDD

sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], minPartitions: Int)

讀序列化文件:

//為了讓Spark支持Hadoop的數據類型,需要導包
import org.apache.hadoop.io.
sc.sequenceFile(Path:String,KeyClass:key[K])

實例

//序列化文件儲存
val rdd = sc.parallelize(List(("panda",3),("dog",6),("cat",3)))
rdd.saveAsSequenceFile("/hadoop-zwj25/testSeq")
rdd.partitions.size

image-20211029214747241

image-20211029214810121

image-20211029214821179

image-20211029214828860

//查看序列化文件
hdfs dfs -ls /hadoop-zwj25
hdfs dfs -ls /hadoop-zwj25/testSeq
hdfs dfs -cat /hadoop-zwj25/testSeq/part-00000

image-20211029214939010

//引入hadoop數據類型
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
//序列化文件讀取
//第1個classOf[Text]中的Text是鍵的類型
//第2個classOf[IntSWritable]中的IntSWritable是值的類型
val output = sc.sequenceFile("/hadoop-zwj25/testSeq",classOf[Text],classOf[IntWritable])
output.map{case(x,y)=>(x.toString,y.get())}.collect
rdd.collect
val rddtest = sc.parallelize(List(1,2,3))
rddtest.map{case 1=>"One";case 2=>"Two";case _=>"other"}.collect
rddtest.map{case x=>(x,"a")}.collect

image-20211029215507871

repartition() 重新分區

repartition(numPartitions: Int)

  • 可以增加或減少此RDD中的並行級別。在內部,它使用shuffle重新分發數據。
  • 如果要減少此RDD中的分區數,請考慮使用coalesce,這樣可以避免執行shuffle。
coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)

查詢分區:partitions.size

rdd.repartition(numPartitions:Int).partitions.size

減少分區數,請考慮使用coalesce,這樣可以避免執行

val rdd1 = sc.parallelize(List(1,2,3,4))
rdd1.saveAsTextFile("/302Spark/savetext")
//輸入IP:50070 查詢
//---------------------------------------
rdd1.partitions.size
rdd1.repartition(1).partitions.size
rdd1.repartition(1).saveAsTextFile("/302Spark/savetext1")

image-20211029092650290

image-20211029092703281

練習

Practice01

題目

找出考試成績得過100分的學生ID,最終的結果需要集合到一個RDD中。

素材

請將下面代碼塊中內容粘貼到文本文檔result_bigdata.txt中

1001	大數據基礎	90
1002	大數據基礎	94
1003	大數據基礎	100
1004	大數據基礎	99
1005	大數據基礎	90
1006	大數據基礎	94
1007	大數據基礎	100
1008	大數據基礎	93
1009	大數據基礎	89
1010	大數據基礎	78
1011	大數據基礎	91
1012	大數據基礎	84

代碼

//從本地文件創建RDD
val rdd_bigdata = sc.textFile("file:///home/用戶名/result_bigdata.txt")
//隨便輸出一個測試
rdd_bigdata.take(2)
//查看所有結果
rdd_bigdata.collect
//下面這種方法需要轉為Int型
val bigdata_100=rdd_bigdata.map(x=>x.split("\t")).map(x=>(x(0),x(1),x(2).toInt)).filter(x=>x._3==100).map(x=>x._1)
bigdata_100.collect
//下面這種方法無需轉為Int型
val bigdata_100=rdd_bigdata.map(x=>x.split("\t")).filter(x=>x(2)=="100").map(x=>x(0))
bigdata_100.collect

Practice02

題目

輸出每位學生的總成績,要求將兩個成績表中學生ID相同的成績相加。

素材

請將下面代碼塊中內容粘貼到文本文檔score.txt中

math John 90
math Betty 88
math Mike 95
math Lily 92
chinese John 78
chinese Betty 80
chinese Mike 88
chinese Lily 85
english John 92
english Betty 84
english Mike 90
english Lily 85

請將下面代碼塊中內容粘貼到文本文檔result_math.txt中

1001	應用數學	96
1002	應用數學	94
1003	應用數學	100
1004	應用數學	100
1005	應用數學	94
1006	應用數學	80
1007	應用數學	90
1008	應用數學	94
1009	應用數學	84
1010	應用數學	86
1011	應用數學	79
1012	應用數學	91

請將下面代碼塊中內容粘貼到文本文檔result_bigdata.txt中

1001	大數據基礎	90
1002	大數據基礎	94
1003	大數據基礎	100
1004	大數據基礎	99
1005	大數據基礎	90
1006	大數據基礎	94
1007	大數據基礎	100
1008	大數據基礎	93
1009	大數據基礎	89
1010	大數據基礎	78
1011	大數據基礎	91
1012	大數據基礎	84

代碼

//從本地文件創建RDD
val rdd_bigdata = sc.textFile("file:///home/hadoop-zwj25/result_bigdata.txt")
val rdd_math = sc.textFile("file:///home/hadoop-zwj25/result_math.txt")
//返回RDD中所有的元素
rdd_bigdata.collect
rdd_math.collect
//合並兩個RDD
val rddall = rdd_math.union(rdd_bigdata)
rddall.collect
rddall.map(x=>(x.split("\t"))).map(x=>(x(0),x(2).toInt)).reduceByKey((x,y)=>x+y).collect

image-20211028202051100

image-20211028202115692

Practice03

題目

1.輸出每位同學的平均成績,要求將兩個成績表中學生ID相同的成績相加並計算出平均分。

2.合並每位同學的總成績和平均成績

完成平均分及合並任務

素材

代碼

//1.創建RDD並轉換
val math_map = math.map(x=>x.split("\t")).map(x=>(x(0),x(2).toInt))
math_map.collect
val bigdata=sc.textFile("file:///home/hadoop-zwj25/result_bigdata.txt")
val bigdata_map = bigdata.map(x=>x.split("\t")).map(x=>(x(0),x(2).toInt))
bigdata_map.collect
//將兩個鍵值對RDD合並,使用take隨機測試一個
math_map.union(bigdata_map).take(3)
//為成績添加成績計數,從1開始,完成后的格式是 (ID,(成績,計數))
math_map.union(bigdata_map).mapValues(x=>(x,1)).take(3)
//注意,轉換后的類型需和原值類型保持一致
//reduceByKey((x,y)=>((x._1+y._1),(x._2+y._2)))解釋如下
//x是原來的成績,y是計數的
//(x._1+y._1)代表總成績(兩門學科成績相加)
//(x._2+y._2)代表總門數(兩個計數值相加)
math_map.union(bigdata_map).mapValues(x=>(x,1)).reduceByKey((x,y)=>((x._1+y._1),(x._2+y._2))).take(3)
//將合並后RDD按鍵 分組 並 聚合
//平均成績為 總成績/學科數
//x_1是ID ; x._2 是(成績,計數值)
//所以求平均成績為 (x._2._1/x._2._2)
val pj = math_map.union(bigdata_map).mapValues(x=>(x,1)).reduceByKey((x,y)=>((x._1+y._1),(x._2+y._2))).map(x=>(x._1,(x._2._1/x._2._2)))

//查詢結果
pj.collect
//輸出總成績
val zf = math_map.union(bigdata_map).reduceByKey((x,y)=>x+y)
zf.collect

pj.count
zf.count
//合並每個學生的總分與平均分
zf.join(pj).count
      

image-20211029162934946


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM