014 在Spark中完成PV與UV的計算,重在源代碼


1.代碼

 1 object LogPVAndUV{
 2     def main(args:Array[String]):Unit={
 3         val conf=new SparkConf()
 4             .setMaster("local[*]")
 5             .setAppName("PVAndUV")
 6         val sc=SparkContext.getOrCreate(conf)
 7         val logPath="/user/beifeng/spark/logs/page_views.data"
 8         val logRDD=sc.textFile(logPath)
 9         val filterRDD=logRDD.filter(_.length>0)
10         //轉換
11         val mapRDD=filterRDD.map(line=>{
12             val arr=line.split("\t")
13             if(arr.length==7){
14                 val date=arr(0).trim
15                 val url=arr(1)
16                 val uuid=arr(2)
17                 (date.subString(0,Math.min(10.date.length)).trim,url,uuid)
18             }else{
19                 (null,null,null)
20             }
21         }).filter(tuple=>tuple._1!=null&&tuple._1.length>0)
22         //PV計算
23         val pvRDD=mapRDD
24             .filter(tuple=>tuple._2.length>0)
25             .map(tuple=>(tuple._1,1))
26             .reduceByKey(_+_)
27         //UV計算
28         val uvRDD=mapRDD
29             .filter(tuple=>tuple._3.length>0)
30             .map(tuple=>(tuple._1,tuple._3))
31             .distinct
32             .reduceByKey(_+_)
33         //合並
34         val pvAndUv=pvRDD.join(uvRDD).map{
35             case (date,(pv,uv))=>{
36                 (date,pv,uv)
37             }
38         }
39         //輸出
40         pvAndUv.saveAsTextFile("/user/beifeng/spark/output/"+System.currentTimeMillis())
41         sc.stop()
42     }
43 }

 

2.PS

  rdd.foreachPartition(iter=>{

    //

  })

  對iter迭代器中的數據進行輸出,iter表示的是一個分區的所有數據,這里的迭代器和groupbyKey中的實現方式不同,不會產生OOM

  主要用於將數據輸出到非HDFS的存儲系統中,不如MYSQL,Redis


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM