Spark RDD----pyspark第四次作業


1.pyspark交互式編程

查看群里發的“data01.txt”數據集,該數據集包含了某大學計算機系的成績,數據格式如下所示:

Tom,DataBase,80

Tom,Algorithm,50

Tom,DataStructure,60

Jim,DataBase,90

Jim,Algorithm,60

Jim,DataStructure,80

……

請根據給定的實驗數據,在pyspark中通過編程來計算以下內容:

(1) 該系總共有多少學生;

 

>>> lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")
>>> res = lines.map(lambda x:x.split(",")).map(lambda x: x[0]) //獲取每行數據的第1列 
>>> distinct_res = res.distinct()  //去重操作
>>> distinct_res.count()//取元素總個數

 答案為:265人

(2) 該系共開設了多少門課程;

 

>>> lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")
>>> res = lines.map(lambda x:x.split(",")).map(lambda x:x[1]) //獲取每行數據的第2列
>>> distinct_res = res.distinct()//去重操作
>>> distinct_res.count()//取元素總個數

 

 答案為8門

(3) Tom同學的總成績平均分是多少;

 

>>> lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")

>>> res = lines.map(lambda x:x.split(",")).filter(lambda x:x[0]=="Tom") //篩選Tom同學的成績信息

>>> res.foreach(print)

>>> score = res.map(lambda x:int(x[2])) //提取Tom同學的每門成績,並轉換為int類型

>>> num = res.count() //Tom同學選課門數

>>> sum_score = score.reduce(lambda x,y:x+y) //Tom同學的總成績

>>> avg = sum_score/num // 總成績/門數=平均分

>>> print(avg)

 

 Tom同學的平均分為30.8分

(4) 求每名同學的選修的課程門數;

 

>>> lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")

>>> res = lines.map(lambda x:x.split(",")).map(lambda x:(x[0],1)) //學生每門課程都對應(學生姓名,1),學生有n門課程則有n個(學生姓名,1)

>>> each_res = res.reduceByKey(lambda x,y: x+y) //按學生姓名獲取每個學生的選課總數

>>> each_res.foreach(print)

 

('Lewis', 4)答案共265行

('Mike', 3)

('Walter', 4)

('Conrad', 2)

('Borg', 4)

……

(5) 該系DataBase課程共有多少人選修;

 

>>> lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")

>>> res = lines.map(lambda x:x.split(",")).filter(lambda x:x[1]=="DataBase")

>>> res.count()

 

 答案為126人

(6) 各門課程的平均分是多少;

 

>>> lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")

>>> res = lines.map(lambda x:x.split(",")).map(lambda x:(x[1],(int(x[2]),1))) //為每門課程的分數后面新增一列1,表示1個學生選擇了該課程。格式如('ComputerNetwork', (44, 1))

>>> temp = res.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])) //按課程名聚合課程總分和選課人數。格式如('ComputerNetwork', (7370, 142))

>>> avg = temp.map(lambda x:(x[0], round(x[1][0]/x[1][1],2)))//課程總分/選課人數 = 平均分,並利用round(x,2)保留兩位小數

>>> avg.foreach(print)

 

答案為:

('ComputerNetwork', 51.9)

('Software', 50.91)

('DataBase', 50.54)

('Algorithm', 48.83)

('OperatingSystem', 54.94)

('Python', 57.82)

('DataStructure', 47.57)

('CLanguage', 50.61)

(7)使用累加器計算共有多少人選了DataBase這門課。

 

>>> lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt")

>>> res = lines.map(lambda x:x.split(",")).filter(lambda x:x[1]=="DataBase")//篩選出選了DataBase課程的數據

>>> accum = sc.accumulator(0) //定義一個從0開始的累加器accum

>>> res.foreach(lambda x:accum.add(1))//遍歷res,每掃描一條數據,累加器加1

>>> accum.value //輸出累加器的最終值

 

 答案:共有126人

2.編寫獨立應用程序實現數據去重

對於兩個輸入文件A和B,編寫Spark獨立應用程序,對兩個文件進行合並,並剔除其中重復的內容,得到一個新文件C。下面是輸入文件和輸出文件的一個樣例,供參考。

輸入文件A的樣例如下:

20170101    x

20170102    y

20170103    x

20170104    y

20170105    z

20170106    z

輸入文件B的樣例如下:

20170101    y

20170102    y

20170103    x

20170104    z

20170105    y

根據輸入的文件A和B合並得到的輸出文件C的樣例如下:

20170101    x

20170101    y

20170102    y

20170103    x

20170104    y

20170104    z

20170105    y

20170105    z

20170106    z

 

  實驗答案參考步驟如下:

(1)假設當前目錄為/usr/local/spark/mycode/remdup,在當前目錄下新建一個remdup.py文件,復制下面代碼;

 

from pyspark import SparkContext

#初始化SparkContext

sc = SparkContext('local','remdup')

#加載兩個文件A和B

lines1 = sc.textFile("file:///usr/local/spark/mycode/remdup/A")

lines2 = sc.textFile("file:///usr/local/spark/mycode/remdup/B")

#合並兩個文件的內容

lines = lines1.union(lines2)

#去重操作

distinct_lines = lines.distinct()

#排序操作

res = distinct_lines.sortBy(lambda x:x)

#將結果寫入result文件中,repartition(1)的作用是讓結果合並到一個文件中,不加的話會結果寫入到兩個文件

res.repartition(1).saveAsTextFile("file:///usr/local/spark/mycode/result/file")

 

(2)最后在目錄/usr/local/spark/mycode/remdup下執行下面命令執行程序(注意執行程序時請先退出pyspark shell,否則會出現“地址已在使用”的警告);

 

$ python3 remdup.py

 (3)在目錄/usr/local/spark/mycode/remdup/result下即可得到結果文件part-00000。

3.編寫獨立應用程序實現求平均值問題

每個輸入文件表示班級學生某個學科的成績,每行內容由兩個字段組成,第一個是學生名字,第二個是學生的成績;編寫Spark獨立應用程序求出所有學生的平均成績,並輸出到一個新文件中。下面是輸入文件和輸出文件的一個樣例,供參考。

Algorithm成績:

小明 92

小紅 87

小新 82

小麗 90

Database成績:

小明 95

小紅 81

小新 89

小麗 85

Python成績:

小明 82

小紅 83

小新 94

小麗 91

平均成績如下:

(小紅,83.67)

(小新,88.33)

(小明,89.67)

(小麗,88.67)

 

實驗答案參考步驟如下:

(1)假設當前目錄為/usr/local/spark/mycode/avgscore,在當前目錄下新建一個avgscore.py,復制下面代碼;

 

from pyspark import SparkContext

#初始化SparkContext

sc = SparkContext('local',' avgscore')

#加載三個文件Algorithm.txt、Database.txt和Python.txt

lines1 = sc.textFile("file:///usr/local/spark/mycode/avgscore/Algorithm.txt")

lines2 = sc.textFile("file:///usr/local/spark/mycode/avgscore/Database.txt")

lines3 = sc.textFile("file:///usr/local/spark/mycode/avgscore/Python.txt")

#合並三個文件的內容

lines = lines1.union(lines2).union(lines3)

#為每行數據新增一列1,方便后續統計每個學生選修的課程數目。data的數據格式為('小明', (92, 1))

data = lines.map(lambda x:x.split(" ")).map(lambda x:(x[0],(int(x[1]),1)))

#根據key也就是學生姓名合計每門課程的成績,以及選修的課程數目。res的數據格式為('小明', (269, 3))

res = data.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))

#利用總成績除以選修的課程數來計算每個學生的每門課程的平均分,並利用round(x,2)保留兩位小數

result = res.map(lambda x:(x[0],round(x[1][0]/x[1][1],2)))

#將結果寫入result文件中,repartition(1)的作用是讓結果合並到一個文件中,不加的話會結果寫入到三個文件

result.repartition(1).saveAsTextFile("file:///usr/local/spark/mycode/avgscore/result")

 

(2)最后在目錄/usr/local/spark/mycode/avgscore下執行下面命令執行程序(注意執行程序時請先退出pyspark shell,否則會出現“地址已在使用”的警告)。

 

$ python3 avgscore.py

 

(3)在目錄/usr/local/spark/mycode/avgscore/result下即可得到結果文件part-00000。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM