RDD
Author:萌狼藍天
【嗶哩嗶哩】萌狼藍天
【博客】https://mllt.cc
【博客園】萌狼藍天 - 博客園
【微信公眾號】mllt9920
【學習交流QQ群】238948804
@萌狼藍天
【!】啟動spark集群
【!】啟動
spark-shell

spark2.0將spark context 和hive context集成到了spark session
spark也可以作為程序入口
spark用scala編程

特點
它是集群節點上的不可改變的、已分區的集合對象;
- 通過並行轉換的方式來創建如(map、filter、join等);
- 失敗自動重建;
- 可以控制存儲級別(內存、磁盤等)來進行重用;
- 必須是可序列化的;在內存不足時可自動降級為磁盤存儲,把RDD存儲於磁盤上,這時性能有大的下降但不會差於現在的MapReduce;
- 對於丟失部分數據分區只需要根據它的lineage就可重新計算出來,而不需要做特定的checkpoint;
創建
從內存中創建RDD
啟動spark-shell
val list = List(1,2,3)
var rdd = sc.parallelize(list)
rdd.partitions.size

從外部存儲創建RDD
1.創建本地文件
cd /home
mkdir data
touch a.txt
- 不一定非要在家目錄創建
- 可以使用vim在a.txt中添加一些內容
2.啟動spark-shell
3.從本地文件系統中讀取
val localrdd = sc.textFile("file:///home/用戶名/data/a.txt")
路徑前面加
file://表示從本地文件系統讀取
localrdd.collect//返回RDD中所有的元素
注意:若在完全分布式spark-shell模式下,該文件需要在所有節點的相同位置保存才可以被讀取,否則會報錯“文件不存在”

從HDFS創建RDD
1.在HDFS根目錄下創建目錄(姓名學號)
hdfs dfs -mkdir /zwj25
hdfs dfs -ls /
訪問 http://[IP]:50070




2.上傳本地文件到HDFS
hdfs dfs -put file.txt /zwj25

3.進入spark4-shell
var hdfsrdd=sc.textFile("/zwj25/file.txt")
hdfsrdd.collect
hdfsrdd.partitions
hdfsrdd.partitions.size
sc.defaultMinPartitions=min(sc.defaultParallelism,2)
rdd分區數=max(hdfs文件的block數目,sc.defaultMinPartitions)


從其他RDD創建
算子
map(func)
類型:Transformation類型算子
map: 將原來RDD的每個數據項通過map中的用戶自定義函數f轉換成一個新的RDD,map操作不會改變RDD的分區數目
filter 過濾
filter(func)
Transformation類型算子
保留通過函數func,返回值為true的元素,組成新的RDD
eg:過濾掉data RDD中元素小於或等於2的元素
val data =sc.parallelize(List(1,2,3,4))
val result = data.filter(x=>x>2)
result.collect

flatMap(func) 分割單詞
類型:Transformation類型算子
flatMap:對集合中的每個元素進行map操作再扁平化
val data = sc.parallelize(List("I am Meng Lang Lan Tian","my wechat is mllt9920"))
data.map(x=>x.split(" ")).collect
data.flatMap(x=>x.split(" ")).collect

sortBy 排序
sortBy(f:(T) => K, ascending, numPartitions)
類型:Transformation類型算子
作用:對標准RDD進行排序
sortBy()可接受三個參數:
f:(T) => K:左邊是要被排序對象中的每一個元素,右邊返回的值是元素中要進行排序的值。
ascending:決定排序后RDD中的元素是升序還是降序,默認是true,也就是升序,false為降序排序。
numPartitions:該參數決定排序后的RDD的分區個數,默認排序后的分區個數和排序之前的個數相等。
eg:按照每個元素的第二個值進行降序排序,將得到的結果存放到RDD "data2" 中
val data1 = sc.parallelize(List((1,3),(2,4),(5,7),(6,8)))
val data2 = data1.sortBy(x=>x._2,false,1)
val data3 = data1.sortBy(x=>x._1,false,1)




distinct 去重復
distinct([numPartitions]))
類型:Transformation類型算子
作用:去重。針對RDD中重復的元素,只保留一個元素
eg:
val data1 = sc.parallelize(List(1,2,3,3,3,4,4))
data1.collect
data1.distinct.collect
data1.collect

union 合並
union(otherDataset)
作用:合並RDD,需要保證兩個RDD元素類型一致
eg:合並rdd1和rdd2
val rdd1 = sc.parallelize(List(1,2,3))
val rdd2 = sc.parallelize(List(4,5,6))
rdd1.union(rdd2).collect

注意:union兩個RDD元素類型要一致
intersection 交集
intersection(otherDataset)
作用:找出兩個RDD的共同元素,也就是找出兩個RDD的交集
eg:找出c_rdd1和c_rdd2中相同的元素
val c_rdd1 = sc.parallelize(List(('a',1),('b',2),('a',1),('c',1)))
val c_rdd2 = sc.parallelize(List(('a',1),('b',1),('d',1),('e',1)))
c_rdd1.intersection(c_rdd2).collect

subtract 差集
subtract (otherDataset)
作用:獲取兩個RDD之間的差集
eg:找出rdd1與rdd2之間的差集
val rdd1 = sc.parallelize(Array("A","B","C","D"))
val rdd2 = sc.parallelize(Array("C","D","E","F"))
val subtractRDD = rdd1.subtract(rdd2)
subtractRDD.collect

cartesian
cartesian(otherDataset)
名稱:笛卡爾積
作用:將兩個集合的元素兩兩組合成一組
eg:
val rdd01 = sc.makeRDD(List(1,3,5,3))
val rdd02 = sc.makeRDD(List(2,4,5,1))
rdd01.cartesian(rdd02).collect

take(num)
返回RDD前面num條記錄
val data = sc.parallelize(List(1,2,3,4))
data.take(2)

鍵值對RDD
mapValues
val rdd = sc.parallelize(List("a","b","c","d"))
//通過map創建鍵值對
var rddp = rdd.map(x=>(x,1))
rddp.collect
rddp.keys.collect
rddp.values.collect
//通過mapValues讓所有Value值加一
rddp.mapValues(x=>x+1).collect


val rdd1 = sc.parallelize(List("I am a student","Hello word","Just Play"))
val rdd2 = rdd1.map(x=>(x,992))
rdd2.collect
rdd2.keys.collect
rdd2.values
rdd2.values.collect

val rdd3 = sc.parallelize(List("I am a student","Hello word","Just Play"))
val rdd4 = rdd1.map(x=>x.split(" "))
rdd4.collect
val p1=rdd4.map(x=>(x.split(" "),x))
p1.collect
join按鍵內連接
val rdd = sc.parallelize(List("a","b","c","d"))
//通過map創建鍵值對
var rddp = rdd.map(x=>(x,1))
//通過mapValues讓所有Value值加一
var rdd1 = rddp.mapValues(x=>x+1)
//同理得到rdd2
val rdd2 = sc.parallelize(List("a","b","c","d","e")).map(x=>(x,1))
rdd1.collect
rdd2.collect
//使用join將rdd1和rdd2連接起來
rdd1.join(rdd2).collect
rdd2.join(rdd1).collect


leftOuterJoin和rightOuterJoin和fullOuterJoin
rightOuterJoin 右外連接。第二個RDD的鍵必須存在
leftOuterJoin 左外連接。第一個RDD的鍵必須存在
fullOuterJoin 全外連接。兩個鍵都要有
//rdd1和rdd2延用上方的
rdd1.collect
rdd2.collect
//右外連接
rdd1.rightOuterJoin(rdd2).collect
//左外連接
rdd1.leftOuterJoin(rdd2).collect
//全外連接
rdd1.fullOuterJoin(rdd2).collect

zip
作用:組合兩個RDD為鍵值對RDD
- 兩個RDD的分區數必須相同(查詢分區數rdd.partitions.size)
- 兩個RDD的元素個數必須相同
val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(List("a","b","c"))
rdd1.collect
rdd2.collect
rdd2.zip(rdd1).collect
rdd1.zip(rdd2).collect
rdd1.partitions.size
rdd2.partitions.size
val rdd3 = sc.parallelize(1 to 3,3)//3是指的分區數
val rdd4 = sc.parallelize(List("a","b","c"),3)//3是指的分區數
rdd3.partitions.size
rdd4.partitions.size


CombineByKey
合並相同鍵的值,合並后值的類型可以不同
目標:想將值轉換為List類型
groupByKey([numPartitions])
按鍵分組,在(K,V)對組成的RDD上調用時,返回(K,Iterable
)對組成的新的RDD。
val rdd1 = sc.parallelize(List("A","B","C","C","C","D","D")).map(x=>(x,1))
rdd1.groupByKey().collect
rdd1.groupByKey().collect()

