前面一篇應該算是比較詳細的介紹了spark的基礎知識,在了解了一些spark的知識之后相必大家對spark應該不算陌生了吧!如果你之前寫過MapReduce,現在對spark也很熟悉的話我想你再也不想用MapReduce去寫一個應用程序了,不是說MapReduce有多繁瑣(相對而言),還有運行的效率等問題。而且用spark寫出來的程序比較優雅,這里我指的是scala版的,如果你用java版的spark去寫一個應用程序,對比scala版的,想必你肯定會愛上scala這門語言的,哈哈哈(以上純屬個人觀點,具體場景具體對待)
實現目標1:根據采集的日志信息,統計總的pv量 。
需求分析:在大數據領域,采集數據的常采用的手段就是懟網站進行埋點然后根據需求收集相關的數據,這里我們用的是最基本的日志信息來做處理,數據來源於某網站,可以分享出來給大家使用,完了后我會將代碼還有數據 文件放到GitHub上供大家下載。首先我們來看看日志文件(access.log)的格式:
這是標准的一條日志信息,當然我們如果是統計網站的pv總量的話不需要考慮對日志進行清洗的工作。以下是pv統計的代碼:
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} //todo:利用Spark程序統計運營商pv總量 object PV extends App{ //創建sparkConf對象 private val sparkConf: SparkConf = new SparkConf().setAppName("PV").setMaster("local[2]") //創建SparkContext對象 private val sc: SparkContext = new SparkContext(sparkConf) //設置輸出的日志級別 sc.setLogLevel("WARN") //讀取日志數據 private val dataRDD: RDD[String] = sc.textFile("E:\\access.log") //統計pv總量====方式一:計算有多少行及pv總量 private val finalResult1: Long = dataRDD.count() println(finalResult1) //方式二:每一條日志信息記為一條數據1 private val pvOne: RDD[(String, Int)] = dataRDD.map(x=>("PV",1)) //對pv根據key進行累加 private val resultPV: RDD[(String, Int)] = pvOne.reduceByKey(_+_) //打印pv總量 resultPV.foreach(x=>println(x)) //關閉資源 sc.stop() }
結果如下:
實現目標2:根據采集的日志信息,統計總的uv量 。
需求分析:目標數據文件還是access.log,比較簡單,直接看代碼:
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD //todo:利用spark統計運營商uv總量 object UV extends App{ //創建sparkConf對象 private val sparkConf: SparkConf = new SparkConf().setAppName("PV").setMaster("local[2]") //創建SparkContext對象 private val sc: SparkContext = new SparkContext(sparkConf) //設置輸出的日志級別 sc.setLogLevel("WARN") //讀取日志數據 private val dataRDD: RDD[String] = sc.textFile("E:\\access.log") //切分每一行,獲取對應的ip地址 private val ips: RDD[String] = dataRDD.map(_.split(" ")(0)) //去重 private val ipNum: Long = ips.distinct().count() println(ipNum) //g關閉資源 sc.stop() }
結果 如下:
實現目標3:根據采集的日志信息,統計訪問最多的前五位網站降序排列 TopN。
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD //todo:利用spark計算運營商訪問url最多的前n位=====TopN object TopN extends App{ //創建sparkConf對象 private val sparkConf: SparkConf = new SparkConf().setAppName("PV").setMaster("local[2]") //創建SparkContext對象 private val sc: SparkContext = new SparkContext(sparkConf) //設置輸出的日志級別 sc.setLogLevel("WARN") //讀取日志數據 private val dataRDD: RDD[String] = sc.textFile("E:\\access.log") //對每一行的日志信息進行切分並且過濾清洗掉不符合規則的數據 //通過對日志信息的分析,我們知道按照空格切分后,下標為10的是url,長度小於10的暫且認為是不符合規則的數據 private val urlAndOne: RDD[(String, Int)] = dataRDD.filter(_.split(" ").size>10).map(x=>(x.split(" ")(10),1)) //相同url進行累加 private val result: RDD[(String, Int)] = urlAndOne.reduceByKey(_+_) //訪問最多的url並進行倒敘排序 private val sortResult: RDD[(String, Int)] = result.sortBy(_._2,false) //取前五位 private val finalResult: Array[(String, Int)] = sortResult.take(5) //打印輸出 finalResult.foreach(println) sc.stop() }
運行結果: