實驗4 RDD編程初級實踐


1.spark-shell交互式編程

(1) 該系總共有多少學生

scala> val lines = sc.textFile("file:///usr/local/spark/sparklab/Data01.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/sparklab/Data01.txt MapPartitionsRDD[4] at textFile at <console>:24

scala> val info = lines.map(row => row.split(",")(0))
info: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at map at <console>:25

scala> val latest = info.distinct()
latest: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[8] at distinct at <console>:25

scala> latest.count
res0: Long = 265  

(2) 該系共開設來多少門課程

scala> val lines = sc.textFile("file:///usr/local/spark/sparklab/Data01.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/sparklab/Data01.txt MapPartitionsRDD[4] at textFile at <console>:24
                                                            

scala> val course = lines.map(row => row.split(",")(1))
course: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at map at <console>:25

scala> val course_num = course.distinct()
course_num: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[12] at distinct at <console>:25

scala> course_num.count
res1: Long = 8

(3) Tom同學的總成績平均分是多少

scala> val tom = lines.map(row => row.split(",")(0)=="Tom")
tom: org.apache.spark.rdd.RDD[Boolean] = MapPartitionsRDD[13] at map at <console>:25


scala> val tom = lines.filter(row => row.split(",")(0)=="Tom")
tom: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[14] at filter at <console>:25


scala> tom.foreach(println)
Tom,DataBase,26
Tom,Algorithm,12
Tom,OperatingSystem,16
Tom,Python,40
Tom,Software,60


scala> tom.map(row => (row.split(",")(0),row.split(",")(2).toInt)).mapValues(x => (x,1)).reduceByKey((x,y) => (x._1+y._1,x._2+y._2)).mapValues(x => (x._1/x._2)).collect()
res6: Array[(String, Int)] = Array((Tom,30))

(4) 求每名同學的選修的課程門數

scala> val c_num = lines.map(row=>(row.split(",")(0),row.split(",")(1)))
c_num: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[21] at map at <console>:25

scala> c_num.mapValues(x => (x,1)).reduceByKey((x,y) => (" ",x._2 + y._2)).mapValues(x => x._2).foreach(println)
(Ford,3)
(Lionel,4)
(Verne,3)
(Lennon,4)
(Joshua,4)
(Marvin,3)
(Marsh,4)
(Bartholomew,5)
(Conrad,2)
(Armand,3)
(Jonathan,4)
(Broderick,3)
(Brady,5)
(Derrick,6)
(Rod,4)
(Willie,4)
(Walter,4)
(Boyce,2)
(Duncann,5)
(Elvis,2)
(Elmer,4)
(Bennett,6)
(Elton,5)
(Jo,5)
(Jim,4)
(Adonis,5)
(Abel,4)
(Peter,4)
(Alvis,6)
(Joseph,3)
(Raymondt,6)
(Kerwin,3)
(Wright,4)
(Adam,3)
(Borg,4)
(Sandy,1)
(Ben,4)
(Miles,6)
(Clyde,7)
(Francis,4)
(Dempsey,4)
(Ellis,4)
(Edward,4)
(Mick,4)
(Cleveland,4)
(Luthers,5)
(Virgil,5)
(Ivan,4)
(Alvin,5)
(Dick,3)
(Bevis,4)
(Leo,5)
(Saxon,7)
(Armstrong,2)
(Hogan,4)
(Sid,3)
(Blair,4)
(Colbert,4)
(Lucien,5)
(Kerr,4)
(Montague,3)
(Giles,7)
(Kevin,4)
(Uriah,1)
(Jeffrey,4)
(Simon,2)
(Elijah,4)
(Greg,4)
(Colin,5)
(Arlen,4)
(Maxwell,4)
(Payne,6)
(Kennedy,4)
(Spencer,5)
(Kent,4)
(Griffith,4)
(Jeremy,6)
(Alan,5)
(Andrew,4)
(Jerry,3)
(Donahue,5)
(Gilbert,3)
(Bishop,2)
(Bernard,2)
(Egbert,4)
(George,4)
(Noah,4)
(Bruce,3)
(Mike,3)
(Frank,3)
(Boris,6)
(Tony,3)
(Christ,2)
(Ken,3)
(Milo,2)
(Victor,2)
(Clare,4)
(Nigel,3)
(Christopher,4)
(Robin,4)
(Chad,6)
(Alfred,2)
(Woodrow,3)
(Rory,4)
(Dennis,4)
(Ward,4)
(Chester,6)
(Emmanuel,3)
(Stan,3)
(Jerome,3)
(Corey,4)
(Harvey,7)
(Herbert,3)
(Maurice,2)
(Merle,3)
(Les,6)
(Bing,6)
(Charles,3)
(Clement,5)
(Leopold,7)
(Brian,6)
(Horace,5)
(Sebastian,6)
(Bernie,3)
(Basil,4)
(Michael,5)
(Ernest,5)
(Tom,5)
(Vic,3)
(Eli,5)
(Duke,4)
(Alva,5)
(Lester,4)
(Hayden,3)
(Bertram,3)
(Bart,5)
(Adair,3)
(Sidney,5)
(Bowen,5)
(Roderick,4)
(Colby,4)
(Jay,6)
(Meredith,4)
(Harold,4)
(Max,3)
(Scott,3)
(Barton,1)
(Elliot,3)
(Matthew,2)
(Alexander,4)
(Todd,3)
(Wordsworth,4)
(Geoffrey,4)
(Devin,4)
(Donald,4)
(Roy,6)
(Harry,4)
(Abbott,3)
(Baron,6)
(Mark,7)
(Lewis,4)
(Rock,6)
(Eugene,1)
(Aries,2)
(Samuel,4)
(Glenn,6)
(Will,3)
(Gerald,4)
(Henry,2)
(Jesse,7)
(Bradley,2)
(Merlin,5)
(Monroe,3)
(Hobart,4)
(Ron,6)
(Archer,5)
(Nick,5)
(Louis,6)
(Len,5)
(Randolph,3)
(Benson,4)
(John,6)
(Abraham,3)
(Benedict,6)
(Marico,6)
(Berg,4)
(Aldrich,3)
(Lou,2)
(Brook,4)
(Ronald,3)
(Pete,3)
(Nicholas,5)
(Bill,2)
(Harlan,6)
(Tracy,3)
(Gordon,4)
(Alston,4)
(Andy,3)
(Bruno,5)
(Beck,4)
(Phil,3)
(Barry,5)
(Nelson,5)
(Antony,5)
(Rodney,3)
(Truman,3)
(Marlon,4)
(Don,2)
(Philip,2)
(Sean,6)
(Webb,7)
(Solomon,5)
(Aaron,4)
(Blake,4)
(Amos,5)
(Chapman,4)
(Jonas,4)
(Valentine,8)
(Angelo,2)
(Boyd,3)
(Benjamin,4)
(Winston,4)
(Allen,4)
(Evan,3)
(Albert,3)
(Newman,2)
(Jason,4)
(Hilary,4)
(William,6)
(Dean,7)
(Claude,2)
(Booth,6)
(Channing,4)
(Jeff,4)
(Webster,2)
(Marshall,4)
(Cliff,5)
(Dominic,4)
(Upton,5)
(Herman,3)
(Levi,2)
(Clark,6)
(Hiram,6)
(Drew,5)
(Bert,3)
(Alger,5)
(Brandon,5)
(Antonio,3)
(Elroy,5)
(Leonard,2)
(Adolph,4)
(Blithe,3)
(Kenneth,3)
(Perry,5)
(Matt,4)
(Eric,4)
(Archibald,5)
(Martin,3)
(Kim,4)
(Clarence,7)
(Vincent,5)
(Winfred,3)
(Christian,2)
(Bob,3)
(Enoch,3)

(5) 該系DataBase課程共有多少人選修;

scala> val lines = sc.textFile("file:///usr/local/spark/sparklab/Data01.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/sparklab/Data01.txt MapPartitionsRDD[4] at textFile at <console>:24

scala> val database_num = lines.filter(row => row.split(",")(1)=="DataBase") database_num: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[20] at filter at <console>:25 scala> database_num.count res7: Long = 126

(6) 各門課程的平均分是多少

scala> val ave = lines.map(row=>(row.split(",")(1),row.split(",")(2).toInt))
ave: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[26] at map at <console>:25

scala> ave.mapValues(x=>(x,1)).reduceByKey((x,y) => (x._1+y._1,x._2 + y._2)).mapValues(x => (x._1/ x._2)).collect()
res9: Array[(String, Int)] = Array((CLanguage,50), (Software,50), (Python,57), (Algorithm,48), (DataStructure,47), (DataBase,50), (ComputerNetwork,51), (OperatingSystem,54))

(7)使用累加器計算共有多少人選了DataBase這門課

scala> val lines = sc.textFile("file:///usr/local/spark/sparklab/Data01.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/sparklab/Data01.txt MapPartitionsRDD[1] at textFile at <console>:24

scala> val database_num = lines.filter(row=>row.split(",")(1)=="DataBase").map(row=>(row.split(",")(1),1))
database_num: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:25

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> database_num.values.foreach(x => accum.add(x))
                                                                                
scala> accum.value
res1: Long = 126

2.編寫獨立應用程序實現數據去重

對於兩個輸入文件A和B,編寫Spark獨立應用程序,對兩個文件進行合並,並剔除其中重復的內容,得到一個新文件C。下面是輸入文件和輸出文件的一個樣例,供參考。
輸入文件A的樣例如下:
20170101 x
20170102 y
20170103 x
20170104 y
20170105 z
20170106 z
輸入文件B的樣例如下:
20170101 y
20170102 y
20170103 x
20170104 z
20170105 y
根據輸入的文件A和B合並得到的輸出文件C的樣例如下:
20170101 x
20170101 y
20170102 y

20170103 x

20170104 y
20170104 z
20170105 y
20170105 z
20170106 z

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
object lab04{
    def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("RemDup")
        val sc = new SparkContext(conf)
        val dataFile ="file:///usr/local/spark/sparklab/a.txt,file:///usr/local/spark/sparklab/b.txt"
        val data = sc.textFile(dataFile,2)
        val da = data.distinct()
        da.foreach(println)

}
}                     

3.編寫獨立應用程序實現求平均值問題

每個輸入文件表示班級學生某個學科的成績,每行內容由兩個字段組成,第一個是學生名字,第二個是學生的成績;編寫Spark獨立應用程序求出所有學生的平均成績,並輸出到一個新文件中。下面是輸入文件和輸出文件的一個樣例,供參考。
Algorithm成績:
小明 92
小紅 87
小新 82
小麗 90
Database成績:
小明 95
小紅 81
小新 89
小麗 85
Python成績:
小明 82
小紅 83
小新 94
小麗 91

平均成績如下:
(小紅,83.67)
(小新,88.33)
(小明,89.67)
(小麗,88.67)

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 
import org.apache.spark.HashPartitioner 
object lab043 { 
	def main(args: Array[String]) { 
	val conf = new SparkConf().setAppName("AvgScore") 
	val sc = new SparkContext(conf) 
	val dataFile = "file:///usr/local/spark/sparklab/lab043/1.txt,file:///usr/local/spark/sparklab/lab043/2.txt,file:///usr/local/spark/sparklab/lab043/3.txt" 
	val data = sc.textFile(dataFile,3)
	var score = data.map(line=>(line.split(" ")(0),line.split(" ")(1).toInt)).mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).mapValues(x=>(x._1/x._2)).collect().foreach(println)
	//res.saveAsTextFile("result") 
} 
}

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM