1.pyspark交互式編程
查看群里發的“data01.txt”數據集,該數據集包含了某大學計算機系的成績,數據格式如下所示:
Tom,DataBase,80 Tom,Algorithm,50 Tom,DataStructure,60 Jim,DataBase,90 Jim,Algorithm,60 Jim,DataStructure,80 …… |
請根據給定的實驗數據,在pyspark中通過編程來計算以下內容:
(1) 該系總共有多少學生;
>>> lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt") >>> res = lines.map(lambda x:x.split(",")).map(lambda x: x[0]) //獲取每行數據的第1列 >>> distinct_res = res.distinct() //去重操作 >>> distinct_res.count()//取元素總個數
答案為:265人
(2) 該系共開設了多少門課程;
>>> lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt") >>> res = lines.map(lambda x:x.split(",")).map(lambda x:x[1]) //獲取每行數據的第2列 >>> distinct_res = res.distinct()//去重操作 >>> distinct_res.count()//取元素總個數
答案為8門
(3) Tom同學的總成績平均分是多少;
>>> lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt") >>> res = lines.map(lambda x:x.split(",")).filter(lambda x:x[0]=="Tom") //篩選Tom同學的成績信息 >>> res.foreach(print) >>> score = res.map(lambda x:int(x[2])) //提取Tom同學的每門成績,並轉換為int類型 >>> num = res.count() //Tom同學選課門數 >>> sum_score = score.reduce(lambda x,y:x+y) //Tom同學的總成績 >>> avg = sum_score/num // 總成績/門數=平均分 >>> print(avg)
Tom同學的平均分為30.8分
(4) 求每名同學的選修的課程門數;
>>> lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt") >>> res = lines.map(lambda x:x.split(",")).map(lambda x:(x[0],1)) //學生每門課程都對應(學生姓名,1),學生有n門課程則有n個(學生姓名,1) >>> each_res = res.reduceByKey(lambda x,y: x+y) //按學生姓名獲取每個學生的選課總數 >>> each_res.foreach(print)
('Lewis', 4)答案共265行
('Mike', 3)
('Walter', 4)
('Conrad', 2)
('Borg', 4)
……
(5) 該系DataBase課程共有多少人選修;
>>> lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt") >>> res = lines.map(lambda x:x.split(",")).filter(lambda x:x[1]=="DataBase") >>> res.count()
答案為126人
(6) 各門課程的平均分是多少;
>>> lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt") >>> res = lines.map(lambda x:x.split(",")).map(lambda x:(x[1],(int(x[2]),1))) //為每門課程的分數后面新增一列1,表示1個學生選擇了該課程。格式如('ComputerNetwork', (44, 1)) >>> temp = res.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])) //按課程名聚合課程總分和選課人數。格式如('ComputerNetwork', (7370, 142)) >>> avg = temp.map(lambda x:(x[0], round(x[1][0]/x[1][1],2)))//課程總分/選課人數 = 平均分,並利用round(x,2)保留兩位小數 >>> avg.foreach(print)
答案為:
('ComputerNetwork', 51.9)
('Software', 50.91)
('DataBase', 50.54)
('Algorithm', 48.83)
('OperatingSystem', 54.94)
('Python', 57.82)
('DataStructure', 47.57)
('CLanguage', 50.61)
(7)使用累加器計算共有多少人選了DataBase這門課。
>>> lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt") >>> res = lines.map(lambda x:x.split(",")).filter(lambda x:x[1]=="DataBase")//篩選出選了DataBase課程的數據 >>> accum = sc.accumulator(0) //定義一個從0開始的累加器accum >>> res.foreach(lambda x:accum.add(1))//遍歷res,每掃描一條數據,累加器加1 >>> accum.value //輸出累加器的最終值
答案:共有126人
2.編寫獨立應用程序實現數據去重
對於兩個輸入文件A和B,編寫Spark獨立應用程序,對兩個文件進行合並,並剔除其中重復的內容,得到一個新文件C。下面是輸入文件和輸出文件的一個樣例,供參考。
輸入文件A的樣例如下:
20170101 x
20170102 y
20170103 x
20170104 y
20170105 z
20170106 z
輸入文件B的樣例如下:
20170101 y
20170102 y
20170103 x
20170104 z
20170105 y
根據輸入的文件A和B合並得到的輸出文件C的樣例如下:
20170101 x
20170101 y
20170102 y
20170103 x
20170104 y
20170104 z
20170105 y
20170105 z
20170106 z
實驗答案參考步驟如下:
(1)假設當前目錄為/usr/local/spark/mycode/remdup,在當前目錄下新建一個remdup.py文件,復制下面代碼;
from pyspark import SparkContext #初始化SparkContext sc = SparkContext('local','remdup') #加載兩個文件A和B lines1 = sc.textFile("file:///usr/local/spark/mycode/remdup/A") lines2 = sc.textFile("file:///usr/local/spark/mycode/remdup/B") #合並兩個文件的內容 lines = lines1.union(lines2) #去重操作 distinct_lines = lines.distinct() #排序操作 res = distinct_lines.sortBy(lambda x:x) #將結果寫入result文件中,repartition(1)的作用是讓結果合並到一個文件中,不加的話會結果寫入到兩個文件 res.repartition(1).saveAsTextFile("file:///usr/local/spark/mycode/result/file")
(2)最后在目錄/usr/local/spark/mycode/remdup下執行下面命令執行程序(注意執行程序時請先退出pyspark shell,否則會出現“地址已在使用”的警告);
$ python3 remdup.py
(3)在目錄/usr/local/spark/mycode/remdup/result下即可得到結果文件part-00000。
3.編寫獨立應用程序實現求平均值問題
每個輸入文件表示班級學生某個學科的成績,每行內容由兩個字段組成,第一個是學生名字,第二個是學生的成績;編寫Spark獨立應用程序求出所有學生的平均成績,並輸出到一個新文件中。下面是輸入文件和輸出文件的一個樣例,供參考。
Algorithm成績:
小明 92
小紅 87
小新 82
小麗 90
Database成績:
小明 95
小紅 81
小新 89
小麗 85
Python成績:
小明 82
小紅 83
小新 94
小麗 91
平均成績如下:
(小紅,83.67)
(小新,88.33)
(小明,89.67)
(小麗,88.67)
實驗答案參考步驟如下:
(1)假設當前目錄為/usr/local/spark/mycode/avgscore,在當前目錄下新建一個avgscore.py,復制下面代碼;
from pyspark import SparkContext #初始化SparkContext sc = SparkContext('local',' avgscore') #加載三個文件Algorithm.txt、Database.txt和Python.txt lines1 = sc.textFile("file:///usr/local/spark/mycode/avgscore/Algorithm.txt") lines2 = sc.textFile("file:///usr/local/spark/mycode/avgscore/Database.txt") lines3 = sc.textFile("file:///usr/local/spark/mycode/avgscore/Python.txt") #合並三個文件的內容 lines = lines1.union(lines2).union(lines3) #為每行數據新增一列1,方便后續統計每個學生選修的課程數目。data的數據格式為('小明', (92, 1)) data = lines.map(lambda x:x.split(" ")).map(lambda x:(x[0],(int(x[1]),1))) #根據key也就是學生姓名合計每門課程的成績,以及選修的課程數目。res的數據格式為('小明', (269, 3)) res = data.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])) #利用總成績除以選修的課程數來計算每個學生的每門課程的平均分,並利用round(x,2)保留兩位小數 result = res.map(lambda x:(x[0],round(x[1][0]/x[1][1],2))) #將結果寫入result文件中,repartition(1)的作用是讓結果合並到一個文件中,不加的話會結果寫入到三個文件 result.repartition(1).saveAsTextFile("file:///usr/local/spark/mycode/avgscore/result")
(2)最后在目錄/usr/local/spark/mycode/avgscore下執行下面命令執行程序(注意執行程序時請先退出pyspark shell,否則會出現“地址已在使用”的警告)。
$ python3 avgscore.py
(3)在目錄/usr/local/spark/mycode/avgscore/result下即可得到結果文件part-00000。