Spark RDD編程-大數據課設


一、實驗目的

1、熟悉Spark的RDD基本操作及鍵值對操作;
2、熟悉使用RDD編程解決實際具體問題的方法。

二、實驗平台

操作系統:Ubuntu16.04
Spark版本:2.4.0
Python版本:3.4.3

三、實驗內容、要求

1.pyspark交互式編程

本作業提供分析數據data.txt,該數據集包含了某大學計算機系的成績,數據格式如下所示:

Tom,Algorithm,50
Tom,Datastructure,60
Jim,Database,90
Jim,Algorithm,60
Jim,Datastructure,80
……

請根據給定的實驗數據,在pyspark中通過編程來計算以下內容:
(1)該系總共有多少學生;
(2)該系共開設了多少門課程;
(3)Tom同學的總成績平均分是多少;
(4)求每名同學的選修的課程門數;
(5)該系DataBase課程共有多少人選修;
(6)各門課程的平均分是多少;
(7)使用累加器計算共有多少人選了DataBase這門課。

2.編寫獨立應用程序實現數據去重

對於兩個輸入文件A和B,編寫Spark獨立應用程序,對兩個文件進行合並,並剔除其中重復的內容,得到一個新文件C。本文給出門課的成績(A.txt、B.txt)下面是輸入文件和輸出文件的一個樣例,供參考。

輸入文件A的樣例如下:
20200101    x
20200102    y
20200103    x
20200104    y
20200105    z
20200106    z
輸入文件B的樣例如下:
20200101    y
20200102    y
20200103    x
20200104    z
20200105    y
根據輸入的文件A和B合並得到的輸出文件C的樣例如下:
20200101    x
20200101    y
20200102    y
20200103    x
20200104    y
20200104    z
20200105    y
20200105    z
20200106    z

3.編寫獨立應用程序實現求平均值問題

每個輸入文件表示班級學生某個學科的成績,每行內容由兩個字段組成,第一個是學生名字,第二個是學生的成績;編寫Spark獨立應用程序求出所有學生的平均成績,並輸出到一個新文件中。本文給出門課的成績(Algorithm.txt、Database.txt、Python.txt),下面是輸入文件和輸出文件的一個樣例,供參考。

Algorithm成績:
小明 92
小紅 87
小新 82
小麗 90
Database成績:
小明 95
小紅 81
小新 89
小麗 85
Python成績:
小明 82
小紅 83
小新 94
小麗 91
平均成績如下:
 (小紅,83.67)
 (小新,88.33)
 (小明,89.67)
 (小麗,88.67)

四、實驗過程

實驗數據准備:
1、將數據文件復制到/usr/local/spark/sparksqldata/目錄下

