1.代碼
1 object LogPVAndUV{ 2 def main(args:Array[String]):Unit={ 3 val conf=new SparkConf() 4 .setMaster("local[*]") 5 .setAppName("PVAndUV") 6 val sc=SparkContext.getOrCreate(conf) 7 val logPath="/user/beifeng/spark/logs/page_views.data" 8 val logRDD=sc.textFile(logPath) 9 val filterRDD=logRDD.filter(_.length>0) 10 //轉換 11 val mapRDD=filterRDD.map(line=>{ 12 val arr=line.split("\t") 13 if(arr.length==7){ 14 val date=arr(0).trim 15 val url=arr(1) 16 val uuid=arr(2) 17 (date.subString(0,Math.min(10.date.length)).trim,url,uuid) 18 }else{ 19 (null,null,null) 20 } 21 }).filter(tuple=>tuple._1!=null&&tuple._1.length>0) 22 //PV計算 23 val pvRDD=mapRDD 24 .filter(tuple=>tuple._2.length>0) 25 .map(tuple=>(tuple._1,1)) 26 .reduceByKey(_+_) 27 //UV計算 28 val uvRDD=mapRDD 29 .filter(tuple=>tuple._3.length>0) 30 .map(tuple=>(tuple._1,tuple._3)) 31 .distinct 32 .reduceByKey(_+_) 33 //合並 34 val pvAndUv=pvRDD.join(uvRDD).map{ 35 case (date,(pv,uv))=>{ 36 (date,pv,uv) 37 } 38 } 39 //輸出 40 pvAndUv.saveAsTextFile("/user/beifeng/spark/output/"+System.currentTimeMillis()) 41 sc.stop() 42 } 43 }
2.PS
rdd.foreachPartition(iter=>{
//
})
對iter迭代器中的數據進行輸出,iter表示的是一個分區的所有數據,這里的迭代器和groupbyKey中的實現方式不同,不會產生OOM
主要用於將數據輸出到非HDFS的存儲系統中,不如MYSQL,Redis