一、實驗目的
1、熟悉Spark的RDD基本操作及鍵值對操作;
2、熟悉使用RDD編程解決實際具體問題的方法。
二、實驗平台
操作系統:Ubuntu16.04
Spark版本:2.4.0
Python版本:3.4.3
三、實驗內容、要求
1.pyspark交互式編程
本作業提供分析數據data.txt,該數據集包含了某大學計算機系的成績,數據格式如下所示:
Tom,Algorithm,50
Tom,Datastructure,60
Jim,Database,90
Jim,Algorithm,60
Jim,Datastructure,80
……
請根據給定的實驗數據,在pyspark中通過編程來計算以下內容:
(1)該系總共有多少學生;
(2)該系共開設了多少門課程;
(3)Tom同學的總成績平均分是多少;
(4)求每名同學的選修的課程門數;
(5)該系DataBase課程共有多少人選修;
(6)各門課程的平均分是多少;
(7)使用累加器計算共有多少人選了DataBase這門課。
2.編寫獨立應用程序實現數據去重
對於兩個輸入文件A和B,編寫Spark獨立應用程序,對兩個文件進行合並,並剔除其中重復的內容,得到一個新文件C。本文給出門課的成績(A.txt、B.txt)下面是輸入文件和輸出文件的一個樣例,供參考。
輸入文件A的樣例如下:
20200101 x
20200102 y
20200103 x
20200104 y
20200105 z
20200106 z
輸入文件B的樣例如下:
20200101 y
20200102 y
20200103 x
20200104 z
20200105 y
根據輸入的文件A和B合並得到的輸出文件C的樣例如下:
20200101 x
20200101 y
20200102 y
20200103 x
20200104 y
20200104 z
20200105 y
20200105 z
20200106 z
3.編寫獨立應用程序實現求平均值問題
每個輸入文件表示班級學生某個學科的成績,每行內容由兩個字段組成,第一個是學生名字,第二個是學生的成績;編寫Spark獨立應用程序求出所有學生的平均成績,並輸出到一個新文件中。本文給出門課的成績(Algorithm.txt、Database.txt、Python.txt),下面是輸入文件和輸出文件的一個樣例,供參考。
Algorithm成績:
小明 92
小紅 87
小新 82
小麗 90
Database成績:
小明 95
小紅 81
小新 89
小麗 85
Python成績:
小明 82
小紅 83
小新 94
小麗 91
平均成績如下:
(小紅,83.67)
(小新,88.33)
(小明,89.67)
(小麗,88.67)
四、實驗過程
實驗數據准備:
1、將數據文件復制到/usr/local/spark/sparksqldata/目錄下
hadoop@dblab-VirtualBox:~$ cd /usr/local/spark/sparksqldata
hadoop@dblab-VirtualBox:/usr/local/spark/sparksqldata$ ls
chapter4-data.txt
hadoop@dblab-VirtualBox:/usr/local/spark/sparksqldata$ cp -r /home/hadoop/桌面/ 大數據/* /usr/local/spark/sparksqldata
hadoop@dblab-VirtualBox:/usr/local/spark/sparksqldata$ ls
Algorithm.txt B.txt Database.txt Python.txt A.txt
chapter4-data.txt data.txt
(一)pyspark交互式編程
1、輸入pyspark開啟spark
hadoop@dblab-VirtualBox:/usr/local/spark/sparksqldata$ pyspark
2、加載data.txt文件
lines = sc.textFile("file:///usr/local/spark/sparksqldata/data.txt")
3、查看數據共有多少行
lines.count()
15022
4、去重查看數據,以防重復
lines.distinct().count()
1073
5、通過去重計算數據行數發現與總行數不一致,所以數據文件存在數據內容重復現象,此時要將數據過濾得到一個沒有重復的數據集
data = lines.distinct()
data.count()
1073
6、完成各項需求
(1)該系總共有多少學生;
res = data.map(lambda x:x.split(",")).map(lambda x:x[0])
dis_res = res.distinct()
dis_res.count()
265
(2)該系共開設了多少門課程;
res = data.map(lambda x:x.split(",")).map(lambda x:x[1])
dis_res = res.distinct()
dis_res.count()
8
(3)Tom同學的總成績平均分是多少;
res = data.map(lambda x:x.split(",")).filter(lambda x:x[0]=="Tom")
score = res.map(lambda x:int(x[2]))
curriculum_num = res.count()
score_sum = score.reduce(lambda x,y:x+y)
score_avg = score_sum/curriculum_num
score_avg
30.8
(4)求每名同學的選修的課程門數;
res = data.map(lambda x:x.split(",")).map(lambda x:(x[0],1))
res_curriculum = res.reduceByKey(lambda x,y:x+y)
res_curriculum.foreach(print)
res = data.map(lambda x:x.split(",")).map(lambda x:(x[0],1))
curriculum = res.reduceByKey(lambda x,y:x+y)
curriculum.foreach(print)
('Lewis', 56)
('Mike', 42)
('Walter', 56)
('Conrad', 28)
('Borg', 56)
('Bert', 42)
('Eli', 70)
('Clare', 56)
('Charles', 42)
('Alston', 56)
('Scott', 42)
('Angelo', 28)
('Christopher', 56)
('Webb', 98)
('Bill', 28)
('Rock', 84)
('Jonathan', 56)
(5)該系DataBase課程共有多少人選修;
res = lines.map(lambda x:x.split(",")).filter(lambda x:x[1]=="DataBase").map(lambda x:x[0])
dis_res = res.distinct()
dis_res.count()
125
(6)各門課程的平均分是多少;
res = data.map(lambda x:x.split(",")).filter(lambda x:x[1]=="DataBase")
res.count()
126
(7)使用累加器計算共有多少人選了DataBase這門課。
res = data.map(lambda x:x.split(",")).map(lambda x:(x[1],(int(x[2]),1)))
temp = res.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))
avg = temp.map(lambda x:(x[0],round(x[1][0]/x[1][1],2)))
avg.foreach(print)
('ComputerNetwork', 51.9)
('Software', 50.91)
('DataBase', 50.54)
('Algorithm', 48.83)
('OperatingSystem', 54.94)
('DataStructure', 47.57)
('Python', 57.82)
('CLanguage', 50.61)
> res = data.map(lambda x:x.split(",")).filter(lambda x:x[1]=="DataBase")
summation = sc.accumulator(0)
res.foreach(lambda x:summation.add(1))
summation.value
126
(二)編寫獨立應用程序實現數據去重
假設/usr/local/spark/sparksqldata/目錄是當前目錄
1.在當前目錄下創建一個merge.py的文件,用於編寫程序代碼
2.編寫程序實現數據去重,並寫入新文件
from pyspark import SparkContext
# 初始化SparkContext
sc = SparkContext("local","sparksqldata")
# 加載文件數據
linesA = sc.textFile("file:///usr/local/spark/sparksqldata/A.txt")
linesB = sc.textFile("file:///usr/local/spark/sparksqldata/B.txt")
# 合並文件數據
linesC = linesA.union(linesB)
# 去重
dis_linesC = linesC.distinct()
# 排序
res = dis_linesC.sortBy(lambda x:x)
# 將得到的數據寫入一個新文件
res.repartition(1).saveAsTextFile("file:///usr/local/spark/sparksqldata/result")
3.運行程序代碼后會在當前目錄下生成一個新文件夾,新文件夾內有重新寫入的文件
4.查看經過去重后新寫入的文件內容
(三)編寫獨立應用程序實現求平均值問題
假設/usr/local/spark/sparksqldata/目錄是當前目錄
1.在當前目錄下創建一個avgScore.py的文件,用於編寫程序代碼
2.編寫程序求得各同學得平均成績
from pyspark import SparkContext
sc = SparkContext("local","sparksqldata")
# 加載文件數據
lines1 = sc.textFile("file:///usr/local/spark/sparksqldata/Algorithm.txt")
lines2 = sc.textFile("file:///usr/local/spark/sparksqldata/Database.txt")
lines3 = sc.textFile("file:///usr/local/spark/sparksqldata/Python.txt")
# 合並文件
lines = lines1.union(lines2).union(lines3)
# 去除空行
newlines = lines.filter(lambda x:x is not None).filter(lambda x:x is not '')
# 拆分
data = newlines.map(lambda x:x.split(" ")).map(lambda x:(x[0],(int(x[1]),1)))
# 分組統計
res = data.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))
# 計算每個同學的平均成績
result = res.map(lambda x:(x[0],round(x[1][0]/x[1][1],2)))
3.運行程序后會在當前目錄下生成一個新的文件夾,文件夾下存放着程序運行得結果
4.查看結果