reduceByKey(func, [numPartitions])
將鍵值對RDD按鍵分組后進行聚合(Key相同,則只保留一個Key,值+1)
- 當在(K,V)類型的鍵值對組成的RDD上調用時,返回一個(K,V)類型鍵值對組成的新RDD
- 其中新RDD每個鍵的值使用給定的reduce函數func進行聚合,該函數必須是(V,V)=>V類型
- 可用來統計每個鍵出現的次數
val rdd1 = sc.parallelize(List("A","B","C","C","C","D","D")).map(x=>(x,1))
rdd1.reduceByKey((x,y)=>x+y).collect
rdd1.reduceByKey((x,y)=>x+y).collect()

文件讀取與存儲
| 結構名稱 | 結構化 | 描述 |
|---|---|---|
| 文本文件 | 否 | 普通文本文件,每一行一條記錄 |
| SequenceFile | 是 | 用於鍵值對數據的常見Hadoop文件格式 |
rdd.partitions.size
saveAsTextFile(Path:String)
把RDD保存到HDFS中
val rdd1 = sc.parallelize(List(1,2,3,4))
rdd1.saveAsTextFile("/302Spark/savetext")
//輸入IP:50070 查詢



saveAsSequenceFile and sc.sequenceFile
saveAsSequenceFile(Path:String)
序列化文件,僅支持鍵值對RDD
sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], minPartitions: Int)
讀序列化文件:
//為了讓Spark支持Hadoop的數據類型,需要導包
import org.apache.hadoop.io.
sc.sequenceFile(Path:String,KeyClass:key[K])
實例
//序列化文件儲存
val rdd = sc.parallelize(List(("panda",3),("dog",6),("cat",3)))
rdd.saveAsSequenceFile("/hadoop-zwj25/testSeq")
rdd.partitions.size




//查看序列化文件
hdfs dfs -ls /hadoop-zwj25
hdfs dfs -ls /hadoop-zwj25/testSeq
hdfs dfs -cat /hadoop-zwj25/testSeq/part-00000

//引入hadoop數據類型
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
//序列化文件讀取
//第1個classOf[Text]中的Text是鍵的類型
//第2個classOf[IntSWritable]中的IntSWritable是值的類型
val output = sc.sequenceFile("/hadoop-zwj25/testSeq",classOf[Text],classOf[IntWritable])
output.map{case(x,y)=>(x.toString,y.get())}.collect
rdd.collect
val rddtest = sc.parallelize(List(1,2,3))
rddtest.map{case 1=>"One";case 2=>"Two";case _=>"other"}.collect
rddtest.map{case x=>(x,"a")}.collect

repartition() 重新分區
repartition(numPartitions: Int)
- 可以增加或減少此RDD中的並行級別。在內部,它使用shuffle重新分發數據。
- 如果要減少此RDD中的分區數,請考慮使用coalesce,這樣可以避免執行shuffle。
coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
查詢分區:partitions.size
rdd.repartition(numPartitions:Int).partitions.size
減少分區數,請考慮使用coalesce,這樣可以避免執行
val rdd1 = sc.parallelize(List(1,2,3,4))
rdd1.saveAsTextFile("/302Spark/savetext")
//輸入IP:50070 查詢
//---------------------------------------
rdd1.partitions.size
rdd1.repartition(1).partitions.size
rdd1.repartition(1).saveAsTextFile("/302Spark/savetext1")


練習
Practice01
題目
找出考試成績得過100分的學生ID,最終的結果需要集合到一個RDD中。
素材
請將下面代碼塊中內容粘貼到文本文檔result_bigdata.txt中
1001 大數據基礎 90
1002 大數據基礎 94
1003 大數據基礎 100
1004 大數據基礎 99
1005 大數據基礎 90
1006 大數據基礎 94
1007 大數據基礎 100
1008 大數據基礎 93
1009 大數據基礎 89
1010 大數據基礎 78
1011 大數據基礎 91
1012 大數據基礎 84
代碼
//從本地文件創建RDD
val rdd_bigdata = sc.textFile("file:///home/用戶名/result_bigdata.txt")
//隨便輸出一個測試
rdd_bigdata.take(2)
//查看所有結果
rdd_bigdata.collect
//下面這種方法需要轉為Int型
val bigdata_100=rdd_bigdata.map(x=>x.split("\t")).map(x=>(x(0),x(1),x(2).toInt)).filter(x=>x._3==100).map(x=>x._1)
bigdata_100.collect
//下面這種方法無需轉為Int型
val bigdata_100=rdd_bigdata.map(x=>x.split("\t")).filter(x=>x(2)=="100").map(x=>x(0))
bigdata_100.collect
Practice02
題目
輸出每位學生的總成績,要求將兩個成績表中學生ID相同的成績相加。
素材
請將下面代碼塊中內容粘貼到文本文檔score.txt中
math John 90
math Betty 88
math Mike 95
math Lily 92
chinese John 78
chinese Betty 80
chinese Mike 88
chinese Lily 85
english John 92
english Betty 84
english Mike 90
english Lily 85
請將下面代碼塊中內容粘貼到文本文檔result_math.txt中
1001 應用數學 96
1002 應用數學 94
1003 應用數學 100
1004 應用數學 100
1005 應用數學 94
1006 應用數學 80
1007 應用數學 90
1008 應用數學 94
1009 應用數學 84
1010 應用數學 86
1011 應用數學 79
1012 應用數學 91
請將下面代碼塊中內容粘貼到文本文檔result_bigdata.txt中
1001 大數據基礎 90
1002 大數據基礎 94
1003 大數據基礎 100
1004 大數據基礎 99
1005 大數據基礎 90
1006 大數據基礎 94
1007 大數據基礎 100
1008 大數據基礎 93
1009 大數據基礎 89
1010 大數據基礎 78
1011 大數據基礎 91
1012 大數據基礎 84
代碼
//從本地文件創建RDD
val rdd_bigdata = sc.textFile("file:///home/hadoop-zwj25/result_bigdata.txt")
val rdd_math = sc.textFile("file:///home/hadoop-zwj25/result_math.txt")
//返回RDD中所有的元素
rdd_bigdata.collect
rdd_math.collect
//合並兩個RDD
val rddall = rdd_math.union(rdd_bigdata)
rddall.collect
rddall.map(x=>(x.split("\t"))).map(x=>(x(0),x(2).toInt)).reduceByKey((x,y)=>x+y).collect


Practice03
題目
1.輸出每位同學的平均成績,要求將兩個成績表中學生ID相同的成績相加並計算出平均分。
2.合並每位同學的總成績和平均成績
完成平均分及合並任務
素材
代碼
//1.創建RDD並轉換
val math_map = math.map(x=>x.split("\t")).map(x=>(x(0),x(2).toInt))
math_map.collect
val bigdata=sc.textFile("file:///home/hadoop-zwj25/result_bigdata.txt")
val bigdata_map = bigdata.map(x=>x.split("\t")).map(x=>(x(0),x(2).toInt))
bigdata_map.collect
//將兩個鍵值對RDD合並,使用take隨機測試一個
math_map.union(bigdata_map).take(3)
//為成績添加成績計數,從1開始,完成后的格式是 (ID,(成績,計數))
math_map.union(bigdata_map).mapValues(x=>(x,1)).take(3)
//注意,轉換后的類型需和原值類型保持一致
//reduceByKey((x,y)=>((x._1+y._1),(x._2+y._2)))解釋如下
//x是原來的成績,y是計數的
//(x._1+y._1)代表總成績(兩門學科成績相加)
//(x._2+y._2)代表總門數(兩個計數值相加)
math_map.union(bigdata_map).mapValues(x=>(x,1)).reduceByKey((x,y)=>((x._1+y._1),(x._2+y._2))).take(3)
//將合並后RDD按鍵 分組 並 聚合
//平均成績為 總成績/學科數
//x_1是ID ; x._2 是(成績,計數值)
//所以求平均成績為 (x._2._1/x._2._2)
val pj = math_map.union(bigdata_map).mapValues(x=>(x,1)).reduceByKey((x,y)=>((x._1+y._1),(x._2+y._2))).map(x=>(x._1,(x._2._1/x._2._2)))
//查詢結果
pj.collect
//輸出總成績
val zf = math_map.union(bigdata_map).reduceByKey((x,y)=>x+y)
zf.collect
pj.count
zf.count
//合並每個學生的總分與平均分
zf.join(pj).count