hadoop@dblab-VirtualBox:~$ cd /usr/local/spark/sparksqldata
hadoop@dblab-VirtualBox:/usr/local/spark/sparksqldata$ ls
chapter4-data.txt
hadoop@dblab-VirtualBox:/usr/local/spark/sparksqldata$ cp -r /home/hadoop/桌面/ 大數據/* /usr/local/spark/sparksqldata
hadoop@dblab-VirtualBox:/usr/local/spark/sparksqldata$ ls
Algorithm.txt  B.txt              Database.txt  Python.txt A.txt      
chapter4-data.txt  data.txt

將數據文件放到實驗目錄下

(一)pyspark交互式編程

1、輸入pyspark開啟spark

hadoop@dblab-VirtualBox:/usr/local/spark/sparksqldata$ pyspark

開啟spark
2、加載data.txt文件

lines = sc.textFile("file:///usr/local/spark/sparksqldata/data.txt")

加載數據
3、查看數據共有多少行

lines.count()
15022

查看數據集行數
4、去重查看數據,以防重復

lines.distinct().count()
1073

去重查看數據集行數
5、通過去重計算數據行數發現與總行數不一致,所以數據文件存在數據內容重復現象,此時要將數據過濾得到一個沒有重復的數據集

data = lines.distinct()
data.count()
1073

保存去重后的數據集
6、完成各項需求
(1)該系總共有多少學生;

res = data.map(lambda x:x.split(",")).map(lambda x:x[0])
dis_res = res.distinct()
dis_res.count()
265

計算學生個數
(2)該系共開設了多少門課程;

res = data.map(lambda x:x.split(",")).map(lambda x:x[1])
dis_res = res.distinct()
dis_res.count()
8

在這里插入圖片描述
(3)Tom同學的總成績平均分是多少;

res = data.map(lambda x:x.split(",")).filter(lambda x:x[0]=="Tom")
score = res.map(lambda x:int(x[2]))
curriculum_num = res.count()
score_sum = score.reduce(lambda x,y:x+y)
score_avg = score_sum/curriculum_num 
score_avg
30.8

在這里插入圖片描述
(4)求每名同學的選修的課程門數;

res = data.map(lambda x:x.split(",")).map(lambda x:(x[0],1))
res_curriculum = res.reduceByKey(lambda x,y:x+y)
res_curriculum.foreach(print)
res = data.map(lambda x:x.split(",")).map(lambda x:(x[0],1))
curriculum = res.reduceByKey(lambda x,y:x+y)
curriculum.foreach(print)
('Lewis', 56)
('Mike', 42)
('Walter', 56)
('Conrad', 28)
('Borg', 56)
('Bert', 42)
('Eli', 70)
('Clare', 56)
('Charles', 42)
('Alston', 56)
('Scott', 42)
('Angelo', 28)
('Christopher', 56)
('Webb', 98)
('Bill', 28)
('Rock', 84)
('Jonathan', 56)

在這里插入圖片描述
(5)該系DataBase課程共有多少人選修;

res = lines.map(lambda x:x.split(",")).filter(lambda x:x[1]=="DataBase").map(lambda x:x[0])
dis_res = res.distinct()
dis_res.count()
125

在這里插入圖片描述
(6)各門課程的平均分是多少;

res = data.map(lambda x:x.split(",")).filter(lambda x:x[1]=="DataBase")
res.count()
126

在這里插入圖片描述
(7)使用累加器計算共有多少人選了DataBase這門課。

res = data.map(lambda x:x.split(",")).map(lambda x:(x[1],(int(x[2]),1)))
temp = res.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))
avg = temp.map(lambda x:(x[0],round(x[1][0]/x[1][1],2)))
avg.foreach(print)
('ComputerNetwork', 51.9)
('Software', 50.91)
('DataBase', 50.54)
('Algorithm', 48.83)
('OperatingSystem', 54.94)
('DataStructure', 47.57)
('Python', 57.82)
('CLanguage', 50.61)
> res = data.map(lambda x:x.split(",")).filter(lambda x:x[1]=="DataBase")
summation = sc.accumulator(0)
res.foreach(lambda x:summation.add(1))
summation.value
126

在這里插入圖片描述

(二)編寫獨立應用程序實現數據去重

假設/usr/local/spark/sparksqldata/目錄是當前目錄
1.在當前目錄下創建一個merge.py的文件,用於編寫程序代碼
在這里插入圖片描述
2.編寫程序實現數據去重,並寫入新文件
在這里插入圖片描述

from pyspark import SparkContext

# 初始化SparkContext
sc = SparkContext("local","sparksqldata")

# 加載文件數據
linesA = sc.textFile("file:///usr/local/spark/sparksqldata/A.txt")
linesB = sc.textFile("file:///usr/local/spark/sparksqldata/B.txt")

# 合並文件數據
linesC = linesA.union(linesB)

# 去重
dis_linesC = linesC.distinct()

# 排序
res = dis_linesC.sortBy(lambda x:x)

# 將得到的數據寫入一個新文件
res.repartition(1).saveAsTextFile("file:///usr/local/spark/sparksqldata/result")

3.運行程序代碼后會在當前目錄下生成一個新文件夾,新文件夾內有重新寫入的文件
在這里插入圖片描述
4.查看經過去重后新寫入的文件內容
在這里插入圖片描述

(三)編寫獨立應用程序實現求平均值問題

假設/usr/local/spark/sparksqldata/目錄是當前目錄
1.在當前目錄下創建一個avgScore.py的文件,用於編寫程序代碼
在這里插入圖片描述
2.編寫程序求得各同學得平均成績
在這里插入圖片描述

from pyspark import SparkContext
sc = SparkContext("local","sparksqldata")

# 加載文件數據
lines1 = sc.textFile("file:///usr/local/spark/sparksqldata/Algorithm.txt")
lines2 = sc.textFile("file:///usr/local/spark/sparksqldata/Database.txt")
lines3 = sc.textFile("file:///usr/local/spark/sparksqldata/Python.txt")

# 合並文件
lines = lines1.union(lines2).union(lines3)

# 去除空行
newlines = lines.filter(lambda x:x is not None).filter(lambda x:x is not '')

# 拆分
data = newlines.map(lambda x:x.split(" ")).map(lambda x:(x[0],(int(x[1]),1)))

# 分組統計
res = data.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))

# 計算每個同學的平均成績
result = res.map(lambda x:(x[0],round(x[1][0]/x[1][1],2)))

3.運行程序后會在當前目錄下生成一個新的文件夾,文件夾下存放着程序運行得結果

在這里插入圖片描述
在這里插入圖片描述

4.查看結果
在這里插入圖片描述


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM